GitHub user Sxnan created a discussion: Java Async Execution Design

## Background

The Python API already supports `execute_async`, but the Java API does not. This design adds the same capability to the Java API.

## Design Goals

```java
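// Java API goal: sequential code style consistent with Python
// (`prompt` and `slowLlmCall` are placeholders for the action's own input and LLM call)
public static void myAction(InputEvent event, RunnerContext ctx) throws Exception {
    // On JDK 21+ the slow call runs off the mailbox thread; the action continues here once it completes
    String result = ctx.executeAsync(() -> slowLlmCall(prompt));
    // Back in the mailbox thread: state access and event sending are safe
    ctx.getShortTermMemory().set("result", result);
    ctx.sendEvent(new OutputEvent(result));
}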
```

## Technical Challenges

### Challenge 1: Mailbox Thread Constraint

Flink Agents' state access and event sending must be executed in the **mailbox thread**. This means:
- Time-consuming operations inside `executeAsync` can run on other threads
- Code outside `executeAsync` (state access, `sendEvent`) must run on the mailbox thread
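One way to make this constraint explicit is a thread-ownership check on state. The sketch below is illustrative only. It uses hypothetical classes (`MailboxOwnedState`, `MailboxConstraintDemo`) rather than Flink Agents' real memory API. A single-threaded `ExecutorService` stands in for the mailbox, and the state rejects any access that does not come from that thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical state holder that only accepts access from its owning (mailbox) thread. */
class MailboxOwnedState {
    private final Thread owner;
    private final Map<String, Object> values = new HashMap<>();

    MailboxOwnedState(Thread owner) {
        this.owner = owner;
    }

    private void checkMailboxThread() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("State accessed from '"
                    + Thread.currentThread().getName() + "', not from the mailbox thread");
        }
    }

    void set(String key, Object value) {
        checkMailboxThread();
        values.put(key, value);
    }

    Object get(String key) {
        checkMailboxThread();
        return values.get(key);
    }
}

class MailboxConstraintDemo {
    /** Returns true if a mailbox-thread write succeeds and a pool-thread write is rejected. */
    static boolean run() throws Exception {
        ExecutorService mailbox = Executors.newSingleThreadExecutor(); // stands in for the mailbox
        ExecutorService pool = Executors.newFixedThreadPool(2);       // runs time-consuming work
        try {
            Thread mailboxThread = mailbox.submit(Thread::currentThread).get();
            MailboxOwnedState state = new MailboxOwnedState(mailboxThread);

            // Allowed: state access from the mailbox thread.
            mailbox.submit(() -> state.set("result", "ok")).get();

            // Rejected: the same access from a pool thread.
            boolean rejected = false;
            try {
                pool.submit(() -> state.set("result", "from-pool")).get();
            } catch (ExecutionException e) {
                rejected = e.getCause() instanceof IllegalStateException;
            }

            Object stored = mailbox.submit(() -> state.get("result")).get();
            return rejected && "ok".equals(stored);
        } finally {
            mailbox.shutdown();
            pool.shutdown();
        }
    }
}
```

`MailboxConstraintDemo.run()` returns `true` when the mailbox write is kept and the pool write is rejected with an `IllegalStateException`.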

### Challenge 2: JDK Version Compatibility

Flink Agents needs to support JDK 11+:
- JDK 21+ users: Can use Continuation to implement true async execution
- JDK < 21 users: Need to fall back to synchronous execution
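The version boundary can be expressed with `Runtime.version().feature()`, which is available since Java 10. The sketch below uses a hypothetical `ExecutionModeSelector` class. The design itself selects the implementation through a Multi-release JAR, not through an explicit runtime check. A JDK 21 runtime alone is also not sufficient: the JVM option that grants access to `jdk.internal.vm` is still required.

```java
/** Hypothetical helper mapping a Java feature release to an execution mode. */
class ExecutionModeSelector {
    enum Mode { CONTINUATION, SYNCHRONOUS }

    /** Picks the mode for a given feature release (e.g. 11, 17, 21). */
    static Mode forFeatureRelease(int feature) {
        return feature >= 21 ? Mode.CONTINUATION : Mode.SYNCHRONOUS;
    }

    /** Picks the mode for the JVM this code is currently running on. */
    static Mode current() {
        return forFeatureRelease(Runtime.version().feature());
    }
}
```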

## API Design

Add `executeAsync` method to the `RunnerContext` interface:

```java
import java.util.function.Supplier;

public interface RunnerContext {
    /**
     * Asynchronously executes the provided supplier function and returns the result.
     *
     * <p>On JDK 21+, this method uses Continuation to yield the current action execution,
     * submits the supplier to a thread pool, and resumes via the mailbox when it completes.
     *
     * <p>On JDK versions before 21, this method falls back to synchronous execution.
     *
     * <p><b>Note:</b> Memory access and {@code sendEvent} are prohibited within the supplier.
     */
    <T> T executeAsync(Supplier<T> supplier) throws Exception;

    /** Same as {@link #executeAsync(Supplier)}, for work that produces no result. */
    void executeAsync(Runnable runnable) throws Exception;
}
```

### Usage Restrictions

1. **Memory Access Restriction**: Memory access and `sendEvent` are prohibited within the `executeAsync` supplier
2. **Serial Execution**: Multiple `executeAsync` calls are executed serially (consistent with Python behavior)
3. **JDK < 21**: Falls back to synchronous execution, which blocks the mailbox thread
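A thread check alone cannot enforce restriction 1. In the synchronous fallback, the supplier runs on the mailbox thread itself. The sketch below uses a hypothetical `GuardedRunnerContext` that holds its own in-memory map instead of the real short-term memory. It marks the supplier region with a `ThreadLocal` flag and rejects memory access inside that region. In the continuation-based path the supplier runs on a pool thread, so the flag would have to be set on that thread instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical context enforcing "no memory access inside the executeAsync supplier". */
class GuardedRunnerContext {
    // True while the current thread is executing an executeAsync supplier.
    private static final ThreadLocal<Boolean> IN_SUPPLIER = ThreadLocal.withInitial(() -> false);

    private final Map<String, Object> shortTermMemory = new HashMap<>();

    /** Synchronous fallback (JDK < 21): the supplier runs in place on the calling thread. */
    <T> T executeAsync(Supplier<T> supplier) {
        boolean previous = IN_SUPPLIER.get(); // restore correctly even if calls are nested
        IN_SUPPLIER.set(true);
        try {
            return supplier.get();
        } finally {
            IN_SUPPLIER.set(previous);
        }
    }

    void setMemory(String key, Object value) {
        checkNotInSupplier();
        shortTermMemory.put(key, value);
    }

    Object getMemory(String key) {
        checkNotInSupplier();
        return shortTermMemory.get(key);
    }

    private static void checkNotInSupplier() {
        if (IN_SUPPLIER.get()) {
            throw new IllegalStateException("Memory access is not allowed inside executeAsync");
        }
    }
}
```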

## Design Approach

### Continuation + Mailbox Scheduling

Use JDK 21's Continuation API to implement async execution:

1. **Action Executes Within a Continuation**: A Continuation wraps the Action code, which runs in the mailbox thread
2. **Explicit Yield on `executeAsync`**: Pause the Continuation, submit the time-consuming task to a thread pool, and return control to the mailbox
3. **Resume the Continuation After Task Completion**: Resume execution through mailbox scheduling, so subsequent code still runs in the mailbox thread
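The scheduling in steps 2 and 3 can be approximated with standard `java.util.concurrent` classes. This is a simplification, not the proposed implementation. The real design suspends the Action with `Continuation.yield()`. The hypothetical `YieldResumeSketch` instead represents the code after the yield point as a callback that is posted back to the mailbox. It does show the key scheduling property: the slow work runs on a pool thread, and the rest of the Action runs on the mailbox thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Simplified model of steps 2 and 3, with the post-yield code expressed as a callback. */
class YieldResumeSketch {
    /** Which threads the sketch observed. */
    static final class Trace {
        volatile String workThread;   // where the time-consuming supplier ran
        volatile String resumeThread; // where the "rest of the Action" ran
        volatile Object result;
    }

    /** Creates a pool of daemon threads that all share the given name. */
    static ExecutorService daemonPool(String name, int threads) {
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    static <T> Trace run(ExecutorService mailbox, ExecutorService pool, Supplier<T> slowWork)
            throws Exception {
        Trace trace = new Trace();
        CompletableFuture<Void> resumed = new CompletableFuture<>();
        mailbox.execute(() -> {
            // Code segment 1 runs here, on the mailbox thread, up to executeAsync.
            CompletableFuture.supplyAsync(() -> {
                trace.workThread = Thread.currentThread().getName();
                return slowWork.get();
            }, pool).whenComplete((result, error) ->
                    // Task complete: enqueue a mail rather than continuing on the pool thread.
                    mailbox.execute(() -> {
                        trace.resumeThread = Thread.currentThread().getName();
                        if (error != null) {
                            resumed.completeExceptionally(error);
                        } else {
                            trace.result = result; // code segment 2 would continue here
                            resumed.complete(null);
                        }
                    }));
            // Returning here plays the role of the yield: the mailbox is free for other mail.
        });
        resumed.get(10, TimeUnit.SECONDS); // must be called from a non-mailbox thread
        return trace;
    }
}
```

Writing every Action this way is the Callback Pattern that the design wants to avoid. Continuation lets the framework make the same split while user code stays sequential.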

```
┌────────────────────────────────────────────────────────────────────────────┐
│ Execution Flow                                                             │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  Mailbox Thread:  [cont.run()] ──yield──> [idle] ──mail──> [cont.run()]    │
│                        │                              │                    │
│                        │ submit task                  │ task complete      │
│                        ▼                              │                    │
│  Thread Pool:       [execute time-consuming op] ─────>│                    │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘
```

**Key Advantages**:
- ✅ Sequential code style (same as Python)
- ✅ Fully preserves the Mailbox model (Action code always executes in the mailbox thread)
- ✅ No proxy needed for state access (already in mailbox thread)

**Note**: On JDK 21+, the TaskManager JVM must be started with the following option:
```yaml
# flink-conf.yaml
env.java.opts.taskmanager: "--add-opens java.base/jdk.internal.vm=ALL-UNNAMED"
```
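A runner could check at startup whether the option took effect before it chooses the continuation path. The sketch below uses a hypothetical `ContinuationSupport` class built on `Module.isOpen` and `Module.isExported` from `java.lang.Module`. It assumes the check runs in the unnamed module, which is the module that `ALL-UNNAMED` targets.

```java
/** Hypothetical startup check for the continuation-based execution path. */
class ContinuationSupport {
    static boolean isAvailable() {
        // This design only enables the continuation path on JDK 21+.
        if (Runtime.version().feature() < 21) {
            return false;
        }
        Module javaBase = Object.class.getModule();
        Module self = ContinuationSupport.class.getModule();
        // With --add-opens java.base/jdk.internal.vm=ALL-UNNAMED, the unnamed module qualifies.
        return javaBase.isOpen("jdk.internal.vm", self)
                || javaBase.isExported("jdk.internal.vm", self);
    }
}
```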

### Why Use Continuation API

To achieve a sequential code style while preserving the Mailbox model, we need the ability to **explicitly pause and resume** execution in the middle of a method.

**Preserving the Mailbox model is essential**: Flink Agents' state access and event sending rely on the single-threaded model of the mailbox thread for correctness. If Action code executes outside the mailbox thread, state access becomes concurrent, which leads to data inconsistency. Therefore, the virtual thread approach (where the Action executes in a virtual thread) **is not feasible**.

Comparison of approaches to achieve this capability in Java:

| Approach | Sequential Code Style | Preserves Mailbox Model | Disadvantages |
|----------|----------------------|-------------------------|---------------|
| **Callback Pattern** | ❌ Requires callbacks | ✅ | Poor user experience, non-intuitive code |
| **Virtual Threads** | ✅ | ❌ Not feasible | Action leaves the mailbox thread, so state access has concurrency issues |
| **Bytecode Enhancement** (Quasar, etc.) | ✅ | ✅ | Requires compile-time/runtime instrumentation, adds complexity |
| **Continuation API** | ✅ | ✅ | Internal API, requires JVM parameters |

**The Continuation API is the only lightweight approach that simultaneously satisfies "sequential code style" and "preserves the Mailbox model"**.

**Reference**: [JDK Continuation Source Code](https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/jdk/internal/vm/Continuation.java)

**⚠️ Internal API Risk**: `jdk.internal.vm.Continuation` is a JDK internal API with no stability guarantees, and it may change in future versions.

### Execution Flow

#### JDK 21+ Environment

1. `JavaActionTask.invoke()` is called in the mailbox thread
2. Create a Continuation wrapping the Action code and call `cont.run()`
3. The Action code executes sequentially within the Continuation (in the mailbox thread)
4. When `ctx.executeAsync(supplier)` is called:
   - The supplier is submitted to the thread pool
   - `Continuation.yield()` is called to pause the Action, and the mailbox thread becomes idle
5. After the supplier completes, the Continuation is resumed through the mailbox
6. The Action continues from the yield point (still in the mailbox thread)

```
Timeline (identical to Python):
────────────────────────────────────────────────────────────────────>
Mailbox Thread: [code segment 1] ─yield─> [idle] ─mail─> [code segment 2] ─> [done]
Thread Pool:                     [execute LLM call] ────>│
```

#### JDK < 21 Environment

Falls back to synchronous execution. Behavior is identical to how Java Actions execute today.

### Core Component Design

#### ContinuationActionExecutor

Add a new `ContinuationActionExecutor` class that encapsulates the Continuation execution logic:

**JDK 11 Version**: Executes the action synchronously, and `executeAsync` calls the supplier directly
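A minimal sketch of the JDK 11 variant follows. The class name matches the design, but the `executeAction(Runnable)` and `executeAsync(Supplier)` signatures are assumptions made for illustration.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the JDK 11 variant. Method names follow the design text;
 * the signatures are assumptions.
 */
class ContinuationActionExecutor {
    /** Runs the action to completion on the calling (mailbox) thread; it is always finished. */
    boolean executeAction(Runnable action) {
        action.run();
        return true;
    }

    /** No yield on JDK 11: call the supplier directly, blocking the mailbox thread meanwhile. */
    <T> T executeAsync(Supplier<T> supplier) {
        return supplier.get();
    }
}
```

Both variants share the class name. This lets the Multi-release JAR swap in the JDK 21 implementation without any change to callers.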

**JDK 21 Version**:
1. `executeAction()`: Create a Continuation wrapping the action and call `cont.run()` to execute it
2. `executeAsync()`:
   - Submit the supplier to the thread pool
   - Call `Continuation.yield()` to pause the current Continuation
   - After the thread pool task completes, resume the Continuation through `mailboxExecutor.submit()`
   - Return the supplier's result

#### JavaActionTask Modifications

Modify `JavaActionTask.invoke()`:
- JDK 21+: Use `ContinuationActionExecutor` to execute action
- JDK < 21: Keep existing synchronous execution logic

When the action yields at `executeAsync`, `invoke()` returns `ActionTaskResult(finished=false, generatedTask=this)` and waits to be resumed.
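The resumption protocol can be illustrated with a small driver loop. Everything in the sketch is hypothetical: `ActionTaskResult`, `ActionTask`, `YieldingActionTask` and `ActionTaskDriver` only mirror the names and fields mentioned above. The driver also re-enqueues an unfinished task immediately. The real design resumes it only after the `executeAsync` work completes.

```java
import java.util.Deque;

/** Hypothetical mirror of ActionTaskResult(finished, generatedTask). */
final class ActionTaskResult {
    final boolean finished;
    final ActionTask generatedTask; // task to run again later, or null

    ActionTaskResult(boolean finished, ActionTask generatedTask) {
        this.finished = finished;
        this.generatedTask = generatedTask;
    }
}

interface ActionTask {
    ActionTaskResult invoke();
}

/** Hypothetical task standing in for an action that calls executeAsync a given number of times. */
class YieldingActionTask implements ActionTask {
    private int remainingYields;
    int invocations;

    YieldingActionTask(int yields) {
        this.remainingYields = yields;
    }

    @Override
    public ActionTaskResult invoke() {
        invocations++;
        if (remainingYields > 0) {
            remainingYields--;
            // Suspended at executeAsync: hand the same task back so it can be resumed.
            return new ActionTaskResult(false, this);
        }
        return new ActionTaskResult(true, null);
    }
}

class ActionTaskDriver {
    /** Simplified driver: re-enqueues unfinished tasks until all report finished; returns invoke count. */
    static int runAll(Deque<ActionTask> queue) {
        int invokes = 0;
        while (!queue.isEmpty()) {
            ActionTaskResult result = queue.poll().invoke();
            invokes++;
            if (!result.finished && result.generatedTask != null) {
                queue.add(result.generatedTask);
            }
        }
        return invokes;
    }
}
```

Take one task that yields twice and one that never yields. The driver performs four invocations, and the non-yielding task finishes while the other is suspended.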

### Multi-release JAR Support

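Adopt the Multi-release JAR approach (JEP 238) so that the JVM automatically selects the correct class version at runtime. The JAR manifest must declare `Multi-Release: true`, and the layout is: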

```plaintext
flink-agents-runtime-{version}.jar
├── org/apache/flink/agents/runtime/async/
│   └── ContinuationActionExecutor.class       # JDK 11 version
└── META-INF/versions/21/
    └── org/apache/flink/agents/runtime/async/
        └── ContinuationActionExecutor.class   # JDK 21 version
```



GitHub link: https://github.com/apache/flink-agents/discussions/429
