Copilot commented on code in PR #10100:
URL: https://github.com/apache/seatunnel/pull/10100#discussion_r2551896591
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -207,4 +337,67 @@ private <T> T createMetric(
interface MetricProcessor<T> {
void process(T t);
}
+
+ private static final class PendingMetrics {
+ private long count;
+ private long bytes;
+ private final Map<String, TablePendingMetrics> tableMetrics = new HashMap<>();
+
+ void add(String tableName, long rowBytes) {
+ count++;
+ bytes += rowBytes;
+ if (StringUtils.isNotBlank(tableName)) {
+ tableMetrics
+ .computeIfAbsent(tableName, key -> new TablePendingMetrics())
+ .add(rowBytes);
+ }
+ }
+
+ boolean isEmpty() {
+ return count == 0 && bytes == 0 && tableMetrics.isEmpty();
Review Comment:
The `isEmpty()` logic is more complicated than it needs to be, and harder to
reason about as a result. Every call to `add()` increments `count`, even for a row
with 0 bytes (which may also create a `tableMetrics` entry), so `count == 0`
already implies `bytes == 0` and an empty `tableMetrics`. Since `count` is the
single source of truth for whether any rows were processed, the check can be
reduced to `return count == 0;`.
```suggestion
return count == 0;
```
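A minimal sketch of the simplified check, assuming `add()` is the only writer of the three fields. The class below is a hypothetical, self-contained mirror of the PR's `PendingMetrics`: `TablePendingMetrics` is reduced to a `long[]` pair and `StringUtils.isNotBlank` is replaced by a `trim().isEmpty()` test so no external dependency is needed. It shows that a 0-byte row still makes the buffer non-empty.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified mirror of the PR's PendingMetrics (not the actual implementation).
final class PendingMetrics {
    private long count;
    private long bytes;
    // Per-table {rowCount, byteCount}; stands in for TablePendingMetrics.
    private final Map<String, long[]> tableMetrics = new HashMap<>();

    void add(String tableName, long rowBytes) {
        count++; // every recorded row bumps count, including 0-byte rows
        bytes += rowBytes;
        if (tableName != null && !tableName.trim().isEmpty()) {
            long[] table = tableMetrics.computeIfAbsent(tableName, k -> new long[2]);
            table[0]++;
            table[1] += rowBytes;
        }
    }

    // add() is the only writer, so count == 0 implies bytes == 0 and no table entries.
    boolean isEmpty() {
        return count == 0;
    }

    long getCount() {
        return count;
    }

    long getBytes() {
        return bytes;
    }

    int tableCount() {
        return tableMetrics.size();
    }
}
```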
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -61,6 +67,27 @@ public class ConnectorMetricsCalcContext {
private final Map<String, Meter> bytesPerSecondsPerTable = new ConcurrentHashMap<>();
+ // Committed metrics
+ private Counter committedCount;
+
+ private final Map<String, Counter> committedCountPerTable = new ConcurrentHashMap<>();
+
+ private Meter committedQPS;
+
+ private final Map<String, Meter> committedQPSPerTable = new ConcurrentHashMap<>();
+
+ private Counter committedBytes;
+
+ private final Map<String, Counter> committedBytesPerTable = new ConcurrentHashMap<>();
+
+ private Meter committedBytesPerSeconds;
+
+ private final Map<String, Meter> committedBytesPerSecondsPerTable = new ConcurrentHashMap<>();
+
+ private PendingMetrics currentPendingMetrics;
+
+ private final Map<Long, PendingMetrics> pendingMetricsByCheckpoint = new ConcurrentHashMap<>();
Review Comment:
Potential memory leak: If a checkpoint is never completed or aborted (e.g.,
due to system failure, job cancellation), the `pendingMetricsByCheckpoint` map
will retain the pending metrics indefinitely. Consider adding cleanup logic in
the `close()` method or implementing a timeout mechanism to remove stale
checkpoint metrics.
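A sketch of both cleanup options the comment mentions: a full clear in `close()` and a timeout sweep. All names are hypothetical, and the payload is simplified to a row count. The clock is passed in explicitly (an assumption made for testability), and an abort path is included because an aborted checkpoint will never be committed either.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: pending metrics keyed by checkpoint ID with bounded lifetime.
final class CheckpointPendingStore {
    // Simplified payload: rows sealed for one checkpoint plus when it was first sealed.
    private static final class Entry {
        final long rows;
        final long sealedAtMillis;

        Entry(long rows, long sealedAtMillis) {
            this.rows = rows;
            this.sealedAtMillis = sealedAtMillis;
        }
    }

    private final Map<Long, Entry> pendingByCheckpoint = new ConcurrentHashMap<>();
    private final long ttlMillis;

    CheckpointPendingStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Seal: add rows to the checkpoint's entry, keeping the earliest seal time.
    void seal(long checkpointId, long rows, long nowMillis) {
        pendingByCheckpoint.merge(
                checkpointId,
                new Entry(rows, nowMillis),
                (old, added) -> new Entry(old.rows + added.rows, old.sealedAtMillis));
    }

    // Commit: remove and return the rows (0 if unknown or already evicted).
    long commit(long checkpointId) {
        Entry e = pendingByCheckpoint.remove(checkpointId);
        return e == null ? 0L : e.rows;
    }

    // Abort: the checkpoint will never complete, so drop its pending rows.
    void abort(long checkpointId) {
        pendingByCheckpoint.remove(checkpointId);
    }

    // Timeout sweep: drop entries pending for longer than ttlMillis.
    void evictStale(long nowMillis) {
        pendingByCheckpoint.values().removeIf(e -> nowMillis - e.sealedAtMillis > ttlMillis);
    }

    // close(): nothing can be committed after shutdown, so release everything.
    void close() {
        pendingByCheckpoint.clear();
    }

    int size() {
        return pendingByCheckpoint.size();
    }
}
```

The timeout trades accuracy for bounded memory: a checkpoint that completes after its entry was evicted contributes nothing to the committed counters, so the TTL would need to be well above the checkpoint timeout.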
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CommittedMetricsIT.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Slf4j
+public class CommittedMetricsIT {
+
+ private static final String HOST = "http://localhost:";
+
+ private ClientJobProxy streamJobProxy;
+
+ private HazelcastInstanceImpl node1;
+
+ private SeaTunnelClient engineClient;
+
+ private SeaTunnelConfig seaTunnelConfig;
+
+ @BeforeEach
+ void beforeClass() throws Exception {
+ String testClusterName = TestUtils.getClusterName("CommittedMetricsIT");
+ seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setPort(18080);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setEnableDynamicPort(true);
+ seaTunnelConfig.getEngineConfig().getHttpConfig().setPortRange(200);
+ node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(testClusterName);
+ engineClient = new SeaTunnelClient(clientConfig);
+ }
+
+ @Test
+ public void testCommittedMetricsWithCheckpoint() throws Exception {
+ String streamFilePath =
+ TestUtils.getResource("stream_fake_multi_table_to_console_with_checkpoint.conf");
+ JobConfig streamConf = new JobConfig();
+ streamConf.setName("stream_fake_multi_table_to_console_with_checkpoint");
Review Comment:
The test references a configuration file
`stream_fake_multi_table_to_console_with_checkpoint.conf` that doesn't exist in
the repository. This will cause the test to fail at runtime. Please add the
missing configuration file or update the test to reference an existing
configuration file.
```suggestion
TestUtils.getResource("stream_fake_to_console_with_checkpoint.conf");
JobConfig streamConf = new JobConfig();
streamConf.setName("stream_fake_to_console_with_checkpoint");
```
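Independently of which file name is used, a missing job config can be made to fail fast with a clear message instead of a later job failure. The helper below is hypothetical (it is not the project's `TestUtils.getResource`). It relies only on `ClassLoader.getResource`, which returns `null` when the resource is not on the classpath.

```java
import java.net.URL;

// Hypothetical test helper: resolve a classpath resource or fail with the missing name.
final class TestResources {
    static String requireResource(String name) {
        URL url = TestResources.class.getClassLoader().getResource(name);
        if (url == null) {
            // Surfaces the missing config during test setup, naming the exact file.
            throw new IllegalStateException("Missing test resource on classpath: " + name);
        }
        return url.getPath();
    }
}
```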
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -127,51 +178,130 @@ public void updateMetrics(Object data, String tableId) {
QPS.markEvent();
if (data instanceof SeaTunnelRow) {
SeaTunnelRow row = (SeaTunnelRow) data;
- bytes.inc(row.getBytesSize());
- bytesPerSeconds.markEvent(row.getBytesSize());
+ long rowBytes = row.getBytesSize();
+ bytes.inc(rowBytes);
+ bytesPerSeconds.markEvent(rowBytes);
- if (StringUtils.isNotBlank(tableId)) {
- String tableName = TablePath.of(tableId).getFullName();
+ String normalizedTableName =
+ StringUtils.isNotBlank(tableId) ? normalizeTableName(tableId) : null;
+ if (PluginType.SINK.equals(type)) {
+ recordPendingMetrics(normalizedTableName, rowBytes);
+ }
- // Processing count
+ if (StringUtils.isNotBlank(normalizedTableName)) {
processMetrics(
countPerTable,
Counter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_COUNT,
SOURCE_RECEIVED_COUNT,
Counter::inc);
- // Processing bytes
processMetrics(
bytesPerTable,
Counter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_BYTES,
SOURCE_RECEIVED_BYTES,
- counter -> counter.inc(row.getBytesSize()));
+ counter -> counter.inc(rowBytes));
- // Processing QPS
processMetrics(
QPSPerTable,
Meter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_QPS,
SOURCE_RECEIVED_QPS,
Meter::markEvent);
- // Processing bytes rate
processMetrics(
bytesPerSecondsPerTable,
Meter.class,
- tableName,
+ normalizedTableName,
SINK_WRITE_BYTES_PER_SECONDS,
SOURCE_RECEIVED_BYTES_PER_SECONDS,
- meter -> meter.markEvent(row.getBytesSize()));
+ meter -> meter.markEvent(rowBytes));
}
}
}
+ public void sealCheckpointMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ PendingMetrics pendingToSeal = currentPendingMetrics;
+ currentPendingMetrics = new PendingMetrics();
+ if (pendingToSeal.isEmpty()) {
+ return;
+ }
+ pendingMetricsByCheckpoint
+ .computeIfAbsent(checkpointId, key -> new PendingMetrics())
+ .merge(pendingToSeal);
+ }
+
+ public void commitPendingMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ PendingMetrics pending = pendingMetricsByCheckpoint.remove(checkpointId);
+ if (pending == null || pending.isEmpty()) {
+ return;
+ }
+ committedCount.inc(pending.getCount());
+ committedQPS.markEvent(pending.getCount());
+ committedBytes.inc(pending.getBytes());
+ committedBytesPerSeconds.markEvent(pending.getBytes());
Review Comment:
The `committedCount`, `committedQPS`, `committedBytes`, and
`committedBytesPerSeconds` fields are only initialized for SINK plugin type.
However, there's no null check before using them in `commitPendingMetrics()`.
While the method checks `PluginType.SINK.equals(type)` before processing, if
`initializeCommittedMetrics()` somehow fails or is skipped, these fields could
be null and cause NullPointerException. Consider adding null checks or
defensive initialization.
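A sketch combining both suggestions, with hypothetical stand-ins (`PluginKind`, `SimpleCounter`) for the engine's `PluginType` and `Counter`. The committed counters are `final` and assigned once in the constructor, so initialization cannot be skipped after construction, and `commit()` still null-checks them before use.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins, kept minimal so the sketch compiles on its own.
enum PluginKind { SOURCE, SINK }

final class SimpleCounter {
    private final AtomicLong value = new AtomicLong();

    void inc(long n) {
        value.addAndGet(n);
    }

    long getCount() {
        return value.get();
    }
}

final class CommittedMetricsHolder {
    private final PluginKind type;
    // Assigned exactly once: non-null for sinks, null for every other plugin type.
    private final SimpleCounter committedCount;
    private final SimpleCounter committedBytes;

    CommittedMetricsHolder(PluginKind type) {
        this.type = type;
        boolean sink = type == PluginKind.SINK;
        this.committedCount = sink ? new SimpleCounter() : null;
        this.committedBytes = sink ? new SimpleCounter() : null;
    }

    void commit(long rows, long bytes) {
        // Type check plus explicit null check: a missing counter becomes a no-op, not an NPE.
        if (type != PluginKind.SINK || committedCount == null || committedBytes == null || rows == 0) {
            return;
        }
        committedCount.inc(rows);
        committedBytes.inc(bytes);
    }

    long committedRows() {
        return committedCount == null ? 0L : committedCount.getCount();
    }

    long committedBytesTotal() {
        return committedBytes == null ? 0L : committedBytes.getCount();
    }
}
```

Silently skipping can hide a real initialization bug. If that matters more than robustness, `Objects.requireNonNull(committedCount, "...")` fails fast with a clearer message than a bare NPE.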
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -207,4 +337,67 @@ private <T> T createMetric(
interface MetricProcessor<T> {
void process(T t);
}
+
+ private static final class PendingMetrics {
+ private long count;
+ private long bytes;
+ private final Map<String, TablePendingMetrics> tableMetrics = new HashMap<>();
Review Comment:
Thread safety issue: `PendingMetrics.tableMetrics` uses `HashMap` which is
not thread-safe. If `add()` is called concurrently from multiple threads, it
can lead to data corruption. Since `currentPendingMetrics` can be accessed by
multiple data processing threads calling `updateMetrics()`, this HashMap should
be replaced with `ConcurrentHashMap` to ensure thread safety.
```suggestion
private final Map<String, TablePendingMetrics> tableMetrics = new ConcurrentHashMap<>();
```
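Swapping in `ConcurrentHashMap` protects the map itself, but `count++` and `bytes += rowBytes` are still unsynchronized read-modify-write operations on plain `long` fields, so concurrent `add()` calls could still lose increments. A minimal sketch, assuming `add()` really is called concurrently: the totals use `LongAdder`, and the per-table entry is created with `ConcurrentHashMap.computeIfAbsent`, which applies the mapping function atomically. The class name and the `LongAdder[]` per-table pair are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical thread-safe variant of PendingMetrics.add().
final class ConcurrentPendingMetrics {
    private final LongAdder count = new LongAdder();
    private final LongAdder bytes = new LongAdder();
    // Per-table {rowCount, byteCount}, created at most once per key.
    private final Map<String, LongAdder[]> tableMetrics = new ConcurrentHashMap<>();

    void add(String tableName, long rowBytes) {
        count.increment();
        bytes.add(rowBytes);
        if (tableName != null && !tableName.trim().isEmpty()) {
            LongAdder[] table = tableMetrics.computeIfAbsent(
                    tableName, k -> new LongAdder[] {new LongAdder(), new LongAdder()});
            table[0].increment();
            table[1].add(rowBytes);
        }
    }

    long getCount() {
        return count.sum();
    }

    long getBytes() {
        return bytes.sum();
    }

    long getTableCount(String tableName) {
        LongAdder[] table = tableMetrics.get(tableName);
        return table == null ? 0L : table[0].sum();
    }
}
```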
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ConnectorMetricsCalcContext.java:
##########
@@ -127,51 +178,130 @@ public void updateMetrics(Object data, String tableId) {
+ public void sealCheckpointMetrics(long checkpointId) {
+ if (!PluginType.SINK.equals(type)) {
+ return;
+ }
+ PendingMetrics pendingToSeal = currentPendingMetrics;
+ currentPendingMetrics = new PendingMetrics();
Review Comment:
Thread safety issue: `currentPendingMetrics` is accessed from multiple
threads without synchronization. The `updateMetrics()` method (called from data
processing thread via `recordPendingMetrics()`) and `sealCheckpointMetrics()`
(called from barrier processing) can race. This can lead to:
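1. Lost updates if `updateMetrics()` runs between reading `currentPendingMetrics`
into `pendingToSeal` and replacing it with a new `PendingMetrics` (lines 231 and
232 of the file): the row can be lost or attributed to the wrong checkpoint.
2. Data corruption in `PendingMetrics`, since it uses a non-thread-safe `HashMap`.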
Consider making `currentPendingMetrics` volatile or using `AtomicReference`
with proper synchronization, and using `ConcurrentHashMap` in
`PendingMetrics.tableMetrics`.
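A sketch that assumes, as the comment does, that the per-row update and the barrier-time seal can run on different threads. It uses one lock for both paths rather than relying only on `volatile` or `AtomicReference.getAndSet`. Those make the swap visible or atomic, but a writer that has already read the old reference can still add a row to it after the barrier thread has sealed it. All names are hypothetical, and the payload is simplified to rows and bytes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a single lock guards both the per-row update and the seal-time swap.
final class CheckpointMetricsBuffer {
    // Simplified pending payload: rows and bytes recorded since the last seal.
    private static final class Pending {
        long rows;
        long bytes;
    }

    private final Object lock = new Object();
    private Pending current = new Pending(); // guarded by lock
    private final Map<Long, Pending> sealedByCheckpoint = new ConcurrentHashMap<>();

    // Data path: called for every row the sink writes.
    void record(long rowBytes) {
        synchronized (lock) {
            current.rows++;
            current.bytes += rowBytes;
        }
    }

    // Barrier path: swap under the same lock, so no row lands in a buffer after it is sealed.
    void seal(long checkpointId) {
        Pending toSeal;
        synchronized (lock) {
            toSeal = current;
            current = new Pending();
        }
        if (toSeal.rows == 0) {
            return;
        }
        sealedByCheckpoint.merge(checkpointId, toSeal, (a, b) -> {
            Pending sum = new Pending();
            sum.rows = a.rows + b.rows;
            sum.bytes = a.bytes + b.bytes;
            return sum;
        });
    }

    // Commit path: remove the sealed entry and return its row count (0 if nothing was sealed).
    long commit(long checkpointId) {
        Pending p = sealedByCheckpoint.remove(checkpointId);
        return p == null ? 0L : p.rows;
    }
}
```

The lock adds a small cost on the per-row path. If profiling shows contention, a per-checkpoint epoch counter or a read-write lock (writers take the read lock, the sealer takes the write lock) would be alternatives to evaluate.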
--
This is an automated message from the Apache Git Service.