This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e89ed5eebcc branch-3.0: [fix](load) reset routine load task EOF sign correctly #50048 (#50178)
e89ed5eebcc is described below

commit e89ed5eebcc985069d3493d1fd77e1e42f4df23d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 10:20:16 2025 +0800

    branch-3.0: [fix](load) reset routine load task EOF sign correctly #50048 (#50178)

    Cherry-picked from #50048

    Co-authored-by: hui lai <lai...@selectdb.com>
---
 .../load/routineload/RoutineLoadTaskInfo.java      |  7 ++++++
 .../routine_load/test_routine_load_eof.groovy      | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 5075311299d..0c662ce765d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.service.ExecuteEnv;
@@ -171,11 +172,17 @@ public abstract class RoutineLoadTaskInfo {
     }
 
     private void judgeEof(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+        if (DebugPointUtil.isEnable("RoutineLoadTaskInfo.judgeEof")) {
+            this.isEof = false;
+            return;
+        }
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         if (rlTaskTxnCommitAttachment.getTotalRows() < routineLoadJob.getMaxBatchRows()
                 && rlTaskTxnCommitAttachment.getReceivedBytes() < routineLoadJob.getMaxBatchSizeBytes()
                 && rlTaskTxnCommitAttachment.getTaskExecutionTimeMs() < this.timeoutMs) {
             this.isEof = true;
+        } else {
+            this.isEof = false;
         }
     }
 
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index d4078896068..ac0b08248ef 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -20,7 +20,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.ProducerConfig
 
-suite("test_routine_load_eof","p0") {
+suite("test_routine_load_eof","nonConcurrent") {
     def kafkaCsvTpoics = [
                   "test_eof",
                 ]
@@ -52,7 +52,7 @@ suite("test_routine_load_eof","p0") {
                 producer.send(record)
             }
         }
-        if (count >= 120) {
+        if (count >= 180) {
             break
         }
         count++
@@ -166,6 +166,8 @@ suite("test_routine_load_eof","p0") {
             }
             break;
         }
+        def committedTaskNum1 = 0
+        def committedTaskNum2 = 0
         sleep(60 * 1000)
         def res = sql "show routine load for ${jobName}"
         def statistic = res[0][14].toString()
@@ -174,6 +176,29 @@ suite("test_routine_load_eof","p0") {
         if (json.committedTaskNum > 20) {
             assertEquals(1, 2)
         }
+        committedTaskNum1 = json.committedTaskNum
+        try {
+            GetDebugPoint().enableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+            sleep(30 * 1000)
+            res = sql "show routine load for ${jobName}"
+            statistic = res[0][14].toString()
+            json = parseJson(res[0][14])
+            log.info("routine load statistic: ${res[0][14].toString()}".toString())
+            if (json.committedTaskNum - committedTaskNum1 < 20) {
+                assertEquals(1, 2)
+            }
+            committedTaskNum2 = json.committedTaskNum
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("RoutineLoadTaskInfo.judgeEof")
+        }
+        sleep(60 * 1000)
+        res = sql "show routine load for ${jobName}"
+        statistic = res[0][14].toString()
+        json = parseJson(res[0][14])
+        log.info("routine load statistic: ${res[0][14].toString()}".toString())
+        if (json.committedTaskNum - committedTaskNum2 > 20) {
+            assertEquals(1, 2)
+        }
     } finally {
         sql "stop routine load for ${jobName}"
         sql "DROP TABLE IF EXISTS ${tableName}"

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
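
For readers skimming the patch, the essence of the fix is the new else branch in judgeEof(): the EOF sign is reset to false whenever a finished task reports a full batch (or a timed-out run), instead of keeping a stale true from an earlier idle period. The sketch below is not Doris source; the class, the CommitAttachment stand-in, and the threshold constants (200,000 rows, 100 MB, 60 s) are simplified placeholders for RoutineLoadTaskInfo, RLTaskTxnCommitAttachment, and the job's max-batch settings, but under those assumptions it shows the corrected decision logic.

// Minimal sketch (not Doris source) of the corrected EOF decision in judgeEof().
// CommitAttachment and the threshold values are hypothetical stand-ins.
public class EofSignSketch {

    // Stand-in for the statistics a finished routine load task reports back.
    static final class CommitAttachment {
        final long totalRows;
        final long receivedBytes;
        final long taskExecutionTimeMs;

        CommitAttachment(long totalRows, long receivedBytes, long taskExecutionTimeMs) {
            this.totalRows = totalRows;
            this.receivedBytes = receivedBytes;
            this.taskExecutionTimeMs = taskExecutionTimeMs;
        }
    }

    // Placeholder thresholds; the real values come from the job configuration.
    private final long maxBatchRows = 200_000;
    private final long maxBatchSizeBytes = 100L * 1024 * 1024;
    private final long timeoutMs = 60_000;

    // The "EOF sign": true means the task stopped early because no more data
    // was available, so scheduling of the next task may be delayed.
    private boolean isEof = false;

    void judgeEof(CommitAttachment attachment) {
        if (attachment.totalRows < maxBatchRows
                && attachment.receivedBytes < maxBatchSizeBytes
                && attachment.taskExecutionTimeMs < timeoutMs) {
            // The task finished without filling a batch: treat it as EOF.
            this.isEof = true;
        } else {
            // The fix: a full batch (or a timed-out run) means data is flowing
            // again, so the sign is reset instead of keeping a stale 'true'.
            this.isEof = false;
        }
    }

    public static void main(String[] args) {
        EofSignSketch task = new EofSignSketch();

        // A small batch: the source ran dry, EOF is set.
        task.judgeEof(new CommitAttachment(100, 10_240, 1_000));
        System.out.println("after small batch, isEof = " + task.isEof);   // true

        // A full batch later on: without the else branch the flag would
        // stay true and keep throttling the job.
        task.judgeEof(new CommitAttachment(200_000, 10_240, 1_000));
        System.out.println("after full batch, isEof = " + task.isEof);    // false
    }
}

The regression test exercises the same behavior end to end: with the RoutineLoadTaskInfo.judgeEof debug point enabled (forcing isEof to false), it expects committedTaskNum to grow by at least 20 within 30 seconds, and after the debug point is disabled it expects growth of no more than 20 over the following 60 seconds.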