This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f085e2b6a4 [multistage] Remove Old Colocated Join Implementation (#15941)
f085e2b6a4 is described below
commit f085e2b6a4a3bb114cac7b86be272437873a11f8
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu May 29 20:22:20 2025 -0500
[multistage] Remove Old Colocated Join Implementation (#15941)
---
.../planner/physical/PinotDispatchPlanner.java | 17 +-
.../planner/physical/colocated/ColocationKey.java | 96 -----
.../colocated/GreedyShuffleRewriteContext.java | 127 ------
.../GreedyShuffleRewritePreComputeVisitor.java | 75 ----
.../colocated/GreedyShuffleRewriteVisitor.java | 435 ---------------------
5 files changed, 2 insertions(+), 748 deletions(-)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index e3883cf3c2..656d0dc5b9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -26,7 +26,6 @@ import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
-import
org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
import
org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.validation.ArrayToMvValidationVisitor;
@@ -70,11 +69,9 @@ public class PinotDispatchPlanner {
context.getWorkerManager().assignWorkers(rootFragment, context);
// 4. compute the mailbox assignment for each stage.
rootNode.visit(MailboxAssignmentVisitor.INSTANCE, context);
- // 5. Run physical optimizations
- runPhysicalOptimizers(rootNode, context, _tableCache);
- // 6. Run validations
+ // 5. Run validations
runValidations(rootFragment, context);
- // 7. convert it into query plan.
+ // 6. convert it into query plan.
return finalizeDispatchableSubPlan(rootFragment, context);
}
@@ -120,16 +117,6 @@ public class PinotDispatchPlanner {
}
}
- // TODO: Switch to Worker SPI to avoid multiple-places where workers are
assigned.
- private void runPhysicalOptimizers(PlanNode subPlanRoot,
DispatchablePlanContext dispatchablePlanContext,
- TableCache tableCache) {
- if
(dispatchablePlanContext.getPlannerContext().getOptions().getOrDefault("useColocatedJoin",
"false")
- .equals("true")) {
- GreedyShuffleRewriteVisitor.optimizeShuffles(subPlanRoot,
- dispatchablePlanContext.getDispatchablePlanMetadataMap(),
tableCache);
- }
- }
-
private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment
subPlanRoot,
DispatchablePlanContext dispatchablePlanContext) {
return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java
deleted file mode 100644
index e5d60cc134..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.calcite.rex.RexInputRef;
-
-
-/**
- * ColocationKey describes how data is distributed in a given stage. It
consists of a list of columns which are stored
- * as a list of {@link RexInputRef#getIndex()}, the number of partitions and
the hash-algorithm used. A given stage may
- * have more than 1 ColocationKey, in which case one may use a {@link
java.util.Set< ColocationKey >} to represent this
- * behavior.
- *
- * <p>
- * In other words, when a PlanNode has the schema: (user_uuid, col1, col2,
...), and the ColocationKey is
- * ([0], 8, murmur), then that means that the data for the PlanNode is
partitioned using the user_uuid column, into
- * 8 partitions where the partitionId is computed using murmur(user_uuid) % 8.
- *
- * For a join stage the data is partitioned by the senders using their
respective join-keys. In that case, we may
- * have more than 1 ColocationKey applicable for the JoinNode, and it can be
represented by a set as:
- * {([0], 8, murmur), ([leftSchemaSize + 0], 8, murmur)}, assuming both
senders partition using Murmur into 8
- * partitions. Note that a set of ColocationKey means that the partition keys
are independent and they don't have any
- * ordering, i.e. the data is partitioned by both the join-key of the left
child and the join-key of the right child.
- * </p>
- */
-class ColocationKey {
- private List<Integer> _indices;
- private int _numPartitions;
- private String _hashAlgorithm;
-
- public List<Integer> getIndices() {
- return _indices;
- }
-
- public int getNumPartitions() {
- return _numPartitions;
- }
-
- public String getHashAlgorithm() {
- return _hashAlgorithm;
- }
-
- public ColocationKey(int numPartitions, String algorithm) {
- _numPartitions = numPartitions;
- _hashAlgorithm = algorithm;
- _indices = new ArrayList<>();
- }
-
- public ColocationKey(int index, int numPartitions, String algorithm) {
- _indices = new ArrayList<>();
- _indices.add(index);
- _numPartitions = numPartitions;
- _hashAlgorithm = algorithm;
- }
-
- public void addIndex(int index) {
- _indices.add(index);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ColocationKey that = (ColocationKey) o;
- return _indices.equals(that._indices) && _numPartitions ==
that._numPartitions && _hashAlgorithm
- .equals(that._hashAlgorithm);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(_indices, _numPartitions, _hashAlgorithm);
- }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
deleted file mode 100644
index f85b2f61a6..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-
-
-/**
- * Context used for running the {@link GreedyShuffleRewriteVisitor}.
- */
-class GreedyShuffleRewriteContext {
- private final Map<Integer, PlanNode> _rootPlanNode;
- private final Map<Integer, List<PlanNode>> _leafNodes;
- private final Set<Integer> _joinStages;
- private final Set<Integer> _setOpStages;
-
- /**
- * A map to track the partition keys for the input to the MailboxSendNode of
a given planFragmentId. This is needed
- * because the {@link GreedyShuffleRewriteVisitor} doesn't determine the
distribution of the sender if the receiver
- * is a join-stage.
- */
- private final Map<Integer, Set<ColocationKey>> _senderInputColocationKeys;
-
- GreedyShuffleRewriteContext() {
- _rootPlanNode = new HashMap<>();
- _leafNodes = new HashMap<>();
- _joinStages = new HashSet<>();
- _setOpStages = new HashSet<>();
- _senderInputColocationKeys = new HashMap<>();
- }
-
- /**
- * Returns the root PlanNode for a given planFragmentId.
- */
- PlanNode getRootPlanNode(Integer planFragmentId) {
- return _rootPlanNode.get(planFragmentId);
- }
-
- /**
- * Sets the root PlanNode for a given planFragmentId.
- */
- void setRootPlanNode(Integer planFragmentId, PlanNode planNode) {
- _rootPlanNode.put(planFragmentId, planNode);
- }
-
- /**
- * Returns all the leaf PlanNode for a given planFragmentId.
- */
- List<PlanNode> getLeafNodes(Integer planFragmentId) {
- return _leafNodes.get(planFragmentId);
- }
-
- /**
- * Adds a leaf PlanNode for a given planFragmentId.
- */
- void addLeafNode(Integer planFragmentId, PlanNode planNode) {
- _leafNodes.computeIfAbsent(planFragmentId, (x) -> new
ArrayList<>()).add(planNode);
- }
-
- /**
- * {@link GreedyShuffleRewriteContext} allows checking whether a given
planFragmentId has a JoinNode or not. During
- * pre-computation, this method may be used to mark that the given
planFragmentId has a JoinNode.
- */
- void markJoinStage(Integer planFragmentId) {
- _joinStages.add(planFragmentId);
- }
-
- /**
- * Returns true if the given planFragmentId has a JoinNode.
- */
- boolean isJoinStage(Integer planFragmentId) {
- return _joinStages.contains(planFragmentId);
- }
-
- /**
- * {@link GreedyShuffleRewriteContext} allows checking whether a given
planFragmentId has a SetOpNode or not. During
- * pre-computation, this method may be used to mark that the given
planFragmentId has a SetOpNode.
- */
- void markSetOpStage(Integer planFragmentId) {
- _setOpStages.add(planFragmentId);
- }
-
- /**
- * Returns true if the given planFragmentId has a SetOpNode.
- */
- boolean isSetOpStage(Integer planFragmentId) {
- return _setOpStages.contains(planFragmentId);
- }
-
- /**
- * This returns the {@link Set<ColocationKey>} for the input to the {@link
MailboxSendNode} of the given
- * planFragmentId.
- */
- Set<ColocationKey> getColocationKeys(Integer planFragmentId) {
- return _senderInputColocationKeys.get(planFragmentId);
- }
-
- /**
- * This sets the {@link Set<ColocationKey>} for the input to the {@link
MailboxSendNode} of the given planFragmentId.
- */
- void setColocationKeys(Integer planFragmentId, Set<ColocationKey>
colocationKeys) {
- _senderInputColocationKeys.put(planFragmentId, colocationKeys);
- }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
deleted file mode 100644
index 4508883f08..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import
org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-
-
-/**
- * A visitor that does the precomputation for the {@link
GreedyShuffleRewriteContext} of a query plan.
- */
-class GreedyShuffleRewritePreComputeVisitor
- extends DefaultPostOrderTraversalVisitor<Integer,
GreedyShuffleRewriteContext> {
-
- static GreedyShuffleRewriteContext preComputeContext(PlanNode rootPlanNode) {
- GreedyShuffleRewriteContext context = new GreedyShuffleRewriteContext();
- rootPlanNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
- return context;
- }
-
- @Override
- public Integer process(PlanNode planNode, GreedyShuffleRewriteContext
context) {
- int currentStageId = planNode.getStageId();
- context.setRootPlanNode(currentStageId, planNode);
- return 0;
- }
-
- @Override
- public Integer visitJoin(JoinNode joinNode, GreedyShuffleRewriteContext
context) {
- super.visitJoin(joinNode, context);
- context.markJoinStage(joinNode.getStageId());
- return 0;
- }
-
- @Override
- public Integer visitMailboxReceive(MailboxReceiveNode planNode,
GreedyShuffleRewriteContext context) {
- super.visitMailboxReceive(planNode, context);
- context.addLeafNode(planNode.getStageId(), planNode);
- return 0;
- }
-
- @Override
- public Integer visitTableScan(TableScanNode planNode,
GreedyShuffleRewriteContext context) {
- super.visitTableScan(planNode, context);
- context.addLeafNode(planNode.getStageId(), planNode);
- return 0;
- }
-
- @Override
- public Integer visitSetOp(SetOpNode setOpNode, GreedyShuffleRewriteContext
context) {
- super.visitSetOp(setOpNode, context);
- context.markSetOpStage(setOpNode.getStageId());
- return 0;
- }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
deleted file mode 100644
index 07ef34caed..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.plannode.AggregateNode;
-import org.apache.pinot.query.planner.plannode.ExchangeNode;
-import org.apache.pinot.query.planner.plannode.ExplainedNode;
-import org.apache.pinot.query.planner.plannode.FilterNode;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
-import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.SortNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-import org.apache.pinot.query.planner.plannode.ValueNode;
-import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This shuffle optimizer can avoid shuffles by taking into account all of the
following:
- *
- * 1. Servers assigned to the stages. The optimizer may also choose to change
the server assignment if skipping
- * shuffles is possible.
- * 2. The hash-algorithm and the number of partitions of the data in
sender/receiver nodes. So for instance if we do a
- * join on two tables where the left table is partitioned using Murmur but
the right table is partitioned using
- * hashCode, then this optimizer can detect this case and keep the shuffle.
- *
- * Also see: {@link ColocationKey} for its definition.
- */
-public class GreedyShuffleRewriteVisitor implements
PlanNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
-
- private final TableCache _tableCache;
- private final Map<Integer, DispatchablePlanMetadata>
_dispatchablePlanMetadataMap;
- private boolean _canSkipShuffleForJoin;
-
- public static void optimizeShuffles(PlanNode rootPlanNode,
- Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap,
TableCache tableCache) {
- GreedyShuffleRewriteContext context =
GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootPlanNode);
- // This assumes that if planFragmentId(S1) > planFragmentId(S2), then S1
is not an ancestor of S2.
- // TODO: If this assumption is wrong, we can compute the reverse
topological ordering explicitly.
- for (int planFragmentId = dispatchablePlanMetadataMap.size() - 1;
planFragmentId >= 0; planFragmentId--) {
- PlanNode planNode = context.getRootPlanNode(planFragmentId);
- planNode.visit(new GreedyShuffleRewriteVisitor(tableCache,
dispatchablePlanMetadataMap), context);
- }
- }
-
- private GreedyShuffleRewriteVisitor(TableCache tableCache,
- Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
- _tableCache = tableCache;
- _dispatchablePlanMetadataMap = dispatchablePlanMetadataMap;
- _canSkipShuffleForJoin = false;
- }
-
- @Override
- public Set<ColocationKey> visitAggregate(AggregateNode node,
GreedyShuffleRewriteContext context) {
- Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this,
context);
-
- Map<Integer, Integer> oldToNewIndex = new HashMap<>();
- List<Integer> groupKeys = node.getGroupKeys();
- int numGroupKeys = groupKeys.size();
- for (int i = 0; i < numGroupKeys; i++) {
- oldToNewIndex.put(groupKeys.get(i), i);
- }
-
- return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
- }
-
- @Override
- public Set<ColocationKey> visitFilter(FilterNode node,
GreedyShuffleRewriteContext context) {
- // filters don't change partition keys
- return node.getInputs().get(0).visit(this, context);
- }
-
- @Override
- public Set<ColocationKey> visitJoin(JoinNode node,
GreedyShuffleRewriteContext context) {
- List<MailboxReceiveNode> innerLeafNodes =
- context.getLeafNodes(node.getStageId()).stream().map(x ->
(MailboxReceiveNode) x).collect(Collectors.toList());
- Preconditions.checkState(innerLeafNodes.size() == 2);
-
- // Multiple checks need to be made to ensure that shuffle can be skipped
for a join.
- // Step-1: Join can be skipped only for equality joins.
- boolean canColocate = canJoinBeColocated(node);
- // Step-2: Only if the servers assigned to both left and right nodes are
equal and the servers assigned to the join
- // stage are a superset of those servers, can we skip shuffles.
- canColocate =
- canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(),
innerLeafNodes.get(0).getSenderStageId(),
- innerLeafNodes.get(1).getSenderStageId());
- // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs,
check whether the key partitioning can
- // allow shuffle skip.
- canColocate =
- canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0),
innerLeafNodes.get(0).getSender(), context);
- canColocate =
- canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(1),
innerLeafNodes.get(1).getSender(), context);
- // Step-4: Finally, ensure that the number of partitions and the hash
algorithm is same for partition keys of both
- // children.
- canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0),
innerLeafNodes.get(1), context);
- if (canColocate) {
- // If shuffle can be skipped, reassign servers.
-
_dispatchablePlanMetadataMap.get(node.getStageId()).setWorkerIdToServerInstanceMap(
-
_dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getWorkerIdToServerInstanceMap());
- _canSkipShuffleForJoin = true;
- }
-
- Set<ColocationKey> leftPKs = node.getInputs().get(0).visit(this, context);
- Set<ColocationKey> rightPks = node.getInputs().get(1).visit(this, context);
-
- int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
- Set<ColocationKey> colocationKeys = new HashSet<>(leftPKs);
-
- for (ColocationKey rightColocationKey : rightPks) {
- ColocationKey newColocationKey =
- new ColocationKey(rightColocationKey.getNumPartitions(),
rightColocationKey.getHashAlgorithm());
- for (Integer index : rightColocationKey.getIndices()) {
- newColocationKey.addIndex(leftDataSchemaSize + index);
- }
- colocationKeys.add(newColocationKey);
- }
-
- return colocationKeys;
- }
-
- @Override
- public Set<ColocationKey> visitMailboxReceive(MailboxReceiveNode node,
GreedyShuffleRewriteContext context) {
- Set<ColocationKey> oldColocationKeys =
context.getColocationKeys(node.getSenderStageId());
- List<Integer> distributionKeys = node.getKeys();
-
- // If the current stage is not a join-stage, then we already know sender's
distribution
- if (!context.isJoinStage(node.getStageId())) {
- if (distributionKeys == null) {
- return new HashSet<>();
- } else if (colocationKeyCondition(oldColocationKeys, distributionKeys)
&& areServersSuperset(node.getStageId(),
- node.getSenderStageId())) {
- node.setDistributionType(RelDistribution.Type.SINGLETON);
-
_dispatchablePlanMetadataMap.get(node.getStageId()).setWorkerIdToServerInstanceMap(
-
_dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap());
- return oldColocationKeys;
- }
- // This means we can't skip shuffle and there's a partitioning enforced
by receiver.
- int numPartitions = new HashSet<>(
-
_dispatchablePlanMetadataMap.get(node.getStageId()).getWorkerIdToServerInstanceMap().values()).size();
- List<ColocationKey> colocationKeys =
- distributionKeys.stream().map(x -> new ColocationKey(x,
numPartitions, KeySelector.DEFAULT_HASH_ALGORITHM))
- .collect(Collectors.toList());
- return new HashSet<>(colocationKeys);
- }
- // If the current stage is a join-stage then we already know whether
shuffle can be skipped.
- if (_canSkipShuffleForJoin) {
- node.setDistributionType(RelDistribution.Type.SINGLETON);
- // For the join-case, servers are already re-assigned in visitJoin.
Moreover, we haven't yet changed sender's
- // distribution.
- node.getSender().setDistributionType(RelDistribution.Type.SINGLETON);
- return oldColocationKeys;
- } else if (distributionKeys == null) {
- return new HashSet<>();
- }
- // This means we can't skip shuffle and there's a partitioning enforced by
receiver.
- int numPartitions = new HashSet<>(
-
_dispatchablePlanMetadataMap.get(node.getStageId()).getWorkerIdToServerInstanceMap().values()).size();
- List<ColocationKey> colocationKeys =
- distributionKeys.stream().map(x -> new ColocationKey(x, numPartitions,
KeySelector.DEFAULT_HASH_ALGORITHM))
- .collect(Collectors.toList());
- return new HashSet<>(colocationKeys);
- }
-
- @Override
- public Set<ColocationKey> visitMailboxSend(MailboxSendNode node,
GreedyShuffleRewriteContext context) {
- Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this,
context);
- List<Integer> distributionKeys = node.getKeys();
-
- boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys,
distributionKeys);
- // If receiver is not a join-stage, then we can determine distribution
type now.
- Iterable<Integer> receiverStageIds = node.getReceiverStageIds();
- if (noneIsJoin(receiverStageIds, context)) {
- Set<ColocationKey> colocationKeys;
- if (canSkipShuffleBasic && allAreSuperSet(receiverStageIds, node)) {
- // Servers are not re-assigned on sender-side. If needed, they are
re-assigned on the receiver side.
- node.setDistributionType(RelDistribution.Type.SINGLETON);
- colocationKeys = oldColocationKeys;
- } else {
- colocationKeys = new HashSet<>();
- }
- context.setColocationKeys(node.getStageId(), colocationKeys);
- return colocationKeys;
- }
- // If receiver is a join-stage, remember partition-keys of the child node
of MailboxSendNode.
- Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ?
oldColocationKeys : new HashSet<>();
- context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
- return mailboxSendColocationKeys;
- }
-
- private boolean noneIsJoin(Iterable<Integer> receiveStageIds,
GreedyShuffleRewriteContext context) {
- for (Integer receiveStageId : receiveStageIds) {
- if (context.isJoinStage(receiveStageId)) {
- return false;
- }
- }
- return true;
- }
-
- private boolean allAreSuperSet(Iterable<Integer> receiveStageIds,
MailboxSendNode node) {
- for (Integer receiveStageId : receiveStageIds) {
- if (!areServersSuperset(receiveStageId, node.getStageId())) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public Set<ColocationKey> visitProject(ProjectNode node,
GreedyShuffleRewriteContext context) {
- // Project reorders or removes keys
- Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this,
context);
-
- Map<Integer, Integer> oldToNewIndex = new HashMap<>();
- for (int i = 0; i < node.getProjects().size(); i++) {
- RexExpression rex = node.getProjects().get(i);
- if (rex instanceof RexExpression.InputRef) {
- int index = ((RexExpression.InputRef) rex).getIndex();
- oldToNewIndex.put(index, i);
- }
- }
-
- return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
- }
-
- @Override
- public Set<ColocationKey> visitSort(SortNode node,
GreedyShuffleRewriteContext context) {
- return node.getInputs().get(0).visit(this, context);
- }
-
- @Override
- public Set<ColocationKey> visitWindow(WindowNode node,
GreedyShuffleRewriteContext context) {
- return node.getInputs().get(0).visit(this, context);
- }
-
- @Override
- public Set<ColocationKey> visitSetOp(SetOpNode setOpNode,
GreedyShuffleRewriteContext context) {
- return ImmutableSet.of();
- }
-
- @Override
- public Set<ColocationKey> visitExchange(ExchangeNode exchangeNode,
GreedyShuffleRewriteContext context) {
- throw new UnsupportedOperationException("ExchangeNode should not be
visited by this visitor");
- }
-
- @Override
- public Set<ColocationKey> visitTableScan(TableScanNode node,
GreedyShuffleRewriteContext context) {
- TableConfig tableConfig = _tableCache.getTableConfig(node.getTableName());
- if (tableConfig == null) {
- LOGGER.warn("Couldn't find tableConfig for {}", node.getTableName());
- return new HashSet<>();
- }
- IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
- if (indexingConfig != null && indexingConfig.getSegmentPartitionConfig()
!= null) {
- Map<String, ColumnPartitionConfig> columnPartitionMap =
- indexingConfig.getSegmentPartitionConfig().getColumnPartitionMap();
- if (columnPartitionMap != null) {
- Set<String> partitionColumns = columnPartitionMap.keySet();
- Set<ColocationKey> newColocationKeys = new HashSet<>();
- List<String> columns = node.getColumns();
- int numColumns = columns.size();
- for (int i = 0; i < numColumns; i++) {
- String columnName = columns.get(i);
- if (partitionColumns.contains(columnName)) {
- int numPartitions =
columnPartitionMap.get(columnName).getNumPartitions();
- String hashAlgorithm =
columnPartitionMap.get(columnName).getFunctionName();
- newColocationKeys.add(new ColocationKey(i, numPartitions,
hashAlgorithm));
- }
- }
- return newColocationKeys;
- }
- }
- return new HashSet<>();
- }
-
- @Override
- public Set<ColocationKey> visitExplained(ExplainedNode node,
GreedyShuffleRewriteContext context) {
- throw new UnsupportedOperationException("ExplainedNode should not be
visited by this visitor");
- }
-
- @Override
- public Set<ColocationKey> visitValue(ValueNode node,
GreedyShuffleRewriteContext context) {
- return new HashSet<>();
- }
-
- // TODO: Only equality joins can be colocated. We don't have join clause
info available right now.
- private boolean canJoinBeColocated(JoinNode joinNode) {
- return true;
- }
-
- /**
- * Checks if servers assigned to the receiver stage are a super-set of the
sender stage.
- */
- private boolean areServersSuperset(int receiverStageId, int senderStageId) {
- return new HashSet<>(
-
_dispatchablePlanMetadataMap.get(receiverStageId).getWorkerIdToServerInstanceMap().values()).containsAll(
-
_dispatchablePlanMetadataMap.get(senderStageId).getWorkerIdToServerInstanceMap().values());
- }
-
- /*
- * We allow shuffle skip only when both of the following conditions are met:
- * 1. Left and right stage have the same servers (say S).
- * 2. Servers assigned to the join-stage are a superset of S.
- */
- private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int
leftStageId, int rightStageId) {
- Set<QueryServerInstance> leftServerInstances =
- new
HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId).getWorkerIdToServerInstanceMap().values());
- Set<QueryServerInstance> rightServerInstances =
- new
HashSet<>(_dispatchablePlanMetadataMap.get(rightStageId).getWorkerIdToServerInstanceMap().values());
- Set<QueryServerInstance> currentServerInstances =
- new
HashSet<>(_dispatchablePlanMetadataMap.get(currentStageId).getWorkerIdToServerInstanceMap().values());
- return leftServerInstances.containsAll(rightServerInstances)
- && leftServerInstances.size() == rightServerInstances.size() &&
currentServerInstances.containsAll(
- leftServerInstances);
- }
-
- /**
- * Given the existing partitioning keys in oldPartitionsKeys, it takes the
mapping from the old index to the new index
- * and the computes the new partition keys. Any partition key that has an
index that is not present in oldToNewIndex
- * is dropped. If all indices of a ColocationKey are present in
oldToNewIndex, we keep that partition key in the new
- * keys and change the name using the map.
- */
- private static Set<ColocationKey>
computeNewColocationKeys(Set<ColocationKey> oldColocationKeys,
- Map<Integer, Integer> oldToNewIndex) {
- Set<ColocationKey> colocationKeys = new HashSet<>();
- for (ColocationKey colocationKey : oldColocationKeys) {
- boolean shouldDrop = false;
- ColocationKey newColocationKey =
- new ColocationKey(colocationKey.getNumPartitions(),
colocationKey.getHashAlgorithm());
- for (Integer index : colocationKey.getIndices()) {
- if (!oldToNewIndex.containsKey(index)) {
- shouldDrop = true;
- break;
- }
- newColocationKey.addIndex(oldToNewIndex.get(index));
- }
- if (!shouldDrop) {
- colocationKeys.add(newColocationKey);
- }
- }
-
- return colocationKeys;
- }
-
- private static boolean colocationKeyCondition(Set<ColocationKey>
colocationKeys,
- @Nullable List<Integer> distributionKeys) {
- if (!colocationKeys.isEmpty() && distributionKeys != null) {
- for (ColocationKey colocationKey : colocationKeys) {
- if (distributionKeys.size() >= colocationKey.getIndices().size() &&
distributionKeys.subList(0,
-
colocationKey.getIndices().size()).equals(colocationKey.getIndices())) {
- return true;
- }
- }
- }
- return false;
- }
-
- private static boolean partitionKeyConditionForJoin(MailboxReceiveNode
mailboxReceiveNode,
- MailboxSendNode mailboxSendNode, GreedyShuffleRewriteContext context) {
- // First check ColocationKeyCondition for the sender <-->
sender.getInputs().get(0) pair
- Set<ColocationKey> oldColocationKeys =
context.getColocationKeys(mailboxSendNode.getStageId());
- if (!colocationKeyCondition(oldColocationKeys, mailboxSendNode.getKeys()))
{
- return false;
- }
- // Check ColocationKeyCondition for the sender <--> receiver pair
- // Since shuffle can be skipped, oldPartitionsKeys == senderColocationKeys
- return colocationKeyCondition(oldColocationKeys,
mailboxReceiveNode.getKeys());
- }
-
- private static ColocationKey getEquivalentSenderKey(Set<ColocationKey>
colocationKeys,
- List<Integer> distributionKeys) {
- if (!colocationKeys.isEmpty() && distributionKeys != null) {
- for (ColocationKey colocationKey : colocationKeys) {
- if (distributionKeys.size() >= colocationKey.getIndices().size() &&
distributionKeys.subList(0,
-
colocationKey.getIndices().size()).equals(colocationKey.getIndices())) {
- return colocationKey;
- }
- }
- }
- throw new IllegalStateException("Receiver's Equivalent Key in Sender Can't
be Determined. This indicates a bug.");
- }
-
- private static boolean checkPartitionScheme(MailboxReceiveNode
leftReceiveNode, MailboxReceiveNode rightReceiveNode,
- GreedyShuffleRewriteContext context) {
- int leftSender = leftReceiveNode.getSenderStageId();
- int rightSender = rightReceiveNode.getSenderStageId();
- ColocationKey leftPKey =
getEquivalentSenderKey(context.getColocationKeys(leftSender),
leftReceiveNode.getKeys());
- ColocationKey rightPKey =
- getEquivalentSenderKey(context.getColocationKeys(rightSender),
rightReceiveNode.getKeys());
- if (leftPKey.getNumPartitions() != rightPKey.getNumPartitions()) {
- return false;
- }
- return leftPKey.getHashAlgorithm().equals(rightPKey.getHashAlgorithm());
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]