1: <?php
2:
3: declare(strict_types=1);
4:
5: /**
6: * This file is part of the Nexus MCP SDK package.
7: *
8: * (c) 2026 John Paul E. Balandan, CPA <paulbalandan@gmail.com>
9: *
10: * For the full copyright and license information, please view
11: * the LICENSE file that was distributed with this source code.
12: */
13:
14: namespace Nexus\Mcp\Client\Transport;
15:
16: use Amp\Process\Process;
17: use Nexus\Assert\Assert;
18: use Nexus\Mcp\Core\JsonRpc\SafeDisplay;
19: use Nexus\Mcp\Core\Schema\JsonRpc\JsonRpcMessage;
20: use Nexus\Mcp\Core\Transport\LineDuplex;
21: use Nexus\Mcp\Core\Transport\LineReader;
22: use Nexus\Mcp\Core\Transport\SendContext;
23: use Nexus\Mcp\Core\Transport\SubscriptionInterface;
24: use Nexus\Mcp\Core\Transport\TransportInterface;
25: use Psr\Log\LoggerInterface;
26: use Psr\Log\NullLogger;
27:
28: /**
29: * Stdio MCP client transport. Launches an MCP server subprocess and exchanges
30: * line-framed JSON-RPC envelopes over its STDIN/STDOUT.
31: */
32: final class StdioClientTransport implements TransportInterface
33: {
34: private const string LABEL = 'Stdio client';
35:
36: /**
37: * Environment variable names safe to inherit by default, across POSIX and Windows hosts.
38: */
39: private const array INHERITED_ENV_NAMES = [
40: 'APPDATA',
41: 'HOME',
42: 'HOMEDRIVE',
43: 'HOMEPATH',
44: 'LOCALAPPDATA',
45: 'LOGNAME',
46: 'PATH',
47: 'PROCESSOR_ARCHITECTURE',
48: 'SHELL',
49: 'SYSTEMDRIVE',
50: 'SYSTEMROOT',
51: 'TEMP',
52: 'TERM',
53: 'USER',
54: 'USERNAME',
55: 'USERPROFILE',
56: ];
57:
58: private readonly LineDuplex $duplex;
59: private readonly LoggerInterface $logger;
60: private ?Process $process = null;
61:
62: /**
63: * @param list<string> $command Subprocess argv (no shell interpretation).
64: * @param null|array<string, string> $env Subprocess environment (`null` prunes to a safe allowlist).
65: */
66: public function __construct(
67: private readonly array $command,
68: private readonly ?string $workingDirectory = null,
69: private readonly ?array $env = null,
70: LoggerInterface $logger = new NullLogger(),
71: int $maxLineBytes = LineReader::DEFAULT_MAX_LINE_BYTES,
72: ) {
73: Assert::that($command)->isList(\sprintf('%s command must be a list, {type} given.', self::LABEL));
74: Assert::that(\count($command))->isPositiveInt(\sprintf('%s command must not be empty.', self::LABEL));
75:
76: $this->logger = $logger;
77: $this->duplex = new LineDuplex(
78: hostTransport: self::class,
79: label: self::LABEL,
80: logger: $logger,
81: maxLineBytes: $maxLineBytes,
82: onBeforeClose: function (): void {
83: if (null === $this->process) {
84: return;
85: }
86:
87: $this->process->getStdin()->close();
88: $this->process->kill();
89: },
90: );
91: }
92:
93: /**
94: * Builds the pruned default subprocess environment: the inherited-name allowlist
95: * populated from `$source`, skipping values that look like exported shell functions.
96: *
97: * @internal
98: *
99: * @param null|array<string, string> $source Defaults to the parent process environment.
100: *
101: * @return array<string, string>
102: */
103: public static function buildDefaultEnvironment(?array $source = null): array
104: {
105: $source ??= getenv();
106: $environment = [];
107:
108: foreach (self::INHERITED_ENV_NAMES as $name) {
109: $value = $source[$name] ?? null;
110:
111: if (null === $value) {
112: continue;
113: }
114:
115: if (str_starts_with($value, '()')) {
116: // Skip exported shell-function definitions (Shellshock mitigation).
117: continue;
118: }
119:
120: $environment[$name] = $value;
121: }
122:
123: return $environment;
124: }
125:
126: #[\Override]
127: public function start(): void
128: {
129: $process = Process::start($this->command, $this->workingDirectory, $this->env ?? self::buildDefaultEnvironment());
130:
131: try {
132: $this->duplex->start($process->getStdout(), $process->getStdin());
133: } catch (\Throwable $e) {
134: $process->getStdin()->close();
135: $process->kill();
136:
137: throw $e;
138: }
139:
140: $this->process = $process;
141: $this->logger->info(
142: '{label} transport spawned subprocess. Command: {command} (PID {pid}).',
143: ['label' => self::LABEL, 'command' => implode(' ', $this->command), 'pid' => $process->getPid()],
144: );
145:
146: $this->duplex->forwardLines(
147: $process->getStderr(),
148: function (string $line): void {
149: $this->logger->info('Subprocess stderr: {line}', ['line' => SafeDisplay::sanitise($line)]);
150: },
151: );
152: }
153:
154: #[\Override]
155: public function send(JsonRpcMessage $message, ?SendContext $context = null): void
156: {
157: $this->duplex->send($message);
158: }
159:
160: #[\Override]
161: public function close(): void
162: {
163: $this->duplex->close();
164: }
165:
166: #[\Override]
167: public function getSessionId(): ?string
168: {
169: return null;
170: }
171:
172: #[\Override]
173: public function onMessage(\Closure $listener): SubscriptionInterface
174: {
175: return $this->duplex->onMessage($listener);
176: }
177:
178: #[\Override]
179: public function onError(\Closure $listener): SubscriptionInterface
180: {
181: return $this->duplex->onError($listener);
182: }
183:
184: #[\Override]
185: public function onDrain(\Closure $listener): SubscriptionInterface
186: {
187: return $this->duplex->onDrain($listener);
188: }
189:
190: #[\Override]
191: public function onClose(\Closure $listener): SubscriptionInterface
192: {
193: return $this->duplex->onClose($listener);
194: }
195: }
196: