Jackie-Jiang commented on code in PR #11587:
URL: https://github.com/apache/pinot/pull/11587#discussion_r1325047039


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -199,21 +201,54 @@ public void processQuery(DistributedStagePlan 
distributedStagePlan, Map<String,
     _opChainScheduler.register(opChain);
   }
 
-  private void setStageCustomProperties(Map<String, String> customProperties, 
Map<String, String> requestMetadata) {
+  public void cancel(long requestId) {
+    _opChainScheduler.cancel(requestId);
+  }
+
+  private OpChain compileLeafStage(OpChainExecutionContext executionContext,
+      DistributedStagePlan distributedStagePlan) {
+    List<ServerPlanRequestContext> serverPlanRequestContexts =
+        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, 
distributedStagePlan,
+            _helixManager.getHelixPropertyStore());
+    List<ServerQueryRequest> serverQueryRequests = new 
ArrayList<>(serverPlanRequestContexts.size());
+    long queryArrivalTimeMs = System.currentTimeMillis();
+    for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
+      serverQueryRequests.add(
+          new ServerQueryRequest(requestContext.getInstanceRequest(), 
_serverMetrics, queryArrivalTimeMs, true));
+    }
+    MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
+    MultiStageOperator leafStageOperator =
+        new LeafStageTransferableBlockOperator(executionContext, 
serverQueryRequests, sendNode.getDataSchema(),
+            _leafQueryExecutor, _executorService);
+    MailboxSendOperator mailboxSendOperator =
+        new MailboxSendOperator(executionContext, leafStageOperator, 
sendNode.getDistributionType(),
+            sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), 
sendNode.getCollationDirections(),
+            sendNode.isSortOnSender(), sendNode.getReceiverStageId());
+    return new OpChain(executionContext, mailboxSendOperator);
+  }
+
+  private Map<String, String> consolidateMetadata(Map<String, String> 
customProperties,

Review Comment:
   Any specific reason why we want to reorder the methods? 1. It is making this 
PR much harder to review; 2. It is actually used before `compileLeafStage()`



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -134,12 +133,10 @@ public AggregateOperator(OpChainExecutionContext context, 
MultiStageOperator inp
     // Initialize the appropriate executor.
     if (!groupSet.isEmpty()) {
       _isGroupByAggregation = true;
-      StageMetadata stageMetadata = context.getStageMetadata();
-      Map<String, String> customProperties =
-          stageMetadata != null ? stageMetadata.getCustomProperties() : 
Collections.emptyMap();
+      Map<String, String> metadata = context.getRequestMetadata();
       _groupByExecutor =
           new MultistageGroupByExecutor(groupByExpr, aggFunctions, 
filterArgIndexArray, aggType, _colNameToIndexMap,
-              _resultSchema, customProperties, nodeHint);
+              _resultSchema, metadata, nodeHint);

Review Comment:
   (minor) Also change the parameter name in the executor constructor



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -199,21 +201,54 @@ public void processQuery(DistributedStagePlan 
distributedStagePlan, Map<String,
     _opChainScheduler.register(opChain);
   }
 
-  private void setStageCustomProperties(Map<String, String> customProperties, 
Map<String, String> requestMetadata) {
+  public void cancel(long requestId) {
+    _opChainScheduler.cancel(requestId);
+  }
+
+  private OpChain compileLeafStage(OpChainExecutionContext executionContext,
+      DistributedStagePlan distributedStagePlan) {
+    List<ServerPlanRequestContext> serverPlanRequestContexts =
+        ServerPlanRequestUtils.constructServerQueryRequests(executionContext, 
distributedStagePlan,
+            _helixManager.getHelixPropertyStore());
+    List<ServerQueryRequest> serverQueryRequests = new 
ArrayList<>(serverPlanRequestContexts.size());
+    long queryArrivalTimeMs = System.currentTimeMillis();
+    for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
+      serverQueryRequests.add(
+          new ServerQueryRequest(requestContext.getInstanceRequest(), 
_serverMetrics, queryArrivalTimeMs, true));
+    }
+    MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
+    MultiStageOperator leafStageOperator =
+        new LeafStageTransferableBlockOperator(executionContext, 
serverQueryRequests, sendNode.getDataSchema(),
+            _leafQueryExecutor, _executorService);
+    MailboxSendOperator mailboxSendOperator =
+        new MailboxSendOperator(executionContext, leafStageOperator, 
sendNode.getDistributionType(),
+            sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), 
sendNode.getCollationDirections(),
+            sendNode.isSortOnSender(), sendNode.getReceiverStageId());
+    return new OpChain(executionContext, mailboxSendOperator);
+  }
+
+  private Map<String, String> consolidateMetadata(Map<String, String> 
customProperties,
+      Map<String, String> requestMetadataOriginal) {
+    Map<String, String> requestMetadata = new HashMap<>();
+    // first put all custom Properties
+    requestMetadata.putAll(customProperties);
+    // put all overrides from request
+    requestMetadata.putAll(requestMetadataOriginal);

Review Comment:
   I think we should put request level metadata first, then override the stage 
level custom property



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -149,13 +150,14 @@ public void shutDown() {
    * <p>This execution entry point should be asynchronously called by the 
request handler and caller should not wait
    * for results/exceptions.</p>
    */
-  public void processQuery(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadata) {
+  public void processQuery(DistributedStagePlan distributedStagePlan, 
Map<String, String> requestMetadataOriginal) {
+    Map<String, String> requestMetadata = consolidateMetadata(

Review Comment:
   This is not really request level metadata, but stage level metadata. Suggest 
renaming the arguments accordingly



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java:
##########
@@ -145,11 +143,9 @@ public HashJoinOperator(OpChainExecutionContext context, 
MultiStageOperator left
     } else {
       _matchedRightRows = null;
     }
-    StageMetadata stageMetadata = context.getStageMetadata();
-    Map<String, String> customProperties =
-        stageMetadata != null ? stageMetadata.getCustomProperties() : 
Collections.emptyMap();
-    _maxRowsInHashTable = getMaxRowInJoin(customProperties, 
node.getJoinHints());
-    _joinOverflowMode = getJoinOverflowMode(customProperties, 
node.getJoinHints());
+    Map<String, String> metadata = context.getRequestMetadata();
+    _maxRowsInHashTable = getMaxRowInJoin(metadata, node.getJoinHints());
+    _joinOverflowMode = getJoinOverflowMode(metadata, node.getJoinHints());

Review Comment:
   (minor) Also change the name of the parameter in these 2 methods



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java:
##########
@@ -85,7 +86,7 @@ public long getDeadlineMs() {
   }
 
   public Map<String, String> getRequestMetadata() {
-    return _requestMetadata;
+    return Collections.unmodifiableMap(_requestMetadata);

Review Comment:
   Suggest directly constructing it as unmodifiable map (in the constructor)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java:
##########
@@ -42,7 +43,7 @@ public List<WorkerMetadata> getWorkerMetadataList() {
   }
 
   public Map<String, String> getCustomProperties() {
-    return _customProperties;
+    return Collections.unmodifiableMap(_customProperties);

Review Comment:
   Suggest directly constructing it as unmodifiable map (in the constructor)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to