GitHub user Sxnan created a discussion: Java Async Execution Design
## Background
The Python API already supports `execute_async`, but the Java API does not yet
support this feature. This design adds the same capability to the Java API.
## Design Goals
```java
// Java API Goal: Sequential code style consistent with Python
public static void myAction(InputEvent event, RunnerContext ctx) throws Exception {
    String result = ctx.executeAsync(() -> slowLlmCall(prompt));
    ctx.getShortTermMemory().set("result", result);
    ctx.sendEvent(new OutputEvent(result));
}
```
## Technical Challenges
### Challenge 1: Mailbox Thread Constraint
Flink Agents' state access and event sending must be executed in the **mailbox
thread**. This means:
- Time-consuming operations in `executeAsync` can be executed in other threads
- Code outside `executeAsync` (state access, sendEvent) must be executed in the
mailbox thread
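
A minimal sketch of how this constraint could be checked at runtime. `MailboxThreadGuard` and its methods are hypothetical names used only for illustration and are not part of Flink Agents; the idea is simply that state access and event sending verify the calling thread before proceeding.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: remembers the mailbox thread and rejects calls from any other thread. */
final class MailboxThreadGuard {
    private final Thread mailboxThread;

    MailboxThreadGuard(Thread mailboxThread) {
        this.mailboxThread = mailboxThread;
    }

    /** Throws if the caller is not running on the mailbox thread. */
    void checkMailboxThread() {
        if (Thread.currentThread() != mailboxThread) {
            throw new IllegalStateException(
                    "State access and sendEvent must run in the mailbox thread, but ran in "
                            + Thread.currentThread().getName());
        }
    }

    /** Treats the calling thread as the mailbox and reports whether a pool thread is rejected. */
    static boolean rejectsWorkerThread() {
        MailboxThreadGuard guard = new MailboxThreadGuard(Thread.currentThread());
        guard.checkMailboxThread(); // passes: the caller plays the mailbox thread here
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable check = guard::checkMailboxThread;
            pool.submit(check).get(); // the same check on a pool thread should fail
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A guard of this kind would also make the restriction on state access inside an `executeAsync` supplier fail fast instead of silently racing.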
### Challenge 2: JDK Version Compatibility
Flink Agents needs to support JDK 11+:
- JDK 21+ users: Can use Continuation to implement true async execution
- JDK < 21 users: Need to fall back to synchronous execution
## API Design
Add `executeAsync` method to the `RunnerContext` interface:
```java
import java.util.function.Supplier;

public interface RunnerContext {
    /**
     * Asynchronously executes the provided supplier function and returns the result.
     *
     * <p>On JDK 21+, this method uses Continuation to yield the current action execution,
     * submits the supplier to a thread pool, and resumes via the mailbox when complete.
     *
     * <p>On JDK &lt; 21, this method falls back to synchronous execution.
     *
     * <p><b>Note:</b> Access to memory and sendEvent are prohibited within the supplier.
     */
    <T> T executeAsync(Supplier<T> supplier) throws Exception;

    /** Variant of {@link #executeAsync(Supplier)} for tasks that return no result. */
    void executeAsync(Runnable runnable) throws Exception;
}
```
### Usage Restrictions
1. **Memory Access Restriction**: State access is prohibited within the
`executeAsync` supplier
2. **Serial Execution**: Multiple `executeAsync` calls are executed serially
(consistent with Python behavior)
3. **JDK < 21**: Falls back to synchronous execution, blocking the mailbox
thread
## Design Approach
### Continuation + Mailbox Scheduling
Use JDK 21's Continuation API to implement async execution:
1. **Action Executes Within Continuation**: Continuation wraps Action code,
running in the mailbox thread
2. **Explicit Yield on `executeAsync`**: Submit the time-consuming task to a
thread pool, then pause the Continuation and return control to the mailbox
3. **Resume Continuation After Task Completion**: Resume execution through
mailbox scheduling, so that subsequent code remains in the mailbox thread
```
┌────────────────────────────────────────────────────────────────────────────┐
│                               Execution Flow                               │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  Mailbox Thread:  [cont.run()] ──yield──> [idle] ──mail──> [cont.run()]    │
│                         │                           │                      │
│                         │ submit task               │ task complete        │
│                         ▼                           │                      │
│  Thread Pool:     [execute time-consuming op] ─────>│                      │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
```
**Key Advantages**:
- ✅ Sequential code style (same as Python)
- ✅ Fully preserves the Mailbox model (Action code always executes in mailbox
thread)
- ✅ No proxy needed for state access (already in mailbox thread)

**Note**: Running on JDK 21 requires adding the following JVM parameter:
```yaml
# flink-conf.yaml
env.java.opts.taskmanager: "--add-opens java.base/jdk.internal.vm=ALL-UNNAMED"
```
### Why Use Continuation API
To achieve sequential code style while preserving the Mailbox model, we need
the ability to **explicitly pause and resume** during method execution.

**Preserving the Mailbox model is essential**: Flink Agents' state access and
event sending rely on the single-threaded model of the mailbox thread to ensure
correctness. If Action code executes outside the mailbox thread, state access
will have concurrency issues, leading to data inconsistency. Therefore, the
virtual thread approach (Action executes in virtual threads) **is not
feasible**.

Comparison of approaches to achieve this capability in Java:

| Approach | Sequential Code Style | Preserves Mailbox Model | Disadvantages |
|----------|----------------------|-------------------------|---------------|
| **Callback Pattern** | ❌ Requires callbacks | ✅ | Poor user experience, non-intuitive code |
| **Virtual Threads** | ✅ | ❌ Not feasible | Action leaves mailbox thread, state access has concurrency issues |
| **Bytecode Enhancement** (Quasar, etc.) | ✅ | ✅ | Requires compile-time/runtime instrumentation, adds complexity |
| **Continuation API** | ✅ | ✅ | Internal API, requires JVM parameters |

**Continuation API is the only lightweight approach that can simultaneously
satisfy "sequential code style" and "preserve Mailbox model"**.

**Reference**: [JDK Continuation Source Code](https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/jdk/internal/vm/Continuation.java)

**⚠️ Internal API Risk**: `jdk.internal.vm.Continuation` is a JDK internal API
with no stability guarantees, and may change in future versions.
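
For comparison, the callback pattern from the table can be sketched with `CompletableFuture`. The class name, the `"mailbox"` thread name, and the `"llm-result"` placeholder are illustrative assumptions, and the mailbox is modelled as a plain single-thread executor rather than Flink's actual mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of the callback pattern; not Flink Agents code. */
final class CallbackStyleSketch {

    /** Runs the slow call on a pool, then runs the follow-up on the single "mailbox" thread. */
    static String runCallbackStyle() {
        // Stand-in for the mailbox: one dedicated thread that owns all state access.
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture
                    // Time-consuming work runs on a pool thread.
                    .supplyAsync(() -> "llm-result", pool)
                    // Everything that touches state must move into a callback on the mailbox.
                    .thenApplyAsync(
                            result -> Thread.currentThread().getName() + ":" + result, mailbox)
                    .join();
        } finally {
            pool.shutdown();
            mailbox.shutdown();
        }
    }
}
```

Here `runCallbackStyle()` returns `"mailbox:llm-result"`, so the Mailbox model is preserved, but the code after the slow call has to live inside the callback, which is the readability cost listed in the table.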
### Execution Flow
#### JDK 21+ Environment
1. `JavaActionTask.invoke()` is called in the mailbox thread
2. Create a Continuation wrapping the Action code, call `cont.run()`
3. Action code executes sequentially within the Continuation (in the mailbox
thread)
4. When `ctx.executeAsync(supplier)` is called:
   - The supplier is submitted to the thread pool
   - `Continuation.yield()` is called to pause, and the mailbox thread becomes idle
5. After supplier completes, resume the Continuation through mailbox
6. Action continues execution from the yield point (still in mailbox thread)
```
Timeline (identical to Python):
────────────────────────────────────────────────────────────────────>
Mailbox Thread: [code segment 1] ─yield─> [idle] ─mail─> [code segment 2] ─> [done]
Thread Pool:                 [execute LLM call] ────>│
```
#### JDK < 21 Environment
Falls back to synchronous execution; the behavior is identical to the current
Java Action.
### Core Component Design
#### ContinuationActionExecutor
Add a new `ContinuationActionExecutor` class to encapsulate the Continuation
execution logic:

**JDK 11 Version**: Executes the action synchronously; `executeAsync` calls the
supplier directly.

**JDK 21 Version**:
1. `executeAction()`: Create a Continuation wrapping the action and call
`cont.run()` to execute it
2. `executeAsync()`:
   - Submit the supplier to the thread pool
   - Call `Continuation.yield()` to pause the current Continuation
   - After the thread pool task completes, resume the Continuation through
     `mailboxExecutor.submit()`
   - Return the supplier's result
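
The JDK 11 variant is simple enough to sketch in full. The class below is a hypothetical stand-in (named `SyncFallbackExecutor` here to avoid implying it is the actual `ContinuationActionExecutor` source); it assumes only what the description states, namely that the action runs synchronously and `executeAsync` calls the supplier directly.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of the synchronous fallback: no Continuation and no thread pool. */
final class SyncFallbackExecutor {

    /** Runs the action to completion on the calling (mailbox) thread. */
    void executeAction(Runnable action) {
        action.run();
    }

    /** Calls the supplier inline, so the calling thread blocks until it returns. */
    <T> T executeAsync(Supplier<T> supplier) {
        return supplier.get();
    }
}
```

Because the supplier runs on the caller's thread, a slow call blocks the mailbox for its full duration, which matches the restriction noted for JDK < 21.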
#### JavaActionTask Modifications
Modify `JavaActionTask.invoke()`:
- JDK 21+: Use `ContinuationActionExecutor` to execute action
- JDK < 21: Keep existing synchronous execution logic

When the action yields at `executeAsync`, `invoke()` returns
`ActionTaskResult(finished=false, generatedTask=this)` and waits for subsequent
resumption.
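
The "unfinished task that hands itself back" contract can be illustrated without the Continuation API. In the sketch below, `ResumableTaskSketch` and `TaskResult` are hypothetical stand-ins for `JavaActionTask` and `ActionTaskResult`, explicit state replaces the paused Continuation, and the driver loop simply waits for the async result where the real design would wait for a completion mail.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of a task that reports finished=false until async work is done. */
final class ResumableTaskSketch {

    /** Stand-in for ActionTaskResult; field names mirror the description above. */
    static final class TaskResult {
        final boolean finished;
        final ResumableTaskSketch generatedTask; // task to invoke again, or null when finished

        TaskResult(boolean finished, ResumableTaskSketch generatedTask) {
            this.finished = finished;
            this.generatedTask = generatedTask;
        }
    }

    private CompletableFuture<String> pending; // null until the async call has been started
    private String output;

    TaskResult invoke() {
        if (pending == null) {
            // First invocation: start the slow call, then "yield" by returning unfinished.
            pending = CompletableFuture.supplyAsync(() -> "llm-result");
            return new TaskResult(false, this);
        }
        if (!pending.isDone()) {
            return new TaskResult(false, this); // resumed too early: still waiting
        }
        // Resumed after completion: run the remaining code on the caller's thread.
        output = pending.join();
        return new TaskResult(true, null);
    }

    /** Drives the task the way a scheduler might: re-invoke until it reports finished. */
    static String runToCompletion() {
        ResumableTaskSketch task = new ResumableTaskSketch();
        TaskResult result = task.invoke();
        while (!result.finished) {
            task.pending.join(); // stand-in for the mailbox receiving the completion mail
            result = result.generatedTask.invoke();
        }
        return task.output;
    }
}
```

With a real Continuation, the second `invoke()` would call `cont.run()` again and the action would continue from the yield point, so no hand-written state such as `pending` would be needed.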
### Multi-release JAR Support
Adopt the Multi-release JAR approach, so that the JVM automatically selects the
correct version:
```plaintext
flink-agents-runtime-{version}.jar
├── org/apache/flink/agents/runtime/async/
│   └── ContinuationActionExecutor.class        # JDK 11 version
└── META-INF/versions/21/
    └── org/apache/flink/agents/runtime/async/
        └── ContinuationActionExecutor.class    # JDK 21 version
```
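
The selection behaviour can be observed with the standard `java.util.jar` API. The sketch below builds a throwaway JAR with a base entry and a `META-INF/versions/21/` entry, then reads it back using the running JVM's version. The entry name and contents are made-up placeholders. Versioned entries are only honoured when the manifest contains `Multi-Release: true`, so a real build would need to set that attribute as well.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipFile;

/** Hypothetical demo of multi-release resolution; the entry name is a placeholder. */
final class MultiReleaseJarSketch {
    static final String ENTRY = "org/example/Impl.txt";

    /** Writes a temporary JAR containing a base entry and a JDK 21 override of it. */
    static File buildJar() throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        manifest.getMainAttributes().put(Attributes.Name.MULTI_RELEASE, "true");
        File jar = File.createTempFile("mrjar-sketch", ".jar");
        jar.deleteOnExit();
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar), manifest)) {
            write(out, ENTRY, "base");
            write(out, "META-INF/versions/21/" + ENTRY, "jdk21");
        }
        return jar;
    }

    static void write(JarOutputStream out, String name, String content) throws IOException {
        out.putNextEntry(new JarEntry(name));
        out.write(content.getBytes(StandardCharsets.UTF_8));
        out.closeEntry();
    }

    /** Returns the content of ENTRY as resolved for the running JVM's version. */
    static String resolveForCurrentRuntime() {
        try {
            File jar = buildJar();
            try (JarFile jf = new JarFile(jar, true, ZipFile.OPEN_READ, JarFile.runtimeVersion())) {
                JarEntry entry = jf.getJarEntry(ENTRY); // versioned lookup happens here
                return new String(jf.getInputStream(entry).readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a JDK 21 or newer runtime `resolveForCurrentRuntime()` should yield the versioned content, and on older runtimes the base content, which is the same mechanism that would pick between the two `ContinuationActionExecutor` classes.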
GitHub link: https://github.com/apache/flink-agents/discussions/429