Copilot commented on code in PR #10100:
URL: https://github.com/apache/seatunnel/pull/10100#discussion_r2551896591


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -207,4 +337,67 @@ private <T> T createMetric(
     interface MetricProcessor<T> {
         void process(T t);
     }
+
+    private static final class PendingMetrics {
+        private long count;
+        private long bytes;
+        private final Map<String, TablePendingMetrics> tableMetrics = new HashMap<>();
+
+        void add(String tableName, long rowBytes) {
+            count++;
+            bytes += rowBytes;
+            if (StringUtils.isNotBlank(tableName)) {
+                tableMetrics
+                        .computeIfAbsent(tableName, key -> new TablePendingMetrics())
+                        .add(rowBytes);
+            }
+        }
+
+        boolean isEmpty() {
+            return count == 0 && bytes == 0 && tableMetrics.isEmpty();

Review Comment:
   The `isEmpty()` logic is more complicated than it needs to be. `add()`
increments `count` on every call, including for rows with 0 bytes, and a
`tableMetrics` entry is only ever created inside that same call, so
`tableMetrics` cannot be non-empty while `count` is 0. The `bytes` and
`tableMetrics` checks therefore add nothing: `count` is the single source of
truth for whether any rows were processed, and the check can be reduced to:
   ```suggestion
               return count == 0;
   ```
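   A minimal sketch of the simplified check, for illustration only. It assumes
`TablePendingMetrics` just accumulates a row count and a byte total (its fields
are not shown in this hunk), and it replaces `StringUtils.isNotBlank` with a
plain null/trim check so the snippet has no dependencies:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the PR's PendingMetrics (not the actual class).
final class PendingMetrics {
    private long count;
    private long bytes;
    private final Map<String, TablePendingMetrics> tableMetrics = new HashMap<>();

    void add(String tableName, long rowBytes) {
        count++; // every row is counted, including rows of 0 bytes
        bytes += rowBytes;
        // Stand-in for StringUtils.isNotBlank(tableName).
        if (tableName != null && !tableName.trim().isEmpty()) {
            tableMetrics
                    .computeIfAbsent(tableName, key -> new TablePendingMetrics())
                    .add(rowBytes);
        }
    }

    // tableMetrics only gains entries inside add(), which always increments
    // count, so count alone tells whether any rows were recorded.
    boolean isEmpty() {
        return count == 0;
    }

    long getCount() {
        return count;
    }

    long getBytes() {
        return bytes;
    }

    int getTableCount() {
        return tableMetrics.size();
    }
}

// Assumed shape of the per-table accumulator.
final class TablePendingMetrics {
    private long count;
    private long bytes;

    void add(long rowBytes) {
        count++;
        bytes += rowBytes;
    }
}
```

A zero-byte row such as `add("db.t1", 0L)` still leaves the instance non-empty,
which is exactly the case the original comment was worried about.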



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -61,6 +67,27 @@ public class ConnectorMetricsCalcContext {
 
     private final Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();
 
+    // Committed metrics
+    private Counter committedCount;
+
+    private final Map<String, Counter> committedCountPerTable = new ConcurrentHashMap<>();
+
+    private Meter committedQPS;
+
+    private final Map<String, Meter> committedQPSPerTable = new ConcurrentHashMap<>();
+
+    private Counter committedBytes;
+
+    private final Map<String, Counter> committedBytesPerTable = new ConcurrentHashMap<>();
+
+    private Meter committedBytesPerSeconds;
+
+    private final Map<String, Meter> committedBytesPerSecondsPerTable = new ConcurrentHashMap<>();
+
+    private PendingMetrics currentPendingMetrics;
+
+    private final Map<Long, PendingMetrics> pendingMetricsByCheckpoint = new ConcurrentHashMap<>();

Review Comment:
   Potential memory leak: if a checkpoint is never completed or aborted (e.g.,
due to a system failure or job cancellation), its entry in
`pendingMetricsByCheckpoint` is retained indefinitely. Consider clearing the
map in the `close()` method, or adding a timeout mechanism that evicts stale
checkpoint metrics.
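   A sketch of both cleanup options, assuming each sealed entry records when it
was sealed. `PendingStore`, `Entry`, `evictStale`, and the TTL parameter are
hypothetical names for illustration, not code from the PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for sealed-but-uncommitted checkpoint metrics.
final class PendingStore {
    static final class Entry {
        final long count;
        final long bytes;
        final long sealedAtMillis; // when the checkpoint's metrics were sealed

        Entry(long count, long bytes, long sealedAtMillis) {
            this.count = count;
            this.bytes = bytes;
            this.sealedAtMillis = sealedAtMillis;
        }
    }

    private final Map<Long, Entry> pendingByCheckpoint = new ConcurrentHashMap<>();

    void seal(long checkpointId, long count, long bytes, long nowMillis) {
        pendingByCheckpoint.put(checkpointId, new Entry(count, bytes, nowMillis));
    }

    // Completed checkpoint: remove and return its metrics (null if unknown).
    Entry commit(long checkpointId) {
        return pendingByCheckpoint.remove(checkpointId);
    }

    // Aborted checkpoint: its metrics will never be committed, so drop them.
    void abort(long checkpointId) {
        pendingByCheckpoint.remove(checkpointId);
    }

    // Timeout: drop entries sealed more than ttlMillis ago, which covers
    // checkpoints that were neither completed nor aborted.
    void evictStale(long nowMillis, long ttlMillis) {
        pendingByCheckpoint.values().removeIf(e -> nowMillis - e.sealedAtMillis > ttlMillis);
    }

    // Shutdown: release everything that is still pending.
    void close() {
        pendingByCheckpoint.clear();
    }

    int size() {
        return pendingByCheckpoint.size();
    }
}
```

The TTL would need to exceed the longest expected checkpoint duration;
otherwise metrics for a slow but healthy checkpoint could be dropped before
they are committed.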



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Slf4j
+public class CommittedMetricsIT {
+
+    private static final String HOST = "http://localhost:";
+
+    private ClientJobProxy streamJobProxy;
+
+    private HazelcastInstanceImpl node1;
+
+    private SeaTunnelClient engineClient;
+
+    private SeaTunnelConfig seaTunnelConfig;
+
+    @BeforeEach
+    void beforeClass() throws Exception {
+        String testClusterName = TestUtils.getClusterName("CommittedMetricsIT");
+        seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18080);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
+        seaTunnelConfig.getEngineConfig().getHttpConfig().setPortRange(200);
+        node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        clientConfig.setClusterName(testClusterName);
+        engineClient = new SeaTunnelClient(clientConfig);
+    }
+
+    @Test
+    public void testCommittedMetricsWithCheckpoint() throws Exception {
+        String streamFilePath =
+                TestUtils.getResource("stream_fake_multi_table_to_console_with_checkpoint.conf");
+        JobConfig streamConf = new JobConfig();
+        streamConf.setName("stream_fake_multi_table_to_console_with_checkpoint");

Review Comment:
   The test references a configuration file 
`stream_fake_multi_table_to_console_with_checkpoint.conf` that doesn't exist in 
the repository. This will cause the test to fail at runtime. Please add the 
missing configuration file or update the test to reference an existing 
configuration file.
   ```suggestion
                   TestUtils.getResource("stream_fake_to_console_with_checkpoint.conf");
           JobConfig streamConf = new JobConfig();
           streamConf.setName("stream_fake_to_console_with_checkpoint");
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -127,51 +178,130 @@ public void updateMetrics(Object data, String tableId) {
         QPS.markEvent();
         if (data instanceof SeaTunnelRow) {
             SeaTunnelRow row = (SeaTunnelRow) data;
-            bytes.inc(row.getBytesSize());
-            bytesPerSeconds.markEvent(row.getBytesSize());
+            long rowBytes = row.getBytesSize();
+            bytes.inc(rowBytes);
+            bytesPerSeconds.markEvent(rowBytes);
 
-            if (StringUtils.isNotBlank(tableId)) {
-                String tableName = TablePath.of(tableId).getFullName();
+            String normalizedTableName =
+                    StringUtils.isNotBlank(tableId) ? normalizeTableName(tableId) : null;
+            if (PluginType.SINK.equals(type)) {
+                recordPendingMetrics(normalizedTableName, rowBytes);
+            }
 
-                // Processing count
+            if (StringUtils.isNotBlank(normalizedTableName)) {
                 processMetrics(
                         countPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_COUNT,
                         SOURCE_RECEIVED_COUNT,
                         Counter::inc);
 
-                // Processing bytes
                 processMetrics(
                         bytesPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES,
                         SOURCE_RECEIVED_BYTES,
-                        counter -> counter.inc(row.getBytesSize()));
+                        counter -> counter.inc(rowBytes));
 
-                // Processing QPS
                 processMetrics(
                         QPSPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_QPS,
                         SOURCE_RECEIVED_QPS,
                         Meter::markEvent);
 
-                // Processing bytes rate
                 processMetrics(
                         bytesPerSecondsPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES_PER_SECONDS,
                         SOURCE_RECEIVED_BYTES_PER_SECONDS,
-                        meter -> meter.markEvent(row.getBytesSize()));
+                        meter -> meter.markEvent(rowBytes));
             }
         }
     }
 
+    public void sealCheckpointMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        PendingMetrics pendingToSeal = currentPendingMetrics;
+        currentPendingMetrics = new PendingMetrics();
+        if (pendingToSeal.isEmpty()) {
+            return;
+        }
+        pendingMetricsByCheckpoint
+                .computeIfAbsent(checkpointId, key -> new PendingMetrics())
+                .merge(pendingToSeal);
+    }
+
+    public void commitPendingMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        PendingMetrics pending = pendingMetricsByCheckpoint.remove(checkpointId);
+        if (pending == null || pending.isEmpty()) {
+            return;
+        }
+        committedCount.inc(pending.getCount());
+        committedQPS.markEvent(pending.getCount());
+        committedBytes.inc(pending.getBytes());
+        committedBytesPerSeconds.markEvent(pending.getBytes());

Review Comment:
   The `committedCount`, `committedQPS`, `committedBytes`, and
`committedBytesPerSeconds` fields are only initialized for the SINK plugin
type, yet `commitPendingMetrics()` uses them without a null check. The method
does return early unless `PluginType.SINK.equals(type)`, but if
`initializeCommittedMetrics()` somehow fails or is skipped, these fields stay
null and the calls throw a `NullPointerException`. Consider adding null checks
or initializing the fields defensively.
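   A sketch of the null-check option. `SimpleCounter` and `CommittedMetrics` are
hypothetical stand-ins (SeaTunnel's `Counter`/`Meter` types are not reproduced
here), and whether to skip silently or log a warning is a judgment call:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for SeaTunnel's Counter.
final class SimpleCounter {
    private final AtomicLong value = new AtomicLong();

    void inc(long n) {
        value.addAndGet(n);
    }

    long getCount() {
        return value.get();
    }
}

// Hypothetical owner of the committed counters, mirroring the SINK-only setup.
final class CommittedMetrics {
    private final boolean sink;
    private SimpleCounter committedCount;
    private SimpleCounter committedBytes;

    CommittedMetrics(boolean sink) {
        this.sink = sink;
        if (sink) {
            initializeCommittedMetrics();
        }
    }

    private void initializeCommittedMetrics() {
        committedCount = new SimpleCounter();
        committedBytes = new SimpleCounter();
    }

    // Skips the update instead of throwing a NullPointerException when the
    // counters were never initialized; returns whether anything was recorded.
    boolean commit(long count, long bytes) {
        if (!sink || committedCount == null || committedBytes == null) {
            return false;
        }
        committedCount.inc(count);
        committedBytes.inc(bytes);
        return true;
    }

    long getCommittedCount() {
        return committedCount == null ? 0 : committedCount.getCount();
    }

    long getCommittedBytes() {
        return committedBytes == null ? 0 : committedBytes.getCount();
    }
}
```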



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -207,4 +337,67 @@ private <T> T createMetric(
     interface MetricProcessor<T> {
         void process(T t);
     }
+
+    private static final class PendingMetrics {
+        private long count;
+        private long bytes;
+        private final Map<String, TablePendingMetrics> tableMetrics = new HashMap<>();

Review Comment:
   Thread safety issue: `PendingMetrics.tableMetrics` is a `HashMap`, which is
not thread-safe. Because `currentPendingMetrics` can be reached by multiple
data processing threads calling `updateMetrics()`, concurrent `add()` calls can
corrupt the map. Replace it with a `ConcurrentHashMap`. Note that this alone
does not make `add()` thread-safe: `count++` and `bytes += rowBytes` are
non-atomic read-modify-write operations that can still lose increments, so
those fields also need `LongAdder`/`AtomicLong`, or `add()` must be
synchronized.
   ```suggestion
           private final Map<String, TablePendingMetrics> tableMetrics = new ConcurrentHashMap<>();
   ```
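   A sketch of a `PendingMetrics` variant that tolerates concurrent `add()`
calls, using `LongAdder` for the totals and `ConcurrentHashMap.computeIfAbsent`
for the per-table entries. The class names are hypothetical; a `synchronized`
`add()` over the existing fields would be an equally valid alternative:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe variant of PendingMetrics.
final class ConcurrentPendingMetrics {
    private final LongAdder count = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    private final Map<String, TableTotals> tableMetrics = new ConcurrentHashMap<>();

    void add(String tableName, long rowBytes) {
        count.increment();
        bytes.add(rowBytes);
        if (tableName != null && !tableName.trim().isEmpty()) {
            // computeIfAbsent is atomic on ConcurrentHashMap, so racing threads
            // share a single TableTotals per table.
            TableTotals totals = tableMetrics.computeIfAbsent(tableName, key -> new TableTotals());
            totals.count.increment();
            totals.bytes.add(rowBytes);
        }
    }

    long getCount() {
        return count.sum();
    }

    long getBytes() {
        return bytes.sum();
    }

    long getTableCount(String tableName) {
        TableTotals totals = tableMetrics.get(tableName);
        return totals == null ? 0 : totals.count.sum();
    }

    static final class TableTotals {
        final LongAdder count = new LongAdder();
        final LongAdder bytes = new LongAdder();
    }
}
```

`LongAdder.sum()` is not an atomic snapshot while writers are still active, so
this removes lost increments inside one instance but does not by itself address
the race between recording and sealing `currentPendingMetrics`.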



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -127,51 +178,130 @@ public void updateMetrics(Object data, String tableId) {
         QPS.markEvent();
         if (data instanceof SeaTunnelRow) {
             SeaTunnelRow row = (SeaTunnelRow) data;
-            bytes.inc(row.getBytesSize());
-            bytesPerSeconds.markEvent(row.getBytesSize());
+            long rowBytes = row.getBytesSize();
+            bytes.inc(rowBytes);
+            bytesPerSeconds.markEvent(rowBytes);
 
-            if (StringUtils.isNotBlank(tableId)) {
-                String tableName = TablePath.of(tableId).getFullName();
+            String normalizedTableName =
+                    StringUtils.isNotBlank(tableId) ? normalizeTableName(tableId) : null;
+            if (PluginType.SINK.equals(type)) {
+                recordPendingMetrics(normalizedTableName, rowBytes);
+            }
 
-                // Processing count
+            if (StringUtils.isNotBlank(normalizedTableName)) {
                 processMetrics(
                         countPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_COUNT,
                         SOURCE_RECEIVED_COUNT,
                         Counter::inc);
 
-                // Processing bytes
                 processMetrics(
                         bytesPerTable,
                         Counter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES,
                         SOURCE_RECEIVED_BYTES,
-                        counter -> counter.inc(row.getBytesSize()));
+                        counter -> counter.inc(rowBytes));
 
-                // Processing QPS
                 processMetrics(
                         QPSPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_QPS,
                         SOURCE_RECEIVED_QPS,
                         Meter::markEvent);
 
-                // Processing bytes rate
                 processMetrics(
                         bytesPerSecondsPerTable,
                         Meter.class,
-                        tableName,
+                        normalizedTableName,
                         SINK_WRITE_BYTES_PER_SECONDS,
                         SOURCE_RECEIVED_BYTES_PER_SECONDS,
-                        meter -> meter.markEvent(row.getBytesSize()));
+                        meter -> meter.markEvent(rowBytes));
             }
         }
     }
 
+    public void sealCheckpointMetrics(long checkpointId) {
+        if (!PluginType.SINK.equals(type)) {
+            return;
+        }
+        PendingMetrics pendingToSeal = currentPendingMetrics;
+        currentPendingMetrics = new PendingMetrics();

Review Comment:
   Thread safety issue: `currentPendingMetrics` is accessed from multiple
threads without synchronization. `updateMetrics()` (called from the data
processing thread via `recordPendingMetrics()`) and `sealCheckpointMetrics()`
(called during barrier processing) can race. This can lead to:
   1. Lost updates if `updateMetrics()` runs between reading
`currentPendingMetrics` into `pendingToSeal` and replacing it with a new
instance (lines 231 and 232)
   2. Data corruption in `PendingMetrics`, since it uses a non-thread-safe
`HashMap`
   
   Consider making `currentPendingMetrics` volatile or using `AtomicReference`
with proper synchronization, and using `ConcurrentHashMap` in
`PendingMetrics.tableMetrics`.
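   A sketch of the "proper synchronization" part, using one lock for both
recording and sealing instead of `volatile`/`AtomicReference`. `volatile` alone
only makes the new reference visible; it does not stop a writer that already
holds the old instance from updating it after it has been merged.
`CheckpointMetricsBuffer` and `Batch` are hypothetical names, and per-table
tracking is omitted for brevity:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical buffer: record() and the swap in seal() share one lock, so each
// row lands either in the batch being sealed or in the fresh one.
final class CheckpointMetricsBuffer {
    static final class Batch {
        long count;
        long bytes;
    }

    private final Object lock = new Object();
    private Batch current = new Batch(); // guarded by lock
    private final Map<Long, Batch> sealed = new ConcurrentHashMap<>();

    void record(long rowBytes) {
        synchronized (lock) {
            current.count++;
            current.bytes += rowBytes;
        }
    }

    void seal(long checkpointId) {
        Batch toSeal;
        synchronized (lock) {
            toSeal = current;
            current = new Batch();
        }
        // No writer can reach toSeal after the swap, so reading it here is safe.
        if (toSeal.count == 0) {
            return;
        }
        sealed.merge(checkpointId, toSeal, CheckpointMetricsBuffer::combine);
    }

    // Returns {count, bytes} for a sealed checkpoint, or null if none.
    long[] commit(long checkpointId) {
        Batch batch = sealed.remove(checkpointId);
        return batch == null ? null : new long[] {batch.count, batch.bytes};
    }

    private static Batch combine(Batch a, Batch b) {
        Batch merged = new Batch();
        merged.count = a.count + b.count;
        merged.bytes = a.bytes + b.bytes;
        return merged;
    }
}
```

Taking a lock per row has a cost. If `updateMetrics()` and
`sealCheckpointMetrics()` turn out to always run on the same task thread, the
lock is unnecessary, so the threading model may be worth confirming first.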



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
