Copilot commented on code in PR #60473:
URL: https://github.com/apache/doris/pull/60473#discussion_r2762868559
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1129,6 +1142,55 @@ public void commitOffset(CommitOffsetRequest
offsetRequest) throws JobException
}
}
+ /**
+ * Check data quality before commit offset
+ */
+ private void checkDataQuality(CommitOffsetRequest offsetRequest) throws
JobException {
+ String maxFilterRatioStr =
+ targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+ if (maxFilterRatioStr == null) {
+ return;
+ }
+ Double maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());
Review Comment:
Potential NumberFormatException is not handled when parsing
`maxFilterRatioStr`. If a user provides an invalid value (e.g., "abc" or
"1.2.3"), the job will crash with an unhandled exception. Consider wrapping the
`parseDouble` call in a try-catch block and providing a more informative error
message, or validating the format earlier during job creation/alteration.
```suggestion
Double maxFilterRatio;
try {
maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());
} catch (NumberFormatException e) {
throw new JobException("Invalid max filter ratio '" +
maxFilterRatioStr
+ "'. It must be a numeric value between 0 and 1.", e);
}
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy:
##########
@@ -99,7 +99,9 @@ suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,exter
select loadStatistic, status, currentOffset from
jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfoBeforeRestart: " + jobInfoBeforeRestart)
- assert jobInfoBeforeRestart.get(0).get(0) ==
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStatBefore = parseJson(jobInfoBeforeRestart.get(0).get(0))
+ assert loadStatBefore.scannedRows == 2
+ assert loadStatBefore.loadBytes == 95
Review Comment:
The expected `loadBytes` values changed from 94 to 95 (1 byte difference) in
multiple assertions. This suggests a change in byte counting logic. Document
the reason for this change to help with future debugging.
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy:
##########
@@ -155,7 +155,9 @@ suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_do
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":7,\"loadBytes\":334,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 7
Review Comment:
The expected `loadBytes` value changed from 334 to 338 (4 bytes difference).
Similar to other test files, this change should be documented to provide
context for the byte count modification.
```suggestion
assert loadStat.scannedRows == 7
// Note: expected loadBytes was updated from 334 to 338 due to
changes in the
// test data / binlog content for this CDC job. Keep this value in
sync with
// the actual loadBytes reported by the job to ensure the assertion
remains valid.
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -157,6 +158,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("tprops")
private Map<String, String> targetProperties;
+ // The sampling window starts at the beginning of the sampling window.
+ // If the error rate exceeds `max_filter_ratio` within the window, the
sampling fails.
+ @Setter
+ private long sampleStartTime;
+ private long sampleWindowMs;
+ private long sampleWindowScannedRows;
+ private long sampleWindowFilteredRows;
Review Comment:
The sampling window tracking variables (`sampleWindowScannedRows` and
`sampleWindowFilteredRows`) are not marked with any synchronization annotations
or volatile keywords, but they are accessed and modified in `checkDataQuality`
which may be called concurrently. Consider adding proper synchronization or
using AtomicLong to ensure thread safety, as this is critical for accurate data
quality monitoring.
```suggestion
private volatile long sampleStartTime;
private volatile long sampleWindowMs;
private volatile long sampleWindowScannedRows;
private volatile long sampleWindowFilteredRows;
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws
JobException {
return request;
}
+ private Map<String, String> generateStreamLoadProps() {
+ Map<String, String> streamLoadProps = new HashMap<>();
+ String maxFilterRadio =
+ targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+ if (StringUtils.isNotEmpty(maxFilterRadio) &&
Double.parseDouble(maxFilterRadio) > 0) {
+ // If `load.max_filter_radio` is set, it is calculated on the job
side based on a window;
+ // the `max_filter_radio` of the streamload must be 1.
Review Comment:
The comment contains a typo: "max_filter_radio" should be
"max_filter_ratio". This typo should be corrected to match the actual property
name and avoid confusion.
```suggestion
// If `load.max_filter_ratio` is set, it is calculated on the
job side based on a window;
// the `max_filter_ratio` of the streamload must be 1.
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws
JobException {
return request;
}
+ private Map<String, String> generateStreamLoadProps() {
+ Map<String, String> streamLoadProps = new HashMap<>();
+ String maxFilterRadio =
+ targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+ if (StringUtils.isNotEmpty(maxFilterRadio) &&
Double.parseDouble(maxFilterRadio) > 0) {
+ // If `load.max_filter_radio` is set, it is calculated on the job
side based on a window;
+ // the `max_filter_radio` of the streamload must be 1.
Review Comment:
The comment contains a typo: "max_filter_radio" should be "max_filter_ratio"
to match the actual property name.
```suggestion
// If `load.max_filter_ratio` is set, it is calculated on the
job side based on a window;
// the `max_filter_ratio` of the streamload must be 1.
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws
JobException {
return request;
}
+ private Map<String, String> generateStreamLoadProps() {
+ Map<String, String> streamLoadProps = new HashMap<>();
+ String maxFilterRadio =
+ targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+ if (StringUtils.isNotEmpty(maxFilterRadio) &&
Double.parseDouble(maxFilterRadio) > 0) {
Review Comment:
Potential NumberFormatException is not handled when parsing `maxFilterRadio`
using `Double.parseDouble()`. If an invalid value is provided (e.g.,
non-numeric string), this will throw an uncaught exception during task
execution. Consider adding validation or exception handling to gracefully
handle invalid configuration values.
```suggestion
double maxFilterRatioValue = 0;
if (StringUtils.isNotEmpty(maxFilterRadio)) {
try {
maxFilterRatioValue = Double.parseDouble(maxFilterRadio);
} catch (NumberFormatException e) {
log.warn("Invalid value for max filter ratio '{}'. It must
be a numeric value. Ignoring this configuration.",
maxFilterRadio, e);
}
}
if (maxFilterRatioValue > 0) {
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy:
##########
@@ -84,7 +84,7 @@ suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,exter
def jobSuccendCount = sql """ select
SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger
than 2
Review Comment:
The expected task count threshold changed from '1' to '2', meaning the test
now requires at least 2 successful tasks instead of 1. This could make the test
more fragile or flaky if task execution is unpredictable. Consider documenting
why this change was necessary, or if it's related to the new filtering
statistics being tracked.
```suggestion
// Require at least 2 successful tasks so that
the job has fully consumed
// the fixed test input (see assertions on
scannedRows == 2 and loadBytes == 95
// below). This ensures loadStatistic has
reached its final state before we
// capture jobInfoBeforeRestart, and avoids
validating intermediate/partial
// task statistics.
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws
JobException {
return request;
}
+ private Map<String, String> generateStreamLoadProps() {
+ Map<String, String> streamLoadProps = new HashMap<>();
+ String maxFilterRadio =
+ targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+ if (StringUtils.isNotEmpty(maxFilterRadio) &&
Double.parseDouble(maxFilterRadio) > 0) {
+ // If `load.max_filter_radio` is set, it is calculated on the job
side based on a window;
+ // the `max_filter_radio` of the streamload must be 1.
Review Comment:
Variable name `maxFilterRadio` is a typo - it should be `maxFilterRatio`.
This typo could lead to confusion and should be corrected for consistency with
the property name `MAX_FILTER_RATIO_PROPERTY` and the field name in other parts
of the code.
```suggestion
String maxFilterRatio =
targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES +
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
if (StringUtils.isNotEmpty(maxFilterRatio) &&
Double.parseDouble(maxFilterRatio) > 0) {
// If `load.max_filter_ratio` is set, it is calculated on the
job side based on a window;
// the `max_filter_ratio` of the streamload must be 1.
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy:
##########
@@ -148,7 +148,9 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":7,\"loadBytes\":337,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 7
Review Comment:
The expected `loadBytes` value changed from 337 to 341 (4 bytes difference).
Similar to the change in test_streaming_postgres_job_split.groovy, this
modification should be documented to explain the reason for the byte count
difference.
```suggestion
assert loadStat.scannedRows == 7
// Note: expected loadBytes was updated from 337 to 341 (4 bytes
increase),
// consistent with test_streaming_postgres_job_split.groovy, due to
changes
// in the captured CDC payload/format affecting the byte count.
```
##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy:
##########
@@ -103,8 +103,10 @@ suite("test_streaming_postgres_job_split",
"p0,external,pg,external_docker,exter
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":5,\"loadBytes\":270,\"fileNumber\":0,\"fileSize\":0}"
-
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 5
Review Comment:
The expected `loadBytes` value changed from 270 to 273 (3 bytes difference).
This change should be documented or explained in the PR description or test
comments, as it suggests a change in how bytes are calculated. Without context,
it's unclear if this is an intentional correction or an unintended side effect
of the changes.
```suggestion
assert loadStat.scannedRows == 5
// Note: The expected loadBytes value is 273 for the current
snapshot data and
// serialization/calculation logic (updated from 270). If the test
data or the
// way snapshot records are encoded changes, this value may need to
be adjusted.
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -59,9 +58,9 @@ public static void validateSource(Map<String, String> input)
throws IllegalArgum
public static void validateTarget(Map<String, String> input) throws
IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
- if (!key.startsWith(StreamingJobUtils.TABLE_PROPS_PREFIX)) {
- throw new IllegalArgumentException("Only support target
properties with prefix "
- + StreamingJobUtils.TABLE_PROPS_PREFIX);
+ if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
+ && !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
+ throw new IllegalArgumentException("Not support target
properties key " + key);
Review Comment:
The `validateTarget` method allows any key starting with
`DataSourceConfigKeys.LOAD_PROPERTIES` prefix without validating the actual
property names or values. This means invalid properties like
`load.invalid_property` would be accepted. Consider adding validation for
specific allowed load properties (e.g., `max_filter_ratio`, `strict_mode`) and
their value formats to provide better error messages at job creation time
rather than at runtime.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java:
##########
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.sink;
+
+import lombok.Data;
+
+@Data
+public class LoadStatistic {
+ private long filteredRows = 0;
+ private long loadedRows = 0;
+ private long loadBytes = 0;
+
+ public void add(RespContent respContent) {
+ this.filteredRows += respContent.getNumberFilteredRows();
+ this.loadedRows += respContent.getNumberLoadedRows();
+ this.loadBytes += respContent.getLoadBytes();
+ }
+
+ public void clear() {
Review Comment:
The `loadStatistic` field in `DorisBatchStreamLoad` is accessed concurrently
from multiple threads (load thread calls `add()` and main thread calls
`getLoadStatistic()`/`clear()`). The field itself uses Lombok's `@Getter`
annotation and manual methods, but the `LoadStatistic` class has no
synchronization on its fields. Consider making `LoadStatistic.add()`
synchronized, or using AtomicLong fields to ensure thread-safe updates to
`filteredRows`, `loadedRows`, and `loadBytes`.
```suggestion
public synchronized void add(RespContent respContent) {
this.filteredRows += respContent.getNumberFilteredRows();
this.loadedRows += respContent.getNumberLoadedRows();
this.loadBytes += respContent.getLoadBytes();
}
public synchronized void clear() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]