This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 43ab5eb269b7be5f175400827f0c75cfdf0bb739
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 19:42:01 2026 +0800

    [feat](profile) add Is Profile Collection Completed marker in profile text

    Add IS_PROFILE_COLLECTION_COMPLETED field to SummaryProfile, written in
    queryFinished() after waitForFragmentsDone() ensures all BE pipeline task
    profiles are merged. Readers can poll for this field instead of using a
    fixed sleep to determine when the profile is fully populated.

    Update the local-shuffle consistency regression test to use this marker
    instead of a stability-based heuristic (two consecutive identical reads),
    which could incorrectly return a stable-but-incomplete profile.
---
 .../org/apache/doris/common/profile/SummaryProfile.java  |  8 ++++++++
 .../test_local_shuffle_fe_be_consistency.groovy          | 16 ++++++----------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 1d0dde2f197..6ffc9d81fe8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -113,6 +113,10 @@ public class SummaryProfile {
     public static final String READ_BYTES_PER_SECOND = "Read Bytes Per Second";
     public static final String REMOTE_READ_BYTES_PER_SECOND = "Remote Read Bytes Per Second";
 
+    // Written last in queryFinished(), after all BE fragment profiles are merged.
+    // Readers can poll for this field to know the profile is fully collected.
+    public static final String IS_PROFILE_COLLECTION_COMPLETED = "Is Profile Collection Completed";
+
     public static final String PARSE_SQL_TIME = "Parse SQL Time";
     public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table Time";
     public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
@@ -508,6 +512,10 @@ public class SummaryProfile {
     // This method is used to display the final data status when the overall query ends.
     // This can avoid recalculating some strings and so on every time during the update process.
     public void queryFinished() {
+        // Mark profile collection as complete. This is written last, after all BE fragment
+        // profiles have been merged (called post-waitForFragmentsDone). Pollers can wait for
+        // this field to avoid reading a partial profile.
+        executionSummaryProfile.addInfoString(IS_PROFILE_COLLECTION_COMPLETED, "true");
         if (assignedWeightPerBackend != null) {
             Map<String, Long> m = assignedWeightPerBackend.entrySet().stream()
                     .sorted(Map.Entry.comparingByValue())
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 1070694a18b..48f60313e7d 100644
--- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -80,24 +80,20 @@ suite("test_local_shuffle_fe_be_consistency") {
     // enableFePlanner=true → enable_local_shuffle_planner=true (FE plans exchanges)
     // enableFePlanner=false → enable_local_shuffle_planner=false (BE plans natively)
     // ============================================================
-    // Poll until the profile for queryId is stable (two consecutive reads match).
-    // The query ID appears in the header early, but operator metrics are written asynchronously
-    // after the query finishes. A stable profile means writing is complete.
+    // Poll until the profile contains "Is Profile Collection Completed: true".
+    // This field is written by SummaryProfile.queryFinished() after waitForFragmentsDone(),
+    // guaranteeing all BE operator metrics have been merged into the profile.
     def waitForProfile = { String queryId ->
         def maxAttempts = 30
         def sleepMs = 300
-        String prev = ""
         for (int i = 0; i < maxAttempts; i++) {
             Thread.sleep(sleepMs)
             try {
                 def text = getProfile(queryId)
-                if (text.contains(queryId) && text == prev) {
-                    return text // stable across two consecutive reads → complete
+                if (text.contains("Is Profile Collection Completed")) {
+                    return text
                 }
-                prev = text
-            } catch (Exception ignored) {
-                prev = ""
-            }
+            } catch (Exception ignored) {}
         }
         return getProfile(queryId)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
