第三节:工具执行流程
执行器与错误处理

作者:小学子 📚 | 日期:2026年4月2日 | 第二阶段 · 模块二


# 第二阶段 · 模块二 · 第三节:工具执行机制

核心问题

StreamingToolExecutor 的并行执行是如何工作的?runTools 的顺序执行和 StreamingToolExecutor 的并行执行有什么区别?工具执行失败的错误处理流程是什么?


◇ 本节位置


        Claude Code 全局架构
        
        ┌─────────────────────────────────────────────────────────────────────┐
        │  查询引擎层(QueryEngine ──> query)                                │
        │                                                                      │
        │  query.ts                                                          │
        │  ├── callModel() ──> API 调用                                       │
        │  ├── StreamingToolExecutor ──> 并行工具执行 ← 本节                  │
        │  └── runTools ──> 顺序工具执行                                       │
        └─────────────────────────────────────────────────────────────────────┘
        


一、两种执行模式

1.1 runTools vs StreamingToolExecutor

方面runToolsStreamingToolExecutor
执行方式顺序执行并行执行
时机模型流式完成后与模型流式并行
并发数1可配置(默认10)
代码位置`toolOrchestration.ts``StreamingToolExecutor.ts`

1.2 五问分析

问 1:为什么需要两种执行模式?


        // 场景:模型返回多个工具调用
        {
          type: 'assistant',
          content: [
            { type: 'tool_use', name: 'Read', input: { file_path: 'a.txt' } },
            { type: 'tool_use', name: 'Read', input: { file_path: 'b.txt' } },
            { type: 'tool_use', name: 'Read', input: { file_path: 'c.txt' } },
          ]
        }
        
        // runTools:顺序执行
        read('a.txt') → read('b.txt') → read('c.txt')  // 总耗时 = t1 + t2 + t3
        
        // StreamingToolExecutor:并行执行
        read('a.txt')
        read('b.txt')  // 同时执行
        read('c.txt')
        // 总耗时 = max(t1, t2, t3)
        

问 2:StreamingToolExecutor 的核心思想?


        // 核心:工具执行和模型输出流式并行
        
        // 模型流式输出:
        for await (const event of model.stream()) {
          if (event.type === 'tool_use') {
            // 边接收 tool_use,边执行工具
            streamingExecutor.addPending(event);
          }
        }
        
        // 同时,工具执行结果可以边产生边返回
        for await (const result of streamingExecutor.getRemainingResults()) {
          yield result;
        }
        

问 3:什么是 isConcurrencySafe?


        // 某些工具调用可以并行,某些不能
        // 并行安全 = 不修改共享状态
        
        const concurrencySafeTools = [
          'Read',      // 读文件,不修改任何东西
          'Glob',      // 只搜索,不修改
          'Grep',      // 只搜索,不修改
          'WebSearch', // 只查询,不修改
        ];
        
        const unsafeTools = [
          'Write',     // 写文件,可能冲突
          'Edit',      // 修改文件,可能冲突
          'Bash',      // 执行命令,副作用未知
        ];
        
        // 安全的工具可以并行,不安全的必须顺序执行
        


二、StreamingToolExecutor 详解

2.1 源码位置

源码位置:`src/services/tools/StreamingToolExecutor.ts`


        export class StreamingToolExecutor {
          // 待执行的工具调用
          private pendingToolUses: ToolUseBlock[] = [];
        
          // 并发控制器
          private semaphore: Semaphore;
        
          // 线程安全的队列
          private results: MessageUpdate[] = [];
        
          // Signal to wake up getRemainingResults when progress is available
          private wakeupSignal = new Signal();
        
          async *getRemainingResults(): AsyncGenerator<MessageUpdate, void> {
            while (true) {
              // 等待新结果或所有工具完成
              while (this.results.length === 0 && !this.isDone()) {
                await this.wakeupSignal.wait();
              }
        
              // yield 所有可用结果
              while (this.results.length > 0) {
                yield this.results.shift()!;
              }
        
              // 所有工具都执行完毕,退出
              if (this.isDone()) {
                return;
              }
            }
          }
        
          addPending(toolUse: ToolUseBlock): void {
            this.pendingToolUses.push(toolUse);
            this.executeTool(toolUse);  // 立即开始执行
          }
        }
        

2.2 执行流程


        StreamingToolExecutor 执行流程:
        
        addPending(toolUse) 收到工具调用
            │
            ├── 检查 isConcurrencySafe
            │
            ├── 如果安全:
            │     └── 立即并发执行(semaphore 控制)
            │
            └── 如果不安全:
                  └── 等待之前的工具完成后再执行
            │
            ▼
        executeTool() 执行工具
            │
            ├── 调用 canUseTool() 检查权限
            │
            ├── 如果拒绝:
            │     └── 记录拒绝原因,返回 permission denied
            │
            └── 如果允许:
                  └── 调用 tool.execute()
            │
            ▼
        结果放入 results 队列
            │
            ▼
        getRemainingResults() yield 结果
        

2.3 五问分析

问 1:信号量(Semaphore)的作用?


        // 控制并发数,避免同时执行过多工具
        class Semaphore {
          private permits: number;
          private waiters: (() => void)[] = [];
        
          constructor(permits: number) {
            this.permits = permits;
          }
        
          async acquire(): Promise<void> {
            if (this.permits > 0) {
              this.permits--;
              return;
            }
            // 等待
            await new Promise(resolve => this.waiters.push(resolve));
          }
        
          release(): void {
            this.permits++;
            const waiter = this.waiters.shift();
            if (waiter) {
              waiter();
            }
          }
        }
        
        // 使用
        const maxConcurrency = parseInt(process.env.MAX_TOOL_CONCURRENCY || '10');
        const semaphore = new Semaphore(maxConcurrency);
        
        async function executeTool(toolUse) {
          await semaphore.acquire();
          try {
            return await tool.execute();
          } finally {
            semaphore.release();
          }
        }
        

问 2:Signal.wait() 是什么?


        // 用于在没有结果时休眠,有结果时唤醒
        class Signal {
          private promise: Promise<void>;
          private resolve: () => void;
        
          constructor() {
            this.promise = new Promise(resolve => {
              this.resolve = resolve;
            });
          }
        
          wait(): Promise<void> {
            return this.promise;
          }
        
          signal(): void {
            // 创建一个新的 Promise,等待下一次 signal
            this.promise = new Promise(resolve => {
              this.resolve = resolve;
            });
          }
        }
        

问 3:为什么不安全工具要等待?


        // 示例:不安全工具的并发问题
        
        // 场景:两个 Edit 工具同时修改同一文件
        Edit({ file_path: 'a.txt', old: 'foo', new: 'bar' })
        Edit({ file_path: 'a.txt', old: 'baz', new: 'qux' })
        
        // 并发执行可能产生冲突:
        // Tool1: 读取文件
        // Tool2: 读取文件
        // Tool1: 修改内容
        // Tool2: 修改内容  // 覆盖了 Tool1 的修改!
        
        // 解决:顺序执行
        // Tool1: 读取 → 修改 → 完成
        // Tool2: 等待 Tool1 完成 → 读取(最新内容)→ 修改
        

问 4:StreamingToolExecutor 如何知道工具完成?


        // 每次 tool.execute() 完成后,检查是否所有工具都完成
        async function executeTool(toolUse: ToolUseBlock): Promise<void> {
          try {
            const result = await tool.execute();
            this.results.push({ message: result, ... });
          } finally {
            // 无论成功还是失败,都标记为完成
            this.completedCount++;
        
            // 如果所有工具都完成,唤醒等待者
            if (this.completedCount === this.totalCount) {
              this.wakeupSignal.signal();
            }
          }
        }
        

问 5:StreamingToolExecutor 的局限性?


        // 局限性 1:只能处理简单依赖
        // 无法处理复杂依赖链:A 依赖 B 的结果,B 依赖 C 的结果
        
        // 局限性 2:结果顺序不确定
        // A, B, C 同时开始,A 最慢,C 最快
        // 结果顺序可能是:C, B, A
        // 但模型期望按调用顺序返回
        
        // 局限性 3:内存压力
        // 同时执行太多工具会占用大量内存
        


三、runTools 详解

3.1 源码位置

源码位置:`src/services/tools/toolOrchestration.ts`


        export async function* runTools(
          toolUseMessages: ToolUseBlock[],
          assistantMessages: AssistantMessage[],
          canUseTool: CanUseToolFn,
          toolUseContext: ToolUseContext,
        ): AsyncGenerator<MessageUpdate, void> {
          let currentContext = toolUseContext;
        
          // 按顺序处理每个工具
          for (const { isConcurrencySafe, blocks } of partitionToolCalls(
            toolUseMessages,
            currentContext,
          )) {
            if (isConcurrencySafe) {
              // 安全工具:并行执行
              for await (const update of runToolsConcurrently(
                blocks,
                assistantMessages,
                canUseTool,
                currentContext,
              )) {
                yield update;
              }
            } else {
              // 不安全工具:顺序执行
              for (const block of blocks) {
                for await (const update of runToolUse(
                  block,
                  assistantMessages,
                  canUseTool,
                  currentContext,
                )) {
                  yield update;
                }
              }
            }
          }
        }
        

3.2 partitionToolCalls 的作用


        // 将工具调用分组:安全的和需要顺序的
        
        function partitionToolCalls(
          toolUseMessages: ToolUseBlock[],
          context: ToolUseContext,
        ): Array<{ isConcurrencySafe: boolean; blocks: ToolUseBlock[] }> {
          // 分区逻辑
          const safeBlocks: ToolUseBlock[] = [];
          const unsafeBlocks: ToolUseBlock[] = [];
        
          for (const toolUse of toolUseMessages) {
            if (isToolConcurrencySafe(toolUse.name)) {
              safeBlocks.push(toolUse);
            } else {
              unsafeBlocks.push(toolUse);
            }
          }
        
          return [
            { isConcurrencySafe: true, blocks: safeBlocks },
            { isConcurrencySafe: false, blocks: unsafeBlocks },
          ];
        }
        

3.3 五问分析

问 1:为什么分区?


        // 分区的目的是:最大化并行,同时保证安全
        
        // 示例输入:
        [Read('a.txt'), Write('b.txt'), Read('c.txt')]
        
        // 分区后:
        [{
          isConcurrencySafe: true,
          blocks: [Read('a.txt'), Read('c.txt')]  // 并行执行
        }, {
          isConcurrencySafe: false,
          blocks: [Write('b.txt')]  // 顺序执行(但只有一个,不需要等待)
        }]
        
        // 执行顺序:
        // 1. Read('a.txt') 和 Read('c.txt') 并行
        // 2. Write('b.txt') 顺序执行(因为在 unsafe 分区)
        

问 2:runTools 和 StreamingToolExecutor 的选择?


        // 选择 StreamingToolExecutor:
        // - 模型支持流式输出
        // - 工具结果可以边产生边返回
        // - 需要最大化并行度
        
        // 选择 runTools:
        // - 模型不支持流式
        // - 需要严格的顺序保证
        // - 更简单的错误处理
        

问 3:runToolsConcurrently 的实现?


        async function* runToolsConcurrently(
          blocks: ToolUseBlock[],
          canUseTool: CanUseToolFn,
          context: ToolUseContext,
        ): AsyncGenerator<MessageUpdate, void> {
          // 使用 Promise.all 并行执行所有工具
          const promises = blocks.map(block =>
            runToolUse(block, canUseTool, context)
          );
        
          // 按顺序 yield 结果(即使完成顺序不同)
          for (const promise of promises) {
            const result = await promise;
            yield result;
          }
        }
        


四、工具执行结果处理

4.1 结果格式

源码位置:`src/services/tools/toolExecution.ts`


        // 工具执行结果
        interface ToolResult {
          type: 'tool_result';
          tool_use_id: string;      // 对应的 tool_use 块 ID
          content: string | ContentBlock[];  // 结果内容
          is_error?: boolean;       // 是否是错误
        }
        
        // 转换为消息格式
        function toolResultToMessage(result: ToolResult): Message {
          return {
            type: 'user',  // 注意:tool_result 是 user 类型的消息
            content: result.content,
            tool_use_id: result.tool_use_id,
            is_error: result.is_error,
          };
        }
        

4.2 错误处理流程


        async function executeToolWithErrorHandling(
          tool: Tool,
          args: ToolArgs,
          context: ToolContext,
        ): Promise<MessageUpdate> {
          try {
            // 1. 权限检查
            const permission = await canUseTool(tool, args, context);
            if (!permission.allowed) {
              return {
                message: createErrorMessage(
                  `Permission denied: ${permission.reason}`,
                  context.toolUseId,
                ),
                newContext: context,
              };
            }
        
            // 2. 执行工具
            const result = await tool.execute(args, context);
        
            // 3. 返回成功结果
            return {
              message: toolResultToMessage(result),
              newContext: context,
            };
          } catch (error) {
            // 4. 处理执行错误
            return {
              message: createErrorMessage(
                `Tool execution failed: ${error.message}`,
                context.toolUseId,
              ),
              newContext: context,
            };
          }
        }
        
        function createErrorMessage(
          content: string,
          toolUseId: string,
        ): Message {
          return {
            type: 'user',  // tool_result 是 user 类型
            content,
            tool_use_id: toolUseId,
            is_error: true,
          };
        }
        

4.3 五问分析

问 1:为什么 tool_result 是 user 类型?


        // 在 Claude 的消息格式中:
        // - user:用户发送的消息
        // - assistant:模型生成的消息
        
        // tool_result 本质上是"用户提供的额外信息"
        // 所以使用 user 类型
        
        {
          type: 'user',
          role: 'tool',
          content: 'File content here...',
          tool_use_id: 'abc123',
        }
        

问 2:工具失败后模型会怎样?


        // 模型会收到 tool_result(is_error: true)
        
        // 模型可能的反应:
        // 1. 尝试修复错误
        // 2. 请求用户帮助
        // 3. 放弃当前任务
        
        // 示例:模型试图读取不存在的文件
        {
          type: 'tool_result',
          content: 'Error: File not found',
          is_error: true,
        }
        // 模型看到错误后,可能:
        // - "File not found. Let me check the correct path..."
        // - "I couldn't read the file. Can you help me?"
        

问 3:如何避免工具执行超时?


        // 每个工具都有默认超时
        const DEFAULT_TIMEOUT_MS = 60_000; // 60秒
        
        async function executeWithTimeout(
          tool: Tool,
          args: ToolArgs,
          timeoutMs: number = DEFAULT_TIMEOUT_MS,
        ): Promise<MessageUpdate> {
          const controller = new AbortController();
        
          const timeout = setTimeout(() => {
            controller.abort();
          }, timeoutMs);
        
          try {
            const result = await tool.execute(args, {
              signal: controller.signal,
            });
            return { message: result };
          } catch (error) {
            if (error.name === 'AbortError') {
              return {
                message: {
                  type: 'tool_result',
                  content: `Tool timed out after ${timeoutMs}ms`,
                  is_error: true,
                },
              };
            }
            throw error;
          } finally {
            clearTimeout(timeout);
          }
        }
        


五、思考题

思考题 1:StreamingToolExecutor 适合所有场景吗?

问题:StreamingToolExecutor 的并行执行不适合哪些场景?

答案

1. 有依赖关系的工具


        // 工具 B 需要工具 A 的结果作为输入
        // 不能并行
        const resultA = await toolA();
        const resultB = await toolB(resultA.output);
        

2. 修改共享状态


        // 两个工具都写同一个文件
        // 并行执行可能导致数据丢失
        Edit({ file: 'a.txt', old: 'x', new: 'y' })
        Edit({ file: 'a.txt', old: 'z', new: 'w' })
        

3. 资源密集型工具


        // 同时执行太多 CPU/内存密集型工具
        // 可能导致系统资源耗尽
        // 应该限制并发数
        


思考题 2:如何实现工具执行的重试?

问题:当工具执行失败时,如何实现自动重试?

答案


        async function executeWithRetry(
          tool: Tool,
          args: ToolArgs,
          maxRetries: number = 3,
        ): Promise<MessageUpdate> {
          let lastError: Error | undefined;
        
          for (let attempt = 0; attempt < maxRetries; attempt++) {
            try {
              const result = await tool.execute(args);
              return { message: result };
            } catch (error) {
              lastError = error;
        
              // 判断是否可重试
              if (!isRetryableError(error)) {
                break;
              }
        
              // 指数退避
              const delay = Math.pow(2, attempt) * 1000;
              await sleep(delay);
            }
          }
        
          // 所有重试都失败
          return {
            message: {
              type: 'tool_result',
              content: `Tool failed after ${maxRetries} retries: ${lastError?.message}`,
              is_error: true,
            },
          };
        }
        
        function isRetryableError(error: Error): boolean {
          // 网络错误、临时故障可重试
          // 权限错误、参数错误不可重试
          return error.name === 'NetworkError' ||
                 error.message.includes('temporary');
        }
        


思考题 3:工具执行结果的顺序重要吗?

问题:模型期望按调用顺序收到工具结果吗?如果不按顺序返回会怎样?

答案

重要


        // 模型按顺序思考
        const toolCalls = [
          { name: 'Read', input: { file: 'a.txt' } },  // 第一个
          { name: 'Read', input: { file: 'b.txt' } },  // 第二个
        ];
        
        // 模型期望的思考顺序:
        // 1. 先读取 a.txt
        // 2. 基于 a.txt 的内容,再读取 b.txt
        
        // 如果结果顺序错误:
        Result(b.txt)  // 模型先看到 b.txt 的内容
        Result(a.txt)  // 然后看到 a.txt 的内容
        // 模型可能会困惑,因为它的思考基于错误的上下文
        

StreamingToolExecutor 的解决方案


        // StreamingToolExecutor 按 tool_use_id 匹配结果
        // 但结果仍然是按调用顺序 yield
        
        // 虽然执行是并行的,但返回顺序由调用顺序决定
        for (const toolUse of toolUseMessages) {
          const result = await executeTool(toolUse);  // 按顺序等待
          yield result;
        }
        


六、延伸阅读

文件核心内容
`src/services/tools/StreamingToolExecutor.ts`并行执行器
`src/services/tools/toolOrchestration.ts`工具编排
`src/services/tools/toolExecution.ts`执行和错误处理


七、下节预告

下一节我们将深入 自定义工具开发

- 如何注册自定义工具?

- MCP 工具的接口

- 插件系统的工具扩展


*- 第一轮:□ 事实准确性*

*- 第二轮:□ 深度与洞见*

*- 第三轮:□ 可读性与价值*