This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 54266107479 [fix](test) stabilize routine load adaptive timeout check (#65092)
54266107479 is described below
commit 54266107479f7bc1dd2d3cd1daf8212fbd753bd1
Author: shuke <[email protected]>
AuthorDate: Wed Jul 1 20:09:46 2026 +0800
[fix](test) stabilize routine load adaptive timeout check (#65092)
## Summary
- stabilize test_routine_load_adaptive_param by sending a small Kafka
batch on every poll while waiting for the adaptive timeout to converge
- avoid racing the short-lived routine-load transaction window by
checking the committed transaction's timeout, looked up by the task UUID label
- keep the change limited to the routine-load test and the
RoutineLoadTestUtils helpers used by this case
## Testing
- [x] git diff --check origin/master...HEAD
- [ ] Not run locally; this case requires a Kafka-enabled Doris
regression environment. A buildall CI run will be requested once the PR is created.
---
.../regression/util/RoutineLoadTestUtils.groovy | 101 ++++++++++++++++++---
.../test_routine_load_adaptive_param.groovy | 12 ++-
2 files changed, 95 insertions(+), 18 deletions(-)
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
index c1bd321607b..7dce3402613 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/RoutineLoadTestUtils.groovy
@@ -91,13 +91,55 @@ class RoutineLoadTestUtils {
while (true) {
def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName =
'${jobName}'")
if (res.size() > 0) {
+ def txnId = res[0][1].toString()
+ def timeout = res[0][6].toString()
logger.info("res: ${res[0].toString()}")
- logger.info("timeout: ${res[0][6].toString()}")
- Assert.assertEquals(res[0][6].toString(), expectedTimeout)
+ logger.info("txnId: ${txnId}, timeout: ${timeout}, expected:
${expectedTimeout}")
+ // A task whose txn has not begun yet (txnId == -1) may still
carry the timeout
+ // computed in a previous schedule round; the adaptive timeout
only converges
+ // after a subsequent task is scheduled. Poll until a stable
task carries the
+ // expected timeout instead of asserting on a transient task.
+ if (txnId != "-1" && timeout == expectedTimeout) {
+ Assert.assertEquals(expectedTimeout, timeout)
+ break;
+ }
+ }
+ if (count > maxAttempts) {
+ Assert.fail("Timeout waiting for task timeout to converge to
${expectedTimeout} for job ${jobName}")
break;
+ } else {
+ sleep(1000)
+ count++
+ }
+ }
+ }
+
+ // Verify that the adaptive task timeout converges to expectedTimeout when
the job is caught up
+ // (EOF). The adaptive timeout is only (re)computed when a task is
actually scheduled WITH data
+ // to consume; once a job drains its data the renewed task stays idle
(txnId == -1) and keeps the
+ // timeout from the previous schedule round, so the EOF timeout is never
observed on its own.
+ // Drive a fresh small batch each round to force an isEof task to be
scheduled and recompute the
+ // timeout, then read whatever task is visible (the running task, or the
renewed idle one that
+ // inherits the just-converged value). Unlike checkTaskTimeout we do NOT
skip txnId == -1 here,
+ // because after EOF the converged value naturally settles on an idle task.
+ static void checkTaskTimeoutWithData(Closure sqlRunner, KafkaProducer
producer, List<String> topics,
+ String jobName, String
expectedTimeout, int maxAttempts = 60) {
+ def count = 0
+ while (true) {
+ sendTestDataToKafka(producer, topics)
+ def res = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName =
'${jobName}'")
+ if (res.size() > 0) {
+ def txnId = res[0][1].toString()
+ def timeout = res[0][6].toString()
+ logger.info("res: ${res[0].toString()}")
+ logger.info("txnId: ${txnId}, timeout: ${timeout}, expected:
${expectedTimeout}")
+ if (timeout == expectedTimeout) {
+ Assert.assertEquals(expectedTimeout, timeout)
+ break;
+ }
}
if (count > maxAttempts) {
- Assert.assertEquals(1, 2)
+ Assert.fail("Timeout waiting for task timeout to converge to
${expectedTimeout} for job ${jobName}")
break;
} else {
sleep(1000)
@@ -173,27 +215,56 @@ class RoutineLoadTestUtils {
}
}
- static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner, String
jobName, String expectedTimeoutMs, int maxAttempts = 60) {
+ // Verify that the transaction a routine-load task begins carries the
(adaptive) task timeout.
+ //
+ // Reading the timeout from a LIVE task txn (poll SHOW ROUTINE LOAD TASK
until txnId != -1, then
+ // SHOW TRANSACTION WHERE id = txnId) is inherently racy: a small-batch
routine-load txn begins and
+ // commits in well under the 1s poll interval, so the txnId != -1 window
is sub-second and the poll
+ // almost never samples it. The converged adaptive timeout is correct the
whole time; only the
+ // live-txn observation flakes.
+ //
+ // Instead join on the task UUID: SHOW ROUTINE LOAD TASK col[0] (TaskId)
is exactly the FE
+ // transaction label (RoutineLoadTaskInfo.beginTxn sets label =
printId(taskId)). Capture those
+ // UUIDs and read the timeout from the COMMITTED/VISIBLE transaction with
that label. The txn
+ // timeout (SHOW TRANSACTION col[13]) is a persisted field, frozen at
begin time and retained long
+ // after commit, so it is read without racing a live txn.
+ static void checkTxnTimeoutMatchesTaskTimeout(Closure sqlRunner,
KafkaProducer producer, List<String> topics,
+ String jobName, String
expectedTimeoutMs, int maxAttempts = 60) {
def count = 0
+ def seenTaskIds = new LinkedHashSet<String>()
while (true) {
+ // Keep a task scheduled so a txn keeps being begun and committed
for this job.
+ sendTestDataToKafka(producer, topics)
def taskRes = sqlRunner.call("SHOW ROUTINE LOAD TASK WHERE JobName
= '${jobName}'")
if (taskRes.size() > 0) {
- def txnId = taskRes[0][1].toString()
- logger.info("Task txnId: ${txnId}, task timeout:
${taskRes[0][6].toString()}")
- if (txnId != null && txnId != "null" && txnId != "-1") {
- // Get transaction timeout from SHOW TRANSACTION
- def txnRes = sqlRunner.call("SHOW TRANSACTION WHERE id =
${txnId}")
- if (txnRes.size() > 0) {
- def txnTimeoutMs = txnRes[0][13].toString()
- logger.info("Transaction timeout (ms):
${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+ def taskId = taskRes[0][0].toString()
+ logger.info("Task id: ${taskId}, txnId:
${taskRes[0][1].toString()}, task timeout: ${taskRes[0][6].toString()}")
+ if (taskId != null && taskId != "null" && taskId != "") {
+ seenTaskIds.add(taskId)
+ }
+ }
+ // The committed txn for a captured task is queryable by its label
(the bare task UUID)
+ // whether or not it is currently running.
+ for (String label : seenTaskIds) {
+ def txnRes = null
+ try {
+ txnRes = sqlRunner.call("SHOW TRANSACTION WHERE label =
'${label}'")
+ } catch (Exception e) {
+ // The task has not begun its txn yet, so the label does
not exist; keep polling.
+ continue
+ }
+ if (txnRes != null && txnRes.size() > 0) {
+ def txnTimeoutMs = txnRes[0][13].toString()
+ logger.info("Transaction label: ${label}, timeout (ms):
${txnTimeoutMs}, expected: ${expectedTimeoutMs}")
+ if (txnTimeoutMs == expectedTimeoutMs) {
Assert.assertEquals(expectedTimeoutMs, txnTimeoutMs)
- break
+ return
}
}
}
if (count > maxAttempts) {
- Assert.fail("Timeout waiting for task and transaction to be
created")
- break
+ Assert.fail("Timeout waiting for a committed transaction of
job ${jobName} to carry timeout ${expectedTimeoutMs}")
+ return
} else {
sleep(1000)
count++
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
index 8993962a104..49d901c31c9 100644
---
a/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_adaptive_param.groovy
@@ -74,8 +74,12 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
logger.info("---test adaptively increase---")
RoutineLoadTestUtils.sendTestDataToKafka(producer,
kafkaCsvTpoics)
- RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "3600")
- RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql,
job, "3600000")
+ // Drive data each round so an isEof=false task keeps being
scheduled. The converged
+ // adaptive timeout (3600) lives on the renewed idle task
(txnId == -1), so both checks
+ // poll by value (task timeout col, and the committed txn's
persisted timeout looked up
+ // by task-UUID label) instead of racing a sub-second running
task.
+ RoutineLoadTestUtils.checkTaskTimeoutWithData(runSql,
producer, kafkaCsvTpoics, job, "3600")
+ RoutineLoadTestUtils.checkTxnTimeoutMatchesTaskTimeout(runSql,
producer, kafkaCsvTpoics, job, "3600000")
RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName,
2)
} finally {
GetDebugPoint().disableDebugPointForAllFEs(injection)
@@ -84,7 +88,9 @@ suite("test_routine_load_adaptive_param","nonConcurrent") {
logger.info("---test restore adaptively---")
RoutineLoadTestUtils.sendTestDataToKafka(producer, kafkaCsvTpoics)
RoutineLoadTestUtils.waitForTaskFinish(runSql, job, tableName, 4)
- RoutineLoadTestUtils.checkTaskTimeout(runSql, job, "100")
+ // After EOF the adaptive timeout only converges when an isEof
task is scheduled with
+ // data, so keep feeding small batches until the task timeout
restores to the job timeout.
+ RoutineLoadTestUtils.checkTaskTimeoutWithData(runSql, producer,
kafkaCsvTpoics, job, "100")
} finally {
sql "stop routine load for ${job}"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]