1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * This file is part of the Nexus MCP SDK package.
7: *
8: * (c) 2026 John Paul E. Balandan, CPA <paulbalandan@gmail.com>
9: *
10: * For the full copyright and license information, please view
11: * the LICENSE file that was distributed with this source code.
12: */
13:
14: namespace Nexus\Mcp\Server\Dispatch;
15:
16: use Amp\Cancellation;
17: use Amp\NullCancellation;
18: use Nexus\Mcp\Core\Dispatch\MessageDispatcherInterface;
19: use Nexus\Mcp\Core\Dispatch\PendingCoroutines;
20: use Nexus\Mcp\Core\Dispatch\PendingInboundRequests;
21: use Nexus\Mcp\Core\Dispatch\RequestBoundSender;
22: use Nexus\Mcp\Core\Dispatch\ResponseSender;
23: use Nexus\Mcp\Core\Exception\AbstractJsonRpcProtocolException;
24: use Nexus\Mcp\Core\Exception\DuplicateInboundRequestIdException;
25: use Nexus\Mcp\Core\Exception\MethodMisroutedException;
26: use Nexus\Mcp\Core\Exception\MethodNotFoundException;
27: use Nexus\Mcp\Core\Exception\TransportAlreadyClosedException;
28: use Nexus\Mcp\Core\Handler\HandlerRegistry;
29: use Nexus\Mcp\Core\Handler\NotificationHandlerInterface;
30: use Nexus\Mcp\Core\Handler\RequestHandlerInterface;
31: use Nexus\Mcp\Core\JsonRpc\JsonRpcMessageParser;
32: use Nexus\Mcp\Core\Schema\Error\InternalError;
33: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcErrorResponse;
34: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcNotification;
35: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcRequest;
36: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcResultResponse;
37: use Nexus\Mcp\Core\Schema\Notification\InitializedNotification;
38: use Nexus\Mcp\Core\Schema\Request\InitializeRequest;
39: use Nexus\Mcp\Core\Schema\Result;
40: use Nexus\Mcp\Core\Transport\TransportInterface;
41: use Nexus\Mcp\Server\Exception\ServerAlreadyInitializedException;
42: use Nexus\Mcp\Server\Exception\ServerNotInitializedException;
43: use Nexus\Mcp\Server\Logging\LoggingLevelGate;
44: use Nexus\Mcp\Server\ServerContext;
45: use Psr\Log\LoggerInterface;
46: use Psr\Log\NullLogger;
47:
48: use function Amp\async;
49:
50: /**
51: * Server-side per-envelope inbound dispatch. Parses, classifies, gates, resolves a handler,
52: * spawns a coroutine to run it, and sends the response (or error) on the transport.
53: */
54: final readonly class ServerMessageDispatcher implements MessageDispatcherInterface
55: {
56: private PendingCoroutines $coroutines;
57: private PendingInboundRequests $inboundRequests;
58: private ResponseSender $responseSender;
59:
60: /**
61: * @param HandlerRegistry<RequestHandlerInterface<non-empty-string, Result, ServerContext>> $requestHandlers
62: * @param HandlerRegistry<NotificationHandlerInterface<non-empty-string>> $notificationHandlers
63: */
64: public function __construct(
65: private HandlerRegistry $requestHandlers,
66: private HandlerRegistry $notificationHandlers,
67: private ServerInitializationGate $initializationGate,
68: private LoggingLevelGate $loggingLevelGate = new LoggingLevelGate(),
69: private LoggerInterface $logger = new NullLogger(),
70: private JsonRpcMessageParser $parser = new JsonRpcMessageParser(),
71: private Cancellation $cancellation = new NullCancellation(),
72: ) {
73: $this->coroutines = new PendingCoroutines();
74: $this->inboundRequests = new PendingInboundRequests();
75: $this->responseSender = new ResponseSender($this->logger);
76: }
77:
78: #[\Override]
79: public function flushPending(): void
80: {
81: $this->coroutines->flushPending();
82: }
83:
84: /**
85: * @param array<string, mixed> $envelope
86: */
87: #[\Override]
88: public function dispatch(array $envelope, TransportInterface $transport): void
89: {
90: if (\array_key_exists('result', $envelope) || \array_key_exists('error', $envelope)) {
91: $this->discardResponseEnvelope($envelope);
92:
93: return;
94: }
95:
96: $isNotification = ! \array_key_exists('id', $envelope);
97:
98: try {
99: $message = $this->parser->parse($envelope);
100: } catch (MethodMisroutedException $e) {
101: $this->logger->warning(
102: 'Rejecting envelope whose method was sent under the wrong JSON-RPC shape.',
103: ['envelope' => $envelope, 'exception' => $e],
104: );
105:
106: if (! $isNotification) {
107: // Envelope carried an id but the method is a notification method.
108: // JSON-RPC 2.0 §4.1 forbids responses to notifications. Drop silently.
109: return;
110: }
111:
112: // Envelope omitted the id but the method is a request method.
113: // §5 null-id fallback. Respond so the peer can fix the malformed request.
114: $this->responseSender->send($transport, ResponseSender::buildErrorResponse($e, null), 'misrouted');
115:
116: return;
117: } catch (AbstractJsonRpcProtocolException $e) {
118: if ($isNotification) {
119: $this->logger->info(
120: 'Dropping malformed notification (JSON-RPC 2.0 §4.1 forbids responses to notifications).',
121: ['envelope' => $envelope, 'exception' => $e],
122: );
123:
124: return;
125: }
126:
127: $this->responseSender->send($transport, ResponseSender::buildErrorResponse($e, null), 'parse-error');
128:
129: return;
130: }
131:
132: if ($message instanceof JsonRpcRequest) {
133: $this->dispatchRequest($message, $transport);
134: } elseif ($message instanceof JsonRpcNotification) {
135: $this->dispatchNotification($message);
136: }
137: }
138:
139: /**
140: * @param array<string, mixed> $envelope
141: */
142: private function discardResponseEnvelope(array $envelope): void
143: {
144: $this->logger->warning(
145: 'Discarding response envelope (server has no outbound-request correlation).',
146: ['envelope' => $envelope],
147: );
148: }
149:
150: /**
151: * @param JsonRpcRequest<non-empty-string> $request
152: */
153: private function dispatchRequest(JsonRpcRequest $request, TransportInterface $transport): void
154: {
155: $method = $request::getMethod();
156: $isInitializeRequest = InitializeRequest::getMethod() === $method;
157:
158: // Gate is mutated sync so a same-tick `notifications/initialized` sees `InitializeInFlight`.
159: if (! $this->initializationGate->allowsRequest($method)) {
160: $exception = $isInitializeRequest
161: ? new ServerAlreadyInitializedException($request->id)
162: : new ServerNotInitializedException($method, $request->id);
163:
164: $this->responseSender->send($transport, ResponseSender::buildErrorResponse($exception, $request->id), $method);
165:
166: return;
167: }
168:
169: if (! $this->inboundRequests->claim($request->id)) {
170: $exception = new DuplicateInboundRequestIdException($request->id);
171: $this->responseSender->send($transport, ResponseSender::buildErrorResponse($exception, $request->id), $method);
172:
173: return;
174: }
175:
176: if ($isInitializeRequest) {
177: $this->initializationGate->markInitializeInFlight();
178: }
179:
180: $this->coroutines->track(async(function () use ($request, $transport, $method, $isInitializeRequest): void {
181: try {
182: $sender = new RequestBoundSender($transport, $request->id);
183: $context = new ServerContext(
184: $request->id,
185: $this->cancellation,
186: $request->params->meta,
187: $transport->getSessionId(),
188: $sender,
189: $this->loggingLevelGate,
190: );
191:
192: try {
193: $handler = $this->requestHandlers->get($method)
194: ?? throw new MethodNotFoundException($method, $request->id);
195: $result = $handler->handle($request, $context);
196: } catch (TransportAlreadyClosedException $e) {
197: if ($isInitializeRequest) {
198: $this->initializationGate->revertInitializeInFlight();
199: }
200:
201: $this->responseSender->logSkippedDelivery($method, $e);
202:
203: return;
204: } catch (AbstractJsonRpcProtocolException $e) {
205: if ($isInitializeRequest) {
206: $this->initializationGate->revertInitializeInFlight();
207: }
208:
209: $this->responseSender->send($transport, ResponseSender::buildErrorResponse($e, $request->id), $method);
210:
211: return;
212: } catch (\Throwable $e) {
213: if ($isInitializeRequest) {
214: $this->initializationGate->revertInitializeInFlight();
215: }
216:
217: $this->logger->error(
218: 'Uncaught request handler exception.',
219: ['method' => $method, 'exception' => $e],
220: );
221: $this->responseSender->send($transport, new JsonRpcErrorResponse(
222: $request->id,
223: new InternalError(),
224: ), $method);
225:
226: return;
227: }
228:
229: if ($isInitializeRequest) {
230: $this->initializationGate->markInitializeCompleted();
231: }
232:
233: $this->responseSender->send($transport, new JsonRpcResultResponse($request->id, $result), $method);
234: } finally {
235: $this->inboundRequests->release($request->id);
236: }
237: }));
238: }
239:
240: /**
241: * @param JsonRpcNotification<non-empty-string> $notification
242: */
243: private function dispatchNotification(JsonRpcNotification $notification): void
244: {
245: $method = $notification::getMethod();
246:
247: if (InitializedNotification::getMethod() === $method) {
248: if (! $this->initializationGate->markInitialized()) {
249: $this->logger->warning(
250: 'Discarding "notifications/initialized" received in an unexpected initialize handshake state.',
251: ['method' => $method],
252: );
253:
254: return;
255: }
256: } elseif (! $this->initializationGate->isInitialized()) {
257: // Other notifications must wait for a complete handshake. The initialized arm above
258: // intentionally falls through even when buffered (gate may still be InitializeInFlight).
259: $this->logger->info(
260: 'Dropping notification before client has completed initialize.',
261: ['method' => $method],
262: );
263:
264: return;
265: }
266:
267: $handler = $this->notificationHandlers->get($method);
268:
269: if (null === $handler) {
270: return;
271: }
272:
273: $this->coroutines->track(async(function () use ($handler, $notification, $method): void {
274: try {
275: $handler->handle($notification);
276: } catch (\Throwable $e) {
277: // Notifications carry no response per JSON-RPC 2.0 §4.1. Failure is logged only.
278: $this->logger->error(
279: 'Uncaught notification handler exception.',
280: ['method' => $method, 'exception' => $e],
281: );
282: }
283: }));
284: }
285: }
286: