This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0248ced8adaba5fccb5f75ce49859d05908ccea Author: 924060929 <[email protected]> AuthorDate: Fri Mar 27 19:08:33 2026 +0800 [test](local shuffle) fix flaky profile fetch in FE/BE consistency test Replace fixed Thread.sleep(1500) with a stability-based polling loop: poll until two consecutive reads of the profile are identical, which indicates the asynchronous operator-metric writing is complete. The old fixed sleep occasionally fetched an incomplete profile (query ID appears in the header early, but pipeline operator metrics are written asynchronously), causing false MISMATCH reports. --- .../test_local_shuffle_fe_be_consistency.groovy | 27 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy index 98ca9fff86d..1070694a18b 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy @@ -80,6 +80,28 @@ suite("test_local_shuffle_fe_be_consistency") { // enableFePlanner=true → enable_local_shuffle_planner=true (FE plans exchanges) // enableFePlanner=false → enable_local_shuffle_planner=false (BE plans natively) // ============================================================ + // Poll until the profile for queryId is stable (two consecutive reads match). + // The query ID appears in the header early, but operator metrics are written asynchronously + // after the query finishes. A stable profile means writing is complete. + def waitForProfile = { String queryId -> + def maxAttempts = 30 + def sleepMs = 300 + String prev = "" + for (int i = 0; i < maxAttempts; i++) { + Thread.sleep(sleepMs) + try { + def text = getProfile(queryId) + if (text.contains(queryId) && text == prev) { + return text // stable across two consecutive reads → complete + } + prev = text + } catch (Exception ignored) { + prev = "" + } + } + return getProfile(queryId) + } + def runAndGetSinkCounts = { String testSql, boolean enableFePlanner -> sql "set enable_profile=true" sql "set enable_local_shuffle_planner=${enableFePlanner}" @@ -90,10 +112,7 @@ suite("test_local_shuffle_fe_be_consistency") { def queryIdResult = sql "select last_query_id()" def queryId = queryIdResult[0][0].toString() - // Wait for profile to be fully collected - Thread.sleep(1500) - - def profileText = getProfile(queryId) + def profileText = waitForProfile(queryId) def counts = extractSinkExchangeCounts(profileText) logger.info("enable_local_shuffle_planner=${enableFePlanner}, query_id=${queryId}, LE sink counts=${counts}") return [queryId: queryId, counts: counts, profile: profileText] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
