GitHub user letaoj edited a discussion: WIP: Replay-based Per Action State Consistency

# Background

Flink Agents execute various actions during record processing, including model 
inference and tool invocation. Model inference involves calling LLMs for 
reasoning, classification, or generation tasks, often through expensive API 
calls to external providers. Tool invocation allows agents to interact with 
external systems through UDFs with network access, with native support for 
Model Context Protocol (MCP). These actions enable agents to perform contextual 
searches, execute business logic, interact with enterprise systems, and invoke 
specialized processing services.

# Problem

## Side Effects and Costs from Action Replay

While Flink provides exactly-once processing guarantees for stream processing 
on a per-message basis, agent actions create challenges around side effects, 
costs, and recovery semantics. Both model inference and tool invocation can 
produce effects that persist beyond the agent's execution context or incur 
significant costs that should not be duplicated.

The core problem occurs when:
1. A Flink agent processes a record and executes multiple actions (model calls, 
tool calls)
2. Some actions complete successfully, potentially modifying external state or 
consuming costly resources
3. The agent crashes before completing record processing
4. Upon recovery, Flink reprocesses the same record, repeating all actions

This creates several issues: 
* duplicate side effects in external systems (e.g. sending the same email 
multiple times to the same recipient) 
* unnecessary costs from repeated expensive model inference calls
* consistency violations from mixed successful and repeated actions
* resource waste from redundant operations
* observability challenges when debugging multiple executions of the same 
logical operation.
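The four steps above can be reproduced with a toy simulation. This is a sketch built on hypothetical stand-ins (`call_model`, `send_email`, `process_record`), not Flink Agents code. It crashes after the email has been sent and then reprocesses the record, as a restore from the last checkpoint would. It counts both the side effects and the billed model calls.

```python
from typing import List, Tuple

# Hypothetical stand-ins, not Flink Agents APIs.
sent_emails: List[Tuple[str, str]] = []  # external side effects (an outbox)
model_calls: List[str] = []              # one entry per billed model call


class Crash(Exception):
    """Simulated agent failure."""


def call_model(prompt: str) -> str:
    model_calls.append(prompt)
    return f"summary of {prompt}"


def send_email(recipient: str, body: str) -> None:
    sent_emails.append((recipient, body))


def process_record(record: str, crash_after_email: bool) -> None:
    summary = call_model(record)              # action 1: model inference
    send_email("alice@example.com", summary)  # action 2: tool with a side effect
    if crash_after_email:
        raise Crash()                         # dies before finishing the record


try:
    process_record("order-42", crash_after_email=True)
except Crash:
    # Recovery reprocesses the same record and repeats every action.
    process_record("order-42", crash_after_email=False)

print(len(sent_emails), len(model_calls))  # 2 2
```

The same email goes out twice and the model is billed twice, although only one record was processed.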

## Non-Deterministic Model Outputs

A critical additional challenge is that model inference and generation are 
inherently non-deterministic. Repeating a model call with identical inputs may 
produce different outputs due to sampling, temperature settings, or model 
provider variations. This creates severe consistency problems when model 
outputs drive downstream decisions such as reasoning chains or tool selection.

Consider this scenario: an agent makes a model call that decides to invoke Tool 
A, but crashes before completion. Upon recovery, the same model call with 
identical inputs may decide to invoke Tool B instead. This leaves the system in 
an inconsistent state. Tool A was already executed based on the first 
decision, but the agent now wants to execute Tool B based on the second 
decision. The best approach is to ensure that each model decision is made only 
once, by persisting the original model output and reusing it during recovery.
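A minimal Python sketch contrasts the two behaviours. It assumes a hypothetical `choose_tool` in place of a sampled LLM call, which alternates its answer so that the divergence is deterministic in the example. It also assumes a plain dict standing in for the durable state store. The helper `recorded` is illustrative and not part of any Flink Agents API.

```python
import itertools
from typing import Callable, Dict

# Hypothetical stand-in for a sampled LLM call: identical prompts can yield
# different tool choices. A cycle makes the divergence deterministic here.
_choices = itertools.cycle(["tool_a", "tool_b"])


def choose_tool(prompt: str) -> str:
    return next(_choices)


def recorded(store: Dict[str, str], key: str, call: Callable[[], str]) -> str:
    """Return the recorded output for key, invoking call only the first time."""
    if key not in store:
        store[key] = call()
    return store[key]


# Naive replay: the recovered run picks a different tool.
first = choose_tool("route ticket 7")
replayed = choose_tool("route ticket 7")
print(first, replayed)  # tool_a tool_b

# Recorded replay: the original decision survives the crash.
store: Dict[str, str] = {}  # stands in for the durable state store
first = recorded(store, "ticket-7:route", lambda: choose_tool("route ticket 7"))
replayed = recorded(store, "ticket-7:route", lambda: choose_tool("route ticket 7"))
print(first, replayed)  # tool_a tool_a
```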

Flink's streaming architecture introduces additional complexity through 
continuous processing on unbounded streams, distributed state management, 
back-pressure from action failures, and a semantic gap where exactly-once 
guarantees don't extend to external model providers or tool endpoints.

# Goals and Non-Goals

## Goals

* **Durability of Agent State (short-term memory)**: Ensure that the agent 
state can be recovered after the agent crashes.
* **State Recovery**: Provide a mechanism to recover the agent's short-term 
memory by replaying the action history from the state store.
* **Minimal Performance Overhead**: The solution should have minimal impact on 
the performance of the Flink job during normal operation.
* **Pluggable Architecture**: The design should be flexible enough to support 
different types of external databases.

## Non-Goals

* **Long-Term Memory**: This design focuses on recovering the short-term memory 
of the current task. It does not address the problem of long-term memory or 
knowledge persistence.
* **Database Management**: This design assumes that the external database is 
already set up and managed. It does not cover the details of database 
administration.

# High-Level Design

## Execution Flow for Static Agent

```mermaid
sequenceDiagram
participant A as Agent
participant M as Memory
participant SS as State Store
participant T as Chat Model/Tool/Prompt

A ->> SS: Check whether the message key exists
opt key exists
SS ->> A: state
A ->> M: Rebuild short-term memory (request-response map)
end
A ->> A: Receive events (Chat Model/Tool/Prompt)
A ->> M: Check whether the key (message key + hash of received event) exists
alt key does not exist
A ->> SS: Persist the input
A ->> T: Take action
T ->> A: Response
A ->> SS: Persist the output
A ->> A: Process response and generate the output event
else key exists
A ->> M: Get the output event from memory
end
A ->> A: Send output event
```
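The flow in the diagram can be sketched in a few lines of Python. This is a sketch under assumptions, not the actual implementation. The state store is modeled as an append-only list of `(composite_key, kind, payload)` records. The composite key is the message key, a `:` separator, and a SHA-256 digest of the event. `event_key`, `rebuild_memory`, `handle_event` and `weather_tool` are all hypothetical names.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical sketch: the state store is an append-only list of
# (composite_key, kind, payload) records; kind is "input" or "output".
StateLog = List[Tuple[str, str, Any]]


def event_key(message_key: str, event: Dict[str, Any]) -> str:
    # Message key + stable hash of the event, so a replay maps to the same key.
    digest = hashlib.sha256(json.dumps(event, sort_keys=True).encode("utf-8")).hexdigest()
    return f"{message_key}:{digest}"


def rebuild_memory(log: StateLog, message_key: str) -> Dict[str, Any]:
    # Short-term memory: the request-response map for one message key.
    return {key: payload for key, kind, payload in log
            if kind == "output" and key.startswith(message_key + ":")}


def handle_event(log: StateLog, memory: Dict[str, Any], message_key: str,
                 event: Dict[str, Any], action: Callable[[Dict[str, Any]], Any]) -> Any:
    key = event_key(message_key, event)
    if key in memory:                    # executed before the crash: reuse
        return memory[key]
    log.append((key, "input", event))    # persist the input
    output = action(event)               # take action (chat model, tool, prompt)
    log.append((key, "output", output))  # persist the output
    memory[key] = output
    return output


calls: List[Dict[str, Any]] = []


def weather_tool(event: Dict[str, Any]) -> str:  # hypothetical tool
    calls.append(event)
    return f"result for {event['query']}"


log: StateLog = []
event = {"type": "ToolRequest", "query": "weather"}
handle_event(log, {}, "msg-1", event, weather_tool)  # first attempt, then crash

memory = rebuild_memory(log, "msg-1")                # recovery: rebuild from the store
out = handle_event(log, memory, "msg-1", event, weather_tool)
print(out, len(calls))  # result for weather 1
```

After recovery, the tool is not called a second time. Its recorded output is served from the rebuilt memory.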

## APIs

### Action Result Store

The action result store is an abstraction layer over the external database that 
handles serialization to and deserialization from AgentState.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class TaskActionStore(ABC):
    """
    Abstraction layer over the external database.
    """

    @abstractmethod
    def put(self, key: str, value: Any) -> None:
        """
        Persist the object into the external database.

        Parameters
        ----------
        key: str
            Composite key made of the message key and the hash of the event.
        value: Any
            Either an ActionTask or an ActionTaskResult.
        """

    @abstractmethod
    def get(self, key: str) -> Any:
        """
        Get the object stored under the given key.

        Parameters
        ----------
        key: str
            Composite key made of the message key and the hash of the event.

        Returns
        -------
        Any
            The object associated with the key: an ActionTask or an
            ActionTaskResult.
        """

    @abstractmethod
    def list(self, key: str) -> List[Any]:
        """
        List all objects associated with the given message key.

        Parameters
        ----------
        key: str
            The message key.

        Returns
        -------
        List[Any]
            All ActionTask and ActionTaskResult objects stored for the
            message key.
        """
```
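For unit tests, a dict-backed store with the same `put`/`get`/`list` methods can stand in for the external database. This is a minimal sketch. `InMemoryTaskActionStore` is a hypothetical name, and it assumes that composite keys are `message_key + ":" + event_hash`. In real code it would subclass `TaskActionStore`. It is kept standalone here so that it runs on its own.

```python
from typing import Any, Dict, List

SEPARATOR = ":"  # assumption: separates the message key from the event hash


class InMemoryTaskActionStore:
    """Dict-backed store mirroring TaskActionStore's put/get/list.

    Would subclass TaskActionStore in real code; standalone here so the
    example runs on its own. Not durable: intended only for unit tests.
    """

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def put(self, key: str, value: Any) -> None:
        # A later put for the same key (the result) replaces the earlier
        # one (the task), matching the operator's task-then-result writes.
        self._data[key] = value

    def get(self, key: str) -> Any:
        return self._data.get(key)  # None when the action never ran

    def list(self, key: str) -> List[Any]:
        prefix = key + SEPARATOR
        return [v for k, v in sorted(self._data.items()) if k.startswith(prefix)]


store = InMemoryTaskActionStore()
store.put("msg-1:aa", "ActionTask(a)")
store.put("msg-1:aa", "ActionTaskResult(a)")
store.put("msg-1:bb", "ActionTask(b)")
store.put("msg-10:cc", "ActionTaskResult(c)")
print(store.list("msg-1"))  # ['ActionTaskResult(a)', 'ActionTask(b)']
```

The separator is what keeps `list("msg-1")` from also returning the entries of `msg-10`.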

### ActionExecutionOperator

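```java
class ActionExecutionOperator {
  // ...
  private void processActionTaskForKey(Object key) throws Exception {
    // ...
    // 2. Invoke the action task, reusing a persisted result if one exists.
    // hash(...) must be deterministic across restarts so that a replayed
    // event maps to the same eventKey.
    String eventKey = key + hash(actionTask.getEvent());
    Object stored = actionTaskResultStore.get(eventKey);

    ActionTask.ActionTaskResult result;
    if (stored instanceof ActionTask.ActionTaskResult) {
      // Recovery path: the action completed before the crash, so replay
      // its recorded result instead of invoking it again.
      result = (ActionTask.ActionTaskResult) stored;
    } else {
      // First execution (or a crash between persisting the task and its
      // result): record the input, run the action, then record the output.
      actionTaskResultStore.put(eventKey, actionTask);
      createAndSetRunnerContext(actionTask);
      result = actionTask.invoke();
      actionTaskResultStore.put(eventKey, result);
    }

    for (Event actionOutputEvent : result.getOutputEvents()) {
      processEvent(key, actionOutputEvent);
    }

    // ...
  }
}
```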
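The snippet builds `eventKey` as `key + hash(actionTask.getEvent())`. The return type of `hash` is not specified. Assuming it is an integer, two different (message key, hash) pairs can concatenate to the same string. The following Python sketch shows one such collision and one possible fix, using the hypothetical helper `compose_key`. The fix adds a fixed separator and a fixed-width SHA-256 digest over a canonical JSON serialization, which stays stable across process restarts. Python's built-in `hash()` for strings is salted per process by default, and Java's default `Object.hashCode()` is also not stable across JVM runs, so neither is suitable here. The sketch also assumes that message keys never contain the separator.

```python
import hashlib
import json
from typing import Any, Dict


def naive_key(message_key: str, event_hash: int) -> str:
    # Mirrors `key + hash(event)` from the Java snippet.
    return message_key + str(event_hash)


# Two different (message key, event hash) pairs collide:
print(naive_key("order1", 23) == naive_key("order12", 3))  # True


def compose_key(message_key: str, event: Dict[str, Any]) -> str:
    """Hypothetical helper: separator + fixed-width, restart-stable digest.

    Assumes message keys never contain ':'.
    """
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{message_key}:{digest}"


a = compose_key("order1", {"tool": "email", "to": "alice"})
b = compose_key("order1", {"to": "alice", "tool": "email"})  # same event, other field order
c = compose_key("order12", {"tool": "email", "to": "alice"})
print(a == b, a == c)  # True False
```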

## External Database

### Database Consideration

Below are some characteristics of the agent state to consider when picking an 
external DB:

* Low QPS - Agent state updates typically arrive at a low rate (queries per 
second, QPS) because each update waits on a slow LLM call or tool invocation. 
Consequently, the storage system only needs to sustain low QPS.
* Durability - The agent state is crucial for action recovery following runtime 
failures and for debugging. Therefore, the storage system must persist it 
durably.
* Strong Consistency - To recover correctly from the state store after a crash, 
a read must observe every write that was acknowledged before the crash; 
otherwise an action that already completed may appear missing and be executed 
again.
* High Data Volume - Because both model and tool responses are stored in the 
state store, the data size can become substantial. The state store should 
handle values of several megabytes in extreme scenarios, such as retrieving 
emails from an email server.
* Ranged Get - During recovery, the agent fetches all entries for a message key 
with a ranged get over the keys it generated, so the external database must 
support ranged gets.
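A ranged get works when entries are kept sorted by key. All composite keys that share a message-key prefix then form one contiguous range. The following minimal Python sketch uses the standard `bisect` module. `SortedKV` is a hypothetical stand-in for a sorted key-value store. The sketch assumes that `:` separates the message key from the event hash, so `;`, the next ASCII character after `:`, can serve as an exclusive upper bound.

```python
import bisect
from typing import Any, Dict, List, Tuple


class SortedKV:
    """Hypothetical sorted KV store: keys kept in order to support range scans."""

    def __init__(self) -> None:
        self._keys: List[str] = []
        self._values: Dict[str, Any] = {}

    def put(self, key: str, value: Any) -> None:
        if key not in self._values:
            bisect.insort(self._keys, key)
        self._values[key] = value

    def range(self, start: str, end: str) -> List[Tuple[str, Any]]:
        """Return entries with start <= key < end, in key order."""
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        return [(k, self._values[k]) for k in self._keys[lo:hi]]

    def prefix_scan(self, message_key: str) -> List[Tuple[str, Any]]:
        # ';' is the character right after ':', so [key + ':', key + ';')
        # covers exactly the keys that start with key + ':'.
        return self.range(message_key + ":", message_key + ";")


kv = SortedKV()
for k in ["msg-2:9f", "msg-1:c3", "msg-10:01", "msg-1:0a"]:
    kv.put(k, f"value of {k}")
print([k for k, _ in kv.prefix_scan("msg-1")])  # ['msg-1:0a', 'msg-1:c3']
```

Because the scan is a single contiguous range, recovery cost grows with the number of entries for that message key rather than with the size of the whole store.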

### Data Retention

### Viable Solution

A viable solution for the external database is a combination of Kafka and a 
key-value store for metadata. Kafka stores the ActionTask and ActionTaskResult 
records. The KV store records the Kafka offsets written for each message key so 
that, during a LIST, the system knows which offset range to read from Kafka.
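A minimal Python sketch of the Kafka-plus-KV idea is shown below. It makes several assumptions. A single Kafka partition is modeled as an in-memory list whose index is the offset. The KV store maps each message key to the first and last offsets it wrote. `Record` and `KafkaBackedStore` are hypothetical names. Because records of different message keys can be interleaved in one partition, LIST filters while it reads the offset range.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class Record:
    message_key: str
    event_key: str
    payload: Any  # an ActionTask or ActionTaskResult


@dataclass
class KafkaBackedStore:
    """Hypothetical sketch: one Kafka partition modeled as an in-memory log,
    plus a KV map from message key to the (first, last) offsets it wrote."""
    log: List[Record] = field(default_factory=list)
    offsets: Dict[str, Tuple[int, int]] = field(default_factory=dict)

    def put(self, message_key: str, event_key: str, payload: Any) -> int:
        offset = len(self.log)  # offset assigned by the append
        self.log.append(Record(message_key, event_key, payload))
        first, _ = self.offsets.get(message_key, (offset, offset))
        self.offsets[message_key] = (first, offset)  # KV updated after the append
        return offset

    def list(self, message_key: str) -> List[Any]:
        if message_key not in self.offsets:
            return []
        first, last = self.offsets[message_key]
        # Other message keys may be interleaved in the partition, so filter
        # while reading the [first, last] offset range.
        return [r.payload for r in self.log[first:last + 1]
                if r.message_key == message_key]


store = KafkaBackedStore()
store.put("msg-1", "msg-1:aa", "ActionTask(a)")
store.put("msg-2", "msg-2:bb", "ActionTask(b)")
store.put("msg-1", "msg-1:aa", "ActionTaskResult(a)")
print(store.list("msg-1"))     # ['ActionTask(a)', 'ActionTaskResult(a)']
print(store.offsets["msg-1"])  # (0, 2)
```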

#### Agent Rescale

We will create the Kafka topic with a partition count equal to the Flink job's 
max parallelism. Flink uses the max parallelism as the number of key groups and 
assigns each subtask a contiguous range of key groups. If the number of Kafka 
partitions equals the max parallelism, and each key's partition is computed 
with the same murmurhash3-based hash that Flink uses for key groups, then every 
partition corresponds to exactly one key group. As a result, no matter how the 
Flink agent is rescaled, no two agent instances will read from the same 
partition.
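The argument can be checked with a small Python simulation. It makes three assumptions. Max parallelism is 128. `zlib.crc32` stands in for Flink's murmur hash over `key.hashCode()`, and any stable hash works as long as the producer and Flink share it. Key groups are assigned to subtasks with the range formula `group * parallelism // max_parallelism`. All function names are hypothetical.

```python
import zlib
from typing import Dict, Set

MAX_PARALLELISM = 128  # assumption: job max parallelism == Kafka partition count


def stable_hash(key: str) -> int:
    # Stand-in for Flink's murmur hash over key.hashCode(); crc32 keeps the
    # sketch self-contained. The producer and Flink must use the SAME hash.
    return zlib.crc32(key.encode("utf-8"))


def key_group(key: str) -> int:
    return stable_hash(key) % MAX_PARALLELISM


def subtask_for(group: int, parallelism: int) -> int:
    # Range-based assignment of key groups to subtasks.
    return group * parallelism // MAX_PARALLELISM


def readers_per_partition(num_partitions: int, parallelism: int) -> Dict[int, Set[int]]:
    """For each partition, the set of subtasks that own keys stored in it."""
    readers: Dict[int, Set[int]] = {}
    for i in range(2000):
        key = f"msg-{i}"
        partition = stable_hash(key) % num_partitions
        readers.setdefault(partition, set()).add(subtask_for(key_group(key), parallelism))
    return readers


# Partitions == max parallelism: each partition has exactly one reader,
# whatever the parallelism after rescaling.
for p in (1, 3, 8, 100, 128):
    assert all(len(s) == 1 for s in readers_per_partition(MAX_PARALLELISM, p).values())

# Fewer partitions (10) than key groups: with 3 subtasks, some partitions
# hold keys owned by several subtasks.
print(max(len(s) for s in readers_per_partition(10, 3).values()) > 1)  # True
```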

GitHub link: https://github.com/apache/flink-agents/discussions/108
