This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c2589915c9 [Improve][Core][Metrics] Optimize Sink Write Data 
Statistics (#10100)
c2589915c9 is described below

commit c2589915c969f0af78b8997b94c9f297e0b8b78a
Author: Jast <[email protected]>
AuthorDate: Mon Dec 1 22:27:25 2025 +0800

    [Improve][Core][Metrics] Optimize Sink Write Data Statistics (#10100)
---
 docs/en/seatunnel-engine/rest-api-v1.md            |  32 ++-
 docs/en/seatunnel-engine/rest-api-v2.md            |  31 ++-
 docs/zh/seatunnel-engine/rest-api-v1.md            |  30 ++-
 docs/zh/seatunnel-engine/rest-api-v2.md            |  31 ++-
 .../seatunnel/api/common/metrics/MetricNames.java  |   4 +
 .../seatunnel/engine/e2e/CommittedMetricsIT.java   | 286 +++++++++++++++++++++
 ...ake_multi_table_to_console_with_checkpoint.conf |  68 +++++
 .../metrics/ConnectorMetricsCalcContext.java       | 246 ++++++++++++++++--
 .../seatunnel/engine/server/rest/RestConstant.java |   6 +-
 .../engine/server/rest/service/BaseService.java    |  54 +++-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |   3 +
 .../metrics/ConnectorMetricsCalcContextTest.java   | 106 ++++++++
 .../rest/service/BaseServiceTableMetricsTest.java  | 191 ++++++++++++++
 13 files changed, 1045 insertions(+), 43 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v1.md 
b/docs/en/seatunnel-engine/rest-api-v1.md
index d99f2213a8..91c3773435 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -199,6 +199,26 @@ network:
 `envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is 
running.
 `finishedTime`, `errorMsg` will return when job is finished.
 
+#### Metrics field description
+
+| Field | Description |
+| --- | --- |
+| SourceReceivedCount | Total rows received from sources |
+| SourceReceivedQPS | Source receive rate (rows/s) |
+| SourceReceivedBytes | Total bytes received from sources |
+| SourceReceivedBytesPerSeconds | Source receive rate (bytes/s) |
+| SinkWriteCount | Sink write attempts (rows) |
+| SinkWriteQPS | Sink write attempt rate (rows/s) |
+| SinkWriteBytes | Sink write attempts (bytes) |
+| SinkWriteBytesPerSeconds | Sink write attempt rate (bytes/s) |
+| SinkCommittedCount | Sink committed rows after checkpoint succeeds |
+| SinkCommittedQPS | Sink committed rate (rows/s) |
+| SinkCommittedBytes | Sink committed bytes after checkpoint succeeds |
+| SinkCommittedBytesPerSeconds | Sink committed rate (bytes/s) |
+| TableSourceReceived* | Per-table source metrics, key format 
`TableSourceReceivedXXX#<table>` |
+| TableSinkWrite* | Per-table sink write attempts, key format 
`TableSinkWriteXXX#<table>` |
+| TableSinkCommitted* | Per-table sink committed metrics, key format 
`TableSinkCommittedXXX#<table>` |
+
 When we can't get the job info, the response will be:
 
 ```json
@@ -256,6 +276,10 @@ This API has been deprecated, please use 
/hazelcast/rest/maps/job-info/:jobId in
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "SinkCommittedCount": "",
+    "SinkCommittedQPS": "",
+    "SinkCommittedBytes": "",
+    "SinkCommittedBytesPerSeconds": "",
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
@@ -263,7 +287,11 @@ This API has been deprecated, please use 
/hazelcast/rest/maps/job-info/:jobId in
     "TableSinkWriteCount": {},
     "TableSinkWriteQPS": {},
     "TableSinkWriteBytes": {},
-    "TableSinkWriteBytesPerSeconds": {}
+    "TableSinkWriteBytesPerSeconds": {},
+    "TableSinkCommittedCount": {},
+    "TableSinkCommittedQPS": {},
+    "TableSinkCommittedBytes": {},
+    "TableSinkCommittedBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
@@ -839,4 +867,4 @@ Returns a list of logs from the requested node.
 To get a list of logs from the current node: 
`http://localhost:5801/hazelcast/rest/maps/log`
 To get the content of a log file: 
`http://localhost:5801/hazelcast/rest/maps/log/job-898380162133917698.log`
 
-</details>
\ No newline at end of file
+</details>
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md 
b/docs/en/seatunnel-engine/rest-api-v2.md
index 2ba5504af4..b3808d35d7 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -320,6 +320,10 @@ This endpoint helps troubleshoot why jobs stay in 
`PENDING` by showing the pendi
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "SinkCommittedCount": "",
+    "SinkCommittedQPS": "",
+    "SinkCommittedBytes": "",
+    "SinkCommittedBytesPerSeconds": "",
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
@@ -327,7 +331,11 @@ This endpoint helps troubleshoot why jobs stay in 
`PENDING` by showing the pendi
     "TableSinkWriteCount": {},
     "TableSinkWriteQPS": {},
     "TableSinkWriteBytes": {},
-    "TableSinkWriteBytesPerSeconds": {}
+    "TableSinkWriteBytesPerSeconds": {},
+    "TableSinkCommittedCount": {},
+    "TableSinkCommittedQPS": {},
+    "TableSinkCommittedBytes": {},
+    "TableSinkCommittedBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
@@ -343,6 +351,27 @@ This endpoint helps troubleshoot why jobs stay in 
`PENDING` by showing the pendi
 `envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is 
running.
 `finishedTime`, `errorMsg` will return when job is finished.
 
+#### Metrics field description
+
+| Field | Description |
+| --- | --- |
+| IntermediateQueueSize | Size of intermediate queue between operators |
+| SourceReceivedCount | Total rows received from sources |
+| SourceReceivedQPS | Source receive rate (rows/s) |
+| SourceReceivedBytes | Total bytes received from sources |
+| SourceReceivedBytesPerSeconds | Source receive rate (bytes/s) |
+| SinkWriteCount | Sink write attempts (rows) |
+| SinkWriteQPS | Sink write attempt rate (rows/s) |
+| SinkWriteBytes | Sink write attempts (bytes) |
+| SinkWriteBytesPerSeconds | Sink write attempt rate (bytes/s) |
+| SinkCommittedCount | Sink committed rows after checkpoint succeeds |
+| SinkCommittedQPS | Sink committed rate (rows/s) |
+| SinkCommittedBytes | Sink committed bytes after checkpoint succeeds |
+| SinkCommittedBytesPerSeconds | Sink committed rate (bytes/s) |
+| TableSourceReceived* | Per-table source metrics, key format 
`TableSourceReceivedXXX#<table>` |
+| TableSinkWrite* | Per-table sink write attempts, key format 
`TableSinkWriteXXX#<table>` |
+| TableSinkCommitted* | Per-table sink committed metrics, key format 
`TableSinkCommittedXXX#<table>` |
+
 When we can't get the job info, the response will be:
 
 ```json
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md 
b/docs/zh/seatunnel-engine/rest-api-v1.md
index b91be2c7b9..83a6693aa0 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -188,6 +188,10 @@ network:
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "SinkCommittedCount": "",
+    "SinkCommittedQPS": "",
+    "SinkCommittedBytes": "",
+    "SinkCommittedBytesPerSeconds": "",
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
@@ -195,7 +199,11 @@ network:
     "TableSinkWriteCount": {},
     "TableSinkWriteQPS": {},
     "TableSinkWriteBytes": {},
-    "TableSinkWriteBytesPerSeconds": {}
+    "TableSinkWriteBytesPerSeconds": {},
+    "TableSinkCommittedCount": {},
+    "TableSinkCommittedQPS": {},
+    "TableSinkCommittedBytes": {},
+    "TableSinkCommittedBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
@@ -211,6 +219,26 @@ network:
 `envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` 字段在Job在RUNNING状态时会返回
 `finishedTime`, `errorMsg` 字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
 
+#### 指标字段说明
+
+| 字段 | 说明 |
+| --- | --- |
+| SourceReceivedCount | 源端接收的行数 |
+| SourceReceivedQPS | 源端接收速率(行/秒) |
+| SourceReceivedBytes | 源端接收的字节数 |
+| SourceReceivedBytesPerSeconds | 源端接收速率(字节/秒) |
+| SinkWriteCount | Sink 写入尝试行数 |
+| SinkWriteQPS | Sink 写入尝试速率(行/秒) |
+| SinkWriteBytes | Sink 写入尝试字节数 |
+| SinkWriteBytesPerSeconds | Sink 写入尝试速率(字节/秒) |
+| SinkCommittedCount | checkpoint 成功后的 Sink 已提交行数 |
+| SinkCommittedQPS | Sink 已提交速率(行/秒) |
+| SinkCommittedBytes | checkpoint 成功后的 Sink 已提交字节数 |
+| SinkCommittedBytesPerSeconds | Sink 已提交速率(字节/秒) |
+| TableSourceReceived* | 按表汇总的源指标,键格式 `TableSourceReceivedXXX#<表>` |
+| TableSinkWrite* | 按表汇总的 Sink 写入尝试,键格式 `TableSinkWriteXXX#<表>` |
+| TableSinkCommitted* | 按表汇总的 Sink 已提交指标,键格式 `TableSinkCommittedXXX#<表>` |
+
 当我们查询不到这个Job时,返回结果为:
 
 ```json
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md 
b/docs/zh/seatunnel-engine/rest-api-v2.md
index 9b72a38300..8bab244208 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -314,6 +314,10 @@ seatunnel:
     "SinkWriteQPS": "",
     "SinkWriteBytes": "",
     "SinkWriteBytesPerSeconds": "",
+    "SinkCommittedCount": "",
+    "SinkCommittedQPS": "",
+    "SinkCommittedBytes": "",
+    "SinkCommittedBytesPerSeconds": "",
     "TableSourceReceivedCount": {},
     "TableSourceReceivedBytes": {},
     "TableSourceReceivedBytesPerSeconds": {},
@@ -321,7 +325,11 @@ seatunnel:
     "TableSinkWriteCount": {},
     "TableSinkWriteQPS": {},
     "TableSinkWriteBytes": {},
-    "TableSinkWriteBytesPerSeconds": {}
+    "TableSinkWriteBytesPerSeconds": {},
+    "TableSinkCommittedCount": {},
+    "TableSinkCommittedQPS": {},
+    "TableSinkCommittedBytes": {},
+    "TableSinkCommittedBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
@@ -337,6 +345,27 @@ seatunnel:
 `envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` 字段在Job在RUNNING状态时会返回
 `finishedTime`, `errorMsg` 字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL
 
+#### 指标字段说明
+
+| 字段 | 说明 |
+| --- | --- |
+| IntermediateQueueSize | 中间队列的大小 |
+| SourceReceivedCount | 源端接收的行数 |
+| SourceReceivedQPS | 源端接收速率(行/秒) |
+| SourceReceivedBytes | 源端接收的字节数 |
+| SourceReceivedBytesPerSeconds | 源端接收速率(字节/秒) |
+| SinkWriteCount | Sink 写入尝试行数 |
+| SinkWriteQPS | Sink 写入尝试速率(行/秒) |
+| SinkWriteBytes | Sink 写入尝试字节数 |
+| SinkWriteBytesPerSeconds | Sink 写入尝试速率(字节/秒) |
+| SinkCommittedCount | checkpoint 成功后的 Sink 已提交行数 |
+| SinkCommittedQPS | Sink 已提交速率(行/秒) |
+| SinkCommittedBytes | checkpoint 成功后的 Sink 已提交字节数 |
+| SinkCommittedBytesPerSeconds | Sink 已提交速率(字节/秒) |
+| TableSourceReceived* | 按表汇总的源指标,键格式 `TableSourceReceivedXXX#<表>` |
+| TableSinkWrite* | 按表汇总的 Sink 写入尝试,键格式 `TableSinkWriteXXX#<表>` |
+| TableSinkCommitted* | 按表汇总的 Sink 已提交指标,键格式 `TableSinkCommittedXXX#<表>` |
+
 当我们查询不到这个Job时,返回结果为:
 
 ```json
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index 46c71047b9..eb70b2359c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -33,6 +33,10 @@ public final class MetricNames {
     public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
     public static final String SINK_WRITE_QPS = "SinkWriteQPS";
     public static final String SINK_WRITE_BYTES_PER_SECONDS = 
"SinkWriteBytesPerSeconds";
+    public static final String SINK_COMMITTED_COUNT = "SinkCommittedCount";
+    public static final String SINK_COMMITTED_BYTES = "SinkCommittedBytes";
+    public static final String SINK_COMMITTED_QPS = "SinkCommittedQPS";
+    public static final String SINK_COMMITTED_BYTES_PER_SECONDS = 
"SinkCommittedBytesPerSeconds";
 
     public static final String INTERMEDIATE_QUEUE_SIZE = 
"IntermediateQueueSize";
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
new file mode 100644
index 0000000000..5c48a34f95
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Slf4j
+public class CommittedMetricsIT {
+
+    private static final String HOST = "http://localhost:";
+
+    private ClientJobProxy streamJobProxy;
+
+    private HazelcastInstanceImpl node1;
+
+    private SeaTunnelClient engineClient;
+
+    private SeaTunnelConfig seaTunnelConfig;
+
+    @BeforeEach
+    void beforeClass() throws Exception {
+        String testClusterName = 
TestUtils.getClusterName("CommittedMetricsIT");
+        seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18080);
+        
seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPortRange(200);
+        node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(testClusterName);
+        engineClient = new SeaTunnelClient(clientConfig);
+    }
+
+    @Test
+    public void testCommittedMetricsWithCheckpoint() throws Exception {
+        String streamFilePath =
+                
TestUtils.getResource("stream_fake_multi_table_to_console_with_checkpoint.conf");
+        JobConfig streamConf = new JobConfig();
+        
streamConf.setName("stream_fake_multi_table_to_console_with_checkpoint");
+        ClientJobExecutionEnvironment streamJobExecutionEnv =
+                engineClient.createExecutionContext(streamFilePath, 
streamConf, seaTunnelConfig);
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        streamJobProxy = streamJobExecutionEnv.execute();
+                    } catch (ExecutionException e) {
+                        throw new RuntimeException(e);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertNotNull(streamJobProxy);
+                            Assertions.assertEquals(
+                                    JobStatus.RUNNING, 
streamJobProxy.getJobStatus());
+                        });
+
+        log.info("Job is running, job id: {}", streamJobProxy.getJobId());
+
+        Thread.sleep(5000);
+
+        Response responseBeforeCheckpoint =
+                given().get(
+                                HOST
+                                        + 
node1.getCluster().getLocalMember().getAddress().getPort()
+                                        + RestConstant.CONTEXT_PATH
+                                        + RestConstant.REST_URL_JOB_INFO
+                                        + "/"
+                                        + streamJobProxy.getJobId());
+
+        log.info("Metrics before checkpoint: {}", 
responseBeforeCheckpoint.prettyPrint());
+
+        String writeCountBeforeCP = 
responseBeforeCheckpoint.path("metrics.SinkWriteCount");
+        String committedCountBeforeCP = 
responseBeforeCheckpoint.path("metrics.SinkCommittedCount");
+
+        long writeBeforeCP = Long.parseLong(writeCountBeforeCP);
+        long committedBeforeCP = 0;
+        if (committedCountBeforeCP != null) {
+            committedBeforeCP = Long.parseLong(committedCountBeforeCP);
+        }
+
+        Assertions.assertTrue(writeBeforeCP > 0);
+        Assertions.assertEquals(0, committedBeforeCP);
+
+        log.info(
+                "Before checkpoint - WriteCount: {}, CommittedCount: {}",
+                writeBeforeCP,
+                committedBeforeCP);
+
+        Thread.sleep(8000);
+
+        Response responseAfterFirstCheckpoint =
+                given().get(
+                                HOST
+                                        + 
node1.getCluster().getLocalMember().getAddress().getPort()
+                                        + RestConstant.CONTEXT_PATH
+                                        + RestConstant.REST_URL_JOB_INFO
+                                        + "/"
+                                        + streamJobProxy.getJobId());
+
+        log.info("Metrics after first checkpoint: {}", 
responseAfterFirstCheckpoint.prettyPrint());
+
+        String sinkCommittedCount = 
responseAfterFirstCheckpoint.path("metrics.SinkCommittedCount");
+        String sinkWriteCount = 
responseAfterFirstCheckpoint.path("metrics.SinkWriteCount");
+        Assertions.assertNotNull(sinkCommittedCount);
+        Assertions.assertNotNull(sinkWriteCount);
+
+        long committedCountAfterFirstCP = Long.parseLong(sinkCommittedCount);
+        long writeCountAfterFirstCP = Long.parseLong(sinkWriteCount);
+
+        Assertions.assertTrue(committedCountAfterFirstCP > 0);
+        Assertions.assertTrue(committedCountAfterFirstCP > committedBeforeCP);
+        Assertions.assertTrue(committedCountAfterFirstCP <= 
writeCountAfterFirstCP);
+
+        log.info(
+                "After first checkpoint - WriteCount: {}, CommittedCount: {}, 
Uncommitted: {}",
+                writeCountAfterFirstCP,
+                committedCountAfterFirstCP,
+                writeCountAfterFirstCP - committedCountAfterFirstCP);
+
+        Thread.sleep(12000);
+
+        Response responseFinal =
+                given().get(
+                                HOST
+                                        + 
node1.getCluster().getLocalMember().getAddress().getPort()
+                                        + RestConstant.CONTEXT_PATH
+                                        + RestConstant.REST_URL_JOB_INFO
+                                        + "/"
+                                        + streamJobProxy.getJobId());
+
+        log.info("Metrics after second checkpoint: {}", 
responseFinal.prettyPrint());
+
+        responseFinal
+                .then()
+                .statusCode(200)
+                .body("jobName", notNullValue())
+                .body("jobStatus", notNullValue());
+
+        String finalWriteCount = responseFinal.path("metrics.SinkWriteCount");
+        String finalCommittedCount = 
responseFinal.path("metrics.SinkCommittedCount");
+        String finalCommittedBytes = 
responseFinal.path("metrics.SinkCommittedBytes");
+        String finalWriteBytes = responseFinal.path("metrics.SinkWriteBytes");
+
+        long finalWrite = Long.parseLong(finalWriteCount);
+        long finalCommitted = Long.parseLong(finalCommittedCount);
+        long finalCommittedBytesVal = Long.parseLong(finalCommittedBytes);
+        long finalWriteBytesVal = Long.parseLong(finalWriteBytes);
+
+        Assertions.assertTrue(finalCommitted > committedCountAfterFirstCP);
+        Assertions.assertTrue(finalCommitted <= finalWrite);
+        Assertions.assertTrue(finalCommittedBytesVal > 0);
+        Assertions.assertTrue(finalCommittedBytesVal <= finalWriteBytesVal);
+
+        responseFinal
+                .then()
+                .body("metrics.SinkCommittedQPS", notNullValue())
+                .body("metrics.SinkCommittedBytesPerSeconds", notNullValue());
+
+        Double committedQPS = 
Double.parseDouble(responseFinal.path("metrics.SinkCommittedQPS"));
+        Double committedBytesPerSec =
+                
Double.parseDouble(responseFinal.path("metrics.SinkCommittedBytesPerSeconds"));
+        Assertions.assertTrue(committedQPS > 0);
+        Assertions.assertTrue(committedBytesPerSec > 0);
+
+        String table1CommittedCount =
+                
responseFinal.path("metrics.TableSinkCommittedCount.'fake.table1'");
+        String table2CommittedCount =
+                
responseFinal.path("metrics.TableSinkCommittedCount.'fake.public.table2'");
+        Assertions.assertNotNull(table1CommittedCount);
+        Assertions.assertNotNull(table2CommittedCount);
+
+        long table1Committed = Long.parseLong(table1CommittedCount);
+        long table2Committed = Long.parseLong(table2CommittedCount);
+        Assertions.assertTrue(table1Committed > 0);
+        Assertions.assertTrue(table2Committed > 0);
+
+        Assertions.assertEquals(finalCommitted, table1Committed + 
table2Committed);
+
+        String table1CommittedBytes =
+                
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.table1'");
+        String table2CommittedBytes =
+                
responseFinal.path("metrics.TableSinkCommittedBytes.'fake.public.table2'");
+        Assertions.assertNotNull(table1CommittedBytes);
+        Assertions.assertNotNull(table2CommittedBytes);
+
+        Assertions.assertTrue(Long.parseLong(table1CommittedBytes) > 0);
+        Assertions.assertTrue(Long.parseLong(table2CommittedBytes) > 0);
+
+        Double table1CommittedQPS =
+                Double.parseDouble(
+                        
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.table1'"));
+        Double table2CommittedQPS =
+                Double.parseDouble(
+                        
responseFinal.path("metrics.TableSinkCommittedQPS.'fake.public.table2'"));
+        Assertions.assertTrue(table1CommittedQPS > 0);
+        Assertions.assertTrue(table2CommittedQPS > 0);
+
+        Double table1CommittedBytesPerSec =
+                Double.parseDouble(
+                        responseFinal.path(
+                                
"metrics.TableSinkCommittedBytesPerSeconds.'fake.table1'"));
+        Double table2CommittedBytesPerSec =
+                Double.parseDouble(
+                        responseFinal.path(
+                                
"metrics.TableSinkCommittedBytesPerSeconds.'fake.public.table2'"));
+        Assertions.assertTrue(table1CommittedBytesPerSec > 0);
+        Assertions.assertTrue(table2CommittedBytesPerSec > 0);
+
+        log.info("All committed metrics assertions passed");
+        log.info(
+                "Final summary - WriteCount: {}, CommittedCount: {}, 
Uncommitted: {}",
+                finalWrite,
+                finalCommitted,
+                finalWrite - finalCommitted);
+
+        streamJobProxy.cancelJob();
+
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.CANCELED, 
streamJobProxy.getJobStatus()));
+
+        log.info("testCommittedMetricsWithCheckpoint completed successfully");
+    }
+
+    @AfterEach
+    void afterClass() {
+        if (engineClient != null) {
+            engineClient.close();
+        }
+
+        if (node1 != null) {
+            node1.shutdown();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
new file mode 100644
index 0000000000..f267344e4e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_multi_table_to_console_with_checkpoint.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 10000
+}
+
+source {
+  # This is an example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+      plugin_output = "fake1"
+      row.num = 150
+      split.num = 5
+      split.read-interval = 3000
+      schema = {
+        table = "fake.table1"
+        fields {
+          id = bigint
+          name = string
+          score = int
+        }
+      }
+    }
+
+    FakeSource {
+        plugin_output = "fake2"
+        row.num = 90
+        split.num = 5
+        split.read-interval = 3000
+        schema = {
+          table = "fake.public.table2"
+          fields {
+            id = bigint
+            name = string
+            score = int
+          }
+        }
+      }
+
+}
+
+transform {
+}
+
+sink {
+  console {
+    plugin_input = "fake1"
+  }
+  console {
+    plugin_input = "fake2"
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
index a8ee505f65..404cec5f66 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java
@@ -30,6 +30,10 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_QPS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -45,6 +49,7 @@ public class ConnectorMetricsCalcContext {
 
     private final PluginType type;
 
+    // Real-time (attempt) metrics
     private Counter count;
 
     private final Map<String, Counter> countPerTable = new 
ConcurrentHashMap<>();
@@ -61,6 +66,29 @@ public class ConnectorMetricsCalcContext {
 
     private final Map<String, Meter> bytesPerSecondsPerTable = new 
ConcurrentHashMap<>();
 
+    // Committed metrics
+    private Counter committedCount;
+
+    private final Map<String, Counter> committedCountPerTable = new 
ConcurrentHashMap<>();
+
+    private Meter committedQPS;
+
+    private final Map<String, Meter> committedQPSPerTable = new 
ConcurrentHashMap<>();
+
+    private Counter committedBytes;
+
+    private final Map<String, Counter> committedBytesPerTable = new 
ConcurrentHashMap<>();
+
+    private Meter committedBytesPerSeconds;
+
+    private final Map<String, Meter> committedBytesPerSecondsPerTable = new 
ConcurrentHashMap<>();
+
+    private PendingMetrics currentPendingMetrics;
+
+    private final Map<Long, PendingMetrics> pendingMetricsByCheckpoint = new 
ConcurrentHashMap<>();
+
+    private final Map<String, String> tableNameCache = new 
ConcurrentHashMap<>();
+
     public ConnectorMetricsCalcContext(
             MetricsContext metricsContext,
             PluginType type,
@@ -73,15 +101,17 @@ public class ConnectorMetricsCalcContext {
 
     private void initializeMetrics(boolean isMulti, List<TablePath> tables) {
         if (type.equals(PluginType.SINK)) {
-            this.initializeMetrics(
+            initializeAttemptMetrics(
                     isMulti,
                     tables,
                     SINK_WRITE_COUNT,
                     SINK_WRITE_QPS,
                     SINK_WRITE_BYTES,
                     SINK_WRITE_BYTES_PER_SECONDS);
+            initializeCommittedMetrics(isMulti, tables);
+            currentPendingMetrics = new PendingMetrics();
         } else if (type.equals(PluginType.SOURCE)) {
-            this.initializeMetrics(
+            initializeAttemptMetrics(
                     isMulti,
                     tables,
                     SOURCE_RECEIVED_COUNT,
@@ -91,7 +121,7 @@ public class ConnectorMetricsCalcContext {
         }
     }
 
-    private void initializeMetrics(
+    private void initializeAttemptMetrics(
             boolean isMulti,
             List<TablePath> tables,
             String countName,
@@ -105,19 +135,41 @@ public class ConnectorMetricsCalcContext {
         if (isMulti) {
             tables.forEach(
                     tablePath -> {
+                        String fullName = tablePath.getFullName();
                         countPerTable.put(
-                                tablePath.getFullName(),
-                                metricsContext.counter(countName + "#" + 
tablePath.getFullName()));
-                        QPSPerTable.put(
-                                tablePath.getFullName(),
-                                metricsContext.meter(qpsName + "#" + 
tablePath.getFullName()));
+                                fullName, metricsContext.counter(countName + 
"#" + fullName));
+                        QPSPerTable.put(fullName, metricsContext.meter(qpsName 
+ "#" + fullName));
                         bytesPerTable.put(
-                                tablePath.getFullName(),
-                                metricsContext.counter(bytesName + "#" + 
tablePath.getFullName()));
+                                fullName, metricsContext.counter(bytesName + 
"#" + fullName));
                         bytesPerSecondsPerTable.put(
-                                tablePath.getFullName(),
+                                fullName,
+                                metricsContext.meter(bytesPerSecondsName + "#" 
+ fullName));
+                    });
+        }
+    }
+
+    private void initializeCommittedMetrics(boolean isMulti, List<TablePath> tables) {
+        committedCount = metricsContext.counter(SINK_COMMITTED_COUNT);
+        committedQPS = metricsContext.meter(SINK_COMMITTED_QPS);
+        committedBytes = metricsContext.counter(SINK_COMMITTED_BYTES);
+        committedBytesPerSeconds = metricsContext.meter(SINK_COMMITTED_BYTES_PER_SECONDS);
+        if (isMulti) {
+            tables.forEach(
+                    tablePath -> {
+                        String fullName = tablePath.getFullName();
+                        committedCountPerTable.put(
+                                fullName,
+                                metricsContext.counter(SINK_COMMITTED_COUNT + 
"#" + fullName));
+                        committedQPSPerTable.put(
+                                fullName,
+                                metricsContext.meter(SINK_COMMITTED_QPS + "#" 
+ fullName));
+                        committedBytesPerTable.put(
+                                fullName,
+                                metricsContext.counter(SINK_COMMITTED_BYTES + 
"#" + fullName));
+                        committedBytesPerSecondsPerTable.put(
+                                fullName,
                                 metricsContext.meter(
-                                        bytesPerSecondsName + "#" + 
tablePath.getFullName()));
+                                        SINK_COMMITTED_BYTES_PER_SECONDS + "#" 
+ fullName));
                     });
         }
     }
@@ -127,51 +179,130 @@ public class ConnectorMetricsCalcContext {
         QPS.markEvent();
         if (data instanceof SeaTunnelRow) {
             SeaTunnelRow row = (SeaTunnelRow) data;
-            bytes.inc(row.getBytesSize());
-            bytesPerSeconds.markEvent(row.getBytesSize());
+            long rowBytes = row.getBytesSize();
+            bytes.inc(rowBytes);
+            bytesPerSeconds.markEvent(rowBytes);
 
-            if (StringUtils.isNotBlank(tableId)) {
-                String tableName = TablePath.of(tableId).getFullName();
+            String normalizedTableName =
+                    StringUtils.isNotBlank(tableId) ? normalizeTableName(tableId) : null;
+            if (PluginType.SINK.equals(type)) {
+                recordPendingMetrics(normalizedTableName, rowBytes);
+            }
 
-                // Processing count
+            if (StringUtils.isNotBlank(normalizedTableName)) {
                 processMetrics(
                         countPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_COUNT,
                         SOURCE_RECEIVED_COUNT,
                         Counter::inc);
 
-                // Processing bytes
                 processMetrics(
                         bytesPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES,
                         SOURCE_RECEIVED_BYTES,
-                        counter -> counter.inc(row.getBytesSize()));
+                        counter -> counter.inc(rowBytes));
 
-                // Processing QPS
                 processMetrics(
                         QPSPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_QPS,
                         SOURCE_RECEIVED_QPS,
                         Meter::markEvent);
 
-                // Processing bytes rate
                 processMetrics(
                         bytesPerSecondsPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES_PER_SECONDS,
                         SOURCE_RECEIVED_BYTES_PER_SECONDS,
-                        meter -> meter.markEvent(row.getBytesSize()));
+                        meter -> meter.markEvent(rowBytes));
             }
         }
     }
 
+    public void sealCheckpointMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        PendingMetrics pendingToSeal = currentPendingMetrics;
+        currentPendingMetrics = new PendingMetrics();
+        if (pendingToSeal.isEmpty()) {
+            return;
+        }
+        pendingMetricsByCheckpoint
+                .computeIfAbsent(checkpointId, key -> new PendingMetrics())
+                .merge(pendingToSeal);
+    }
+
+    public void commitPendingMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        PendingMetrics pending = pendingMetricsByCheckpoint.remove(checkpointId);
+        if (pending == null || pending.isEmpty()) {
+            return;
+        }
+        committedCount.inc(pending.getCount());
+        committedQPS.markEvent(pending.getCount());
+        committedBytes.inc(pending.getBytes());
+        committedBytesPerSeconds.markEvent(pending.getBytes());
+        pending.getTableMetrics()
+                .forEach(
+                        (table, metrics) -> {
+                            processMetrics(
+                                    committedCountPerTable,
+                                    Counter.class,
+                                    table,
+                                    SINK_COMMITTED_COUNT,
+                                    SOURCE_RECEIVED_COUNT,
+                                    counter -> counter.inc(metrics.count));
+                            processMetrics(
+                                    committedBytesPerTable,
+                                    Counter.class,
+                                    table,
+                                    SINK_COMMITTED_BYTES,
+                                    SOURCE_RECEIVED_BYTES,
+                                    counter -> counter.inc(metrics.bytes));
+                            processMetrics(
+                                    committedQPSPerTable,
+                                    Meter.class,
+                                    table,
+                                    SINK_COMMITTED_QPS,
+                                    SOURCE_RECEIVED_QPS,
+                                    meter -> meter.markEvent(metrics.count));
+                            processMetrics(
+                                    committedBytesPerSecondsPerTable,
+                                    Meter.class,
+                                    table,
+                                    SINK_COMMITTED_BYTES_PER_SECONDS,
+                                    SOURCE_RECEIVED_BYTES_PER_SECONDS,
+                                    meter -> meter.markEvent(metrics.bytes));
+                        });
+    }
+
+    public void abortPendingMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        pendingMetricsByCheckpoint.remove(checkpointId);
+    }
+
+    private void recordPendingMetrics(String normalizedTableName, long rowBytes) {
+        if (currentPendingMetrics == null) {
+            return;
+        }
+        currentPendingMetrics.add(normalizedTableName, rowBytes);
+    }
+
+    private String normalizeTableName(String tableId) {
+        return tableNameCache.computeIfAbsent(tableId, id -> TablePath.of(id).getFullName());
+    }
+
     private <T> void processMetrics(
             Map<String, T> metricMap,
             Class<T> cls,
@@ -207,4 +338,67 @@ public class ConnectorMetricsCalcContext {
     interface MetricProcessor<T> {
         void process(T t);
     }
+
+    private static final class PendingMetrics {
+        private long count;
+        private long bytes;
+        private final Map<String, TablePendingMetrics> tableMetrics = new ConcurrentHashMap<>();
+
+        void add(String tableName, long rowBytes) {
+            count++;
+            bytes += rowBytes;
+            if (StringUtils.isNotBlank(tableName)) {
+                tableMetrics
+                        .computeIfAbsent(tableName, key -> new TablePendingMetrics())
+                        .add(rowBytes);
+            }
+        }
+
+        boolean isEmpty() {
+            return count == 0;
+        }
+
+        void merge(PendingMetrics other) {
+            if (other == null || other.isEmpty()) {
+                return;
+            }
+            this.count += other.count;
+            this.bytes += other.bytes;
+            other.tableMetrics.forEach(
+                    (table, metrics) ->
+                            this.tableMetrics
+                                    .computeIfAbsent(table, key -> new TablePendingMetrics())
+                                    .merge(metrics));
+        }
+
+        long getCount() {
+            return count;
+        }
+
+        long getBytes() {
+            return bytes;
+        }
+
+        Map<String, TablePendingMetrics> getTableMetrics() {
+            return tableMetrics;
+        }
+    }
+
+    private static final class TablePendingMetrics {
+        private long count;
+        private long bytes;
+
+        void add(long rowBytes) {
+            this.count++;
+            this.bytes += rowBytes;
+        }
+
+        void merge(TablePendingMetrics other) {
+            if (other == null) {
+                return;
+            }
+            this.count += other.count;
+            this.bytes += other.bytes;
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 8784f31d98..7185859847 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -59,7 +59,11 @@ public class RestConstant {
     public static final String TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS =
             "TableSourceReceivedBytesPerSeconds";
     public static final String TABLE_SINK_WRITE_BYTES_PER_SECONDS = 
"TableSinkWriteBytesPerSeconds";
-
+    public static final String TABLE_SINK_COMMITTED_COUNT = 
"TableSinkCommittedCount";
+    public static final String TABLE_SINK_COMMITTED_QPS = 
"TableSinkCommittedQPS";
+    public static final String TABLE_SINK_COMMITTED_BYTES = 
"TableSinkCommittedBytes";
+    public static final String TABLE_SINK_COMMITTED_BYTES_PER_SECONDS =
+            "TableSinkCommittedBytesPerSeconds";
     public static final String CONTEXT_PATH = "/hazelcast/rest/maps";
     public static final String INSTANCE_CONTEXT_PATH = 
"/hazelcast/rest/instance";
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index 8b18575bec..e638cefd51 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -78,6 +78,10 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.INTERMEDIATE_QUEUE_SIZE;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_QPS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -86,6 +90,10 @@ import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVE
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_BYTES;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_COUNT;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_COMMITTED_QPS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.TABLE_SINK_WRITE_COUNT;
@@ -249,27 +257,35 @@ public abstract class BaseService {
         String[] countMetricsNames = {
             SOURCE_RECEIVED_COUNT,
             SINK_WRITE_COUNT,
+            SINK_COMMITTED_COUNT,
             SOURCE_RECEIVED_BYTES,
             SINK_WRITE_BYTES,
+            SINK_COMMITTED_BYTES,
             INTERMEDIATE_QUEUE_SIZE
         };
         String[] rateMetricsNames = {
             SOURCE_RECEIVED_QPS,
             SINK_WRITE_QPS,
+            SINK_COMMITTED_QPS,
             SOURCE_RECEIVED_BYTES_PER_SECONDS,
-            SINK_WRITE_BYTES_PER_SECONDS
+            SINK_WRITE_BYTES_PER_SECONDS,
+            SINK_COMMITTED_BYTES_PER_SECONDS
         };
         String[] tableCountMetricsNames = {
             TABLE_SOURCE_RECEIVED_COUNT,
             TABLE_SINK_WRITE_COUNT,
+            TABLE_SINK_COMMITTED_COUNT,
             TABLE_SOURCE_RECEIVED_BYTES,
-            TABLE_SINK_WRITE_BYTES
+            TABLE_SINK_WRITE_BYTES,
+            TABLE_SINK_COMMITTED_BYTES
         };
         String[] tableRateMetricsNames = {
             TABLE_SOURCE_RECEIVED_QPS,
             TABLE_SINK_WRITE_QPS,
+            TABLE_SINK_COMMITTED_QPS,
             TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS,
-            TABLE_SINK_WRITE_BYTES_PER_SECONDS
+            TABLE_SINK_WRITE_BYTES_PER_SECONDS,
+            TABLE_SINK_COMMITTED_BYTES_PER_SECONDS
         };
         Long[] metricsSums =
                 Stream.generate(() -> 
0L).limit(countMetricsNames.length).toArray(Long[]::new);
@@ -281,12 +297,16 @@ public abstract class BaseService {
                 new Map[] {
                     new HashMap<>(), // Source Received Count
                     new HashMap<>(), // Sink Write Count
+                    new HashMap<>(), // Sink Committed Count
                     new HashMap<>(), // Source Received Bytes
                     new HashMap<>(), // Sink Write Bytes
+                    new HashMap<>(), // Sink Committed Bytes
                     new HashMap<>(), // Source Received QPS
                     new HashMap<>(), // Sink Write QPS
+                    new HashMap<>(), // Sink Committed QPS
                     new HashMap<>(), // Source Received Bytes Per Second
-                    new HashMap<>() // Sink Write Bytes Per Second
+                    new HashMap<>(), // Sink Write Bytes Per Second
+                    new HashMap<>() // Sink Committed Bytes Per Second
                 };
 
         try {
@@ -320,7 +340,7 @@ public abstract class BaseService {
                 metricsMap,
                 tableMetricsMaps,
                 ArrayUtils.addAll(tableCountMetricsNames, 
tableRateMetricsNames),
-                countMetricsNames.length);
+                tableCountMetricsNames.length);
         populateMetricsMap(
                 metricsMap,
                 Stream.concat(Arrays.stream(metricsSums), 
Arrays.stream(metricsRates))
@@ -343,28 +363,40 @@ public abstract class BaseService {
         // Define index constant
         final int SOURCE_COUNT_IDX = 0,
                 SINK_COUNT_IDX = 1,
-                SOURCE_BYTES_IDX = 2,
-                SINK_BYTES_IDX = 3,
-                SOURCE_QPS_IDX = 4,
-                SINK_QPS_IDX = 5,
-                SOURCE_BYTES_SEC_IDX = 6,
-                SINK_BYTES_SEC_IDX = 7;
+                SINK_COMMITTED_COUNT_IDX = 2,
+                SOURCE_BYTES_IDX = 3,
+                SINK_BYTES_IDX = 4,
+                SINK_COMMITTED_BYTES_IDX = 5,
+                SOURCE_QPS_IDX = 6,
+                SINK_QPS_IDX = 7,
+                SINK_COMMITTED_QPS_IDX = 8,
+                SOURCE_BYTES_SEC_IDX = 9,
+                SINK_BYTES_SEC_IDX = 10,
+                SINK_COMMITTED_BYTES_SEC_IDX = 11;
         if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
             tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
             tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_COMMITTED_COUNT + "#")) {
+            tableMetricsMaps[SINK_COMMITTED_COUNT_IDX].put(tableName, 
metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
             tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
             tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_COMMITTED_BYTES + "#")) {
+            tableMetricsMaps[SINK_COMMITTED_BYTES_IDX].put(tableName, 
metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
             tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
             tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_COMMITTED_QPS + "#")) {
+            tableMetricsMaps[SINK_COMMITTED_QPS_IDX].put(tableName, 
metricNode);
         } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS + 
"#")) {
             tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
         } else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
             tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_COMMITTED_BYTES_PER_SECONDS + 
"#")) {
+            tableMetricsMaps[SINK_COMMITTED_BYTES_SEC_IDX].put(tableName, 
metricNode);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index c039e97a76..7e8427af9b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -194,6 +194,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 long startTime = System.currentTimeMillis();
 
                 Barrier barrier = (Barrier) record.getData();
+                
connectorMetricsCalcContext.sealCheckpointMetrics(barrier.getId());
                 if (barrier.prepareClose(this.taskLocation)) {
                     prepareClose = true;
                 }
@@ -309,6 +310,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         if (committer.isPresent() && lastCommitInfo.isPresent()) {
             
committer.get().commit(Collections.singletonList(lastCommitInfo.get()));
         }
+        connectorMetricsCalcContext.commitPendingMetrics(checkpointId);
     }
 
     @Override
@@ -316,6 +318,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         if (committer.isPresent() && lastCommitInfo.isPresent()) {
             
committer.get().abort(Collections.singletonList(lastCommitInfo.get()));
         }
+        connectorMetricsCalcContext.abortPendingMetrics(checkpointId);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
new file mode 100644
index 0000000000..5bc8c1824a
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContextTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_COMMITTED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+
+public class ConnectorMetricsCalcContextTest {
+
+    private static final String TABLE_ID = "fake.table1";
+
+    @Test
+    public void testCommitFlushesPendingMetrics() {
+        SeaTunnelMetricsContext metricsContext = new SeaTunnelMetricsContext();
+        ConnectorMetricsCalcContext calcContext =
+                new ConnectorMetricsCalcContext(
+                        metricsContext,
+                        PluginType.SINK,
+                        true,
+                        Collections.singletonList(TablePath.of(TABLE_ID)));
+
+        SeaTunnelRow row = createRowWithTableId(TABLE_ID, "A");
+
+        calcContext.updateMetrics(row, TABLE_ID);
+        Assertions.assertEquals(1, 
metricsContext.counter(SINK_WRITE_COUNT).getCount());
+        Assertions.assertEquals(
+                1, metricsContext.counter(SINK_WRITE_COUNT + "#" + 
TABLE_ID).getCount());
+
+        Assertions.assertEquals(0, 
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+        Assertions.assertEquals(
+                0, metricsContext.counter(SINK_COMMITTED_COUNT + "#" + 
TABLE_ID).getCount());
+
+        long checkpointId = 1L;
+        calcContext.sealCheckpointMetrics(checkpointId);
+
+        Assertions.assertEquals(0, 
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+
+        calcContext.commitPendingMetrics(checkpointId);
+
+        Assertions.assertEquals(1, 
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+        Assertions.assertEquals(
+                1, metricsContext.counter(SINK_COMMITTED_COUNT + "#" + 
TABLE_ID).getCount());
+
+        Counter writeBytes = metricsContext.counter(SINK_WRITE_BYTES);
+        Counter committedBytes = metricsContext.counter(SINK_COMMITTED_BYTES);
+        Assertions.assertEquals(writeBytes.getCount(), 
committedBytes.getCount());
+    }
+
+    @Test
+    public void testAbortClearsPendingMetrics() {
+        SeaTunnelMetricsContext metricsContext = new SeaTunnelMetricsContext();
+        ConnectorMetricsCalcContext calcContext =
+                new ConnectorMetricsCalcContext(
+                        metricsContext,
+                        PluginType.SINK,
+                        true,
+                        Collections.singletonList(TablePath.of(TABLE_ID)));
+
+        SeaTunnelRow row = createRowWithTableId(TABLE_ID, "B");
+
+        calcContext.updateMetrics(row, TABLE_ID);
+        Assertions.assertEquals(1, 
metricsContext.counter(SINK_WRITE_COUNT).getCount());
+
+        long checkpointId = 2L;
+        calcContext.sealCheckpointMetrics(checkpointId);
+        calcContext.abortPendingMetrics(checkpointId);
+        calcContext.commitPendingMetrics(checkpointId);
+
+        Assertions.assertEquals(0, 
metricsContext.counter(SINK_COMMITTED_COUNT).getCount());
+        Assertions.assertEquals(
+                0, metricsContext.counter(SINK_COMMITTED_COUNT + "#" + 
TABLE_ID).getCount());
+    }
+
+    private SeaTunnelRow createRowWithTableId(String tableId, String payload) {
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, payload});
+        row.setTableId(tableId);
+        return row;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
new file mode 100644
index 0000000000..8e07bb5ce3
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/BaseServiceTableMetricsTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+
+public class BaseServiceTableMetricsTest {
+
+    private JobInfoService jobInfoService;
+    private Method getJobMetricsMethod;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        NodeEngineImpl nodeEngine = 
org.mockito.Mockito.mock(NodeEngineImpl.class);
+
+        jobInfoService = new JobInfoService(nodeEngine);
+
+        getJobMetricsMethod = 
BaseService.class.getDeclaredMethod("getJobMetrics", String.class);
+        getJobMetricsMethod.setAccessible(true);
+    }
+
+    @Test
+    public void testTableQPSMetricsAggregation() throws Exception {
+        String jobMetrics =
+                "{"
+                        + "\"SourceReceivedCount#fake.table1\": [{\"value\": 
100}],"
+                        + "\"SourceReceivedCount#fake.table2\": [{\"value\": 
200}],"
+                        + "\"SinkWriteCount#fake.table1\": [{\"value\": 90}],"
+                        + "\"SinkWriteCount#fake.table2\": [{\"value\": 180}],"
+                        + "\"SinkCommittedCount#fake.table1\": [{\"value\": 
80}],"
+                        + "\"SinkCommittedCount#fake.table2\": [{\"value\": 
160}],"
+                        + "\"SourceReceivedBytes#fake.table1\": [{\"value\": 
1000}],"
+                        + "\"SourceReceivedBytes#fake.table2\": [{\"value\": 
2000}],"
+                        + "\"SinkWriteBytes#fake.table1\": [{\"value\": 900}],"
+                        + "\"SinkWriteBytes#fake.table2\": [{\"value\": 
1800}],"
+                        + "\"SinkCommittedBytes#fake.table1\": [{\"value\": 
800}],"
+                        + "\"SinkCommittedBytes#fake.table2\": [{\"value\": 
1600}],"
+                        + "\"SourceReceivedQPS#fake.table1\": [{\"value\": 
10.5}],"
+                        + "\"SourceReceivedQPS#fake.table2\": [{\"value\": 
20.3}],"
+                        + "\"SinkWriteQPS#fake.table1\": [{\"value\": 9.2}],"
+                        + "\"SinkWriteQPS#fake.table2\": [{\"value\": 18.7}],"
+                        + "\"SinkCommittedQPS#fake.table1\": [{\"value\": 
8.1}],"
+                        + "\"SinkCommittedQPS#fake.table2\": [{\"value\": 
16.4}],"
+                        + "\"SourceReceivedBytesPerSeconds#fake.table1\": 
[{\"value\": 105.5}],"
+                        + "\"SourceReceivedBytesPerSeconds#fake.table2\": 
[{\"value\": 203.2}],"
+                        + "\"SinkWriteBytesPerSeconds#fake.table1\": 
[{\"value\": 92.3}],"
+                        + "\"SinkWriteBytesPerSeconds#fake.table2\": 
[{\"value\": 187.6}],"
+                        + "\"SinkCommittedBytesPerSeconds#fake.table1\": 
[{\"value\": 81.2}],"
+                        + "\"SinkCommittedBytesPerSeconds#fake.table2\": 
[{\"value\": 164.5}],"
+                        + "\"SourceReceivedCount\": [{\"value\": 300}],"
+                        + "\"SinkWriteCount\": [{\"value\": 270}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 240}],"
+                        + "\"SourceReceivedBytes\": [{\"value\": 3000}],"
+                        + "\"SinkWriteBytes\": [{\"value\": 2700}],"
+                        + "\"SinkCommittedBytes\": [{\"value\": 2400}],"
+                        + "\"SourceReceivedQPS\": [{\"value\": 30.8}],"
+                        + "\"SinkWriteQPS\": [{\"value\": 27.9}],"
+                        + "\"SinkCommittedQPS\": [{\"value\": 24.5}],"
+                        + "\"SourceReceivedBytesPerSeconds\": [{\"value\": 
308.7}],"
+                        + "\"SinkWriteBytesPerSeconds\": [{\"value\": 279.9}],"
+                        + "\"SinkCommittedBytesPerSeconds\": [{\"value\": 
245.7}]"
+                        + "}";
+
+        Map<String, Object> result =
+                (Map<String, Object>) 
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+        Map<String, Object> tableSourceQPS =
+                (Map<String, Object>) result.get("TableSourceReceivedQPS");
+        Assertions.assertNotNull(tableSourceQPS);
+        Assertions.assertEquals(10.5, (Double) 
tableSourceQPS.get("fake.table1"), 0.01);
+        Assertions.assertEquals(20.3, (Double) 
tableSourceQPS.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSinkQPS = (Map<String, Object>) 
result.get("TableSinkWriteQPS");
+        Assertions.assertNotNull(tableSinkQPS);
+        Assertions.assertEquals(9.2, (Double) tableSinkQPS.get("fake.table1"), 
0.01);
+        Assertions.assertEquals(18.7, (Double) 
tableSinkQPS.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSinkCommittedQPS =
+                (Map<String, Object>) result.get("TableSinkCommittedQPS");
+        Assertions.assertNotNull(tableSinkCommittedQPS);
+        Assertions.assertEquals(8.1, (Double) 
tableSinkCommittedQPS.get("fake.table1"), 0.01);
+        Assertions.assertEquals(16.4, (Double) 
tableSinkCommittedQPS.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSourceBytesPerSec =
+                (Map<String, Object>) 
result.get("TableSourceReceivedBytesPerSeconds");
+        Assertions.assertNotNull(tableSourceBytesPerSec);
+        Assertions.assertEquals(105.5, (Double) 
tableSourceBytesPerSec.get("fake.table1"), 0.01);
+        Assertions.assertEquals(203.2, (Double) 
tableSourceBytesPerSec.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSinkBytesPerSec =
+                (Map<String, Object>) 
result.get("TableSinkWriteBytesPerSeconds");
+        Assertions.assertNotNull(tableSinkBytesPerSec);
+        Assertions.assertEquals(92.3, (Double) 
tableSinkBytesPerSec.get("fake.table1"), 0.01);
+        Assertions.assertEquals(187.6, (Double) 
tableSinkBytesPerSec.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSinkCommittedBytesPerSec =
+                (Map<String, Object>) 
result.get("TableSinkCommittedBytesPerSeconds");
+        Assertions.assertNotNull(tableSinkCommittedBytesPerSec);
+        Assertions.assertEquals(
+                81.2, (Double) 
tableSinkCommittedBytesPerSec.get("fake.table1"), 0.01);
+        Assertions.assertEquals(
+                164.5, (Double) 
tableSinkCommittedBytesPerSec.get("fake.table2"), 0.01);
+    }
+
+    @Test
+    public void testTableCountMetricsAggregation() throws Exception {
+        String jobMetrics =
+                "{"
+                        + "\"SourceReceivedCount#fake.table1\": [{\"value\": 
100}, {\"value\": 50}],"
+                        + "\"SourceReceivedCount#fake.table2\": [{\"value\": 
200}, {\"value\": 100}],"
+                        + "\"SinkWriteCount#fake.table1\": [{\"value\": 90}, 
{\"value\": 45}],"
+                        + "\"SinkWriteCount#fake.table2\": [{\"value\": 180}, 
{\"value\": 90}],"
+                        + "\"SinkCommittedCount#fake.table1\": [{\"value\": 
80}, {\"value\": 40}],"
+                        + "\"SinkCommittedCount#fake.table2\": [{\"value\": 
160}, {\"value\": 80}],"
+                        + "\"SourceReceivedCount\": [{\"value\": 300}],"
+                        + "\"SinkWriteCount\": [{\"value\": 270}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 240}]"
+                        + "}";
+
+        Map<String, Object> result =
+                (Map<String, Object>) 
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+        Map<String, Object> tableSourceCount =
+                (Map<String, Object>) result.get("TableSourceReceivedCount");
+        Assertions.assertNotNull(tableSourceCount);
+        Assertions.assertEquals(150L, tableSourceCount.get("fake.table1"));
+        Assertions.assertEquals(300L, tableSourceCount.get("fake.table2"));
+
+        Map<String, Object> tableSinkCount =
+                (Map<String, Object>) result.get("TableSinkWriteCount");
+        Assertions.assertNotNull(tableSinkCount);
+        Assertions.assertEquals(135L, tableSinkCount.get("fake.table1"));
+        Assertions.assertEquals(270L, tableSinkCount.get("fake.table2"));
+
+        Map<String, Object> tableSinkCommittedCount =
+                (Map<String, Object>) result.get("TableSinkCommittedCount");
+        Assertions.assertNotNull(tableSinkCommittedCount);
+        Assertions.assertEquals(120L, 
tableSinkCommittedCount.get("fake.table1"));
+        Assertions.assertEquals(240L, 
tableSinkCommittedCount.get("fake.table2"));
+    }
+
+    @Test
+    public void testMixedMetricsWithMultipleWorkers() throws Exception {
+        String jobMetrics =
+                "{"
+                        + "\"SourceReceivedQPS#fake.table1\": [{\"value\": 
5.5}, {\"value\": 4.5}, {\"value\": 3.2}],"
+                        + "\"SourceReceivedQPS#fake.table2\": [{\"value\": 
10.2}, {\"value\": 9.8}, {\"value\": 8.5}],"
+                        + "\"SinkCommittedQPS#fake.table1\": [{\"value\": 
4.1}, {\"value\": 3.9}, {\"value\": 2.8}],"
+                        + "\"SinkCommittedQPS#fake.table2\": [{\"value\": 
8.2}, {\"value\": 7.8}, {\"value\": 6.5}],"
+                        + "\"SourceReceivedQPS\": [{\"value\": 30.8}],"
+                        + "\"SinkCommittedQPS\": [{\"value\": 24.5}]"
+                        + "}";
+
+        Map<String, Object> result =
+                (Map<String, Object>) 
getJobMetricsMethod.invoke(jobInfoService, jobMetrics);
+
+        Map<String, Object> tableSourceQPS =
+                (Map<String, Object>) result.get("TableSourceReceivedQPS");
+        Assertions.assertNotNull(tableSourceQPS);
+        Assertions.assertEquals(13.2, (Double) 
tableSourceQPS.get("fake.table1"), 0.01);
+        Assertions.assertEquals(28.5, (Double) 
tableSourceQPS.get("fake.table2"), 0.01);
+
+        Map<String, Object> tableSinkCommittedQPS =
+                (Map<String, Object>) result.get("TableSinkCommittedQPS");
+        Assertions.assertNotNull(tableSinkCommittedQPS);
+        Assertions.assertEquals(10.8, (Double) 
tableSinkCommittedQPS.get("fake.table1"), 0.01);
+        Assertions.assertEquals(22.5, (Double) 
tableSinkCommittedQPS.get("fake.table2"), 0.01);
+    }
+}

Reply via email to