1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * This file is part of the Nexus MCP SDK package.
7: *
8: * (c) 2026 John Paul E. Balandan, CPA <paulbalandan@gmail.com>
9: *
10: * For the full copyright and license information, please view
11: * the LICENSE file that was distributed with this source code.
12: */
13:
14: namespace Nexus\Mcp\Core\Transport;
15:
16: use Nexus\Assert\Assert;
17: use Nexus\Mcp\Core\Exception\TransportAlreadyClosedException;
18: use Nexus\Mcp\Core\Exception\TransportAlreadyStartedException;
19: use Nexus\Mcp\Core\Exception\TransportNotStartedException;
20: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcMessage;
21:
/**
 * In-process JSON-RPC duplex between two `TransportInterface` instances. Each
 * side's `send()` becomes the other side's inbound message. Pre-start inbound
 * envelopes queue and drain on `start()`. `close()` cascades to the peer.
 *
 * Delivery is synchronous: `send()` calls straight into the peer, so the peer's
 * message listeners run before `send()` returns.
 */
final class InMemoryTransport implements TransportInterface
{
    /**
     * Envelopes the peer's `send()` delivered before this side called `start()`.
     * Drained in arrival order on `start()`; discarded on `close()`.
     *
     * @var list<array<string, mixed>>
     */
    private array $pendingInbound = [];

    private TransportState $state = TransportState::Idle;

    /**
     * The other end of the pair. Set by `createPair()`, reset to `null` by `close()`.
     */
    private ?self $peer = null;

    private readonly TransportEvents $events;

    /**
     * Private on purpose: instances are only useful in linked pairs, so
     * construction goes through `createPair()`.
     */
    private function __construct()
    {
        $this->events = new TransportEvents();
    }

    /**
     * Returns two linked transports. Each side's `send()` delivers to the
     * other side's `onMessage` listeners. Use one for the server, the other
     * for the client.
     *
     * @return array{self, self}
     */
    public static function createPair(): array
    {
        $a = new self();
        $b = new self();
        $a->peer = $b;
        $b->peer = $a;

        return [$a, $b];
    }

    /**
     * Moves this side from `Idle` to `Running` and delivers, in arrival order,
     * every envelope the peer sent before this call.
     *
     * @throws TransportAlreadyClosedException  when this side has already been closed
     * @throws TransportAlreadyStartedException when this side is already running
     */
    #[\Override]
    public function start(): void
    {
        match ($this->state) {
            TransportState::Running => throw new TransportAlreadyStartedException(transport: self::class),
            TransportState::Closed => throw new TransportAlreadyClosedException(operation: 'start'),
            TransportState::Idle => null,
        };

        $this->state = TransportState::Running;

        // Detach the backlog before emitting so the drained envelopes are not
        // retained for the rest of the transport's lifetime.
        $backlog = $this->pendingInbound;
        $this->pendingInbound = [];

        foreach ($backlog as $envelope) {
            // A listener may close this transport while the backlog is being
            // drained; a closed side must not be handed further messages.
            if (TransportState::Running !== $this->state) {
                break;
            }

            $this->events->emitMessage($envelope);
        }
    }

    /**
     * Serialises `$message` via its `toArray()` method and hands the resulting
     * envelope to the peer.
     *
     * `$context`'s fields (`relatedRequestId`, `resumptionToken`, `onResumptionToken`)
     * are streamable-HTTP concerns with no in-process equivalent. The parameter is
     * accepted for `TransportInterface` conformance and intentionally dropped.
     *
     * @throws \InvalidArgumentException
     * @throws TransportAlreadyClosedException
     * @throws TransportNotStartedException
     */
    #[\Override]
    public function send(JsonRpcMessage $message, ?SendContext $context = null): void
    {
        match ($this->state) {
            TransportState::Idle => throw new TransportNotStartedException(operation: 'send'),
            TransportState::Closed => throw new TransportAlreadyClosedException(operation: 'send'),
            TransportState::Running => null,
        };

        \assert(method_exists($message, 'toArray'), 'In-memory transport requires a JsonRpcMessage exposing toArray().');

        $envelope = $message->toArray();
        Assert::that($envelope)->isMap('In-memory transport: toArray() must return a string-keyed object.');

        $this->peer?->receive($envelope);
    }

    /**
     * Closes this side and cascades the close to the peer. Calling it on an
     * already closed transport is a no-op.
     *
     * Order of effects: drain listeners run first (while the state is still the
     * pre-close one), then the state becomes `Closed`, then the peer is closed,
     * and finally this side's close listeners run. The close listeners run even
     * when a drain listener or the peer's `close()` throws.
     */
    #[\Override]
    public function close(): void
    {
        if (TransportState::Closed === $this->state) {
            return;
        }

        try {
            $this->events->emitDrain();
        } finally {
            // Flip the state before cascading: the peer's close() calls back
            // into this instance, and the guard at the top ends the recursion.
            $this->state = TransportState::Closed;

            // Nothing can drain the backlog once closed (start() throws), so release it.
            $this->pendingInbound = [];

            $peer = $this->peer;
            $this->peer = null;

            try {
                $peer?->close();
            } finally {
                // Nested so that a throwing peer cascade cannot suppress this
                // side's close notification.
                $this->events->emitClose();
            }
        }
    }

    /**
     * Always `null`: this transport does not assign session identifiers.
     */
    #[\Override]
    public function getSessionId(): ?string
    {
        return null;
    }

    /**
     * Registers a listener for inbound envelopes delivered by the peer.
     */
    #[\Override]
    public function onMessage(\Closure $listener): SubscriptionInterface
    {
        return $this->events->onMessage($listener);
    }

    /**
     * In-memory transport has no I/O failure surface, so registered error
     * listeners are accepted for contract conformance but never invoked.
     */
    #[\Override]
    public function onError(\Closure $listener): SubscriptionInterface
    {
        return $this->events->onError($listener);
    }

    /**
     * Registers a listener that `close()` notifies before the state changes to `Closed`.
     */
    #[\Override]
    public function onDrain(\Closure $listener): SubscriptionInterface
    {
        return $this->events->onDrain($listener);
    }

    /**
     * Registers a listener that `close()` notifies after this side and its peer are closed.
     */
    #[\Override]
    public function onClose(\Closure $listener): SubscriptionInterface
    {
        return $this->events->onClose($listener);
    }

    /**
     * Cross-instance hand-off invoked by the peer's `send()`. Queues into
     * `pendingInbound` while this side is `Idle`, emits to listeners while it
     * is `Running`, and drops the envelope once it is `Closed`.
     *
     * The `Closed` case is reachable during a cascading close: the peer's drain
     * listeners run while the peer is still `Running` and still linked to this
     * (already closed) side, so a `send()` from such a listener lands here.
     *
     * @param array<string, mixed> $envelope
     */
    private function receive(array $envelope): void
    {
        if (TransportState::Closed === $this->state) {
            return;
        }

        if (TransportState::Idle === $this->state) {
            $this->pendingInbound[] = $envelope;

            return;
        }

        $this->events->emitMessage($envelope);
    }
}
176: