This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e89ed5eebcc branch-3.0: [fix](load) reset routine load task EOF sign 
correctly #50048 (#50178)
e89ed5eebcc is described below

commit e89ed5eebcc985069d3493d1fd77e1e42f4df23d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 10:20:16 2025 +0800

    branch-3.0: [fix](load) reset routine load task EOF sign correctly #50048 
(#50178)
    
    Cherry-picked from #50048
    
    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../load/routineload/RoutineLoadTaskInfo.java      |  7 ++++++
 .../routine_load/test_routine_load_eof.groovy      | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 5075311299d..0c662ce765d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.service.ExecuteEnv;
@@ -171,11 +172,17 @@ public abstract class RoutineLoadTaskInfo {
     }
 
     private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) 
{
+        if (DebugPointUtil.isEnable("RoutineLoadTaskInfo.judgeEof")) {
+            this.isEof = false;
+            return;
+        }
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         if (rlTaskTxnCommitAttachment.getTotalRows() < 
routineLoadJob.getMaxBatchRows()
                 && rlTaskTxnCommitAttachment.getReceivedBytes() < 
routineLoadJob.getMaxBatchSizeBytes()
                 && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < 
this.timeoutMs) {
             this.isEof = true;
+        } else {
+            this.isEof = false;
         }
     }
 
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index d4078896068..ac0b08248ef 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -20,7 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.ProducerConfig
 
-suite("test_routine_load_eof","p0") {
+suite("test_routine_load_eof","nonConcurrent") {
     def kafkaCsvTpoics = [
                   "test_eof",
                 ]
@@ -52,7 +52,7 @@ suite("test_routine_load_eof","p0") {
                         producer.send(record)
                     }
                 }
-                if (count >= 120) {
+                if (count >= 180) {
                     break
                 }
                 count++
@@ -166,6 +166,8 @@ suite("test_routine_load_eof","p0") {
                 }
                 break;
             }
+            def committedTaskNum1 = 0
+            def committedTaskNum2 = 0
             sleep(60 * 1000)
             def res = sql "show routine load for ${jobName}"
             def statistic = res[0][14].toString()
@@ -174,6 +176,29 @@ suite("test_routine_load_eof","p0") {
             if (json.committedTaskNum > 20) {
                 assertEquals(1, 2)
             }
+            committedTaskNum1 = json.committedTaskNum
+            try {
+                
GetDebugPoint().enableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+                sleep(30 * 1000)
+                res = sql "show routine load for ${jobName}"
+                statistic = res[0][14].toString()
+                json = parseJson(res[0][14])
+                log.info("routine load statistic: 
${res[0][14].toString()}".toString())
+                if (json.committedTaskNum - committedTaskNum1 < 20) {
+                    assertEquals(1, 2)
+                }
+                committedTaskNum2 = json.committedTaskNum
+            } finally {
+                
GetDebugPoint().disableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+            }
+            sleep(60 * 1000)
+            res = sql "show routine load for ${jobName}"
+            statistic = res[0][14].toString()
+            json = parseJson(res[0][14])
+            log.info("routine load statistic: 
${res[0][14].toString()}".toString())
+            if (json.committedTaskNum - committedTaskNum2 > 20) {
+                assertEquals(1, 2)
+            }
         } finally {
             sql "stop routine load for ${jobName}"
             sql "DROP TABLE IF EXISTS ${tableName}"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to