This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b31a9572689 branch-4.0: [Improve](StreamingJob) add max_filter_ratio 
and strict mode for mysql/pg streaming job  #60473 (#60527)
b31a9572689 is described below

commit b31a957268965deedb521ff8e83fbe12af682fc7
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 5 17:42:22 2026 +0800

    branch-4.0: [Improve](StreamingJob) add max_filter_ratio and strict mode 
for mysql/pg streaming job  #60473 (#60527)
    
    Cherry-picked from #60473
    
    Co-authored-by: wudi <[email protected]>
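
    A minimal usage sketch (illustrative only; the job and database names are
    placeholders, and the exact property keys follow the new "load." prefix in
    DataSourceConfigKeys plus the LoadCommand max_filter_ratio / strict_mode
    constants):

        ALTER JOB my_streaming_job
        FROM MYSQL
        TO DATABASE target_db (
            "load.max_filter_ratio" = "0.1",
            "load.strict_mode" = "true"
        );

    When load.max_filter_ratio is set, each individual stream load runs with
    max_filter_ratio=1 and the configured ratio is enforced on the FE side over
    a sampling window (roughly 10x the job's max interval, per the
    StreamingInsertJob change below); exceeding it pauses the job with
    TOO_MANY_FAILURE_ROWS_ERR.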
---
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |  4 ++
 ...RecordRequest.java => CommitOffsetRequest.java} | 26 +++++---
 .../doris/job/cdc/request/FetchRecordRequest.java  | 12 ----
 .../job/cdc/request/JobBaseRecordRequest.java      |  4 --
 .../doris/job/cdc/request/WriteRecordRequest.java  | 13 +---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  2 +-
 .../doris/httpv2/rest/StreamingJobAction.java      | 17 +----
 .../streaming/DataSourceConfigValidator.java       | 16 +++--
 .../insert/streaming/StreamingInsertJob.java       | 74 ++++++++++++++++++++--
 .../streaming/StreamingJobSchedulerTask.java       |  1 +
 .../insert/streaming/StreamingJobStatistic.java    |  3 +
 .../insert/streaming/StreamingMultiTblTask.java    | 37 +++++++++--
 .../apache/doris/job/util/StreamingJobUtils.java   |  5 +-
 .../cdcclient/service/PipelineCoordinator.java     | 10 +--
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java | 31 ++++++---
 .../doris/cdcclient/sink/HttpPutBuilder.java       |  9 +--
 .../apache/doris/cdcclient/sink/LoadStatistic.java | 25 ++++----
 .../cdc/test_streaming_mysql_job_errormsg.out      |  4 ++
 .../cdc/test_streaming_mysql_job.groovy            |  4 +-
 .../test_streaming_mysql_job_create_alter.groovy   |  2 +-
 .../cdc/test_streaming_mysql_job_errormsg.groovy   | 45 ++++++++++++-
 .../cdc/test_streaming_mysql_job_restart_fe.groovy | 10 ++-
 .../cdc/test_streaming_postgres_job.groovy         |  4 +-
 .../cdc/test_streaming_postgres_job_split.groovy   |  6 +-
 .../streaming_job/test_streaming_insert_job.groovy |  6 +-
 .../test_streaming_insert_job_offset.groovy        | 12 +++-
 ...st_streaming_job_alter_offset_restart_fe.groovy | 24 +++++--
 .../test_streaming_job_restart_fe.groovy           | 12 +++-
 28 files changed, 300 insertions(+), 118 deletions(-)

diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index d29cdef0766..17c5d7d575e 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,4 +35,8 @@ public class DataSourceConfigKeys {
     public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
     public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
     public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+
+    // target properties
+    public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
+    public static final String LOAD_PROPERTIES = "load.";
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
similarity index 69%
copy from 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
copy to 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
index a9a1be374db..3d2d221ea49 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
@@ -17,19 +17,25 @@
 
 package org.apache.doris.job.cdc.request;
 
-import lombok.EqualsAndHashCode;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
 import lombok.Setter;
-
-import java.util.Map;
+import lombok.ToString;
 
 @Getter
 @Setter
-@EqualsAndHashCode(callSuper = true)
-public abstract class JobBaseRecordRequest extends JobBaseConfig {
-    protected Map<String, Object> meta;
-
-    public abstract boolean isReload();
-
-    public abstract int getFetchSize();
+@NoArgsConstructor
+@ToString
+@AllArgsConstructor
+@Builder
+public class CommitOffsetRequest {
+    public long jobId;
+    public long taskId;
+    public String offset;
+    public long scannedRows;
+    public long filteredRows;
+    public long loadedRows;
+    public long loadBytes;
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
index f11539e6832..7ed28d618fd 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
@@ -23,16 +23,4 @@ import lombok.EqualsAndHashCode;
 @Data
 @EqualsAndHashCode(callSuper = true)
 public class FetchRecordRequest extends JobBaseRecordRequest {
-    private boolean reload = true;
-    private int fetchSize;
-
-    @Override
-    public boolean isReload() {
-        return reload;
-    }
-
-    @Override
-    public int getFetchSize() {
-        return fetchSize;
-    }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
index a9a1be374db..282913e2dd2 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
@@ -28,8 +28,4 @@ import java.util.Map;
 @EqualsAndHashCode(callSuper = true)
 public abstract class JobBaseRecordRequest extends JobBaseConfig {
     protected Map<String, Object> meta;
-
-    public abstract boolean isReload();
-
-    public abstract int getFetchSize();
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index a75edfcf7fb..195125b1c66 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -20,6 +20,8 @@ package org.apache.doris.job.cdc.request;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.Map;
+
 @Data
 @EqualsAndHashCode(callSuper = true)
 public class WriteRecordRequest extends JobBaseRecordRequest {
@@ -28,14 +30,5 @@ public class WriteRecordRequest extends JobBaseRecordRequest 
{
     private String token;
     private String frontendAddress;
     private String taskId;
-
-    @Override
-    public boolean isReload() {
-        return true;
-    }
-
-    @Override
-    public int getFetchSize() {
-        return Integer.MAX_VALUE;
-    }
+    private Map<String, String> streamLoadProps;
 }
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index abef3617806..2985b77582f 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -100,7 +100,7 @@ materializedViewStatement
     ;
 
 jobFromToClause
-    : FROM sourceType=identifier LEFT_PAREN sourceProperties=propertyItemList 
RIGHT_PAREN
+    : FROM sourceType=identifier (LEFT_PAREN sourceProperties=propertyItemList 
RIGHT_PAREN)?
       TO DATABASE targetDb=identifier (LEFT_PAREN 
targetProperties=propertyItemList RIGHT_PAREN)?
     ;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 573e0a17f16..34d7abe3133 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -21,14 +21,11 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
 import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 
 import com.google.common.base.Strings;
 import jakarta.servlet.http.HttpServletRequest;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -77,16 +74,4 @@ public class StreamingJobAction extends RestBaseController {
             return ResponseEntityBuilder.okWithCommonError(e.getMessage());
         }
     }
-
-    @Getter
-    @Setter
-    @NoArgsConstructor
-    @ToString
-    public static class CommitOffsetRequest {
-        public long jobId;
-        public long taskId;
-        public String offset;
-        public long scannedRows;
-        public long scannedBytes;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index ddaf8456271..cb7ec530fcf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -18,7 +18,7 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 
 import com.google.common.collect.Sets;
 
@@ -59,9 +59,17 @@ public class DataSourceConfigValidator {
     public static void validateTarget(Map<String, String> input) throws 
IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
-            if (!key.startsWith(StreamingJobUtils.TABLE_PROPS_PREFIX)) {
-                throw new IllegalArgumentException("Only support target 
properties with prefix "
-                        + StreamingJobUtils.TABLE_PROPS_PREFIX);
+            if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
+                    && !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
+                throw new IllegalArgumentException("Not support target 
properties key " + key);
+            }
+
+            if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
+                try {
+                    Double.parseDouble(entry.getValue());
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid value for key 
'" + key + "': " + entry.getValue());
+                }
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 07ede21a5b8..43fe652ee06 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -32,11 +32,11 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.base.TimerDefinition;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 import org.apache.doris.job.common.DataSourceType;
 import org.apache.doris.job.common.FailureReason;
 import org.apache.doris.job.common.IntervalUnit;
@@ -60,6 +60,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
@@ -157,6 +158,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("tprops")
     private Map<String, String> targetProperties;
 
+    // sampleStartTime marks the beginning of the current sampling window.
+    // If the filtered-row ratio exceeds `max_filter_ratio` within the window, the data quality check fails.
+    @Setter
+    private long sampleStartTime;
+    private long sampleWindowMs;
+    private long sampleWindowScannedRows;
+    private long sampleWindowFilteredRows;
+
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
             String dbName,
@@ -260,6 +269,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         try {
             this.jobProperties = new StreamingJobProperties(properties);
             jobProperties.validate();
+            this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 
1000;
             // build time definition
             JobExecutionConfiguration execConfig = getJobConfig();
             TimerDefinition timerDefinition = new TimerDefinition();
@@ -373,7 +383,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
         // update target properties
         if (!alterJobCommand.getTargetProperties().isEmpty()) {
-            
this.sourceProperties.putAll(alterJobCommand.getTargetProperties());
+            
this.targetProperties.putAll(alterJobCommand.getTargetProperties());
             logParts.add("target properties: " + 
alterJobCommand.getTargetProperties());
         }
         log.info("Alter streaming job {}, {}", getJobId(), String.join(", ", 
logParts));
@@ -621,7 +631,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             this.jobStatistic = new StreamingJobStatistic();
         }
         this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + 
offsetRequest.getScannedRows());
-        this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
offsetRequest.getScannedBytes());
+        this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
offsetRequest.getLoadBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
     }
 
@@ -631,7 +641,9 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
         this.nonTxnJobStatistic
                 .setScannedRows(this.nonTxnJobStatistic.getScannedRows() + 
offsetRequest.getScannedRows());
-        
this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + 
offsetRequest.getScannedBytes());
+        this.nonTxnJobStatistic
+                .setFilteredRows(this.nonTxnJobStatistic.getFilteredRows() + 
offsetRequest.getFilteredRows());
+        
this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + 
offsetRequest.getLoadBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
     }
 
@@ -1104,6 +1116,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             throw new JobException("Unsupported commit offset for offset 
provider type: "
                     + offsetProvider.getClass().getSimpleName());
         }
+
         writeLock();
         try {
             if (this.runningStreamTask != null
@@ -1112,12 +1125,12 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                     throw new JobException("Task id mismatch when commit 
offset. expected: "
                             + this.runningStreamTask.getTaskId() + ", actual: 
" + offsetRequest.getTaskId());
                 }
+                checkDataQuality(offsetRequest);
                 updateNoTxnJobStatisticAndOffset(offsetRequest);
-                if (offsetRequest.getScannedRows() == 0 && 
offsetRequest.getScannedBytes() == 0) {
+                if (offsetRequest.getScannedRows() == 0 && 
offsetRequest.getLoadBytes() == 0) {
                     JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider;
                     op.setHasMoreData(false);
                 }
-
                 persistOffsetProviderIfNeed();
                 log.info("Streaming multi table job {} task {} commit offset 
successfully, offset: {}",
                         getJobId(), offsetRequest.getTaskId(), 
offsetRequest.getOffset());
@@ -1129,6 +1142,55 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
     }
 
+    /**
+     * Check data quality before commit offset
+     */
+    private void checkDataQuality(CommitOffsetRequest offsetRequest) throws 
JobException {
+        String maxFilterRatioStr =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+        if (maxFilterRatioStr == null) {
+            return;
+        }
+        Double maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());
+        if (maxFilterRatio < 0 || maxFilterRatio > 1) {
+            log.warn("invalid max filter ratio {}, skip data quality check", 
maxFilterRatio);
+            return;
+        }
+
+        this.sampleWindowScannedRows += offsetRequest.getScannedRows();
+        this.sampleWindowFilteredRows += offsetRequest.getFilteredRows();
+
+        if (sampleWindowScannedRows <= 0) {
+            return;
+        }
+
+        double ratio = (double) sampleWindowFilteredRows / (double) 
sampleWindowScannedRows;
+
+        if (ratio > maxFilterRatio) {
+            String msg = String.format(
+                    "data quality check failed for streaming multi table job 
%d (within sample window): "
+                            + "window filtered/scanned=%.6f > 
maxFilterRatio=%.6f "
+                            + "(windowFiltered=%d, windowScanned=%d)",
+                    getJobId(), ratio, maxFilterRatio, 
sampleWindowFilteredRows, sampleWindowScannedRows);
+            log.error(msg);
+            FailureReason failureReason = new 
FailureReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR,
+                    "too many filtered rows exceeded max_filter_ratio " + 
maxFilterRatio);
+            this.setFailureReason(failureReason);
+            this.updateJobStatus(JobStatus.PAUSED);
+            throw new JobException(failureReason.getMsg());
+        }
+
+        long now = System.currentTimeMillis();
+
+        if ((now - sampleStartTime) > sampleWindowMs) {
+            this.sampleStartTime = now;
+            this.sampleWindowScannedRows = 0L;
+            this.sampleWindowFilteredRows = 0L;
+            log.info("streaming multi table job {} enter next sample window, 
startTime={}",
+                    getJobId(), TimeUtils.longToTimeString(sampleStartTime));
+        }
+    }
+
     private void persistOffsetProviderIfNeed() {
         // only for jdbc
         this.offsetProviderPersist = offsetProvider.getPersistInfo();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 942a25812db..8df18f1ee63 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -67,6 +67,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
         }
         streamingInsertJob.replayOffsetProviderIfNeed();
         streamingInsertJob.createStreamingTask();
+        streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
         streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
         streamingInsertJob.setAutoResumeCount(0);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 71bf9e6f065..80d0da3de23 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -38,6 +38,9 @@ public class StreamingJobStatistic {
     @Getter
     @Setter
     private long fileSize;
+    @Getter
+    @Setter
+    private long filteredRows;
 
     public String toJson() {
         return new Gson().toJson(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 10364fe465f..0bdd6262864 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -23,8 +23,9 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.httpv2.rest.RestApiStatusCode;
-import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
 import org.apache.doris.job.base.Job;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
 import org.apache.doris.job.cdc.split.SnapshotSplit;
@@ -35,6 +36,7 @@ import org.apache.doris.job.offset.SourceOffsetProvider;
 import org.apache.doris.job.offset.jdbc.JdbcOffset;
 import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
 import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
 import org.apache.doris.rpc.BackendServiceProxy;
@@ -51,6 +53,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
 import lombok.Getter;
 import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashMap;
 import java.util.List;
@@ -69,13 +72,16 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private String targetDb;
     private StreamingJobProperties jobProperties;
     private long scannedRows = 0L;
-    private long scannedBytes = 0L;
+    private long loadBytes = 0L;
+    private long filteredRows = 0L;
+    private long loadedRows = 0L;
     private long timeoutMs;
     private long runningBackendId;
 
     public StreamingMultiTblTask(Long jobId,
             long taskId,
             DataSourceType dataSourceType,
+
             SourceOffsetProvider offsetProvider,
             Map<String, String> sourceProperties,
             String targetDb,
@@ -180,6 +186,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         request.setToken(getToken());
         request.setTargetDb(targetDb);
 
+        Map<String, String> props = generateStreamLoadProps();
+        request.setStreamLoadProps(props);
+
         Map<String, Object> splitMeta = offset.generateMeta();
         Preconditions.checkArgument(!splitMeta.isEmpty(), "split meta is 
empty");
         request.setMeta(splitMeta);
@@ -189,6 +198,24 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         return request;
     }
 
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRatio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRatio) && 
Double.parseDouble(maxFilterRatio) > 0) {
+            // If `load.max_filter_ratio` is set, the ratio is checked on the job side over a sampling window,
+            // so the per-stream-load `max_filter_ratio` is set to 1.
+            streamLoadProps.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, "1");
+        }
+
+        String strictMode = 
targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + 
LoadCommand.STRICT_MODE);
+        if (StringUtils.isNotEmpty(strictMode)) {
+            streamLoadProps.put(LoadCommand.STRICT_MODE, strictMode);
+        }
+        return streamLoadProps;
+    }
+
     @Override
     public boolean onSuccess() throws JobException {
         if (getIsCanceled().get()) {
@@ -246,7 +273,9 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         }
 
         this.scannedRows = offsetRequest.getScannedRows();
-        this.scannedBytes = offsetRequest.getScannedBytes();
+        this.loadBytes = offsetRequest.getLoadBytes();
+        this.filteredRows = offsetRequest.getFilteredRows();
+        this.loadedRows = offsetRequest.getLoadedRows();
         Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
         if (null == job) {
             log.info("job is null, job id is {}", jobId);
@@ -330,7 +359,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
         trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         Map<String, Object> statistic = new HashMap<>();
         statistic.put("scannedRows", scannedRows);
-        statistic.put("loadBytes", scannedBytes);
+        statistic.put("loadBytes", loadBytes);
         trow.addToColumnValue(new TCell().setStringVal(new 
Gson().toJson(statistic)));
 
         if (this.getUserIdentity() == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 4164dcaa262..1a1fb68fe82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -70,7 +70,6 @@ import java.util.stream.Collectors;
 
 @Log4j2
 public class StreamingJobUtils {
-    public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
     public static final String INTERNAL_STREAMING_JOB_META_TABLE_NAME = 
"streaming_job_meta";
     public static final String FULL_QUALIFIED_META_TBL_NAME = 
InternalCatalog.INTERNAL_CATALOG_NAME
             + "." + FeConstants.INTERNAL_DB_NAME + "." + 
INTERNAL_STREAMING_JOB_META_TABLE_NAME;
@@ -411,8 +410,8 @@ public class StreamingJobUtils {
     private static Map<String, String> getTableCreateProperties(Map<String, 
String> properties) {
         final Map<String, String> tableCreateProps = new HashMap<>();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (entry.getKey().startsWith(TABLE_PROPS_PREFIX)) {
-                String subKey = 
entry.getKey().substring(TABLE_PROPS_PREFIX.length());
+            if 
(entry.getKey().startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)) {
+                String subKey = 
entry.getKey().substring(DataSourceConfigKeys.TABLE_PROPS_PREFIX.length());
                 tableCreateProps.put(subKey, entry.getValue());
             }
         }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 3f6438d6ee5..614c506619f 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -239,7 +239,6 @@ public class PipelineCoordinator {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(writeRecordRequest);
         DorisBatchStreamLoad batchStreamLoad = null;
         long scannedRows = 0L;
-        long scannedBytes = 0L;
         int heartbeatCount = 0;
         SplitReadResult readResult = null;
         try {
@@ -318,9 +317,7 @@ public class PipelineCoordinator {
                         String table = extractTable(element);
                         for (String record : serializedRecords) {
                             scannedRows++;
-                            byte[] dataBytes = record.getBytes();
-                            scannedBytes += dataBytes.length;
-                            batchStreamLoad.writeRecord(database, table, 
dataBytes);
+                            batchStreamLoad.writeRecord(database, table, 
record.getBytes());
                         }
                         // Mark last message as data (not heartbeat)
                         lastMessageIsHeartbeat = false;
@@ -349,7 +346,8 @@ public class PipelineCoordinator {
         // The offset must be reset before commitOffset to prevent the next 
taskId from being create
         // by the fe.
         batchStreamLoad.resetTaskId();
-        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, 
scannedBytes);
+        batchStreamLoad.commitOffset(
+                currentTaskId, metaResponse, scannedRows, 
batchStreamLoad.getLoadStatistic());
     }
 
     public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -436,6 +434,8 @@ public class PipelineCoordinator {
         batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
         
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
         batchStreamLoad.setToken(writeRecordRequest.getToken());
+        batchStreamLoad.setLoadProps(writeRecordRequest.getStreamLoadProps());
+        batchStreamLoad.getLoadStatistic().clear();
         return batchStreamLoad;
     }
 
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 926b49dcb45..92a2f9db2b6 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.sink;
 import org.apache.doris.cdcclient.common.Env;
 import org.apache.doris.cdcclient.exception.StreamLoadException;
 import org.apache.doris.cdcclient.utils.HttpUtil;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -93,9 +94,13 @@ public class DorisBatchStreamLoad implements Serializable {
     private String targetDb;
     private long jobId;
     @Setter private String token;
+    // stream load headers
+    @Setter private Map<String, String> loadProps = new HashMap<>();
+    @Getter private LoadStatistic loadStatistic;
 
     public DorisBatchStreamLoad(long jobId, String targetDb) {
         this.hostPort = Env.getCurrentEnv().getBackendHostPort();
+        this.loadStatistic = new LoadStatistic();
         this.flushQueue = new LinkedBlockingDeque<>(1);
         // maxBlockedBytes is two times of FLUSH_MAX_BYTE_SIZE
         this.maxBlockedBytes = STREAM_LOAD_MAX_BYTES * 2;
@@ -388,11 +393,11 @@ public class DorisBatchStreamLoad implements Serializable 
{
             String finalLabel = String.format("%s_%s_%s", jobId, 
currentTaskId, label);
             putBuilder
                     .setUrl(loadUrl)
+                    .addProperties(loadProps)
                     .addTokenAuth(token)
                     .setLabel(finalLabel)
                     .formatJson()
                     .addCommonHeader()
-                    .setEntity(entity)
                     .addHiddenColumns(true)
                     .setEntity(entity);
 
@@ -422,6 +427,7 @@ public class DorisBatchStreamLoad implements Serializable {
                                 } finally {
                                     lock.unlock();
                                 }
+                                loadStatistic.add(respContent);
                                 return;
                             } else {
                                 String errMsg = null;
@@ -494,16 +500,23 @@ public class DorisBatchStreamLoad implements Serializable 
{
 
 /** commit offset to frontends. */
     public void commitOffset(
-            String taskId, List<Map<String, String>> meta, long scannedRows, 
long scannedBytes) {
+            String taskId,
+            List<Map<String, String>> meta,
+            long scannedRows,
+            LoadStatistic loadStatistic) {
         try {
             String url = String.format(COMMIT_URL_PATTERN, frontendAddress, 
targetDb);
-            Map<String, Object> commitParams = new HashMap<>();
-            commitParams.put("offset", OBJECT_MAPPER.writeValueAsString(meta));
-            commitParams.put("jobId", jobId);
-            commitParams.put("taskId", taskId);
-            commitParams.put("scannedRows", scannedRows);
-            commitParams.put("scannedBytes", scannedBytes);
-            String param = OBJECT_MAPPER.writeValueAsString(commitParams);
+            CommitOffsetRequest commitRequest =
+                    CommitOffsetRequest.builder()
+                            .offset(OBJECT_MAPPER.writeValueAsString(meta))
+                            .jobId(jobId)
+                            .taskId(Long.parseLong(taskId))
+                            .scannedRows(scannedRows)
+                            .filteredRows(loadStatistic.getFilteredRows())
+                            .loadedRows(loadStatistic.getLoadedRows())
+                            .loadBytes(loadStatistic.getLoadBytes())
+                            .build();
+            String param = OBJECT_MAPPER.writeValueAsString(commitRequest);
 
             HttpPutBuilder builder =
                     new HttpPutBuilder()
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
index b5174d82798..3abd9eaabc2 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.sink;
 import org.apache.doris.cdcclient.common.Constants;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.collections.MapUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
@@ -29,7 +30,6 @@ import org.apache.http.entity.StringEntity;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 /** Builder for HttpPut. */
 public class HttpPutBuilder {
@@ -116,9 +116,10 @@ public class HttpPutBuilder {
         return this;
     }
 
-    public HttpPutBuilder addProperties(Properties properties) {
-        // TODO: check duplicate key.
-        properties.forEach((key, value) -> header.put(String.valueOf(key), 
String.valueOf(value)));
+    public HttpPutBuilder addProperties(Map<String, String> properties) {
+        if (MapUtils.isNotEmpty(properties)) {
+            header.putAll(properties);
+        }
         return this;
     }
 
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
similarity index 62%
copy from 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
copy to 
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
index f11539e6832..10f715c4307 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
@@ -15,24 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.sink;
 
 import lombok.Data;
-import lombok.EqualsAndHashCode;
 
 @Data
-@EqualsAndHashCode(callSuper = true)
-public class FetchRecordRequest extends JobBaseRecordRequest {
-    private boolean reload = true;
-    private int fetchSize;
+public class LoadStatistic {
+    private long filteredRows = 0;
+    private long loadedRows = 0;
+    private long loadBytes = 0;
 
-    @Override
-    public boolean isReload() {
-        return reload;
+    public void add(RespContent respContent) {
+        this.filteredRows += respContent.getNumberFilteredRows();
+        this.loadedRows += respContent.getNumberLoadedRows();
+        this.loadBytes += respContent.getLoadBytes();
     }
 
-    @Override
-    public int getFetchSize() {
-        return fetchSize;
+    public void clear() {
+        this.filteredRows = 0;
+        this.loadedRows = 0;
+        this.loadBytes = 0;
     }
 }
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
new file mode 100644
index 00000000000..7f44731e15d
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_snapshot_table1 --
+AB     123
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index 2febce4cb36..29680f8506d 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -155,7 +155,9 @@ suite("test_streaming_mysql_job", 
"p0,external,mysql,external_docker,external_do
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":7,\"loadBytes\":334,\"fileNumber\":0,\"fileSize\":0}"
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 7
+        assert loadStat.loadBytes == 338
         assert jobInfo.get(0).get(1) == "RUNNING"
 
         // mock mysql incremental into again
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
index b5451a899ef..b412c470558 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
@@ -76,7 +76,7 @@ suite("test_streaming_mysql_job_create_alter", 
"p0,external,mysql,external_docke
                   "table.create.properties1.replication_num" = "1"
                 )
             """
-            exception "Only support target properties with prefix 
table.create.properties"
+            exception "Not support target properties key 
table.create.properties1.replication_num"
         }
 
         //error jdbc url format
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
index f063b629f90..a23e5efb0bf 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -58,7 +58,8 @@ suite("test_streaming_mysql_job_errormsg", 
"p0,external,mysql,external_docker,ex
                   `age` varchar(8) NOT NULL,
                   PRIMARY KEY (`name`)
                 ) ENGINE=InnoDB"""
-            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES 
('ABCDEFG', 'abc');"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES 
('ABCDEFG1', 'abc');"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES 
('ABCDEFG2', '123');"""
         }
 
         sql """CREATE JOB ${jobName}
@@ -99,8 +100,50 @@ suite("test_streaming_mysql_job_errormsg", 
"p0,external,mysql,external_docker,ex
 
         def jobFailMsg = sql """select errorMsg  from jobs("type"="insert") 
where Name = '${jobName}' and ExecuteType='STREAMING'"""
         log.info("jobFailMsg: " + jobFailMsg)
+        // stream load error: [DATA_QUALITY_ERROR]too many filtered rows
         assert jobFailMsg.get(0).get(0).contains("stream load error")
 
+
+        // add max_filter_ratio to 1
+        sql """ALTER JOB ${jobName}
+        FROM MYSQL
+        TO DATABASE ${currentDb} (
+            "load.max_filter_ratio" = "1"
+        )"""
+
+        sql """RESUME JOB where jobname = '${jobName}'"""
+
+        // check job running
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSuccendCount = sql """ select SucceedTaskCount 
from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
+                        log.info("jobSuccendCount: " + jobSuccendCount)
+                        // check job status and succeed task count larger than 
1
+                        jobSuccendCount.size() == 1 && '1' <= 
jobSuccendCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
+        def jobInfo = sql """
+        select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        def loadStat = parseJson(jobInfo.get(0).get(0));
+        assert loadStat.scannedRows == 2
+        assert loadStat.loadBytes == 115
+        assert loadStat.filteredRows == 1
+        assert jobInfo.get(0).get(1) == "RUNNING"
+
+        qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name 
asc """
+
         sql """
             DROP JOB IF EXISTS where jobname =  '${jobName}'
         """
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
index 8ece9f4ba74..80a18ee5a56 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
@@ -84,7 +84,7 @@ suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,exter
                             def jobSuccendCount = sql """ select 
SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and 
ExecuteType='STREAMING' """
                             log.info("jobSuccendCount: " + jobSuccendCount)
                             // check job status and succeed task count larger 
than 2
-                            jobSuccendCount.size() == 1 && '1' <= 
jobSuccendCount.get(0).get(0)
+                            jobSuccendCount.size() == 1 && '2' <= 
jobSuccendCount.get(0).get(0)
                         }
                 )
             } catch (Exception ex){
@@ -99,7 +99,9 @@ suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,exter
             select loadStatistic, status, currentOffset from 
jobs("type"="insert") where Name='${jobName}'
             """
             log.info("jobInfoBeforeRestart: " + jobInfoBeforeRestart)
-            assert jobInfoBeforeRestart.get(0).get(0) == 
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+            def loadStatBefore = parseJson(jobInfoBeforeRestart.get(0).get(0))
+            assert loadStatBefore.scannedRows == 2
+            assert loadStatBefore.loadBytes == 95
             assert jobInfoBeforeRestart.get(0).get(1) == "RUNNING"
 
             // Restart FE
@@ -112,7 +114,9 @@ suite("test_streaming_mysql_job_restart_fe", 
"docker,mysql,external_docker,exter
                 select loadStatistic, status, currentOffset from 
jobs("type"="insert") where Name='${jobName}'
             """
             log.info("jobAfterRestart: " + jobAfterRestart)
-            assert jobAfterRestart.get(0).get(0) == 
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+            def loadStatAfter = parseJson(jobAfterRestart.get(0).get(0))
+            assert loadStatAfter.scannedRows == 2
+            assert loadStatAfter.loadBytes == 95
             assert jobAfterRestart.get(0).get(1) == "RUNNING"
             assert jobAfterRestart.get(0).get(2) == 
jobInfoBeforeRestart.get(0).get(2)
 
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 7fe8cb73daa..ba2c1247016 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -148,7 +148,9 @@ suite("test_streaming_postgres_job", 
"p0,external,pg,external_docker,external_do
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":7,\"loadBytes\":337,\"fileNumber\":0,\"fileSize\":0}"
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 7
+        assert loadStat.loadBytes == 341
         assert jobInfo.get(0).get(1) == "RUNNING"
 
         // mock incremental into again
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
index ac299223101..5e01bfe58b5 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
@@ -103,8 +103,10 @@ suite("test_streaming_postgres_job_split", 
"p0,external,pg,external_docker,exter
         select loadStatistic, status from jobs("type"="insert") where 
Name='${jobName}'
         """
         log.info("jobInfo: " + jobInfo)
-        assert jobInfo.get(0).get(0) == 
"{\"scannedRows\":5,\"loadBytes\":270,\"fileNumber\":0,\"fileSize\":0}"
-     
+        def loadStat = parseJson(jobInfo.get(0).get(0))
+        assert loadStat.scannedRows == 5
+        assert loadStat.loadBytes == 273
+
         sql """
             DROP JOB IF EXISTS where jobname =  '${jobName}'
         """
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 02e86b54325..d8058ba5fbc 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -108,7 +108,11 @@ suite("test_streaming_insert_job") {
     log.info("jobInfo: " + jobInfo)
     assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
     assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-    assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+    def loadStat = parseJson(jobInfo.get(0).get(2))
+    assert loadStat.scannedRows == 20
+    assert loadStat.loadBytes == 425
+    assert loadStat.fileNumber == 2
+    assert loadStat.fileSize == 256
 
     def showTask = sql """select * from tasks("type"="insert") where 
jobName='${jobName}'"""
     log.info("showTask is : " + showTask )
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index a9439095a64..85d494ca942 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -173,7 +173,11 @@ suite("test_streaming_insert_job_offset") {
     log.info("jobInfo: " + jobInfo)
     assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
     assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-    assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+    def loadStat = parseJson(jobInfo.get(0).get(2))
+    assert loadStat.scannedRows == 10
+    assert loadStat.loadBytes == 218
+    assert loadStat.fileNumber == 1
+    assert loadStat.fileSize == 138
     assert jobInfo.get(0).get(3) == 
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
 
     // alter job init offset, Lexicographic order includes example_[0-1]
@@ -211,7 +215,11 @@ suite("test_streaming_insert_job_offset") {
     log.info("jobInfo: " + jobInfo)
     assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
     assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-    assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
+    def loadStat2 = parseJson(jobInfo.get(0).get(2))
+    assert loadStat2.scannedRows == 30
+    assert loadStat2.loadBytes == 643
+    assert loadStat2.fileNumber == 3
+    assert loadStat2.fileSize == 394
     assert jobInfo.get(0).get(3) == 
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
 
     // has double example_1.csv and example_0.csv data
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
index 02d44f80a97..520e5b75991 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
@@ -93,7 +93,11 @@ suite("test_streaming_job_alter_offset_restart_fe", 
"docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+        def loadStat = parseJson(jobInfo.get(0).get(2))
+        assert loadStat.scannedRows == 10
+        assert loadStat.loadBytes == 218
+        assert loadStat.fileNumber == 1
+        assert loadStat.fileSize == 138
 
         sql """
             PAUSE JOB where jobname =  '${jobName}'
@@ -112,7 +116,11 @@ suite("test_streaming_job_alter_offset_restart_fe", 
"docker") {
         """
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
-        assert jobInfo.get(0).get(1) == 
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+        def loadStat1 = parseJson(jobInfo.get(0).get(1))
+        assert loadStat1.scannedRows == 10
+        assert loadStat1.loadBytes == 218
+        assert loadStat1.fileNumber == 1
+        assert loadStat1.fileSize == 138
         assert jobInfo.get(0).get(2) == 
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
 
         // Restart FE
@@ -131,7 +139,11 @@ suite("test_streaming_job_alter_offset_restart_fe", 
"docker") {
         """
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
-        assert jobInfo.get(0).get(1) == 
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+        def loadStat2 = parseJson(jobInfo.get(0).get(1))
+        assert loadStat2.scannedRows == 10
+        assert loadStat2.loadBytes == 218
+        assert loadStat2.fileNumber == 1
+        assert loadStat2.fileSize == 138
         assert jobInfo.get(0).get(2) == 
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
 
         // resume to check whether consumption will resume
@@ -163,7 +175,11 @@ suite("test_streaming_job_alter_offset_restart_fe", 
"docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
+        def loadStat3 = parseJson(jobInfo.get(0).get(2))
+        assert loadStat3.scannedRows == 30
+        assert loadStat3.loadBytes == 643
+        assert loadStat3.fileNumber == 3
+        assert loadStat3.fileSize == 394
         assert jobInfo.get(0).get(3) == 
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
 
         sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index 11d2113ce5d..ae03afb47a4 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -97,7 +97,11 @@ suite("test_streaming_job_restart_fe", "docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+        def loadStat = parseJson(jobInfo.get(0).get(2))
+        assert loadStat.scannedRows == 20
+        assert loadStat.loadBytes == 425
+        assert loadStat.fileNumber == 2
+        assert loadStat.fileSize == 256
 
         // Restart FE
         cluster.restartFrontends()
@@ -115,7 +119,11 @@ suite("test_streaming_job_restart_fe", "docker") {
         log.info("jobInfo: " + jobInfo)
         assert jobInfo.get(0).get(0) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
         assert jobInfo.get(0).get(1) == 
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
-        assert jobInfo.get(0).get(2) == 
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+        def loadStatAfter = parseJson(jobInfo.get(0).get(2))
+        assert loadStatAfter.scannedRows == 20
+        assert loadStatAfter.loadBytes == 425
+        assert loadStatAfter.fileNumber == 2
+        assert loadStatAfter.fileSize == 256
 
         sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
         sql """drop table if exists `${tableName}` force"""

