This is an automated email from the ASF dual-hosted git repository.
yunfengzhou-hub pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new e94da402 [metric] Add key-value metric group for model and action
dimensions (#760)
e94da402 is described below
commit e94da402cc106c5a1f7593d805cff5efb6ef6888
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Fri Jun 5 10:41:28 2026 +0800
[metric] Add key-value metric group for model and action dimensions (#760)
---
.../agents/api/chat/model/BaseChatModelSetup.java | 2 +-
.../agents/api/metrics/FlinkAgentsMetricGroup.java | 9 +
.../model/BaseChatModelSetupTokenMetricsTest.java | 109 +++++++++-
docs/content/docs/operations/monitoring.md | 14 +-
.../pom.xml | 19 ++
.../integration/test/TokenMetricsE2EAgent.java | 76 +++++++
.../integration/test/TokenMetricsE2ETest.java | 230 +++++++++++++++++++++
.../flink/agents/plan/actions/ChatModelAction.java | 2 +-
.../plan/actions/ChatModelActionRetryTest.java | 7 +-
python/flink_agents/api/chat_models/chat_model.py | 2 +-
.../api/chat_models/tests/test_token_metrics.py | 17 +-
python/flink_agents/api/metric_group.py | 12 +-
.../flink_agents/plan/actions/chat_model_action.py | 2 +-
.../tests/actions/test_chat_model_action_retry.py | 11 +-
python/flink_agents/runtime/flink_metric_group.py | 10 +-
.../runtime/memory/mem0/mem0_long_term_memory.py | 2 +-
.../mem0/tests/test_mem0_long_term_memory.py | 13 +-
.../agents/runtime/context/RunnerContextImpl.java | 2 +-
.../agents/runtime/metrics/BuiltInMetrics.java | 2 +-
.../metrics/FlinkAgentsMetricGroupImpl.java | 15 ++
.../metrics/FlinkAgentsMetricGroupImplTest.java | 38 ++++
21 files changed, 549 insertions(+), 45 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
index 3a9c7b2d..34b2b299 100644
---
a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
+++
b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
@@ -120,7 +120,7 @@ public abstract class BaseChatModelSetup extends Resource {
if (metricGroup == null) {
return;
}
- FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup(modelName);
+ FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup("model",
modelName);
modelGroup.getCounter("promptTokens").inc(promptTokens);
modelGroup.getCounter("completionTokens").inc(completionTokens);
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
index 9ab3df3d..fa194c91 100644
---
a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
+++
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
@@ -37,6 +37,15 @@ public interface FlinkAgentsMetricGroup {
*/
FlinkAgentsMetricGroup getSubGroup(String name);
+ /**
+ * Create or retrieve a key-value sub-metric group.
+ *
+ * <p>Unlike {@link #getSubGroup(String)}, which adds a single named level, this overload adds a
+ * level identified by a {@code key} and a {@code value}. Both show up as consecutive segments
+ * of the metric identifier (e.g. {@code model.gpt-4o-mini.promptTokens}). Reporters that support
+ * metric variables (such as Prometheus) can expose the pair as a label/dimension instead.
+ *
+ * @param key The key of the metric group variable, e.g. {@code "model"}.
+ * @param value The value of the metric group variable, e.g. a model name.
+ * @return the sub-metric group instance.
+ */
+ FlinkAgentsMetricGroup getSubGroup(String key, String value);
+
/**
* Create or retrieve a gauge with the given name.
*
diff --git
a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
index cde9f683..8e47105f 100644
---
a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
+++
b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
@@ -19,15 +19,20 @@
package org.apache.flink.agents.api.chat.model;
import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
+import org.apache.flink.agents.api.metrics.UpdatableGauge;
import org.apache.flink.agents.api.resource.ResourceContext;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -72,7 +77,7 @@ class BaseChatModelSetupTokenMetricsTest {
mockPromptTokensCounter = mock(Counter.class);
mockCompletionTokensCounter = mock(Counter.class);
- when(mockMetricGroup.getSubGroup("gpt-4")).thenReturn(mockModelGroup);
+ when(mockMetricGroup.getSubGroup("model",
"gpt-4")).thenReturn(mockModelGroup);
when(mockModelGroup.getCounter("promptTokens")).thenReturn(mockPromptTokensCounter);
when(mockModelGroup.getCounter("completionTokens")).thenReturn(mockCompletionTokensCounter);
}
@@ -84,7 +89,7 @@ class BaseChatModelSetupTokenMetricsTest {
setup.recordTokenMetrics("gpt-4", 100, 50);
- verify(mockMetricGroup).getSubGroup("gpt-4");
+ verify(mockMetricGroup).getSubGroup("model", "gpt-4");
verify(mockModelGroup).getCounter("promptTokens");
verify(mockModelGroup).getCounter("completionTokens");
verify(mockPromptTokensCounter).inc(100);
@@ -108,15 +113,15 @@ class BaseChatModelSetupTokenMetricsTest {
Counter mockGpt35PromptCounter = mock(Counter.class);
Counter mockGpt35CompletionCounter = mock(Counter.class);
-
when(mockMetricGroup.getSubGroup("gpt-3.5-turbo")).thenReturn(mockGpt35Group);
+ when(mockMetricGroup.getSubGroup("model",
"gpt-3.5-turbo")).thenReturn(mockGpt35Group);
when(mockGpt35Group.getCounter("promptTokens")).thenReturn(mockGpt35PromptCounter);
when(mockGpt35Group.getCounter("completionTokens")).thenReturn(mockGpt35CompletionCounter);
setup.recordTokenMetrics("gpt-4", 100, 50);
setup.recordTokenMetrics("gpt-3.5-turbo", 200, 100);
- verify(mockMetricGroup).getSubGroup("gpt-4");
- verify(mockMetricGroup).getSubGroup("gpt-3.5-turbo");
+ verify(mockMetricGroup).getSubGroup("model", "gpt-4");
+ verify(mockMetricGroup).getSubGroup("model", "gpt-3.5-turbo");
verify(mockPromptTokensCounter).inc(100);
verify(mockCompletionTokensCounter).inc(50);
verify(mockGpt35PromptCounter).inc(200);
@@ -128,4 +133,98 @@ class BaseChatModelSetupTokenMetricsTest {
void testResourceType() {
assertEquals(ResourceType.CHAT_MODEL, setup.getResourceType());
}
+
+ // ========================================================================
+ // Value-based tests using TestMetricGroup (non-Mockito)
+ // ========================================================================
+
+    /**
+     * Minimal in-memory {@link FlinkAgentsMetricGroup} used for value-based assertions.
+     *
+     * <p>Plain named sub-groups and key-value sub-groups are kept in separate maps so that the two
+     * {@code getSubGroup} overloads never alias each other: {@code getSubGroup("model", "gpt-4")}
+     * and {@code getSubGroup("model=gpt-4")} resolve to different groups. A test therefore fails if
+     * production code calls the wrong overload. Gauges, meters and histograms are not needed by
+     * these tests and return {@code null}.
+     */
+    private static class TestMetricGroup implements FlinkAgentsMetricGroup {
+        final Map<String, TestMetricGroup> subGroups = new HashMap<>();
+        final Map<String, Map<String, TestMetricGroup>> keyValueSubGroups = new HashMap<>();
+        final Map<String, SimpleCounter> counters = new HashMap<>();
+
+        @Override
+        public FlinkAgentsMetricGroup getSubGroup(String name) {
+            return subGroups.computeIfAbsent(name, k -> new TestMetricGroup());
+        }
+
+        @Override
+        public FlinkAgentsMetricGroup getSubGroup(String key, String value) {
+            // Nest by key, then by value, instead of encoding "key=value" into one string, so a
+            // key-value group can never collide with a plain named group.
+            return keyValueSubGroups
+                    .computeIfAbsent(key, k -> new HashMap<>())
+                    .computeIfAbsent(value, v -> new TestMetricGroup());
+        }
+
+        @Override
+        public Counter getCounter(String name) {
+            return counters.computeIfAbsent(name, k -> new SimpleCounter());
+        }
+
+        @Override
+        public UpdatableGauge getGauge(String name) {
+            return null;
+        }
+
+        @Override
+        public Meter getMeter(String name) {
+            return null;
+        }
+
+        @Override
+        public Meter getMeter(String name, Counter counter) {
+            return null;
+        }
+
+        @Override
+        public Histogram getHistogram(String name) {
+            return null;
+        }
+
+        @Override
+        public Histogram getHistogram(String name, int windowSize) {
+            return null;
+        }
+    }
+
+    @Test
+    @DisplayName("Value-based: token counters are accessible under model key-value group")
+    void testTokenMetricsUnderModelKeyValueGroup() {
+        TestMetricGroup rootGroup = new TestMetricGroup();
+        setup.setMetricGroup(rootGroup);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+
+        // Token counters must live under the "model" -> "gpt-4" key-value group.
+        Map<String, SimpleCounter> gpt4Counters =
+                ((TestMetricGroup) rootGroup.getSubGroup("model", "gpt-4")).counters;
+        assertEquals(100, gpt4Counters.get("promptTokens").getCount());
+        assertEquals(50, gpt4Counters.get("completionTokens").getCount());
+    }
+
+    @Test
+    @DisplayName("Value-based: different models have independent counters")
+    void testDifferentModelsHaveIndependentCounters() {
+        TestMetricGroup rootGroup = new TestMetricGroup();
+        setup.setMetricGroup(rootGroup);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+        setup.recordTokenMetrics("gpt-3.5-turbo", 200, 80);
+
+        // Each model name gets its own key-value group, hence its own counters.
+        Map<String, SimpleCounter> gpt4Counters =
+                ((TestMetricGroup) rootGroup.getSubGroup("model", "gpt-4")).counters;
+        Map<String, SimpleCounter> gpt35Counters =
+                ((TestMetricGroup) rootGroup.getSubGroup("model", "gpt-3.5-turbo")).counters;
+
+        assertEquals(100, gpt4Counters.get("promptTokens").getCount());
+        assertEquals(50, gpt4Counters.get("completionTokens").getCount());
+        assertEquals(200, gpt35Counters.get("promptTokens").getCount());
+        assertEquals(80, gpt35Counters.get("completionTokens").getCount());
+    }
+
+    @Test
+    @DisplayName("Value-based: counters accumulate across multiple calls")
+    void testCountersAccumulate() {
+        TestMetricGroup rootGroup = new TestMetricGroup();
+        setup.setMetricGroup(rootGroup);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+        setup.recordTokenMetrics("gpt-4", 150, 75);
+
+        // Two calls for the same model add up: 100 + 150 and 50 + 75.
+        Map<String, SimpleCounter> gpt4Counters =
+                ((TestMetricGroup) rootGroup.getSubGroup("model", "gpt-4")).counters;
+        assertEquals(250, gpt4Counters.get("promptTokens").getCount());
+        assertEquals(125, gpt4Counters.get("completionTokens").getCount());
+    }
}
diff --git a/docs/content/docs/operations/monitoring.md
b/docs/content/docs/operations/monitoring.md
index 1925c7f6..68012f01 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -36,18 +36,18 @@ We offer data monitoring for built-in metrics, which
includes events, actions, a
| **Agent** | numOfEventProcessedPerSec | The number of
Events this operator has processed per second. | Meter |
| **Agent** | numOfActionsExecuted | The total
number of actions this operator has executed. | Count |
| **Agent** | numOfActionsExecutedPerSec | The number of
actions this operator has executed per second. | Meter |
-| **Action** | <action_name>.numOfActionsExecuted | The total number of
actions this operator has executed for a specific action name. | Count |
-| **Action** | <action_name>.numOfActionsExecutedPerSec | The number of
actions this operator has executed per second for a specific action name. |
Meter |
+| **Action** | action.\<action_name\>.numOfActionsExecuted | The total number
of actions this operator has executed for a specific action name. | Count |
+| **Action** | action.\<action_name\>.numOfActionsExecutedPerSec | The number
of actions this operator has executed per second for a specific action name. |
Meter |
| **Agent** | eventLogTruncatedEvents | Number of
event log records whose payload was truncated at `STANDARD` level. Increments
once per event, regardless of how many fields inside it were truncated. Use
this to decide whether to raise truncation thresholds or move specific event
types to `VERBOSE`. | Count |
#### Token Usage Metrics
Token usage metrics are automatically recorded when chat models are invoked
through `ChatModelConnection`. These metrics help track LLM API usage and costs.
-| Scope | Metrics | Description
| Type |
-|-----------|---------------------------------------------|--------------------------------------------------------------------------------|-------|
-| **Model** | <action_name>.<model_name>.promptTokens | The total number
of prompt tokens consumed by the model within an action. | Count |
-| **Model** | <action_name>.<model_name>.completionTokens | The total number
of completion tokens generated by the model within an action. | Count |
+| Scope | Metrics |
Description
| Type |
+|-----------|--------------------------------------------------------------|--------------------------------------------------------------------------------|-------|
+| **Model** | action.\<action_name\>.model.\<model_name\>.promptTokens |
The total number of prompt tokens consumed by the model within an action.
| Count |
+| **Model** | action.\<action_name\>.model.\<model_name\>.completionTokens |
The total number of completion tokens generated by the model within an action.
| Count |
### How to add custom metrics
@@ -113,7 +113,7 @@ public class MyAgent extends Agent {
### How to check the metrics with Flink executor
-Flink agents enable the reporting of metrics to external systems by creating a
metric identifier prefix in the format
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>`. Please
refer to [Flink Metric
Reporters](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/metric_reporters/)
for more details.
+Flink agents enable the reporting of metrics to external systems by creating a
metric identifier prefix in the format
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>`.
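Agent-specific metrics are registered under key-value metric groups (e.g.,
`action.<action_name>`, `model.<model_name>`). Reporters that support metric
variables (such as Prometheus) expose these pairs as labels/dimensions; other reporters include them as plain segments of the metric identifier, as shown in the tables above. Please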
refer to [Flink Metric
Reporters](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/
[...]
Additionally, we can check the metric results in the Flink Job WebUI using the
metric identifier prefix `<subtask_index>.<operator_name>`.
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 44d76579..119edaf8 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -109,6 +109,25 @@ under the License.
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>4.12.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
new file mode 100644
index 00000000..9cefc284
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integration.test;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceName;
+
+import java.util.Collections;
+
+/**
+ * Minimal agent for token metrics e2e testing. Uses OpenAI completions
connection pointing to a
+ * MockWebServer endpoint. Set {@link #apiBaseUrl} before building the agent
plan.
+ */
+public class TokenMetricsE2EAgent extends Agent {
+
+ static String apiBaseUrl;
+
+ @ChatModelConnection
+ public static ResourceDescriptor chatModelConnection() {
+ return ResourceDescriptor.Builder.newBuilder(
+ ResourceName.ChatModel.OPENAI_COMPLETIONS_CONNECTION)
+ .addInitialArgument("api_key", "test-key")
+ .addInitialArgument("api_base_url", apiBaseUrl)
+ .build();
+ }
+
+ @ChatModelSetup
+ public static ResourceDescriptor chatModel() {
+ return ResourceDescriptor.Builder.newBuilder(
+ ResourceName.ChatModel.OPENAI_COMPLETIONS_SETUP)
+ .addInitialArgument("connection", "chatModelConnection")
+ .addInitialArgument("model", "gpt-4o-mini")
+ .build();
+ }
+
+ @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
+ public static void process(InputEvent event, RunnerContext ctx) throws
Exception {
+ ctx.sendEvent(
+ new ChatRequestEvent(
+ "chatModel",
+ Collections.singletonList(
+ new ChatMessage(MessageRole.USER, (String)
event.getInput()))));
+ }
+
+ @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
+ public static void processChatResponse(ChatResponseEvent event,
RunnerContext ctx) {
+ ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
+ }
+}
diff --git
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
new file mode 100644
index 00000000..5b0982b2
--- /dev/null
+++
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integration.test;
+
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test verifying that agent metrics with key-value metric group
dimensions flow through
+ * the real Flink metric system. Uses {@link InMemoryReporter} to capture
metrics and {@link
+ * MockWebServer} as a stand-in for the LLM endpoint.
+ *
+ * <p>The test validates the <b>complete</b> metric identifier. The
TaskManager resource ID is
+ * extracted at runtime via {@link #PREFIX_PATTERN}, then each expected
metric's full identifier is
+ * constructed from {@link #PREFIX_TEMPLATE} and matched exactly. To add
coverage for a new metric,
+ * add an entry to {@link #EXPECTED_AGENT_COUNTERS}.
+ */
+class TokenMetricsE2ETest {
+
+ private static final String CANNED_RESPONSE =
+
"{\"id\":\"chatcmpl-test\",\"object\":\"chat.completion\",\"created\":0,"
+ + "\"model\":\"gpt-4o-mini\","
+ + "\"choices\":[{\"index\":0,"
+ +
"\"message\":{\"role\":\"assistant\",\"content\":\"Hello!\"},"
+ + "\"finish_reason\":\"stop\"}],"
+ +
"\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}";
+
+ private static final Pattern PREFIX_PATTERN =
+ Pattern.compile(
+ "^\\.taskmanager\\.([a-f0-9-]+)\\.Flink Streaming
Job\\.action-execute-operator\\.0\\.");
+
+ private static final String PREFIX_TEMPLATE =
+ ".taskmanager.%s.Flink Streaming Job.action-execute-operator.0.";
+
+ /**
+ * Expected agent Counter metrics. Each key is the deterministic suffix
after the prefix in the
+ * full metric identifier. The value is the expected counter value after
processing 2 input
+ * records.
+ *
+ * <p>To extend: add one entry per new counter metric.
+ */
+ private static final Map<String, Long> EXPECTED_AGENT_COUNTERS =
buildExpectedCounters();
+
+ private static Map<String, Long> buildExpectedCounters() {
+ Map<String, Long> m = new LinkedHashMap<>();
+ // Token metrics — validates action.<name>.model.<model>.<counter>
hierarchy
+ // 2 chat requests x 10 prompt tokens = 20
+ m.put("action.chat_model_action.model.gpt-4o-mini.promptTokens", 20L);
+ // 2 chat requests x 5 completion tokens = 10
+ m.put("action.chat_model_action.model.gpt-4o-mini.completionTokens",
10L);
+ return m;
+ }
+
+ private static final InMemoryReporter REPORTER =
InMemoryReporter.createWithRetainedMetrics();
+
+ @RegisterExtension
+ static final MiniClusterExtension MINI_CLUSTER =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(createClusterConfig())
+ .setNumberSlotsPerTaskManager(2)
+ .setNumberTaskManagers(1)
+ .build());
+
+ private static Configuration createClusterConfig() {
+ Configuration config = new Configuration();
+ REPORTER.addToConfiguration(config);
+ return config;
+ }
+
+ private MockWebServer mockServer;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mockServer = new MockWebServer();
+ mockServer.setDispatcher(
+ new Dispatcher() {
+ @Override
+ public MockResponse dispatch(RecordedRequest request) {
+ return new MockResponse()
+ .setHeader("Content-Type", "application/json")
+ .setBody(CANNED_RESPONSE);
+ }
+ });
+ mockServer.start();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ mockServer.shutdown();
+ }
+
+ @Test
+ @DisplayName("Agent metrics are registered with correct key-value group
hierarchy")
+ void testAgentMetricsWithFullIdentifierValidation() throws Exception {
+ TokenMetricsE2EAgent.apiBaseUrl = mockServer.url("/v1").toString();
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<String> inputStream = env.fromData("What is 1+1?", "Hello
world");
+
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+ DataStream<Object> outputStream =
+ agentsEnv
+ .fromDataStream(inputStream, (KeySelector<String,
String>) value -> value)
+ .apply(new TokenMetricsE2EAgent())
+ .toDataStream();
+
+ CloseableIterator<Object> results = outputStream.collectAsync();
+ agentsEnv.execute();
+
+ List<Object> collected = new ArrayList<>();
+ while (results.hasNext()) {
+ collected.add(results.next());
+ }
+ assertThat(collected).hasSize(2);
+
+ Map<MetricGroup, Map<String, Metric>> byGroup =
REPORTER.getMetricsByGroup();
+
+ String taskManagerId = extractTaskManagerId(byGroup);
+ assertThat(taskManagerId)
+ .as(
+ "Should extract TaskManager resource ID from metrics "
+ + "matching prefix pattern: %s",
+ PREFIX_PATTERN.pattern())
+ .isNotNull();
+
+ String metricIdPrefix = String.format(PREFIX_TEMPLATE, taskManagerId);
+
+ for (Map.Entry<String, Long> expected :
EXPECTED_AGENT_COUNTERS.entrySet()) {
+ String expectedSuffix = expected.getKey();
+ long expectedValue = expected.getValue();
+ String expectedFullId = metricIdPrefix + expectedSuffix;
+
+ List<Counter> counters = findAllCountersByExactId(byGroup,
expectedFullId);
+
+ assertThat(counters)
+ .as("Exactly one counter with metric id '%s'",
expectedFullId)
+ .hasSize(1);
+
+ assertThat(counters.get(0).getCount())
+ .as("Counter value for '%s'", expectedFullId)
+ .isEqualTo(expectedValue);
+ }
+ }
+
+ /**
+ * Extracts the TaskManager resource ID by matching any operator metric
against {@link
+ * #PREFIX_PATTERN}. Returns {@code null} if no metric matches the
expected prefix structure,
+ * which would indicate the framework changed its metric hierarchy.
+ */
+ private static String extractTaskManagerId(Map<MetricGroup, Map<String,
Metric>> byGroup) {
+ for (Map.Entry<MetricGroup, Map<String, Metric>> groupEntry :
byGroup.entrySet()) {
+ MetricGroup group = groupEntry.getKey();
+ for (String metricName : groupEntry.getValue().keySet()) {
+ String fullId = group.getMetricIdentifier(metricName);
+ Matcher matcher = PREFIX_PATTERN.matcher(fullId);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+ }
+ }
+ return null;
+ }
+
+ private static List<Counter> findAllCountersByExactId(
+ Map<MetricGroup, Map<String, Metric>> byGroup, String expectedId) {
+ List<Counter> result = new ArrayList<>();
+ for (Map.Entry<MetricGroup, Map<String, Metric>> groupEntry :
byGroup.entrySet()) {
+ MetricGroup group = groupEntry.getKey();
+ for (Map.Entry<String, Metric> metricEntry :
groupEntry.getValue().entrySet()) {
+ if (!(metricEntry.getValue() instanceof Counter)) {
+ continue;
+ }
+ String fullId =
group.getMetricIdentifier(metricEntry.getKey());
+ if (expectedId.equals(fullId)) {
+ result.add((Counter) metricEntry.getValue());
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
index 504b4fb9..df28c1d4 100644
---
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
+++
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
@@ -176,7 +176,7 @@ public class ChatModelAction {
}
FlinkAgentsMetricGroup metricGroup = ctx.getActionMetricGroup();
if (metricGroup != null) {
- FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup(model);
+ FlinkAgentsMetricGroup modelGroup =
metricGroup.getSubGroup("model", model);
modelGroup.getCounter("retryCount").inc(retryCount);
modelGroup.getCounter("retryWaitSec").inc(totalRetryWaitSec);
}
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
index a66e36f9..8c139505 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
@@ -90,7 +90,8 @@ class ChatModelActionRetryTest {
.thenAnswer(inv ->
inv.<DurableCallable<ChatMessage>>getArgument(0).call());
// Wire up metric group chain
-
when(mockActionMetricGroup.getSubGroup(anyString())).thenReturn(mockModelMetricGroup);
+ when(mockActionMetricGroup.getSubGroup(anyString(), anyString()))
+ .thenReturn(mockModelMetricGroup);
when(mockModelMetricGroup.getCounter("retryCount")).thenReturn(mockRetryCountCounter);
when(mockModelMetricGroup.getCounter("retryWaitSec")).thenReturn(mockRetryWaitSecCounter);
}
@@ -123,7 +124,7 @@ class ChatModelActionRetryTest {
assertThat(responseEvent.getTotalRetryWaitSec()).isEqualTo(0);
// No retry metrics should be recorded
- verify(mockActionMetricGroup, never()).getSubGroup(anyString());
+ verify(mockActionMetricGroup, never()).getSubGroup(anyString(),
anyString());
}
@Test
@@ -163,7 +164,7 @@ class ChatModelActionRetryTest {
assertThat(elapsed).isGreaterThanOrEqualTo(1000L);
// Verify metrics recorded under connection name
-
verify(mockActionMetricGroup).getSubGroup(mockChatModel.getConnectionName());
+ verify(mockActionMetricGroup).getSubGroup("model",
mockChatModel.getConnectionName());
verify(mockRetryCountCounter).inc(1);
verify(mockRetryWaitSecCounter).inc(1);
}
diff --git a/python/flink_agents/api/chat_models/chat_model.py
b/python/flink_agents/api/chat_models/chat_model.py
index 7cbe6e12..ac4a814a 100644
--- a/python/flink_agents/api/chat_models/chat_model.py
+++ b/python/flink_agents/api/chat_models/chat_model.py
@@ -278,7 +278,7 @@ class BaseChatModelSetup(Resource):
if metric_group is None:
return
- model_group = metric_group.get_sub_group(model_name)
+ model_group = metric_group.get_sub_group("model", model_name)
model_group.get_counter("promptTokens").inc(prompt_tokens)
model_group.get_counter("completionTokens").inc(completion_tokens)
diff --git a/python/flink_agents/api/chat_models/tests/test_token_metrics.py
b/python/flink_agents/api/chat_models/tests/test_token_metrics.py
index d987d121..e8195151 100644
--- a/python/flink_agents/api/chat_models/tests/test_token_metrics.py
+++ b/python/flink_agents/api/chat_models/tests/test_token_metrics.py
@@ -75,10 +75,11 @@ class _MockMetricGroup(MetricGroup):
self._sub_groups: dict[str, _MockMetricGroup] = {}
self._counters: dict[str, _MockCounter] = {}
- def get_sub_group(self, name: str) -> "_MockMetricGroup":
- if name not in self._sub_groups:
- self._sub_groups[name] = _MockMetricGroup()
- return self._sub_groups[name]
+ def get_sub_group(self, name: str, value: str | None = None) ->
"_MockMetricGroup":
+ key = f"{name}={value}" if value is not None else name
+ if key not in self._sub_groups:
+ self._sub_groups[key] = _MockMetricGroup()
+ return self._sub_groups[key]
def get_counter(self, name: str) -> _MockCounter:
if name not in self._counters:
@@ -110,7 +111,7 @@ class TestBaseChatModelTokenMetrics:
chat_model.test_record_token_metrics("gpt-4", 100, 50)
# Verify the metrics were recorded
- model_group = mock_metric_group.get_sub_group("gpt-4")
+ model_group = mock_metric_group.get_sub_group("model", "gpt-4")
assert model_group.get_counter("promptTokens").get_count() == 100
assert model_group.get_counter("completionTokens").get_count() == 50
@@ -138,8 +139,8 @@ class TestBaseChatModelTokenMetrics:
chat_model.test_record_token_metrics("gpt-3.5-turbo", 200, 100)
# Verify each model has its own counters
- gpt4_group = mock_metric_group.get_sub_group("gpt-4")
- gpt35_group = mock_metric_group.get_sub_group("gpt-3.5-turbo")
+ gpt4_group = mock_metric_group.get_sub_group("model", "gpt-4")
+ gpt35_group = mock_metric_group.get_sub_group("model", "gpt-3.5-turbo")
assert gpt4_group.get_counter("promptTokens").get_count() == 100
assert gpt4_group.get_counter("completionTokens").get_count() == 50
@@ -159,7 +160,7 @@ class TestBaseChatModelTokenMetrics:
chat_model.test_record_token_metrics("gpt-4", 150, 75)
# Verify the metrics accumulated
- model_group = mock_metric_group.get_sub_group("gpt-4")
+ model_group = mock_metric_group.get_sub_group("model", "gpt-4")
assert model_group.get_counter("promptTokens").get_count() == 250
assert model_group.get_counter("completionTokens").get_count() == 125
diff --git a/python/flink_agents/api/metric_group.py
b/python/flink_agents/api/metric_group.py
index 78bc629c..12525294 100644
--- a/python/flink_agents/api/metric_group.py
+++ b/python/flink_agents/api/metric_group.py
@@ -25,13 +25,19 @@ class MetricGroup(ABC):
"""
@abstractmethod
-    def get_sub_group(self, name: str) -> "MetricGroup":
-        """Create or retrieve a sub-metric group with the given name.
+    def get_sub_group(self, name: str, value: str | None = None) ->
"MetricGroup":
+        """Create or retrieve a sub-metric group.
+
+        When *value* is ``None`` a plain named sub-group is returned.
+        When *value* is given, *name* is treated as the key and a
+        key-value sub-group is returned.
+
+        Plain sub-group names should not contain ``'='``; the Flink-backed
+        implementation raises ``ValueError`` for such names.
Parameters
----------
name : str
-            The name of the sub metric group.
+            The name (or key) of the sub metric group.
+        value : str, optional
+            The value of the metric group variable. Defaults to ``None``,
+            which selects a plain named sub-group.
+
+        Returns
+        -------
+        MetricGroup
+            The requested sub metric group.
"""
@abstractmethod
diff --git a/python/flink_agents/plan/actions/chat_model_action.py
b/python/flink_agents/plan/actions/chat_model_action.py
index bbb6866d..188eb9e7 100644
--- a/python/flink_agents/plan/actions/chat_model_action.py
+++ b/python/flink_agents/plan/actions/chat_model_action.py
@@ -162,7 +162,7 @@ def _record_retry_metrics(
return
metric_group = ctx.action_metric_group
if metric_group is not None:
- model_group = metric_group.get_sub_group(model)
+ model_group = metric_group.get_sub_group("model", model)
model_group.get_counter("retryCount").inc(retry_count)
model_group.get_counter("retryWaitSec").inc(total_retry_wait_sec)
diff --git
a/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
b/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
index 879a02e7..e1f04f62 100644
--- a/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
+++ b/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
@@ -66,10 +66,11 @@ class _MockMetricGroup(MetricGroup):
self._sub_groups: dict[str, _MockMetricGroup] = {}
self._counters: dict[str, _MockCounter] = {}
-    def get_sub_group(self, name: str) -> "_MockMetricGroup":
-        if name not in self._sub_groups:
-            self._sub_groups[name] = _MockMetricGroup()
-        return self._sub_groups[name]
+    def get_sub_group(self, name: str, value: str | None = None) ->
"_MockMetricGroup":
+        # In-memory stand-in: key-value groups are cached under "name=value" and
+        # plain groups under "name", so repeated lookups return the same mock.
+        key = f"{name}={value}" if value is not None else name
+        if key not in self._sub_groups:
+            self._sub_groups[key] = _MockMetricGroup()
+        return self._sub_groups[key]
def get_counter(self, name: str) -> _MockCounter:
if name not in self._counters:
@@ -218,7 +219,7 @@ class TestChatModelActionRetry:
assert elapsed >= 1.0
# Verify metrics recorded under connection name
- model_group = metric_group.get_sub_group(chat_model.connection)
+ model_group = metric_group.get_sub_group("model",
chat_model.connection)
assert model_group.get_counter("retryCount").get_count() == 1
assert model_group.get_counter("retryWaitSec").get_count() == 1
diff --git a/python/flink_agents/runtime/flink_metric_group.py
b/python/flink_agents/runtime/flink_metric_group.py
index 59a2ae52..5c731e6a 100644
--- a/python/flink_agents/runtime/flink_metric_group.py
+++ b/python/flink_agents/runtime/flink_metric_group.py
@@ -36,7 +36,15 @@ class FlinkMetricGroup(MetricGroup):
self._j_metric_group = j_metric_group
@override
-    def get_sub_group(self, name: str) -> "FlinkMetricGroup":
+    def get_sub_group(self, name: str, value: str | None = None) -> "FlinkMetricGroup":
+        """Create or retrieve a sub-metric group backed by the Java metric group.
+
+        Parameters
+        ----------
+        name : str
+            The plain group name, or the key when *value* is given. Must not
+            contain ``'='``, which is reserved for key-value group identity.
+        value : str, optional
+            The value of a key-value group. ``None`` selects a plain group.
+
+        Raises
+        ------
+        ValueError
+            If *name* contains ``'='``.
+        """
+        if "=" in name:
+            # The Java side caches key-value groups under "key=value". A key
+            # containing '=' would let ("a=b", "c") and ("a", "b=c") share one
+            # group, and a plain name such as "model=x" would alias the
+            # key-value group ("model", "x").
+            if value is not None:
+                msg = f"Sub-group key must not contain '=' (got '{name}')."
+            else:
+                msg = (
+                    f"Sub-group name must not contain '=' (got '{name}'). "
+                    "Use get_sub_group(name, value) for key-value groups."
+                )
+            raise ValueError(msg)
+        if value is not None:
+            return FlinkMetricGroup(self._j_metric_group.getSubGroup(name, value))
return FlinkMetricGroup(self._j_metric_group.getSubGroup(name))
@override
diff --git a/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
b/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
index b4085827..bcd81dfd 100644
--- a/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
+++ b/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
@@ -455,7 +455,7 @@ class Mem0LongTermMemory(InternalBaseLongTermMemory):
and metric.get("promptTokens")
and metric.get("completionTokens")
):
- model_group =
self.metric_group.get_sub_group(metric["model_name"])
+ model_group = self.metric_group.get_sub_group("model",
metric["model_name"])
model_group.get_counter("promptTokens").inc(metric["promptTokens"])
model_group.get_counter("completionTokens").inc(
metric["completionTokens"]
diff --git
a/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
b/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
index df04eb3c..9911bb44 100644
---
a/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
+++
b/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
@@ -400,10 +400,11 @@ class MockMetricGroup:
self._sub_groups: dict[str, MockMetricGroup] = {}
self._counters: dict[str, int] = {}
-    def get_sub_group(self, name: str) -> "MockMetricGroup":
-        if name not in self._sub_groups:
-            self._sub_groups[name] = MockMetricGroup()
-        return self._sub_groups[name]
+    def get_sub_group(self, name: str, value: str | None = None) ->
"MockMetricGroup":
+        # Mimics the metric-group lookup: key-value groups are cached under
+        # "name=value" and plain groups under "name", so repeated lookups
+        # return the same mock instance.
+        key = f"{name}={value}" if value is not None else name
+        if key not in self._sub_groups:
+            self._sub_groups[key] = MockMetricGroup()
+        return self._sub_groups[key]
def get_counter(self, name: str) -> "MockCounter":
return MockCounter(self, name)
@@ -468,7 +469,7 @@ def test_token_usage_reported_on_switch_context() -> None:
# Metrics are still in the queue — not reported until switch_context.
ltm_group = metric_group.get_sub_group("long-term-memory")
- model_group = ltm_group.get_sub_group("mock-model")
+ model_group = ltm_group.get_sub_group("model", "mock-model")
assert model_group._counters.get("promptTokens", 0) == 0
# switch_context drains the queue on the mailbox thread.
@@ -530,7 +531,7 @@ def test_token_usage_flushed_on_close() -> None:
ltm.close()
ltm_group = metric_group.get_sub_group("long-term-memory")
- model_group = ltm_group.get_sub_group("mock-model")
+ model_group = ltm_group.get_sub_group("model", "mock-model")
# 2 adds x 2 LLM calls each x 10 prompt tokens = 40
assert model_group._counters.get("promptTokens", 0) == 40
# 2 adds x 2 LLM calls each x 5 completion tokens = 20
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index b0603ecb..1395bdc5 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -143,7 +143,7 @@ public class RunnerContextImpl implements RunnerContext {
@Override
public FlinkAgentsMetricGroupImpl getActionMetricGroup() {
-        return agentMetricGroup.getSubGroup(actionName);
+        // Scope action metrics under the key-value group action=<actionName>.
+        return agentMetricGroup.getSubGroup("action", actionName);
}
@Override
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
index 59e0daaa..6bbc03aa 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -56,7 +56,7 @@ public class BuiltInMetrics {
for (String actionName : agentPlan.getActions().keySet()) {
actionMetricGroups.put(
actionName,
- new
BuiltInActionMetrics(parentMetricGroup.getSubGroup(actionName)));
+ new
BuiltInActionMetrics(parentMetricGroup.getSubGroup("action", actionName)));
}
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
index 6b0a2365..8360a9f0 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+import org.apache.flink.util.Preconditions;
import java.util.HashMap;
@@ -53,12 +54,26 @@ public class FlinkAgentsMetricGroupImpl extends
ProxyMetricGroup<MetricGroup>
}
+    /**
+     * Returns the plain named sub-group {@code name}, creating and caching it on first use.
+     *
+     * @throws IllegalArgumentException if {@code name} contains {@code '='}
+     */
public FlinkAgentsMetricGroupImpl getSubGroup(String name) {
+        // '=' is reserved for the "key=value" cache keys of key-value sub-groups, so a
+        // plain name containing it could alias one of them in subMetricGroups.
+        Preconditions.checkArgument(
+                !name.contains("="),
+                "Sub-group name must not contain '=' (got '%s'). "
+                        + "Use getSubGroup(key, value) for key-value groups.",
+                name);
if (!subMetricGroups.containsKey(name)) {
subMetricGroups.put(name, new
FlinkAgentsMetricGroupImpl(super.addGroup(name)));
}
return subMetricGroups.get(name);
}
+    /**
+     * Returns the key-value sub-group {@code key=value}, creating and caching it on first use.
+     *
+     * <p>Groups are cached under {@code key + "=" + value}. Plain names passed to {@link
+     * #getSubGroup(String)} may not contain {@code '='}, and neither may {@code key} here, so
+     * every cache key maps back to exactly one (key, value) pair or plain name.
+     *
+     * @param key the group key, e.g. {@code "model"}; must not be null or contain {@code '='}
+     * @param value the group value, e.g. a model name; must not be null
+     * @return the cached or newly created key-value sub-group
+     * @throws NullPointerException if {@code key} or {@code value} is null
+     * @throws IllegalArgumentException if {@code key} contains {@code '='}
+     */
+    public FlinkAgentsMetricGroupImpl getSubGroup(String key, String value) {
+        Preconditions.checkNotNull(key, "Sub-group key must not be null.");
+        // A null value would otherwise be cached as "key=null" and alias a real "null" value.
+        Preconditions.checkNotNull(value, "Sub-group value must not be null.");
+        // Without this check ("a=b", "c") and ("a", "b=c") would share the cache key "a=b=c"
+        // and silently report into the same metric group.
+        Preconditions.checkArgument(
+                !key.contains("="),
+                "Sub-group key must not contain '=' (got '%s').",
+                key);
+        String cacheKey = key + "=" + value;
+        if (!subMetricGroups.containsKey(cacheKey)) {
+            subMetricGroups.put(
+                    cacheKey, new FlinkAgentsMetricGroupImpl(super.addGroup(key, value)));
+        }
+        return subMetricGroups.get(cacheKey);
+    }
+
public UpdatableGaugeImpl getGauge(String name) {
if (!gauges.containsKey(name)) {
gauges.put(name, super.gauge(name, new UpdatableGaugeImpl()));
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
index d58f8cdf..86d60f01 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
@@ -51,6 +51,44 @@ public class FlinkAgentsMetricGroupImplTest {
assertEquals(result, metricGroup.getSubGroup(name));
}
+    /** Verifies a key-value sub-group is created and a repeated lookup returns an equal group. */
+    @Test
+    void testGetSubGroupWithKeyValue() {
+        String key = "model";
+        String value = "gpt-4";
+        FlinkAgentsMetricGroupImpl result = metricGroup.getSubGroup(key,
value);
+
+        assertNotNull(result);
+        assertEquals(result, metricGroup.getSubGroup(key, value));
+    }
+
+    /**
+     * Verifies that the plain group {@code "model"} and the key-value group {@code model=gpt-4}
+     * are distinct objects whose counters do not interfere.
+     */
+    @Test
+    void testKeyValueSubGroupIsolatedFromNamedSubGroup() {
+        FlinkAgentsMetricGroupImpl named = metricGroup.getSubGroup("model");
+        FlinkAgentsMetricGroupImpl kv = metricGroup.getSubGroup("model",
"gpt-4");
+
+        assertNotSame(named, kv);
+
+        named.getCounter("c").inc(10);
+        kv.getCounter("c").inc(99);
+
+        assertEquals(10, named.getCounter("c").getCount());
+        assertEquals(99, kv.getCounter("c").getCount());
+    }
+
+    /** Verifies that different values under the same key yield distinct, independent groups. */
+    @Test
+    void testDifferentValuesCreateDistinctSubGroups() {
+        FlinkAgentsMetricGroupImpl gpt4 = metricGroup.getSubGroup("model",
"gpt-4");
+        FlinkAgentsMetricGroupImpl gpt35 = metricGroup.getSubGroup("model",
"gpt-3.5");
+
+        assertNotSame(gpt4, gpt35);
+
+        gpt4.getCounter("promptTokens").inc(100);
+        gpt35.getCounter("promptTokens").inc(200);
+
+        assertEquals(100, gpt4.getCounter("promptTokens").getCount());
+        assertEquals(200, gpt35.getCounter("promptTokens").getCount());
+    }
+
@Test
void testGetGauge() {
String name = "testGauge";