GitHub user da-daken created a discussion: Parallel Tool Call Execution

## Motivation
### Background
Flink Agents already supports async durable execution for high-latency I/O:

+ **Python**: `await ctx.durable_execute_async(...)` submits work to a thread 
pool and yields the operator. See [Discussion 
#404](https://github.com/apache/flink-agents/discussions/404).
+ **Java (JDK 21+)**: `ctx.durableExecuteAsync(DurableCallable)` uses the 
Continuation API to yield the mailbox thread while work runs on an async thread 
pool. See [Discussion 
#429](https://github.com/apache/flink-agents/discussions/429).

The built-in `tool_call_action` listens to `ToolRequestEvent`, looks up each 
tool resource, executes it through durable execution, and emits 
`ToolResponseEvent` after all tool calls in the batch are processed. See 
[tool_use.md](../docs/content/docs/development/tool_use.md).

When the LLM returns multiple tool calls in a single response, each call is 
typically an independent HTTP / MCP / RPC request. Running them in parallel can 
significantly reduce end-to-end latency.

### The Problem
Today, `ToolCallAction.processToolRequest` processes tool calls **serially**:

```java
for (Map<String, Object> toolCall : toolRequest.getToolCalls()) {
    // ...
    response = toolCallAsync
            ? ctx.durableExecuteAsync(callable)
            : ctx.durableExecute(callable);
}
```

Even with `tool-call.async=true` (default) on JDK 21+, the behavior is:

1. Tool 1 starts and calls `durableExecuteAsync`.
2. The Continuation yields; the mailbox thread becomes idle and **can process 
other action tasks** (other keys, or other queued actions for the same key).
3. When Tool 1 completes, the Continuation resumes — but execution is still 
inside the `for` loop.
4. Tool 2 does **not** start until Tool 1's `durableExecuteAsync` returns.

So async execution today achieves **inter-action concurrency**, not 
**intra-batch tool concurrency**. This is consistent with [Discussion 
#429](https://github.com/apache/flink-agents/discussions/429), which explicitly 
states:

> **Serial Execution**: Multiple `executeAsync` calls are executed serially 
> (consistent with Python behavior).

For a `ToolRequestEvent` with N independent I/O-bound tools, total latency is 
approximately the **sum** of individual latencies instead of the **max**.
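
The sum-versus-max claim can be checked with a standalone sketch. It uses only `java.util.concurrent`, not the Flink Agents runtime. Three simulated tools each block for a fixed time: first one after another, then all submitted together. `LatencyDemo` and `simulatedTool` are illustrative names, and `Thread.sleep` stands in for the HTTP / MCP / RPC round trip.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

final class LatencyDemo {

    /** Simulated I/O-bound tool call: blocks for {@code millis}, then returns a marker string. */
    static String simulatedTool(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + "-done";
    }

    /** Today's behavior: each call starts only after the previous one returns (about the sum). */
    static long serialMillis(List<String> tools, long perCallMillis) {
        long start = System.nanoTime();
        for (String tool : tools) {
            simulatedTool(tool, perCallMillis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Proposed behavior: all calls are submitted at once, then joined (about the max). */
    static long parallelMillis(List<String> tools, long perCallMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tools.size()));
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = tools.stream()
                    .map(t -> CompletableFuture.supplyAsync(() -> simulatedTool(t, perCallMillis), pool))
                    .collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }
}
```

With three 200 ms calls, `serialMillis` should report roughly 600 ms. `parallelMillis` should report roughly 200 ms plus scheduling overhead. Exact numbers depend on the machine.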

Additionally, all tool calls currently share the same durable `functionId`:

```java
public String getId() {
    return "tool-call";
}
```

Fine-grained durable execution ([Discussion 
#404](https://github.com/apache/flink-agents/discussions/404)) matches calls by 
`(callIndex, functionId, argsDigest)`. Reusing `"tool-call"` for every tool in 
a batch makes recovery matching fragile when call order or batch composition 
changes.

### Goal
Enable **parallel execution of multiple tool calls within a single `ToolRequestEvent`**, while:

1. Preserving the **mailbox thread model** ([Discussion 
#429](https://github.com/apache/flink-agents/discussions/429)): memory access, 
`sendEvent`, and durable state recording remain on the mailbox thread.
2. Remaining compatible with **fine-grained durable execution** ([Discussion 
#404](https://github.com/apache/flink-agents/discussions/404)): deterministic 
call order for recovery, per-call result persistence.
3. Optionally supporting **reconciler hooks** for in-flight recovery 
([Discussion #598](https://github.com/apache/flink-agents/discussions/598)) on 
a per-tool basis.
4. Degrading gracefully: JDK < 21 and `tool-call.async=false` fall back to 
serial execution.

---

## Design Goals
```plain
// Goal: LLM returns 3 tool calls; all HTTP requests run concurrently
// Total latency ≈ max(latency_i), not sum(latency_i)

ToolRequestEvent { tool_calls: [call_a, call_b, call_c] }
    │
    ▼
tool_call_action (mailbox thread)
    ├── durableExecuteAllAsync([callable_a, callable_b, callable_c])
    │       ├── submit call_a ──> async pool
    │       ├── submit call_b ──> async pool   (concurrent)
    │       └── submit call_c ──> async pool
    ├── yield (mailbox idle → other actions can run)
    ├── resume when all complete
    └── sendEvent(ToolResponseEvent)
```

| Goal | Description |
| --- | --- |
| Parallelism | N tools in one batch execute concurrently on the async thread pool |
| Mailbox safety | `RunnerContext` (memory, events, metrics) accessed only on mailbox thread |
| Durable recovery | Call results persisted in deterministic order; replay skips completed tools |
| Backward compatible | Serial path unchanged when parallel disabled or unsupported |
| Cross-language | Java and Python follow the same semantics |


### Non-Goals (first version)
+ Parallelism **across** different `ToolRequestEvent`s (already handled by 
Flink parallelism + async yield)
+ Framework-level automatic per-tool retry (see Future Work; today users handle retries inside the tool body)
+ Changing the external `ToolRequestEvent` / `ToolResponseEvent` wire format

---

## API Design
### New Config Option
```java
// AgentExecutionOptions
public static final ConfigOption<Boolean> TOOL_CALL_PARALLEL =
        new ConfigOption<>("tool-call.parallel", Boolean.class, true);
```

| Config | Behavior |
| --- | --- |
| `tool-call.async=false` | Serial sync execution (unchanged) |
| `tool-call.async=true`, `tool-call.parallel=false` | Serial async execution (current behavior) |
| `tool-call.async=true`, `tool-call.parallel=true` | Parallel async batch execution (new) |
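
The table reduces to a small decision function. This sketch mirrors the branch in the proposed `ToolCallAction.processToolRequest`: the parallel path is taken only when both flags are on and the batch has more than one call. The enum and class names are hypothetical, and the JDK < 21 fallback is left to the runtime.

```java
/** The three execution paths from the config table (hypothetical enum). */
enum ToolExecutionMode { SERIAL_SYNC, SERIAL_ASYNC, PARALLEL_ASYNC }

final class ToolExecutionModes {

    /**
     * Parallel only when async and parallel are both enabled and there is more than one call;
     * a JDK < 21 runtime would still fall back to serial execution internally.
     */
    static ToolExecutionMode select(boolean async, boolean parallel, int numToolCalls) {
        if (!async) {
            return ToolExecutionMode.SERIAL_SYNC;      // tool-call.async=false
        }
        if (parallel && numToolCalls > 1) {
            return ToolExecutionMode.PARALLEL_ASYNC;   // new batch path
        }
        return ToolExecutionMode.SERIAL_ASYNC;         // current behavior
    }
}
```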


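Existing `num-async-threads` ([Discussion #429](https://github.com/apache/flink-agents/discussions/429)) caps concurrent async work globally. A batch of N tools may use up to N threads from this pool, but never more than `num-async-threads`, and it shares those threads with other in-flight async work.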
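
The sketch below shows how a global cap interacts with a batch. It models `num-async-threads` as a fixed-size pool, which is an assumption about the runtime's pool, and records the peak number of simulated tool calls running at once. `PoolCapDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolCapDemo {

    /** Runs {@code numTools} simulated calls on a fixed pool; returns the peak number running at once. */
    static int peakConcurrency(int poolSize, int numTools, long perCallMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize); // stands in for num-async-threads
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> calls = new ArrayList<>();
            for (int i = 0; i < numTools; i++) {
                calls.add(CompletableFuture.runAsync(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(perCallMillis); // simulated HTTP / MCP / RPC latency
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, pool));
            }
            CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

With `peakConcurrency(2, 5, 100)` the peak never exceeds 2. The five calls therefore finish in about three rounds rather than one.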

Optional future knob: `tool-call.max-parallelism` to limit concurrent tools per 
batch.
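
One possible shape for a `tool-call.max-parallelism` knob, assuming the limit applies per batch: the batch starts at most K calls, and each completion starts the next one. This avoids blocking pool threads on a `Semaphore`. The sketch uses plain `CompletableFuture`, and `BoundedBatch` / `runBounded` are hypothetical names, not part of Flink Agents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class BoundedBatch {

    /** Runs all calls with at most {@code maxParallelism} in flight; results keep input order. */
    static <T> List<T> runBounded(List<Supplier<T>> calls, int maxParallelism, Executor pool) {
        if (maxParallelism < 1) {
            throw new IllegalArgumentException("maxParallelism must be >= 1");
        }
        List<CompletableFuture<T>> results = new ArrayList<>();
        for (int i = 0; i < calls.size(); i++) {
            results.add(new CompletableFuture<>());
        }
        AtomicInteger nextIndex = new AtomicInteger();
        // Start up to maxParallelism "lanes"; each lane starts the next call when its current one ends.
        for (int lane = 0; lane < Math.min(maxParallelism, calls.size()); lane++) {
            startNext(calls, results, nextIndex, pool);
        }
        // The real runtime would yield the Continuation here; the sketch simply blocks.
        CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0])).join();
        return results.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    private static <T> void startNext(
            List<Supplier<T>> calls, List<CompletableFuture<T>> results,
            AtomicInteger nextIndex, Executor pool) {
        int i = nextIndex.getAndIncrement();
        if (i >= calls.size()) {
            return; // no more calls for this lane
        }
        CompletableFuture.supplyAsync(calls.get(i), pool).whenComplete((value, error) -> {
            if (error != null) {
                results.get(i).completeExceptionally(error);
            } else {
                results.get(i).complete(value);
            }
            startNext(calls, results, nextIndex, pool);
        });
    }
}
```

Chaining the next call from `whenComplete` keeps waiting calls out of the pool entirely. A limited batch therefore never holds idle threads that other actions could use.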

### New RunnerContext Method
```java
public interface RunnerContext {
    /**
     * Executes multiple durable calls concurrently and returns results in input order.
     *
     * <p>On JDK 21+, submits all uncached calls to the async thread pool, yields the
     * Continuation once until all complete, then records durable results in list order
     * on the mailbox thread.
     *
     * <p>On JDK &lt; 21, falls back to serial {@link #durableExecute(DurableCallable)}.
     *
     * <p>The action must be deterministic: the same list of {@link DurableCallable}
     * instances (same order, same {@link DurableCallable#getId()}) must be produced
     * on recovery. Otherwise, subsequent cached results are cleared per existing rules.
     *
     * <p><b>Note:</b> Memory access and {@code sendEvent} calls are prohibited inside callables.
     */
    <T> List<T> durableExecuteAllAsync(List<DurableCallable<T>> callables) throws Exception;
}
```

Python equivalent:

```python
async def durable_execute_all_async(
    self,
    callables: list[tuple[Callable, tuple, dict] | Callable],
) -> list[Any]: ...
```

### Per-Tool Durable Identity
Each tool callable must use a stable, unique `functionId`:

```java
@Override
public String getId() {
    return "tool-call-" + toolCallId;  // e.g. "tool-call-call_abc123"
}
```

---

## Design Approach
### Recommended: Batch Async Durable API (Fan-out / Fan-in)
Extend the runtime with `durableExecuteAllAsync` rather than changing the event 
model. `ToolCallAction` becomes a thin caller.

| Criterion | Batch API |
| --- | --- |
| Preserves mailbox model | ✅ |
| Minimal event model change | ✅ |
| Reuses Continuation + durable state machine | ✅ |
| Implementation complexity | Medium |
| Per-tool independent yield / scheduling | Single batch yield |


Batch API is the recommended first step. Action Task Split remains a valid 
future evolution if finer-grained interleaving with other actions is needed.

---

## Core Component Design
### ContinuationContext Extension
```java
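public class ContinuationContext {
    private volatile Future<?> pendingFuture;      // existing: single-call async
    private volatile Future<?> pendingBatchFuture; // new: batch barrier

    public boolean hasPendingAsync() {
        return isPending(pendingFuture) || isPending(pendingBatchFuture);
    }

    public void setPendingBatchFuture(Future<?> future) {
        this.pendingBatchFuture = future;
    }

    // A future is pending if it has been set and has not completed yet.
    private static boolean isPending(Future<?> future) {
        return future != null && !future.isDone();
    }
}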
```

`ContinuationActionExecutor.executeAction` checks `hasPendingAsync()` instead 
of only `pendingFuture`:

```java
if (context.hasPendingAsync()) {
    return false;  // action not finished; mailbox can process other tasks
}
// resume continuation
```

### ContinuationActionExecutor.executeAllAsync (JDK 21)
```java
public <T> List<T> executeAllAsync(
        ContinuationContext context,
        List<Supplier<T>> suppliers) throws Exception {

    // Fan-out: submit every supplier to the async pool immediately.
    List<CompletableFuture<T>> futures = suppliers.stream()
            .map(s -> CompletableFuture.supplyAsync(s, asyncExecutor))
            .toList();

    // Barrier completes (normally or exceptionally) once every call is done.
    CompletableFuture<Void> barrier =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

    context.setPendingBatchFuture(barrier);
    try {
        // Yield the mailbox thread until the whole batch has finished.
        while (!barrier.isDone()) {
            Continuation.yield(SCOPE);
        }
    } finally {
        context.setPendingBatchFuture(null);
    }

    // Fan-in in input order; join() rethrows a failed call's exception
    // wrapped in CompletionException.
    return futures.stream().map(CompletableFuture::join).toList();
}
```
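
Phase 3 of `durableExecuteAllAsync` records each slot's result *or* exception. Joining futures one at a time, however, stops at the first failure. The minimal sketch below captures an outcome per slot using `CompletableFuture.handle`, so no future in the batch completes exceptionally. `CallOutcome` and `BatchFanOut` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Outcome of one call in a batch: a value, or the error that call threw (hypothetical type). */
record CallOutcome<T>(T value, Throwable error) {
    boolean failed() {
        return error != null;
    }
}

final class BatchFanOut {

    /** Fans out every supplier, waits for all of them, returns one outcome per slot in input order. */
    static <T> List<CallOutcome<T>> runAll(List<Supplier<T>> suppliers, Executor pool) {
        List<CompletableFuture<CallOutcome<T>>> futures = suppliers.stream()
                .map(s -> CompletableFuture.supplyAsync(s, pool)
                        .handle((value, error) -> new CallOutcome<T>(value, unwrap(error))))
                .collect(Collectors.toList());
        // handle() turns failures into values, so this barrier does not complete exceptionally.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    /** supplyAsync wraps a thrown exception in CompletionException; expose the original cause. */
    private static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
    }
}
```

A failing middle call then yields a failed outcome in slot 1, while slots 0 and 2 still carry their values. Each slot can be recorded independently.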

### JavaRunnerContextImpl.durableExecuteAllAsync
Three phases, all orchestrated from the mailbox thread:

**Phase 1 — Prepare (mailbox thread)**

For each `DurableCallable` in input order:

1. `tryGetCachedResult(functionId, argsDigest)` — on hit, fill result slot.
2. On miss, add to `pendingList` with original index.

**Phase 2 — Fan-out + Yield (mailbox initiates, pool executes)**

1. Submit all pending callables to `ContinuationActionExecutor.executeAllAsync`.
2. Continuation yields once until all futures complete.
3. During yield, mailbox processes other action tasks ([Discussion 
#429](https://github.com/apache/flink-agents/discussions/429) flow).

**Phase 3 — Fan-in + Persist (mailbox thread)**

For each index in original order:

1. Merge cached + async results.
2. `recordDurableCompletion(functionId, argsDigest, result, exception)` — 
**strictly in tool_calls order**.
3. Return ordered result list to caller.
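
The three phases can be modelled in a few dozen lines under heavy simplifying assumptions:

+ a `HashMap` keyed by `functionId|argsDigest` plays the durable store;
+ a blocking `join()` replaces the Continuation yield;
+ failure recording is omitted.

`SketchCall` and `ThreePhaseSketch` are hypothetical stand-ins for `DurableCallable` and `JavaRunnerContextImpl`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical stand-in for DurableCallable: a functionId, an argument digest and the work. */
record SketchCall<T>(String functionId, String argsDigest, Supplier<T> body) {
    String key() {
        return functionId + "|" + argsDigest;
    }
}

final class ThreePhaseSketch {
    final Map<String, Object> durableStore = new HashMap<>();  // persisted results by key
    final List<String> recordOrder = new ArrayList<>();        // functionIds in recording order
    final List<String> executed = Collections.synchronizedList(new ArrayList<>()); // ran on pool

    @SuppressWarnings("unchecked")
    <T> List<T> executeAll(List<SketchCall<T>> calls, Executor pool) {
        List<T> results = new ArrayList<>(Collections.nCopies(calls.size(), null));

        // Phase 1 (mailbox thread): fill cache hits, remember the indexes of misses.
        List<Integer> misses = new ArrayList<>();
        for (int i = 0; i < calls.size(); i++) {
            String key = calls.get(i).key();
            if (durableStore.containsKey(key)) {
                results.set(i, (T) durableStore.get(key));
            } else {
                misses.add(i);
            }
        }

        // Phase 2 (async pool): fan out all misses at once, then wait for the whole batch.
        Map<Integer, CompletableFuture<T>> futures = new HashMap<>();
        for (int i : misses) {
            SketchCall<T> call = calls.get(i);
            futures.put(i, CompletableFuture.supplyAsync(() -> {
                executed.add(call.functionId());
                return call.body().get();
            }, pool));
        }
        // The real runtime yields the Continuation here instead of blocking.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0])).join();

        // Phase 3 (mailbox thread): record results strictly in input order.
        for (int i : misses) {
            T value = futures.get(i).join();
            durableStore.put(calls.get(i).key(), value);
            recordOrder.add(calls.get(i).functionId());
            results.set(i, value);
        }
        return results;
    }
}
```

In this model, a call already found in the store (for example, after a partial failover) is never submitted. Freshly executed calls are recorded in input order, no matter which one finished first.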

### ToolCallAction Changes
```java
public static void processToolRequest(Event event, RunnerContext ctx) throws Exception {
    ToolRequestEvent toolRequest = ToolRequestEvent.fromEvent(event);
    boolean async = ctx.getConfig().get(AgentExecutionOptions.TOOL_CALL_ASYNC);
    boolean parallel = ctx.getConfig().get(AgentExecutionOptions.TOOL_CALL_PARALLEL);

    List<Map<String, Object>> toolCalls = toolRequest.getToolCalls();
    // Resolves tools, handles missing tools inline, assigns per-tool functionIds.
    List<DurableCallable<ToolResponse>> callables = buildCallables(toolCalls, ctx);

    List<ToolResponse> results;
    if (async && parallel && callables.size() > 1) {
        // Fan-out / fan-in: a single yield for the whole batch.
        results = ctx.durableExecuteAllAsync(callables);
    } else {
        // Serial path (sync or async), unchanged from today.
        results = executeSequentially(callables, async, ctx);
    }

    ctx.sendEvent(buildToolResponseEvent(toolRequest, results, ...));
}
```

`buildCallables` resolves tools, handles missing-tool errors inline (no async 
submission for non-existent tools), and assigns `functionId = "tool-call-" + 
toolCallId`.
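
The `buildCallables` planning step can be sketched as follows. The sketch assumes each tool call is a map with `id`, `name`, and `arguments` keys; the real `ToolRequestEvent` layout may differ. It also models tools as plain functions. A missing tool becomes an already-resolved error slot with no body, so nothing is submitted for it. `PlannedToolCall` and `BuildCallablesSketch` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** One planned slot: a callable body, or an inline error for a missing tool (hypothetical type). */
record PlannedToolCall(int index, String functionId, String errorIfMissing, Supplier<String> body) {
    boolean resolved() {
        return errorIfMissing == null;
    }
}

final class BuildCallablesSketch {

    static List<PlannedToolCall> plan(
            List<Map<String, Object>> toolCalls,
            Map<String, Function<Map<String, Object>, String>> registry) {
        List<PlannedToolCall> planned = new ArrayList<>();
        for (int i = 0; i < toolCalls.size(); i++) {
            Map<String, Object> call = toolCalls.get(i);
            String toolCallId = (String) call.get("id");
            String name = (String) call.get("name");
            @SuppressWarnings("unchecked")
            Map<String, Object> args = (Map<String, Object>) call.getOrDefault("arguments", Map.of());
            // Stable, unique durable identity per tool call.
            String functionId = "tool-call-" + toolCallId;
            Function<Map<String, Object>, String> tool = registry.get(name);
            if (tool == null) {
                // Missing tool: resolved inline with an error, never submitted to the pool.
                planned.add(new PlannedToolCall(i, functionId, "Tool not found: " + name, null));
            } else {
                planned.add(new PlannedToolCall(i, functionId, null, () -> tool.apply(args)));
            }
        }
        return planned;
    }
}
```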

---

## Execution Flow
### Parallel Batch (JDK 21+)
```plain
┌──────────────────────────────────────────────────────────────────────────────┐
│ Parallel Tool Batch Execution Flow                                           │
├──────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Mailbox Thread:                                                             │
│    [scan cache] → [submit N tasks] ──yield──> [idle] ──mail──> [record]     │
│                                              │                  [sendEvent]  │
│                                              │                               │
│  Async Pool:         [tool_a HTTP] ──────────┤                               │
│                      [tool_b HTTP] ─ parallel┤                               │
│                      [tool_c HTTP] ──────────┘                               │
│                                                                              │
└──────────────────────────────────────────────────────────────────────────────┘

Timeline (3 tools, 2s each):
──────────────────────────────────────────────────────────────────────────────>
Mailbox:  [prepare] ─yield─> [idle: other actions] ─mail─> [fan-in + sendEvent]
Pool:              [tool_a ──────>│]
                   [tool_b ──────>│]  (~2s total, not 6s)
                   [tool_c ──────>│]
```

Compare with current serial async:

```plain
Mailbox:  [tool_a yield] ─> [idle] ─> [resume] [tool_b yield] ─> [idle] ─> ...
Pool:              [tool_a 2s]                  [tool_b 2s]           [tool_c 2s]
Total: ~6s
```

---

## Recovery Logic
Aligned with [Discussion 
#404](https://github.com/apache/flink-agents/discussions/404) and [Discussion 
#598](https://github.com/apache/flink-agents/discussions/598).

### Normal Recovery
On recovery, `processToolRequest` re-executes and calls 
`durableExecuteAllAsync` with the same ordered callable list:

| Slot state at callIndex | Behavior |
| --- | --- |
| `SUCCEEDED` / cached hit | Return cached result; do not submit to pool |
| `FAILED` / cached exception | Re-throw cached exception |
| `PENDING` (reconcilable) | Run `reconciler()` per [#598](https://github.com/apache/flink-agents/discussions/598) |
| Miss (not started before failover) | Include in fan-out submission |
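
The recovery table maps naturally onto a small dispatch function. In this sketch the enum names are hypothetical, and one reconciler flag applies to every slot for brevity. A `PENDING` slot whose callable has no reconciler is treated like a miss; that is an assumption the table does not cover. The same function reproduces the partial-failover example that follows: only slot 2 needs work.

```java
import java.util.ArrayList;
import java.util.List;

/** Durable slot states from the recovery table; MISS means nothing was recorded. */
enum SlotState { SUCCEEDED, FAILED, PENDING, MISS }

enum RecoveryAction { RETURN_CACHED, RETHROW_CACHED, RUN_RECONCILER, SUBMIT_TO_POOL }

final class RecoveryDispatch {

    static RecoveryAction actionFor(SlotState state, boolean hasReconciler) {
        return switch (state) {
            case SUCCEEDED -> RecoveryAction.RETURN_CACHED;
            case FAILED -> RecoveryAction.RETHROW_CACHED;
            // Assumption: a PENDING slot without a reconciler is re-submitted like a miss.
            case PENDING -> hasReconciler ? RecoveryAction.RUN_RECONCILER : RecoveryAction.SUBMIT_TO_POOL;
            case MISS -> RecoveryAction.SUBMIT_TO_POOL;
        };
    }

    /** Indexes that need work (reconciler or pool) on recovery; cached slots are skipped. */
    static List<Integer> slotsNeedingWork(List<SlotState> states, boolean hasReconciler) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < states.size(); i++) {
            RecoveryAction action = actionFor(states.get(i), hasReconciler);
            if (action == RecoveryAction.RUN_RECONCILER || action == RecoveryAction.SUBMIT_TO_POOL) {
                out.add(i);
            }
        }
        return out;
    }
}
```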


Recording after fan-in always follows **tool_calls list order**, regardless of 
which tools completed first in the previous attempt.

### Partial Batch Failover
Example: 3 tools; tools 0 and 1 were persisted as `SUCCEEDED`, and tool 2 was `PENDING` when failover occurred:

1. Phase 1 cache scan: slots 0, 1 hit; slot 2 is `PENDING` or miss.
2. Phase 2 fan-out: only tool 2 submitted (or reconciler invoked for `PENDING`).
3. Phase 3 fan-in: record slot 2; slots 0, 1 not re-recorded (already in 
`callRecords`).

### Call-Order Mismatch
If recovery detects `functionId` / `argsDigest` mismatch at any callIndex 
([Discussion #404](https://github.com/apache/flink-agents/discussions/404)):

+ Clear current and subsequent `CallResult` entries.
+ Re-execute every call from the mismatch point onward.

Using per-tool `functionId = "tool-call-{id}"` makes mismatch detection precise when the LLM returns different tool sets across retries.
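
A simplified matcher shows the difference per-tool ids make. It compares `(functionId, argsDigest)` pairs by callIndex and assumes `argsDigest` covers only the tool arguments. Under that assumption, two different tools called with identical arguments share a digest. `CallKey` and `MismatchSketch` are hypothetical names, and ids such as `tool-call-call_1` stand in for real toolCallIds.

```java
import java.util.ArrayList;
import java.util.List;

/** What recovery compares at each callIndex (hypothetical representation). */
record CallKey(String functionId, String argsDigest) {}

final class MismatchSketch {

    /** First callIndex where the replayed batch diverges from the recorded one, or -1 if none. */
    static int firstMismatch(List<CallKey> recorded, List<CallKey> replayed) {
        int common = Math.min(recorded.size(), replayed.size());
        for (int i = 0; i < common; i++) {
            if (!recorded.get(i).equals(replayed.get(i))) {
                return i;
            }
        }
        return -1;
    }

    /** Recorded entries that stay reusable: the mismatching entry and all later ones are cleared. */
    static List<CallKey> reusablePrefix(List<CallKey> recorded, List<CallKey> replayed) {
        int i = firstMismatch(recorded, replayed);
        return i < 0 ? new ArrayList<>(recorded) : new ArrayList<>(recorded.subList(0, i));
    }
}
```

With the shared `"tool-call"` id, swapping two tools that received the same arguments goes unnoticed, and cached results would be handed to the wrong tool. With per-tool ids, the swap is caught at index 0.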

### Reconciler Integration
HTTP / MCP tools with side effects should implement 
`DurableCallable.reconciler()` per [Discussion 
#598](https://github.com/apache/flink-agents/discussions/598):

```java
new DurableCallable<ToolResponse>() {
    @Override
    public String getId() {
        return "tool-call-" + toolCallId;
    }

    @Override
    public ToolResponse call() throws Exception {
        return toolRef.call(new ToolParameters(arguments));
    }

    @Override
    public Callable<ToolResponse> reconciler() {
        return () -> toolRef.reconcile(new ToolParameters(arguments));
    }
};
```

For parallel batches, each tool's reconciler runs independently during recovery 
when its slot is `PENDING`.

---

## Implementation
### Module Changes
| Module | Change |
| --- | --- |
| `api` | Add `TOOL_CALL_PARALLEL`, `RunnerContext.durableExecuteAllAsync` |
| `runtime` (java21) | `ContinuationContext.pendingBatchFuture`, `executeAllAsync`, update `executeAction` |
| `runtime` | `JavaRunnerContextImpl.durableExecuteAllAsync` with 3-phase fan-out/fan-in |
| `runtime` (java11) | Fallback: serial `durableExecute` loop |
| `plan` | Refactor `ToolCallAction`; fix per-tool `functionId` |
| `python` | `durable_execute_all_async` + update `tool_call_action.py` |


### Multi-release JAR
Same pattern as [Discussion 
#429](https://github.com/apache/flink-agents/discussions/429):

```plain
flink-agents-runtime-{version}.jar
├── .../ContinuationActionExecutor.class       # JDK 11: serial fallback
└── META-INF/versions/21/
    └── .../ContinuationActionExecutor.class   # JDK 21: batch async
```

### Migration
+ Default `tool-call.parallel=true`: existing jobs with multi-tool batches get 
parallel behavior automatically when on JDK 21+.
+ Changing from the shared `"tool-call"` functionId to `"tool-call-{id}"` affects in-flight `ActionState` during a rolling upgrade. Mitigation: treat the functionId change as a call-order mismatch, then clear and re-execute (safe, though affected tools may run again, i.e. at-least-once).

---

## Documentation
+ Update [tool_use.md](../docs/content/docs/development/tool_use.md): parallel 
tool execution, config options.
+ Update 
[workflow_agent.md](../docs/content/docs/development/workflow_agent.md) Async 
Execution section: document `durableExecuteAllAsync`, note that 
`asyncio.gather` remains unsupported in Python actions (use 
`durable_execute_all_async` instead).
+ Update [configuration.md](../docs/content/docs/operations/configuration.md): 
`tool-call.parallel`.

---

## Testing
| Test | Description |
| --- | --- |
| Parallel latency | 3 tools × 2s sleep → total < 4s (not 6s) |
| Yield behavior | During batch yield, another action task for a different key completes |
| Durable recovery | Failover after 1/3 tools complete → recovery replays 1, re-runs 2–3 |
| Call-order mismatch | Different toolCallId set on recovery → cache cleared, full re-exec |
| Reconciler + parallel | 2 tools `PENDING` on recovery → reconcilers invoked 
independently |
| Serial fallback | `tool-call.parallel=false` and JDK < 21 paths unchanged |
| functionId uniqueness | Each tool in batch has distinct `tool-call-{id}` |

GitHub link: https://github.com/apache/flink-agents/discussions/855
