This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe7086b1bf [multistage] Lite Mode with Scatter Gather Execution (#16000)
fe7086b1bf is described below

commit fe7086b1bfd053585feeb9cfe0aeaa90936958d7
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Jun 6 20:04:06 2025 -0500

    [multistage] Lite Mode with Scatter Gather Execution (#16000)
---
 .../common/utils/config/QueryOptionsUtils.java     | 16 +++++++
 .../org/apache/pinot/query/QueryEnvironment.java   |  2 +-
 .../query/context/PhysicalPlannerContext.java      | 25 +----------
 .../opt/rules/LeafStageWorkerAssignmentRule.java   |  2 +-
 .../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 30 +++++++++++--
 .../resources/queries/PhysicalOptimizerPlans.json  | 49 +++++++++++++++++++---
 .../query/service/dispatch/QueryDispatcher.java    | 22 ++++------
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 ++
 8 files changed, 101 insertions(+), 48 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5c82c136df..bf29a0914f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -410,6 +410,22 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_REALTIME_SEGMENT_PARTITION,
 "false"));
   }
 
+  public static boolean isUsePhysicalOptimizer(Map<String, String> 
queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
 "false"));
+  }
+
+  public static boolean isUseLiteMode(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, 
"false"));
+  }
+
+  public static boolean isUseBrokerPruning(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING,
 "false"));
+  }
+
+  public static boolean isRunInBroker(Map<String, String> queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, 
"false"));
+  }
+
   @Nullable
   private static Integer uncheckedParseInt(String optionName, @Nullable String 
optionValue) {
     if (optionValue == null) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 092fe60c2e..4d737497fd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -163,7 +163,7 @@ public class QueryEnvironment {
    */
   private PlannerContext getPlannerContext(SqlNodeAndOptions 
sqlNodeAndOptions) {
     WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
-    boolean usePhysicalOptimizer = 
PhysicalPlannerContext.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
+    boolean usePhysicalOptimizer = 
QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
     HepProgram traitProgram = getTraitProgram(workerManager, _envConfig, 
usePhysicalOptimizer);
     SqlExplainFormat format = SqlExplainFormat.DOT;
     if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 3cfd58aa03..bb6ee79a59 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.query.routing.QueryServerInstance;
-import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 
 
 /**
@@ -81,7 +81,7 @@ public class PhysicalPlannerContext {
     _requestId = requestId;
     _instanceId = instanceId;
     _queryOptions = queryOptions == null ? Map.of() : queryOptions;
-    _useLiteMode = PhysicalPlannerContext.useLiteMode(queryOptions);
+    _useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions);
     _instanceIdToQueryServerInstance.put(instanceId, 
getBrokerQueryServerInstance());
   }
 
@@ -125,25 +125,4 @@ public class PhysicalPlannerContext {
   private QueryServerInstance getBrokerQueryServerInstance() {
     return new QueryServerInstance(_instanceId, _hostName, _port, _port);
   }
-
-  public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> 
queryOptions) {
-    if (queryOptions == null) {
-      return false;
-    }
-    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
 "false"));
-  }
-
-  public static boolean isUseBrokerPruning(@Nullable Map<String, String> 
queryOptions) {
-    if (queryOptions == null) {
-      return false;
-    }
-    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING,
 "false"));
-  }
-
-  private static boolean useLiteMode(@Nullable Map<String, String> 
queryOptions) {
-    if (queryOptions == null) {
-      return false;
-    }
-    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, 
"false"));
-  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index bd20948507..0993766049 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -119,7 +119,7 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
       leafStageRoot = leafStageRoot == null ? call._currentNode : 
leafStageRoot;
       String tableName = getActualTableName((TableScan) 
call._currentNode.unwrap());
       PinotQuery pinotQuery = 
LeafStageToPinotQuery.createPinotQueryForRouting(tableName, 
leafStageRoot.unwrap(),
-          
!PhysicalPlannerContext.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
+          
!QueryOptionsUtils.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
       return assignTableScan((PhysicalTableScan) call._currentNode, 
_physicalPlannerContext.getRequestId(),
           pinotQuery);
     }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
index 3e09c66022..70bb779f16 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
@@ -28,13 +28,17 @@ import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
 import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
 
 
@@ -46,16 +50,23 @@ import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
 public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
   private static final Random RANDOM = new Random();
   private final PhysicalPlannerContext _context;
+  private final boolean _runInBroker;
 
   public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) {
     _context = context;
+    _runInBroker = QueryOptionsUtils.isRunInBroker(context.getQueryOptions());
   }
 
   @Override
   public PRelNode execute(PRelNode currentNode) {
     Set<String> workerSet = new HashSet<>();
-    accumulateWorkers(currentNode, workerSet);
-    List<String> workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
+    List<String> workers;
+    if (_runInBroker) {
+      workers = List.of(String.format("0@%s", _context.getInstanceId()));
+    } else {
+      accumulateWorkers(currentNode, workerSet);
+      workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
+    }
     PinotDataDistribution pdd = new 
PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, 
workers.hashCode(),
         null, null);
     return addExchangeAndWorkers(currentNode, null, pdd);
@@ -67,13 +78,24 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
         return currentNode;
       }
       return new PhysicalExchange(nodeId(), currentNode, pdd, 
Collections.emptyList(),
-          ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+          ExchangeStrategy.SINGLETON_EXCHANGE, 
currentNode.unwrap().getTraitSet().getCollation(),
+          PinotExecStrategyTrait.getDefaultExecStrategy());
     }
     List<PRelNode> newInputs = new ArrayList<>();
     for (PRelNode input : currentNode.getPRelInputs()) {
       newInputs.add(addExchangeAndWorkers(input, currentNode, pdd));
     }
-    return currentNode.with(newInputs, pdd);
+    currentNode = currentNode.with(newInputs, pdd);
+    if (!currentNode.areTraitsSatisfied()) {
+      RelCollation collation = 
currentNode.unwrap().getTraitSet().getCollation();
+      Preconditions.checkState(collation != null && 
!collation.getFieldCollations().isEmpty(),
+          "Expected non-null collation since traits are not satisfied");
+      PinotDataDistribution sortedPDD = new PinotDataDistribution(
+          RelDistribution.Type.SINGLETON, pdd.getWorkers(), 
pdd.getWorkerHash(), null, collation);
+      return new PhysicalSort(currentNode.unwrap().getCluster(), 
RelTraitSet.createEmpty(), List.of(), collation,
+          null, null, currentNode, nodeId(), sortedPDD, false);
+    }
+    return currentNode;
   }
 
   /**
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 7d09ed3427..6ef59fd734 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -535,11 +535,12 @@
           "\n  PhysicalProject(rnk=[$3], col1=[$0])",
           "\n    PhysicalFilter(condition=[=($3, 1)])",
           "\n      PhysicalWindow(window#0=[window(partition {1} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n        PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
-          "\n          
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n            PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], 
limit=[100000])",
-          "\n              PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n        PhysicalSort(sort0=[$2], dir0=[ASC])",
+          "\n          PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
+          "\n            
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n              PhysicalAggregate(group=[{0, 1, 2}], 
aggType=[LEAF], limit=[100000])",
+          "\n                PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -604,6 +605,44 @@
       }
     ]
   },
+  "physical_opt_run_in_broker": {
+    "queries": [
+      {
+        "description": "(run-in-broker) Pagination on group-by results",
+        "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET 
runInBroker=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, col2, col3, 
COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col1, col2, col3 ORDER BY col2) 
SELECT * FROM tmp LIMIT 100,400",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalSort(offset=[100], fetch=[400])",
+          "\n  PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], 
aggType=[FINAL])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], 
aggType=[LEAF], limit=[100000])",
+          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "(run-in-broker) Query with single semi join and 
aggregation",
+        "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET 
runInBroker=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 
'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
+          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[DIRECT])",
+          "\n    PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n        PhysicalSort(fetch=[100000])",
+          "\n          PhysicalProject(col2=[$1])",
+          "\n            PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n        PhysicalSort(fetch=[100000])",
+          "\n          PhysicalProject(col1=[$0])",
+          "\n            PhysicalTableScan(table=[[default, b]])",
+          "\n"
+        ]
+      }
+    ]
+  },
   "physical_opt_broker_pruning": {
     "queries": [
       {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 6b3749640b..6b77c22a8b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -68,7 +68,6 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.serde.PlanNodeDeserializer;
 import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
@@ -79,10 +78,11 @@ import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
 import 
org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
 import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
 import 
org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
@@ -579,20 +579,13 @@ public class QueryDispatcher {
       long timeoutMs,
       Map<String, String> queryOptions,
       MailboxService mailboxService) {
-
     long startTimeMs = System.currentTimeMillis();
     long deadlineMs = startTimeMs + timeoutMs;
     // NOTE: Reduce stage is always stage 0
     DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
     PlanFragment planFragment = stagePlan.getPlanFragment();
     PlanNode rootNode = planFragment.getFragmentRoot();
-
-    Preconditions.checkState(rootNode instanceof MailboxReceiveNode,
-        "Expecting mailbox receive node as root of reduce stage, got: %s", 
rootNode.getClass().getSimpleName());
-
-    MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;
     List<WorkerMetadata> workerMetadata = stagePlan.getWorkerMetadataList();
-
     Preconditions.checkState(workerMetadata.size() == 1,
         "Expecting single worker for reduce stage, got: %s", 
workerMetadata.size());
 
@@ -603,7 +596,7 @@ public class QueryDispatcher {
             workerMetadata.get(0), null, parentContext, true);
 
     PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
-    DataSchema sourceSchema = receiveNode.getDataSchema();
+    DataSchema sourceSchema = rootNode.getDataSchema();
     int numColumns = resultFields.size();
     String[] columnNames = new String[numColumns];
     ColumnDataType[] columnTypes = new ColumnDataType[numColumns];
@@ -617,8 +610,9 @@ public class QueryDispatcher {
     ArrayList<Object[]> resultRows = new ArrayList<>();
     MseBlock block;
     MultiStageQueryStats queryStats;
-    try (MailboxReceiveOperator receiveOperator = new 
MailboxReceiveOperator(executionContext, receiveNode)) {
-      block = receiveOperator.nextBlock();
+    try (OpChain opChain = PlanNodeToOpChain.convert(rootNode, 
executionContext, (a, b) -> { })) {
+      MultiStageOperator rootOperator = opChain.getRoot();
+      block = rootOperator.nextBlock();
       while (block.isData()) {
         DataBlock dataBlock = ((MseBlock.Data) 
block).asSerialized().getDataBlock();
         int numRows = dataBlock.getNumberOfRows();
@@ -637,9 +631,9 @@ public class QueryDispatcher {
             resultRows.add(row);
           }
         }
-        block = receiveOperator.nextBlock();
+        block = rootOperator.nextBlock();
       }
-      queryStats = receiveOperator.calculateStats();
+      queryStats = rootOperator.calculateStats();
     }
     // TODO: Improve the error handling, e.g. return partial response
     if (block.isError()) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 363c964eb4..7152d83f2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -648,6 +648,9 @@ public class CommonConstants {
         // TODO(mse-physical): Consider removing this query option and making 
this the default, since there's already
         //   a table config to enable broker pruning (it is disabled by 
default).
         public static final String USE_BROKER_PRUNING = "useBrokerPruning";
+        // When lite mode is enabled, if this flag is set, we will run all the non-leaf stage operators within the
+        // broker itself. That way, the MSE queries will model the scatter gather pattern used by the V1 Engine.
+        public static final String RUN_IN_BROKER = "runInBroker";
       }
 
       public static class QueryOptionValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to