This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe7086b1bf [multistage] Lite Mode with Scatter Gather Execution (#16000)
fe7086b1bf is described below
commit fe7086b1bfd053585feeb9cfe0aeaa90936958d7
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Jun 6 20:04:06 2025 -0500
[multistage] Lite Mode with Scatter Gather Execution (#16000)
---
.../common/utils/config/QueryOptionsUtils.java | 16 +++++++
.../org/apache/pinot/query/QueryEnvironment.java | 2 +-
.../query/context/PhysicalPlannerContext.java | 25 +----------
.../opt/rules/LeafStageWorkerAssignmentRule.java | 2 +-
.../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 30 +++++++++++--
.../resources/queries/PhysicalOptimizerPlans.json | 49 +++++++++++++++++++---
.../query/service/dispatch/QueryDispatcher.java | 22 ++++------
.../apache/pinot/spi/utils/CommonConstants.java | 3 ++
8 files changed, 101 insertions(+), 48 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5c82c136df..bf29a0914f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -410,6 +410,22 @@ public class QueryOptionsUtils {
return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.INFER_REALTIME_SEGMENT_PARTITION, "false"));
}
+ public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false"));
+ }
+
+ public static boolean isUseLiteMode(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, "false"));
+ }
+
+ public static boolean isUseBrokerPruning(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "false"));
+ }
+
+ public static boolean isRunInBroker(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, "false"));
+ }
+
@Nullable
private static Integer uncheckedParseInt(String optionName, @Nullable String
optionValue) {
if (optionValue == null) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 092fe60c2e..4d737497fd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -163,7 +163,7 @@ public class QueryEnvironment {
*/
private PlannerContext getPlannerContext(SqlNodeAndOptions
sqlNodeAndOptions) {
WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
- boolean usePhysicalOptimizer =
PhysicalPlannerContext.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
+ boolean usePhysicalOptimizer =
QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions());
HepProgram traitProgram = getTraitProgram(workerManager, _envConfig,
usePhysicalOptimizer);
SqlExplainFormat format = SqlExplainFormat.DOT;
if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 3cfd58aa03..bb6ee79a59 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.query.routing.QueryServerInstance;
-import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
/**
@@ -81,7 +81,7 @@ public class PhysicalPlannerContext {
_requestId = requestId;
_instanceId = instanceId;
_queryOptions = queryOptions == null ? Map.of() : queryOptions;
- _useLiteMode = PhysicalPlannerContext.useLiteMode(queryOptions);
+ _useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions);
_instanceIdToQueryServerInstance.put(instanceId,
getBrokerQueryServerInstance());
}
@@ -125,25 +125,4 @@ public class PhysicalPlannerContext {
private QueryServerInstance getBrokerQueryServerInstance() {
return new QueryServerInstance(_instanceId, _hostName, _port, _port);
}
-
- public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String>
queryOptions) {
- if (queryOptions == null) {
- return false;
- }
- return
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
"false"));
- }
-
- public static boolean isUseBrokerPruning(@Nullable Map<String, String>
queryOptions) {
- if (queryOptions == null) {
- return false;
- }
- return
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING,
"false"));
- }
-
- private static boolean useLiteMode(@Nullable Map<String, String>
queryOptions) {
- if (queryOptions == null) {
- return false;
- }
- return
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE,
"false"));
- }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index bd20948507..0993766049 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -119,7 +119,7 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
leafStageRoot = leafStageRoot == null ? call._currentNode :
leafStageRoot;
String tableName = getActualTableName((TableScan)
call._currentNode.unwrap());
PinotQuery pinotQuery =
LeafStageToPinotQuery.createPinotQueryForRouting(tableName,
leafStageRoot.unwrap(),
-
!PhysicalPlannerContext.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
+
!QueryOptionsUtils.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
return assignTableScan((PhysicalTableScan) call._currentNode,
_physicalPlannerContext.getRequestId(),
pinotQuery);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
index 3e09c66022..70bb779f16 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java
@@ -28,13 +28,17 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.query.context.PhysicalPlannerContext;
import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
@@ -46,16 +50,23 @@ import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
private static final Random RANDOM = new Random();
private final PhysicalPlannerContext _context;
+ private final boolean _runInBroker;
public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) {
_context = context;
+ _runInBroker = QueryOptionsUtils.isRunInBroker(context.getQueryOptions());
}
@Override
public PRelNode execute(PRelNode currentNode) {
Set<String> workerSet = new HashSet<>();
- accumulateWorkers(currentNode, workerSet);
- List<String> workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
+ List<String> workers;
+ if (_runInBroker) {
+ workers = List.of(String.format("0@%s", _context.getInstanceId()));
+ } else {
+ accumulateWorkers(currentNode, workerSet);
+ workers = List.of(sampleWorker(new ArrayList<>(workerSet)));
+ }
PinotDataDistribution pdd = new
PinotDataDistribution(RelDistribution.Type.SINGLETON, workers,
workers.hashCode(),
null, null);
return addExchangeAndWorkers(currentNode, null, pdd);
@@ -67,13 +78,24 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer {
return currentNode;
}
return new PhysicalExchange(nodeId(), currentNode, pdd,
Collections.emptyList(),
- ExchangeStrategy.SINGLETON_EXCHANGE, null,
PinotExecStrategyTrait.getDefaultExecStrategy());
+ ExchangeStrategy.SINGLETON_EXCHANGE,
currentNode.unwrap().getTraitSet().getCollation(),
+ PinotExecStrategyTrait.getDefaultExecStrategy());
}
List<PRelNode> newInputs = new ArrayList<>();
for (PRelNode input : currentNode.getPRelInputs()) {
newInputs.add(addExchangeAndWorkers(input, currentNode, pdd));
}
- return currentNode.with(newInputs, pdd);
+ currentNode = currentNode.with(newInputs, pdd);
+ if (!currentNode.areTraitsSatisfied()) {
+ RelCollation collation =
currentNode.unwrap().getTraitSet().getCollation();
+ Preconditions.checkState(collation != null &&
!collation.getFieldCollations().isEmpty(),
+ "Expected non-null collation since traits are not satisfied");
+ PinotDataDistribution sortedPDD = new PinotDataDistribution(
+ RelDistribution.Type.SINGLETON, pdd.getWorkers(),
pdd.getWorkerHash(), null, collation);
+ return new PhysicalSort(currentNode.unwrap().getCluster(),
RelTraitSet.createEmpty(), List.of(), collation,
+ null, null, currentNode, nodeId(), sortedPDD, false);
+ }
+ return currentNode;
}
/**
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 7d09ed3427..6ef59fd734 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -535,11 +535,12 @@
"\n PhysicalProject(rnk=[$3], col1=[$0])",
"\n PhysicalFilter(condition=[=($3, 1)])",
"\n PhysicalWindow(window#0=[window(partition {1} order by [2]
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
- "\n
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
- "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF],
limit=[100000])",
- "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
- "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalSort(sort0=[$2], dir0=[ASC])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
+ "\n
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}],
aggType=[LEAF], limit=[100000])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
"\n"
]
},
@@ -604,6 +605,44 @@
}
]
},
+ "physical_opt_run_in_broker": {
+ "queries": [
+ {
+ "description": "(run-in-broker) Pagination on group-by results",
+ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET
runInBroker=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, col2, col3,
COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col1, col2, col3 ORDER BY col2)
SELECT * FROM tmp LIMIT 100,400",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalSort(offset=[100], fetch=[400])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)],
aggType=[FINAL])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()],
aggType=[LEAF], limit=[100000])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "(run-in-broker) Query with single semi join and
aggregation",
+ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; SET
runInBroker=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 =
'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
+ "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()],
aggType=[DIRECT])",
+ "\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[100000])",
+ "\n PhysicalProject(col2=[$1])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(fetch=[100000])",
+ "\n PhysicalProject(col1=[$0])",
+ "\n PhysicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
+ }
+ ]
+ },
"physical_opt_broker_pruning": {
"queries": [
{
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 6b3749640b..6b77c22a8b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -68,7 +68,6 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.serde.PlanNodeDeserializer;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
@@ -79,10 +78,11 @@ import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
import
org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesBrokerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import
org.apache.pinot.query.service.dispatch.timeseries.TimeSeriesDispatchClient;
@@ -579,20 +579,13 @@ public class QueryDispatcher {
long timeoutMs,
Map<String, String> queryOptions,
MailboxService mailboxService) {
-
long startTimeMs = System.currentTimeMillis();
long deadlineMs = startTimeMs + timeoutMs;
// NOTE: Reduce stage is always stage 0
DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
PlanFragment planFragment = stagePlan.getPlanFragment();
PlanNode rootNode = planFragment.getFragmentRoot();
-
- Preconditions.checkState(rootNode instanceof MailboxReceiveNode,
- "Expecting mailbox receive node as root of reduce stage, got: %s",
rootNode.getClass().getSimpleName());
-
- MailboxReceiveNode receiveNode = (MailboxReceiveNode) rootNode;
List<WorkerMetadata> workerMetadata = stagePlan.getWorkerMetadataList();
-
Preconditions.checkState(workerMetadata.size() == 1,
"Expecting single worker for reduce stage, got: %s",
workerMetadata.size());
@@ -603,7 +596,7 @@ public class QueryDispatcher {
workerMetadata.get(0), null, parentContext, true);
PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
- DataSchema sourceSchema = receiveNode.getDataSchema();
+ DataSchema sourceSchema = rootNode.getDataSchema();
int numColumns = resultFields.size();
String[] columnNames = new String[numColumns];
ColumnDataType[] columnTypes = new ColumnDataType[numColumns];
@@ -617,8 +610,9 @@ public class QueryDispatcher {
ArrayList<Object[]> resultRows = new ArrayList<>();
MseBlock block;
MultiStageQueryStats queryStats;
- try (MailboxReceiveOperator receiveOperator = new
MailboxReceiveOperator(executionContext, receiveNode)) {
- block = receiveOperator.nextBlock();
+ try (OpChain opChain = PlanNodeToOpChain.convert(rootNode,
executionContext, (a, b) -> { })) {
+ MultiStageOperator rootOperator = opChain.getRoot();
+ block = rootOperator.nextBlock();
while (block.isData()) {
DataBlock dataBlock = ((MseBlock.Data)
block).asSerialized().getDataBlock();
int numRows = dataBlock.getNumberOfRows();
@@ -637,9 +631,9 @@ public class QueryDispatcher {
resultRows.add(row);
}
}
- block = receiveOperator.nextBlock();
+ block = rootOperator.nextBlock();
}
- queryStats = receiveOperator.calculateStats();
+ queryStats = rootOperator.calculateStats();
}
// TODO: Improve the error handling, e.g. return partial response
if (block.isError()) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 363c964eb4..7152d83f2b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -648,6 +648,9 @@ public class CommonConstants {
// TODO(mse-physical): Consider removing this query option and making this the default, since there's already
// a table config to enable broker pruning (it is disabled by default).
public static final String USE_BROKER_PRUNING = "useBrokerPruning";
+ // When lite mode is enabled, if this flag is set, we will run all the non-leaf stage operators within the
+ // broker itself. That way, the MSE queries will model the scatter gather pattern used by the V1 Engine.
+ public static final String RUN_IN_BROKER = "runInBroker";
}
public static class QueryOptionValue {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]