Copilot commented on code in PR #60473:
URL: https://github.com/apache/doris/pull/60473#discussion_r2762868559


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1129,6 +1142,55 @@ public void commitOffset(CommitOffsetRequest 
offsetRequest) throws JobException
         }
     }
 
+    /**
+     * Check data quality before commit offset
+     */
+    private void checkDataQuality(CommitOffsetRequest offsetRequest) throws 
JobException {
+        String maxFilterRatioStr =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+        if (maxFilterRatioStr == null) {
+            return;
+        }
+        Double maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());

Review Comment:
   Potential NumberFormatException is not handled when parsing 
`maxFilterRatioStr`. If a user provides an invalid value (e.g., "abc" or 
"1.2.3"), the job will crash with an unhandled exception. Consider wrapping the 
`parseDouble` call in a try-catch block and providing a more informative error 
message, or validating the format earlier during job creation/alteration.
   ```suggestion
           Double maxFilterRatio;
           try {
               maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());
           } catch (NumberFormatException e) {
               throw new JobException("Invalid max filter ratio '" + 
maxFilterRatioStr
                       + "'. It must be a numeric value between 0 and 1.", e);
           }
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy:
##########
@@ -99,7 +99,9 @@ suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,exter
             select loadStatistic, status, currentOffset from 
jobs("type"="insert") where Name='${jobName}'
             """
             log.info("jobInfoBeforeRestart: " + jobInfoBeforeRestart)
-            assert jobInfoBeforeRestart.get(0).get(0) == 
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+            def loadStatBefore = parseJson(jobInfoBeforeRestart.get(0).get(0))
+            assert loadStatBefore.scannedRows == 2
+            assert loadStatBefore.loadBytes == 95

Review Comment:
   The expected `loadBytes` values changed from 94 to 95 (1 byte difference) in 
multiple assertions. This suggests a change in byte counting logic. Document 
the reason for this change to help with future debugging.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy:
##########
@@ -155,7 +155,9 @@ suite("test_streaming_mysql_job", 
"p0,external,mysql,external_docker,external_do
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":7,\"loadBytes\":334,\"fileNumber\":0,\"fileSize\":0}"
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 7

Review Comment:
   The expected `loadBytes` value changed from 334 to 338 (4 bytes difference). 
Similar to other test files, this change should be documented to provide 
context for the byte count modification.
   ```suggestion
           assert loadStat.scannedRows == 7
           // Note: expected loadBytes was updated from 334 to 338 due to 
changes in the
           // test data / binlog content for this CDC job. Keep this value in 
sync with
           // the actual loadBytes reported by the job to ensure the assertion 
remains valid.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -157,6 +158,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("tprops")
     private Map<String, String> targetProperties;
 
+    // The sampling window starts at the beginning of the sampling window.
+    // If the error rate exceeds `max_filter_ratio` within the window, the 
sampling fails.
+    @Setter
+    private long sampleStartTime;
+    private long sampleWindowMs;
+    private long sampleWindowScannedRows;
+    private long sampleWindowFilteredRows;

Review Comment:
   The sampling window tracking variables (`sampleWindowScannedRows` and 
`sampleWindowFilteredRows`) are not marked with any synchronization annotations 
or volatile keywords, but they are accessed and modified in `checkDataQuality` 
which may be called concurrently. Consider adding proper synchronization or 
using AtomicLong to ensure thread safety, as this is critical for accurate data 
quality monitoring.
   ```suggestion
       private volatile long sampleStartTime;
       private volatile long sampleWindowMs;
       private volatile long sampleWindowScannedRows;
       private volatile long sampleWindowFilteredRows;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws 
JobException {
         return request;
     }
 
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRadio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRadio) && 
Double.parseDouble(maxFilterRadio) > 0) {
+            // If `load.max_filter_radio` is set, it is calculated on the job 
side based on a window;
+            // the `max_filter_radio` of the streamload must be 1.

Review Comment:
   The comment contains a typo: "max_filter_radio" should be 
"max_filter_ratio". This typo should be corrected to match the actual property 
name and avoid confusion.
   ```suggestion
               // If `load.max_filter_ratio` is set, it is calculated on the 
job side based on a window;
               // the `max_filter_ratio` of the streamload must be 1.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws 
JobException {
         return request;
     }
 
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRadio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRadio) && 
Double.parseDouble(maxFilterRadio) > 0) {
+            // If `load.max_filter_radio` is set, it is calculated on the job 
side based on a window;
+            // the `max_filter_radio` of the streamload must be 1.

Review Comment:
   The comment contains a typo: "max_filter_radio" should be "max_filter_ratio" 
to match the actual property name.
   ```suggestion
               // If `load.max_filter_ratio` is set, it is calculated on the 
job side based on a window;
               // the `max_filter_ratio` of the streamload must be 1.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws 
JobException {
         return request;
     }
 
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRadio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRadio) && 
Double.parseDouble(maxFilterRadio) > 0) {

Review Comment:
   Potential NumberFormatException is not handled when parsing `maxFilterRadio` 
using `Double.parseDouble()`. If an invalid value is provided (e.g., 
non-numeric string), this will throw an uncaught exception during task 
execution. Consider adding validation or exception handling to gracefully 
handle invalid configuration values.
   ```suggestion
           double maxFilterRatioValue = 0;
           if (StringUtils.isNotEmpty(maxFilterRadio)) {
               try {
                   maxFilterRatioValue = Double.parseDouble(maxFilterRadio);
               } catch (NumberFormatException e) {
                   log.warn("Invalid value for max filter ratio '{}'. It must 
be a numeric value. Ignoring this configuration.",
                           maxFilterRadio, e);
               }
           }
           if (maxFilterRatioValue > 0) {
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy:
##########
@@ -84,7 +84,7 @@ suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,exter
                             def jobSuccendCount = sql """ select 
SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
                             log.info("jobSuccendCount: " + jobSuccendCount)
                             // check job status and succeed task count larger 
than 2

Review Comment:
   The expected task count threshold changed from '1' to '2', meaning the test 
now requires at least 2 successful tasks instead of 1. This could make the test 
more fragile or flaky if task execution is unpredictable. Consider documenting 
why this change was necessary, or if it's related to the new filtering 
statistics being tracked.
   ```suggestion
                               // Require at least 2 successful tasks so that 
the job has fully consumed
                               // the fixed test input (see assertions on 
scannedRows == 2 and loadBytes == 95
                               // below). This ensures loadStatistic has 
reached its final state before we
                               // capture jobInfoBeforeRestart, and avoids 
validating intermediate/partial
                               // task statistics.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -189,6 +197,24 @@ private WriteRecordRequest buildRequestParams() throws 
JobException {
         return request;
     }
 
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRadio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRadio) && 
Double.parseDouble(maxFilterRadio) > 0) {
+            // If `load.max_filter_radio` is set, it is calculated on the job 
side based on a window;
+            // the `max_filter_radio` of the streamload must be 1.

Review Comment:
   Variable name `maxFilterRadio` is a typo - it should be `maxFilterRatio`. 
This typo could lead to confusion and should be corrected for consistency with 
the property name `MAX_FILTER_RATIO_PROPERTY` and the field name in other parts 
of the code.
   ```suggestion
           String maxFilterRatio =
                   targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
   
           if (StringUtils.isNotEmpty(maxFilterRatio) && 
Double.parseDouble(maxFilterRatio) > 0) {
               // If `load.max_filter_ratio` is set, it is calculated on the 
job side based on a window;
               // the `max_filter_ratio` of the streamload must be 1.
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy:
##########
@@ -148,7 +148,9 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":7,\"loadBytes\":337,\"fileNumber\":0,\"fileSize\":0}"
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 7

Review Comment:
   The expected `loadBytes` value changed from 337 to 341 (4 bytes difference). 
Similar to the change in test_streaming_postgres_job_split.groovy, this 
modification should be documented to explain the reason for the byte count 
difference.
   ```suggestion
           assert loadStat.scannedRows == 7
           // Note: expected loadBytes was updated from 337 to 341 (4 bytes 
increase),
           // consistent with test_streaming_postgres_job_split.groovy, due to 
changes
           // in the captured CDC payload/format affecting the byte count.
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy:
##########
@@ -103,8 +103,10 @@ suite("test_streaming_postgres_job_split", 
"p0,external,pg,external_docker,exter
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":5,\"loadBytes\":270,\"fileNumber\":0,\"fileSize\":0}"
-     
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 5

Review Comment:
   The expected `loadBytes` value changed from 270 to 273 (3 bytes difference). 
This change should be documented or explained in the PR description or test 
comments, as it suggests a change in how bytes are calculated. Without context, 
it's unclear if this is an intentional correction or an unintended side effect 
of the changes.
   ```suggestion
           assert loadStat.scannedRows == 5
           // Note: The expected loadBytes value is 273 for the current 
snapshot data and
           // serialization/calculation logic (updated from 270). If the test 
data or the
           // way snapshot records are encoded changes, this value may need to 
be adjusted.
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -59,9 +58,9 @@ public static void validateSource(Map<String, String> input) 
throws IllegalArgum
     public static void validateTarget(Map<String, String> input) throws 
IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
-            if (!key.startsWith(StreamingJobUtils.TABLE_PROPS_PREFIX)) {
-                throw new IllegalArgumentException("Only support target 
properties with prefix "
-                        + StreamingJobUtils.TABLE_PROPS_PREFIX);
+            if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
+                    && !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
+                throw new IllegalArgumentException("Not support target 
properties key " + key);

Review Comment:
   The `validateTarget` method allows any key starting with 
`DataSourceConfigKeys.LOAD_PROPERTIES` prefix without validating the actual 
property names or values. This means invalid properties like 
`load.invalid_property` would be accepted. Consider adding validation for 
specific allowed load properties (e.g., `max_filter_ratio`, `strict_mode`) and 
their value formats to provide better error messages at job creation time 
rather than at runtime.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java:
##########
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.sink;
+
+import lombok.Data;
+
+@Data
+public class LoadStatistic {
+    private long filteredRows = 0;
+    private long loadedRows = 0;
+    private long loadBytes = 0;
+
+    public void add(RespContent respContent) {
+        this.filteredRows += respContent.getNumberFilteredRows();
+        this.loadedRows += respContent.getNumberLoadedRows();
+        this.loadBytes += respContent.getLoadBytes();
+    }
+
+    public void clear() {

Review Comment:
   The `loadStatistic` field in `DorisBatchStreamLoad` is accessed concurrently 
from multiple threads (load thread calls `add()` and main thread calls 
`getLoadStatistic()`/`clear()`). The field itself uses Lombok's `@Getter` 
annotation and manual methods, but the `LoadStatistic` class has no 
synchronization on its fields. Consider making `LoadStatistic.add()` 
synchronized, or using AtomicLong fields to ensure thread-safe updates to 
`filteredRows`, `loadedRows`, and `loadBytes`.
   ```suggestion
       public synchronized void add(RespContent respContent) {
           this.filteredRows += respContent.getNumberFilteredRows();
           this.loadedRows += respContent.getNumberLoadedRows();
           this.loadBytes += respContent.getLoadBytes();
       }
   
       public synchronized void clear() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to