This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0df747401d [Feature][Core][Metrics] Add sink committed metrics and calculate commit rate in job metrics summary (#10233)
0df747401d is described below

commit 0df747401d3ea38622ca84aa032e5bc16717d52c
Author: Jast <[email protected]>
AuthorDate: Wed Jan 7 12:29:04 2026 +0800

    [Feature][Core][Metrics] Add sink committed metrics and calculate commit rate in job metrics summary (#10233)
---
 .../seatunnel/engine/client/job/JobClient.java     |  18 +-
 .../engine/client/job/JobMetricsRunner.java        |  28 ++-
 .../seatunnel/engine/client/JobClientTest.java     | 234 +++++++++++++++++++++
 3 files changed, 275 insertions(+), 5 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 035d3bc138..446c9ce04b 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -154,21 +154,33 @@ public class JobClient {
     public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) 
{
         long sourceReadCount = 0L;
         long sinkWriteCount = 0L;
+        long sinkCommittedCount = 0L;
         String jobMetrics = getJobMetrics(jobId);
         try {
             JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
             JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
             JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
+            JsonNode sinkCommitteds = jsonNode.get("SinkCommittedCount");
+
             for (int i = 0; i < sourceReaders.size(); i++) {
                 JsonNode sourceReader = sourceReaders.get(i);
                 JsonNode sinkWriter = sinkWriters.get(i);
                 sourceReadCount += sourceReader.get("value").asLong();
                 sinkWriteCount += sinkWriter.get("value").asLong();
             }
-            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, 
sinkWriteCount);
-            // Add NullPointerException because of metrics information can be 
empty like {}
+
+            if (sinkCommitteds != null) {
+                for (int i = 0; i < sinkCommitteds.size(); i++) {
+                    JsonNode sinkCommitted = sinkCommitteds.get(i);
+                    sinkCommittedCount += sinkCommitted.get("value").asLong();
+                }
+            }
+
+            return new JobMetricsRunner.JobMetricsSummary(
+                    sourceReadCount, sinkWriteCount, sinkCommittedCount);
         } catch (JsonProcessingException | NullPointerException e) {
-            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, 
sinkWriteCount);
+            return new JobMetricsRunner.JobMetricsSummary(
+                    sourceReadCount, sinkWriteCount, sinkCommittedCount);
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
index f413a5bfc5..45eea32e91 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
@@ -35,6 +35,7 @@ public class JobMetricsRunner implements Runnable {
     private LocalDateTime lastRunTime = LocalDateTime.now();
     private Long lastReadCount = 0L;
     private Long lastWriteCount = 0L;
+    private Long lastCommittedCount = 0L;
 
     public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
         this.seaTunnelClient = seaTunnelClient;
@@ -50,6 +51,21 @@ public class JobMetricsRunner implements Runnable {
             long seconds = Duration.between(lastRunTime, now).getSeconds();
             long averageRead = (jobMetricsSummary.getSourceReadCount() - 
lastReadCount) / seconds;
             long averageWrite = (jobMetricsSummary.getSinkWriteCount() - 
lastWriteCount) / seconds;
+            long averageCommitted =
+                    (jobMetricsSummary.getSinkCommittedCount() - 
lastCommittedCount) / seconds;
+
+            String commitRate = "N/A";
+            if (jobMetricsSummary.getSinkWriteCount() > 0
+                    && jobMetricsSummary.getSinkCommittedCount() >= 0) {
+                double rate =
+                        (double) jobMetricsSummary.getSinkCommittedCount()
+                                / jobMetricsSummary.getSinkWriteCount()
+                                * 100;
+
+                rate = Math.max(0, Math.min(100, rate));
+                commitRate = String.format("%.2f%%", rate);
+            }
+
             log.info(
                     StringFormatUtils.formatTable(
                             "Job Progress Information",
@@ -57,12 +73,18 @@ public class JobMetricsRunner implements Runnable {
                             jobId,
                             "Read Count So Far",
                             jobMetricsSummary.getSourceReadCount(),
-                            "Write Count So Far",
+                            "Write Attempt Count So Far",
                             jobMetricsSummary.getSinkWriteCount(),
+                            "Write Committed Count So Far",
+                            jobMetricsSummary.getSinkCommittedCount(),
+                            "Commit Rate",
+                            commitRate,
                             "Average Read Count",
                             averageRead + "/s",
-                            "Average Write Count",
+                            "Average Write Attempt Count",
                             averageWrite + "/s",
+                            "Average Write Committed Count",
+                            averageCommitted + "/s",
                             "Last Statistic Time",
                             DateTimeUtils.toString(
                                     lastRunTime, 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
@@ -72,6 +94,7 @@ public class JobMetricsRunner implements Runnable {
             lastRunTime = now;
             lastReadCount = jobMetricsSummary.getSourceReadCount();
             lastWriteCount = jobMetricsSummary.getSinkWriteCount();
+            lastCommittedCount = jobMetricsSummary.getSinkCommittedCount();
         } catch (Exception e) {
             log.warn("Failed to get job metrics summary, it maybe first-run");
         }
@@ -82,5 +105,6 @@ public class JobMetricsRunner implements Runnable {
     public static class JobMetricsSummary {
         private long sourceReadCount;
         private long sinkWriteCount;
+        private long sinkCommittedCount;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java
new file mode 100644
index 0000000000..b915db2ff4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class JobClientTest {
+
+    private JobClient jobClient;
+    private SeaTunnelHazelcastClient hazelcastClient;
+
+    @BeforeEach
+    public void setUp() {
+        hazelcastClient = mock(SeaTunnelHazelcastClient.class);
+        jobClient = new JobClient(hazelcastClient);
+    }
+
+    @Test
+    public void testNormalCaseWithCommittedMetrics() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [{\"value\": 1000, 
\"name\": \"source1\"}],"
+                        + "\"SinkWriteCount\": [{\"value\": 950, \"name\": 
\"sink1\"}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 900, \"name\": 
\"sink1\"}]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(1000L, summary.getSourceReadCount());
+        Assertions.assertEquals(950L, summary.getSinkWriteCount());
+        Assertions.assertEquals(900L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testWithoutCommittedMetrics() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [{\"value\": 1000, 
\"name\": \"source1\"}],"
+                        + "\"SinkWriteCount\": [{\"value\": 950, \"name\": 
\"sink1\"}]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(1000L, summary.getSourceReadCount());
+        Assertions.assertEquals(950L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testEmptyMetrics() {
+        String metricsJson = "{}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(0L, summary.getSourceReadCount());
+        Assertions.assertEquals(0L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testEmptyArrays() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [],"
+                        + "\"SinkWriteCount\": [],"
+                        + "\"SinkCommittedCount\": []"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(0L, summary.getSourceReadCount());
+        Assertions.assertEquals(0L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testMultipleSinks() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": ["
+                        + "  {\"value\": 500, \"name\": \"source1\"},"
+                        + "  {\"value\": 400, \"name\": \"source2\"},"
+                        + "  {\"value\": 300, \"name\": \"source3\"}"
+                        + "],"
+                        + "\"SinkWriteCount\": ["
+                        + "  {\"value\": 500, \"name\": \"sink1\"},"
+                        + "  {\"value\": 400, \"name\": \"sink2\"},"
+                        + "  {\"value\": 300, \"name\": \"sink3\"}"
+                        + "],"
+                        + "\"SinkCommittedCount\": ["
+                        + "  {\"value\": 450, \"name\": \"sink1\"},"
+                        + "  {\"value\": 380, \"name\": \"sink2\"},"
+                        + "  {\"value\": 290, \"name\": \"sink3\"}"
+                        + "]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(1200L, summary.getSourceReadCount());
+        Assertions.assertEquals(1200L, summary.getSinkWriteCount());
+        Assertions.assertEquals(1120L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testCommittedLessThanWrite() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [{\"value\": 1000, 
\"name\": \"source1\"}],"
+                        + "\"SinkWriteCount\": [{\"value\": 1000, \"name\": 
\"sink1\"}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 800, \"name\": 
\"sink1\"}]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(1000L, summary.getSourceReadCount());
+        Assertions.assertEquals(1000L, summary.getSinkWriteCount());
+        Assertions.assertEquals(800L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testCommittedEqualsWrite() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [{\"value\": 1000, 
\"name\": \"source1\"}],"
+                        + "\"SinkWriteCount\": [{\"value\": 1000, \"name\": 
\"sink1\"}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 1000, 
\"name\": \"sink1\"}]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(1000L, summary.getSourceReadCount());
+        Assertions.assertEquals(1000L, summary.getSinkWriteCount());
+        Assertions.assertEquals(1000L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testInvalidJson() {
+        String metricsJson = "invalid json {{}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(0L, summary.getSourceReadCount());
+        Assertions.assertEquals(0L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testNullMetrics() {
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), 
any())).thenReturn("null");
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(0L, summary.getSourceReadCount());
+        Assertions.assertEquals(0L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+
+    @Test
+    public void testZeroValues() {
+        String metricsJson =
+                "{"
+                        + "\"SourceReceivedCount\": [{\"value\": 0, \"name\": 
\"source1\"}],"
+                        + "\"SinkWriteCount\": [{\"value\": 0, \"name\": 
\"sink1\"}],"
+                        + "\"SinkCommittedCount\": [{\"value\": 0, \"name\": 
\"sink1\"}]"
+                        + "}";
+
+        when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+                .thenReturn(metricsJson);
+
+        JobMetricsRunner.JobMetricsSummary summary = 
jobClient.getJobMetricsSummary(123456L);
+
+        Assertions.assertNotNull(summary);
+        Assertions.assertEquals(0L, summary.getSourceReadCount());
+        Assertions.assertEquals(0L, summary.getSinkWriteCount());
+        Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+    }
+}

Reply via email to