This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a0cba7b2838 Add broker config to enable streaming group-by by default for a cluster (#18510)
a0cba7b2838 is described below

commit a0cba7b2838c6f90bfe83c8a6dcfdedd33fbd24e
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 20 21:12:21 2026 -0700

    Add broker config to enable streaming group-by by default for a cluster (#18510)
---
 .../MultiStageBrokerRequestHandler.java            | 26 +++++++
 .../MultiStageBrokerRequestHandlerTest.java        | 81 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    | 10 +++
 3 files changed, 117 insertions(+)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 846f6acb3c8..a5ad9811d60 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -139,6 +140,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final Set<String> _defaultDisabledPlannerRules;
   protected final long _extraPassiveTimeoutMs;
   protected final boolean _enableQueryFingerprinting;
+  @Nullable
+  protected final String _defaultStreamingGroupByFlushThreshold;
 
   protected final PinotMeter _stagesStartedMeter = BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
   protected final PinotMeter _stagesFinishedMeter = BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
@@ -204,6 +207,13 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     _enableQueryFingerprinting = _config.getProperty(
         CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
         CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING);
+    int streamingGroupByFlushThreshold = _config.getProperty(
+        CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+        CommonConstants.Broker.DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD);
+    // Pre-format the threshold once so that we don't allocate a new String on every query when the feature is enabled.
+    // null indicates "feature disabled", which matches the broker-config-unset case.
+    _defaultStreamingGroupByFlushThreshold =
+        streamingGroupByFlushThreshold > 0 ? Integer.toString(streamingGroupByFlushThreshold) : null;
   }
 
   @Override
@@ -398,6 +408,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
       checkAuthorization(requesterIdentity, requestContext, httpHeaders, compiledQuery, rlsFiltersApplied);
 
+      // Apply broker-default query options before branching to EXPLAIN/execute so both paths see the same options.
+      applyBrokerDefaultQueryOptions(compiledQuery.getOptions());
+
       if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) {
         return explain(compiledQuery, requestId, requestContext, queryTimer);
       } else {
@@ -545,6 +558,19 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         .build();
   }
 
+  /**
+   * Applies broker-level defaults for MSE query options. Per-query overrides (i.e. {@code SET option = value} in the
+   * SQL text) always win because we use {@link Map#putIfAbsent} — a user can set the option to {@code 0} to opt out of
+   * a streaming default that the cluster has enabled.
+   */
+  @VisibleForTesting
+  void applyBrokerDefaultQueryOptions(Map<String, String> queryOptions) {
+    if (_defaultStreamingGroupByFlushThreshold != null) {
+      queryOptions.putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+          _defaultStreamingGroupByFlushThreshold);
+    }
+  }
+
   private long getTimeoutMs(Map<String, String> queryOptions) {
     Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
     return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index 7bb7b077610..b4b5a13b5fc 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
@@ -37,6 +39,8 @@ import org.apache.pinot.spi.auth.broker.RequesterIdentity;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
 import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
 import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -108,4 +112,81 @@ public class MultiStageBrokerRequestHandlerTest {
     Assert.assertNotNull(capturedResponse.get(),
         "onQueryCompletion hook must be called with the BrokerResponse from handleRequest for MSE");
   }
+
+  @Test
+  public void testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold()
+      throws Exception {
+    // When the broker config is set, the option is injected for queries that don't already specify it.
+    MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold("5000");
+
+    Map<String, String> queryOptions = new HashMap<>();
+    handler.applyBrokerDefaultQueryOptions(queryOptions);
+    Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "5000",
+        "Broker default should be injected when query option is absent");
+  }
+
+  @Test
+  public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins()
+      throws Exception {
+    // A per-query SET — including SET = 0 to disable — must take precedence over the broker default.
+    MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold("5000");
+
+    Map<String, String> queryOptions = new HashMap<>();
+    queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "0");
+    handler.applyBrokerDefaultQueryOptions(queryOptions);
+    Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "0",
+        "Per-query SET = 0 must override the broker default");
+
+    queryOptions.clear();
+    queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "100");
+    handler.applyBrokerDefaultQueryOptions(queryOptions);
+    Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD), "100",
+        "Per-query SET must take precedence over the broker default");
+  }
+
+  @Test
+  public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset()
+      throws Exception {
+    // With the broker config unset (default -1), no option is injected.
+    MultiStageBrokerRequestHandler handler = newHandlerWithStreamingGroupByFlushThreshold(null);
+
+    Map<String, String> queryOptions = new HashMap<>();
+    handler.applyBrokerDefaultQueryOptions(queryOptions);
+    Assert.assertFalse(queryOptions.containsKey(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD),
+        "No option should be injected when the broker default is unset");
+  }
+
+  private static MultiStageBrokerRequestHandler newHandlerWithStreamingGroupByFlushThreshold(
+      @Nullable String streamingGroupByFlushThreshold)
+      throws Exception {
+    PinotConfiguration config = new PinotConfiguration();
+    config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, "localhost");
+    config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, Integer.toString(NetUtils.findOpenPort()));
+    if (streamingGroupByFlushThreshold != null) {
+      config.setProperty(CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+          streamingGroupByFlushThreshold);
+    }
+    BrokerQueryEventListenerFactory.init(config);
+    BrokerMetrics.register(mock(BrokerMetrics.class));
+
+    QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
+    when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+    when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
+    when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true);
+
+    return new MultiStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(),
+        mock(RoutingManager.class), new AllowAllAccessControlFactory(), queryQuotaManager,
+        mock(TableCache.class), mock(MultiStageQueryThrottler.class), mock(FailureDetector.class),
+        ThreadAccountantUtils.getNoOpAccountant(), null, mock(WorkerManager.class), mock(WorkerManager.class)) {
+      @Override
+      public void start() {
+        // Skip dispatcher.start() and Calcite warmupCompile — neither is needed for this test.
+      }
+
+      @Override
+      public void shutDown() {
+        // Match start() — no dispatcher was started, so there is nothing to shut down.
+      }
+    };
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ac64394aad0..39b0e22a146 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -507,6 +507,16 @@ public class CommonConstants {
       public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS =
           "pinot.broker.query.ignore.missing.segments";
       public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false;
+
+    /**
+     * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When positive, the broker
+     * injects this value as the `streamingGroupByFlushThreshold` query option for MSE queries that do not already
+     * specify it, opting the cluster into the streaming group-by behavior by default. Setting the query option
+     * explicitly (including to `0` to disable) always wins over the broker default.
+     */
+    public static final String CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD =
+        "pinot.broker.mse.streaming.group.by.flush.threshold";
+    public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD = -1;
     // Whether to infer partition hint by default or not.
     // This value can always be overridden by INFER_PARTITION_HINT query option
     public static final String CONFIG_OF_INFER_PARTITION_HINT = "pinot.broker.multistage.infer.partition.hint";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to