Jackie-Jiang commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419357282


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -208,10 +346,31 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
   }
 
+  // --------------------------------------------------------------------------
+  // Partitioned leaf stage assignment
+  // --------------------------------------------------------------------------
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
+      DispatchablePlanContext context, String partitionKey, Map<String, String> tableOptions) {
+    // when partition key exist, we assign workers for leaf-stage in partitioned fashion.
+
+    String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+    Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+    int numPartitions = Integer.parseInt(numPartitionsStr);
+    Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+
+    String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,

Review Comment:
   Don't add a default here. In most cases this is `murmur`, but we should not assume it.
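
   A minimal sketch of the fail-fast alternative, for illustration only. The helper `requireOption` and the class name are hypothetical (not part of Pinot), and the key string `partition_function` is an assumed placeholder for the value behind `PinotHintOptions.TableHintOptions.PARTITION_FUNCTION`. It mirrors the existing `PARTITION_SIZE` check but uses a plain exception so the snippet stays self-contained:

```java
import java.util.Map;

class PartitionHintSketch {
  // Hypothetical helper: returns the option value, or throws when it is absent,
  // instead of silently falling back to a default such as "murmur".
  static String requireOption(Map<String, String> tableOptions, String key, String partitionKey) {
    String value = tableOptions.get(key);
    if (value == null) {
      throw new IllegalStateException(
          String.format("'%s' must be provided for partition key: %s", key, partitionKey));
    }
    return value;
  }

  public static void main(String[] args) {
    // "partition_function" is an assumed key name used only for this example.
    Map<String, String> tableOptions = Map.of("partition_function", "murmur");
    System.out.println(requireOption(tableOptions, "partition_function", "col1"));
  }
}
```

   In the actual method, the same effect could presumably be had with `tableOptions.get(...)` followed by a `Preconditions.checkState(... != null, ...)`, as is already done for the partition size.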



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   (minor) I prefer `_isPartitioned` over `_isPrePartitioned`. What does `pre` 
stand for?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -333,26 +371,11 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
     }
-    if (metadata.isRequiresSingletonInstance()) {
-      // require singleton should return a single global worker ID with 0;
-      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
-          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
-    } else {
-      Map<String, String> options = context.getPlannerContext().getOptions();
-      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
-      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
-      int workerId = 0;
-      for (ServerInstance serverInstance : serverInstances) {
-        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-        for (int i = 0; i < stageParallelism; i++) {
-          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
-        }
-      }
-      metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
-    }
+    return serverInstances;
   }
 
-  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions,

Review Comment:
   Either change it or add a TODO



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java:
##########
@@ -144,4 +145,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
     }
     return null;
   }
+
+  private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
+    Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
+    Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
+
+    int numSenders = senderServerMap.size();
+    int numReceivers = receiverServerMap.size();
+    if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
+      // leaf-to-intermediate condition
+      return numSenders * sender.getPartitionParallelism() == numReceivers
+          && sender.getPartitionFunction() != null
+          && sender.getPartitionFunction().equals(receiver.getPartitionFunction());

Review Comment:
   We should compare the partition functions ignoring case, e.g. with `equalsIgnoreCase()` instead of `equals()`.
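
   For illustration, a null-safe, case-insensitive check could look roughly like the sketch below. It assumes the partition function is exposed as a `String`; the class and method names are hypothetical and only serve the example:

```java
class PartitionFunctionCompareSketch {
  // A null sender function never matches; String.equalsIgnoreCase(null) returns false,
  // so a null receiver function is handled without an extra check.
  static boolean samePartitionFunction(String senderFunction, String receiverFunction) {
    return senderFunction != null && senderFunction.equalsIgnoreCase(receiverFunction);
  }

  public static void main(String[] args) {
    System.out.println(samePartitionFunction("Murmur", "murmur"));
    System.out.println(samePartitionFunction("murmur", null));
  }
}
```

   With this, `Murmur` and `murmur` are treated as the same function, while a missing function on either side still disables the direct exchange.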



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]