This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c2589915c9 [Improve][Core][Metrics] Optimize Sink Write Data
Statistics (#10100)
c2589915c9 is described below
commit c2589915c969f0af78b8997b94c9f297e0b8b78a
Author: Jast <[email protected]>
AuthorDate: Mon Dec 1 22:27:25 2025 +0800
[Improve][Core][Metrics] Optimize Sink Write Data Statistics (#10100)
---
docs/en/seatunnel-engine/rest-api-v1.md | 32 ++-
docs/en/seatunnel-engine/rest-api-v2.md | 31 ++-
docs/zh/seatunnel-engine/rest-api-v1.md | 30 ++-
docs/zh/seatunnel-engine/rest-api-v2.md | 31 ++-
.../seatunnel/api/common/metrics/MetricNames.java | 4 +
.../seatunnel/engine/e2e/CommittedMetricsIT.java | 286 +++++++++++++++++++++
...ake_multi_table_to_console_with_checkpoint.conf | 68 +++++
.../metrics/ConnectorMetricsCalcContext.java | 246 ++++++++++++++++--
.../seatunnel/engine/server/rest/RestConstant.java | 6 +-
.../engine/server/rest/service/BaseService.java | 54 +++-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 3 +
.../metrics/ConnectorMetricsCalcContextTest.java | 106 ++++++++
.../rest/service/BaseServiceTableMetricsTest.java | 191 ++++++++++++++
13 files changed, 1045 insertions(+), 43 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v1.md
b/docs/en/seatunnel-engine/rest-api-v1.md
index d99f2213a8..91c3773435 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -199,6 +199,26 @@ network:
`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is
running.
`finishedTime`, `errorMsg` will return when job is finished.
+#### Metrics field description
+
+| Field | Description |
+| --- | --- |
+| SourceReceivedCount | Total rows received from sources |
+| SourceReceivedQPS | Source receive rate (rows/s) |
+| SourceReceivedBytes | Total bytes received from sources |
+| SourceReceivedBytesPerSeconds | Source receive rate (bytes/s) |
+| SinkWriteCount | Sink write attempts (rows) |
+| SinkWriteQPS | Sink write attempt rate (rows/s) |
+| SinkWriteBytes | Sink write attempts (bytes) |
+| SinkWriteBytesPerSeconds | Sink write attempt rate (bytes/s) |
+| SinkCommittedCount | Sink committed rows after checkpoint succeeds |
+| SinkCommittedQPS | Sink committed rate (rows/s) |
+| SinkCommittedBytes | Sink committed bytes after checkpoint succeeds |
+| SinkCommittedBytesPerSeconds | Sink committed rate (bytes/s) |
+| TableSourceReceived* | Per-table source metrics, key format
`TableSourceReceivedXXX#<table>` |
+| TableSinkWrite* | Per-table sink write attempts, key format
`TableSinkWriteXXX#<table>` |
+| TableSinkCommitted* | Per-table sink committed metrics, key format
`TableSinkCommittedXXX#<table>` |
+
When we can't get the job info, the response will be:
```json
@@ -256,6 +276,10 @@ This API has been deprecated, please use
/hazelcast/rest/maps/job-info/:jobId in
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
+ "SinkCommittedCount": "",
+ "SinkCommittedQPS": "",
+ "SinkCommittedBytes": "",
+ "SinkCommittedBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -263,7 +287,11 @@ This API has been deprecated, please use
/hazelcast/rest/maps/job-info/:jobId in
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
- "TableSinkWriteBytesPerSeconds": {}
+ "TableSinkWriteBytesPerSeconds": {},
+ "TableSinkCommittedCount": {},
+ "TableSinkCommittedQPS": {},
+ "TableSinkCommittedBytes": {},
+ "TableSinkCommittedBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
@@ -839,4 +867,4 @@ Returns a list of logs from the requested node.
To get a list of logs from the current node:
`http://localhost:5801/hazelcast/rest/maps/log`
To get the content of a log file:
`http://localhost:5801/hazelcast/rest/maps/log/job-898380162133917698.log`
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index 2ba5504af4..b3808d35d7 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -320,6 +320,10 @@ This endpoint helps troubleshoot why jobs stay in
`PENDING` by showing the pendi
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
+ "SinkCommittedCount": "",
+ "SinkCommittedQPS": "",
+ "SinkCommittedBytes": "",
+ "SinkCommittedBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -327,7 +331,11 @@ This endpoint helps troubleshoot why jobs stay in
`PENDING` by showing the pendi
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
- "TableSinkWriteBytesPerSeconds": {}
+ "TableSinkWriteBytesPerSeconds": {},
+ "TableSinkCommittedCount": {},
+ "TableSinkCommittedQPS": {},
+ "TableSinkCommittedBytes": {},
+ "TableSinkCommittedBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
@@ -343,6 +351,27 @@ This endpoint helps troubleshoot why jobs stay in
`PENDING` by showing the pendi
`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is
running.
`finishedTime`, `errorMsg` will return when job is finished.
+#### Metrics field description
+
+| Field | Description |
+| --- | --- |
+| IntermediateQueueSize | Size of intermediate queue between operators |
+| SourceReceivedCount | Total rows received from sources |
+| SourceReceivedQPS | Source receive rate (rows/s) |
+| SourceReceivedBytes | Total bytes received from sources |
+| SourceReceivedBytesPerSeconds | Source receive rate (bytes/s) |
+| SinkWriteCount | Sink write attempts (rows) |
+| SinkWriteQPS | Sink write attempt rate (rows/s) |
+| SinkWriteBytes | Sink write attempts (bytes) |
+| SinkWriteBytesPerSeconds | Sink write attempt rate (bytes/s) |
+| SinkCommittedCount | Sink committed rows after checkpoint succeeds |
+| SinkCommittedQPS | Sink committed rate (rows/s) |
+| SinkCommittedBytes | Sink committed bytes after checkpoint succeeds |
+| SinkCommittedBytesPerSeconds | Sink committed rate (bytes/s) |
+| TableSourceReceived* | Per-table source metrics, key format
`TableSourceReceivedXXX#<table>` |
+| TableSinkWrite* | Per-table sink write attempts, key format
`TableSinkWriteXXX#<table>` |
+| TableSinkCommitted* | Per-table sink committed metrics, key format
`TableSinkCommittedXXX#<table>` |
+
When we can't get the job info, the response will be:
```json
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md
b/docs/zh/seatunnel-engine/rest-api-v1.md
index b91be2c7b9..83a6693aa0 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -188,6 +188,10 @@ network:
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
+ "SinkCommittedCount": "",
+ "SinkCommittedQPS": "",
+ "SinkCommittedBytes": "",
+ "SinkCommittedBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -195,7 +199,11 @@ network:
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
- "TableSinkWriteBytesPerSeconds": {}
+ "TableSinkWriteBytesPerSeconds": {},
+ "TableSinkCommittedCount": {},
+ "TableSinkCommittedQPS": {},
+ "TableSinkCommittedBytes": {},
+ "TableSinkCommittedBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
@@ -211,6 +219,26 @@ network:
`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` 字段在Job在RUNNING状态时会返回
`finishedTime`, `errorMsg` 字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
+#### 指标字段说明
+
+| 字段 | 说明 |
+| --- | --- |
+| SourceReceivedCount | 源端接收的行数 |
+| SourceReceivedQPS | 源端接收速率(行/秒) |
+| SourceReceivedBytes | 源端接收的字节数 |
+| SourceReceivedBytesPerSeconds | 源端接收速率(字节/秒) |
+| SinkWriteCount | Sink 写入尝试行数 |
+| SinkWriteQPS | Sink 写入尝试速率(行/秒) |
+| SinkWriteBytes | Sink 写入尝试字节数 |
+| SinkWriteBytesPerSeconds | Sink 写入尝试速率(字节/秒) |
+| SinkCommittedCount | checkpoint 成功后的 Sink 已提交行数 |
+| SinkCommittedQPS | Sink 已提交速率(行/秒) |
+| SinkCommittedBytes | checkpoint 成功后的 Sink 已提交字节数 |
+| SinkCommittedBytesPerSeconds | Sink 已提交速率(字节/秒) |
+| TableSourceReceived* | 按表汇总的源指标,键格式 `TableSourceReceivedXXX#<表>` |
+| TableSinkWrite* | 按表汇总的 Sink 写入尝试,键格式 `TableSinkWriteXXX#<表>` |
+| TableSinkCommitted* | 按表汇总的 Sink 已提交指标,键格式 `TableSinkCommittedXXX#<表>` |
+
当我们查询不到这个Job时,返回结果为:
```json
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 9b72a38300..8bab244208 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -314,6 +314,10 @@ seatunnel:
"SinkWriteQPS": "",
"SinkWriteBytes": "",
"SinkWriteBytesPerSeconds": "",
+ "SinkCommittedCount": "",
+ "SinkCommittedQPS": "",
+ "SinkCommittedBytes": "",
+ "SinkCommittedBytesPerSeconds": "",
"TableSourceReceivedCount": {},
"TableSourceReceivedBytes": {},
"TableSourceReceivedBytesPerSeconds": {},
@@ -321,7 +325,11 @@ seatunnel:
"TableSinkWriteCount": {},
"TableSinkWriteQPS": {},
"TableSinkWriteBytes": {},
- "TableSinkWriteBytesPerSeconds": {}
+ "TableSinkWriteBytesPerSeconds": {},
+ "TableSinkCommittedCount": {},
+ "TableSinkCommittedQPS": {},
+ "TableSinkCommittedBytes": {},
+ "TableSinkCommittedBytesPerSeconds": {}
},
"finishedTime": "",
"errorMsg": null,
@@ -337,6 +345,27 @@ seatunnel:
`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` 字段在Job在RUNNING状态时会返回
`finishedTime`, `errorMsg` 字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
+#### 指标字段说明
+
+| 字段 | 说明 |
+| --- | --- |
+| IntermediateQueueSize | 中间队列的大小 |
+| SourceReceivedCount | 源端接收的行数 |
+| SourceReceivedQPS | 源端接收速率(行/秒) |
+| SourceReceivedBytes | 源端接收的字节数 |
+| SourceReceivedBytesPerSeconds | 源端接收速率(字节/秒) |
+| SinkWriteCount | Sink 写入尝试行数 |
+| SinkWriteQPS | Sink 写入尝试速率(行/秒) |
+| SinkWriteBytes | Sink 写入尝试字节数 |
+| SinkWriteBytesPerSeconds | Sink 写入尝试速率(字节/秒) |
+| SinkCommittedCount | checkpoint 成功后的 Sink 已提交行数 |
+| SinkCommittedQPS | Sink 已提交速率(行/秒) |
+| SinkCommittedBytes | checkpoint 成功后的 Sink 已提交字节数 |
+| SinkCommittedBytesPerSeconds | Sink 已提交速率(字节/秒) |
+| TableSourceReceived* | 按表汇总的源指标,键格式 `TableSourceReceivedXXX#<表>` |
+| TableSinkWrite* | 按表汇总的 Sink 写入尝试,键格式 `TableSinkWriteXXX#<表>` |
+| TableSinkCommitted* | 按表汇总的 Sink 已提交指标,键格式 `TableSinkCommittedXXX#<表>` |
+
当我们查询不到这个Job时,返回结果为:
```json
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index 46c71047b9..eb70b2359c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -33,6 +33,10 @@ public final class MetricNames {
public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
public static final String SINK_WRITE_QPS = "SinkWriteQPS";
public static final String SINK_WRITE_BYTES_PER_SECONDS =
"SinkWriteBytesPerSeconds";
+ public static final String SINK_COMMITTED_COUNT = "SinkCommittedCount";
+ public static final String SINK_COMMITTED_BYTES = "SinkCommittedBytes";
+ public static final String SINK_COMMITTED_QPS = "SinkCommittedQPS";
+ public static final String SINK_COMMITTED_BYTES_PER_SECONDS =
"SinkCommittedBytesPerSeconds";
public static final String INTERMEDIATE_QUEUE_SIZE =
"IntermediateQueueSize";
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
new file mode 100644
index 0000000000..5c48a34f95
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Slf4j
+public class CommittedMetricsIT {
+
+ private static final String HOST = "http://localhost:";
+
+ private ClientJobProxy streamJobProxy;
+
+ private HazelcastInstanceImpl node1;
+
+ private SeaTunnelClient engineClient;
+
+ private SeaTunnelConfig seaTunnelConfig;
+
+ @BeforeEach
+ void beforeClass() throws Exception {
+ String testClusterName =
TestUtils.getClusterName("CommittedMetricsIT");
+ seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18080);
+
seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setPortRange(200);
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(testClusterName);
+ engineClient = new SeaTunnelClient(clientConfig);
+ }
+
+ @Test
+ public void testCommittedMetricsWithCheckpoint() throws Exception {
+ String streamFilePath =
+
TestUtils.getResource("stream_fake_multi_table_to_console_with_checkpoint.conf");
+ JobConfig streamConf = new JobConfig();
+
streamConf.setName("stream_fake_multi_table_to_console_with_checkpoint");
+ ClientJobExecutionEnvironment streamJobExecutionEnv =
+ engineClient.createExecutionContext(streamFilePath,
streamConf, seaTunnelConfig);
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ streamJobProxy = streamJobExecutionEnv.execute();
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ Assertions.assertNotNull(streamJobProxy);
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
streamJobProxy.getJobStatus());
+ });
+
+ log.info("Job is running, job id: {}", streamJobProxy.getJobId());
+
+ Thread.sleep(5000);
+
+ Response responseBeforeCheckpoint =
+ given().get(
+ HOST
+ +
node1.getCluster().getLocalMember().getAddress().getPort()
+ + RestConstant.CONTEXT_PATH
+ + RestConstant.REST_URL_JOB_INFO
+ + "/"
+ + streamJobProxy.getJobId());
+
+ log.info("Metrics before checkpoint: {}",
responseBeforeCheckpoint.prettyPrint());
+
+ String writeCountBeforeCP =
responseBeforeCheckpoint.path("metrics.SinkWriteCount");
+ String committedCountBeforeCP =
responseBeforeCheckpoint.path("metrics.SinkCommittedCount");
+
+ long writeBeforeCP = Long.parseLong(writeCountBeforeCP);
+ long committedBeforeCP = 0;
+ if (committedCountBeforeCP != null) {
+ committedBeforeCP = Long.parseLong(committedCountBeforeCP);
+ }
+
+ Assertions.assertTrue(writeBeforeCP > 0);
+ Assertions.assertEquals(0, committedBeforeCP);
+
+ log.info(
+ "Before checkpoint - WriteCount: {}, CommittedCount: {}",
+ writeBeforeCP,
+ committedBeforeCP);
+
+ Thread.sleep(8000);
+
+ Response responseAfterFirstCheckpoint =
+ given().get(
+ HOST
+ +
node1.getCluster().getLocalMember().getAddress().getPort()
+ + RestConstant.CONTEXT_PATH
+ + RestConstant.REST_URL_JOB_INFO
+ + "/"
+ + streamJobProxy.getJobId());
+
+ log.info("Metrics after first checkpoint: {}",
responseAfterFirstCheckpoint.prettyPrint());
+
+ String sinkCommittedCount =
responseAfterFirstCheckpoint.path("metrics.SinkCommittedCount");
+ String sinkWriteCount =
responseAfterFirstCheckpoint.path("metrics.SinkWriteCount");
+ Assertions.assertNotNull(sinkCommittedCount);
+ Assertions.assertNotNull(sinkWriteCount);
+
+ long committedCountAfterFirstCP = Long.parseLong(sinkCommittedCount);
+ long writeCountAfterFirstCP = Long.parseLong(sinkWriteCount);
+
+ Assertions.assertTrue(committedCountAfterFirstCP > 0);
+ Assertions.assertTrue(committedCountAfterFirstCP > committedBeforeCP);
+ Assertions.assertTrue(committedCountAfterFirstCP <=
writeCountAfterFirstCP);
+
+ log.info(
+ "After first checkpoint - WriteCount: {}, CommittedCount: {},
Uncommitted: {}",
+ writeCountAfterFirstCP,
+ committedCountAfterFirstCP,
+ writeCountAfterFirstCP - committedCountAfterFirstCP);
+
+ Thread.sleep(12000);
+
+ Response responseFinal =
+ given().get(
+ HOST
+ +
node1.getCluster().getLocalMember().getAddress().getPort()
+ + RestConstant.CONTEXT_PATH
+ + RestConstant.REST_URL_JOB_INFO
+ + "/"
+ + streamJobProxy.getJobId());
+
+ log.info("Metrics after second checkpoint: {}",
responseFinal.prettyPrint());
+
+ responseFinal
+ .then()
+ .statusCode(200)
+ .body("jobName", notNullValue())
+ .body("jobStatus", notNullValue());
+
+ String finalWriteCount = responseFinal.path("metrics.SinkWriteCount");
+ String finalCommittedCount =
responseFinal.path("metrics.SinkCommittedCount");
+ String finalCommittedBytes =
responseFinal.path("metrics.SinkCommittedBytes");
+ String finalWriteBytes = responseFinal.path("metrics.SinkWriteBytes");
+
+ long finalWrite = Long.parseLong(finalWriteCount);
+ long finalCommitted = Long.parseLong(finalCommittedCount);
+ long finalCommittedBytesVal = Long.parseLong(finalCommittedBytes);
+ long finalWriteBytesVal = Long.parseLong(finalWriteBytes);
+
+ Assertions.assertTrue(finalCommitted > committedCountAfterFirstCP);
+ Assertions.assertTrue(finalCommitted <= finalWrite);
+ Assertions.assertTrue(finalCommittedBytesVal > 0);
+ Assertions.assertTrue(finalCommittedBytesVal <= finalWriteBytesVal);
+
+ responseFinal
+ .then()
+ .body("metrics.SinkCommittedQPS", notNullValue())
+ .body("metrics.SinkCommittedBytesPerSeconds", notNullValue());
+
+ Double committedQPS =
Double.parseDouble(responseFinal.path("metrics.SinkCommittedQPS"));
+ Double committedBytesPerSec =
+
Double.parseDouble(responseFinal.path("metrics.SinkCommittedBytesPerSeconds"));
+ Assertions.assertTrue(committedQPS > 0);
+ Assertions.assertTrue(committedBytesPerSec > 0);
+
+ String table1CommittedCount =
+
responseFinal.path("metrics.TableSinkCommittedCount.'fake.table1'");
+ String table2CommittedCount =
+
responseFinal.path("metrics.TableSinkCommittedCount.'fake.public.table2'");
+ Assertions.assertNotNull(table1CommittedCount);
+ Assertions.assertNotNull(table2CommittedCount);
+
+ long table1Committed = Long.parseLong(table1CommittedCount);
+ long table2Committed = Long.parseLong(table2CommittedCount);
+ Assertions.assertTrue(table1Committed > 0);
+ Assertions.assertTrue(table2Committed > 0);
+
+ Assertions.assertEquals(finalCommitted, table1Committed +
table2Committed);
+
+ String table1CommittedBytes =
+
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.table1'");
+ String table2CommittedBytes =
+
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.public.table2'");
+ Assertions.assertNotNull(table1CommittedBytes);
+ Assertions.assertNotNull(table2CommittedBytes);
+
+ Assertions.assertTrue(Long.parseLong(table1CommittedBytes) > 0);
+ Assertions.assertTrue(Long.parseLong(table2CommittedBytes) > 0);
+
+ Double table1CommittedQPS =
+ Double.parseDouble(
+
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.table1'"));
+ Double table2CommittedQPS =
+ Double.parseDouble(
+
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.public.table2'"));
+ Assertions.assertTrue(table1CommittedQPS > 0);
+ Assertions.assertTrue(table2CommittedQPS > 0);
+
+ Double table1CommittedBytesPerSec =
+ Double.parseDouble(
+ responseFinal.path(
+
"metrics.TableSinkCommittedBytesPerSeconds.'fake.table1'"));
+ Double table2CommittedBytesPerSec =
+ Double.parseDouble(
+ responseFinal.path(
+
"metrics.TableSinkCommittedBytesPerSeconds.'fake.public.table2'"));
+ Assertions.assertTrue(table1CommittedBytesPerSec > 0);
+ Assertions.assertTrue(table2CommittedBytesPerSec > 0);
+
+ log.info("All committed metrics assertions passed");
+ log.info(
+ "Final summary - WriteCount: {}, CommittedCount: {},
Uncommitted: {}",
+ finalWrite,
+ finalCommitted,
+ finalWrite - finalCommitted);
+
+ streamJobProxy.cancelJob();
+
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
streamJobProxy.getJobStatus()));
+
+ log.info("testCommittedMetricsWithCheckpoint completed successfully");
+ }
+
+ @AfterEach
+ void afterClass() {
+ if (engineClient != null) {
+ engineClient.close();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
new file mode 100644
index 0000000000..f267344e4e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 10000
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ plugin_output = "fake1"
+ row.num = 150
+ split.num = 5
+ split.read-interval = 3000
+ schema = {
+ table = "fake.table1"
+ fields {
+ id = bigint
+ name = string
+ score = int
+ }
+ }
+ }
+
+ FakeSource {
+ plugin_output = "fake2"
+ row.num = 90
+ split.num = 5
+ split.read-interval = 3000
+ schema = {
+ table = "fake.public.table2"
+ fields {
+ id = bigint
+ name = string
+ score = int
+ }
+ }
+ }
+
+}
+
+transform {
+}
+
+sink {
+ console {
+ plugin_input = "fake1"
+ }
+ console {
+ plugin_input = "fake2"
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
index a8ee505f65..404cec5f66 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
@@ -30,6 +30,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_QPS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -45,6 +49,7 @@ public class ConnectorMetricsCalcContext {
private final PluginType type;
+ // Real-time (attempt) metrics
private Counter count;
private final Map<String, Counter> countPerTable = new
ConcurrentHashMap<>();
@@ -61,6 +66,29 @@ public class ConnectorMetricsCalcContext {
private final Map<String, Meter> bytesPerSecondsPerTable = new
ConcurrentHashMap<>();
+ // Committed metrics
+ private Counter committedCount;
+
+ private final Map<String, Counter> committedCountPerTable = new
ConcurrentHashMap<>();
+
+ private Meter committedQPS;
+
+ private final Map<String, Meter> committedQPSPerTable = new
ConcurrentHashMap<>();
+
+ private Counter committedBytes;
+
+ private final Map<String, Counter> committedBytesPerTable = new
ConcurrentHashMap<>();
+
+ private Meter committedBytesPerSeconds;
+
+ private final Map<String, Meter> committedBytesPerSecondsPerTable = new
ConcurrentHashMap<>();
+
+ private PendingMetrics currentPendingMetrics;
+
+ private final Map<Long, PendingMetrics> pendingMetricsByCheckpoint = new
ConcurrentHashMap<>();
+
+ private final Map<String, String> tableNameCache = new
ConcurrentHashMap<>();
+
public ConnectorMetricsCalcContext(
MetricsContext metricsContext,
PluginType type,
@@ -73,15 +101,17 @@ public class ConnectorMetricsCalcContext {
private void initializeMetrics(boolean isMulti, List<TablePath> tables) {
if (type.equals(PluginType.SINK)) {
- this.initializeMetrics(
+ initializeAttemptMetrics(
isMulti,
tables,
SINK_WRITE_COUNT,
SINK_WRITE_QPS,
SINK_WRITE_BYTES,
SINK_WRITE_BYTES_PER_SECONDS);
+ initializeCommittedMetrics(isMulti, tables);
+ currentPendingMetrics = new PendingMetrics();
} else if (type.equals(PluginType.SOURCE)) {
- this.initializeMetrics(
+ initializeAttemptMetrics(
isMulti,
tables,
SOURCE_RECEIVED_COUNT,
@@ -91,7 +121,7 @@ public class ConnectorMetricsCalcContext {
}
}
- private void initializeMetrics(
+ private void initializeAttemptMetrics(
boolean isMulti,
List<TablePath> tables,
String countName,
@@ -105,19 +135,41 @@ public class ConnectorMetricsCalcContext {
if (isMulti) {
tables.forEach(
tablePath -> {
+ String fullName = tablePath.getFullName();
countPerTable.put(
- tablePath.getFullName(),
- metricsContext.counter(countName + "#" +
tablePath.getFullName()));
- QPSPerTable.put(
- tablePath.getFullName(),
- metricsContext.meter(qpsName + "#" +
tablePath.getFullName()));
+ fullName, metricsContext.counter(countName +
"#" + fullName));
+ QPSPerTable.put(fullName, metricsContext.meter(qpsName
+ "#" + fullName));
bytesPerTable.put(
- tablePath.getFullName(),
- metricsContext.counter(bytesName + "#" +
tablePath.getFullName()));
+ fullName, metricsContext.counter(bytesName +
"#" + fullName));
bytesPerSecondsPerTable.put(
- tablePath.getFullName(),
+ fullName,
+ metricsContext.meter(bytesPerSecondsName + "#"
+ fullName));
+ });
+ }
+ }
+
+ private void initializeCommittedMetrics(boolean isMulti, List<TablePath>
tables) {
+ committedCount = metricsContext.counter(SINK_COMMITTED_COUNT);
+ committedQPS = metricsContext.meter(SINK_COMMITTED_QPS);
+ committedBytes = metricsContext.counter(SINK_COMMITTED_BYTES);
+ committedBytesPerSeconds =
metricsContext.meter(SINK_COMMITTED_BYTES_PER_SECONDS);
+ if (isMulti) {
+ tables.forEach(
+ tablePath -> {
+ String fullName = tablePath.getFullName();
+ committedCountPerTable.put(
+ fullName,
+ metricsContext.counter(SINK_COMMITTED_COUNT +
"#" + fullName));
+ committedQPSPerTable.put(
+ fullName,
+ metricsContext.meter(SINK_COMMITTED_QPS + "#"
+ fullName));
+ committedBytesPerTable.put(
+ fullName,
+ metricsContext.counter(SINK_COMMITTED_BYTES +
"#" + fullName));
+ committedBytesPerSecondsPerTable.put(
+ fullName,
metricsContext.meter(
- bytesPerSecondsName + "#" +
tablePath.getFullName()));
+ SINK_COMMITTED_BYTES_PER_SECONDS + "#"
+ fullName));
});
}
}
@@ -127,51 +179,130 @@ public class ConnectorMetricsCalcContext {
QPS.markEvent();
if (data instanceof SeaTunnelRow) {
SeaTunnelRow row = (SeaTunnelRow) data;
- bytes.inc(row.getBytesSize());
- bytesPerSeconds.markEvent(row.getBytesSize());
+ long rowBytes = row.getBytesSize();
+ bytes.inc(rowBytes);
+ bytesPerSeconds.markEvent(rowBytes);
- if (StringUtils.isNotBlank(tableId)) {
- String tableName = TablePath.of(tableId).getFullName();
+ String normalizedTableName =
+ StringUtils.isNotBlank(tableId) ?
normalizeTableName(tableId) : null;
+ if (PluginType.SINK.equals(type)) {
+ recordPendingMetrics(normalizedTableName, rowBytes);
+ }
- // Processing count
+ if (StringUtils.isNotBlank(normalizedTableName)) {
processMetrics(
countPerTable,
Counter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_COUNT,
SOURCE_RECEIVED_COUNT,
Counter::inc);
- // Processing bytes
processMetrics(
bytesPerTable,
Counter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_BYTES,
SOURCE_RECEIVED_BYTES,
- counter -> counter.inc(row.getBytesSize()));
+ counter -> counter.inc(rowBytes));
- // Processing QPS
processMetrics(
QPSPerTable,
Meter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_QPS,
SOURCE_RECEIVED_QPS,
Meter::markEvent);
- // Processing bytes rate
processMetrics(
bytesPerSecondsPerTable,
Meter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_BYTES_PER_SECONDS,
SOURCE_RECEIVED_BYTES_PER_SECONDS,
- meter -> meter.markEvent(row.getBytesSize()));
+ meter -> meter.markEvent(rowBytes));
}
}
}
+ public void sealCheckpointMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ PendingMetrics pendingToSeal = currentPendingMetrics;
+ currentPendingMetrics = new PendingMetrics();
+ if (pendingToSeal.isEmpty()) {
+ return;
+ }
+ pendingMetricsByCheckpoint
+ .computeIfAbsent(checkpointId, key -> new PendingMetrics())
+ .merge(pendingToSeal);
+ }
+
+ public void commitPendingMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ PendingMetrics pending =
pendingMetricsByCheckpoint.remove(checkpointId);
+ if (pending == null || pending.isEmpty()) {
+ return;
+ }
+ committedCount.inc(pending.getCount());
+ committedQPS.markEvent(pending.getCount());
+ committedBytes.inc(pending.getBytes());
+ committedBytesPerSeconds.markEvent(pending.getBytes());
+ pending.getTableMetrics()
+ .forEach(
+ (table, metrics) -> {
+ processMetrics(
+ committedCountPerTable,
+ Counter.class,
+ table,
+ SINK_COMMITTED_COUNT,
+ SOURCE_RECEIVED_COUNT,
+ counter -> counter.inc(metrics.count));
+ processMetrics(
+ committedBytesPerTable,
+ Counter.class,
+ table,
+ SINK_COMMITTED_BYTES,
+ SOURCE_RECEIVED_BYTES,
+ counter -> counter.inc(metrics.bytes));
+ processMetrics(
+ committedQPSPerTable,
+ Meter.class,
+ table,
+ SINK_COMMITTED_QPS,
+ SOURCE_RECEIVED_QPS,
+ meter -> meter.markEvent(metrics.count));
+ processMetrics(
+ committedBytesPerSecondsPerTable,
+ Meter.class,
+ table,
+ SINK_COMMITTED_BYTES_PER_SECONDS,
+ SOURCE_RECEIVED_BYTES_PER_SECONDS,
+ meter -> meter.markEvent(metrics.bytes));
+ });
+ }
+
+ public void abortPendingMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ pendingMetricsByCheckpoint.remove(checkpointId);
+ }
+
+ private void recordPendingMetrics(String normalizedTableName, long
rowBytes) {
+ if (currentPendingMetrics == null) {
+ return;
+ }
+ currentPendingMetrics.add(normalizedTableName, rowBytes);
+ }
+
+ private String normalizeTableName(String tableId) {
+ return tableNameCache.computeIfAbsent(tableId, id ->
TablePath.of(id).getFullName());
+ }
+
private <T> void processMetrics(
Map<String, T> metricMap,
Class<T> cls,
@@ -207,4 +338,67 @@ public class ConnectorMetricsCalcContext {
interface MetricProcessor<T> {
void process(T t);
}
+
+ private static final class PendingMetrics {
+ private long count;
+ private long bytes;
+ private final Map<String, TablePendingMetrics> tableMetrics = new
ConcurrentHashMap<>();
+
+ void add(String tableName, long rowBytes) {
+ count++;
+ bytes += rowBytes;
+ if (StringUtils.isNotBlank(tableName)) {
+ tableMetrics
+ .computeIfAbsent(tableName, key -> new
TablePendingMetrics())
+ .add(rowBytes);
+ }
+ }
+
+ boolean isEmpty() {
+ return count == 0;
+ }
+
+ void merge(PendingMetrics other) {
+ if (other == null || other.isEmpty()) {
+ return;
+ }
+ this.count += other.count;
+ this.bytes += other.bytes;
+ other.tableMetrics.forEach(
+ (table, metrics) ->
+ this.tableMetrics
+ .computeIfAbsent(table, key -> new
TablePendingMetrics())
+ .merge(metrics));
+ }
+
+ long getCount() {
+ return count;
+ }
+
+ long getBytes() {
+ return bytes;
+ }
+
+ Map<String, TablePendingMetrics> getTableMetrics() {
+ return tableMetrics;
+ }
+ }
+
+ private static final class TablePendingMetrics {
+ private long count;
+ private long bytes;
+
+ void add(long rowBytes) {
+ this.count++;
+ this.bytes += rowBytes;
+ }
+
+ void merge(TablePendingMetrics other) {
+ if (other == null) {
+ return;
+ }
+ this.count += other.count;
+ this.bytes += other.bytes;
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 8784f31d98..7185859847 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -59,7 +59,11 @@ public class RestConstant {
public static final String TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS =
"TableSourceReceivedBytesPerSeconds";
public static final String TABLE_SINK_WRITE_BYTES_PER_SECONDS =
"TableSinkWriteBytesPerSeconds";
-
+ public static final String TABLE_SINK_COMMITTED_COUNT =
"TableSinkCommittedCount";
+ public static final String TABLE_SINK_COMMITTED_QPS =
"TableSinkCommittedQPS";
+ public static final String TABLE_SINK_COMMITTED_BYTES =
"TableSinkCommittedBytes";
+ public static final String TABLE_SINK_COMMITTED_BYTES_PER_SECONDS =
+ "TableSinkCommittedBytesPerSeconds";
public static final String CONTEXT_PATH = "/hazelcast/rest/maps";
public static final String INSTANCE_CONTEXT_PATH =
"/hazelcast/rest/instance";
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 8b18575bec..e638cefd51 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -78,6 +78,10 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_QPS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -86,6 +90,10 @@ import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_BYTES;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_BYTES_PER_SECONDS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_COUNT;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_QPS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_COUNT;
@@ -249,27 +257,35 @@ public abstract class BaseService {
String[] countMetricsNames = {
SOURCE_RECEIVED_COUNT,
SINK_WRITE_COUNT,
+ SINK_COMMITTED_COUNT,
SOURCE_RECEIVED_BYTES,
SINK_WRITE_BYTES,
+ SINK_COMMITTED_BYTES,
INTERMEDIATE_QUEUE_SIZE
};
String[] rateMetricsNames = {
SOURCE_RECEIVED_QPS,
SINK_WRITE_QPS,
+ SINK_COMMITTED_QPS,
SOURCE_RECEIVED_BYTES_PER_SECONDS,
- SINK_WRITE_BYTES_PER_SECONDS
+ SINK_WRITE_BYTES_PER_SECONDS,
+ SINK_COMMITTED_BYTES_PER_SECONDS
};
String[] tableCountMetricsNames = {
TABLE_SOURCE_RECEIVED_COUNT,
TABLE_SINK_WRITE_COUNT,
+ TABLE_SINK_COMMITTED_COUNT,
TABLE_SOURCE_RECEIVED_BYTES,
- TABLE_SINK_WRITE_BYTES
+ TABLE_SINK_WRITE_BYTES,
+ TABLE_SINK_COMMITTED_BYTES
};
String[] tableRateMetricsNames = {
TABLE_SOURCE_RECEIVED_QPS,
TABLE_SINK_WRITE_QPS,
+ TABLE_SINK_COMMITTED_QPS,
TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS,
- TABLE_SINK_WRITE_BYTES_PER_SECONDS
+ TABLE_SINK_WRITE_BYTES_PER_SECONDS,
+ TABLE_SINK_COMMITTED_BYTES_PER_SECONDS
};
Long[] metricsSums =
Stream.generate(() ->
0L).limit(countMetricsNames.length).toArray(Long[]::new);
@@ -281,12 +297,16 @@ public abstract class BaseService {
new Map[] {
new HashMap<>(), // Source Received Count
new HashMap<>(), // Sink Write Count
+ new HashMap<>(), // Sink Committed Count
new HashMap<>(), // Source Received Bytes
new HashMap<>(), // Sink Write Bytes
+ new HashMap<>(), // Sink Committed Bytes
new HashMap<>(), // Source Received QPS
new HashMap<>(), // Sink Write QPS
+ new HashMap<>(), // Sink Committed QPS
new HashMap<>(), // Source Received Bytes Per Second
- new HashMap<>() // Sink Write Bytes Per Second
+ new HashMap<>(), // Sink Write Bytes Per Second
+ new HashMap<>() // Sink Committed Bytes Per Second
};
try {
@@ -320,7 +340,7 @@ public abstract class BaseService {
metricsMap,
tableMetricsMaps,
ArrayUtils.addAll(tableCountMetricsNames,
tableRateMetricsNames),
- countMetricsNames.length);
+ tableCountMetricsNames.length);
populateMetricsMap(
metricsMap,
Stream.concat(Arrays.stream(metricsSums),
Arrays.stream(metricsRates))
@@ -343,28 +363,40 @@ public abstract class BaseService {
// Define index constant
final int SOURCE_COUNT_IDX = 0,
SINK_COUNT_IDX = 1,
- SOURCE_BYTES_IDX = 2,
- SINK_BYTES_IDX = 3,
- SOURCE_QPS_IDX = 4,
- SINK_QPS_IDX = 5,
- SOURCE_BYTES_SEC_IDX = 6,
- SINK_BYTES_SEC_IDX = 7;
+ SINK_COMMITTED_COUNT_IDX = 2,
+ SOURCE_BYTES_IDX = 3,
+ SINK_BYTES_IDX = 4,
+ SINK_COMMITTED_BYTES_IDX = 5,
+ SOURCE_QPS_IDX = 6,
+ SINK_QPS_IDX = 7,
+ SINK_COMMITTED_QPS_IDX = 8,
+ SOURCE_BYTES_SEC_IDX = 9,
+ SINK_BYTES_SEC_IDX = 10,
+ SINK_COMMITTED_BYTES_SEC_IDX = 11;
if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
} else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_COMMITTED_COUNT + "#")) {
+ tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(tableName,
metricNode);
} else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
} else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_COMMITTED_BYTES + "#")) {
+ tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(tableName,
metricNode);
} else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
} else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_COMMITTED_QPS + "#")) {
+ tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(tableName,
metricNode);
} else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS +
"#")) {
tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
} else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+ } else if (metricName.startsWith(SINK_COMMITTED_BYTES_PER_SECONDS +
"#")) {
+ tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(tableName,
metricNode);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index c039e97a76..7e8427af9b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -194,6 +194,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
long startTime = System.currentTimeMillis();
Barrier barrier = (Barrier) record.getData();
+
connectorMetricsCalcContext.sealCheckpointMetrics(barrier.getId());
if (barrier.prepareClose(this.taskLocation)) {
prepareClose = true;
}
@@ -309,6 +310,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
if (committer.isPresent() && lastCommitInfo.isPresent()) {
committer.get().commit(Collections.singletonList(lastCommitInfo.get()));
}
+ connectorMetricsCalcContext.commitPendingMetrics(checkpointId);
}
@Override
@@ -316,6 +318,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
if (committer.isPresent() && lastCommitInfo.isPresent()) {
committer.get().abort(Collections.singletonList(lastCommitInfo.get()));
}
+ connectorMetricsCalcContext.abortPendingMetrics(checkpointId);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
new file mode 100644
index 0000000000..5bc8c1824a
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+
+public class ConnectorMetricsCalcContextTest {
+
+ private static final String TABLE_ID = "fake.table1";
+
+ @Test
+ public void testCommitFlushesPendingMetrics() {
+ SeaTunnelMetricsContext metricsContext = new SeaTunnelMetricsContext();
+ ConnectorMetricsCalcContext calcContext =
+ new ConnectorMetricsCalcContext(
+ metricsContext,
+ PluginType.SINK,
+ true,
+ Collections.singletonList(TablePath.of(TABLE_ID)));
+
+ SeaTunnelRow row = createRowWithTableId(TABLE_ID, "A");
+
+ calcContext.updateMetrics(row, TABLE_ID);
+ Assertions.assertEquals(1,
metricsContext.counter(SINK_WRITE_COUNT).getCount());
+ Assertions.assertEquals(
+ 1, metricsContext.counter(SINK_WRITE_COUNT + "#" +
TABLE_ID).getCount());
+
+ Assertions.assertEquals(0,
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+ Assertions.assertEquals(
+ 0, metricsContext.counter(SINK_COMMITTED_COUNT + "#" +
TABLE_ID).getCount());
+
+ long checkpointId = 1L;
+ calcContext.sealCheckpointMetrics(checkpointId);
+
+ Assertions.assertEquals(0,
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+
+ calcContext.commitPendingMetrics(checkpointId);
+
+ Assertions.assertEquals(1,
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+ Assertions.assertEquals(
+ 1, metricsContext.counter(SINK_COMMITTED_COUNT + "#" +
TABLE_ID).getCount());
+
+ Counter writeBytes = metricsContext.counter(SINK_WRITE_BYTES);
+ Counter committedBytes = metricsContext.counter(SINK_COMMITTED_BYTES);
+ Assertions.assertEquals(writeBytes.getCount(),
committedBytes.getCount());
+ }
+
+ @Test
+ public void testAbortClearsPendingMetrics() {
+ SeaTunnelMetricsContext metricsContext = new SeaTunnelMetricsContext();
+ ConnectorMetricsCalcContext calcContext =
+ new ConnectorMetricsCalcContext(
+ metricsContext,
+ PluginType.SINK,
+ true,
+ Collections.singletonList(TablePath.of(TABLE_ID)));
+
+ SeaTunnelRow row = createRowWithTableId(TABLE_ID, "B");
+
+ calcContext.updateMetrics(row, TABLE_ID);
+ Assertions.assertEquals(1,
metricsContext.counter(SINK_WRITE_COUNT).getCount());
+
+ long checkpointId = 2L;
+ calcContext.sealCheckpointMetrics(checkpointId);
+ calcContext.abortPendingMetrics(checkpointId);
+ calcContext.commitPendingMetrics(checkpointId);
+
+ Assertions.assertEquals(0,
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+ Assertions.assertEquals(
+ 0, metricsContext.counter(SINK_COMMITTED_COUNT + "#" +
TABLE_ID).getCount());
+ }
+
+ private SeaTunnelRow createRowWithTableId(String tableId, String payload) {
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, payload});
+ row.setTableId(tableId);
+ return row;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
new file mode 100644
index 0000000000..8e07bb5ce3
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+public class BaseServiceTableMetricsTest {
+
+ private JobInfoService jobInfoService;
+ private Method getJobMetricsMethod;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ NodeEngineImpl nodeEngine =
org.mockito.Mockito.mock(NodeEngineImpl.class);
+
+ jobInfoService = new JobInfoService(nodeEngine);
+
+ getJobMetricsMethod =
BaseService.class.getDeclaredMethod("getJobMetrics", String.class);
+ getJobMetricsMethod.setAccessible(true);
+ }
+
+ @Test
+ public void testTableQPSMetricsAggregation() throws Exception {
+ String jobMetrics =
+ "{"
+ + "\"SourceReceivedCount#fake.table1\": [{\"value\":
100}],"
+ + "\"SourceReceivedCount#fake.table2\": [{\"value\":
200}],"
+ + "\"SinkWriteCount#fake.table1\": [{\"value\": 90}],"
+ + "\"SinkWriteCount#fake.table2\": [{\"value\": 180}],"
+ + "\"SinkCommittedCount#fake.table1\": [{\"value\":
80}],"
+ + "\"SinkCommittedCount#fake.table2\": [{\"value\":
160}],"
+ + "\"SourceReceivedBytes#fake.table1\": [{\"value\":
1000}],"
+ + "\"SourceReceivedBytes#fake.table2\": [{\"value\":
2000}],"
+ + "\"SinkWriteBytes#fake.table1\": [{\"value\": 900}],"
+ + "\"SinkWriteBytes#fake.table2\": [{\"value\":
1800}],"
+ + "\"SinkCommittedBytes#fake.table1\": [{\"value\":
800}],"
+ + "\"SinkCommittedBytes#fake.table2\": [{\"value\":
1600}],"
+ + "\"SourceReceivedQPS#fake.table1\": [{\"value\":
10.5}],"
+ + "\"SourceReceivedQPS#fake.table2\": [{\"value\":
20.3}],"
+ + "\"SinkWriteQPS#fake.table1\": [{\"value\": 9.2}],"
+ + "\"SinkWriteQPS#fake.table2\": [{\"value\": 18.7}],"
+ + "\"SinkCommittedQPS#fake.table1\": [{\"value\":
8.1}],"
+ + "\"SinkCommittedQPS#fake.table2\": [{\"value\":
16.4}],"
+ + "\"SourceReceivedBytesPerSeconds#fake.table1\":
[{\"value\": 105.5}],"
+ + "\"SourceReceivedBytesPerSeconds#fake.table2\":
[{\"value\": 203.2}],"
+ + "\"SinkWriteBytesPerSeconds#fake.table1\":
[{\"value\": 92.3}],"
+ + "\"SinkWriteBytesPerSeconds#fake.table2\":
[{\"value\": 187.6}],"
+ + "\"SinkCommittedBytesPerSeconds#fake.table1\":
[{\"value\": 81.2}],"
+ + "\"SinkCommittedBytesPerSeconds#fake.table2\":
[{\"value\": 164.5}],"
+ + "\"SourceReceivedCount\": [{\"value\": 300}],"
+ + "\"SinkWriteCount\": [{\"value\": 270}],"
+ + "\"SinkCommittedCount\": [{\"value\": 240}],"
+ + "\"SourceReceivedBytes\": [{\"value\": 3000}],"
+ + "\"SinkWriteBytes\": [{\"value\": 2700}],"
+ + "\"SinkCommittedBytes\": [{\"value\": 2400}],"
+ + "\"SourceReceivedQPS\": [{\"value\": 30.8}],"
+ + "\"SinkWriteQPS\": [{\"value\": 27.9}],"
+ + "\"SinkCommittedQPS\": [{\"value\": 24.5}],"
+ + "\"SourceReceivedBytesPerSeconds\": [{\"value\":
308.7}],"
+ + "\"SinkWriteBytesPerSeconds\": [{\"value\": 279.9}],"
+ + "\"SinkCommittedBytesPerSeconds\": [{\"value\":
245.7}]"
+ + "}";
+
+ Map<String, Object> result =
+ (Map<String, Object>)
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+ Map<String, Object> tableSourceQPS =
+ (Map<String, Object>) result.get("TableSourceReceivedQPS");
+ Assertions.assertNotNull(tableSourceQPS);
+ Assertions.assertEquals(10.5, (Double)
tableSourceQPS.get("fake.table1"), 0.01);
+ Assertions.assertEquals(20.3, (Double)
tableSourceQPS.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSinkQPS = (Map<String, Object>)
result.get("TableSinkWriteQPS");
+ Assertions.assertNotNull(tableSinkQPS);
+ Assertions.assertEquals(9.2, (Double) tableSinkQPS.get("fake.table1"),
0.01);
+ Assertions.assertEquals(18.7, (Double)
tableSinkQPS.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSinkCommittedQPS =
+ (Map<String, Object>) result.get("TableSinkCommittedQPS");
+ Assertions.assertNotNull(tableSinkCommittedQPS);
+ Assertions.assertEquals(8.1, (Double)
tableSinkCommittedQPS.get("fake.table1"), 0.01);
+ Assertions.assertEquals(16.4, (Double)
tableSinkCommittedQPS.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSourceBytesPerSec =
+ (Map<String, Object>)
result.get("TableSourceReceivedBytesPerSeconds");
+ Assertions.assertNotNull(tableSourceBytesPerSec);
+ Assertions.assertEquals(105.5, (Double)
tableSourceBytesPerSec.get("fake.table1"), 0.01);
+ Assertions.assertEquals(203.2, (Double)
tableSourceBytesPerSec.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSinkBytesPerSec =
+ (Map<String, Object>)
result.get("TableSinkWriteBytesPerSeconds");
+ Assertions.assertNotNull(tableSinkBytesPerSec);
+ Assertions.assertEquals(92.3, (Double)
tableSinkBytesPerSec.get("fake.table1"), 0.01);
+ Assertions.assertEquals(187.6, (Double)
tableSinkBytesPerSec.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSinkCommittedBytesPerSec =
+ (Map<String, Object>)
result.get("TableSinkCommittedBytesPerSeconds");
+ Assertions.assertNotNull(tableSinkCommittedBytesPerSec);
+ Assertions.assertEquals(
+ 81.2, (Double)
tableSinkCommittedBytesPerSec.get("fake.table1"), 0.01);
+ Assertions.assertEquals(
+ 164.5, (Double)
tableSinkCommittedBytesPerSec.get("fake.table2"), 0.01);
+ }
+
+ @Test
+ public void testTableCountMetricsAggregation() throws Exception {
+ String jobMetrics =
+ "{"
+ + "\"SourceReceivedCount#fake.table1\": [{\"value\":
100}, {\"value\": 50}],"
+ + "\"SourceReceivedCount#fake.table2\": [{\"value\":
200}, {\"value\": 100}],"
+ + "\"SinkWriteCount#fake.table1\": [{\"value\": 90},
{\"value\": 45}],"
+ + "\"SinkWriteCount#fake.table2\": [{\"value\": 180},
{\"value\": 90}],"
+ + "\"SinkCommittedCount#fake.table1\": [{\"value\":
80}, {\"value\": 40}],"
+ + "\"SinkCommittedCount#fake.table2\": [{\"value\":
160}, {\"value\": 80}],"
+ + "\"SourceReceivedCount\": [{\"value\": 300}],"
+ + "\"SinkWriteCount\": [{\"value\": 270}],"
+ + "\"SinkCommittedCount\": [{\"value\": 240}]"
+ + "}";
+
+ Map<String, Object> result =
+ (Map<String, Object>)
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+ Map<String, Object> tableSourceCount =
+ (Map<String, Object>) result.get("TableSourceReceivedCount");
+ Assertions.assertNotNull(tableSourceCount);
+ Assertions.assertEquals(150L, tableSourceCount.get("fake.table1"));
+ Assertions.assertEquals(300L, tableSourceCount.get("fake.table2"));
+
+ Map<String, Object> tableSinkCount =
+ (Map<String, Object>) result.get("TableSinkWriteCount");
+ Assertions.assertNotNull(tableSinkCount);
+ Assertions.assertEquals(135L, tableSinkCount.get("fake.table1"));
+ Assertions.assertEquals(270L, tableSinkCount.get("fake.table2"));
+
+ Map<String, Object> tableSinkCommittedCount =
+ (Map<String, Object>) result.get("TableSinkCommittedCount");
+ Assertions.assertNotNull(tableSinkCommittedCount);
+ Assertions.assertEquals(120L,
tableSinkCommittedCount.get("fake.table1"));
+ Assertions.assertEquals(240L,
tableSinkCommittedCount.get("fake.table2"));
+ }
+
+ @Test
+ public void testMixedMetricsWithMultipleWorkers() throws Exception {
+ String jobMetrics =
+ "{"
+ + "\"SourceReceivedQPS#fake.table1\": [{\"value\":
5.5}, {\"value\": 4.5}, {\"value\": 3.2}],"
+ + "\"SourceReceivedQPS#fake.table2\": [{\"value\":
10.2}, {\"value\": 9.8}, {\"value\": 8.5}],"
+ + "\"SinkCommittedQPS#fake.table1\": [{\"value\":
4.1}, {\"value\": 3.9}, {\"value\": 2.8}],"
+ + "\"SinkCommittedQPS#fake.table2\": [{\"value\":
8.2}, {\"value\": 7.8}, {\"value\": 6.5}],"
+ + "\"SourceReceivedQPS\": [{\"value\": 30.8}],"
+ + "\"SinkCommittedQPS\": [{\"value\": 24.5}]"
+ + "}";
+
+ Map<String, Object> result =
+ (Map<String, Object>)
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+ Map<String, Object> tableSourceQPS =
+ (Map<String, Object>) result.get("TableSourceReceivedQPS");
+ Assertions.assertNotNull(tableSourceQPS);
+ Assertions.assertEquals(13.2, (Double)
tableSourceQPS.get("fake.table1"), 0.01);
+ Assertions.assertEquals(28.5, (Double)
tableSourceQPS.get("fake.table2"), 0.01);
+
+ Map<String, Object> tableSinkCommittedQPS =
+ (Map<String, Object>) result.get("TableSinkCommittedQPS");
+ Assertions.assertNotNull(tableSinkCommittedQPS);
+ Assertions.assertEquals(10.8, (Double)
tableSinkCommittedQPS.get("fake.table1"), 0.01);
+ Assertions.assertEquals(22.5, (Double)
tableSinkCommittedQPS.get("fake.table2"), 0.01);
+ }
+}