1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * This file is part of the Nexus MCP SDK package.
7: *
8: * (c) 2026 John Paul E. Balandan, CPA <paulbalandan@gmail.com>
9: *
10: * For the full copyright and license information, please view
11: * the LICENSE file that was distributed with this source code.
12: */
13:
14: namespace Nexus\Mcp\Server;
15:
16: use Amp\DeferredFuture;
17: use Nexus\Mcp\Core\Dispatch\MessageDispatcherInterface;
18: use Nexus\Mcp\Core\Transport\TransportInterface;
19: use Psr\Log\LoggerInterface;
20: use Psr\Log\NullLogger;
21:
/**
 * Minimal server shell that owns the lifecycle of a single transport.
 *
 * `run()` registers the message, error, drain, and close listeners on the
 * transport exactly once, starts it, and then blocks until the transport
 * reports that it has closed.
 */
final readonly class Server
{
    /**
     * @param MessageDispatcherInterface $dispatcher Receives every incoming envelope and is asked
     *                                               to flush pending work when the transport drains.
     * @param LoggerInterface            $logger     Receives lifecycle and transport-error records;
     *                                               defaults to a no-op logger.
     */
    public function __construct(
        private MessageDispatcherInterface $dispatcher,
        private LoggerInterface $logger = new NullLogger(),
    ) {
    }

    /**
     * Serves the given transport and blocks until it emits `close`.
     */
    public function run(TransportInterface $transport): void
    {
        $this->logger->info('Starting MCP server.');

        // Completed by the `close` listener; awaiting it is what keeps `run()` blocking.
        $closed = new DeferredFuture();
        $this->attachListeners($transport, $closed);

        $transport->start();
        $closed->getFuture()->await();

        $this->logger->info('MCP server stopped.');
    }

    /**
     * Wires the message, error, drain, and close listeners onto the transport.
     *
     * @param DeferredFuture $closed Completed the first time the transport emits `close`.
     */
    private function attachListeners(TransportInterface $transport, DeferredFuture $closed): void
    {
        // Every incoming envelope goes to the dispatcher together with the
        // transport it arrived on.
        $transport->onMessage(function (array $envelope) use ($transport): void {
            $this->dispatcher->dispatch($envelope, $transport);
        });

        // Transport errors are only logged here; they do not stop the server.
        $transport->onError(function (\Throwable $exception): void {
            $this->logger->error('Transport error.', ['exception' => $exception]);
        });

        $transport->onDrain(function (): void {
            $this->dispatcher->flushPending();
        });

        $transport->onClose(static function () use ($closed): void {
            // A transport may emit `close` more than once when an error is raised
            // during shutdown (e.g. stdin EOF followed by a write failure), so only
            // the first emission is allowed to complete the deferred.
            if (! $closed->isComplete()) {
                $closed->complete();
            }
        });
    }
}
63: