GitHub user letaoj edited a discussion: WIP: Replay-based Per Action State
Consistency
# Background
Flink Agents execute various actions during record processing, including model
inference and tool invocation. Model inference involves calling LLMs for
reasoning, classification, or generation tasks, often through expensive API
calls to external providers. Tool invocation allows agents to interact with
external systems through UDFs with network access, with native support for
Model Context Protocol (MCP). These actions enable agents to perform contextual
searches, execute business logic, interact with enterprise systems, and invoke
specialized processing services.
# Problem
## Side Effects and Costs from Action Replay
While Flink provides exactly-once processing guarantees for stream processing
on a per-message basis, agent actions create challenges around side effects,
costs, and recovery semantics. Both model inference and tool invocation can
produce effects that persist beyond the agent's execution context or incur
significant costs that should not be duplicated.
The core problem occurs when:
1. A Flink agent processes a record and executes multiple actions (model calls,
tool calls)
2. Some actions complete successfully, potentially modifying external state or
consuming costly resources
3. The agent crashes before completing record processing
4. Upon recovery, Flink reprocesses the same record, repeating all actions
This creates several issues:
* duplicate side effects in external systems (e.g. sending the same email
multiple times to the same recipient)
* unnecessary costs from repeated expensive model inference calls
* consistency violations from mixed successful and repeated actions
* resource waste from redundant operations
* observability challenges when debugging multiple executions of the same
logical operation.
## Non-Deterministic Model Outputs
A critical additional challenge is that model inference and generation are
inherently non-deterministic. Repeating a model call with identical inputs may
produce different outputs due to sampling, temperature settings, or model
provider variations. This creates severe consistency
problems when model outputs drive downstream decisions such as reasoning chains
or tool selection.
Consider this scenario: an agent makes a model call that decides to invoke Tool
A, but crashes before completion. Upon recovery, the same model call with
identical inputs may decide to invoke Tool B instead. This leaves the system in
an inconsistent state where Tool A was already executed based on the first
decision, but the agent now wants to execute Tool B based on the second
decision. The best approach is to ensure that the model never has to make the
same decision twice: the original model output should be preserved and reused
during recovery.
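
To illustrate the "preserve and reuse" idea, here is a minimal sketch, not the proposed implementation. It assumes a durable log keyed by a hash of the request; `RecordedModel`, `request_hash`, `flaky_model`, and the in-memory `durable_log` dict are hypothetical names introduced only for this example.

```python
import hashlib
import random
from typing import Callable, Dict


def request_hash(request: str) -> str:
    """Stable digest of the request text, used as the lookup key."""
    return hashlib.sha256(request.encode("utf-8")).hexdigest()


class RecordedModel:
    """Wraps a non-deterministic model so each request is decided only once."""

    def __init__(self, model: Callable[[str], str], log: Dict[str, str]):
        self._model = model
        self._log = log  # stands in for a durable store that survives crashes

    def call(self, request: str) -> str:
        key = request_hash(request)
        if key in self._log:
            # Recovery path: reuse the recorded decision, skip the model call.
            return self._log[key]
        response = self._model(request)
        self._log[key] = response  # record the decision before acting on it
        return response


def flaky_model(request: str) -> str:
    """Hypothetical model that picks a tool at random."""
    return random.choice(["Tool A", "Tool B"])


durable_log: Dict[str, str] = {}
question = "Which tool should handle this record?"
first = RecordedModel(flaky_model, durable_log).call(question)
# Simulate a crash and restart: a new wrapper instance over the same log.
second = RecordedModel(flaky_model, durable_log).call(question)
```

Because the second call is served from the log, the recovered agent sees the same tool choice as before the crash, whichever tool the random model happened to pick.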
Flink's streaming architecture introduces additional complexity through
continuous processing on unbounded streams, distributed state management,
back-pressure from action failures, and a semantic gap where exactly-once
guarantees don't extend to external model providers or tool endpoints.
# Goals and Non-Goals
## Goals
* **Durability of Agent State (short-term memory)**: Ensure that the state can
be recovered when the agent crashes
* **State Recovery**: Provide a mechanism to recover the agent's short-term
memory by replaying the action history from the state store
* **Minimal Performance Overhead**: The solution should have minimal impact on
the performance of the Flink job during normal operation.
* **Pluggable Architecture**: The design should be flexible enough to support
different types of external databases.
## Non-Goals
* **Long-Term Memory**: This design focuses on recovering the short-term memory
of the current task. It does not address the problem of long-term memory or
knowledge persistence.
* **Database Management**: This design assumes that the external database is
already set up and managed. It does not cover the details of database
administration.
# High-Level Design
## Execution Flow for ReAct Agent
```mermaid
sequenceDiagram
participant A as Agent
participant SS as State Store
participant LLM as LLM
A ->> SS: Check if the key exists or not
opt exists
SS ->> A: state
A --> A: replay until the last step recorded
end
loop ReAct Flow
A ->> LLM: Generate Tool Calls
LLM ->> A: List of Tool Calls
A ->> SS: Persist the input and output
A ->> MCP: Call the tool
A ->> SS: Persist the input
MCP ->> A: Tool call response
A ->> SS: Update the same tool call row with output
end
A ->> A: send events when LLM decided to terminate
```
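
The ReAct flow above could be sketched roughly as follows. This is an illustration under stated assumptions: the state store is modelled as a plain dict of ordered rows per key, and `run_react`, `scripted_llm`, and `fake_tool` are hypothetical names. The sketch re-invokes a tool whose input was persisted but whose output was not; the design above does not say how that case should be resolved.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, Optional[str]]


def run_react(
    key: str,
    store: Dict[str, List[Row]],
    llm: Callable[[List[str]], Optional[str]],
    tool: Callable[[str], str],
) -> List[str]:
    """Run the loop for one key, resuming from whatever rows already exist."""
    rows = store.setdefault(key, [])
    observations: List[str] = []
    step = 0
    while True:
        # LLM step: reuse the recorded decision if this step already has one.
        if step < len(rows):
            tool_call = rows[step]["response"]
        else:
            tool_call = llm(observations)
            # Persist the LLM input and output together.
            rows.append({"kind": "llm", "request": repr(observations),
                         "response": tool_call})
        step += 1
        if tool_call is None:  # the LLM decided to terminate
            return observations
        # Tool step: persist the input first, then fill in the output.
        if step < len(rows) and rows[step]["response"] is not None:
            result = rows[step]["response"]  # replay the recorded output
        else:
            if step == len(rows):
                rows.append({"kind": "tool", "request": tool_call,
                             "response": None})
            result = tool(tool_call)
            rows[step]["response"] = result  # update the same row
        step += 1
        observations.append(result)


llm_calls: List[int] = []
tool_calls: List[str] = []


def scripted_llm(observations: List[str]) -> Optional[str]:
    llm_calls.append(len(observations))
    return "search:flink" if not observations else None


def fake_tool(call: str) -> str:
    tool_calls.append(call)
    return f"result for {call}"


store: Dict[str, List[Row]] = {}
first_run = run_react("user-1", store, scripted_llm, fake_tool)
replayed = run_react("user-1", store, scripted_llm, fake_tool)
```

The second call to `run_react` walks the recorded rows only, so neither the LLM nor the tool is invoked again.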
## Execution Flow for Static Agent
```mermaid
sequenceDiagram
participant A as Agent
participant SS as State Store
A ->> SS: Check if the key exists or not
opt key exists
SS ->> A: state
A --> A: replay until the last step recorded
end
A ->> Chat Model/Tool/Prompt: Take Action
A ->> SS: Persist the input
Chat Model/Tool/Prompt ->> A: response
A ->> SS: Persist the output
A ->> A: send event
```
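
For the static flow, a crash between "persist the input" and "persist the output" is the interesting case. The following is a rough sketch under assumptions: the store is a dict of ordered rows per key, and `run_static`, `classify`, and `notify` are hypothetical. A step whose output is already recorded is replayed; a step with only its input recorded is executed again.

```python
from typing import Callable, Dict, List, Optional, Tuple

Step = Tuple[str, Callable[[str], str]]  # (action name, action function)
Rows = List[Dict[str, Optional[str]]]


def run_static(key: str, steps: List[Step], first_input: str,
               store: Dict[str, Rows]) -> str:
    rows = store.setdefault(key, [])
    value = first_input
    for index, (name, action) in enumerate(steps):
        if index < len(rows) and rows[index]["response"] is not None:
            value = rows[index]["response"]  # replay the recorded output
            continue
        if index == len(rows):
            # Persist the input before waiting for the response.
            rows.append({"action": name, "request": value, "response": None})
        value = action(value)            # take the action
        rows[index]["response"] = value  # persist the output
    return value


calls: List[str] = []
attempts = {"notify": 0}


def classify(text: str) -> str:
    calls.append("classify")
    return "billing"


def notify(label: str) -> str:
    calls.append("notify")
    attempts["notify"] += 1
    if attempts["notify"] == 1:
        raise RuntimeError("simulated crash before the response arrived")
    return f"ticket routed to {label}"


store: Dict[str, Rows] = {}
steps: List[Step] = [("classify", classify), ("notify", notify)]
try:
    run_static("user-1", steps, "my invoice is wrong", store)
except RuntimeError:
    pass  # the runtime restarts and reprocesses the same record
result = run_static("user-1", steps, "my invoice is wrong", store)
```

After recovery, `classify` is not called a second time because its output was already persisted, while `notify` runs again because only its input had been recorded.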
## APIs
### Agent State
Agent state represents the current state of the agent. It consists of the
request and response for the current action and the next action the agent
should take.
```python
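import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentState:
    """
    State object that represents the execution state of the agent

    Attributes
    ----------
    next_action: str
        The next action that the agent is going to take
    request: str
        The request the agent sent to the external system
    response: Optional[str]
        The raw response the agent got from the external system
        (None until the action has completed)
    """

    next_action: str
    request: str
    response: Optional[str] = None

    def hash(self) -> str:
        """
        Hash of the request. The idea is that the same request always maps
        to the same recorded output.
        """
        # SHA-256 is an illustrative choice; the design does not fix one.
        return hashlib.sha256(self.request.encode("utf-8")).hexdigest()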
```
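
One practical detail behind a request hash: it has to be stable across process restarts, and Python's built-in `hash()` for strings is salted per process unless `PYTHONHASHSEED` is fixed. A possible approach, sketched here under the assumption that the request can be represented as a JSON-serializable dict, is to canonicalize the payload and then apply a cryptographic digest. `canonical_request` and `request_digest` are hypothetical helpers, not part of the proposal.

```python
import hashlib
import json
from typing import Any, Dict


def canonical_request(payload: Dict[str, Any]) -> str:
    """Serialize with sorted keys and fixed separators so that equal
    payloads always produce the same text."""
    return json.dumps(payload, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=False)


def request_digest(request: str) -> str:
    """Digest that stays the same across processes and restarts."""
    return hashlib.sha256(request.encode("utf-8")).hexdigest()


# Same payload, different key order: the digests match.
a = request_digest(canonical_request(
    {"tool": "search", "args": {"q": "flink", "k": 3}}))
b = request_digest(canonical_request(
    {"args": {"k": 3, "q": "flink"}, "tool": "search"}))
# Different payload: the digest differs.
c = request_digest(canonical_request(
    {"tool": "search", "args": {"q": "spark", "k": 3}}))
```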
### StateStore
The state store is the abstraction layer over the external database. It handles
serialization and deserialization of AgentState.
```python
from __future__ import annotations  # AgentState is defined in the section above

from abc import ABC, abstractmethod
from typing import List


class StateStore(ABC):
    """
    Abstraction layer over the state store / external database
    """

    @abstractmethod
    def put(self, key: str, state: AgentState) -> None:
        """
        Persist the agent state into the external database

        Parameters
        ----------
        key: str
            The key of the agent, consisting of the user / device / product id
        state: AgentState
            The current agent state
        """

    @abstractmethod
    def get(self, key: str, hash: str) -> AgentState:
        """
        Get the agent state by the key and the hash of the previous state
        """

    @abstractmethod
    def list(self, key: str) -> List[AgentState]:
        """
        List all the AgentState entries associated with the given key

        Parameters
        ----------
        key: str
            The key of the agent, consisting of the user / device / product id
        """
```
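
As a point of reference for testing, an in-memory store with the same `put` / `get` / `list` shape might look like the sketch below. It is self-contained, so it carries its own simplified `AgentState` stand-in; `InMemoryStateStore` is a hypothetical name, and keying rows by the request hash is an assumption made so that a second `put` for the same request updates the existing row.

```python
import hashlib
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class AgentState:
    """Simplified stand-in for the AgentState described above."""
    next_action: str
    request: str
    response: Optional[str] = None

    def hash(self) -> str:
        return hashlib.sha256(self.request.encode("utf-8")).hexdigest()


class InMemoryStateStore:
    """Keeps rows per agent key, indexed by request hash, in insertion order."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, AgentState]] = {}

    def put(self, key: str, state: AgentState) -> None:
        # Writing the same request again overwrites the row in place,
        # which is how a pending row later receives its response.
        self._rows.setdefault(key, {})[state.hash()] = state

    def get(self, key: str, hash: str) -> Optional[AgentState]:
        return self._rows.get(key, {}).get(hash)

    def list(self, key: str) -> List[AgentState]:
        return list(self._rows.get(key, {}).values())


store = InMemoryStateStore()
pending = AgentState("call_tool", "search: flink agents")
store.put("user-1", pending)  # input persisted, no response yet
store.put("user-1", AgentState("call_llm", "search: flink agents", "3 results"))
store.put("user-1", AgentState("finish", "summarize 3 results", "done"))
history = store.list("user-1")
```

A real implementation would replace the dict with calls to the external database and add serialization.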
### ActionExecutionOperator
```java
class ActionExecutionOperator {
    // ...
    private void processActionTaskForKey(Object key) throws Exception {
        // after invoke: persist the request, the response is not known yet
        StateStore.put(key, new AgentState(request, null));
        // after finish: persist the same request together with its response
        StateStore.put(key, new AgentState(request,
                actionTaskResult.getAttr("response")));
    }
}
```
## External Database Consideration
Below are some characteristics of the agent state to consider when picking the
right external DB:
* Low QPS - Agent state updates typically exhibit low Queries Per Second
(QPS) because each update waits on a slow LLM or tool invocation.
Consequently, the storage system only needs to accommodate low QPS.
* Durability - The agent state is crucial for action recovery following runtime
failures and for debugging purposes. Therefore, the storage system must ensure
persistence.
* Strong Consistency - To facilitate rapid recovery from the state store after
a crash, the agent state must maintain strong consistency.
* High Data Volume - Given that both model and tool responses will be stored in
the state store, the data size can become substantial. Thus, the state store
should be capable of managing data at the megabyte level in extreme scenarios,
such as retrieving emails from an email server.
* Ranged Get - During recovery, the agent state is fetched with a ranged get
over the keys generated by the agent, so the external database needs to
support ranged gets.
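
To make the ranged-get requirement concrete, here is a small sketch. The key layout (`<agent key>#<zero-padded step>`) is an assumption for illustration only, as are the names `SortedKV`, `row_key`, and `history`. `SortedKV` stands in for any store that can scan keys in sorted order, and the agent key is assumed not to contain `#`.

```python
import bisect
from typing import List, Tuple


class SortedKV:
    """Tiny ordered key-value map standing in for a store with range scans."""

    def __init__(self) -> None:
        self._keys: List[str] = []
        self._values: List[str] = []

    def put(self, key: str, value: str) -> None:
        i = bisect.bisect_left(self._keys, key)
        if i < len(self._keys) and self._keys[i] == key:
            self._values[i] = value  # overwrite an existing row
        else:
            self._keys.insert(i, key)
            self._values.insert(i, value)

    def scan(self, start: str, end: str) -> List[Tuple[str, str]]:
        """Return entries with start <= key < end, in key order."""
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        return list(zip(self._keys[lo:hi], self._values[lo:hi]))


def row_key(agent_key: str, step: int) -> str:
    # Zero-padding keeps lexicographic order equal to numeric step order.
    return f"{agent_key}#{step:08d}"


def history(kv: SortedKV, agent_key: str) -> List[str]:
    # '$' directly follows '#' in ASCII, so it bounds the "<agent key>#" prefix.
    return [value for _, value in kv.scan(f"{agent_key}#", f"{agent_key}$")]


kv = SortedKV()
kv.put(row_key("user-1", 2), "tool output")
kv.put(row_key("user-10", 0), "other agent")
kv.put(row_key("user-1", 0), "llm decision")
kv.put(row_key("user-1", 10), "final answer")
steps = history(kv, "user-1")
```

One scan returns the full action history of `user-1` in step order without touching rows that belong to `user-10`.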
GitHub link: https://github.com/apache/flink-agents/discussions/108