This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 43ab5eb269b7be5f175400827f0c75cfdf0bb739
Author: 924060929 <[email protected]>
AuthorDate: Fri Mar 27 19:42:01 2026 +0800

    [feat](profile) add Is Profile Collection Completed marker in profile text
    
    Add IS_PROFILE_COLLECTION_COMPLETED field to SummaryProfile, written in
    queryFinished() after waitForFragmentsDone() ensures all BE pipeline task
    profiles are merged. Readers can poll for this field instead of using a
    fixed sleep to determine when the profile is fully populated.
    
    Update the local-shuffle consistency regression test to use this marker
    instead of a stability-based heuristic (two consecutive identical reads),
    which could incorrectly return a stable-but-incomplete profile.
---
 .../org/apache/doris/common/profile/SummaryProfile.java  |  8 ++++++++
 .../test_local_shuffle_fe_be_consistency.groovy          | 16 ++++++----------
 2 files changed, 14 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 1d0dde2f197..6ffc9d81fe8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -113,6 +113,10 @@ public class SummaryProfile {
     public static final String READ_BYTES_PER_SECOND = "Read Bytes Per Second";
     public static final String REMOTE_READ_BYTES_PER_SECOND = "Remote Read 
Bytes Per Second";
 
+    // Written last in queryFinished(), after all BE fragment profiles are 
merged.
+    // Readers can poll for this field to know the profile is fully collected.
+    public static final String IS_PROFILE_COLLECTION_COMPLETED = "Is Profile 
Collection Completed";
+
     public static final String PARSE_SQL_TIME = "Parse SQL Time";
     public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table 
Time";
     public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
@@ -508,6 +512,10 @@ public class SummaryProfile {
     // This method is used to display the final data status when the overall 
query ends.
     // This can avoid recalculating some strings and so on every time during 
the update process.
     public void queryFinished() {
+        // Mark profile collection as complete. This is written last, after 
all BE fragment
+        // profiles have been merged (called post-waitForFragmentsDone). 
Pollers can wait for
+        // this field to avoid reading a partial profile.
+        executionSummaryProfile.addInfoString(IS_PROFILE_COLLECTION_COMPLETED, 
"true");
         if (assignedWeightPerBackend != null) {
             Map<String, Long> m = assignedWeightPerBackend.entrySet().stream()
                     .sorted(Map.Entry.comparingByValue())
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
index 1070694a18b..48f60313e7d 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_fe_be_consistency.groovy
@@ -80,24 +80,20 @@ suite("test_local_shuffle_fe_be_consistency") {
     //  enableFePlanner=true  → enable_local_shuffle_planner=true  (FE plans 
exchanges)
     //  enableFePlanner=false → enable_local_shuffle_planner=false (BE plans 
natively)
     // ============================================================
-    // Poll until the profile for queryId is stable (two consecutive reads 
match).
-    // The query ID appears in the header early, but operator metrics are 
written asynchronously
-    // after the query finishes. A stable profile means writing is complete.
+    // Poll until the profile contains "Is Profile Collection Completed: true".
+    // This field is written by SummaryProfile.queryFinished() after 
waitForFragmentsDone(),
+    // guaranteeing all BE operator metrics have been merged into the profile.
     def waitForProfile = { String queryId ->
         def maxAttempts = 30
         def sleepMs = 300
-        String prev = ""
         for (int i = 0; i < maxAttempts; i++) {
             Thread.sleep(sleepMs)
             try {
                 def text = getProfile(queryId)
-                if (text.contains(queryId) && text == prev) {
-                    return text  // stable across two consecutive reads → 
complete
+                if (text.contains("Is Profile Collection Completed")) {
+                    return text
                 }
-                prev = text
-            } catch (Exception ignored) {
-                prev = ""
-            }
+            } catch (Exception ignored) {}
         }
         return getProfile(queryId)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to