This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6f9079921b [multistage] visitor pattern for server request
construction (#9653)
6f9079921b is described below
commit 6f9079921b23c899d1bb906c5ca6d30159c0b555
Author: Rong Rong <[email protected]>
AuthorDate: Mon Oct 31 10:40:14 2022 -0700
[multistage] visitor pattern for server request construction (#9653)
* use visitor pattern for server request construction
* move MailboxService into context
Co-authored-by: Rong Rong <[email protected]>
---
.../apache/pinot/query/runtime/QueryRunner.java | 55 ++++-
.../runtime/executor/WorkerQueryExecutor.java | 6 +-
.../{executor => plan}/PhysicalPlanVisitor.java | 75 +++----
.../query/runtime/plan/PlanRequestContext.java | 68 ++++++
.../runtime/plan/ServerRequestPlanVisitor.java | 242 +++++++++++++++++++++
.../plan/server/ServerPlanRequestContext.java | 67 ++++++
.../query/runtime/utils/ServerRequestUtils.java | 235 --------------------
.../pinot/query/runtime/QueryRunnerTest.java | 10 +-
8 files changed, 469 insertions(+), 289 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1ff15eb189..1a9a8cd88c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime;
+import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.pinot.common.datablock.BaseDataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
@@ -49,10 +51,16 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.utils.ServerRequestUtils;
+import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,13 +120,14 @@ public class QueryRunner {
// TODO: make server query request return via mailbox, this is a hack to
gather the non-streaming data table
// and package it here for return. But we should really use a
MailboxSendOperator directly put into the
// server executor.
- List<ServerQueryRequest> serverQueryRequests =
- ServerRequestUtils.constructServerQueryRequest(distributedStagePlan,
requestMetadataMap,
- _helixPropertyStore);
+ List<ServerPlanRequestContext> serverQueryRequests =
constructServerQueryRequests(distributedStagePlan,
+ requestMetadataMap, _helixPropertyStore, _mailboxService);
// send the data table via mailbox in one-off fashion (e.g. no
block-level split, one data table/partition key)
List<BaseDataBlock> serverQueryResults = new
ArrayList<>(serverQueryRequests.size());
- for (ServerQueryRequest request : serverQueryRequests) {
+ for (ServerPlanRequestContext requestContext : serverQueryRequests) {
+ ServerQueryRequest request = new
ServerQueryRequest(requestContext.getInstanceRequest(),
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
System.currentTimeMillis());
serverQueryResults.add(processServerQuery(request, executorService));
}
@@ -139,6 +148,42 @@ public class QueryRunner {
}
}
+ private static List<ServerPlanRequestContext>
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
+ Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord>
helixPropertyStore,
+ MailboxService<TransferableBlock> mailboxService) {
+ StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
+ Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
+ "Server request for V2 engine should only have 1 scan table per
request.");
+ String rawTableName = stageMetadata.getScannedTables().get(0);
+ Map<String, List<String>> tableToSegmentListMap =
stageMetadata.getServerInstanceToSegmentsMap()
+ .get(distributedStagePlan.getServerInstance());
+ List<ServerPlanRequestContext> requests = new ArrayList<>();
+ for (Map.Entry<String, List<String>> tableEntry :
tableToSegmentListMap.entrySet()) {
+ String tableType = tableEntry.getKey();
+ // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it
should not cause too much out-of-the-box
+ // network traffic. but there's chance to improve this:
+ // TODO: use TableDataManager: it is already getting tableConfig and
Schema when processing segments.
+ if (TableType.OFFLINE.name().equals(tableType)) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
+
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+ Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+ requests.add(ServerRequestPlanVisitor.build(mailboxService,
distributedStagePlan, requestMetadataMap,
+ tableConfig, schema, stageMetadata.getTimeBoundaryInfo(),
TableType.OFFLINE, tableEntry.getValue()));
+ } else if (TableType.REALTIME.name().equals(tableType)) {
+ TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
+
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+ Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+ requests.add(ServerRequestPlanVisitor.build(mailboxService,
distributedStagePlan, requestMetadataMap,
+ tableConfig, schema, stageMetadata.getTimeBoundaryInfo(),
TableType.REALTIME, tableEntry.getValue()));
+ } else {
+ throw new IllegalArgumentException("Unsupported table type key: " +
tableType);
+ }
+ }
+ return requests;
+ }
+
private BaseDataBlock processServerQuery(ServerQueryRequest
serverQueryRequest, ExecutorService executorService) {
BaseDataBlock dataBlock;
try {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 08de5ca7e0..2436061aa9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +73,8 @@ public class WorkerQueryExecutor {
long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
StageNode stageRoot = queryRequest.getStageRoot();
- Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
- _mailboxService, _hostName, _port, requestId,
queryRequest.getMetadataMap(), stageRoot);
+ Operator<TransferableBlock> rootOperator =
PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(
+ _mailboxService, requestId, stageRoot.getStageId(), _hostName, _port,
queryRequest.getMetadataMap()));
executorService.submit(new TraceRunnable() {
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
similarity index 69%
rename from
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
rename to
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 6af229aa78..6dab5865ba 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.executor;
+package org.apache.pinot.query.runtime.plan;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
@@ -45,7 +44,6 @@ import
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.SortOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
/**
@@ -53,69 +51,54 @@ import
org.apache.pinot.query.runtime.plan.DistributedStagePlan;
* this works only for the intermediate stage nodes, leaf stage nodes are
expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #build(MailboxService,
String, int, long, Map, StageNode)}
+ * <p>This class should be used statically via {@link #build(StageNode,
PlanRequestContext)}
*
* @see
org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan,
ExecutorService, Map)
*/
-public class PhysicalPlanVisitor implements
StageNodeVisitor<Operator<TransferableBlock>, Void> {
+public class PhysicalPlanVisitor implements
StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> {
+ private static final PhysicalPlanVisitor INSTANCE = new
PhysicalPlanVisitor();
- private final MailboxService<TransferableBlock> _mailboxService;
- private final String _hostName;
- private final int _port;
- private final long _requestId;
- private final Map<Integer, StageMetadata> _metadataMap;
-
- public static Operator<TransferableBlock>
build(MailboxService<TransferableBlock> mailboxService,
- String hostName, int port, long requestId, Map<Integer, StageMetadata>
metadataMap, StageNode node) {
- return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port,
requestId, metadataMap), null);
- }
-
- private PhysicalPlanVisitor(MailboxService<TransferableBlock>
mailboxService, String hostName, int port,
- long requestId, Map<Integer, StageMetadata> metadataMap) {
- _mailboxService = mailboxService;
- _hostName = hostName;
- _port = port;
- _requestId = requestId;
- _metadataMap = metadataMap;
+ public static Operator<TransferableBlock> build(StageNode node,
PlanRequestContext context) {
+ return node.visit(INSTANCE, context);
}
@Override
- public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode
node, Void context) {
- List<ServerInstance> sendingInstances =
_metadataMap.get(node.getSenderStageId()).getServerInstances();
- return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(),
sendingInstances,
- node.getExchangeType(), node.getPartitionKeySelector(), _hostName,
_port, _requestId,
- node.getSenderStageId());
+ public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode
node, PlanRequestContext context) {
+ List<ServerInstance> sendingInstances =
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
+ return new MailboxReceiveOperator(context.getMailboxService(),
node.getDataSchema(), sendingInstances,
+ node.getExchangeType(), node.getPartitionKeySelector(),
context.getHostName(), context.getPort(),
+ context.getRequestId(), node.getSenderStageId());
}
@Override
- public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node,
Void context) {
- Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, null);
- StageMetadata receivingStageMetadata =
_metadataMap.get(node.getReceiverStageId());
- return new MailboxSendOperator(_mailboxService, node.getDataSchema(),
nextOperator,
+ public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node,
PlanRequestContext context) {
+ Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, context);
+ StageMetadata receivingStageMetadata =
context.getMetadataMap().get(node.getReceiverStageId());
+ return new MailboxSendOperator(context.getMailboxService(),
node.getDataSchema(), nextOperator,
receivingStageMetadata.getServerInstances(), node.getExchangeType(),
node.getPartitionKeySelector(),
- _hostName, _port, _requestId, node.getStageId());
+ context.getHostName(), context.getPort(), context.getRequestId(),
node.getStageId());
}
@Override
- public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void
context) {
- Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, null);
+ public Operator<TransferableBlock> visitAggregate(AggregateNode node,
PlanRequestContext context) {
+ Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, context);
return new AggregateOperator(nextOperator, node.getDataSchema(),
node.getAggCalls(),
node.getGroupSet(), node.getInputs().get(0).getDataSchema());
}
@Override
- public Operator<TransferableBlock> visitFilter(FilterNode node, Void
context) {
- Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, null);
+ public Operator<TransferableBlock> visitFilter(FilterNode node,
PlanRequestContext context) {
+ Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, context);
return new FilterOperator(nextOperator, node.getDataSchema(),
node.getCondition());
}
@Override
- public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) {
+ public Operator<TransferableBlock> visitJoin(JoinNode node,
PlanRequestContext context) {
StageNode left = node.getInputs().get(0);
StageNode right = node.getInputs().get(1);
- Operator<TransferableBlock> leftOperator = left.visit(this, null);
- Operator<TransferableBlock> rightOperator = right.visit(this, null);
+ Operator<TransferableBlock> leftOperator = left.visit(this, context);
+ Operator<TransferableBlock> rightOperator = right.visit(this, context);
return new HashJoinOperator(leftOperator, left.getDataSchema(),
rightOperator,
right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(),
@@ -123,26 +106,26 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<Operator<Transferab
}
@Override
- public Operator<TransferableBlock> visitProject(ProjectNode node, Void
context) {
- Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, null);
+ public Operator<TransferableBlock> visitProject(ProjectNode node,
PlanRequestContext context) {
+ Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, context);
return new TransformOperator(nextOperator, node.getDataSchema(),
node.getProjects(),
node.getInputs().get(0).getDataSchema());
}
@Override
- public Operator<TransferableBlock> visitSort(SortNode node, Void context) {
- Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, null);
+ public Operator<TransferableBlock> visitSort(SortNode node,
PlanRequestContext context) {
+ Operator<TransferableBlock> nextOperator =
node.getInputs().get(0).visit(this, context);
return new SortOperator(nextOperator, node.getCollationKeys(),
node.getCollationDirections(),
node.getFetch(), node.getOffset(), node.getDataSchema());
}
@Override
- public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void
context) {
+ public Operator<TransferableBlock> visitTableScan(TableScanNode node,
PlanRequestContext context) {
throw new UnsupportedOperationException("Stage node of type TableScanNode
is not supported!");
}
@Override
- public Operator<TransferableBlock> visitValue(ValueNode node, Void context) {
+ public Operator<TransferableBlock> visitValue(ValueNode node,
PlanRequestContext context) {
return new LiteralValueOperator(node.getDataSchema(),
node.getLiteralRows());
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
new file mode 100644
index 0000000000..ae3e6c26db
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class PlanRequestContext {
+ protected final MailboxService<TransferableBlock> _mailboxService;
+ protected final long _requestId;
+ protected final int _stageId;
+ protected final String _hostName;
+ protected final int _port;
+ protected final Map<Integer, StageMetadata> _metadataMap;
+
+ public PlanRequestContext(MailboxService<TransferableBlock> mailboxService,
long requestId, int stageId,
+ String hostName, int port, Map<Integer, StageMetadata> metadataMap) {
+ _mailboxService = mailboxService;
+ _requestId = requestId;
+ _stageId = stageId;
+ _hostName = hostName;
+ _port = port;
+ _metadataMap = metadataMap;
+ }
+
+ public long getRequestId() {
+ return _requestId;
+ }
+
+ public int getStageId() {
+ return _stageId;
+ }
+
+ public String getHostName() {
+ return _hostName;
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
+ public Map<Integer, StageMetadata> getMetadataMap() {
+ return _metadataMap;
+ }
+
+ public MailboxService<TransferableBlock> getMailboxService() {
+ return _mailboxService;
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
new file mode 100644
index 0000000000..8735d59674
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.optimizer.QueryOptimizer;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
+import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with logics in {@link
org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce operator chain.
+ *
+ * As of now, the reason why we use the plan visitor for server request is for
additional support such as dynamic
+ * filtering and other auxiliary functionalities.
+ */
+public class ServerRequestPlanVisitor implements StageNodeVisitor<Void,
ServerPlanRequestContext> {
+ private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
+ private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
+ ImmutableList.of(
+ PredicateComparisonRewriter.class.getName(),
+ NonAggregationGroupByToDistinctQueryRewriter.class.getName()
+ );
+ private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
+ QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
+ private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
+
+ private static final ServerRequestPlanVisitor INSTANCE = new
ServerRequestPlanVisitor();
+ private static Void _aVoid = null;
+
+ public static ServerPlanRequestContext
build(MailboxService<TransferableBlock> mailboxService,
+ DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap,
TableConfig tableConfig, Schema schema,
+ TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String>
segmentList) {
+ // Before-visit: construct the ServerPlanRequestContext baseline
+ long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
+ PinotQuery pinotQuery = new PinotQuery();
+ pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+ pinotQuery.setExplain(false);
+ ServerPlanRequestContext context = new
ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(),
+ stagePlan.getServerInstance().getHostname(),
stagePlan.getServerInstance().getPort(),
+ stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo);
+
+ // visit the plan and create query physical plan.
+ ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+
+ // Post-visit: finalize context.
+ // 1. global rewrite/optimize
+ if (timeBoundaryInfo != null) {
+ attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType ==
TableType.OFFLINE);
+ }
+ for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
+ pinotQuery = queryRewriter.rewrite(pinotQuery);
+ }
+ QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
+
+ // 2. wrapped around in broker request
+ BrokerRequest brokerRequest = new BrokerRequest();
+ brokerRequest.setPinotQuery(pinotQuery);
+ DataSource dataSource = pinotQuery.getDataSource();
+ if (dataSource != null) {
+ QuerySource querySource = new QuerySource();
+ querySource.setTableName(dataSource.getTableName());
+ brokerRequest.setQuerySource(querySource);
+ }
+
+ // 3. create instance request with segmentList
+ InstanceRequest instanceRequest = new InstanceRequest();
+ instanceRequest.setRequestId(requestId);
+ instanceRequest.setBrokerId("unknown");
+ instanceRequest.setEnableTrace(false);
+ instanceRequest.setSearchSegments(segmentList);
+ instanceRequest.setQuery(brokerRequest);
+
+ context.setInstanceRequest(instanceRequest);
+ return context;
+ }
+
+ private static void walkStageNode(StageNode node, ServerPlanRequestContext
context) {
+ node.visit(INSTANCE, context);
+ }
+
+ @Override
+ public Void visitAggregate(AggregateNode node, ServerPlanRequestContext
context) {
+ visitChildren(node, context);
+ // set group-by list
+
context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList(
+ node.getGroupSet(), context.getPinotQuery()));
+ // set agg list
+
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList(
+ context.getPinotQuery().getGroupByList(), node.getAggCalls(),
context.getPinotQuery()));
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+
context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression(
+ node.getCondition(), context.getPinotQuery()));
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitMailboxReceive(MailboxReceiveNode node,
ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext
context) {
+ visitChildren(node, context);
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitProject(ProjectNode node, ServerPlanRequestContext context)
{
+ visitChildren(node, context);
+
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList(
+ node.getProjects(), context.getPinotQuery()));
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitSort(SortNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ if (node.getCollationKeys().size() > 0) {
+
context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(),
+ node.getCollationDirections(), context.getPinotQuery()));
+ }
+ if (node.getFetch() > 0) {
+ context.getPinotQuery().setLimit(node.getFetch());
+ }
+ if (node.getOffset() > 0) {
+ context.getPinotQuery().setOffset(node.getOffset());
+ }
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitTableScan(TableScanNode node, ServerPlanRequestContext
context) {
+ DataSource dataSource = new DataSource();
+ String tableNameWithType = TableNameBuilder.forType(context.getTableType())
+
.tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
+ dataSource.setTableName(tableNameWithType);
+ context.getPinotQuery().setDataSource(dataSource);
+ context.getPinotQuery().setSelectList(node.getTableScanColumns().stream()
+
.map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+ return _aVoid;
+ }
+
+ @Override
+ public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return _aVoid;
+ }
+
+ private void visitChildren(StageNode node, ServerPlanRequestContext context)
{
+ for (StageNode child : node.getInputs()) {
+ child.visit(this, context);
+ }
+ }
+ /**
+ * Helper method to attach the time boundary to the given PinotQuery.
+ */
+ private static void attachTimeBoundary(PinotQuery pinotQuery,
TimeBoundaryInfo timeBoundaryInfo,
+ boolean isOfflineRequest) {
+ String timeColumn = timeBoundaryInfo.getTimeColumn();
+ String timeValue = timeBoundaryInfo.getTimeValue();
+ Expression timeFilterExpression = RequestUtils.getFunctionExpression(
+ isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() :
FilterKind.GREATER_THAN.name());
+ timeFilterExpression.getFunctionCall().setOperands(
+ Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn),
RequestUtils.getLiteralExpression(timeValue)));
+
+ Expression filterExpression = pinotQuery.getFilterExpression();
+ if (filterExpression != null) {
+ Expression andFilterExpression =
RequestUtils.getFunctionExpression(FilterKind.AND.name());
+
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
timeFilterExpression));
+ pinotQuery.setFilterExpression(andFilterExpression);
+ } else {
+ pinotQuery.setFilterExpression(timeFilterExpression);
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
new file mode 100644
index 0000000000..cc63d04f0e
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.Map;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.spi.config.table.TableType;
+
+
+/**
+ * Context class for converting a {@link
org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
+ * {@link PinotQuery} to execute on server.
+ */
+public class ServerPlanRequestContext extends PlanRequestContext {
+ protected TableType _tableType;
+ protected TimeBoundaryInfo _timeBoundaryInfo;
+
+ protected PinotQuery _pinotQuery;
+ protected InstanceRequest _instanceRequest;
+
+ public ServerPlanRequestContext(MailboxService<TransferableBlock>
mailboxService, long requestId, int stageId,
+ String hostName, int port, Map<Integer, StageMetadata> metadataMap,
PinotQuery pinotQuery, TableType tableType,
+ TimeBoundaryInfo timeBoundaryInfo) {
+ super(mailboxService, requestId, stageId, hostName, port, metadataMap);
+ _pinotQuery = pinotQuery;
+ _tableType = tableType;
+ _timeBoundaryInfo = timeBoundaryInfo;
+ }
+
+ public TableType getTableType() {
+ return _tableType;
+ }
+
+ public PinotQuery getPinotQuery() {
+ return _pinotQuery;
+ }
+
+ public void setInstanceRequest(InstanceRequest instanceRequest) {
+ _instanceRequest = instanceRequest;
+ }
+
+ public InstanceRequest getInstanceRequest() {
+ return _instanceRequest;
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
deleted file mode 100644
index 2e68c601ab..0000000000
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.utils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.DataSource;
-import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.request.QuerySource;
-import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.optimizer.QueryOptimizer;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.FilterKind;
-import
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
-import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
-import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
-import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
-
-
-/**
- * {@code ServerRequestUtils} converts the {@link DistributedStagePlan} into a
{@link ServerQueryRequest}.
- *
- * <p>In order to reuse the current pinot {@link
org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}, a
- * conversion step is needed so that the V2 query plan can be converted into a
compatible format to run V1 executor.
- */
-public class ServerRequestUtils {
- private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
- private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
- ImmutableList.of(
- PredicateComparisonRewriter.class.getName(),
- NonAggregationGroupByToDistinctQueryRewriter.class.getName()
- );
- private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
- QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
- private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
-
- private ServerRequestUtils() {
- // do not instantiate.
- }
-
- // TODO: This is a hack, make an actual ServerQueryRequest converter.
- public static List<ServerQueryRequest>
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
- Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord>
helixPropertyStore) {
- StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
- Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
- "Server request for V2 engine should only have 1 scan table per
request.");
- String rawTableName = stageMetadata.getScannedTables().get(0);
- Map<String, List<String>> tableToSegmentListMap =
stageMetadata.getServerInstanceToSegmentsMap()
- .get(distributedStagePlan.getServerInstance());
- List<ServerQueryRequest> requests = new ArrayList<>();
- for (Map.Entry<String, List<String>> tableEntry :
tableToSegmentListMap.entrySet()) {
- String tableType = tableEntry.getKey();
- // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it
should not cause too much out-of-the-box
- // network traffic. but there's chance to improve this:
- // TODO: use TableDataManager: it is already getting tableConfig and
Schema when processing segments.
- if (TableType.OFFLINE.name().equals(tableType)) {
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
-
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(constructServerQueryRequest(distributedStagePlan,
requestMetadataMap, tableConfig, schema,
- stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE,
tableEntry.getValue()));
- } else if (TableType.REALTIME.name().equals(tableType)) {
- TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
-
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(constructServerQueryRequest(distributedStagePlan,
requestMetadataMap, tableConfig, schema,
- stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME,
tableEntry.getValue()));
- } else {
- throw new IllegalArgumentException("Unsupported table type key: " +
tableType);
- }
- }
- return requests;
- }
-
- public static ServerQueryRequest
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
- Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema
schema,
- TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String>
segmentList) {
- InstanceRequest instanceRequest = new InstanceRequest();
-
instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
- instanceRequest.setBrokerId("unknown");
- instanceRequest.setEnableTrace(false);
- instanceRequest.setSearchSegments(segmentList);
- instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan,
tableType, tableConfig, schema,
- timeBoundaryInfo));
- return new ServerQueryRequest(instanceRequest, new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
- System.currentTimeMillis());
- }
-
- // TODO: this is a hack, create a broker request object should not be needed
because we rewrite the entire
- // query into stages already.
- public static BrokerRequest constructBrokerRequest(DistributedStagePlan
distributedStagePlan, TableType tableType,
- TableConfig tableConfig, Schema schema, TimeBoundaryInfo
timeBoundaryInfo) {
- PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan,
tableType, tableConfig, schema, timeBoundaryInfo);
- BrokerRequest brokerRequest = new BrokerRequest();
- brokerRequest.setPinotQuery(pinotQuery);
- // Set table name in broker request because it is used for access control,
query routing etc.
- DataSource dataSource = pinotQuery.getDataSource();
- if (dataSource != null) {
- QuerySource querySource = new QuerySource();
- querySource.setTableName(dataSource.getTableName());
- brokerRequest.setQuerySource(querySource);
- }
- return brokerRequest;
- }
-
- public static PinotQuery constructPinotQuery(DistributedStagePlan
distributedStagePlan, TableType tableType,
- TableConfig tableConfig, Schema schema, TimeBoundaryInfo
timeBoundaryInfo) {
- PinotQuery pinotQuery = new PinotQuery();
- pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
- pinotQuery.setExplain(false);
- walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery, tableType);
- if (timeBoundaryInfo != null) {
- attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType ==
TableType.OFFLINE);
- }
- for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
- pinotQuery = queryRewriter.rewrite(pinotQuery);
- }
- QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
- return pinotQuery;
- }
-
- private static void walkStageTree(StageNode node, PinotQuery pinotQuery,
TableType tableType) {
- // this walkStageTree should only be a sequential walk.
- for (StageNode child : node.getInputs()) {
- walkStageTree(child, pinotQuery, tableType);
- }
- if (node instanceof TableScanNode) {
- TableScanNode tableScanNode = (TableScanNode) node;
- DataSource dataSource = new DataSource();
- String tableNameWithType = TableNameBuilder.forType(tableType)
-
.tableNameWithType(TableNameBuilder.extractRawTableName(tableScanNode.getTableName()));
- dataSource.setTableName(tableNameWithType);
- pinotQuery.setDataSource(dataSource);
-
pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
- .collect(Collectors.toList()));
- } else if (node instanceof FilterNode) {
- pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(
- ((FilterNode) node).getCondition(), pinotQuery));
- } else if (node instanceof ProjectNode) {
- pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
- ((ProjectNode) node).getProjects(), pinotQuery));
- } else if (node instanceof AggregateNode) {
- // set group-by list
- pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
- ((AggregateNode) node).getGroupSet(), pinotQuery));
- // set agg list
-
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getGroupByList(),
- ((AggregateNode) node).getAggCalls(), pinotQuery));
- } else if (node instanceof SortNode) {
- if (((SortNode) node).getCollationKeys().size() > 0) {
-
pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode)
node).getCollationKeys(),
- ((SortNode) node).getCollationDirections(), pinotQuery));
- }
- if (((SortNode) node).getFetch() > 0) {
- pinotQuery.setLimit(((SortNode) node).getFetch());
- }
- if (((SortNode) node).getOffset() > 0) {
- pinotQuery.setOffset(((SortNode) node).getOffset());
- }
- } else if (node instanceof MailboxSendNode) {
- // TODO: MailboxSendNode should be the root of the leaf stage. but
ignore for now since it is handle seperately
- // in QueryRunner as a single step sender.
- } else {
- throw new UnsupportedOperationException("Unsupported logical plan node:
" + node);
- }
- }
-
- /**
- * Helper method to attach the time boundary to the given PinotQuery.
- */
- private static void attachTimeBoundary(PinotQuery pinotQuery,
TimeBoundaryInfo timeBoundaryInfo,
- boolean isOfflineRequest) {
- String timeColumn = timeBoundaryInfo.getTimeColumn();
- String timeValue = timeBoundaryInfo.getTimeValue();
- Expression timeFilterExpression = RequestUtils.getFunctionExpression(
- isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() :
FilterKind.GREATER_THAN.name());
- timeFilterExpression.getFunctionCall().setOperands(
- Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn),
RequestUtils.getLiteralExpression(timeValue)));
-
- Expression filterExpression = pinotQuery.getFilterExpression();
- if (filterExpression != null) {
- Expression andFilterExpression =
RequestUtils.getFunctionExpression(FilterKind.AND.name());
-
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
timeFilterExpression));
- pinotQuery.setFilterExpression(andFilterExpression);
- } else {
- pinotQuery.setFilterExpression(timeFilterExpression);
- }
- }
-}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index eb7e354b9e..3c985d77aa 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -26,6 +26,10 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -38,6 +42,8 @@ import org.testng.annotations.Test;
public class QueryRunnerTest extends QueryRunnerTestBase {
+ private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new
NamedThreadFactory("query_server_enclosure"));
@Test(dataProvider = "testDataWithSqlToFinalRowCount")
public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
@@ -71,7 +77,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
for (ServerInstance serverInstance :
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
DistributedStagePlan distributedStagePlan =
QueryDispatcher.constructDistributedStagePlan(queryPlan,
stageId, serverInstance);
- _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+ EXECUTOR_SERVICE.submit(() -> {
+ _servers.get(serverInstance).processQuery(distributedStagePlan,
requestMetadataMap);
+ });
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]