# 第二阶段 · 模块二 · 第三节:工具执行机制
StreamingToolExecutor 的并行执行是如何工作的?runTools 的顺序执行和 StreamingToolExecutor 的并行执行有什么区别?工具执行失败的错误处理流程是什么?
Claude Code 全局架构
┌─────────────────────────────────────────────────────────────────────┐
│ 查询引擎层(QueryEngine ──> query) │
│ │
│ query.ts │
│ ├── callModel() ──> API 调用 │
│ ├── StreamingToolExecutor ──> 并行工具执行 ← 本节 │
│ └── runTools ──> 顺序工具执行 │
└─────────────────────────────────────────────────────────────────────┘
| 方面 | runTools | StreamingToolExecutor |
| 执行方式 | 顺序执行 | 并行执行 |
| 时机 | 模型流式完成后 | 与模型流式并行 |
| 并发数 | 1 | 可配置(默认10) |
| 代码位置 | `toolOrchestration.ts` | `StreamingToolExecutor.ts` |
问 1:为什么需要两种执行模式?
// 场景:模型返回多个工具调用
{
type: 'assistant',
content: [
{ type: 'tool_use', name: 'Read', input: { file_path: 'a.txt' } },
{ type: 'tool_use', name: 'Read', input: { file_path: 'b.txt' } },
{ type: 'tool_use', name: 'Read', input: { file_path: 'c.txt' } },
]
}
// runTools:顺序执行
read('a.txt') → read('b.txt') → read('c.txt') // 总耗时 = t1 + t2 + t3
// StreamingToolExecutor:并行执行
read('a.txt')
read('b.txt') // 同时执行
read('c.txt')
// 总耗时 = max(t1, t2, t3)
问 2:StreamingToolExecutor 的核心思想?
// 核心:工具执行和模型输出流式并行
// 模型流式输出:
for await (const event of model.stream()) {
if (event.type === 'tool_use') {
// 边接收 tool_use,边执行工具
streamingExecutor.addPending(event);
}
}
// 同时,工具执行结果可以边产生边返回
for await (const result of streamingExecutor.getRemainingResults()) {
yield result;
}
问 3:什么是 isConcurrencySafe?
// 某些工具调用可以并行,某些不能
// 并行安全 = 不修改共享状态
const concurrencySafeTools = [
'Read', // 读文件,不修改任何东西
'Glob', // 只搜索,不修改
'Grep', // 只搜索,不修改
'WebSearch', // 只查询,不修改
];
const unsafeTools = [
'Write', // 写文件,可能冲突
'Edit', // 修改文件,可能冲突
'Bash', // 执行命令,副作用未知
];
// 安全的工具可以并行,不安全的必须顺序执行
源码位置:`src/services/tools/StreamingToolExecutor.ts`
export class StreamingToolExecutor {
// 待执行的工具调用
private pendingToolUses: ToolUseBlock[] = [];
// 并发控制器
private semaphore: Semaphore;
// 线程安全的队列
private results: MessageUpdate[] = [];
// Signal to wake up getRemainingResults when progress is available
private wakeupSignal = new Signal();
async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
while (true) {
// 等待新结果或所有工具完成
while (this.results.length === 0 && !this.isDone()) {
await this.wakeupSignal.wait();
}
// yield 所有可用结果
while (this.results.length > 0) {
yield this.results.shift()!;
}
// 所有工具都执行完毕,退出
if (this.isDone()) {
return;
}
}
}
addPending(toolUse: ToolUseBlock): void {
this.pendingToolUses.push(toolUse);
this.executeTool(toolUse); // 立即开始执行
}
}
StreamingToolExecutor 执行流程:
addPending(toolUse) 收到工具调用
│
├── 检查 isConcurrencySafe
│
├── 如果安全:
│ └── 立即并发执行(semaphore 控制)
│
└── 如果不安全:
└── 等待之前的工具完成后再执行
│
▼
executeTool() 执行工具
│
├── 调用 canUseTool() 检查权限
│
├── 如果拒绝:
│ └── 记录拒绝原因,返回 permission denied
│
└── 如果允许:
└── 调用 tool.execute()
│
▼
结果放入 results 队列
│
▼
getRemainingResults() yield 结果
问 1:信号量(Semaphore)的作用?
// 控制并发数,避免同时执行过多工具
class Semaphore {
private permits: number;
private waiters: (() => void)[] = [];
constructor(permits: number) {
this.permits = permits;
}
async acquire(): Promise<void> {
if (this.permits > 0) {
this.permits--;
return;
}
// 等待
await new Promise(resolve => this.waiters.push(resolve));
}
release(): void {
this.permits++;
const waiter = this.waiters.shift();
if (waiter) {
waiter();
}
}
}
// 使用
const maxConcurrency = parseInt(process.env.MAX_TOOL_CONCURRENCY || '10');
const semaphore = new Semaphore(maxConcurrency);
async function executeTool(toolUse) {
await semaphore.acquire();
try {
return await tool.execute();
} finally {
semaphore.release();
}
}
问 2:Signal.wait() 是什么?
// 用于在没有结果时休眠,有结果时唤醒
class Signal {
private promise: Promise<void>;
private resolve: () => void;
constructor() {
this.promise = new Promise(resolve => {
this.resolve = resolve;
});
}
wait(): Promise<void> {
return this.promise;
}
signal(): void {
// 创建一个新的 Promise,等待下一次 signal
this.promise = new Promise(resolve => {
this.resolve = resolve;
});
}
}
问 3:为什么不安全工具要等待?
// 示例:不安全工具的并发问题
// 场景:两个 Edit 工具同时修改同一文件
Edit({ file_path: 'a.txt', old: 'foo', new: 'bar' })
Edit({ file_path: 'a.txt', old: 'baz', new: 'qux' })
// 并发执行可能产生冲突:
// Tool1: 读取文件
// Tool2: 读取文件
// Tool1: 修改内容
// Tool2: 修改内容 // 覆盖了 Tool1 的修改!
// 解决:顺序执行
// Tool1: 读取 → 修改 → 完成
// Tool2: 等待 Tool1 完成 → 读取(最新内容)→ 修改
问 4:StreamingToolExecutor 如何知道工具完成?
// 每次 tool.execute() 完成后,检查是否所有工具都完成
async function executeTool(toolUse: ToolUseBlock): Promise<void> {
try {
const result = await tool.execute();
this.results.push({ message: result, ... });
} finally {
// 无论成功还是失败,都标记为完成
this.completedCount++;
// 如果所有工具都完成,唤醒等待者
if (this.completedCount === this.totalCount) {
this.wakeupSignal.signal();
}
}
}
问 5:StreamingToolExecutor 的局限性?
// 局限性 1:只能处理简单依赖
// 无法处理复杂依赖链:A 依赖 B 的结果,B 依赖 C 的结果
// 局限性 2:结果顺序不确定
// A, B, C 同时开始,A 最慢,C 最快
// 结果顺序可能是:C, B, A
// 但模型期望按调用顺序返回
// 局限性 3:内存压力
// 同时执行太多工具会占用大量内存
源码位置:`src/services/tools/toolOrchestration.ts`
export async function* runTools(
toolUseMessages: ToolUseBlock[],
assistantMessages: AssistantMessage[],
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
let currentContext = toolUseContext;
// 按顺序处理每个工具
for (const { isConcurrencySafe, blocks } of partitionToolCalls(
toolUseMessages,
currentContext,
)) {
if (isConcurrencySafe) {
// 安全工具:并行执行
for await (const update of runToolsConcurrently(
blocks,
assistantMessages,
canUseTool,
currentContext,
)) {
yield update;
}
} else {
// 不安全工具:顺序执行
for (const block of blocks) {
for await (const update of runToolUse(
block,
assistantMessages,
canUseTool,
currentContext,
)) {
yield update;
}
}
}
}
}
// 将工具调用分组:安全的和需要顺序的
function partitionToolCalls(
toolUseMessages: ToolUseBlock[],
context: ToolUseContext,
): Array<{ isConcurrencySafe: boolean; blocks: ToolUseBlock[] }> {
// 分区逻辑
const safeBlocks: ToolUseBlock[] = [];
const unsafeBlocks: ToolUseBlock[] = [];
for (const toolUse of toolUseMessages) {
if (isToolConcurrencySafe(toolUse.name)) {
safeBlocks.push(toolUse);
} else {
unsafeBlocks.push(toolUse);
}
}
return [
{ isConcurrencySafe: true, blocks: safeBlocks },
{ isConcurrencySafe: false, blocks: unsafeBlocks },
];
}
问 1:为什么分区?
// 分区的目的是:最大化并行,同时保证安全
// 示例输入:
[Read('a.txt'), Write('b.txt'), Read('c.txt')]
// 分区后:
[{
isConcurrencySafe: true,
blocks: [Read('a.txt'), Read('c.txt')] // 并行执行
}, {
isConcurrencySafe: false,
blocks: [Write('b.txt')] // 顺序执行(但只有一个,不需要等待)
}]
// 执行顺序:
// 1. Read('a.txt') 和 Read('c.txt') 并行
// 2. Write('b.txt') 顺序执行(因为在 unsafe 分区)
问 2:runTools 和 StreamingToolExecutor 的选择?
// 选择 StreamingToolExecutor:
// - 模型支持流式输出
// - 工具结果可以边产生边返回
// - 需要最大化并行度
// 选择 runTools:
// - 模型不支持流式
// - 需要严格的顺序保证
// - 更简单的错误处理
问 3:runToolsConcurrently 的实现?
async function* runToolsConcurrently(
blocks: ToolUseBlock[],
canUseTool: CanUseToolFn,
context: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
// 使用 Promise.all 并行执行所有工具
const promises = blocks.map(block =>
runToolUse(block, canUseTool, context)
);
// 按顺序 yield 结果(即使完成顺序不同)
for (const promise of promises) {
const result = await promise;
yield result;
}
}
源码位置:`src/services/tools/toolExecution.ts`
// 工具执行结果
interface ToolResult {
type: 'tool_result';
tool_use_id: string; // 对应的 tool_use 块 ID
content: string | ContentBlock[]; // 结果内容
is_error?: boolean; // 是否是错误
}
// 转换为消息格式
function toolResultToMessage(result: ToolResult): Message {
return {
type: 'user', // 注意:tool_result 是 user 类型的消息
content: result.content,
tool_use_id: result.tool_use_id,
is_error: result.is_error,
};
}
async function executeToolWithErrorHandling(
tool: Tool,
args: ToolArgs,
context: ToolContext,
): Promise<MessageUpdate> {
try {
// 1. 权限检查
const permission = await canUseTool(tool, args, context);
if (!permission.allowed) {
return {
message: createErrorMessage(
`Permission denied: ${permission.reason}`,
context.toolUseId,
),
newContext: context,
};
}
// 2. 执行工具
const result = await tool.execute(args, context);
// 3. 返回成功结果
return {
message: toolResultToMessage(result),
newContext: context,
};
} catch (error) {
// 4. 处理执行错误
return {
message: createErrorMessage(
`Tool execution failed: ${error.message}`,
context.toolUseId,
),
newContext: context,
};
}
}
function createErrorMessage(
content: string,
toolUseId: string,
): Message {
return {
type: 'user', // tool_result 是 user 类型
content,
tool_use_id: toolUseId,
is_error: true,
};
}
问 1:为什么 tool_result 是 user 类型?
// 在 Claude 的消息格式中:
// - user:用户发送的消息
// - assistant:模型生成的消息
// tool_result 本质上是"用户提供的额外信息"
// 所以使用 user 类型
{
type: 'user',
role: 'tool',
content: 'File content here...',
tool_use_id: 'abc123',
}
问 2:工具失败后模型会怎样?
// 模型会收到 tool_result(is_error: true)
// 模型可能的反应:
// 1. 尝试修复错误
// 2. 请求用户帮助
// 3. 放弃当前任务
// 示例:模型试图读取不存在的文件
{
type: 'tool_result',
content: 'Error: File not found',
is_error: true,
}
// 模型看到错误后,可能:
// - "File not found. Let me check the correct path..."
// - "I couldn't read the file. Can you help me?"
问 3:如何避免工具执行超时?
// 每个工具都有默认超时
const DEFAULT_TIMEOUT_MS = 60_000; // 60秒
async function executeWithTimeout(
tool: Tool,
args: ToolArgs,
timeoutMs: number = DEFAULT_TIMEOUT_MS,
): Promise<MessageUpdate> {
const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, timeoutMs);
try {
const result = await tool.execute(args, {
signal: controller.signal,
});
return { message: result };
} catch (error) {
if (error.name === 'AbortError') {
return {
message: {
type: 'tool_result',
content: `Tool timed out after ${timeoutMs}ms`,
is_error: true,
},
};
}
throw error;
} finally {
clearTimeout(timeout);
}
}
问题:StreamingToolExecutor 的并行执行不适合哪些场景?
答案:
1. 有依赖关系的工具:
// 工具 B 需要工具 A 的结果作为输入
// 不能并行
const resultA = await toolA();
const resultB = await toolB(resultA.output);
2. 修改共享状态:
// 两个工具都写同一个文件
// 并行执行可能导致数据丢失
Edit({ file: 'a.txt', old: 'x', new: 'y' })
Edit({ file: 'a.txt', old: 'z', new: 'w' })
3. 资源密集型工具:
// 同时执行太多 CPU/内存密集型工具
// 可能导致系统资源耗尽
// 应该限制并发数
问题:当工具执行失败时,如何实现自动重试?
答案:
async function executeWithRetry(
tool: Tool,
args: ToolArgs,
maxRetries: number = 3,
): Promise<MessageUpdate> {
let lastError: Error | undefined;
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
const result = await tool.execute(args);
return { message: result };
} catch (error) {
lastError = error;
// 判断是否可重试
if (!isRetryableError(error)) {
break;
}
// 指数退避
const delay = Math.pow(2, attempt) * 1000;
await sleep(delay);
}
}
// 所有重试都失败
return {
message: {
type: 'tool_result',
content: `Tool failed after ${maxRetries} retries: ${lastError?.message}`,
is_error: true,
},
};
}
function isRetryableError(error: Error): boolean {
// 网络错误、临时故障可重试
// 权限错误、参数错误不可重试
return error.name === 'NetworkError' ||
error.message.includes('temporary');
}
问题:模型期望按调用顺序收到工具结果吗?如果不按顺序返回会怎样?
答案:
重要:
// 模型按顺序思考
const toolCalls = [
{ name: 'Read', input: { file: 'a.txt' } }, // 第一个
{ name: 'Read', input: { file: 'b.txt' } }, // 第二个
];
// 模型期望的思考顺序:
// 1. 先读取 a.txt
// 2. 基于 a.txt 的内容,再读取 b.txt
// 如果结果顺序错误:
Result(b.txt) // 模型先看到 b.txt 的内容
Result(a.txt) // 然后看到 a.txt 的内容
// 模型可能会困惑,因为它的思考基于错误的上下文
StreamingToolExecutor 的解决方案:
// StreamingToolExecutor 按 tool_use_id 匹配结果
// 但结果仍然是按调用顺序 yield
// 虽然执行是并行的,但返回顺序由调用顺序决定
for (const toolUse of toolUseMessages) {
const result = await executeTool(toolUse); // 按顺序等待
yield result;
}
| 文件 | 核心内容 |
| `src/services/tools/StreamingToolExecutor.ts` | 并行执行器 |
| `src/services/tools/toolOrchestration.ts` | 工具编排 |
| `src/services/tools/toolExecution.ts` | 执行和错误处理 |
下一节我们将深入 自定义工具开发:
- 如何注册自定义工具?
- MCP 工具的接口
- 插件系统的工具扩展
*- 第一轮:□ 事实准确性*
*- 第二轮:□ 深度与洞见*
*- 第三轮:□ 可读性与价值*