This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aee5a1f10ae Include MSE pipeline breaker stats by default (#18601)
aee5a1f10ae is described below

commit aee5a1f10ae0d82d8c806e0bf2cebd8b99b886b6
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 12:56:10 2026 -0700

    Include MSE pipeline breaker stats by default (#18601)
---
 .../tests/MultiStageEngineIntegrationTest.java     | 122 +++++++++++----------
 .../apache/pinot/spi/utils/CommonConstants.java    |  11 +-
 2 files changed, 72 insertions(+), 61 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index 025820debb3..e6008353e33 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -2329,16 +2329,13 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
   }
 
   @Test
-  public void testStageStatsPipelineBreaker()
-      throws Exception {
-    HelixConfigScope scope =
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
-            .build();
-    try {
-      _helixManager.getConfigAccessor()
-          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
-      String query = "select * from mytable "
-          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+  public void testStageStatsPipelineBreaker() {
+    String query = "select * from mytable "
+        + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+    // Pipeline breaker stats are kept by default. Retry in case a sibling 
test that overrode the default has just
+    // finished and the reset has not yet propagated to the server.
+    String errorMsg = "Failed to verify presence of pipeline breaker stats 
after multiple attempts";
+    TestUtils.waitForCondition(() -> {
       JsonNode response = postQuery(query);
       assertNotNull(response.get("stageStats"), "Should have stage stats");
 
@@ -2352,6 +2349,11 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
       
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
       
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
 
+      if (mytableLeaf.get("children") == null) {
+        // Sibling test's reset has not yet propagated. Retry.
+        return false;
+      }
+
       JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
       
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
 
@@ -2361,29 +2363,24 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
       JsonNode dayOfWeekLeaf = pipelineSend.get("children").get(0);
       
Assertions.assertThat(dayOfWeekLeaf.get("type").asText()).isEqualTo("LEAF");
       
Assertions.assertThat(dayOfWeekLeaf.get("table").asText()).isEqualTo("daysOfWeek");
-    } finally {
-      _helixManager.getConfigAccessor()
-          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
-    }
+      return true;
+    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
   }
 
   @Test
-  public void testPipelineBreakerKeepsNumGroupsLimitReached()
-      throws Exception {
-    HelixConfigScope scope =
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
-            .build();
-    try {
-      _helixManager.getConfigAccessor()
-          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
-      String query = ""
-          + "SET numGroupsLimit = 1;"
-          + "SELECT * FROM daysOfWeek "
-          + "WHERE dayid in ("
-          + " SELECT DayOfWeek FROM mytable"
-          + " GROUP BY DayOfWeek"
-          + ")";
+  public void testPipelineBreakerKeepsNumGroupsLimitReached() {
+    String query = ""
+        + "SET numGroupsLimit = 1;"
+        + "SELECT * FROM daysOfWeek "
+        + "WHERE dayid in ("
+        + " SELECT DayOfWeek FROM mytable"
+        + " GROUP BY DayOfWeek"
+        + ")";
 
+    // Pipeline breaker stats are kept by default. Retry in case a sibling 
test that overrode the default has just
+    // finished and the reset has not yet propagated to the server.
+    String errorMsg = "Failed to verify numGroupsLimitReached on a pipeline 
breaker after multiple attempts";
+    TestUtils.waitForCondition(() -> {
       JsonNode response = postQuery(query);
       assertNotNull(response.get("stageStats"), "Should have stage stats");
 
@@ -2397,6 +2394,11 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
       
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
       
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualToIgnoringCase("daysOfWeek");
 
+      if (mytableLeaf.get("children") == null) {
+        // Sibling test's reset has not yet propagated. Retry.
+        return false;
+      }
+
       JsonNode pipelineReceive = mytableLeaf.get("children").get(0);
       
Assertions.assertThat(pipelineReceive.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
 
@@ -2406,36 +2408,46 @@ public class MultiStageEngineIntegrationTest extends 
BaseClusterIntegrationTestS
       
Assertions.assertThat(response.get("numGroupsLimitReached").asBoolean(false))
           .describedAs("numGroupsLimitReached should be true even when the 
limit is reached on a pipeline breaker")
           .isEqualTo(true);
-    } finally {
-      _helixManager.getConfigAccessor()
-          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
-    }
+      return true;
+    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
   }
 
   @Test
   public void testPipelineBreakerWithoutKeepingStats() {
-    // let's try several times to give helix time to propagate the config 
change
-    String errorMsg = "Failed to verify absence of pipeline breaker stats 
after multiple attempts after 10 attempts";
-    TestUtils.waitForCondition(() -> {
-      String query = "select * from mytable "
-          + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
-      JsonNode response = postQuery(query);
-      assertNotNull(response.get("stageStats"), "Should have stage stats");
-
-      JsonNode receiveNode = response.get("stageStats");
-      
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
-
-      JsonNode sendNode = receiveNode.get("children").get(0);
-      
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
-
-      JsonNode mytableLeaf = sendNode.get("children").get(0);
-      
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
-      
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
-
-      Assert.assertNull(mytableLeaf.get("children"), "When pipeline breaker 
stats are not kept, "
-          + "there should be no children under the leaf node");
-      return true;
-    }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+    HelixConfigScope scope =
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
+            .build();
+    try {
+      // Pipeline breaker stats are kept by default, so explicitly skip them 
for this test.
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"true");
+      // let's try several times to give helix time to propagate the config 
change
+      String errorMsg = "Failed to verify absence of pipeline breaker stats 
after multiple attempts after 10 attempts";
+      TestUtils.waitForCondition(() -> {
+        String query = "select * from mytable "
+            + "WHERE DayOfWeek in (select dayid from daysOfWeek)";
+        JsonNode response = postQuery(query);
+        assertNotNull(response.get("stageStats"), "Should have stage stats");
+
+        JsonNode receiveNode = response.get("stageStats");
+        
Assertions.assertThat(receiveNode.get("type").asText()).isEqualTo("MAILBOX_RECEIVE");
+
+        JsonNode sendNode = receiveNode.get("children").get(0);
+        
Assertions.assertThat(sendNode.get("type").asText()).isEqualTo("MAILBOX_SEND");
+
+        JsonNode mytableLeaf = sendNode.get("children").get(0);
+        
Assertions.assertThat(mytableLeaf.get("type").asText()).isEqualTo("LEAF");
+        
Assertions.assertThat(mytableLeaf.get("table").asText()).isEqualTo("mytable");
+
+        // Once the config change has propagated, there should be no children 
(pipeline breaker stats) under the leaf
+        // node. Return the result instead of asserting so that 
waitForCondition keeps retrying while the change is
+        // still propagating (AssertionError would not be caught and retried).
+        return mytableLeaf.get("children") == null;
+      }, 100, 10_000L, errorMsg, Duration.ofSeconds(1));
+    } finally {
+      _helixManager.getConfigAccessor()
+          .set(scope, 
CommonConstants.MultiStageQueryRunner.KEY_OF_SKIP_PIPELINE_BREAKER_STATS, 
"false");
+    }
   }
 
   @AfterClass
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 694f64cc4e3..20ec9305f5a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2526,13 +2526,12 @@ public class CommonConstants {
     public static final String KEY_OF_SEND_STATS_MODE = 
"pinot.query.mse.stats.mode";
     public static final String DEFAULT_SEND_STATS_MODE = "ALWAYS";
 
-    /// Used to indicate whether MSE pipeline breaker stats should be included 
in the queryStats field.
-    /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker 
stats were not kept. Starting from 1.5.0,
-    /// they are not included by default but can be included by setting this 
flag to false (upper or lower case).
-    ///
-    /// It is expected that in 1.6.0 and later, MSE pipeline breaker stats 
will be included by default.
+    /// Used to indicate whether MSE pipeline breaker stats should be included 
in the stageStats field.
+    /// This flag was introduced in 1.5.0. Before 1.5.0, MSE pipeline breaker 
stats were not kept. In 1.5.0 they were
+    /// not included by default but could be included by setting this flag to 
false (upper or lower case). Starting
+    /// from 1.6.0, they are included by default and can be excluded by 
setting this flag to true (upper or lower case).
     public static final String KEY_OF_SKIP_PIPELINE_BREAKER_STATS = 
"pinot.query.mse.skip.pipeline.breaker.stats";
-    public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = true;
+    public static final boolean DEFAULT_SKIP_PIPELINE_BREAKER_STATS = false;
 
     /// Used to indicate that MSE stats should be logged at INFO level for 
successful queries.
     ///


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to