GitHub user weiqingy added a comment to the discussion: Parallel Tool Call 
Execution

Thanks for putting this together — the fan-out/fan-in shape is the right one, 
and I like that the proposal already pins down the degradation matrix 
(`async=false` / `parallel=false` / JDK<21) and grounds recovery in the #404 
cursor model and #598 reconcilers instead of hand-waving it. @joeyutong already 
covered the recovery state machine, collect-all vs fail-fast, duplicate side 
effects, the global pool, and tracing, so I won't re-tread those — a few things 
from the cross-language and API-surface angle that I didn't see raised yet.

**The Python side and the per-call durable identity.** The design is built 
end-to-end on the JDK 21 Continuation model — mailbox-thread yield, 
`pendingBatchFuture`, and `functionId = "tool-call-" + toolCallId`. Python gets 
a one-line signature, but none of those primitives exist on that side. 
`durable_execute_async` submits to a thread pool, and Python derives a call's 
identity from the function object itself — `_compute_function_id(func)` → 
`module.qualname` (`durable_execution.py:25`) — so there's no place to inject a 
caller-supplied `"tool-call-{id}"`. Every call into the same `tool.call` 
collides on `functionId` and is disambiguated only by `argsDigest` + cursor 
position (`tool_call_action.py:51`). The proposed 
`durable_execute_all_async(callables)` signature doesn't carry an id either. So 
the deterministic per-call identity that the whole recovery section leans on — 
ordered recording, partial-batch failover, call-order-mismatch detection — has 
no Python expression as
  written. CLAUDE.md asks for Java/Python parity and the goal table here lists 
"Cross-language: same semantics"; could the design spell out the Python 
fan-out/fan-in and recovery-identity model concretely rather than the single 
signature line? As it stands it reads as a Java design with a Python stub, and 
the two durable-execution models are different enough that parity won't come 
for free.

**Collect-all isn't symmetric across the two languages today, which the 
parallel design will inherit.** Picking up the collect-all point from above — 
that semantics isn't actually the same on both sides right now, so "preserve 
collect-all under parallelism" means different things on each. Java's 
`ToolCallAction` wraps every tool in its own try/catch and accumulates 
`success` / `error` / `responses` maps, then emits `new ToolResponseEvent(id, 
responses, success, error, externalIds)` (`ToolCallAction.java:49-113`). The 
Python `process_tool_request` has no per-tool try/catch around execution — only 
the missing-tool case lands in the response map; if a `tool.call` raises, it 
propagates and fails the whole action with no `ToolResponseEvent` emitted at 
all (`tool_call_action.py`). And the Python `ToolResponseEvent` carries only 
`responses` + `external_ids`, no `success` / `error` maps (`tool_event.py`), so 
it can't express per-tool success/failure to begin with. So before parallel exec
 ution can "preserve collect-all", collect-all has to exist symmetrically — 
otherwise one tool failing in an N-tool parallel batch drops the whole batch on 
Python while Java returns the other N-1 results plus one error. Worth deciding 
the intended cross-language failure contract here, since parallelism makes the 
divergence louder (which of N concurrent calls failed?).

**Does `durableExecuteAllAsync` need to be public API in v1?** It's proposed as 
a public `RunnerContext` method on both languages, but the only caller in the 
design is the built-in `tool_call_action`. Promoting it to the public interface 
now locks the batch recovery semantics — which, per the open questions on this 
thread, aren't settled — into two language APIs at once, before a real 
implementation has shaken them out. Would it be worth keeping the batch 
primitive internal to the tool-call path for the first version (the public knob 
stays just `tool-call.parallel`) and exposing a public `RunnerContext` method 
only once the recovery model has stabilized? And if it does stay public, the 
return type is worth a second look: `List<T>` is structurally fail-fast — 
`futures.stream().map(Future::join)` throws on the first failure — whereas the 
tool-call path needs the collect-all behavior already flagged above, so the 
signature itself would have to carry per-call outcomes rather than
  a bare `List<T>`.

**One concrete note on the barrier sketch, since it feeds the pool-exhaustion 
question.** In `executeAllAsync`, the barrier wraps each tool future in 
`CompletableFuture.supplyAsync(() -> { f.join(); return null; })` with no 
explicit executor — that runs on the common `ForkJoinPool` and parks one pool 
thread per tool blocked on `join()`. So a batch of N tools occupies N threads 
on `asyncExecutor` plus N blocking joiners on the common pool, entirely outside 
the `num-async-threads` bound — which makes "concurrency is bounded by the 
global async pool" harder to actually hold. The submitted futures are already 
the barrier; building the wait directly from them (or submitting 
`CompletableFuture`s onto `asyncExecutor`) keeps the whole batch inside 
`num-async-threads`. Minor relative to the semantics questions, but it's the 
difference between the global pool being a real bound and a nominal one.


GitHub link: 
https://github.com/apache/flink-agents/discussions/855#discussioncomment-17490207

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to