This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 747d76bf7ba branch-4.0: [fix](job) fix streaming job fails with "No new files found" on second scheduling  #61249 (#61302)
747d76bf7ba is described below

commit 747d76bf7ba460b15b1377f681a3e2706e5f045d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Mar 14 14:43:42 2026 +0800

    branch-4.0: [fix](job) fix streaming job fails with "No new files found" on second scheduling  #61249 (#61302)
    
    Cherry-picked from #61249
    
    Co-authored-by: wudi <[email protected]>
---
 .../java/org/apache/doris/fs/obj/S3ObjStorage.java |  27 +++--
 .../org/apache/doris/job/common/FailureReason.java |   4 +-
 ...est_streaming_job_no_new_files_with_sibling.out |  12 +++
 ..._streaming_job_no_new_files_with_sibling.groovy | 110 +++++++++++++++++++++
 4 files changed, 145 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
index 33694b1a3d8..d9d0c7e9442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java
@@ -622,11 +622,25 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
 
             boolean isTruncated = false;
             boolean reachLimit = false;
+            String lastMatchedKey = "";
             do {
                 roundCnt++;
                 ListObjectsV2Response response = listObjectsV2(request);
                 for (S3Object obj : response.contents()) {
                     elementCnt++;
+
+                    // Limit already reached: scan remaining objects in this page to find
+                    // the next glob-matching key, so hasMoreDataToConsume() returns true
+                    // correctly without recording a non-matching raw S3 key as currentMaxFile.
+                    if (reachLimit) {
+                        java.nio.file.Path checkPath = Paths.get(obj.key());
+                        if (matcher.matches(checkPath)) {
+                            currentMaxFile = obj.key();
+                            break;
+                        }
+                        continue;
+                    }
+
                     java.nio.file.Path objPath = Paths.get(obj.key());
 
                     boolean isPrefix = false;
@@ -654,6 +668,7 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
                                 isPrefix ? 0 : 
obj.lastModified().toEpochMilli()
                         );
                         result.add(remoteFile);
+                        lastMatchedKey = obj.key();
 
                         if (hasLimits && reachLimit(result.size(), 
matchFileSize, fileSizeLimit, fileNumLimit)) {
                             reachLimit = true;
@@ -663,15 +678,13 @@ public class S3ObjStorage implements ObjStorage<S3Client> 
{
                         objPath = objPath.getParent();
                         isPrefix = true;
                     }
-                    if (reachLimit) {
-                        break;
-                    }
                 }
 
-                // Record current max file for limit scenario
-                if (!response.contents().isEmpty()) {
-                    S3Object lastS3Object = 
response.contents().get(response.contents().size() - 1);
-                    currentMaxFile = lastS3Object.key();
+                // If no next matching file was found after the limit in the current page,
+                // fall back to lastMatchedKey to avoid a non-matching raw S3 key
+                // (e.g. a sibling file like .lz4) being recorded as currentMaxFile.
+                if (currentMaxFile.isEmpty()) {
+                    currentMaxFile = lastMatchedKey;
                 }
 
                 isTruncated = response.isTruncated();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 7f2ba752812..4280d43bb66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -50,7 +50,9 @@ public class FailureReason implements Writable {
 
     private static boolean isTooManyFailureRowsErr(String msg) {
         return msg.contains("Insert has filtered data in strict mode")
-                || msg.contains("too many filtered rows");
+                || msg.contains("too many filtered")
+                || msg.contains("Encountered unqualified data")
+                || msg.contains("parse number fail");
     }
 
     public InternalErrorCode getCode() {
diff --git 
a/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
 
b/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
new file mode 100644
index 00000000000..cad3e045e85
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1      Emily   25
+2      Benjamin        35
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
new file mode 100644
index 00000000000..96a6b061b3c
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_no_new_files_with_sibling.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+// Regression test for: streaming job second scheduling fails with "No new files found"
+// when S3 listing returns non-matching sibling keys (e.g. example_1.csv) after the last
+// matched file, causing currentMaxFile to be set to a non-matching raw S3 key.
+//
+// Pattern example_[0-0].csv matches only example_0.csv, but getLongestPrefix strips
+// the bracket so S3 lists both example_0.csv and example_1.csv in the same page.
+// Without the fix, currentMaxFile = "example_1.csv" triggers a second scheduling
+// that finds no matching files and errors. With the fix, currentMaxFile = "example_0.csv"
+// and hasMoreDataToConsume() correctly returns false.
+suite("test_streaming_job_no_new_files_with_sibling") {
+    def tableName = "test_streaming_job_no_new_files_with_sibling_tbl"
+    def jobName = "test_streaming_job_no_new_files_with_sibling_job"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname = '${jobName}'
+    """
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // Use example_[0-0].csv: glob matches only example_0.csv, but S3 listing 
prefix
+    // "example_" also returns example_1.csv, which does not match the pattern.
+    // This reproduces the "non-matching sibling key" scenario.
+    sql """
+        CREATE JOB ${jobName}
+        ON STREAMING DO INSERT INTO ${tableName}
+        SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-0].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    try {
+        // Wait for the first task to succeed
+        Awaitility.await().atMost(120, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def res = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("SucceedTaskCount: " + res)
+                    res.size() == 1 && '1' <= res.get(0).get(0)
+                }
+        )
+
+        // Wait extra time to allow a potential second (buggy) scheduling 
attempt
+        Thread.sleep(10000)
+
+        // Verify no failed tasks: the job should not have tried to 
re-schedule and
+        // hit "No new files found" after all matched files are consumed.
+        def jobStatus = sql """
+            select Status, SucceedTaskCount, FailedTaskCount, ErrorMsg
+            from jobs("type"="insert") where Name = '${jobName}'
+        """
+        log.info("jobStatus: " + jobStatus)
+        assert jobStatus.get(0).get(2) == '0' : "Expected no failed tasks, but 
got: " + jobStatus
+        assert jobStatus.get(0).get(0) != "STOPPED" : "Job should not be 
stopped, status: " + jobStatus
+
+        qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex
+    } finally {
+        sql """
+            DROP JOB IF EXISTS where jobname = '${jobName}'
+        """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to