This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ddce328ca32 [fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming (#62816)
ddce328ca32 is described below
commit ddce328ca32aded871c696eeee676f2363192560
Author: wudi <[email protected]>
AuthorDate: Wed May 6 10:19:38 2026 +0800
[fix](streaming-job) fix filteredRows always 0 on single-table S3 streaming (#62816)
Fix `filteredRows` being always reported as 0 in `jobs("type"="insert")` for
single-table S3 streaming insert jobs run with `enable_insert_strict=false`
and `insert_max_filter_ratio>0`.
The filter count is now propagated from the BE through the txn commit
attachment into the job statistics, and it survives both FE EditLog replay
and the cloud meta-service round-trip.
Added regression test `test_streaming_insert_job_filtered_rows`.
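
As a minimal sketch, the flow after this patch looks roughly as follows
(illustrative only: the driver wiring and the jobId/taskId/offsetJson
values are placeholders, not the exact FE call sites, which are in the
diff below):

    // Filter count as reported by the BE coordinator (existing code path).
    long filteredRows = Integer.parseInt(
            coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));

    // 1. The count now rides in the txn commit attachment (new parameter):
    StreamingTaskTxnCommitAttachment attachment =
            new StreamingTaskTxnCommitAttachment(jobId, taskId, scannedRows,
                    loadBytes, numFiles, fileBytes, filteredRows, offsetJson);

    // 2. On commit -- and identically on EditLog replay and after the cloud
    //    meta-service round-trip -- the job folds it into its statistics,
    //    so jobs("type"="insert") can report a non-zero value:
    jobStatistic.setFilteredRows(
            jobStatistic.getFilteredRows() + attachment.getFilteredRows());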
---
cloud/src/meta-service/meta_service_txn.cpp | 4 +
cloud/test/meta_service_job_test.cpp | 5 ++
.../apache/doris/cloud/transaction/TxnUtil.java | 3 +-
.../insert/streaming/StreamingInsertJob.java | 5 ++
.../StreamingTaskTxnCommitAttachment.java | 9 +-
.../apache/doris/load/loadv2/LoadStatistic.java | 12 +++
.../commands/insert/AbstractInsertExecutor.java | 3 +
gensrc/proto/cloud.proto | 1 +
.../test_streaming_insert_job_filtered_rows.groovy | 100 +++++++++++++++++++++
9 files changed, 140 insertions(+), 2 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 6be303a4c00..2f850a4cd13 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -655,12 +655,15 @@ void update_streaming_job_meta(MetaServiceCode& code, std::string& msg,
         }
         new_job_info.set_scanned_rows(new_job_info.scanned_rows() + commit_attachment.scanned_rows());
+        new_job_info.set_filtered_rows(new_job_info.filtered_rows() +
+                                       commit_attachment.filtered_rows());
         new_job_info.set_load_bytes(new_job_info.load_bytes() + commit_attachment.load_bytes());
         new_job_info.set_num_files(new_job_info.num_files() + commit_attachment.num_files());
         new_job_info.set_file_bytes(new_job_info.file_bytes() + commit_attachment.file_bytes());
     } else {
         new_job_info.set_job_id(commit_attachment.job_id());
         new_job_info.set_scanned_rows(commit_attachment.scanned_rows());
+        new_job_info.set_filtered_rows(commit_attachment.filtered_rows());
         new_job_info.set_load_bytes(commit_attachment.load_bytes());
         new_job_info.set_num_files(commit_attachment.num_files());
         new_job_info.set_file_bytes(commit_attachment.file_bytes());
@@ -981,6 +984,7 @@ void MetaServiceImpl::reset_streaming_job_offset(::google::protobuf::RpcControll
         // Preserve existing statistics if they exist
         if (prev_existed) {
             new_job_info.set_scanned_rows(prev_job_info.scanned_rows());
+            new_job_info.set_filtered_rows(prev_job_info.filtered_rows());
             new_job_info.set_load_bytes(prev_job_info.load_bytes());
             new_job_info.set_num_files(prev_job_info.num_files());
             new_job_info.set_file_bytes(prev_job_info.file_bytes());
diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp
index ddba9b4c750..d5c837e8711 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -5213,6 +5213,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
         streaming_attach->set_job_id(1002);
         streaming_attach->set_offset("test_offset_3");
         streaming_attach->set_scanned_rows(2000);
+        streaming_attach->set_filtered_rows(150);
         streaming_attach->set_load_bytes(10000);
         streaming_attach->set_num_files(20);
         streaming_attach->set_file_bytes(15000);
@@ -5241,6 +5242,7 @@ TEST(MetaServiceJobTest, GetStreamingTaskCommitAttachTest) {
         EXPECT_TRUE(response.has_commit_attach());
         EXPECT_EQ(response.commit_attach().job_id(), 1002);
         EXPECT_EQ(response.commit_attach().scanned_rows(), 2000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 150);
         EXPECT_EQ(response.commit_attach().load_bytes(), 10000);
         EXPECT_EQ(response.commit_attach().num_files(), 20);
         EXPECT_EQ(response.commit_attach().file_bytes(), 15000);
@@ -5363,6 +5365,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         streaming_attach->set_job_id(job_id);
         streaming_attach->set_offset("original_offset");
         streaming_attach->set_scanned_rows(1000);
+        streaming_attach->set_filtered_rows(50);
         streaming_attach->set_load_bytes(5000);
         streaming_attach->set_num_files(10);
         streaming_attach->set_file_bytes(8000);
@@ -5391,6 +5394,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         EXPECT_TRUE(response.has_commit_attach());
         EXPECT_EQ(response.commit_attach().offset(), "original_offset");
         EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
         EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
     }
@@ -5427,6 +5431,7 @@ TEST(MetaServiceJobTest, ResetStreamingJobOffsetTest) {
         EXPECT_EQ(response.commit_attach().offset(), "reset_offset");
         // Other fields should remain unchanged
         EXPECT_EQ(response.commit_attach().scanned_rows(), 1000);
+        EXPECT_EQ(response.commit_attach().filtered_rows(), 50);
         EXPECT_EQ(response.commit_attach().load_bytes(), 5000);
         EXPECT_EQ(response.commit_attach().num_files(), 10);
         EXPECT_EQ(response.commit_attach().file_bytes(), 8000);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index b947ef0649e..187d551dbd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -285,7 +285,8 @@ public class TxnUtil {
                 .setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
                 .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
                 .setNumFiles(streamingTaskTxnCommitAttachment.getNumFiles())
-                .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes());
+                .setFileBytes(streamingTaskTxnCommitAttachment.getFileBytes())
+                .setFilteredRows(streamingTaskTxnCommitAttachment.getFilteredRows());
         if (streamingTaskTxnCommitAttachment.getOffset() != null) {
             builder.setOffset(streamingTaskTxnCommitAttachment.getOffset());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 4583271a64b..f0fd31623fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -792,6 +792,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + attachment.getNumFiles());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + attachment.getFileBytes());
+        this.jobStatistic.setFilteredRows(this.jobStatistic.getFilteredRows() + attachment.getFilteredRows());
         Offset endOffset = offsetProvider.deserializeOffset(attachment.getOffset());
         offsetProvider.updateOffset(endOffset);
         // Sync offsetProviderPersist after each offset update so the checkpoint thread
@@ -807,6 +808,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         //update metric
         if (MetricRepo.isInit && !isReplay) {
             MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+            MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(attachment.getFilteredRows());
             MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
         }
     }
@@ -819,11 +821,13 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setLoadBytes(attachment.getLoadBytes());
         this.jobStatistic.setFileNumber(attachment.getNumFiles());
         this.jobStatistic.setFileSize(attachment.getFileBytes());
+        this.jobStatistic.setFilteredRows(attachment.getFilteredRows());
         offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
         //update metric
         if (MetricRepo.isInit && !isReplay) {
             MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+            MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.update(attachment.getFilteredRows());
             MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
         }
     }
@@ -1210,6 +1214,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
                     loadStatistic.getLoadBytes(),
                     loadStatistic.getFileNumber(),
                     loadStatistic.getTotalFileSizeB(),
+                    loadStatistic.getFilteredRows(),
                     offsetJson));
             passCheck = true;
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 01fdb3e16e4..c8bf72559e8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -32,7 +32,8 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
     }
 
     public StreamingTaskTxnCommitAttachment(long jobId, long taskId,
-            long scannedRows, long loadBytes, long numFiles, long fileBytes, String offset) {
+            long scannedRows, long loadBytes, long numFiles, long fileBytes,
+            long filteredRows, String offset) {
         super(TransactionState.LoadJobSourceType.STREAMING_JOB);
         this.jobId = jobId;
         this.taskId = taskId;
@@ -40,6 +41,7 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
         this.loadBytes = loadBytes;
         this.numFiles = numFiles;
         this.fileBytes = fileBytes;
+        this.filteredRows = filteredRows;
         this.offset = offset;
     }
@@ -49,6 +51,7 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
         this.loadBytes = pb.getLoadBytes();
         this.numFiles = pb.getNumFiles();
         this.fileBytes = pb.getFileBytes();
+        this.filteredRows = pb.getFilteredRows();
         this.offset = pb.getOffset();
     }
@@ -69,6 +72,9 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
     @SerializedName(value = "fs")
     @Getter
     private long fileBytes;
+    @SerializedName(value = "fr")
+    @Getter
+    private long filteredRows;
     @SerializedName(value = "of")
     @Getter
     private String offset;
@@ -80,6 +86,7 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment {
                 + ", loadBytes=" + loadBytes
                 + ", numFiles=" + numFiles
                 + ", fileBytes=" + fileBytes
+                + ", filteredRows=" + filteredRows
                 + ", offset=" + offset
                 + "]";
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
index cf9e51b1f2a..fc88a0977bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
@@ -58,6 +58,9 @@ public class LoadStatistic {
     public int fileNum = 0;
     public long totalFileSizeB = 0;
 
+    // number of rows filtered by BE (DPP_ABNORMAL_ALL), set once after coordinator finishes
+    private long filteredRows = 0;
+
     // init the statistic of specified load task
     public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
         counterTbl.rowMap().remove(loadId);
@@ -133,6 +136,14 @@ public class LoadStatistic {
         return totalFileSizeB;
     }
 
+    public long getFilteredRows() {
+        return filteredRows;
+    }
+
+    public void setFilteredRows(long filteredRows) {
+        this.filteredRows = filteredRows;
+    }
+
     public synchronized String toJson() {
         long total = 0;
         for (long rows : counterTbl.values()) {
@@ -156,6 +167,7 @@ public class LoadStatistic {
         details.put("LoadBytes", totalBytes);
         details.put("FileNumber", fileNum);
         details.put("FileSize", totalFileSizeB);
+        details.put("FilteredRows", filteredRows);
         details.put("TaskNumber", counterTbl.rowMap().size());
         details.put("Unfinished backends", unfinishedBackendIdsList);
         details.put("All backends", allBackendIdsList);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 262b10a25c4..407f75afd51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -222,6 +222,9 @@ public abstract class AbstractInsertExecutor {
         if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
             filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
         }
+        if (insertLoadJob != null) {
+            insertLoadJob.getLoadStatistic().setFilteredRows(filteredRows);
+        }
     }
 
     private void checkStrictModeAndFilterRatio() throws Exception {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 569bcf6e023..710a0b0dd79 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -418,6 +418,7 @@ message StreamingTaskCommitAttachmentPB {
     optional int64 load_bytes = 4;
     optional int64 num_files = 5;
     optional int64 file_bytes = 6;
+    optional int64 filtered_rows = 7;
 }
 
 message TxnCommitAttachmentPB {
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy
new file mode 100644
index 00000000000..f30756f4ce9
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_filtered_rows.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_filtered_rows") {
+ def tableName = "test_streaming_insert_job_filtered_rows_tbl"
+ def jobName = "test_streaming_insert_job_filtered_rows_name"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ // c2 INT NOT NULL forces BE to filter every row when CSV provides
+ // non-parseable strings in that column.
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` int NOT NULL,
+            `c3` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // insert_max_filter_ratio=1 lets the task succeed even if every row is filtered.
+    sql """
+        CREATE JOB ${jobName}
+        PROPERTIES(
+            "s3.max_batch_files" = "1",
+            "session.enable_insert_strict" = "false",
+            "session.insert_max_filter_ratio" = "1"
+        )
+        ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                        {
+                            def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                            log.info("jobSucceedCount: " + jobSucceedCount)
+                            jobSucceedCount.size() == 1 && '2' <= jobSucceedCount.get(0).get(0)
+                        }
+                )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobInfo = sql """
+        select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+    """
+ log.info("jobInfo: " + jobInfo)
+ def loadStat = parseJson(jobInfo.get(0).get(2))
+ log.info("loadStatistic: " + jobInfo.get(0).get(2))
+
+ assert loadStat.scannedRows == 20
+ assert loadStat.fileNumber == 2
+ assert loadStat.filteredRows == 20
+
+ def rowCount = sql """ select count(1) from ${tableName} """
+ assert rowCount.get(0).get(0) == 0
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+ sql """drop table if exists `${tableName}` force"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]