This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0df747401d [Feature][Core][Metrics] Add sink committed metrics and
calculate commit rate in job metrics summary (#10233)
0df747401d is described below
commit 0df747401d3ea38622ca84aa032e5bc16717d52c
Author: Jast <[email protected]>
AuthorDate: Wed Jan 7 12:29:04 2026 +0800
[Feature][Core][Metrics] Add sink committed metrics and calculate commit
rate in job metrics summary (#10233)
---
.../seatunnel/engine/client/job/JobClient.java | 18 +-
.../engine/client/job/JobMetricsRunner.java | 28 ++-
.../seatunnel/engine/client/JobClientTest.java | 234 +++++++++++++++++++++
3 files changed, 275 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 035d3bc138..446c9ce04b 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -154,21 +154,33 @@ public class JobClient {
public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId)
{
long sourceReadCount = 0L;
long sinkWriteCount = 0L;
+ long sinkCommittedCount = 0L;
String jobMetrics = getJobMetrics(jobId);
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
+ JsonNode sinkCommitteds = jsonNode.get("SinkCommittedCount");
+
for (int i = 0; i < sourceReaders.size(); i++) {
JsonNode sourceReader = sourceReaders.get(i);
JsonNode sinkWriter = sinkWriters.get(i);
sourceReadCount += sourceReader.get("value").asLong();
sinkWriteCount += sinkWriter.get("value").asLong();
}
- return new JobMetricsRunner.JobMetricsSummary(sourceReadCount,
sinkWriteCount);
- // Add NullPointerException because of metrics information can be
empty like {}
+
+ if (sinkCommitteds != null) {
+ for (int i = 0; i < sinkCommitteds.size(); i++) {
+ JsonNode sinkCommitted = sinkCommitteds.get(i);
+ sinkCommittedCount += sinkCommitted.get("value").asLong();
+ }
+ }
+
+ return new JobMetricsRunner.JobMetricsSummary(
+ sourceReadCount, sinkWriteCount, sinkCommittedCount);
} catch (JsonProcessingException | NullPointerException e) {
- return new JobMetricsRunner.JobMetricsSummary(sourceReadCount,
sinkWriteCount);
+ return new JobMetricsRunner.JobMetricsSummary(
+ sourceReadCount, sinkWriteCount, sinkCommittedCount);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
index f413a5bfc5..45eea32e91 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
@@ -35,6 +35,7 @@ public class JobMetricsRunner implements Runnable {
private LocalDateTime lastRunTime = LocalDateTime.now();
private Long lastReadCount = 0L;
private Long lastWriteCount = 0L;
+ private Long lastCommittedCount = 0L;
public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
this.seaTunnelClient = seaTunnelClient;
@@ -50,6 +51,21 @@ public class JobMetricsRunner implements Runnable {
long seconds = Duration.between(lastRunTime, now).getSeconds();
long averageRead = (jobMetricsSummary.getSourceReadCount() -
lastReadCount) / seconds;
long averageWrite = (jobMetricsSummary.getSinkWriteCount() -
lastWriteCount) / seconds;
+ long averageCommitted =
+ (jobMetricsSummary.getSinkCommittedCount() -
lastCommittedCount) / seconds;
+
+ String commitRate = "N/A";
+ if (jobMetricsSummary.getSinkWriteCount() > 0
+ && jobMetricsSummary.getSinkCommittedCount() >= 0) {
+ double rate =
+ (double) jobMetricsSummary.getSinkCommittedCount()
+ / jobMetricsSummary.getSinkWriteCount()
+ * 100;
+
+ rate = Math.max(0, Math.min(100, rate));
+ commitRate = String.format("%.2f%%", rate);
+ }
+
log.info(
StringFormatUtils.formatTable(
"Job Progress Information",
@@ -57,12 +73,18 @@ public class JobMetricsRunner implements Runnable {
jobId,
"Read Count So Far",
jobMetricsSummary.getSourceReadCount(),
- "Write Count So Far",
+ "Write Attempt Count So Far",
jobMetricsSummary.getSinkWriteCount(),
+ "Write Committed Count So Far",
+ jobMetricsSummary.getSinkCommittedCount(),
+ "Commit Rate",
+ commitRate,
"Average Read Count",
averageRead + "/s",
- "Average Write Count",
+ "Average Write Attempt Count",
averageWrite + "/s",
+ "Average Write Committed Count",
+ averageCommitted + "/s",
"Last Statistic Time",
DateTimeUtils.toString(
lastRunTime,
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
@@ -72,6 +94,7 @@ public class JobMetricsRunner implements Runnable {
lastRunTime = now;
lastReadCount = jobMetricsSummary.getSourceReadCount();
lastWriteCount = jobMetricsSummary.getSinkWriteCount();
+ lastCommittedCount = jobMetricsSummary.getSinkCommittedCount();
} catch (Exception e) {
log.warn("Failed to get job metrics summary, it maybe first-run");
}
@@ -82,5 +105,6 @@ public class JobMetricsRunner implements Runnable {
public static class JobMetricsSummary {
private long sourceReadCount;
private long sinkWriteCount;
+ private long sinkCommittedCount;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java
new file mode 100644
index 0000000000..b915db2ff4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobClientTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client;
+
+import org.apache.seatunnel.engine.client.job.JobClient;
+import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class JobClientTest {
+
+ private JobClient jobClient;
+ private SeaTunnelHazelcastClient hazelcastClient;
+
+ @BeforeEach
+ public void setUp() {
+ hazelcastClient = mock(SeaTunnelHazelcastClient.class);
+ jobClient = new JobClient(hazelcastClient);
+ }
+
+ @Test
+ public void testNormalCaseWithCommittedMetrics() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [{\"value\": 1000,
\"name\": \"source1\"}],"
+ + "\"SinkWriteCount\": [{\"value\": 950, \"name\":
\"sink1\"}],"
+ + "\"SinkCommittedCount\": [{\"value\": 900, \"name\":
\"sink1\"}]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(1000L, summary.getSourceReadCount());
+ Assertions.assertEquals(950L, summary.getSinkWriteCount());
+ Assertions.assertEquals(900L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testWithoutCommittedMetrics() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [{\"value\": 1000,
\"name\": \"source1\"}],"
+ + "\"SinkWriteCount\": [{\"value\": 950, \"name\":
\"sink1\"}]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(1000L, summary.getSourceReadCount());
+ Assertions.assertEquals(950L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testEmptyMetrics() {
+ String metricsJson = "{}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(0L, summary.getSourceReadCount());
+ Assertions.assertEquals(0L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testEmptyArrays() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [],"
+ + "\"SinkWriteCount\": [],"
+ + "\"SinkCommittedCount\": []"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(0L, summary.getSourceReadCount());
+ Assertions.assertEquals(0L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testMultipleSinks() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": ["
+ + " {\"value\": 500, \"name\": \"source1\"},"
+ + " {\"value\": 400, \"name\": \"source2\"},"
+ + " {\"value\": 300, \"name\": \"source3\"}"
+ + "],"
+ + "\"SinkWriteCount\": ["
+ + " {\"value\": 500, \"name\": \"sink1\"},"
+ + " {\"value\": 400, \"name\": \"sink2\"},"
+ + " {\"value\": 300, \"name\": \"sink3\"}"
+ + "],"
+ + "\"SinkCommittedCount\": ["
+ + " {\"value\": 450, \"name\": \"sink1\"},"
+ + " {\"value\": 380, \"name\": \"sink2\"},"
+ + " {\"value\": 290, \"name\": \"sink3\"}"
+ + "]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(1200L, summary.getSourceReadCount());
+ Assertions.assertEquals(1200L, summary.getSinkWriteCount());
+ Assertions.assertEquals(1120L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testCommittedLessThanWrite() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [{\"value\": 1000,
\"name\": \"source1\"}],"
+ + "\"SinkWriteCount\": [{\"value\": 1000, \"name\":
\"sink1\"}],"
+ + "\"SinkCommittedCount\": [{\"value\": 800, \"name\":
\"sink1\"}]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(1000L, summary.getSourceReadCount());
+ Assertions.assertEquals(1000L, summary.getSinkWriteCount());
+ Assertions.assertEquals(800L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testCommittedEqualsWrite() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [{\"value\": 1000,
\"name\": \"source1\"}],"
+ + "\"SinkWriteCount\": [{\"value\": 1000, \"name\":
\"sink1\"}],"
+ + "\"SinkCommittedCount\": [{\"value\": 1000,
\"name\": \"sink1\"}]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(1000L, summary.getSourceReadCount());
+ Assertions.assertEquals(1000L, summary.getSinkWriteCount());
+ Assertions.assertEquals(1000L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testInvalidJson() {
+ String metricsJson = "invalid json {{}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(0L, summary.getSourceReadCount());
+ Assertions.assertEquals(0L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testNullMetrics() {
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(),
any())).thenReturn("null");
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(0L, summary.getSourceReadCount());
+ Assertions.assertEquals(0L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+
+ @Test
+ public void testZeroValues() {
+ String metricsJson =
+ "{"
+ + "\"SourceReceivedCount\": [{\"value\": 0, \"name\":
\"source1\"}],"
+ + "\"SinkWriteCount\": [{\"value\": 0, \"name\":
\"sink1\"}],"
+ + "\"SinkCommittedCount\": [{\"value\": 0, \"name\":
\"sink1\"}]"
+ + "}";
+
+ when(hazelcastClient.requestOnMasterAndDecodeResponse(any(), any()))
+ .thenReturn(metricsJson);
+
+ JobMetricsRunner.JobMetricsSummary summary =
jobClient.getJobMetricsSummary(123456L);
+
+ Assertions.assertNotNull(summary);
+ Assertions.assertEquals(0L, summary.getSourceReadCount());
+ Assertions.assertEquals(0L, summary.getSinkWriteCount());
+ Assertions.assertEquals(0L, summary.getSinkCommittedCount());
+ }
+}