GitHub user GreatEugenius closed a discussion: Flink Agents Metrics Design

# Introduction
In Flink Agents, we aim to introduce a metrics mechanism that lets users 
understand their jobs not only through output results, but also by collecting 
and analyzing the usage of different Events, Actions, Models, and Tools within 
Agent jobs through integration with external systems. Where necessary, 
monitoring and alerting can also be set up for key metrics, such as abnormal 
spikes in token usage over a short period. This helps users understand the 
runtime status of their jobs so they can refine and optimize their workflows.

We plan to integrate Flink Agents metrics into Flink’s built-in metric system. 
Through the Flink Web UI, users can conveniently view the relevant metrics. 
Additionally, with the Metric Reporters provided by the Flink metric system, 
users can export runtime metrics to external systems for visualization and 
alerting.

# Flink Agents Metric Design
In Flink Agent jobs, the `ActionExecutionOperator` plays a central role in the 
entire Agent lifecycle, from event parsing to action execution. To align with 
Flink’s native metric architecture, we design the metric system around a 
`ProxyMetricGroup` associated with the `ActionExecutionOperator`. This ensures 
that all metrics defined under the `FlinkAgentsMetricGroup` are automatically 
registered at the operator level (`OperatorMetricGroup`), enabling seamless 
integration with Flink’s built-in monitoring capabilities.

The Flink Agents metrics system is logically divided into two main categories:

- **Builtin Metrics**: These are predefined metrics designed for common use 
cases, such as tracking agent throughput, latency, and resource utilization. 
They are implemented directly within core components like the 
ActionExecutionOperator and BuiltInAction, ensuring automatic collection during 
job execution.
- **User-defined Metrics**: For custom or scenario-specific requirements, the 
system supports user-defined metrics via an extensible API. Users can create 
and register their own metrics using the provided MetricGroup interface, 
offering flexibility for advanced monitoring needs.



## Flink Agents Builtin Metric

To provide a comprehensive and extensible metric system, the Flink Agents 
project defines a set of Builtin Metrics, which are automatically collected 
during job execution. These metrics are implemented directly within core 
components such as `ActionExecutionOperator` and `BuiltInAction`, ensuring 
minimal user configuration while offering rich visibility into agent behavior.

### Survey of Metric Support in Agent-Based Frameworks

When developing metrics for Flink Agents, we analyzed the types of metrics 
supported by mainstream frameworks such as LlamaIndex and LangChain. Based on 
this analysis, metrics can be broadly categorized into three groups:

#### 1. **Performance Monitoring Metrics**

These metrics track system performance characteristics such as response time 
and throughput. They are commonly collected with tools such as Prometheus and 
visualized in dashboards such as Grafana, providing insight into runtime 
behavior.

#### 2. **Resource and Cost Metrics**

These metrics monitor resource usage, particularly in relation to model token 
consumption. They are essential for cost tracking and optimization.

Common metrics include:

- **Token Usage**:
  - `total_tokens`: total number of tokens
  - `prompt_tokens`: number of prompt tokens
  - `completion_tokens`: number of generated tokens
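
As a rough illustration of how these three counts relate, the sketch below accumulates usage across model calls. It assumes the model response exposes a usage mapping with `prompt_tokens` and `completion_tokens` keys and that the total is their sum; `TokenUsage` and `usage_from_response` are hypothetical names, not part of any framework mentioned here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TokenUsage:
    """Token counts for one or more model calls (hypothetical helper)."""

    prompt_tokens: int = 0
    completion_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        # Assumption: total is the sum of prompt and generated tokens.
        return self.prompt_tokens + self.completion_tokens

    def __add__(self, other: "TokenUsage") -> "TokenUsage":
        # Accumulate usage across several calls.
        return TokenUsage(
            self.prompt_tokens + other.prompt_tokens,
            self.completion_tokens + other.completion_tokens,
        )


def usage_from_response(usage: dict) -> TokenUsage:
    """Read token counts from a usage mapping; missing keys count as 0."""
    return TokenUsage(
        prompt_tokens=int(usage.get("prompt_tokens", 0)),
        completion_tokens=int(usage.get("completion_tokens", 0)),
    )
```

Keeping prompt and completion counts separate matters for cost tracking, since providers often price the two differently.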

#### 3. **Model Evaluation Metrics**

Model Evaluation Metrics are used to assess the quality of model input and 
output, such as correctness, semantic similarity, and preference alignment. 
These metrics are typically applied in offline evaluation scenarios rather than 
real-time monitoring.

Common types include accuracy checks, text similarity measurements (e.g., based 
on string or embedding distances), and pairwise comparisons between model 
outputs for preference scoring or ranking.
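
To make the first two types concrete, here is a minimal sketch of an accuracy check and a string-based similarity score using only the Python standard library. The function names are hypothetical, and real evaluation suites typically use more elaborate measures (for example, embedding distances).

```python
from difflib import SequenceMatcher


def exact_match(expected: str, actual: str) -> bool:
    """Accuracy check: equal after trimming whitespace and lowercasing."""
    return expected.strip().lower() == actual.strip().lower()


def string_similarity(expected: str, actual: str) -> float:
    """Similarity in [0.0, 1.0] based on matching character subsequences."""
    return SequenceMatcher(None, expected, actual).ratio()
```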

### Proposed Scope of Builtin Metrics in Flink Agents

The MVP version of Flink Agents focuses on implementing Performance Monitoring 
Metrics and Resource & Cost Metrics, which are most relevant for real-time 
monitoring and observability. Support for Model Evaluation Metrics is planned 
for future versions.

We have designed the following metrics at different levels of granularity, 
using a two-dimensional structure based on component type (Agent, Event, 
Action, Model, Tool) and metric type (Count, Meter, Histogram):

| **Component Type** | **Count** | **Meter** | **Histogram** |
| --- | :--- | --- | --- |
| Agent<br />(Operator Builtin has been implemented) | NumOfInput<br />NumOfOutput | NumOfInputPerSec<br />NumOfOutputPerSec | |
| Event | NumOfEventProcessed | NumOfEventProcessedPerSec | |
| Action | NumOfActionsExecuting<br />NumOfActionsExecuted | NumOfActionsExecutedPerSec | ActionExecutionTime |
| Model | NumOfModelCalls<br />NumOfToken<br />NumOfPromptToken<br />NumCompletionToken | NumOfModelCallsPerSec<br />NumOfTokenPerSec<br />NumOfPromptTokenPerSec<br />NumCompletionTokenPerSec | ModelCallsDuration |
| Tool | NumOfToolCall | NumOfToolCallPerSec | ToolCallDuration |

**Note**: Token statistics depend on the output returned by the model, as 
different models use different tokenizers.

For Builtin Metrics, we provide metrics for Event, ModelChat, and ToolCall at a 
general level (without type distinction), as well as per-action metrics. For 
example, the `NumOfEvent` metric counts the total number of events without 
differentiation by event type, while the `NumOfAction` metric tracks both the 
total number of actions executed and the count per action type.
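
The "total plus per-type" pattern described above can be sketched as follows. This is a plain in-memory illustration with hypothetical names (`ActionCountTracker`, `mark_action`), not the operator's actual implementation.

```python
from collections import Counter as TallyCounter


class ActionCountTracker:
    """Tracks a total count and a count per action name (illustrative only)."""

    def __init__(self) -> None:
        self.total = 0
        self.per_action = TallyCounter()

    def mark_action(self, action_name: str) -> None:
        # One executed action updates both the general and the per-action count.
        self.total += 1
        self.per_action[action_name] += 1
```

For example, after marking `first_action` twice and `second_action` once, `total` is 3 and `per_action["first_action"]` is 2.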

### Implementation Example: NumOfEvent Builtin Metric

To illustrate how Builtin Metrics are implemented, consider the `NumOfEvent` 
metric. In the `ActionExecutionOperator`, event counting can be achieved as 
follows:

```java
public class ActionExecutionOperator<IN, OUT> {
  private transient FlinkAgentsMetricGroup metricGroup;

  public void open() throws Exception {
    // Initialize metric group with Flink's operator-level metric context
    metricGroup = new FlinkAgentsMetricGroup(getMetricGroup());
  }

  @Override
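  public void processElement(StreamRecord<IN> record) throws Exception {
    // `events` (declaration and population omitted in this sketch) holds the
    // events still pending for this record.
    while (!events.isEmpty()) {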
      Event event = events.pop();
      // Record event occurrence using the metric group
      metricGroup.markEvent(event);
    }
  }
}
```

This implementation ensures that every event processed by the operator is 
counted and reported via the Flink metric system, enabling visibility through 
the Web UI or external monitoring tools.



## Flink Agents User-defined Metric

In Flink Agents, users implement their logic by defining custom Actions that 
respond to various Events throughout the Agent lifecycle. To support 
user-defined metrics, we introduce two new methods in the `RunnerContext`: 
`get_agent_metric_group()` and `get_action_metric_group()`. These methods allow 
users to create or update global metrics and independent per-action metrics.

`get_agent_metric_group()` gives any Action access to the operator-level 
`OperatorMetricGroup`, so users can register and update metrics directly at the 
operator level while defining their Flink Agent jobs. The resulting metric 
identifier is 
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>`.
`get_action_metric_group()` supports per-action metrics, with the identifier 
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<action_name>.<metric_name>`.

Additionally, the system supports the creation of sub-metric groups to 
distinguish metrics generated by different Actions. These sub-groups enable 
more granular tracking and use the identifier: 
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<user_define_group_name>.<metric_name>`.
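
The three identifier shapes differ only in the optional group segment placed before the metric name. The sketch below builds them by simple joining; `metric_identifier` and the scope values are placeholders for illustration, and in a real job the scope prefix is produced by Flink (where the operator scope format is configurable).

```python
from typing import Sequence


def metric_identifier(
    scope: Sequence[str], metric_name: str, *groups: str, delimiter: str = "."
) -> str:
    """Join the operator scope, optional group names, and the metric name."""
    return delimiter.join([*scope, *groups, metric_name])


# Placeholder values for <host>, taskmanager, <tm_id>, <job_name>,
# <operator_name>, <subtask_index>.
OPERATOR_SCOPE = ["host-1", "taskmanager", "tm-0", "my_job", "agent_op", "0"]

agent_level = metric_identifier(OPERATOR_SCOPE, "numEvent")
action_level = metric_identifier(OPERATOR_SCOPE, "actionLatencyMs", "first_action")
sub_group_level = metric_identifier(OPERATOR_SCOPE, "numEvent", "myEvent")
```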

### **APIs**

#### MetricGroup

The `MetricGroup` interface enables hierarchical metric management through 
`get_sub_group(name: str)` and provides dynamic creation/update of four core 
metric types: counters (e.g., action counts), meters (e.g., event rates), 
histograms (e.g., latency distributions), and gauges (e.g., system state). 
These methods allow users to organize and track metrics within custom Actions 
while integrating seamlessly with Flink’s operator-level monitoring system.

```python
class MetricGroup(ABC):
    """Abstract base class providing a metric group for action execution.

    This metric group offers access to various metric types.
    """

    @abstractmethod
    def get_sub_group(self, name: str) -> "MetricGroup":
        """Create or retrieve a sub-metric group with the given name.

        Parameters
        ----------
        name : str
            The name of the sub-metric group.
        """

    @abstractmethod
    def get_counter(self, name: str) -> "Counter":
        """Create or retrieve a counter with the given name.

        Parameters
        ----------
        name : str
            The name of the counter.
        """

    @abstractmethod
    def get_meter(self, name: str) -> "Meter":
        """Create or retrieve a meter with the given name.

        Parameters
        ----------
        name : str
            The name of the meter.
        """

    @abstractmethod
    def get_histogram(self, name: str, window_size: int = 100) -> "Histogram":
        """Create or retrieve a histogram with the given name and window size.

        Parameters
        ----------
        name : str
            The name of the histogram.
        window_size : int, optional
            The sliding window size for histogram statistics.
        """

    @abstractmethod
    def get_gauge(self, name: str) -> "Gauge":
        """Create or retrieve a gauge with the given name.

        Parameters
        ----------
        name : str
            The name of the gauge.
        """
```

**Note**: In `FlinkAgentsMetricGroup`, we maintain four internal maps keyed by 
metric name for each type (`Counter`, `Meter`, `Histogram`, `Gauge`). If a 
metric with a given name does not exist, it will be automatically created. 
Otherwise, the existing metric is returned.
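
The create-or-retrieve behaviour described in the note can be sketched with plain dictionaries. This is an in-memory illustration limited to counters and sub-groups; `SimpleCounter` and `InMemoryMetricGroup` are hypothetical names, and the real `FlinkAgentsMetricGroup` registers metrics with Flink rather than only storing them locally.

```python
from typing import Dict


class SimpleCounter:
    """Minimal counter used only for this illustration."""

    def __init__(self) -> None:
        self._count = 0

    def inc(self, n: int = 1) -> None:
        self._count += n

    def get_count(self) -> int:
        return self._count


class InMemoryMetricGroup:
    """Keeps one map per metric kind, keyed by metric name."""

    def __init__(self) -> None:
        self._counters: Dict[str, SimpleCounter] = {}
        self._sub_groups: Dict[str, "InMemoryMetricGroup"] = {}

    def get_counter(self, name: str) -> SimpleCounter:
        # Create the counter on first access, then always return the same one.
        if name not in self._counters:
            self._counters[name] = SimpleCounter()
        return self._counters[name]

    def get_sub_group(self, name: str) -> "InMemoryMetricGroup":
        # Sub-groups follow the same create-or-retrieve rule.
        if name not in self._sub_groups:
            self._sub_groups[name] = InMemoryMetricGroup()
        return self._sub_groups[name]
```

Because lookups by name are idempotent, two Actions that ask for the same counter name on the same group update one shared metric, while the same name in a sub-group is a separate metric.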

#### Core Metric Types

##### Counter

A counter is used to measure the number of occurrences of an event.

```python
class Counter(ABC):
    """A counter that measures the count of events."""

    @abstractmethod
    def inc(self, n: int = 1) -> None:
        """Increment the current count by the given value."""

    @abstractmethod
    def dec(self, n: int = 1) -> None:
        """Decrement the current count by the given value."""

    @abstractmethod
    def get_count(self) -> int:
        """Return the current count."""
```

##### Meter

A meter is used to track the rate of events over time (throughput).

```python
class Meter(ABC):
    """A meter that measures throughput."""

    @abstractmethod
    def mark(self, n: int = 1) -> None:
        """Mark the occurrence of one or more events."""

    @abstractmethod
    def get_rate(self) -> float:
        """Return the current event rate per second."""
```
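
One possible way to back `mark` and `get_rate` is a time-windowed average, sketched below. The class name, the 60-second default window, and the injectable `clock` parameter are assumptions made for illustration; this is not a description of how Flink computes meter rates.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class WindowedMeter:
    """Events per second averaged over the last `window_seconds`."""

    def __init__(
        self,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window_seconds
        self._clock = clock
        self._marks: Deque[Tuple[float, int]] = deque()  # (timestamp, count)

    def mark(self, n: int = 1) -> None:
        """Record n events at the current time."""
        self._marks.append((self._clock(), n))

    def get_rate(self) -> float:
        """Drop marks older than the window, then average over the window."""
        cutoff = self._clock() - self._window
        while self._marks and self._marks[0][0] <= cutoff:
            self._marks.popleft()
        return sum(n for _, n in self._marks) / self._window
```

Passing the clock in makes the rate deterministic in tests, since time can be advanced manually instead of sleeping.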

##### Histogram

A histogram is used to collect and analyze distributions of values (e.g., 
latencies).

```python
class Histogram(ABC):
    """A histogram for recording values and computing statistical summaries."""

    @abstractmethod
    def update(self, value: int) -> None:
        """Record a new value into the histogram."""

    @abstractmethod
    def get_mean(self) -> float:
        """Return the average value."""

    @abstractmethod
    def get_max(self) -> int:
        """Return the maximum recorded value."""

    @abstractmethod
    def get_min(self) -> int:
        """Return the minimum recorded value."""
```
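
A sliding-window histogram matching the `window_size` parameter of `get_histogram` could look like the sketch below. It assumes statistics are computed over the most recent `window_size` values only and that an empty histogram reports 0; both are illustrative choices, and `SlidingWindowHistogram` is a hypothetical name.

```python
from collections import deque


class SlidingWindowHistogram:
    """Keeps the most recent `window_size` values and summarizes them."""

    def __init__(self, window_size: int = 100) -> None:
        if window_size <= 0:
            raise ValueError("window_size must be positive")
        # A bounded deque discards the oldest value once it is full.
        self._values = deque(maxlen=window_size)

    def update(self, value: int) -> None:
        self._values.append(value)

    def get_mean(self) -> float:
        return sum(self._values) / len(self._values) if self._values else 0.0

    def get_max(self) -> int:
        return max(self._values, default=0)

    def get_min(self) -> int:
        return min(self._values, default=0)
```

With a window of 3, recording 10, 20, 30, 40 leaves 20, 30, 40 in the window, so the minimum becomes 20 and the mean 30.0.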

##### Gauge

A gauge is used to record a single value at a point in time (e.g., current 
system load).

```python
class Gauge(ABC):
    """A gauge for recording a single value at a specific moment in time."""

    @abstractmethod
    def update(self, value: str) -> None:
        """Update the gauge with the given value."""

    @abstractmethod
    def get_value(self) -> str:
        """Return the current value of the gauge."""
```



#### RunnerContext

```python
class RunnerContext(ABC):
    # ...

    @abstractmethod
    def get_agent_metric_group(self) -> MetricGroup:
        """Get the global metric group for flink agents.

        Returns
        -------
        MetricGroup
            The global metric group shared across all actions.
        """

    @abstractmethod
    def get_action_metric_group(self) -> MetricGroup:
        """Get the individual metric group dedicated for each action.

        Returns
        -------
        MetricGroup
            The individual metric group specific to the current action.
        """
```



### **Examples**

```python
import time


class MyWorkflow(Workflow):
    @action(InputEvent)
    @staticmethod
    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
        start_time = time.time_ns()
        input = event.input
        content = input.get_review() + " first action."
        ctx.send_event(MyEvent(value=content))

        # Access the main agent metric group
        metrics = ctx.get_agent_metric_group()

        # Increment counters and meters
        metrics.get_counter("numEvent").inc()
        metrics.get_meter("numEventPerSecond").mark()

        # Access the per-action metric group
        action_metrics = ctx.get_action_metric_group()
        action_metrics.get_histogram("actionLatencyMs") \
            .update(int(time.time_ns() - start_time) // 1000000)

    @action(MyEvent)
    @staticmethod
    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
        input = event.value
        content = input + " second action."
        ctx.send_event(OutputEvent(output=content))

        # Access the main agent metric group
        metrics = ctx.get_agent_metric_group()

        # Update global metrics
        metrics.get_counter("numEvent").inc()
        metrics.get_meter("numEventPerSecond").mark()

        # Creating and tracking metrics for MyEvent using submetric group
        if isinstance(event, MyEvent):
            sub_metrics = metrics.get_sub_group("myEvent")
            sub_metrics.get_counter("numEvent").inc()
            sub_metrics.get_meter("numEventPerSecond").mark()
```



GitHub link: https://github.com/apache/flink-agents/discussions/73
