This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4935326785 [multistage] refactor traversals of stage nodes into 
visitor pattern (#9560)
4935326785 is described below

commit 493532678507ce8e39f576b747b2a9eb684676b5
Author: Almog Gavra <[email protected]>
AuthorDate: Mon Oct 17 12:59:05 2022 -0700

    [multistage] refactor traversals of stage nodes into visitor pattern (#9560)
    
    This PR improves the traversal code around StageNode. It sets up a common 
pattern for visiting nodes, collecting information, rewriting and making other 
changes. This PR lays the groundwork for a follow-up that will help us implement a global sort 
stage for LIMIT/OFFSET queries and support sort push down.
    
    There are five main parts to look at:
    1. I added the `StageNodeVisitor` interface and implemented `visit` in all 
of the Stage Node implementations
    2. I refactored the `partitionKey` optimization (that removes a shuffle if 
not necessary) into a Visitor (`ShuffleRewriteVisitor`)
    3. I refactored constructing the `QueryPlan` metadata into a visitor (this 
is in preparation for the next PR) (`StageMetadataVisitor`)
    4. I refactored constructing the `Operator` into a visitor 
(`PhysicalPlanVisitor`)
    5. I refactored `QueryPlan#explain` into a visitor, and also improved the 
functionality (see new plan explain below)
    
    Lastly, I added some quality of life improvements in debug-ability and I 
identified a "bug" in nested loop joins - though I'll fix that one in a future 
PR (see `FIXME` comment)
    
    Example of the improved explain (it now properly recognizes which nodes are 
executing what code):
    ```
    [0]@localhost:51925 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
    ├── [1]@localhost:51923 
MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925} (Subtree Omitted)
    └── [1]@localhost:51924 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925}
       └── [1]@localhost:51924 JOIN
          ├── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
          │   ├── [2]@localhost:51924 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
          │   │   └── [2]@localhost:51924 TABLE SCAN (a) {REALTIME=[a3]}
          │   └── [2]@localhost:51923 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
          │      └── [2]@localhost:51923 TABLE SCAN (a) {REALTIME=[a1, a2]}
          └── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
             └── [3]@localhost:51923 
MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
                └── [3]@localhost:51923 TABLE SCAN (b) {REALTIME=[b1]}
    ```
---
 .../query/planner/ExplainPlanStageVisitor.java     | 224 +++++++++++++++++++++
 .../org/apache/pinot/query/planner/QueryPlan.java  |  60 +-----
 .../apache/pinot/query/planner/StageMetadata.java  |   7 +
 .../planner/logical/ShuffleRewriteVisitor.java     | 192 ++++++++++++++++++
 .../planner/logical/StageMetadataVisitor.java      | 132 ++++++++++++
 .../pinot/query/planner/logical/StagePlanner.java  | 171 +++-------------
 .../query/planner/stage/AbstractStageNode.java     |  15 --
 .../pinot/query/planner/stage/AggregateNode.java   |   5 +
 .../pinot/query/planner/stage/FilterNode.java      |   5 +
 .../apache/pinot/query/planner/stage/JoinNode.java |   5 +
 .../query/planner/stage/MailboxReceiveNode.java    |  23 ++-
 .../pinot/query/planner/stage/MailboxSendNode.java |  11 +-
 .../pinot/query/planner/stage/ProjectNode.java     |   5 +
 .../apache/pinot/query/planner/stage/SortNode.java |   5 +
 .../pinot/query/planner/stage/StageNode.java       |   8 +-
 .../query/planner/stage/StageNodeVisitor.java      |  57 ++++++
 .../pinot/query/planner/stage/TableScanNode.java   |   5 +
 .../pinot/query/planner/stage/ValueNode.java       |   5 +
 .../pinot/query/mailbox/GrpcMailboxService.java    |   5 +
 .../query/runtime/blocks/TransferableBlock.java    |   2 +-
 .../runtime/executor/PhysicalPlanVisitor.java      | 149 ++++++++++++++
 .../runtime/executor/WorkerQueryExecutor.java      |  79 +-------
 .../query/runtime/operator/AggregateOperator.java  |   4 +-
 .../query/runtime/operator/FilterOperator.java     |   4 +-
 .../query/runtime/operator/HashJoinOperator.java   |   8 +-
 .../runtime/operator/MailboxReceiveOperator.java   |   7 +
 .../runtime/operator/MailboxSendOperator.java      |   4 +-
 .../pinot/query/runtime/operator/SortOperator.java |   4 +-
 .../query/runtime/operator/TransformOperator.java  |   4 +-
 29 files changed, 898 insertions(+), 307 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
new file mode 100644
index 0000000000..e0345590dc
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * A visitor that converts a {@code QueryPlan} into a human-readable string 
representation.
+ *
+ * <p>It is currently not used programmatically and cannot be accessed by the 
user. Instead,
+ * it is intended for use in manual debugging (e.g. setting breakpoints and 
calling QueryPlan#explain()).
+ */
+public class ExplainPlanStageVisitor implements 
StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {
+
+  private final QueryPlan _queryPlan;
+
+  /**
+   * Explains the query plan.
+   *
+   * @see QueryPlan#explain()
+   * @param queryPlan the queryPlan to explain
+   * @return a String representation of the query plan tree
+   */
+  public static String explain(QueryPlan queryPlan) {
+    if (queryPlan.getQueryStageMap().isEmpty()) {
+      return "EMPTY";
+    }
+
+    // the root of a query plan always only has a single node
+    ServerInstance rootServer = 
queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
+    return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), 
rootServer);
+  }
+
+  /**
+   * Explains the query plan from a specific point in the subtree, taking 
{@code rootServer}
+   * as the node that is executing this sub-tree. This is helpful for 
debugging what is happening
+   * at a given point in time (for example, printing the tree that will be 
executed on a
+   * local node right before it is executed).
+   *
+   * @param queryPlan the entire query plan, including non-executed portions
+   * @param node the node to begin traversal
+   * @param rootServer the server instance that is executing this plan (should 
execute {@code node})
+   *
+   * @return a String representation of the query plan subtree rooted at {@code node}
+   */
+  public static String explainFrom(QueryPlan queryPlan, StageNode node, 
ServerInstance rootServer) {
+    final ExplainPlanStageVisitor visitor = new 
ExplainPlanStageVisitor(queryPlan);
+    return node
+        .visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
+        .toString();
+  }
+
+  private ExplainPlanStageVisitor(QueryPlan queryPlan) {
+    _queryPlan = queryPlan;
+  }
+
+  private StringBuilder appendInfo(StageNode node, Context context) {
+    int stage = node.getStageId();
+    context._builder
+        .append(context._prefix)
+        .append('[')
+        .append(stage)
+        .append("]@")
+        .append(context._host.getHostname())
+        .append(':')
+        .append(context._host.getPort())
+        .append(' ')
+        .append(node.explain());
+    return context._builder;
+  }
+
+  private StringBuilder visitSimpleNode(StageNode node, Context context) {
+    appendInfo(node, context).append('\n');
+    return node.getInputs().get(0).visit(this, context.next(false, 
context._host));
+  }
+
+  @Override
+  public StringBuilder visitAggregate(AggregateNode node, Context context) {
+    return visitSimpleNode(node, context);
+  }
+
+  @Override
+  public StringBuilder visitFilter(FilterNode node, Context context) {
+    return visitSimpleNode(node, context);
+  }
+
+  @Override
+  public StringBuilder visitJoin(JoinNode node, Context context) {
+    appendInfo(node, context).append('\n');
+    node.getInputs().get(0).visit(this, context.next(true, context._host));
+    node.getInputs().get(1).visit(this, context.next(false, context._host));
+    return context._builder;
+  }
+
+  @Override
+  public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context 
context) {
+    appendInfo(node, context).append('\n');
+
+    MailboxSendNode sender = (MailboxSendNode) node.getSender();
+    int senderStageId = node.getSenderStageId();
+    StageMetadata metadata = 
_queryPlan.getStageMetadataMap().get(senderStageId);
+    Map<ServerInstance, Map<String, List<String>>> segments = 
metadata.getServerInstanceToSegmentsMap();
+
+    Iterator<ServerInstance> iterator = 
metadata.getServerInstances().iterator();
+    while (iterator.hasNext()) {
+      ServerInstance serverInstance = iterator.next();
+      if (segments.containsKey(serverInstance)) {
+        // always print out leaf stages
+        sender.visit(this, context.next(iterator.hasNext(), serverInstance));
+      } else {
+        if (!iterator.hasNext()) {
+          // always print out the last one
+          sender.visit(this, context.next(false, serverInstance));
+        } else {
+          // only print short version of the sender node
+          appendMailboxSend(sender, context.next(true, serverInstance))
+              .append(" (Subtree Omitted)")
+              .append('\n');
+        }
+      }
+    }
+    return context._builder;
+  }
+
+  @Override
+  public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) 
{
+    appendMailboxSend(node, context).append('\n');
+    return node.getInputs().get(0).visit(this, context.next(false, 
context._host));
+  }
+
+  private StringBuilder appendMailboxSend(MailboxSendNode node, Context 
context) {
+    appendInfo(node, context);
+
+    int receiverStageId = node.getReceiverStageId();
+    List<ServerInstance> servers = 
_queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
+    context._builder.append("->");
+    String receivers = servers.stream()
+        .map(s -> s.getHostname() + ':' + s.getPort())
+        .map(s -> "[" + receiverStageId + "]@" + s)
+        .collect(Collectors.joining(",", "{", "}"));
+    return context._builder.append(receivers);
+  }
+
+  @Override
+  public StringBuilder visitProject(ProjectNode node, Context context) {
+    return visitSimpleNode(node, context);
+  }
+
+  @Override
+  public StringBuilder visitSort(SortNode node, Context context) {
+    return visitSimpleNode(node, context);
+  }
+
+  @Override
+  public StringBuilder visitTableScan(TableScanNode node, Context context) {
+    return appendInfo(node, context)
+        .append(' ')
+        .append(_queryPlan.getStageMetadataMap()
+            .get(node.getStageId())
+            .getServerInstanceToSegmentsMap()
+            .get(context._host))
+        .append('\n');
+  }
+
+  @Override
+  public StringBuilder visitValue(ValueNode node, Context context) {
+    return appendInfo(node, context);
+  }
+
+  static class Context {
+    final ServerInstance _host;
+    final String _prefix;
+    final String _childPrefix;
+    final StringBuilder _builder;
+
+    Context(ServerInstance host, String prefix, String childPrefix, 
StringBuilder builder) {
+      _host = host;
+      _prefix = prefix;
+      _childPrefix = childPrefix;
+      _builder = builder;
+    }
+
+    Context next(boolean hasMoreChildren, ServerInstance host) {
+      return new Context(
+          host,
+          hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
+          hasMoreChildren ? _childPrefix + "│   " : _childPrefix + "   ",
+          _builder
+      );
+    }
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index d8770ed3bf..651d259979 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -18,14 +18,11 @@
  */
 package org.apache.pinot.query.planner;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.planner.logical.LogicalPlanner;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
 
 
 /**
@@ -75,53 +72,16 @@ public class QueryPlan {
     return _queryResultFields;
   }
 
+  /**
+   * Explains the {@code QueryPlan}
+   *
+   * @return a human-readable tree explaining the query plan
+   * @see ExplainPlanStageVisitor#explain(QueryPlan)
+   * @apiNote this is <b>NOT</b> identical to the SQL {@code EXPLAIN PLAN FOR} 
functionality
+   *          and is instead intended to be used by developers debugging 
during feature
+   *          development
+   */
   public String explain() {
-    if (_queryStageMap.isEmpty()) {
-      return "EMPTY";
-    }
-
-    StringBuilder builder = new StringBuilder();
-    explain(
-        builder,
-        _queryStageMap.get(0),
-        "",
-        "");
-    return builder.toString();
-  }
-
-  private void explain(
-      StringBuilder builder,
-      StageNode root,
-      String prefix,
-      String childPrefix
-  ) {
-    int stage = root.getStageId();
-
-    builder
-        .append(prefix)
-        .append("[").append(stage).append("] ")
-        .append(root.explain());
-
-    if (root instanceof TableScanNode) {
-      builder.append(' ');
-      
builder.append(_stageMetadataMap.get(root.getStageId()).getServerInstanceToSegmentsMap());
-    }
-
-    builder.append('\n');
-
-    if (root instanceof MailboxReceiveNode) {
-      int senderStage = ((MailboxReceiveNode) root).getSenderStageId();
-      StageNode sender = _queryStageMap.get(senderStage);
-      explain(builder, sender, childPrefix + "└── ", childPrefix + "    ");
-    } else {
-      for (Iterator<StageNode> iterator = root.getInputs().iterator(); 
iterator.hasNext();) {
-        StageNode input = iterator.next();
-        if (iterator.hasNext()) {
-          explain(builder, input, childPrefix + "├── ", childPrefix + "│   ");
-        } else {
-          explain(builder, input, childPrefix + "└── ", childPrefix + "    ");
-        }
-      }
-    }
+    return ExplainPlanStageVisitor.explain(this);
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index fe2531f9b6..2f21a64c27 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -97,4 +97,11 @@ public class StageMetadata implements Serializable {
   public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
     _timeBoundaryInfo = timeBoundaryInfo;
   }
+
+  @Override
+  public String toString() {
+    return "StageMetadata{" + "_scannedTables=" + _scannedTables + ", 
_serverInstances=" + _serverInstances
+        + ", _serverInstanceToSegmentsMap=" + _serverInstanceToSegmentsMap + 
", _timeBoundaryInfo=" + _timeBoundaryInfo
+        + '}';
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
new file mode 100644
index 0000000000..58adfc96d1
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code ShuffleRewriteVisitor} removes unnecessary shuffles from a stage 
node plan by
+ * inspecting whether all data required by a specific subtree are already 
colocated
+ * on a single host. It gathers the information recursively by checking which 
partitioned
+ * data is selected by each node in the tree.
+ *
+ * <p>The only method that should be used externally is {@link 
#optimizeShuffles(StageNode)},
+ * other public methods are used only by {@link 
StageNode#visit(StageNodeVisitor, Object)}.
+ */
+public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, 
Void> {
+
+  /**
+   * This method rewrites {@code root} <b>in place</b>, removing any 
unnecessary shuffles
+   * by replacing the distribution type with {@link 
RelDistribution.Type#SINGLETON}.
+   *
+   * @param root the root node of the tree to rewrite
+   */
+  public static void optimizeShuffles(StageNode root) {
+    root.visit(new ShuffleRewriteVisitor(), null);
+  }
+
+  /**
+   * Access to this class should only be used via {@link 
#optimizeShuffles(StageNode)}
+   */
+  private ShuffleRewriteVisitor() {
+  }
+
+  @Override
+  public Set<Integer> visitAggregate(AggregateNode node, Void context) {
+    Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, 
context);
+
+    // any input reference that carries over directly into the aggregation's group set
+    // should still be a partition key
+    Set<Integer> partitionKeys = new HashSet<>();
+    for (int i = 0; i < node.getGroupSet().size(); i++) {
+      RexExpression rex = node.getGroupSet().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        if (oldPartitionKeys.contains(((RexExpression.InputRef) 
rex).getIndex())) {
+          partitionKeys.add(i);
+        }
+      }
+    }
+
+    return partitionKeys;
+  }
+
+  @Override
+  public Set<Integer> visitFilter(FilterNode node, Void context) {
+    // filters don't change partition keys
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<Integer> visitJoin(JoinNode node, Void context) {
+    Set<Integer> leftPKs = node.getInputs().get(0).visit(this, context);
+    Set<Integer> rightPks = node.getInputs().get(1).visit(this, context);
+
+    // Currently, JOIN criteria is guaranteed to only have one 
FieldSelectionKeySelector
+    FieldSelectionKeySelector leftJoinKey = (FieldSelectionKeySelector) 
node.getJoinKeys().getLeftJoinKeySelector();
+    FieldSelectionKeySelector rightJoinKey = (FieldSelectionKeySelector) 
node.getJoinKeys().getRightJoinKeySelector();
+
+    int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+    Set<Integer> partitionKeys = new HashSet<>();
+    for (int i = 0; i < leftJoinKey.getColumnIndices().size(); i++) {
+      int leftIdx = leftJoinKey.getColumnIndices().get(i);
+      int rightIdx = rightJoinKey.getColumnIndices().get(i);
+      if (leftPKs.contains(leftIdx)) {
+        partitionKeys.add(leftIdx);
+      }
+      if (rightPks.contains(rightIdx)) {
+        // combined schema will have all the left fields before the right 
fields
+        // so add the leftDataSchemaSize before adding the key
+        partitionKeys.add(leftDataSchemaSize + rightIdx);
+      }
+    }
+
+    return partitionKeys;
+  }
+
+  @Override
+  public Set<Integer> visitMailboxReceive(MailboxReceiveNode node, Void 
context) {
+    Set<Integer> oldPartitionKeys = node.getSender().visit(this, context);
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+    if (canSkipShuffle(oldPartitionKeys, selector)) {
+      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      return oldPartitionKeys;
+    } else if (selector == null) {
+      return new HashSet<>();
+    } else {
+      return new HashSet<>(((FieldSelectionKeySelector) 
selector).getColumnIndices());
+    }
+  }
+
+  @Override
+  public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
+    Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, 
context);
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+    if (canSkipShuffle(oldPartitionKeys, selector)) {
+      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      return oldPartitionKeys;
+    } else {
+      // reset the context partitionKeys since we've determined that
+      // a shuffle is necessary (the MailboxReceiveNode that reads from
+      // this sender will necessarily be the result of a shuffle and
+      // will reset the partition keys based on its selector)
+      return new HashSet<>();
+    }
+  }
+
+  @Override
+  public Set<Integer> visitProject(ProjectNode node, Void context) {
+    Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, 
context);
+
+    // all inputs carry over if they're still in the projection result
+    Set<Integer> partitionKeys = new HashSet<>();
+    for (int i = 0; i < node.getProjects().size(); i++) {
+      RexExpression rex = node.getProjects().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        if (oldPartitionKeys.contains(((RexExpression.InputRef) 
rex).getIndex())) {
+          partitionKeys.add(i);
+        }
+      }
+    }
+
+    return partitionKeys;
+  }
+
+  @Override
+  public Set<Integer> visitSort(SortNode node, Void context) {
+    // sort doesn't change the partition keys
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<Integer> visitTableScan(TableScanNode node, Void context) {
+    // TODO: add table partition in table config as partition keys - this info 
is not yet available
+    return new HashSet<>();
+  }
+
+  @Override
+  public Set<Integer> visitValue(ValueNode node, Void context) {
+    return new HashSet<>();
+  }
+
+  private static boolean canSkipShuffle(Set<Integer> partitionKeys, 
KeySelector<Object[], Object[]> keySelector) {
+    if (!partitionKeys.isEmpty() && keySelector != null) {
+      Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) 
keySelector).getColumnIndices());
+      return targetSet.containsAll(partitionKeys);
+    }
+    return false;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
new file mode 100644
index 0000000000..cf76134c8a
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashMap;
+import java.util.List;
+import org.apache.calcite.util.Pair;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code StageMetadataVisitor} computes the {@link StageMetadata} for a 
{@link StageNode}
+ * tree and attaches it in the form of a {@link QueryPlan}.
+ */
+public class StageMetadataVisitor implements StageNodeVisitor<Void, QueryPlan> 
{
+
+  public static QueryPlan attachMetadata(List<Pair<Integer, String>> fields, 
StageNode root) {
+    QueryPlan queryPlan = new QueryPlan(fields, new HashMap<>(), new 
HashMap<>());
+    root.visit(new StageMetadataVisitor(), queryPlan);
+    return queryPlan;
+  }
+
+  /**
+   * Usage of this class should only come through {@link #attachMetadata(List, 
StageNode)}.
+   */
+  private StageMetadataVisitor() {
+  }
+
+  private void visit(StageNode node, QueryPlan queryPlan) {
+    queryPlan
+        .getStageMetadataMap()
+        .computeIfAbsent(node.getStageId(), (id) -> new StageMetadata())
+        .attach(node);
+  }
+
+  @Override
+  public Void visitAggregate(AggregateNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitFilter(FilterNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitJoin(JoinNode node, QueryPlan context) {
+    node.getInputs().forEach(join -> join.visit(this, context));
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, QueryPlan context) {
+    node.getSender().visit(this, context);
+    visit(node, context);
+
+    // special case for the global mailbox receive node
+    if (node.getStageId() == 0) {
+      context.getQueryStageMap().put(0, node);
+    }
+
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxSend(MailboxSendNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+
+    context.getQueryStageMap().put(node.getStageId(), node);
+    return null;
+  }
+
+  @Override
+  public Void visitProject(ProjectNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitSort(SortNode node, QueryPlan context) {
+    node.getInputs().get(0).visit(this, context);
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitTableScan(TableScanNode node, QueryPlan context) {
+    visit(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitValue(ValueNode node, QueryPlan context) {
+    visit(node, context);
+    return null;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 8c940294ef..323e7f506b 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pinot.query.planner.logical;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
@@ -32,14 +29,9 @@ import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
 import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.routing.WorkerManager;
 
 
@@ -51,9 +43,6 @@ import org.apache.pinot.query.routing.WorkerManager;
 public class StagePlanner {
   private final PlannerContext _plannerContext;
   private final WorkerManager _workerManager;
-
-  private Map<Integer, StageNode> _queryStageMap;
-  private Map<Integer, StageMetadata> _stageMetadataMap;
   private int _stageIdCounter;
 
   public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
@@ -69,176 +58,68 @@ public class StagePlanner {
    */
   public QueryPlan makePlan(RelRoot relRoot) {
     RelNode relRootNode = relRoot.rel;
-    // clear the state
-    _queryStageMap = new HashMap<>();
-    _stageMetadataMap = new HashMap<>();
     // Stage ID starts with 1, 0 will be reserved for ROOT stage.
     _stageIdCounter = 1;
 
     // walk the plan and create stages.
     StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
+    ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);
 
     // global root needs to send results back to the ROOT, a.k.a. the client 
response node. the last stage only has one
     // receiver so doesn't matter what the exchange type is. setting it to 
SINGLETON by default.
-    StageNode globalReceiverNode =
-        new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), 
globalStageRoot.getStageId(),
-            RelDistribution.Type.RANDOM_DISTRIBUTED, null);
     StageNode globalSenderNode = new 
MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
-        globalReceiverNode.getStageId(), 
RelDistribution.Type.RANDOM_DISTRIBUTED, null);
+        0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
     globalSenderNode.addInput(globalStageRoot);
-    _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
-    StageMetadata stageMetadata = 
_stageMetadataMap.get(globalSenderNode.getStageId());
-    stageMetadata.attach(globalSenderNode);
 
-    _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
-    StageMetadata globalReceivingStageMetadata = new StageMetadata();
-    globalReceivingStageMetadata.attach(globalReceiverNode);
-    _stageMetadataMap.put(globalReceiverNode.getStageId(), 
globalReceivingStageMetadata);
+    StageNode globalReceiverNode =
+        new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), 
globalStageRoot.getStageId(),
+            RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
+
+    QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, 
globalReceiverNode);
 
     // assign workers to each stage.
-    for (Map.Entry<Integer, StageMetadata> e : _stageMetadataMap.entrySet()) {
+    for (Map.Entry<Integer, StageMetadata> e : 
queryPlan.getStageMetadataMap().entrySet()) {
       _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
     }
 
-    return new QueryPlan(relRoot.fields, _queryStageMap, _stageMetadataMap);
+    return queryPlan;
   }
 
   // non-threadsafe
   // TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
   private StageNode walkRelPlan(RelNode node, int currentStageId) {
     if (isExchangeNode(node)) {
-      // 1. exchangeNode always have only one input, get its input converted 
as a new stage root.
       StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
       RelDistribution distribution = ((LogicalExchange) 
node).getDistribution();
-      List<Integer> distributionKeys = distribution.getKeys();
-      RelDistribution.Type exchangeType = distribution.getType();
-
-      // 2. make an exchange sender and receiver node pair
-      // only HASH_DISTRIBUTED requires a partition key selector; so all other 
types (SINGLETON and BROADCAST)
-      // of exchange will not carry a partition key selector.
-      KeySelector<Object[], Object[]> keySelector = exchangeType == 
RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null;
-
-      StageNode mailboxReceiver;
-      StageNode mailboxSender;
-      if (canSkipShuffle(nextStageRoot, keySelector)) {
-        // Use SINGLETON exchange type indicates a LOCAL-to-LOCAL data 
transfer between execution threads.
-        // TODO: actually implement the SINGLETON exchange without going 
through the over-the-wire GRPC mailbox
-        // sender and receiver.
-        mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getDataSchema(),
-            nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON, 
keySelector);
-        mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), 
nextStageRoot.getDataSchema(),
-            mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON, 
keySelector);
-      } else {
-        mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getDataSchema(),
-            nextStageRoot.getStageId(), exchangeType, keySelector);
-        mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), 
nextStageRoot.getDataSchema(),
-            mailboxReceiver.getStageId(), exchangeType, keySelector);
-      }
-      mailboxSender.addInput(nextStageRoot);
-
-      // 3. put the sender side as a completed stage.
-      _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
-
-      // 4. update stage metadata.
-      updateStageMetadata(mailboxSender.getStageId(), mailboxSender, 
_stageMetadataMap);
-      updateStageMetadata(mailboxReceiver.getStageId(), mailboxReceiver, 
_stageMetadataMap);
-
-      // 5. return the receiver, this is considered as a "virtual table scan" 
node for its parent.
-      return mailboxReceiver;
+      return createSendReceivePair(nextStageRoot, distribution, 
currentStageId);
     } else {
       StageNode stageNode = RelToStageConverter.toStageNode(node, 
currentStageId);
       List<RelNode> inputs = node.getInputs();
       for (RelNode input : inputs) {
         stageNode.addInput(walkRelPlan(input, currentStageId));
       }
-      updateStageMetadata(currentStageId, stageNode, _stageMetadataMap);
       return stageNode;
     }
   }
 
-  private boolean canSkipShuffle(StageNode stageNode, KeySelector<Object[], 
Object[]> keySelector) {
-    Set<Integer> originSet = stageNode.getPartitionKeys();
-    if (!originSet.isEmpty() && keySelector != null) {
-      Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) 
keySelector).getColumnIndices());
-      return targetSet.containsAll(originSet);
-    }
-    return false;
-  }
 
-  private static void updateStageMetadata(int stageId, StageNode node, 
Map<Integer, StageMetadata> stageMetadataMap) {
-    updatePartitionKeys(node);
-    StageMetadata stageMetadata = stageMetadataMap.computeIfAbsent(stageId, 
(id) -> new StageMetadata());
-    stageMetadata.attach(node);
-  }
+  private StageNode createSendReceivePair(StageNode nextStageRoot, 
RelDistribution distribution, int currentStageId) {
+    List<Integer> distributionKeys = distribution.getKeys();
+    RelDistribution.Type exchangeType = distribution.getType();
 
-  private static void updatePartitionKeys(StageNode node) {
-    if (node instanceof ProjectNode) {
-      // any input reference directly carry over should still be a partition 
key.
-      Set<Integer> previousPartitionKeys = 
node.getInputs().get(0).getPartitionKeys();
-      Set<Integer> newPartitionKeys = new HashSet<>();
-      ProjectNode projectNode = (ProjectNode) node;
-      for (int i = 0; i < projectNode.getProjects().size(); i++) {
-        RexExpression rexExpression = projectNode.getProjects().get(i);
-        if (rexExpression instanceof RexExpression.InputRef
-            && previousPartitionKeys.contains(((RexExpression.InputRef) 
rexExpression).getIndex())) {
-          newPartitionKeys.add(i);
-        }
-      }
-      projectNode.setPartitionKeys(newPartitionKeys);
-    } else if (node instanceof FilterNode) {
-      // filter node doesn't change partition keys.
-      node.setPartitionKeys(node.getInputs().get(0).getPartitionKeys());
-    } else if (node instanceof AggregateNode) {
-      // any input reference directly carry over in group set of aggregation 
should still be a partition key.
-      Set<Integer> previousPartitionKeys = 
node.getInputs().get(0).getPartitionKeys();
-      Set<Integer> newPartitionKeys = new HashSet<>();
-      AggregateNode aggregateNode = (AggregateNode) node;
-      for (int i = 0; i < aggregateNode.getGroupSet().size(); i++) {
-        RexExpression rexExpression = aggregateNode.getGroupSet().get(i);
-        if (rexExpression instanceof RexExpression.InputRef
-            && previousPartitionKeys.contains(((RexExpression.InputRef) 
rexExpression).getIndex())) {
-          newPartitionKeys.add(i);
-        }
-      }
-      aggregateNode.setPartitionKeys(newPartitionKeys);
-    } else if (node instanceof JoinNode) {
-      int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
-      Set<Integer> leftPartitionKeys = 
node.getInputs().get(0).getPartitionKeys();
-      Set<Integer> rightPartitionKeys = 
node.getInputs().get(1).getPartitionKeys();
-      // Currently, JOIN criteria guarantee to only have one 
FieldSelectionKeySelector.
-      FieldSelectionKeySelector leftJoinKeySelector =
-          (FieldSelectionKeySelector) ((JoinNode) 
node).getJoinKeys().getLeftJoinKeySelector();
-      FieldSelectionKeySelector rightJoinKeySelector =
-          (FieldSelectionKeySelector) ((JoinNode) 
node).getJoinKeys().getRightJoinKeySelector();
-      Set<Integer> newPartitionKeys = new HashSet<>();
-      for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
-        int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
-        int rightIndex = rightJoinKeySelector.getColumnIndices().get(i);
-        if (leftPartitionKeys.contains(leftIndex)) {
-          newPartitionKeys.add(leftIndex);
-        }
-        if (rightPartitionKeys.contains(rightIndex)) {
-          newPartitionKeys.add(leftDataSchemaSize + rightIndex);
-        }
-      }
-      node.setPartitionKeys(newPartitionKeys);
-    } else if (node instanceof TableScanNode) {
-      // TODO: add table partition in table config as partition keys. we dont 
have that information yet.
-    } else if (node instanceof MailboxReceiveNode) {
-      // hash distribution key is partition key.
-      FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
-          ((MailboxReceiveNode) node).getPartitionKeySelector();
-      if (keySelector != null) {
-        node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
-      }
-    } else if (node instanceof MailboxSendNode) {
-      FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
-          ((MailboxSendNode) node).getPartitionKeySelector();
-      if (keySelector != null) {
-        node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
-      }
-    }
+    // make an exchange sender and receiver node pair
+    // only HASH_DISTRIBUTED requires a partition key selector; so all other 
types (SINGLETON and BROADCAST)
+    // of exchange will not carry a partition key selector.
+    KeySelector<Object[], Object[]> keySelector = exchangeType == 
RelDistribution.Type.HASH_DISTRIBUTED
+        ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+    StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), 
nextStageRoot.getDataSchema(),
+        currentStageId, exchangeType, keySelector);
+    StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, 
nextStageRoot.getDataSchema(),
+        nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
+    mailboxSender.addInput(nextStageRoot);
+
+    return mailboxReceiver;
   }
 
   private boolean isExchangeNode(RelNode node) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 594c6d7e38..46de8731b7 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -19,10 +19,7 @@
 package org.apache.pinot.query.planner.stage;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import org.apache.pinot.common.proto.Plan;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.serde.ProtoSerializable;
@@ -34,7 +31,6 @@ public abstract class AbstractStageNode implements StageNode, 
ProtoSerializable
   protected final int _stageId;
   protected final List<StageNode> _inputs;
   protected DataSchema _dataSchema;
-  protected Set<Integer> _partitionedKeys;
 
   public AbstractStageNode(int stageId) {
     this(stageId, null);
@@ -44,7 +40,6 @@ public abstract class AbstractStageNode implements StageNode, 
ProtoSerializable
     _stageId = stageId;
     _dataSchema = dataSchema;
     _inputs = new ArrayList<>();
-    _partitionedKeys = new HashSet<>();
   }
 
   @Override
@@ -72,16 +67,6 @@ public abstract class AbstractStageNode implements 
StageNode, ProtoSerializable
     _dataSchema = dataSchema;
   }
 
-  @Override
-  public Set<Integer> getPartitionKeys() {
-    return _partitionedKeys;
-  }
-
-  @Override
-  public void setPartitionKeys(Collection<Integer> partitionedKeys) {
-    _partitionedKeys.addAll(partitionedKeys);
-  }
-
   @Override
   public void fromObjectField(Plan.ObjectField objectField) {
     ProtoSerializationUtils.setObjectFieldToObject(this, objectField);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
index 70174df265..ea8dc2c1c1 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -59,4 +59,9 @@ public class AggregateNode extends AbstractStageNode {
   public String explain() {
     return "AGGREGATE";
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitAggregate(this, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
index 0d960e951a..52ed004da1 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
@@ -45,4 +45,9 @@ public class FilterNode extends AbstractStageNode {
   public String explain() {
     return "FILTER";
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitFilter(this, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index b34d74d574..af9b4e03ed 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -64,6 +64,11 @@ public class JoinNode extends AbstractStageNode {
     return "JOIN";
   }
 
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitJoin(this, context);
+  }
+
   public static class JoinKeys {
     @ProtoProperties
     private KeySelector<Object[], Object[]> _leftJoinKeySelector;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index e358c2cb13..cf90a0005c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -33,22 +33,32 @@ public class MailboxReceiveNode extends AbstractStageNode {
   @ProtoProperties
   private KeySelector<Object[], Object[]> _partitionKeySelector;
 
+  // this is only available during planning and should not be relied
+  // on in any post-serialization code
+  private transient StageNode _sender;
+
   public MailboxReceiveNode(int stageId) {
     super(stageId);
   }
 
   public MailboxReceiveNode(int stageId, DataSchema dataSchema, int 
senderStageId,
-      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], 
Object[]> partitionKeySelector) {
+      RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], 
Object[]> partitionKeySelector,
+      StageNode sender) {
     super(stageId, dataSchema);
     _senderStageId = senderStageId;
     _exchangeType = exchangeType;
     _partitionKeySelector = partitionKeySelector;
+    _sender = sender;
   }
 
   public int getSenderStageId() {
     return _senderStageId;
   }
 
+  public void setExchangeType(RelDistribution.Type exchangeType) {
+    _exchangeType = exchangeType;
+  }
+
   public RelDistribution.Type getExchangeType() {
     return _exchangeType;
   }
@@ -57,8 +67,17 @@ public class MailboxReceiveNode extends AbstractStageNode {
     return _partitionKeySelector;
   }
 
+  public StageNode getSender() {
+    return _sender;
+  }
+
   @Override
   public String explain() {
-    return "MAIL_RECEIVE";
+    return "MAIL_RECEIVE(" + _exchangeType + ")";
+  }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitMailboxReceive(this, context);
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 05459b5634..4219590100 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -49,6 +49,10 @@ public class MailboxSendNode extends AbstractStageNode {
     return _receiverStageId;
   }
 
+  public void setExchangeType(RelDistribution.Type exchangeType) {
+    _exchangeType = exchangeType;
+  }
+
   public RelDistribution.Type getExchangeType() {
     return _exchangeType;
   }
@@ -59,6 +63,11 @@ public class MailboxSendNode extends AbstractStageNode {
 
   @Override
   public String explain() {
-    return "MAIL_SEND";
+    return "MAIL_SEND(" + _exchangeType + ")";
+  }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitMailboxSend(this, context);
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
index 250a38885f..8371dda609 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
@@ -46,4 +46,9 @@ public class ProjectNode extends AbstractStageNode {
   public String explain() {
     return "PROJECT";
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitProject(this, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 4df23e7325..38b2da6c56 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -72,4 +72,9 @@ public class SortNode extends AbstractStageNode {
   public String explain() {
     return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitSort(this, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 8f170a065c..7e3278cfe8 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.query.planner.stage;
 
 import java.io.Serializable;
-import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -45,9 +43,7 @@ public interface StageNode extends Serializable {
 
   void setDataSchema(DataSchema dataSchema);
 
-  Set<Integer> getPartitionKeys();
-
-  void setPartitionKeys(Collection<Integer> partitionKeys);
-
   String explain();
+
+  <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
new file mode 100644
index 0000000000..614dbb877a
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import org.apache.pinot.query.planner.QueryPlan;
+
+
+/**
+ * {@code StageNodeVisitor} is a skeleton interface that allows for 
implementations of {@code StageNode}
+ * tree traversals using the {@link StageNode#visit(StageNodeVisitor, Object)} 
method. No traversal
+ * order is enforced; each implementation decides how it walks the tree.
+ *
+ * <p>It is recommended that implementors use private constructors and static 
methods to access main
+ * functionality (see {@link 
org.apache.pinot.query.planner.ExplainPlanStageVisitor#explain(QueryPlan)}
+ * as an example of a usage of this pattern).
+ *
+ * @param <T> the return type for all visits
+ * @param <C> a Context that will be passed as the second parameter to {@code 
StageNode#visit},
+ *            implementors can decide how they want to use this context (e.g. 
whether or not
+ *            it can be modified in place or whether it's an immutable context)
+ */
+public interface StageNodeVisitor<T, C> {
+
+  T visitAggregate(AggregateNode node, C context);
+
+  T visitFilter(FilterNode node, C context);
+
+  T visitJoin(JoinNode node, C context);
+
+  T visitMailboxReceive(MailboxReceiveNode node, C context);
+
+  T visitMailboxSend(MailboxSendNode node, C context);
+
+  T visitProject(ProjectNode node, C context);
+
+  T visitSort(SortNode node, C context);
+
+  T visitTableScan(TableScanNode node, C context);
+
+  T visitValue(ValueNode node, C context);
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
index bcd8493cc7..7711dd6235 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
@@ -51,4 +51,9 @@ public class TableScanNode extends AbstractStageNode {
   public String explain() {
     return "TABLE SCAN (" + _tableName + ")";
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitTableScan(this, context);
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
index 3918338d19..b3ad0d40f6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
@@ -56,4 +56,9 @@ public class ValueNode extends AbstractStageNode {
   public String explain() {
     return "LITERAL";
   }
+
+  @Override
+  public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+    return visitor.visitValue(this, context);
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index c4d81fd76a..bdc36a7571 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -99,4 +99,9 @@ public class GrpcMailboxService implements 
MailboxService<MailboxContent> {
   public ManagedChannel getChannel(String mailboxId) {
     return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
   }
+
+  @Override
+  public String toString() {
+    return "GrpcMailboxService{" + "_hostname='" + _hostname + '\'' + ", 
_mailboxPort=" + _mailboxPort + '}';
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 9b457644d0..46dd9dc967 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -123,7 +123,7 @@ public class TransferableBlock implements Block {
             throw new UnsupportedOperationException("Unable to build from 
container with type: " + _type);
         }
       } catch (Exception e) {
-        throw new RuntimeException("Unable to create DataBlock");
+        throw new RuntimeException("Unable to create DataBlock", e);
       }
     }
     return _dataBlock;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
new file mode 100644
index 0000000000..b4cc73d5ac
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
+import org.apache.pinot.query.runtime.operator.FilterOperator;
+import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+
+
+/**
+ * This visitor constructs a physical plan of operators from a {@link 
StageNode} tree. Note that
+ * this works only for the intermediate stage nodes; leaf stage nodes are 
expected to compile into
+ * v1 operators at this point in time.
+ *
+ * <p>This class should be used statically via {@link #build(MailboxService, 
String, int, long, Map, StageNode)}.
+ *
+ * @see 
org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, 
ExecutorService, Map)
+ */
+public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<TransferableBlock>, Void> {
+
+  private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+  private final String _hostName;
+  private final int _port;
+  private final long _requestId;
+  private final Map<Integer, StageMetadata> _metadataMap;
+
+  public static Operator<TransferableBlock> 
build(MailboxService<Mailbox.MailboxContent> mailboxService,
+      String hostName, int port, long requestId, Map<Integer, StageMetadata> 
metadataMap, StageNode node) {
+    return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, 
requestId, metadataMap), null);
+  }
+
+  private PhysicalPlanVisitor(MailboxService<Mailbox.MailboxContent> 
mailboxService, String hostName, int port,
+      long requestId, Map<Integer, StageMetadata> metadataMap) {
+    _mailboxService = mailboxService;
+    _hostName = hostName;
+    _port = port;
+    _requestId = requestId;
+    _metadataMap = metadataMap;
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode 
node, Void context) {
+    List<ServerInstance> sendingInstances = 
_metadataMap.get(node.getSenderStageId()).getServerInstances();
+    return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(), 
sendingInstances,
+        node.getExchangeType(), node.getPartitionKeySelector(), _hostName, 
_port, _requestId,
+        node.getSenderStageId());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, 
Void context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+    StageMetadata receivingStageMetadata = 
_metadataMap.get(node.getReceiverStageId());
+    return new MailboxSendOperator(_mailboxService, node.getDataSchema(), 
nextOperator,
+        receivingStageMetadata.getServerInstances(), node.getExchangeType(), 
node.getPartitionKeySelector(),
+        _hostName, _port, _requestId, node.getStageId());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void 
context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+    return new AggregateOperator(nextOperator, node.getDataSchema(), 
node.getAggCalls(),
+        node.getGroupSet(), node.getInputs().get(0).getDataSchema());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitFilter(FilterNode node, Void 
context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+    return new FilterOperator(nextOperator, node.getDataSchema(), 
node.getCondition());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) {
+    StageNode left = node.getInputs().get(0);
+    StageNode right = node.getInputs().get(1);
+
+    Operator<TransferableBlock> leftOperator = left.visit(this, null);
+    Operator<TransferableBlock> rightOperator = right.visit(this, null);
+
+    return new HashJoinOperator(leftOperator, left.getDataSchema(), 
rightOperator,
+        right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(),
+        node.getJoinClauses(), node.getJoinRelType());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitProject(ProjectNode node, Void 
context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+    return new TransformOperator(nextOperator, node.getDataSchema(), 
node.getProjects(),
+        node.getInputs().get(0).getDataSchema());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitSort(SortNode node, Void context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+    return new SortOperator(nextOperator, node.getCollationKeys(), 
node.getCollationDirections(),
+        node.getFetch(), node.getOffset(), node.getDataSchema());
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void 
context) {
+    throw new UnsupportedOperationException("Stage node of type TableScanNode 
is not supported!");
+  }
+
+  @Override
+  public Operator<TransferableBlock> visitValue(ValueNode node, Void context) {
+    return new LiteralValueOperator(node.getDataSchema(), 
node.getLiteralRows());
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 5f0ed67d08..5fb7d05d69 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -18,36 +18,17 @@
  */
 package org.apache.pinot.query.runtime.executor;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.request.context.ThreadTimer;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.AggregateOperator;
-import org.apache.pinot.query.runtime.operator.FilterOperator;
-import org.apache.pinot.query.runtime.operator.HashJoinOperator;
-import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
-import org.apache.pinot.query.runtime.operator.SortOperator;
-import org.apache.pinot.query.runtime.operator.TransformOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
@@ -77,6 +58,7 @@ public class WorkerQueryExecutor {
     _port = port;
   }
 
+
   public synchronized void start() {
     LOGGER.info("Worker query executor started");
   }
@@ -85,12 +67,14 @@ public class WorkerQueryExecutor {
     LOGGER.info("Worker query executor shut down");
   }
 
-  // TODO: split this execution from PhysicalPlanner
   public void processQuery(DistributedStagePlan queryRequest, Map<String, 
String> requestMetadataMap,
       ExecutorService executorService) {
     long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
     StageNode stageRoot = queryRequest.getStageRoot();
-    BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, 
stageRoot, queryRequest.getMetadataMap());
+
+    Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
+        _mailboxService, _hostName, _port, requestId, 
queryRequest.getMetadataMap(), stageRoot);
+
     executorService.submit(new TraceRunnable() {
       @Override
       public void runJob() {
@@ -102,55 +86,4 @@ public class WorkerQueryExecutor {
       }
     });
   }
-
-  // TODO: split this PhysicalPlanner into a separate module
-  // TODO: optimize this into a framework. (physical planner)
-  private BaseOperator<TransferableBlock> getOperator(long requestId, 
StageNode stageNode,
-      Map<Integer, StageMetadata> metadataMap) {
-    if (stageNode instanceof MailboxReceiveNode) {
-      MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
-      List<ServerInstance> sendingInstances = 
metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
-      return new MailboxReceiveOperator(_mailboxService, 
receiveNode.getDataSchema(), sendingInstances,
-          receiveNode.getExchangeType(), 
receiveNode.getPartitionKeySelector(), _hostName, _port, requestId,
-          receiveNode.getSenderStageId());
-    } else if (stageNode instanceof MailboxSendNode) {
-      MailboxSendNode sendNode = (MailboxSendNode) stageNode;
-      BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, 
sendNode.getInputs().get(0), metadataMap);
-      StageMetadata receivingStageMetadata = 
metadataMap.get(sendNode.getReceiverStageId());
-      return new MailboxSendOperator(_mailboxService, 
sendNode.getDataSchema(), nextOperator,
-          receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
-          _hostName, _port, requestId, sendNode.getStageId());
-    } else if (stageNode instanceof JoinNode) {
-      JoinNode joinNode = (JoinNode) stageNode;
-      BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, 
joinNode.getInputs().get(0), metadataMap);
-      BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, 
joinNode.getInputs().get(1), metadataMap);
-      return new HashJoinOperator(leftOperator, 
joinNode.getInputs().get(0).getDataSchema(), rightOperator,
-          joinNode.getInputs().get(1).getDataSchema(), 
joinNode.getDataSchema(), joinNode.getJoinKeys(),
-          joinNode.getJoinClauses(), joinNode.getJoinRelType());
-    } else if (stageNode instanceof AggregateNode) {
-      AggregateNode aggregateNode = (AggregateNode) stageNode;
-      BaseOperator<TransferableBlock> inputOperator =
-          getOperator(requestId, aggregateNode.getInputs().get(0), 
metadataMap);
-      return new AggregateOperator(inputOperator, 
aggregateNode.getDataSchema(), aggregateNode.getAggCalls(),
-          aggregateNode.getGroupSet(), 
aggregateNode.getInputs().get(0).getDataSchema());
-    } else if (stageNode instanceof FilterNode) {
-      FilterNode filterNode = (FilterNode) stageNode;
-      return new FilterOperator(getOperator(requestId, 
filterNode.getInputs().get(0), metadataMap),
-          filterNode.getDataSchema(), filterNode.getCondition());
-    } else if (stageNode instanceof ProjectNode) {
-      ProjectNode projectNode = (ProjectNode) stageNode;
-      return new TransformOperator(getOperator(requestId, 
projectNode.getInputs().get(0), metadataMap),
-          projectNode.getDataSchema(), projectNode.getProjects(), 
projectNode.getInputs().get(0).getDataSchema());
-    } else if (stageNode instanceof SortNode) {
-      SortNode sortNode = (SortNode) stageNode;
-      return new SortOperator(getOperator(requestId, 
sortNode.getInputs().get(0), metadataMap),
-          sortNode.getCollationKeys(), sortNode.getCollationDirections(), 
sortNode.getFetch(), sortNode.getOffset(),
-          sortNode.getDataSchema());
-    } else if (stageNode instanceof ValueNode) {
-      return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) 
stageNode).getLiteralRows());
-    } else {
-      throw new UnsupportedOperationException(
-          String.format("Stage node type %s is not supported!", 
stageNode.getClass().getSimpleName()));
-    }
-  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 6747b07ef4..0c261d26e3 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -48,7 +48,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
 
-  private BaseOperator<TransferableBlock> _inputOperator;
+  private Operator<TransferableBlock> _inputOperator;
   private List<RexExpression> _aggCalls;
   private List<RexExpression> _groupSet;
 
@@ -64,7 +64,7 @@ public class AggregateOperator extends 
BaseOperator<TransferableBlock> {
   private boolean _isCumulativeBlockConstructed;
 
   // TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
-  public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, 
DataSchema dataSchema,
+  public AggregateOperator(Operator<TransferableBlock> inputOperator, 
DataSchema dataSchema,
       List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema 
upstreamDataSchema) {
     _inputOperator = inputOperator;
     _aggCalls = aggCalls;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f1ab55061e..b3aa17ac56 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -34,12 +34,12 @@ import 
org.apache.pinot.query.runtime.operator.operands.FilterOperand;
 
 public class FilterOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "FILTER";
-  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final Operator<TransferableBlock> _upstreamOperator;
   private final FilterOperand _filterOperand;
   private final DataSchema _dataSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public FilterOperator(BaseOperator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema, RexExpression filter) {
+  public FilterOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema, RexExpression filter) {
     _upstreamOperator = upstreamOperator;
     _dataSchema = dataSchema;
     _filterOperand = FilterOperand.toFilterOperand(filter, dataSchema);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index cb43ff09c0..bcf6807dd9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -49,8 +49,8 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
 
   private final HashMap<Integer, List<Object[]>> _broadcastHashTable;
-  private final BaseOperator<TransferableBlock> _leftTableOperator;
-  private final BaseOperator<TransferableBlock> _rightTableOperator;
+  private final Operator<TransferableBlock> _leftTableOperator;
+  private final Operator<TransferableBlock> _rightTableOperator;
   private final JoinRelType _joinType;
   private final DataSchema _resultSchema;
   private final DataSchema _leftTableSchema;
@@ -62,8 +62,8 @@ public class HashJoinOperator extends 
BaseOperator<TransferableBlock> {
   private KeySelector<Object[], Object[]> _leftKeySelector;
   private KeySelector<Object[], Object[]> _rightKeySelector;
 
-  public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, 
DataSchema leftSchema,
-      BaseOperator<TransferableBlock> rightTableOperator, DataSchema 
rightSchema, DataSchema outputSchema,
+  public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, 
DataSchema leftSchema,
+      Operator<TransferableBlock> rightTableOperator, DataSchema rightSchema, 
DataSchema outputSchema,
       JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType 
joinType) {
     _leftKeySelector = joinKeys.getLeftJoinKeySelector();
     _rightKeySelector = joinKeys.getRightJoinKeySelector();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad05f207ef..8169fc9121 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -79,6 +79,13 @@ public class MailboxReceiveOperator extends 
BaseOperator<TransferableBlock> {
           singletonInstance = serverInstance;
         }
       }
+
+      // FIXME: there's a bug where singletonInstance may be null in the case of a JOIN where
+      // one side is BROADCAST and the other is SINGLETON (this is the case with nested loop
+      // joins for inequality conditions). This causes NPEs in the logs, but actually works
+      // because the side that hits the NPE doesn't expect to get any data anyway (that's the
+      // side that gets the broadcast from one side but nothing from the SINGLETON)
+      // FIXME: https://github.com/apache/pinot/issues/9592
       _sendingStageInstances = Collections.singletonList(singletonInstance);
     } else {
       _sendingStageInstances = sendingStageInstances;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index ae4cab02f2..3e358ccc2c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -70,10 +70,10 @@ public class MailboxSendOperator extends 
BaseOperator<TransferableBlock> {
   private final int _stageId;
   private final MailboxService<Mailbox.MailboxContent> _mailboxService;
   private final DataSchema _dataSchema;
-  private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
+  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
 
   public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> 
mailboxService, DataSchema dataSchema,
-      BaseOperator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, 
List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> 
keySelector, String hostName, int port,
       long jobId, int stageId) {
     _dataSchema = dataSchema;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index b3741cb28f..1acb0c9a69 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -37,7 +37,7 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 
 public class SortOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "SORT";
-  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final Operator<TransferableBlock> _upstreamOperator;
   private final int _fetch;
   private final int _offset;
   private final DataSchema _dataSchema;
@@ -47,7 +47,7 @@ public class SortOperator extends 
BaseOperator<TransferableBlock> {
   private boolean _isSortedBlockConstructed;
   private TransferableBlock _upstreamErrorBlock;
 
-  public SortOperator(BaseOperator<TransferableBlock> upstreamOperator, 
List<RexExpression> collationKeys,
+  public SortOperator(Operator<TransferableBlock> upstreamOperator, 
List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int 
offset, DataSchema dataSchema) {
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 0c947e1ca6..90efc377ab 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -37,13 +37,13 @@ import 
org.apache.pinot.query.runtime.operator.operands.TransformOperand;
  */
 public class TransformOperator extends BaseOperator<TransferableBlock> {
   private static final String EXPLAIN_NAME = "TRANSFORM";
-  private final BaseOperator<TransferableBlock> _upstreamOperator;
+  private final Operator<TransferableBlock> _upstreamOperator;
   private final List<TransformOperand> _transformOperandsList;
   private final int _resultColumnSize;
   private final DataSchema _resultSchema;
   private TransferableBlock _upstreamErrorBlock;
 
-  public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema,
+  public TransformOperator(Operator<TransferableBlock> upstreamOperator, 
DataSchema dataSchema,
       List<RexExpression> transforms, DataSchema upstreamDataSchema) {
     _upstreamOperator = upstreamOperator;
     _resultColumnSize = transforms.size();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to