This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb4966b0f7 [multistage] carry partition scheme for optimization (#9100)
fb4966b0f7 is described below
commit fb4966b0f79dcd830e20c53305cd0ecacc0f5440
Author: Rong Rong <[email protected]>
AuthorDate: Wed Aug 3 21:04:22 2022 -0700
[multistage] carry partition scheme for optimization (#9100)
* adding stage planner and mailbox sender to allow SINGLETON to be actual
singleton
add partition key generator
* [change default join distribute to hash]
* adding logic to use singleton connection
* fix a bug on non-agg group-by random distribution
* add test
* address pr comments
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/query/planner/logical/StagePlanner.java | 131 +++++++++++++++++++--
.../query/planner/stage/AbstractStageNode.java | 16 +++
.../pinot/query/planner/stage/AggregateNode.java | 6 +-
.../query/planner/stage/MailboxReceiveNode.java | 11 +-
.../pinot/query/planner/stage/MailboxSendNode.java | 6 -
.../pinot/query/planner/stage/StageNode.java | 6 +
.../PinotAggregateExchangeNodeInsertRule.java | 2 +-
.../rules/PinotJoinExchangeNodeInsertRule.java | 10 +-
...ironmentTest.java => QueryCompilationTest.java} | 76 +++++++++++-
.../runtime/executor/WorkerQueryExecutor.java | 6 +-
.../runtime/operator/MailboxReceiveOperator.java | 23 +++-
.../runtime/operator/MailboxSendOperator.java | 18 ++-
.../pinot/query/service/QueryDispatcher.java | 4 +-
13 files changed, 274 insertions(+), 41 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index cf0b218708..ac30996efa 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -19,8 +19,10 @@
package org.apache.pinot.query.planner.logical;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -29,9 +31,15 @@ import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.routing.WorkerManager;
@@ -73,9 +81,9 @@ public class StagePlanner {
// receiver so doesn't matter what the exchange type is. setting it to
SINGLETON by default.
StageNode globalReceiverNode =
new MailboxReceiveNode(0, globalStageRoot.getDataSchema(),
globalStageRoot.getStageId(),
- RelDistribution.Type.SINGLETON);
+ RelDistribution.Type.RANDOM_DISTRIBUTED, null);
StageNode globalSenderNode = new
MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
- globalReceiverNode.getStageId(), RelDistribution.Type.SINGLETON);
+ globalReceiverNode.getStageId(),
RelDistribution.Type.RANDOM_DISTRIBUTED, null);
globalSenderNode.addInput(globalStageRoot);
_queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
StageMetadata stageMetadata =
_stageMetadataMap.get(globalSenderNode.getStageId());
@@ -105,17 +113,37 @@ public class StagePlanner {
RelDistribution.Type exchangeType = distribution.getType();
// 2. make an exchange sender and receiver node pair
- StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType);
- StageNode mailboxSender = new
MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- mailboxReceiver.getStageId(), exchangeType, exchangeType ==
RelDistribution.Type.HASH_DISTRIBUTED
- ? new FieldSelectionKeySelector(distributionKeys) : null);
+ // only HASH_DISTRIBUTED requires a partition key selector; so all other
types (SINGLETON and BROADCAST)
+ // of exchange will not carry a partition key selector.
+ KeySelector<Object[], Object[]> keySelector = exchangeType ==
RelDistribution.Type.HASH_DISTRIBUTED
+ ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+ StageNode mailboxReceiver;
+ StageNode mailboxSender;
+ if (canSkipShuffle(nextStageRoot, keySelector)) {
+ // Use SINGLETON exchange type indicates a LOCAL-to-LOCAL data
transfer between execution threads.
+ // TODO: actually implement the SINGLETON exchange without going
through the over-the-wire GRPC mailbox
+ // sender and receiver.
+ mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getDataSchema(),
+ nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON,
keySelector);
+ mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(),
nextStageRoot.getDataSchema(),
+ mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON,
keySelector);
+ } else {
+ mailboxReceiver = new MailboxReceiveNode(currentStageId,
nextStageRoot.getDataSchema(),
+ nextStageRoot.getStageId(), exchangeType, keySelector);
+ mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(),
nextStageRoot.getDataSchema(),
+ mailboxReceiver.getStageId(), exchangeType, keySelector);
+ }
mailboxSender.addInput(nextStageRoot);
// 3. put the sender side as a completed stage.
_queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
- // 4. return the receiver (this is considered as a "virtual table scan"
node for its parent.
+ // 4. update stage metadata.
+ updateStageMetadata(mailboxSender.getStageId(), mailboxSender,
_stageMetadataMap);
+ updateStageMetadata(mailboxReceiver.getStageId(), mailboxReceiver,
_stageMetadataMap);
+
+ // 5. return the receiver, this is considered as a "virtual table scan"
node for its parent.
return mailboxReceiver;
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node,
currentStageId);
@@ -123,12 +151,95 @@ public class StagePlanner {
for (RelNode input : inputs) {
stageNode.addInput(walkRelPlan(input, currentStageId));
}
- StageMetadata stageMetadata =
_stageMetadataMap.computeIfAbsent(currentStageId, (id) -> new StageMetadata());
- stageMetadata.attach(stageNode);
+ updateStageMetadata(currentStageId, stageNode, _stageMetadataMap);
return stageNode;
}
}
+ private boolean canSkipShuffle(StageNode stageNode, KeySelector<Object[],
Object[]> keySelector) {
+ Set<Integer> originSet = stageNode.getPartitionKeys();
+ if (!originSet.isEmpty() && keySelector != null) {
+ Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector)
keySelector).getColumnIndices());
+ return targetSet.containsAll(originSet);
+ }
+ return false;
+ }
+
+ private static void updateStageMetadata(int stageId, StageNode node,
Map<Integer, StageMetadata> stageMetadataMap) {
+ updatePartitionKeys(node);
+ StageMetadata stageMetadata = stageMetadataMap.computeIfAbsent(stageId,
(id) -> new StageMetadata());
+ stageMetadata.attach(node);
+ }
+
+ private static void updatePartitionKeys(StageNode node) {
+ if (node instanceof ProjectNode) {
+ // any input reference directly carry over should still be a partition
key.
+ Set<Integer> previousPartitionKeys =
node.getInputs().get(0).getPartitionKeys();
+ Set<Integer> newPartitionKeys = new HashSet<>();
+ ProjectNode projectNode = (ProjectNode) node;
+ for (int i = 0; i < projectNode.getProjects().size(); i++) {
+ RexExpression rexExpression = projectNode.getProjects().get(i);
+ if (rexExpression instanceof RexExpression.InputRef
+ && previousPartitionKeys.contains(((RexExpression.InputRef)
rexExpression).getIndex())) {
+ newPartitionKeys.add(i);
+ }
+ }
+ projectNode.setPartitionKeys(newPartitionKeys);
+ } else if (node instanceof FilterNode) {
+ // filter node doesn't change partition keys.
+ node.setPartitionKeys(node.getInputs().get(0).getPartitionKeys());
+ } else if (node instanceof AggregateNode) {
+ // any input reference directly carry over in group set of aggregation
should still be a partition key.
+ Set<Integer> previousPartitionKeys =
node.getInputs().get(0).getPartitionKeys();
+ Set<Integer> newPartitionKeys = new HashSet<>();
+ AggregateNode aggregateNode = (AggregateNode) node;
+ for (int i = 0; i < aggregateNode.getGroupSet().size(); i++) {
+ RexExpression rexExpression = aggregateNode.getGroupSet().get(i);
+ if (rexExpression instanceof RexExpression.InputRef
+ && previousPartitionKeys.contains(((RexExpression.InputRef)
rexExpression).getIndex())) {
+ newPartitionKeys.add(i);
+ }
+ }
+ aggregateNode.setPartitionKeys(newPartitionKeys);
+ } else if (node instanceof JoinNode) {
+ int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+ Set<Integer> leftPartitionKeys =
node.getInputs().get(0).getPartitionKeys();
+ Set<Integer> rightPartitionKeys =
node.getInputs().get(1).getPartitionKeys();
+ // TODO: currently JOIN criteria guarantee to only have one
FieldSelectionKeySelector. Support more.
+ FieldSelectionKeySelector leftJoinKeySelector =
+ (FieldSelectionKeySelector) ((JoinNode)
node).getCriteria().get(0).getLeftJoinKeySelector();
+ FieldSelectionKeySelector rightJoinKeySelector =
+ (FieldSelectionKeySelector) ((JoinNode)
node).getCriteria().get(0).getRightJoinKeySelector();
+ Set<Integer> newPartitionKeys = new HashSet<>();
+ for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
+ int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
+ int rightIndex = rightJoinKeySelector.getColumnIndices().get(i);
+ if (leftPartitionKeys.contains(leftIndex)) {
+ newPartitionKeys.add(i);
+ }
+ if (rightPartitionKeys.contains(rightIndex)) {
+ newPartitionKeys.add(leftDataSchemaSize + i);
+ }
+ }
+ node.setPartitionKeys(newPartitionKeys);
+ } else if (node instanceof TableScanNode) {
+ // TODO: add table partition in table config as partition keys. we dont
have that information yet.
+ } else if (node instanceof MailboxReceiveNode) {
+ // hash distribution key is partition key.
+ FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
+ ((MailboxReceiveNode) node).getPartitionKeySelector();
+ if (keySelector != null) {
+ node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
+ }
+ } else if (node instanceof MailboxSendNode) {
+ FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
+ ((MailboxSendNode) node).getPartitionKeySelector();
+ if (keySelector != null) {
+ node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
+ }
+ }
+ }
+
private boolean isExchangeNode(RelNode node) {
return (node instanceof LogicalExchange);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 1de069f0b4..594c6d7e38 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -19,7 +19,10 @@
package org.apache.pinot.query.planner.stage;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.serde.ProtoSerializable;
@@ -31,6 +34,7 @@ public abstract class AbstractStageNode implements StageNode,
ProtoSerializable
protected final int _stageId;
protected final List<StageNode> _inputs;
protected DataSchema _dataSchema;
+ protected Set<Integer> _partitionedKeys;
public AbstractStageNode(int stageId) {
this(stageId, null);
@@ -40,6 +44,7 @@ public abstract class AbstractStageNode implements StageNode,
ProtoSerializable
_stageId = stageId;
_dataSchema = dataSchema;
_inputs = new ArrayList<>();
+ _partitionedKeys = new HashSet<>();
}
@Override
@@ -62,10 +67,21 @@ public abstract class AbstractStageNode implements
StageNode, ProtoSerializable
return _dataSchema;
}
+ @Override
public void setDataSchema(DataSchema dataSchema) {
_dataSchema = dataSchema;
}
+ @Override
+ public Set<Integer> getPartitionKeys() {
+ return _partitionedKeys;
+ }
+
+ @Override
+ public void setPartitionKeys(Collection<Integer> partitionedKeys) {
+ _partitionedKeys.addAll(partitionedKeys);
+ }
+
@Override
public void fromObjectField(Plan.ObjectField objectField) {
ProtoSerializationUtils.setObjectFieldToObject(this, objectField);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
index d0a28b0cbd..b67c5a9500 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -19,7 +19,6 @@
package org.apache.pinot.query.planner.stage;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.core.AggregateCall;
@@ -43,9 +42,8 @@ public class AggregateNode extends AbstractStageNode {
super(stageId, dataSchema);
_aggCalls =
aggCalls.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
_groupSet = new ArrayList<>(groupSet.cardinality());
- Iterator<Integer> groupSetIt = groupSet.iterator();
- while (groupSetIt.hasNext()) {
- _groupSet.add(new RexExpression.InputRef(groupSetIt.next()));
+ for (Integer integer : groupSet) {
+ _groupSet.add(new RexExpression.InputRef(integer));
}
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index abba178865..3fa3a55acd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.query.planner.stage;
+import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;
@@ -28,16 +30,19 @@ public class MailboxReceiveNode extends AbstractStageNode {
private int _senderStageId;
@ProtoProperties
private RelDistribution.Type _exchangeType;
+ @ProtoProperties
+ private KeySelector<Object[], Object[]> _partitionKeySelector;
public MailboxReceiveNode(int stageId) {
super(stageId);
}
public MailboxReceiveNode(int stageId, DataSchema dataSchema, int
senderStageId,
- RelDistribution.Type exchangeType) {
+ RelDistribution.Type exchangeType, @Nullable KeySelector<Object[],
Object[]> partitionKeySelector) {
super(stageId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
+ _partitionKeySelector = partitionKeySelector;
}
public int getSenderStageId() {
@@ -47,4 +52,8 @@ public class MailboxReceiveNode extends AbstractStageNode {
public RelDistribution.Type getExchangeType() {
return _exchangeType;
}
+
+ public KeySelector<Object[], Object[]> getPartitionKeySelector() {
+ return _partitionKeySelector;
+ }
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 962dbc73c4..f67f85e573 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -37,12 +37,6 @@ public class MailboxSendNode extends AbstractStageNode {
super(stageId);
}
- public MailboxSendNode(int stageId, DataSchema dataSchema, int
receiverStageId,
- RelDistribution.Type exchangeType) {
- // When exchangeType is not HASH_DISTRIBUTE, no partitionKeySelector is
needed.
- this(stageId, dataSchema, receiverStageId, exchangeType, null);
- }
-
public MailboxSendNode(int stageId, DataSchema dataSchema, int
receiverStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[],
Object[]> partitionKeySelector) {
super(stageId, dataSchema);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 6efa59ce2d..bd69371484 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -19,7 +19,9 @@
package org.apache.pinot.query.planner.stage;
import java.io.Serializable;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.pinot.common.utils.DataSchema;
@@ -42,4 +44,8 @@ public interface StageNode extends Serializable {
DataSchema getDataSchema();
void setDataSchema(DataSchema dataSchema);
+
+ Set<Integer> getPartitionKeys();
+
+ void setPartitionKeys(Collection<Integer> partitionKeys);
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
index 1150bc2085..180137e323 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotAggregateExchangeNodeInsertRule.java
@@ -106,7 +106,7 @@ public class PinotAggregateExchangeNodeInsertRule extends
RelOptRule {
List<Integer> groupSetIndices = ImmutableIntList.range(0,
oldAggRel.getGroupCount());
LogicalExchange exchange = null;
if (groupSetIndices.size() == 0) {
- exchange = LogicalExchange.create(newLeafAgg,
RelDistributions.SINGLETON);
+ exchange = LogicalExchange.create(newLeafAgg,
RelDistributions.hash(Collections.emptyList()));
} else {
exchange = LogicalExchange.create(newLeafAgg,
RelDistributions.hash(groupSetIndices));
}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
index 6aaacccac1..f766afb756 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java
@@ -70,7 +70,12 @@ public class PinotJoinExchangeNodeInsertRule extends
RelOptRule {
RelNode leftExchange;
RelNode rightExchange;
List<RelHint> hints = join.getHints();
- if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+ if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+ // TODO: determine which side should be the broadcast table based on
table metadata
+ // TODO: support SINGLETON exchange if the non-broadcast table size is
small enough to stay local.
+ leftExchange = LogicalExchange.create(leftInput,
RelDistributions.RANDOM_DISTRIBUTED);
+ rightExchange = LogicalExchange.create(rightInput,
RelDistributions.BROADCAST_DISTRIBUTED);
+ } else { // if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
RexCall joinCondition = (RexCall) join.getCondition();
int leftNodeOffset = join.getLeft().getRowType().getFieldNames().size();
List<List<Integer>> conditions =
PlannerUtils.parseJoinConditions(joinCondition, leftNodeOffset);
@@ -78,9 +83,6 @@ public class PinotJoinExchangeNodeInsertRule extends
RelOptRule {
RelDistributions.hash(conditions.get(0)));
rightExchange = LogicalExchange.create(rightInput,
RelDistributions.hash(conditions.get(1)));
- } else { // if (hints.contains(PinotRelationalHints.USE_BROADCAST_JOIN))
- leftExchange = LogicalExchange.create(leftInput,
RelDistributions.SINGLETON);
- rightExchange = LogicalExchange.create(rightInput,
RelDistributions.BROADCAST_DISTRIBUTED);
}
RelNode newJoinNode =
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
similarity index 58%
rename from
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
rename to
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index 143eabf0f0..31c018d158 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -22,18 +22,26 @@ import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.sql.SqlNode;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AbstractStageNode;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.StageNode;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class QueryEnvironmentTest extends QueryEnvironmentTestBase {
+public class QueryCompilationTest extends QueryEnvironmentTestBase {
@Test(dataProvider = "testQueryParserDataProvider")
public void testQueryParser(String query, String digest)
@@ -45,7 +53,7 @@ public class QueryEnvironmentTest extends
QueryEnvironmentTestBase {
}
@Test(dataProvider = "testQueryDataProvider")
- public void testQueryToRel(String query)
+ public void testQueryPlanWithoutException(String query)
throws Exception {
try {
QueryPlan queryPlan = _queryEnvironment.planQuery(query);
@@ -65,6 +73,38 @@ public class QueryEnvironmentTest extends
QueryEnvironmentTestBase {
}
}
+ @Test
+ public void testQueryGroupByAfterJoinShouldNotDoDataShuffle()
+ throws Exception {
+ String query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1
= b.col2 "
+ + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1,
a.col2";
+ QueryPlan queryPlan = _queryEnvironment.planQuery(query);
+ Assert.assertEquals(queryPlan.getQueryStageMap().size(), 5);
+ Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 5);
+ for (Map.Entry<Integer, StageMetadata> e :
queryPlan.getStageMetadataMap().entrySet()) {
+ if (e.getValue().getScannedTables().size() == 0 &&
!PlannerUtils.isRootStage(e.getKey())) {
+ StageNode node = queryPlan.getQueryStageMap().get(e.getKey());
+ while (node != null) {
+ if (node instanceof JoinNode) {
+ // JOIN is exchanged with hash distribution (data shuffle)
+ MailboxReceiveNode left = (MailboxReceiveNode)
node.getInputs().get(0);
+ MailboxReceiveNode right = (MailboxReceiveNode)
node.getInputs().get(1);
+ Assert.assertEquals(left.getExchangeType(),
RelDistribution.Type.HASH_DISTRIBUTED);
+ Assert.assertEquals(right.getExchangeType(),
RelDistribution.Type.HASH_DISTRIBUTED);
+ break;
+ }
+ if (node instanceof AggregateNode && node.getInputs().get(0)
instanceof MailboxReceiveNode) {
+ // AGG is exchanged with singleton since it has already been
distributed by JOIN.
+ MailboxReceiveNode input = (MailboxReceiveNode)
node.getInputs().get(0);
+ Assert.assertEquals(input.getExchangeType(),
RelDistribution.Type.SINGLETON);
+ break;
+ }
+ node = node.getInputs().get(0);
+ }
+ }
+ }
+ }
+
@Test
public void testQueryAndAssertStageContentForJoin()
throws Exception {
@@ -95,12 +135,38 @@ public class QueryEnvironmentTest extends
QueryEnvironmentTestBase {
}
@Test
- public void testQueryProjectFilterPushdownForJoin() {
+ public void testQueryProjectFilterPushDownForJoin() {
String query = "SELECT a.col1, a.ts, b.col2, b.col3 FROM a JOIN b ON
a.col1 = b.col2 "
+ "WHERE a.col3 >= 0 AND a.col2 IN ('a', 'b') AND b.col3 < 0";
QueryPlan queryPlan = _queryEnvironment.planQuery(query);
- Assert.assertEquals(queryPlan.getQueryStageMap().size(), 4);
- Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 4);
+ List<StageNode> intermediateStageRoots =
+ queryPlan.getStageMetadataMap().entrySet().stream().filter(e ->
e.getValue().getScannedTables().size() == 0)
+ .map(e ->
queryPlan.getQueryStageMap().get(e.getKey())).collect(Collectors.toList());
+ // Assert that no project of filter node for any intermediate stage
because all should've been pushed down.
+ for (StageNode roots : intermediateStageRoots) {
+ assertNodeTypeNotIn(roots, ImmutableList.of(ProjectNode.class,
FilterNode.class));
+ }
+ }
+
+ // --------------------------------------------------------------------------
+ // Test Utils.
+ // --------------------------------------------------------------------------
+
+ private static void assertNodeTypeNotIn(StageNode node, List<Class<? extends
AbstractStageNode>> bannedNodeType) {
+ Assert.assertFalse(isOneOf(bannedNodeType, node));
+ for (StageNode child : node.getInputs()) {
+ assertNodeTypeNotIn(child, bannedNodeType);
+ }
+ }
+
+ private static boolean isOneOf(List<Class<? extends AbstractStageNode>>
allowedNodeTypes,
+ StageNode node) {
+ for (Class<? extends AbstractStageNode> allowedNodeType :
allowedNodeTypes) {
+ if (node.getClass() == allowedNodeType) {
+ return true;
+ }
+ }
+ return false;
}
@DataProvider(name = "testQueryParserDataProvider")
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 9c978f0ad9..7bfe5031e8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.executor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.core.operator.BaseOperator;
@@ -106,8 +105,9 @@ public class WorkerQueryExecutor {
if (stageNode instanceof MailboxReceiveNode) {
MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
List<ServerInstance> sendingInstances =
metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
- return new MailboxReceiveOperator(_mailboxService,
receiveNode.getDataSchema(), RelDistribution.Type.ANY,
- sendingInstances, _hostName, _port, requestId,
receiveNode.getSenderStageId());
+ return new MailboxReceiveOperator(_mailboxService,
receiveNode.getDataSchema(), sendingInstances,
+ receiveNode.getExchangeType(),
receiveNode.getPartitionKeySelector(), _hostName, _port, requestId,
+ receiveNode.getSenderStageId());
} else if (stageNode instanceof MailboxSendNode) {
MailboxSendNode sendNode = (MailboxSendNode) stageNode;
BaseOperator<TransferableBlock> nextOperator = getOperator(requestId,
sendNode.getInputs().get(0), metadataMap);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 91d5de3d8d..81481b6311 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
@@ -34,6 +36,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.mailbox.ReceivingMailbox;
import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.service.QueryConfig;
@@ -51,6 +54,7 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final RelDistribution.Type _exchangeType;
+ private final KeySelector<Object[], Object[]> _keySelector;
private final List<ServerInstance> _sendingStageInstances;
private final DataSchema _dataSchema;
private final String _hostName;
@@ -61,18 +65,31 @@ public class MailboxReceiveOperator extends
BaseOperator<TransferableBlock> {
private TransferableBlock _upstreamErrorBlock;
public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent>
mailboxService, DataSchema dataSchema,
- RelDistribution.Type exchangeType, List<ServerInstance>
sendingStageInstances, String hostName, int port,
- long jobId, int stageId) {
+ List<ServerInstance> sendingStageInstances, RelDistribution.Type
exchangeType,
+ KeySelector<Object[], Object[]> keySelector, String hostName, int port,
long jobId, int stageId) {
_dataSchema = dataSchema;
_mailboxService = mailboxService;
_exchangeType = exchangeType;
- _sendingStageInstances = sendingStageInstances;
+ if (_exchangeType == RelDistribution.Type.SINGLETON) {
+ ServerInstance singletonInstance = null;
+ for (ServerInstance serverInstance : sendingStageInstances) {
+ if (serverInstance.getHostname().equals(_mailboxService.getHostname())
+ && serverInstance.getQueryMailboxPort() ==
_mailboxService.getMailboxPort()) {
+ Preconditions.checkState(singletonInstance == null, "multiple
instance found for singleton exchange type!");
+ singletonInstance = serverInstance;
+ }
+ }
+ _sendingStageInstances = Collections.singletonList(singletonInstance);
+ } else {
+ _sendingStageInstances = sendingStageInstances;
+ }
_hostName = hostName;
_port = port;
_jobId = jobId;
_stageId = stageId;
_timeout = QueryConfig.DEFAULT_TIMEOUT_NANO;
_upstreamErrorBlock = null;
+ _keySelector = keySelector;
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index bcd49f0b9a..32375dc4fa 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -77,8 +78,20 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
_dataSchema = dataSchema;
_mailboxService = mailboxService;
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
- _receivingStageInstances = receivingStageInstances;
_exchangeType = exchangeType;
+ if (_exchangeType == RelDistribution.Type.SINGLETON) {
+ ServerInstance singletonInstance = null;
+ for (ServerInstance serverInstance : receivingStageInstances) {
+ if (serverInstance.getHostname().equals(_mailboxService.getHostname())
+ && serverInstance.getQueryMailboxPort() ==
_mailboxService.getMailboxPort()) {
+ Preconditions.checkState(singletonInstance == null, "multiple
instance found for singleton exchange type!");
+ singletonInstance = serverInstance;
+ }
+ }
+ _receivingStageInstances = Collections.singletonList(singletonInstance);
+ } else {
+ _receivingStageInstances = receivingStageInstances;
+ }
_keySelector = keySelector;
_serverHostName = hostName;
_serverPort = port;
@@ -112,7 +125,8 @@ public class MailboxSendOperator extends
BaseOperator<TransferableBlock> {
try {
switch (_exchangeType) {
case SINGLETON:
- // TODO: singleton or random distribution should've been
distinguished in planning phase.
+ sendDataTableBlock(_receivingStageInstances.get(0), dataBlock);
+ break;
case RANDOM_DISTRIBUTED:
if (isEndOfStream) {
for (ServerInstance serverInstance : _receivingStageInstances) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index 653d2b216d..0323bed72e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -151,8 +151,8 @@ public class QueryDispatcher {
List<ServerInstance> sendingInstances, long jobId, int stageId,
DataSchema dataSchema, String hostname,
int port) {
MailboxReceiveOperator mailboxReceiveOperator =
- new MailboxReceiveOperator(mailboxService, dataSchema,
RelDistribution.Type.ANY, sendingInstances, hostname,
- port, jobId, stageId);
+ new MailboxReceiveOperator(mailboxService, dataSchema,
sendingInstances,
+ RelDistribution.Type.RANDOM_DISTRIBUTED, null, hostname, port,
jobId, stageId);
return mailboxReceiveOperator;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]