GitHub user da-daken created a discussion: Parallel Tool Call Execution

## Motivation

### Background

Flink Agents already supports async durable execution for high-latency I/O:

+ **Python**: `await ctx.durable_execute_async(...)` submits work to a thread pool and yields the operator. See [Discussion #404](https://github.com/apache/flink-agents/discussions/404).
+ **Java (JDK 21+)**: `ctx.durableExecuteAsync(DurableCallable)` uses the Continuation API to yield the mailbox thread while work runs on an async thread pool. See [Discussion #429](https://github.com/apache/flink-agents/discussions/429).

The built-in `tool_call_action` listens to `ToolRequestEvent`, looks up each tool resource, executes it through durable execution, and emits `ToolResponseEvent` after all tool calls in the batch are processed. See [tool_use.md](../docs/content/docs/development/tool_use.md).

When the LLM returns multiple tool calls in a single response, each call is typically an independent HTTP / MCP / RPC request. Running them in parallel can significantly reduce end-to-end latency.

### The Problem

Today, `ToolCallAction.processToolRequest` processes tool calls **serially**:

```java
for (Map<String, Object> toolCall : toolRequest.getToolCalls()) {
    // ...
    response = toolCallAsync
            ? ctx.durableExecuteAsync(callable)
            : ctx.durableExecute(callable);
}
```

Even with `tool-call.async=true` (default) on JDK 21+, the behavior is:

1. Tool 1 starts and calls `durableExecuteAsync`.
2. The Continuation yields; the mailbox thread becomes idle and **can process other action tasks** (other keys, or other queued actions for the same key).
3. When Tool 1 completes, the Continuation resumes, but execution is still inside the `for` loop.
4. Tool 2 does **not** start until Tool 1's `durableExecuteAsync` returns.

So async execution today achieves **inter-action concurrency**, not **intra-batch tool concurrency**.
This is consistent with [Discussion #429](https://github.com/apache/flink-agents/discussions/429), which explicitly states:

> **Serial Execution**: Multiple `executeAsync` calls are executed serially
> (consistent with Python behavior).

For a `ToolRequestEvent` with N independent I/O-bound tools, total latency is approximately the **sum** of individual latencies instead of the **max**.

Additionally, all tool calls currently share the same durable `functionId`:

```java
public String getId() {
    return "tool-call";
}
```

Fine-grained durable execution ([Discussion #404](https://github.com/apache/flink-agents/discussions/404)) matches calls by `(callIndex, functionId, argsDigest)`. Reusing `"tool-call"` for every tool in a batch makes recovery matching fragile when call order or batch composition changes.

### Goal

Enable **parallel execution of multiple tool calls within a single `ToolRequestEvent`**, while:

1. Preserving the **mailbox thread model** ([Discussion #429](https://github.com/apache/flink-agents/discussions/429)): memory access, `sendEvent`, and durable state recording remain on the mailbox thread.
2. Remaining compatible with **fine-grained durable execution** ([Discussion #404](https://github.com/apache/flink-agents/discussions/404)): deterministic call order for recovery, per-call result persistence.
3. Optionally supporting **reconciler hooks** for in-flight recovery ([Discussion #598](https://github.com/apache/flink-agents/discussions/598)) on a per-tool basis.
4. Degrading gracefully: JDK < 21 and `tool-call.async=false` fall back to serial execution.
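
The sum-versus-max latency claim from The Problem can be checked with a small standalone sketch. It uses no Flink Agents API: `ToolLatencySketch`, `fakeToolCall`, `runSerial`, and `runParallel` are hypothetical names introduced here, and `Thread.sleep` stands in for an HTTP / MCP / RPC request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ToolLatencySketch {

    // Hypothetical stand-in for one I/O-bound tool call.
    static String fakeToolCall(String name, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + "-done";
    }

    // Serial: each call starts only after the previous one returns,
    // so elapsed time is roughly the sum of the latencies.
    static long runSerial(List<String> tools, long latencyMs) {
        long start = System.nanoTime();
        for (String tool : tools) {
            fakeToolCall(tool, latencyMs);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Parallel: submit every call first, then wait once for all of them,
    // so elapsed time is roughly the max of the latencies.
    // Assumes a non-empty tool list (the pool size is tools.size()).
    static long runParallel(List<String> tools, long latencyMs) {
        ExecutorService pool = Executors.newFixedThreadPool(tools.size());
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String tool : tools) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> fakeToolCall(tool, latencyMs), pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> tools = List.of("call_a", "call_b", "call_c");
        System.out.println("serial ms:   " + runSerial(tools, 200));
        System.out.println("parallel ms: " + runParallel(tools, 200));
    }
}
```

The sketch blocks on `join()` for simplicity; the proposal instead yields the Continuation so the mailbox thread stays free while the pool works.
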

---

## Design Goals

```plain
// Goal: LLM returns 3 tool calls; all HTTP requests run concurrently
// Total latency ≈ max(latency_i), not sum(latency_i)

ToolRequestEvent { tool_calls: [call_a, call_b, call_c] }
        │
        ▼
tool_call_action (mailbox thread)
  ├── durableExecuteAllAsync([callable_a, callable_b, callable_c])
  │     ├── submit call_a ──> async pool
  │     ├── submit call_b ──> async pool   (concurrent)
  │     └── submit call_c ──> async pool
  ├── yield (mailbox idle → other actions can run)
  ├── resume when all complete
  └── sendEvent(ToolResponseEvent)
```

| Goal | Description |
| --- | --- |
| Parallelism | N tools in one batch execute concurrently on the async thread pool |
| Mailbox safety | `RunnerContext` (memory, events, metrics) accessed only on mailbox thread |
| Durable recovery | Call results persisted in deterministic order; replay skips completed tools |
| Backward compatible | Serial path unchanged when parallel disabled or unsupported |
| Cross-language | Java and Python follow the same semantics |

### Non-Goals (first version)

+ Parallelism **across** different `ToolRequestEvent`s (already handled by Flink parallelism + async yield)
+ Framework-level automatic retry per tool (see Future Work; users handle in tool body today)
+ Changing the external `ToolRequestEvent` / `ToolResponseEvent` wire format

---

## API Design

### New Config Option

```java
// AgentExecutionOptions
public static final ConfigOption<Boolean> TOOL_CALL_PARALLEL =
        new ConfigOption<>("tool-call.parallel", Boolean.class, true);
```

| Config | Behavior |
| --- | --- |
| `tool-call.async=false` | Serial sync execution (unchanged) |
| `tool-call.async=true`, `tool-call.parallel=false` | Serial async execution (current behavior) |
| `tool-call.async=true`, `tool-call.parallel=true` | Parallel async batch execution (new) |

Existing `num-async-threads` ([Discussion #429](https://github.com/apache/flink-agents/discussions/429)) caps concurrent async work globally.
A batch of N tools may use up to N threads from this pool. Optional future knob: `tool-call.max-parallelism` to limit concurrent tools per batch.

### New RunnerContext Method

```java
public interface RunnerContext {
    /**
     * Executes multiple durable calls concurrently and returns results in input order.
     *
     * <p>On JDK 21+, submits all uncached calls to the async thread pool, yields the
     * Continuation once until all complete, then records durable results in list order
     * on the mailbox thread.
     *
     * <p>On JDK < 21, falls back to serial {@link #durableExecute(DurableCallable)}.
     *
     * <p>The action must be deterministic: the same list of {@link DurableCallable}
     * instances (same order, same {@link DurableCallable#getId()}) must be produced
     * on recovery. Otherwise, subsequent cached results are cleared per existing rules.
     *
     * <p><b>Note:</b> Accessing memory or calling sendEvent is prohibited within callables.
     */
    <T> List<T> durableExecuteAllAsync(List<DurableCallable<T>> callables) throws Exception;
}
```

Python equivalent:

```python
async def durable_execute_all_async(
    self,
    callables: list[tuple[Callable, tuple, dict] | Callable],
) -> list[Any]:
    ...
```

### Per-Tool Durable Identity

Each tool callable must use a stable, unique `functionId`:

```java
@Override
public String getId() {
    return "tool-call-" + toolCallId; // e.g. "tool-call-call_abc123"
}
```

---

## Design Approach

### Recommended: Batch Async Durable API (Fan-out / Fan-in)

Extend the runtime with `durableExecuteAllAsync` rather than changing the event model. `ToolCallAction` becomes a thin caller.

| Criterion | Batch API |
| --- | --- |
| Preserves mailbox model | ✅ |
| Minimal event model change | ✅ |
| Reuses Continuation + durable state machine | ✅ |
| Implementation complexity | Medium |
| Per-tool independent yield / scheduling | Single batch yield |

Batch API is the recommended first step. Action Task Split remains a valid future evolution if finer-grained interleaving with other actions is needed.
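
Returning to the config table under New Config Option, the three rows plus the JDK fallback can be read as a single decision function. The sketch below is only an illustration: `ToolCallModeSketch`, `Mode`, `resolve`, and the `continuationSupported` flag are hypothetical names, and mapping JDK < 21 to the serial sync path is an assumption based on the stated fallback to serial `durableExecute`.

```java
public class ToolCallModeSketch {

    // Hypothetical labels for the three rows of the config table.
    enum Mode { SERIAL_SYNC, SERIAL_ASYNC, PARALLEL_ASYNC }

    // async                 = value of tool-call.async
    // parallel              = value of tool-call.parallel
    // continuationSupported = true on JDK 21+ (assumed flag, not a real API)
    static Mode resolve(boolean async, boolean parallel, boolean continuationSupported) {
        if (!async || !continuationSupported) {
            // Assumption: without the Continuation API the runtime uses the serial sync path.
            return Mode.SERIAL_SYNC;
        }
        return parallel ? Mode.PARALLEL_ASYNC : Mode.SERIAL_ASYNC;
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, true, true));
        System.out.println(resolve(true, false, true));
        System.out.println(resolve(true, true, true));
        System.out.println(resolve(true, true, false));
    }
}
```
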

---

## Core Component Design

### ContinuationContext Extension

```java
public class ContinuationContext {
    private volatile Future<?> pendingFuture;      // existing: single-call async
    private volatile Future<?> pendingBatchFuture; // new: batch barrier

    public boolean hasPendingAsync() {
        return isPending(pendingFuture) || isPending(pendingBatchFuture);
    }
}
```

`ContinuationActionExecutor.executeAction` checks `hasPendingAsync()` instead of only `pendingFuture`:

```java
if (context.hasPendingAsync()) {
    return false; // action not finished; mailbox can process other tasks
}
// resume continuation
```

### ContinuationActionExecutor.executeAllAsync (JDK 21)

```java
public <T> List<T> executeAllAsync(
        ContinuationContext context, List<Supplier<T>> suppliers) throws Exception {
    // Submit every supplier to the async pool. CompletableFuture is used
    // (rather than plain Future) because the barrier and join() below need it.
    List<CompletableFuture<T>> futures = suppliers.stream()
            .map(s -> CompletableFuture.supplyAsync(s, asyncExecutor))
            .toList();

    // The barrier completes once every submitted call has completed.
    CompletableFuture<Void> barrier =
            CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));

    context.setPendingBatchFuture(barrier);
    while (!barrier.isDone()) {
        Continuation.yield(SCOPE);
    }
    context.setPendingBatchFuture(null);

    // Results are returned in input order, regardless of completion order.
    return futures.stream().map(CompletableFuture::join).toList();
}
```

### JavaRunnerContextImpl.durableExecuteAllAsync

Three phases, all orchestrated from the mailbox thread:

**Phase 1: Prepare (mailbox thread)**

For each `DurableCallable` in input order:

1. `tryGetCachedResult(functionId, argsDigest)`: on hit, fill result slot.
2. On miss, add to `pendingList` with original index.

**Phase 2: Fan-out + Yield (mailbox initiates, pool executes)**

1. Submit all pending callables to `ContinuationActionExecutor.executeAllAsync`.
2. Continuation yields once until all futures complete.
3. During yield, mailbox processes other action tasks ([Discussion #429](https://github.com/apache/flink-agents/discussions/429) flow).

**Phase 3: Fan-in + Persist (mailbox thread)**

For each index in original order:

1. Merge cached + async results.
2. `recordDurableCompletion(functionId, argsDigest, result, exception)`, **strictly in tool_calls order**.
3. Return ordered result list to caller.

### ToolCallAction Changes

```java
public static void processToolRequest(Event event, RunnerContext ctx) throws Exception {
    ToolRequestEvent toolRequest = ToolRequestEvent.fromEvent(event);
    boolean async = ctx.getConfig().get(AgentExecutionOptions.TOOL_CALL_ASYNC);
    boolean parallel = ctx.getConfig().get(AgentExecutionOptions.TOOL_CALL_PARALLEL);

    List<Map<String, Object>> toolCalls = toolRequest.getToolCalls();
    List<DurableCallable<ToolResponse>> callables = buildCallables(toolCalls, ctx);

    List<ToolResponse> results;
    if (async && parallel && callables.size() > 1) {
        // Fan out the whole batch and yield once.
        results = ctx.durableExecuteAllAsync(callables);
    } else {
        // Single tool, parallel disabled, or sync mode: keep the existing path.
        results = executeSequentially(callables, async, ctx);
    }

    ctx.sendEvent(buildToolResponseEvent(toolRequest, results, ...));
}
```

`buildCallables` resolves tools, handles missing-tool errors inline (no async submission for non-existent tools), and assigns `functionId = "tool-call-" + toolCallId`.
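
The three phases can be sketched without any Flink Agents types. This is a simplified illustration, not the proposed implementation: `BatchPhasesSketch`, `Call`, `cache`, `recordedOrder`, and `delayed` are hypothetical names, the cache is keyed by `functionId` only (the proposal also matches on `callIndex` and `argsDigest`), and the sketch blocks on `join()` where the real runtime would yield the Continuation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class BatchPhasesSketch {

    // Hypothetical pairing of a durable id with the work to run.
    static final class Call {
        final String functionId;
        final Supplier<String> body;

        Call(String functionId, Supplier<String> body) {
            this.functionId = functionId;
            this.body = body;
        }
    }

    // Hypothetical stand-ins for durable state.
    final Map<String, String> cache = new HashMap<>();
    final List<String> recordedOrder = new ArrayList<>();

    List<String> executeAll(List<Call> calls, ExecutorService pool) {
        String[] results = new String[calls.size()];

        // Phase 1: scan the cache in input order; remember the indexes that missed.
        List<Integer> pendingIdx = new ArrayList<>();
        for (int i = 0; i < calls.size(); i++) {
            String cached = cache.get(calls.get(i).functionId);
            if (cached != null) {
                results[i] = cached;
            } else {
                pendingIdx.add(i);
            }
        }

        // Phase 2: fan out only the misses, then wait once for all of them.
        Map<Integer, CompletableFuture<String>> futures = new HashMap<>();
        for (int i : pendingIdx) {
            futures.put(i, CompletableFuture.supplyAsync(calls.get(i).body, pool));
        }
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();

        // Phase 3: fan in and record in input order, not completion order.
        for (int i : pendingIdx) {
            results[i] = futures.get(i).join();
            cache.put(calls.get(i).functionId, results[i]);
            recordedOrder.add(calls.get(i).functionId);
        }
        return Arrays.asList(results);
    }

    // Hypothetical helper: a supplier that sleeps, then returns a value.
    static Supplier<String> delayed(String value, long ms) {
        return () -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return value;
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            BatchPhasesSketch sketch = new BatchPhasesSketch();
            sketch.cache.put("tool-call-a", "cached-a"); // simulate a completed slot
            List<Call> calls = Arrays.asList(
                    new Call("tool-call-a", delayed("a", 50)),
                    new Call("tool-call-b", delayed("b", 200)),
                    new Call("tool-call-c", delayed("c", 50)));
            System.out.println(sketch.executeAll(calls, pool));
            System.out.println(sketch.recordedOrder);
        } finally {
            pool.shutdown();
        }
    }
}
```

In `main`, `tool-call-c` finishes before `tool-call-b`, yet `recordedOrder` lists `tool-call-b` first, and the cached `tool-call-a` is neither re-run nor re-recorded.
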

---

## Execution Flow

### Parallel Batch (JDK 21+)

```plain
┌──────────────────────────────────────────────────────────────────────────────┐
│                      Parallel Tool Batch Execution Flow                      │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Mailbox Thread:                                                             │
│  [scan cache] → [submit N tasks] ──yield──> [idle] ──mail──> [record]        │
│                                                        │     [sendEvent]     │
│                                                        │                     │
│  Async Pool:                    [tool_a HTTP] ──────────┤                     │
│                                [tool_b HTTP] ─ parallel┤                     │
│                                [tool_c HTTP] ──────────┘                     │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

Timeline (3 tools, 2s each):
──────────────────────────────────────────────────────────────────────────────>
Mailbox:  [prepare] ─yield─> [idle: other actions] ─mail─> [fan-in + sendEvent]
Pool:     [tool_a ──────>│]
          [tool_b ──────>│]  (~2s total, not 6s)
          [tool_c ──────>│]
```

Compare with current serial async:

```plain
Mailbox:  [tool_a yield] ─> [idle] ─> [resume] [tool_b yield] ─> [idle] ─> ...
Pool:     [tool_a 2s]                          [tool_b 2s]                 [tool_c 2s]
Total: ~6s
```

---

## Recovery Logic

Aligned with [Discussion #404](https://github.com/apache/flink-agents/discussions/404) and [Discussion #598](https://github.com/apache/flink-agents/discussions/598).

### Normal Recovery

On recovery, `processToolRequest` re-executes and calls `durableExecuteAllAsync` with the same ordered callable list:

| Slot state at callIndex | Behavior |
| --- | --- |
| `SUCCEEDED` / cached hit | Return cached result; do not submit to pool |
| `FAILED` / cached exception | Re-throw cached exception |
| `PENDING` (reconcilable) | Run `reconciler()` per [#598](https://github.com/apache/flink-agents/discussions/598) |
| Miss (not started before failover) | Include in fan-out submission |

Recording after fan-in always follows **tool_calls list order**, regardless of which tools completed first in the previous attempt.

### Partial Batch Failover

Example: 3 tools, tools 0 and 1 persisted as `SUCCEEDED`, tool 2 was `PENDING` when failover occurred:

1. Phase 1 cache scan: slots 0, 1 hit; slot 2 is `PENDING` or miss.
2. Phase 2 fan-out: only tool 2 submitted (or reconciler invoked for `PENDING`).
3. Phase 3 fan-in: record slot 2; slots 0, 1 not re-recorded (already in `callRecords`).

### Call-Order Mismatch

If recovery detects a `functionId` / `argsDigest` mismatch at any callIndex ([Discussion #404](https://github.com/apache/flink-agents/discussions/404)):

+ Clear current and subsequent `CallResult` entries.
+ Re-execute the full batch from the mismatch point.

Using per-tool `functionId = "tool-call-{id}"` makes mismatch detection precise when the LLM returns different tool sets across retries.

### Reconciler Integration

HTTP / MCP tools with side effects should implement `DurableCallable.reconciler()` per [Discussion #598](https://github.com/apache/flink-agents/discussions/598):

```java
new DurableCallable<ToolResponse>() {
    @Override
    public String getId() {
        return "tool-call-" + toolCallId;
    }

    @Override
    public ToolResponse call() throws Exception {
        return toolRef.call(new ToolParameters(arguments));
    }

    @Override
    public Callable<ToolResponse> reconciler() {
        return () -> toolRef.reconcile(new ToolParameters(arguments));
    }
};
```

For parallel batches, each tool's reconciler runs independently during recovery when its slot is `PENDING`.
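
The slot-state table under Normal Recovery amounts to a per-slot dispatch. The following sketch expresses it as a pure function so the Partial Batch Failover example can be traced by hand. All names (`RecoveryPlanSketch`, `SlotState`, `Step`, `planFor`, `planBatch`) are hypothetical, and treating a `PENDING` slot without a reconciler like a miss is an assumption drawn from the "only tool 2 submitted (or reconciler invoked for `PENDING`)" wording.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecoveryPlanSketch {

    // Hypothetical slot states mirroring the Normal Recovery table.
    enum SlotState { SUCCEEDED, FAILED, PENDING, MISS }

    // Hypothetical per-slot actions taken on recovery.
    enum Step { RETURN_CACHED, RETHROW_CACHED, RUN_RECONCILER, SUBMIT_TO_POOL }

    static Step planFor(SlotState state, boolean hasReconciler) {
        switch (state) {
            case SUCCEEDED:
                return Step.RETURN_CACHED;
            case FAILED:
                return Step.RETHROW_CACHED;
            case PENDING:
                // Assumption: PENDING without a reconciler is re-submitted like a miss.
                return hasReconciler ? Step.RUN_RECONCILER : Step.SUBMIT_TO_POOL;
            case MISS:
                return Step.SUBMIT_TO_POOL;
            default:
                throw new IllegalArgumentException("unknown state: " + state);
        }
    }

    // Plans every slot of a batch in tool_calls order.
    static List<Step> planBatch(List<SlotState> slots, boolean hasReconciler) {
        List<Step> plan = new ArrayList<>();
        for (SlotState slot : slots) {
            plan.add(planFor(slot, hasReconciler));
        }
        return plan;
    }

    public static void main(String[] args) {
        // Partial batch failover: tools 0 and 1 succeeded, tool 2 was in flight.
        List<SlotState> slots =
                Arrays.asList(SlotState.SUCCEEDED, SlotState.SUCCEEDED, SlotState.PENDING);
        System.out.println(planBatch(slots, true));
        System.out.println(planBatch(slots, false));
    }
}
```
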

---

## Implementation

### Module Changes

| Module | Change |
| --- | --- |
| `api` | Add `TOOL_CALL_PARALLEL`, `RunnerContext.durableExecuteAllAsync` |
| `runtime` (java21) | `ContinuationContext.pendingBatchFuture`, `executeAllAsync`, update `executeAction` |
| `runtime` | `JavaRunnerContextImpl.durableExecuteAllAsync` with 3-phase fan-out/fan-in |
| `runtime` (java11) | Fallback: serial `durableExecute` loop |
| `plan` | Refactor `ToolCallAction`; fix per-tool `functionId` |
| `python` | `durable_execute_all_async` + update `tool_call_action.py` |

### Multi-release JAR

Same pattern as [Discussion #429](https://github.com/apache/flink-agents/discussions/429):

```plain
flink-agents-runtime-{version}.jar
├── .../ContinuationActionExecutor.class          # JDK 11: serial fallback
└── META-INF/versions/21/
    └── .../ContinuationActionExecutor.class      # JDK 21: batch async
```

### Migration

+ Default `tool-call.parallel=true`: existing jobs with multi-tool batches get parallel behavior automatically when on JDK 21+.
+ Changing from shared `"tool-call"` to `"tool-call-{id}"` affects in-flight `ActionState` during rolling upgrade. Mitigation: treat functionId change as call-order mismatch → clear and re-execute (safe, at-most-once).

---

## Documentation

+ Update [tool_use.md](../docs/content/docs/development/tool_use.md): parallel tool execution, config options.
+ Update [workflow_agent.md](../docs/content/docs/development/workflow_agent.md) Async Execution section: document `durableExecuteAllAsync`, note that `asyncio.gather` remains unsupported in Python actions (use `durable_execute_all_async` instead).
+ Update [configuration.md](../docs/content/docs/operations/configuration.md): `tool-call.parallel`.

---

## Testing

| Test | Description |
| --- | --- |
| Parallel latency | 3 tools × 2s sleep → total < 4s (not 6s) |
| Yield behavior | During batch yield, other action task for different key completes |
| Durable recovery | Failover after 1/3 tools complete → recovery replays 1, re-runs 2–3 |
| Call-order mismatch | Different toolCallId set on recovery → cache cleared, full re-exec |
| Reconciler + parallel | 2 tools `PENDING` on recovery → reconcilers invoked independently |
| Serial fallback | `tool-call.parallel=false` and JDK < 21 paths unchanged |
| functionId uniqueness | Each tool in batch has distinct `tool-call-{id}` |

GitHub link: https://github.com/apache/flink-agents/discussions/855
