This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f085e2b6a4 [multistage] Remove Old Colocated Join Implementation (#15941)
f085e2b6a4 is described below

commit f085e2b6a4a3bb114cac7b86be272437873a11f8
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu May 29 20:22:20 2025 -0500

    [multistage] Remove Old Colocated Join Implementation (#15941)
---
 .../planner/physical/PinotDispatchPlanner.java     |  17 +-
 .../planner/physical/colocated/ColocationKey.java  |  96 -----
 .../colocated/GreedyShuffleRewriteContext.java     | 127 ------
 .../GreedyShuffleRewritePreComputeVisitor.java     |  75 ----
 .../colocated/GreedyShuffleRewriteVisitor.java     | 435 ---------------------
 5 files changed, 2 insertions(+), 748 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index e3883cf3c2..656d0dc5b9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -26,7 +26,6 @@ import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.SubPlan;
-import org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
 import org.apache.pinot.query.planner.physical.v2.PlanFragmentAndMailboxAssignment;
 import org.apache.pinot.query.planner.plannode.PlanNode;
 import org.apache.pinot.query.planner.validation.ArrayToMvValidationVisitor;
@@ -70,11 +69,9 @@ public class PinotDispatchPlanner {
     context.getWorkerManager().assignWorkers(rootFragment, context);
     // 4. compute the mailbox assignment for each stage.
     rootNode.visit(MailboxAssignmentVisitor.INSTANCE, context);
-    // 5. Run physical optimizations
-    runPhysicalOptimizers(rootNode, context, _tableCache);
-    // 6. Run validations
+    // 5. Run validations
     runValidations(rootFragment, context);
-    // 7. convert it into query plan.
+    // 6. convert it into query plan.
     return finalizeDispatchableSubPlan(rootFragment, context);
   }
 
@@ -120,16 +117,6 @@ public class PinotDispatchPlanner {
     }
   }
 
-  // TODO: Switch to Worker SPI to avoid multiple-places where workers are assigned.
-  private void runPhysicalOptimizers(PlanNode subPlanRoot, DispatchablePlanContext dispatchablePlanContext,
-      TableCache tableCache) {
-    if (dispatchablePlanContext.getPlannerContext().getOptions().getOrDefault("useColocatedJoin", "false")
-        .equals("true")) {
-      GreedyShuffleRewriteVisitor.optimizeShuffles(subPlanRoot,
-          dispatchablePlanContext.getDispatchablePlanMetadataMap(), tableCache);
-    }
-  }
-
   private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment subPlanRoot,
       DispatchablePlanContext dispatchablePlanContext) {
     return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java
deleted file mode 100644
index e5d60cc134..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import org.apache.calcite.rex.RexInputRef;
-
-
-/**
- * ColocationKey describes how data is distributed in a given stage. It 
consists of a list of columns which are stored
- * as a list of {@link RexInputRef#getIndex()}, the number of partitions and 
the hash-algorithm used. A given stage may
- * have more than 1 ColocationKey, in which case one may use a {@link 
java.util.Set< ColocationKey >} to represent this
- * behavior.
- *
- * <p>
- *  In other words, when a PlanNode has the schema: (user_uuid, col1, col2, 
...), and the ColocationKey is
- *  ([0], 8, murmur), then that means that the data for the PlanNode is 
partitioned using the user_uuid column, into
- *  8 partitions where the partitionId is computed using murmur(user_uuid) % 8.
- *
- *  For a join stage the data is partitioned by the senders using their 
respective join-keys. In that case, we may
- *  have more than 1 ColocationKey applicable for the JoinNode, and it can be 
represented by a set as:
- *  {([0], 8, murmur), ([leftSchemaSize + 0], 8, murmur)}, assuming both 
senders partition using Murmur into 8
- *  partitions. Note that a set of ColocationKey means that the partition keys 
are independent and they don't have any
- *  ordering, i.e. the data is partitioned by both the join-key of the left 
child and the join-key of the right child.
- * </p>
- */
-class ColocationKey {
-  private List<Integer> _indices;
-  private int _numPartitions;
-  private String _hashAlgorithm;
-
-  public List<Integer> getIndices() {
-    return _indices;
-  }
-
-  public int getNumPartitions() {
-    return _numPartitions;
-  }
-
-  public String getHashAlgorithm() {
-    return _hashAlgorithm;
-  }
-
-  public ColocationKey(int numPartitions, String algorithm) {
-    _numPartitions = numPartitions;
-    _hashAlgorithm = algorithm;
-    _indices = new ArrayList<>();
-  }
-
-  public ColocationKey(int index, int numPartitions, String algorithm) {
-    _indices = new ArrayList<>();
-    _indices.add(index);
-    _numPartitions = numPartitions;
-    _hashAlgorithm = algorithm;
-  }
-
-  public void addIndex(int index) {
-    _indices.add(index);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ColocationKey that = (ColocationKey) o;
-    return _indices.equals(that._indices) && _numPartitions == 
that._numPartitions && _hashAlgorithm
-        .equals(that._hashAlgorithm);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(_indices, _numPartitions, _hashAlgorithm);
-  }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
deleted file mode 100644
index f85b2f61a6..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteContext.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-
-
-/**
- * Context used for running the {@link GreedyShuffleRewriteVisitor}.
- */
-class GreedyShuffleRewriteContext {
-  private final Map<Integer, PlanNode> _rootPlanNode;
-  private final Map<Integer, List<PlanNode>> _leafNodes;
-  private final Set<Integer> _joinStages;
-  private final Set<Integer> _setOpStages;
-
-  /**
-   * A map to track the partition keys for the input to the MailboxSendNode of 
a given planFragmentId. This is needed
-   * because the {@link GreedyShuffleRewriteVisitor} doesn't determine the 
distribution of the sender if the receiver
-   * is a join-stage.
-   */
-  private final Map<Integer, Set<ColocationKey>> _senderInputColocationKeys;
-
-  GreedyShuffleRewriteContext() {
-    _rootPlanNode = new HashMap<>();
-    _leafNodes = new HashMap<>();
-    _joinStages = new HashSet<>();
-    _setOpStages = new HashSet<>();
-    _senderInputColocationKeys = new HashMap<>();
-  }
-
-  /**
-   * Returns the root PlanNode for a given planFragmentId.
-   */
-  PlanNode getRootPlanNode(Integer planFragmentId) {
-    return _rootPlanNode.get(planFragmentId);
-  }
-
-  /**
-   * Sets the root PlanNode for a given planFragmentId.
-   */
-  void setRootPlanNode(Integer planFragmentId, PlanNode planNode) {
-    _rootPlanNode.put(planFragmentId, planNode);
-  }
-
-  /**
-   * Returns all the leaf PlanNode for a given planFragmentId.
-   */
-  List<PlanNode> getLeafNodes(Integer planFragmentId) {
-    return _leafNodes.get(planFragmentId);
-  }
-
-  /**
-   * Adds a leaf PlanNode for a given planFragmentId.
-   */
-  void addLeafNode(Integer planFragmentId, PlanNode planNode) {
-    _leafNodes.computeIfAbsent(planFragmentId, (x) -> new 
ArrayList<>()).add(planNode);
-  }
-
-  /**
-   * {@link GreedyShuffleRewriteContext} allows checking whether a given 
planFragmentId has a JoinNode or not. During
-   * pre-computation, this method may be used to mark that the given 
planFragmentId has a JoinNode.
-   */
-  void markJoinStage(Integer planFragmentId) {
-    _joinStages.add(planFragmentId);
-  }
-
-  /**
-   * Returns true if the given planFragmentId has a JoinNode.
-   */
-  boolean isJoinStage(Integer planFragmentId) {
-    return _joinStages.contains(planFragmentId);
-  }
-
-  /**
-   * {@link GreedyShuffleRewriteContext} allows checking whether a given 
planFragmentId has a SetOpNode or not. During
-   * pre-computation, this method may be used to mark that the given 
planFragmentId has a SetOpNode.
-   */
-  void markSetOpStage(Integer planFragmentId) {
-    _setOpStages.add(planFragmentId);
-  }
-
-  /**
-   * Returns true if the given planFragmentId has a SetOpNode.
-   */
-  boolean isSetOpStage(Integer planFragmentId) {
-    return _setOpStages.contains(planFragmentId);
-  }
-
-  /**
-   * This returns the {@link Set<ColocationKey>} for the input to the {@link 
MailboxSendNode} of the given
-   * planFragmentId.
-   */
-  Set<ColocationKey> getColocationKeys(Integer planFragmentId) {
-    return _senderInputColocationKeys.get(planFragmentId);
-  }
-
-  /**
-   * This sets the {@link Set<ColocationKey>} for the input to the {@link 
MailboxSendNode} of the given planFragmentId.
-   */
-  void setColocationKeys(Integer planFragmentId, Set<ColocationKey> 
colocationKeys) {
-    _senderInputColocationKeys.put(planFragmentId, colocationKeys);
-  }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
deleted file mode 100644
index 4508883f08..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewritePreComputeVisitor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import 
org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-
-
-/**
- * A visitor that does the precomputation for the {@link 
GreedyShuffleRewriteContext} of a query plan.
- */
-class GreedyShuffleRewritePreComputeVisitor
-    extends DefaultPostOrderTraversalVisitor<Integer, 
GreedyShuffleRewriteContext> {
-
-  static GreedyShuffleRewriteContext preComputeContext(PlanNode rootPlanNode) {
-    GreedyShuffleRewriteContext context = new GreedyShuffleRewriteContext();
-    rootPlanNode.visit(new GreedyShuffleRewritePreComputeVisitor(), context);
-    return context;
-  }
-
-  @Override
-  public Integer process(PlanNode planNode, GreedyShuffleRewriteContext 
context) {
-    int currentStageId = planNode.getStageId();
-    context.setRootPlanNode(currentStageId, planNode);
-    return 0;
-  }
-
-  @Override
-  public Integer visitJoin(JoinNode joinNode, GreedyShuffleRewriteContext 
context) {
-    super.visitJoin(joinNode, context);
-    context.markJoinStage(joinNode.getStageId());
-    return 0;
-  }
-
-  @Override
-  public Integer visitMailboxReceive(MailboxReceiveNode planNode, 
GreedyShuffleRewriteContext context) {
-    super.visitMailboxReceive(planNode, context);
-    context.addLeafNode(planNode.getStageId(), planNode);
-    return 0;
-  }
-
-  @Override
-  public Integer visitTableScan(TableScanNode planNode, 
GreedyShuffleRewriteContext context) {
-    super.visitTableScan(planNode, context);
-    context.addLeafNode(planNode.getStageId(), planNode);
-    return 0;
-  }
-
-  @Override
-  public Integer visitSetOp(SetOpNode setOpNode, GreedyShuffleRewriteContext 
context) {
-    super.visitSetOp(setOpNode, context);
-    context.markSetOpStage(setOpNode.getStageId());
-    return 0;
-  }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
deleted file mode 100644
index 07ef34caed..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.planner.physical.colocated;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.config.provider.TableCache;
-import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.plannode.AggregateNode;
-import org.apache.pinot.query.planner.plannode.ExchangeNode;
-import org.apache.pinot.query.planner.plannode.ExplainedNode;
-import org.apache.pinot.query.planner.plannode.FilterNode;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
-import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.SortNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-import org.apache.pinot.query.planner.plannode.ValueNode;
-import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.apache.pinot.query.routing.QueryServerInstance;
-import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This shuffle optimizer can avoid shuffles by taking into account all of the 
following:
- *
- * 1. Servers assigned to the stages. The optimizer may also choose to change 
the server assignment if skipping
- *    shuffles is possible.
- * 2. The hash-algorithm and the number of partitions of the data in 
sender/receiver nodes. So for instance if we do a
- *    join on two tables where the left table is partitioned using Murmur but 
the right table is partitioned using
- *    hashCode, then this optimizer can detect this case and keep the shuffle.
- *
- * Also see: {@link ColocationKey} for its definition.
- */
-public class GreedyShuffleRewriteVisitor implements 
PlanNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
-
-  private final TableCache _tableCache;
-  private final Map<Integer, DispatchablePlanMetadata> 
_dispatchablePlanMetadataMap;
-  private boolean _canSkipShuffleForJoin;
-
-  public static void optimizeShuffles(PlanNode rootPlanNode,
-      Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap, 
TableCache tableCache) {
-    GreedyShuffleRewriteContext context = 
GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootPlanNode);
-    // This assumes that if planFragmentId(S1) > planFragmentId(S2), then S1 
is not an ancestor of S2.
-    // TODO: If this assumption is wrong, we can compute the reverse 
topological ordering explicitly.
-    for (int planFragmentId = dispatchablePlanMetadataMap.size() - 1; 
planFragmentId >= 0; planFragmentId--) {
-      PlanNode planNode = context.getRootPlanNode(planFragmentId);
-      planNode.visit(new GreedyShuffleRewriteVisitor(tableCache, 
dispatchablePlanMetadataMap), context);
-    }
-  }
-
-  private GreedyShuffleRewriteVisitor(TableCache tableCache,
-      Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
-    _tableCache = tableCache;
-    _dispatchablePlanMetadataMap = dispatchablePlanMetadataMap;
-    _canSkipShuffleForJoin = false;
-  }
-
-  @Override
-  public Set<ColocationKey> visitAggregate(AggregateNode node, 
GreedyShuffleRewriteContext context) {
-    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, 
context);
-
-    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
-    List<Integer> groupKeys = node.getGroupKeys();
-    int numGroupKeys = groupKeys.size();
-    for (int i = 0; i < numGroupKeys; i++) {
-      oldToNewIndex.put(groupKeys.get(i), i);
-    }
-
-    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
-  }
-
-  @Override
-  public Set<ColocationKey> visitFilter(FilterNode node, 
GreedyShuffleRewriteContext context) {
-    // filters don't change partition keys
-    return node.getInputs().get(0).visit(this, context);
-  }
-
-  @Override
-  public Set<ColocationKey> visitJoin(JoinNode node, 
GreedyShuffleRewriteContext context) {
-    List<MailboxReceiveNode> innerLeafNodes =
-        context.getLeafNodes(node.getStageId()).stream().map(x -> 
(MailboxReceiveNode) x).collect(Collectors.toList());
-    Preconditions.checkState(innerLeafNodes.size() == 2);
-
-    // Multiple checks need to be made to ensure that shuffle can be skipped 
for a join.
-    // Step-1: Join can be skipped only for equality joins.
-    boolean canColocate = canJoinBeColocated(node);
-    // Step-2: Only if the servers assigned to both left and right nodes are 
equal and the servers assigned to the join
-    //         stage are a superset of those servers, can we skip shuffles.
-    canColocate =
-        canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(), 
innerLeafNodes.get(0).getSenderStageId(),
-            innerLeafNodes.get(1).getSenderStageId());
-    // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, 
check whether the key partitioning can
-    //         allow shuffle skip.
-    canColocate =
-        canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0), 
innerLeafNodes.get(0).getSender(), context);
-    canColocate =
-        canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(1), 
innerLeafNodes.get(1).getSender(), context);
-    // Step-4: Finally, ensure that the number of partitions and the hash 
algorithm is same for partition keys of both
-    //         children.
-    canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), 
innerLeafNodes.get(1), context);
-    if (canColocate) {
-      // If shuffle can be skipped, reassign servers.
-      
_dispatchablePlanMetadataMap.get(node.getStageId()).setWorkerIdToServerInstanceMap(
-          
_dispatchablePlanMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getWorkerIdToServerInstanceMap());
-      _canSkipShuffleForJoin = true;
-    }
-
-    Set<ColocationKey> leftPKs = node.getInputs().get(0).visit(this, context);
-    Set<ColocationKey> rightPks = node.getInputs().get(1).visit(this, context);
-
-    int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
-    Set<ColocationKey> colocationKeys = new HashSet<>(leftPKs);
-
-    for (ColocationKey rightColocationKey : rightPks) {
-      ColocationKey newColocationKey =
-          new ColocationKey(rightColocationKey.getNumPartitions(), 
rightColocationKey.getHashAlgorithm());
-      for (Integer index : rightColocationKey.getIndices()) {
-        newColocationKey.addIndex(leftDataSchemaSize + index);
-      }
-      colocationKeys.add(newColocationKey);
-    }
-
-    return colocationKeys;
-  }
-
-  @Override
-  public Set<ColocationKey> visitMailboxReceive(MailboxReceiveNode node, 
GreedyShuffleRewriteContext context) {
-    Set<ColocationKey> oldColocationKeys = 
context.getColocationKeys(node.getSenderStageId());
-    List<Integer> distributionKeys = node.getKeys();
-
-    // If the current stage is not a join-stage, then we already know sender's 
distribution
-    if (!context.isJoinStage(node.getStageId())) {
-      if (distributionKeys == null) {
-        return new HashSet<>();
-      } else if (colocationKeyCondition(oldColocationKeys, distributionKeys) 
&& areServersSuperset(node.getStageId(),
-          node.getSenderStageId())) {
-        node.setDistributionType(RelDistribution.Type.SINGLETON);
-        
_dispatchablePlanMetadataMap.get(node.getStageId()).setWorkerIdToServerInstanceMap(
-            
_dispatchablePlanMetadataMap.get(node.getSenderStageId()).getWorkerIdToServerInstanceMap());
-        return oldColocationKeys;
-      }
-      // This means we can't skip shuffle and there's a partitioning enforced 
by receiver.
-      int numPartitions = new HashSet<>(
-          
_dispatchablePlanMetadataMap.get(node.getStageId()).getWorkerIdToServerInstanceMap().values()).size();
-      List<ColocationKey> colocationKeys =
-          distributionKeys.stream().map(x -> new ColocationKey(x, 
numPartitions, KeySelector.DEFAULT_HASH_ALGORITHM))
-              .collect(Collectors.toList());
-      return new HashSet<>(colocationKeys);
-    }
-    // If the current stage is a join-stage then we already know whether 
shuffle can be skipped.
-    if (_canSkipShuffleForJoin) {
-      node.setDistributionType(RelDistribution.Type.SINGLETON);
-      // For the join-case, servers are already re-assigned in visitJoin. 
Moreover, we haven't yet changed sender's
-      // distribution.
-      node.getSender().setDistributionType(RelDistribution.Type.SINGLETON);
-      return oldColocationKeys;
-    } else if (distributionKeys == null) {
-      return new HashSet<>();
-    }
-    // This means we can't skip shuffle and there's a partitioning enforced by 
receiver.
-    int numPartitions = new HashSet<>(
-        
_dispatchablePlanMetadataMap.get(node.getStageId()).getWorkerIdToServerInstanceMap().values()).size();
-    List<ColocationKey> colocationKeys =
-        distributionKeys.stream().map(x -> new ColocationKey(x, numPartitions, 
KeySelector.DEFAULT_HASH_ALGORITHM))
-            .collect(Collectors.toList());
-    return new HashSet<>(colocationKeys);
-  }
-
-  @Override
-  public Set<ColocationKey> visitMailboxSend(MailboxSendNode node, 
GreedyShuffleRewriteContext context) {
-    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, 
context);
-    List<Integer> distributionKeys = node.getKeys();
-
-    boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, 
distributionKeys);
-    // If receiver is not a join-stage, then we can determine distribution 
type now.
-    Iterable<Integer> receiverStageIds = node.getReceiverStageIds();
-    if (noneIsJoin(receiverStageIds, context)) {
-      Set<ColocationKey> colocationKeys;
-      if (canSkipShuffleBasic && allAreSuperSet(receiverStageIds, node)) {
-        // Servers are not re-assigned on sender-side. If needed, they are 
re-assigned on the receiver side.
-        node.setDistributionType(RelDistribution.Type.SINGLETON);
-        colocationKeys = oldColocationKeys;
-      } else {
-        colocationKeys = new HashSet<>();
-      }
-        context.setColocationKeys(node.getStageId(), colocationKeys);
-        return colocationKeys;
-      }
-    // If receiver is a join-stage, remember partition-keys of the child node 
of MailboxSendNode.
-    Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ? 
oldColocationKeys : new HashSet<>();
-    context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
-    return mailboxSendColocationKeys;
-  }
-
-  private boolean noneIsJoin(Iterable<Integer> receiveStageIds, 
GreedyShuffleRewriteContext context) {
-    for (Integer receiveStageId : receiveStageIds) {
-      if (context.isJoinStage(receiveStageId)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean allAreSuperSet(Iterable<Integer> receiveStageIds, 
MailboxSendNode node) {
-    for (Integer receiveStageId : receiveStageIds) {
-      if (!areServersSuperset(receiveStageId, node.getStageId())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public Set<ColocationKey> visitProject(ProjectNode node, 
GreedyShuffleRewriteContext context) {
-    // Project reorders or removes keys
-    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, 
context);
-
-    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        int index = ((RexExpression.InputRef) rex).getIndex();
-        oldToNewIndex.put(index, i);
-      }
-    }
-
-    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
-  }
-
-  @Override
-  public Set<ColocationKey> visitSort(SortNode node, 
GreedyShuffleRewriteContext context) {
-    return node.getInputs().get(0).visit(this, context);
-  }
-
-  @Override
-  public Set<ColocationKey> visitWindow(WindowNode node, 
GreedyShuffleRewriteContext context) {
-    return node.getInputs().get(0).visit(this, context);
-  }
-
-  @Override
-  public Set<ColocationKey> visitSetOp(SetOpNode setOpNode, 
GreedyShuffleRewriteContext context) {
-    return ImmutableSet.of();
-  }
-
-  @Override
-  public Set<ColocationKey> visitExchange(ExchangeNode exchangeNode, 
GreedyShuffleRewriteContext context) {
-    throw new UnsupportedOperationException("ExchangeNode should not be 
visited by this visitor");
-  }
-
-  @Override
-  public Set<ColocationKey> visitTableScan(TableScanNode node, 
GreedyShuffleRewriteContext context) {
-    TableConfig tableConfig = _tableCache.getTableConfig(node.getTableName());
-    if (tableConfig == null) {
-      LOGGER.warn("Couldn't find tableConfig for {}", node.getTableName());
-      return new HashSet<>();
-    }
-    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
-    if (indexingConfig != null && indexingConfig.getSegmentPartitionConfig() 
!= null) {
-      Map<String, ColumnPartitionConfig> columnPartitionMap =
-          indexingConfig.getSegmentPartitionConfig().getColumnPartitionMap();
-      if (columnPartitionMap != null) {
-        Set<String> partitionColumns = columnPartitionMap.keySet();
-        Set<ColocationKey> newColocationKeys = new HashSet<>();
-        List<String> columns = node.getColumns();
-        int numColumns = columns.size();
-        for (int i = 0; i < numColumns; i++) {
-          String columnName = columns.get(i);
-          if (partitionColumns.contains(columnName)) {
-            int numPartitions = 
columnPartitionMap.get(columnName).getNumPartitions();
-            String hashAlgorithm = 
columnPartitionMap.get(columnName).getFunctionName();
-            newColocationKeys.add(new ColocationKey(i, numPartitions, 
hashAlgorithm));
-          }
-        }
-        return newColocationKeys;
-      }
-    }
-    return new HashSet<>();
-  }
-
-  @Override
-  public Set<ColocationKey> visitExplained(ExplainedNode node, 
GreedyShuffleRewriteContext context) {
-    throw new UnsupportedOperationException("ExplainedNode should not be 
visited by this visitor");
-  }
-
-  @Override
-  public Set<ColocationKey> visitValue(ValueNode node, 
GreedyShuffleRewriteContext context) {
-    return new HashSet<>();
-  }
-
-  // TODO: Only equality joins can be colocated. We don't have join clause 
info available right now.
-  private boolean canJoinBeColocated(JoinNode joinNode) {
-    return true;
-  }
-
-  /**
-   * Checks if servers assigned to the receiver stage are a super-set of the 
sender stage.
-   */
-  private boolean areServersSuperset(int receiverStageId, int senderStageId) {
-    return new HashSet<>(
-        
_dispatchablePlanMetadataMap.get(receiverStageId).getWorkerIdToServerInstanceMap().values()).containsAll(
-        
_dispatchablePlanMetadataMap.get(senderStageId).getWorkerIdToServerInstanceMap().values());
-  }
-
-  /*
-   * We allow shuffle skip only when both of the following conditions are met:
-   * 1. Left and right stage have the same servers (say S).
-   * 2. Servers assigned to the join-stage are a superset of S.
-   */
-  private boolean canServerAssignmentAllowShuffleSkip(int currentStageId, int 
leftStageId, int rightStageId) {
-    Set<QueryServerInstance> leftServerInstances =
-        new 
HashSet<>(_dispatchablePlanMetadataMap.get(leftStageId).getWorkerIdToServerInstanceMap().values());
-    Set<QueryServerInstance> rightServerInstances =
-        new 
HashSet<>(_dispatchablePlanMetadataMap.get(rightStageId).getWorkerIdToServerInstanceMap().values());
-    Set<QueryServerInstance> currentServerInstances =
-        new 
HashSet<>(_dispatchablePlanMetadataMap.get(currentStageId).getWorkerIdToServerInstanceMap().values());
-    return leftServerInstances.containsAll(rightServerInstances)
-        && leftServerInstances.size() == rightServerInstances.size() && 
currentServerInstances.containsAll(
-        leftServerInstances);
-  }
-
-  /**
-   * Given the existing partitioning keys in oldPartitionsKeys, it takes the 
mapping from the old index to the new index
-   * and the computes the new partition keys. Any partition key that has an 
index that is not present in oldToNewIndex
-   * is dropped. If all indices of a ColocationKey are present in 
oldToNewIndex, we keep that partition key in the new
-   * keys and change the name using the map.
-   */
-  private static Set<ColocationKey> 
computeNewColocationKeys(Set<ColocationKey> oldColocationKeys,
-      Map<Integer, Integer> oldToNewIndex) {
-    Set<ColocationKey> colocationKeys = new HashSet<>();
-    for (ColocationKey colocationKey : oldColocationKeys) {
-      boolean shouldDrop = false;
-      ColocationKey newColocationKey =
-          new ColocationKey(colocationKey.getNumPartitions(), 
colocationKey.getHashAlgorithm());
-      for (Integer index : colocationKey.getIndices()) {
-        if (!oldToNewIndex.containsKey(index)) {
-          shouldDrop = true;
-          break;
-        }
-        newColocationKey.addIndex(oldToNewIndex.get(index));
-      }
-      if (!shouldDrop) {
-        colocationKeys.add(newColocationKey);
-      }
-    }
-
-    return colocationKeys;
-  }
-
-  private static boolean colocationKeyCondition(Set<ColocationKey> 
colocationKeys,
-      @Nullable List<Integer> distributionKeys) {
-    if (!colocationKeys.isEmpty() && distributionKeys != null) {
-      for (ColocationKey colocationKey : colocationKeys) {
-        if (distributionKeys.size() >= colocationKey.getIndices().size() && 
distributionKeys.subList(0,
-            
colocationKey.getIndices().size()).equals(colocationKey.getIndices())) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  private static boolean partitionKeyConditionForJoin(MailboxReceiveNode 
mailboxReceiveNode,
-      MailboxSendNode mailboxSendNode, GreedyShuffleRewriteContext context) {
-    // First check ColocationKeyCondition for the sender <--> 
sender.getInputs().get(0) pair
-    Set<ColocationKey> oldColocationKeys = 
context.getColocationKeys(mailboxSendNode.getStageId());
-    if (!colocationKeyCondition(oldColocationKeys, mailboxSendNode.getKeys())) 
{
-      return false;
-    }
-    // Check ColocationKeyCondition for the sender <--> receiver pair
-    // Since shuffle can be skipped, oldPartitionsKeys == senderColocationKeys
-    return colocationKeyCondition(oldColocationKeys, 
mailboxReceiveNode.getKeys());
-  }
-
-  private static ColocationKey getEquivalentSenderKey(Set<ColocationKey> 
colocationKeys,
-      List<Integer> distributionKeys) {
-    if (!colocationKeys.isEmpty() && distributionKeys != null) {
-      for (ColocationKey colocationKey : colocationKeys) {
-        if (distributionKeys.size() >= colocationKey.getIndices().size() && 
distributionKeys.subList(0,
-            
colocationKey.getIndices().size()).equals(colocationKey.getIndices())) {
-          return colocationKey;
-        }
-      }
-    }
-    throw new IllegalStateException("Receiver's Equivalent Key in Sender Can't 
be Determined. This indicates a bug.");
-  }
-
-  private static boolean checkPartitionScheme(MailboxReceiveNode 
leftReceiveNode, MailboxReceiveNode rightReceiveNode,
-      GreedyShuffleRewriteContext context) {
-    int leftSender = leftReceiveNode.getSenderStageId();
-    int rightSender = rightReceiveNode.getSenderStageId();
-    ColocationKey leftPKey = 
getEquivalentSenderKey(context.getColocationKeys(leftSender), 
leftReceiveNode.getKeys());
-    ColocationKey rightPKey =
-        getEquivalentSenderKey(context.getColocationKeys(rightSender), 
rightReceiveNode.getKeys());
-    if (leftPKey.getNumPartitions() != rightPKey.getNumPartitions()) {
-      return false;
-    }
-    return leftPKey.getHashAlgorithm().equals(rightPKey.getHashAlgorithm());
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to