This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ddce328ca32 [fix](streaming-job) fix filteredRows always 0 on 
single-table S3 streaming (#62816)
ddce328ca32 is described below

commit ddce328ca32aded871c696eeee676f2363192560
Author: wudi <[email protected]>
AuthorDate: Wed May 6 10:19:38 2026 +0800

    [fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming 
(#62816)
    
    Fix `filteredRows` being always reported as 0 in `jobs("type"="insert")`
    for single-table S3 streaming insert jobs when `enable_insert_strict=false`
    and `insert_max_filter_ratio>0` are set.
    
    The filter count is now propagated from the BE through the txn commit
    attachment into the job statistics, and it survives FE EditLog replay and
    the cloud meta-service round-trip.
    
    Added regression test `test_streaming_insert_job_filtered_rows`.
---
 cloud/src/meta-service/meta_service_txn.cpp        |   4 +
 cloud/test/meta_service_job_test.cpp               |   5 ++
 .../apache/doris/cloud/transaction/TxnUtil.java    |   3 +-
 .../insert/streaming/StreamingInsertJob.java       |   5 ++
 .../StreamingTaskTxnCommitAttachment.java          |   9 +-
 .../apache/doris/load/loadv2/LoadStatistic.java    |  12 +++
 .../commands/insert/AbstractInsertExecutor.java    |   3 +
 gensrc/proto/cloud.proto                           |   1 +
 .../test_streaming_insert_job_filtered_rows.groovy | 100 +++++++++++++++++++++
 9 files changed, 140 insertions(+), 2 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 6be303a4c00..2f850a4cd13 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -655,12 +655,15 @@ void update_streaming_job_meta(MetaServiceCode& code, 
std::string& msg,
         }
         new_job_info.set_scanned_rows(new_job_info.scanned_rows() +
                                       commit_attachment.scanned_rows());
+        new_job_info.set_filtered_rows(new_job_info.filtered_rows() +
+                                       commit_attachment.filtered_rows());
         new_job_info.set_load_bytes(new_job_info.load_bytes() + 
commit_attachment.load_bytes());
         new_job_info.set_num_files(new_job_info.num_files() + 
commit_attachment.num_files());
         new_job_info.set_file_bytes(new_job_info.file_bytes() + 
commit_attachment.file_bytes());
     } else {
         new_job_info.set_job_id(commit_attachment.job_id());
         new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
+        new_job_info.set_filtered_rows(commit_attachment.filtered_rows());
         new_job_info.set_load_bytes(commit_attachment.load_bytes());
         new_job_info.set_num_files(commit_attachment.num_files());
         new_job_info.set_file_bytes(commit_attachment.file_bytes());
@@ -981,6 +984,7 @@ void 
MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcControll
         // Preserve existing statistics if they exist
         if (prev_existed) {
             new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
+            new_job_info.set_filtered_rows(prev_job_info.filtered_rows());
             new_job_info.set_load_bytes(prev_job_info.load_bytes());
             new_job_info.set_num_files(prev_job_info.num_files());
             new_job_info.set_file_bytes(prev_job_info.file_bytes());
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index ddba9b4c750..d5c837e8711 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5213,6 +5213,7 @@ TEST(MetaServiceJobTest, 
GetStreamingTaskCommitAttachTest) {
         streaming_attach->set_job_id(1002);
         streaming_attach->set_offset("test_offset_3");
         streaming_attach->set_scanned_rows(2000);
+        streaming_attach->set_filtered_rows(150);
         streaming_attach->set_load_bytes(10000);
         streaming_attach->set_num_files(20);
         streaming_attach->set_file_bytes(15000);
@@ -5241,6 +5242,7 @@ TEST(MetaServiceJobTest, 
GetStreamingTaskCommitAttachTest) {
         EXPECT_TRUE(response.has_commit_attach());
         EXPECT_EQ(response.commit_attach().job_id(), 1002);
         EXPECT_EQ(response.commit_attach().scanned_rows(), 2000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 150);
         EXPECT_EQ(response.commit_attach().load_bytes(), 10000);
         EXPECT_EQ(response.commit_attach().num_files(), 20);
         EXPECT_EQ(response.commit_attach().file_bytes(), 15000);
@@ -5363,6 +5365,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         streaming_attach->set_job_id(job_id);
         streaming_attach->set_offset("original_offset");
         streaming_attach->set_scanned_rows(1000);
+        streaming_attach->set_filtered_rows(50);
         streaming_attach->set_load_bytes(5000);
         streaming_attach->set_num_files(10);
         streaming_attach->set_file_bytes(8000);
@@ -5391,6 +5394,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         EXPECT_TRUE(response.has_commit_attach());
         EXPECT_EQ(response.commit_attach().offset(), "original_offset");
         EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
         EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
     }
 
@@ -5427,6 +5431,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         EXPECT_EQ(response.commit_attach().offset(), "reset_offset");
         // Other fields should remain unchanged
         EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
         EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
         EXPECT_EQ(response.commit_attach().num_files(), 10);
         EXPECT_EQ(response.commit_attach().file_bytes(), 8000);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index b947ef0649e..187d551dbd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -285,7 +285,8 @@ public class TxnUtil {
                 
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
                 .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
                 .setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles())
-                .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes());
+                .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes())
+                
.setFilteredRows(streamingTaskTxnCommitAttachment.getFilteredRows());
 
         if (streamingTaskTxnCommitAttachment.getOffset() != null) {
             builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 4583271a64b..f0fd31623fa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -792,6 +792,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + 
attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getNumFiles());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileBytes());
+        this.jobStatistic.setFilteredRows(this.jobStatistic.getFilteredRows() 
+ attachment.getFilteredRows());
         Offset endOffset = 
offsetProvider.deserializeOffset(attachment.getOffset());
         offsetProvider.updateOffset(endOffset);
         // Sync offsetProviderPersist after each offset update so the 
checkpoint thread
@@ -807,6 +808,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         //update metric
         if (MetricRepo.isInit && !isReplay) {
             
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(attachment.getFilteredRows());
             
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
         }
     }
@@ -819,11 +821,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(attachment.getNumFiles());
         this.jobStatistic.setFileSize(attachment.getFileBytes());
+        this.jobStatistic.setFilteredRows(attachment.getFilteredRows());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
 
         //update metric
         if (MetricRepo.isInit && !isReplay) {
             
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+            
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.update(attachment.getFilteredRows());
             
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
         }
     }
@@ -1210,6 +1214,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                         loadStatistic.getLoadBytes(),
                         loadStatistic.getFileNumber(),
                         loadStatistic.getTotalFileSizeB(),
+                        loadStatistic.getFilteredRows(),
                         offsetJson));
             passCheck = true;
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 01fdb3e16e4..c8bf72559e8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -32,7 +32,8 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
     }
 
     public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
-                long scannedRows, long loadBytes, long numFiles, long 
fileBytes, String offset) {
+                long scannedRows, long loadBytes, long numFiles, long 
fileBytes,
+                long filteredRows, String offset) {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
         this.jobId = jobId;
         this.taskId = taskId;
@@ -40,6 +41,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         this.loadBytes = loadBytes;
         this.numFiles = numFiles;
         this.fileBytes = fileBytes;
+        this.filteredRows = filteredRows;
         this.offset = offset;
     }
 
@@ -49,6 +51,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         this.loadBytes = pb.getLoadBytes();
         this.numFiles = pb.getNumFiles();
         this.fileBytes = pb.getFileBytes();
+        this.filteredRows = pb.getFilteredRows();
         this.offset = pb.getOffset();
     }
 
@@ -69,6 +72,9 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
     @SerializedName(value = "fs")
     @Getter
     private long fileBytes;
+    @SerializedName(value = "fr")
+    @Getter
+    private long filteredRows;
     @SerializedName(value = "of")
     @Getter
     private String offset;
@@ -80,6 +86,7 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
                 + ", loadBytes=" + loadBytes
                 + ", numFiles=" + numFiles
                 + ", fileBytes=" + fileBytes
+                + ", filteredRows=" + filteredRows
                 + ", offset=" + offset
                 + "]";
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
index cf9e51b1f2a..fc88a0977bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
@@ -58,6 +58,9 @@ public class LoadStatistic {
     public int fileNum = 0;
     public long totalFileSizeB = 0;
 
+    // number of rows filtered by BE (DPP_ABNORMAL_ALL), set once after 
coordinator finishes
+    private long filteredRows = 0;
+
     // init the statistic of specified load task
     public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> 
fragmentIds, List<Long> relatedBackendIds) {
         counterTbl.rowMap().remove(loadId);
@@ -133,6 +136,14 @@ public class LoadStatistic {
         return totalFileSizeB;
     }
 
+    public long getFilteredRows() {
+        return filteredRows;
+    }
+
+    public void setFilteredRows(long filteredRows) {
+        this.filteredRows = filteredRows;
+    }
+
     public synchronized String toJson() {
         long total = 0;
         for (long rows : counterTbl.values()) {
@@ -156,6 +167,7 @@ public class LoadStatistic {
         details.put("LoadBytes", totalBytes);
         details.put("FileNumber", fileNum);
         details.put("FileSize", totalFileSizeB);
+        details.put("FilteredRows", filteredRows);
         details.put("TaskNumber", counterTbl.rowMap().size());
         details.put("Unfinished backends", unfinishedBackendIdsList);
         details.put("All backends", allBackendIdsList);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 262b10a25c4..407f75afd51 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -222,6 +222,9 @@ public abstract class AbstractInsertExecutor {
         if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != 
null) {
             filteredRows = 
Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
         }
+        if (insertLoadJob != null) {
+            insertLoadJob.getLoadStatistic().setFilteredRows(filteredRows);
+        }
     }
 
     private void checkStrictModeAndFilterRatio() throws Exception {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 569bcf6e023..710a0b0dd79 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -418,6 +418,7 @@ message StreamingTaskCommitAttachmentPB {
     optional int64 load_bytes = 4;
     optional int64 num_files = 5;
     optional int64 file_bytes = 6;
+    optional int64 filtered_rows = 7;
 }
 
 message TxnCommitAttachmentPB {
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy
new file mode 100644
index 00000000000..f30756f4ce9
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_filtered_rows") {
+    def tableName = "test_streaming_insert_job_filtered_rows_tbl"
+    def jobName = "test_streaming_insert_job_filtered_rows_name"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    // c2 INT NOT NULL forces BE to filter every row when CSV provides
+    // non-parseable strings in that column.
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` int NOT NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // insert_max_filter_ratio=1 lets the task succeed even if every row is 
filtered.
+    sql """
+       CREATE JOB ${jobName}
+       PROPERTIES(
+        "s3.max_batch_files" = "1",
+        "session.enable_insert_strict" = "false",
+        "session.insert_max_filter_ratio" = "1"
+       )
+       ON STREAMING DO INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobSucceedCount = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobSucceedCount: " + jobSucceedCount)
+                    jobSucceedCount.size() == 1 && '2' <= 
jobSucceedCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobInfo = sql """
+        select currentOffset, endoffset, loadStatistic from 
jobs("type"="insert") where Name='${jobName}'
+    """
+    log.info("jobInfo: " + jobInfo)
+    def loadStat = parseJson(jobInfo.get(0).get(2))
+    log.info("loadStatistic: " + jobInfo.get(0).get(2))
+
+    assert loadStat.scannedRows == 20
+    assert loadStat.fileNumber == 2
+    assert loadStat.filteredRows == 20
+
+    def rowCount = sql """ select count(1) from ${tableName} """
+    assert rowCount.get(0).get(0) == 0
+
+    sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+    sql """drop table if exists `${tableName}` force"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to