GitHub user codenohup closed a discussion: Asynchronous Execution for Flink Agents
- [1. Overview and Motivation](#1-overview-and-motivation)
- [2. Current Synchronous Execution Flow](#2-current-synchronous-execution-flow)
- [3. Asynchronous Execution Design](#3-asynchronous-execution-design)
  - [3.1 User-facing API](#31-user-facing-api)
  - [3.2 Runtime Changes](#32-runtime-changes)
- [4. Summary](#4-summary)
- [Appendix](#appendix)
  - [Using Python Generator for Code-Block Splitting](#using-python-generator-for-code-block-splitting)
## 1. Overview and Motivation
Flink Agents is an event-driven agent framework that executes a user-defined
sequence of actions in response to each incoming input event. The processing of
a single input event is referred to as an **agent run**.
Currently, agent runs are processed synchronously and in strict order. While
this design ensures correctness and simplicity, it leads to two significant
performance bottlenecks under high load:
- Low throughput per agent run
  - Even when multiple actions triggered by an agent run are independent,
    they still execute sequentially. Parallel execution is not supported.
  - This limitation significantly restricts the overall throughput of the
    system.
- Head-of-line blocking between agent runs
  - Agent runs are handled one at a time. If an earlier agent run takes
    longer than expected, subsequent agent runs must wait for it to complete.
  - Long-running agent runs can delay or even starve later agent runs,
    leading to what we call "agent run starvation" in extreme cases.
To address these issues, we propose replacing the current synchronous execution
of agent runs with an asynchronous approach, which enables the following
improvements:
- Parallel execution of actions within an agent run: independent actions can be
  executed concurrently, improving the processing speed of individual agent
  runs.
- Concurrent processing of different agent runs: agent runs with different keys
  can be processed in parallel, reducing head-of-line blocking and increasing
  overall system throughput.
Note: The current proposal focuses only on the Python API. Support for the Java
API will be discussed later.
## 2. Current Synchronous Execution Flow
Currently, Flink Agents relies primarily on a Flink operator called
`ActionExecutionOperator` to execute agents.
This operator integrates with upstream and downstream Flink `DataStream`s and
handles the full agent execution flow, including event parsing and action
execution.
<img width="1111" height="133" alt="image"
src="https://github.com/user-attachments/assets/423f1b6b-f44e-40d3-a9f3-8b4940bac038"
/>
Inside the `ActionExecutionOperator`, the agent execution process works as
follows:
<img width="1166" height="702" alt="image"
src="https://github.com/user-attachments/assets/774002b8-2235-4dc9-b8d7-a2827e809e91"
/>
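As a text companion to the diagram, the following is a deliberately simplified Python model of the synchronous behaviour described in Section 1, not the operator's actual code. `Event`, `run_agent_sync`, `slow_action`, and `other_action` are hypothetical names; the sketch assumes each input event is driven to completion before the next one starts, and that actions triggered by the same event run one after another.

```python
import time
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Deque, Dict, List


@dataclass
class Event:
    """Hypothetical stand-in for a Flink Agents event."""

    kind: str
    payload: Any


# Hypothetical action shape: takes an event and returns the events it emits.
Action = Callable[[Event], List[Event]]


def run_agent_sync(input_event: Event,
                   actions: Dict[str, List[Action]]) -> List[Event]:
    """Drive one agent run to completion on the calling thread."""
    queue: Deque[Event] = deque([input_event])
    outputs: List[Event] = []
    while queue:
        event = queue.popleft()
        if event.kind == "output":
            outputs.append(event)
            continue
        # Actions listening on this event run strictly one after another,
        # even when they are independent of each other.
        for action in actions.get(event.kind, []):
            queue.extend(action(event))
    return outputs


def slow_action(event: Event) -> List[Event]:
    time.sleep(0.1)  # e.g. a model or tool call
    return [Event("output", event.payload * 2)]


def other_action(event: Event) -> List[Event]:
    time.sleep(0.1)
    return [Event("output", event.payload + 100)]


actions = {"input": [slow_action, other_action]}
start = time.monotonic()
# Input events are handled one at a time, in arrival order.
results = [run_agent_sync(Event("input", i), actions) for i in range(3)]
elapsed = time.monotonic() - start  # about 3 runs x 2 actions x 0.1 s
```

Because every run blocks the caller until all of its actions return, total latency is roughly the sum of all action durations, which is exactly the head-of-line blocking described in Section 1.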
## 3. Asynchronous Execution Design
### 3.1 User-facing API
Below is a user-defined action that leverages the new async capability.
```python
import time

@action(InputEvent)
@staticmethod
def action1(event: Event, ctx: RunnerContext):
    def my_func1(id: int):
        time.sleep(5)
        return "my func1 result"

    def my_func2(id: int):
        time.sleep(10)
        return "my func2 result"

    input = event.input
    # Async-block: my_func1 runs in a separate thread; the action resumes
    # here once its result is available.
    func1_result = yield from ctx.execute_async(
        my_func1,
        input.id
    )
    # Sync-block: runs on the mailbox main thread, so memory access is allowed.
    ctx.get_short_term_memory().set("func1_result", func1_result)
    func2_result = yield from ctx.execute_async(
        my_func2,
        input.id
    )
    ctx.send_event(MyEvent(value=str(func1_result) + str(func2_result)))
```
The new `RunnerContext.execute_async` signature:
```python
from abc import ABC, abstractmethod
from typing import Any, Callable


class RunnerContext(ABC):
    @abstractmethod
    def execute_async(self, func: Callable[..., Any], *args: Any,
                      **kwargs: Any) -> Any:
        """Asynchronously execute the provided function. Access to memory is
        prohibited within the function.

        Parameters
        ----------
        func : Callable
            The function to be executed asynchronously.
        *args : tuple
            Positional arguments to pass to the function.
        **kwargs : dict
            Keyword arguments to pass to the function.

        Returns
        -------
        Any
            The result of the function, obtained by the caller through
            ``yield from ctx.execute_async(...)``.
        """
```
Key differences from the synchronous version:
- Actions can call `yield from ctx.execute_async(...)` to launch asynchronous
  work and wait for its result without blocking the mailbox main thread.
- Each action is split into **code blocks**:
  - Sync-blocks are the code written directly in the action function. They are
    executed in Flink's mailbox main thread, so they can access state without
    concurrency concerns.
  - Async-blocks are the functions passed into `execute_async`. They are
    executed in a thread other than the mailbox main thread. State access is
    prohibited in async-blocks to avoid potential conflicts and inconsistency.
  - If an action never calls `execute_async`, the entire action remains a
    single sync-block.
- The figure below illustrates the code blocks of `action1` in the example.
  The red box highlights the async-blocks, while the rest are sync-blocks.
<img width="1198" height="794" alt="image"
src="https://github.com/user-attachments/assets/4c487cf4-5dce-44f2-9ca5-cfd057e9b662"
/>
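To make the split concrete, here is a minimal sketch that drives an `action1`-like generator by hand and records which thread runs each block. `ToyContext` and its polling `execute_async` are hypothetical stand-ins modeled on the `FlinkRunnerContext` sketch in the appendix, and a `ThreadPoolExecutor` plays the role of the separate async thread; the action returns a value only so the example can show how the driver observes completion.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ToyContext:
    """Hypothetical context with a polling, generator-based execute_async."""

    def __init__(self, pool: ThreadPoolExecutor) -> None:
        self.pool = pool
        self.log = []  # (block name, thread name) pairs, written by sync-blocks

    def execute_async(self, fn, *args, **kwargs):
        future = self.pool.submit(fn, *args, **kwargs)
        while not future.done():
            yield  # give control back to the driver until the task finishes
        return future.result()


def action1(ctx: ToyContext):
    def work(x):
        # Async-block: runs in a pool thread and touches no context state.
        time.sleep(0.05)
        return x * 10, threading.current_thread().name

    ctx.log.append(("sync-block 1", threading.current_thread().name))
    result, worker = yield from ctx.execute_async(work, 4)
    ctx.log.append(("async-block", worker))
    ctx.log.append(("sync-block 2", threading.current_thread().name))
    return result


driver_thread = threading.current_thread().name
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="async-pool") as pool:
    ctx = ToyContext(pool)
    gen = action1(ctx)
    while True:
        try:
            next(gen)  # run the next sync-block slice on this thread
        except StopIteration as stop:
            value = stop.value  # the action's return value
            break
        time.sleep(0.01)  # a real mailbox would process other mail here
```

Both sync-blocks are logged with the driver's thread name, while the async-block reports a pool thread, mirroring the mailbox/async-thread separation described above.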
### 3.2 Runtime Changes
`ActionExecutionOperator` is refactored to support asynchronous execution.
The updated workflow of the `ActionExecutionOperator` is as follows:
<img width="1379" height="1078" alt="image"
src="https://github.com/user-attachments/assets/25dd5ce4-55ad-4256-adfc-b450cd267d9e"
/>
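One way to picture the refactored loop is a single thread that keeps every in-flight agent run as a generator and advances each one with `next()` in turn. This is an assumption-laden model rather than the operator's implementation: `mailbox_loop`, `agent_run`, and the module-level `execute_async` are hypothetical, and the `time.sleep` in the loop stands in for the mailbox handling other mail.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Generator


def execute_async(pool: ThreadPoolExecutor, fn, *args):
    """Polling execute_async, modeled on the appendix's FlinkRunnerContext."""
    future = pool.submit(fn, *args)
    while not future.done():
        yield
    return future.result()


def agent_run(pool: ThreadPoolExecutor, key: str, delay: float,
              results: Dict[str, str]):
    """Hypothetical agent run: sync-block, async-block, sync-block."""
    def slow_call() -> str:
        time.sleep(delay)  # async-block, e.g. an LLM request
        return f"done-{key}"

    value = yield from execute_async(pool, slow_call)
    results[key] = value  # final sync-block, runs on the loop's thread


def mailbox_loop(runs: Dict[str, Generator]) -> None:
    """Advance every in-flight run round-robin without ever blocking on one."""
    in_flight = dict(runs)
    while in_flight:
        for key, gen in list(in_flight.items()):
            try:
                next(gen)  # execute one slice; returns at the next yield
            except StopIteration:
                del in_flight[key]  # this agent run has finished
        time.sleep(0.005)  # stand-in for handling other mailbox mail


results: Dict[str, str] = {}
with ThreadPoolExecutor(max_workers=4) as pool:
    mailbox_loop({
        "key-a": agent_run(pool, "key-a", 0.3, results),  # arrives first, slow
        "key-b": agent_run(pool, "key-b", 0.1, results),  # arrives second, fast
    })
```

Because `key-b` finishes its async-block first, it completes before `key-a` even though it arrived later; in the synchronous model it would have had to wait for `key-a`.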
**Notes:**
- The workflow diagram does not show how we ensure that agent runs with the
  same key are not executed in parallel.
  - This can be implemented in a relatively straightforward way by
    introducing a `ListState<InputEvent>` named `pendingInputEvents` within the
    operator. Before processing an incoming `InputEvent`, the operator checks
    whether a previous `InputEvent` with the same key is still being processed.
    If it is, the new `InputEvent` is added to `pendingInputEvents` and
    processed later; otherwise, processing begins immediately.
- Currently, asynchronous execution is only supported for actions in the Python
  API.
  - In Python, action code is split into code blocks and executed using
    Python generators. More details on this implementation can be found in the
    appendix of this document.
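The per-key check described in the first note can be sketched as follows. `KeyedRunGate`, `on_input_event`, and `on_run_finished` are hypothetical names, and a plain dictionary of lists stands in for the keyed `ListState<InputEvent>` named `pendingInputEvents`; in Flink, keyed state is already scoped to the current key, so the real operator would not need explicit key lookups.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Set


class KeyedRunGate:
    """Hypothetical model of the per-key pendingInputEvents check."""

    def __init__(self, start_run: Callable[[Any, Any], None]) -> None:
        self.start_run = start_run
        self.running: Set[Any] = set()
        # Stand-in for the keyed ListState<InputEvent> pendingInputEvents.
        self.pending_input_events: DefaultDict[Any, List[Any]] = defaultdict(list)

    def on_input_event(self, key: Any, event: Any) -> None:
        if key in self.running:
            # Same key still in progress: park the event for later.
            self.pending_input_events[key].append(event)
        else:
            self.running.add(key)
            self.start_run(key, event)

    def on_run_finished(self, key: Any) -> None:
        parked = self.pending_input_events.get(key)
        if parked:
            # Start the oldest parked event, preserving per-key order.
            self.start_run(key, parked.pop(0))
        else:
            self.running.discard(key)
            self.pending_input_events.pop(key, None)


started = []
gate = KeyedRunGate(lambda key, event: started.append((key, event)))
gate.on_input_event("a", "e1")  # starts immediately
gate.on_input_event("a", "e2")  # parked: key "a" is still running
gate.on_input_event("b", "e3")  # different key: starts immediately
gate.on_run_finished("a")       # "e2" starts now
```

Runs for different keys start without waiting for each other, while runs for the same key are serialized in arrival order.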
## 4. Summary
Ultimately, implementing asynchronous execution for Flink Agents will achieve
the following effects:
- The mailbox thread is never blocked; async work runs in a dedicated thread
  pool.
- Independent actions within the same agent run can execute in parallel.
- Agent runs with different keys can be processed concurrently, even while
  earlier agent runs still have outstanding async tasks.
## Appendix
### Using Python Generator for Code-Block Splitting
`Generator` is a special type of iterator in Python that allows values to be
generated on demand, rather than storing all data in memory at once.
Here is an example of a generator:
```python
def func_a():
    print("A: Start")
    x = yield from func_b()
    print("A: Received from B:", x)
    print("A: End")


def func_b():
    print("B: Start")
    yield 1
    print("B: Continue")
    yield 2
    print("B: End")
    return "result"


# Create a generator instance of func_a; no code in func_a runs yet.
a_generator = func_a()
print(type(a_generator))  # prints: <class 'generator'>

# Step-by-step execution
next(a_generator)  # prints: A: Start, B: Start; returns 1
next(a_generator)  # prints: B: Continue; returns 2
try:
    next(a_generator)  # prints: B: End, A: Received from B: result, A: End
except StopIteration:
    # StopIteration signals that the generator has completed.
    pass
```
To clarify how the asynchronous execution approach utilizes Python generators
to split an action into code blocks and execute them step by step, let's take
`my_action` as an example:
```python
import time

@action(InputEvent)
@staticmethod
def my_action(event: Event, ctx: RunnerContext):  # noqa: D102
    def my_func(id: int):
        time.sleep(5)
        return "my func result"

    input = event.input
    func_result = yield from ctx.execute_async(my_func, input.id)
    ctx.send_event(MyEvent(value="my_action" + str(func_result)))
```
```python
class FlinkRunnerContext(RunnerContext):
    @override  # typing.override (Python 3.12+) or typing_extensions.override
    def execute_async(self, fn, *args, **kwargs) -> Any:
        future = self.thread_pool.submit(fn, *args, **kwargs)
        while not future.done():
            # TODO: replace polling with notifying the Flink operator once the future completes.
            yield
        return future.result()
```
1. When `my_action` is first invoked, a Python generator is created; it is
   essentially a generator instance of `my_action(inputEvent)`. At this point,
   there is one pending generator associated with `my_action`, and subsequent
   executions of `my_action` simply call `next()` on this generator.
2. On the first call to `next()`, the generator executes from the beginning of
   the function up to the first `yield` statement, which in this case is the
   `yield` inside the `while` loop of `FlinkRunnerContext.execute_async`.
   - At this point, the asynchronous task has already been submitted to a
     thread pool for execution.
3. On the second call to `next()`, the generator resumes after that `yield`
   and re-checks `future.done()` in the `while` loop of `FlinkRunnerContext`.
   - If the asynchronous task has not completed yet, the generator yields
     again and waits for the next call.
4. The third, fourth, and N-th calls to `next()` behave like the second call:
   the generator checks the status of the asynchronous task and yields again
   if it is still running.
5. Finally, on the (N + 1)-th call, once the asynchronous task has completed,
   `execute_async` returns `future.result()` and the generator executes the
   remaining code of `my_action`.
   - Since no more `yield` statements follow, the rest of `my_action`
     completes in one go and the generator raises `StopIteration`.
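The walkthrough can be reproduced outside Flink with a small driver. In this sketch, `ToyRunnerContext` is a hypothetical stand-in for `FlinkRunnerContext`, `my_action` is simplified to take plain arguments instead of `InputEvent` and `MyEvent`, the sleep inside `my_func` is shortened, and the sleep between `next()` calls approximates the operator doing other work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class ToyRunnerContext:
    """Hypothetical stand-in for FlinkRunnerContext (polling execute_async)."""

    def __init__(self, thread_pool: ThreadPoolExecutor) -> None:
        self.thread_pool = thread_pool
        self.sent_events = []

    def send_event(self, event) -> None:
        self.sent_events.append(event)

    def execute_async(self, fn, *args, **kwargs):
        future = self.thread_pool.submit(fn, *args, **kwargs)
        while not future.done():
            yield
        return future.result()


def my_action(input_id: int, ctx: ToyRunnerContext):
    """Simplified my_action: plain values instead of InputEvent and MyEvent."""
    def my_func(id: int) -> str:
        time.sleep(0.2)
        return "my func result"

    func_result = yield from ctx.execute_async(my_func, input_id)
    ctx.send_event("my_action" + str(func_result))


with ThreadPoolExecutor(max_workers=1) as pool:
    ctx = ToyRunnerContext(pool)
    generator = my_action(42, ctx)  # step 1: created, nothing has run yet
    calls = 0
    while True:
        calls += 1
        try:
            next(generator)  # step 2 submits the task; steps 3-4 re-check it
        except StopIteration:
            break  # step 5: the rest of my_action ran and it is complete
        time.sleep(0.01)  # the operator would process other mail here
```

`calls` ends up as N + 1 in the numbering above: one call to submit the task, several polling calls while it runs, and a final call that finishes the action.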
GitHub link: https://github.com/apache/flink-agents/discussions/78