This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e8655449 [multistage][bugfix] fix PB operator never returns block 
(#10881)
b1e8655449 is described below

commit b1e8655449abd903330993370dee2541869f8be9
Author: Rong Rong <[email protected]>
AuthorDate: Fri Jun 9 16:37:53 2023 -0700

    [multistage][bugfix] fix PB operator never returns block (#10881)
    
    * fix PB operator never returns block thus hogging the opChainScheduler 
pool issue.
    * also fix empty block NPE, this never happened since previously we also 
add no-op block to the result map.
    
    ---------
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java | 7 ++++---
 .../pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java  | 4 +++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index ffb01aa137..3c338ac1a8 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -79,14 +79,14 @@ public class PipelineBreakerOperator extends 
MultiStageOperator {
       Map.Entry<Integer, Operator<TransferableBlock>> worker = 
_workerEntries.remove();
       TransferableBlock block = worker.getValue().nextBlock();
 
-      // Release the mailbox when the block is end-of-stream
-      if (block != null && block.isSuccessfulEndOfStreamBlock()) {
+      // Release the mailbox worker when the block is end-of-stream
+      if (block != null && !block.isNoOpBlock() && 
block.isSuccessfulEndOfStreamBlock()) {
         continue;
       }
 
       // Add the worker back to the queue if the block is not end-of-stream
       _workerEntries.add(worker);
-      if (block != null) {
+      if (block != null && !block.isNoOpBlock()) {
         if (block.isErrorBlock()) {
           _errorBlock = block;
           constructErrorResponse(block);
@@ -97,6 +97,7 @@ public class PipelineBreakerOperator extends 
MultiStageOperator {
         if (!block.isEndOfStreamBlock()) {
           blockList.add(block);
         }
+        return block;
       }
     }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 9b1f97426c..7cda76bfa2 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime.plan.server;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -109,7 +110,8 @@ public class ServerPlanRequestVisitor implements 
PlanNodeVisitor<Void, ServerPla
     }
     staticSide.visit(this, context);
     int resultMapId = 
context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
-    List<TransferableBlock> transferableBlocks = 
context.getPipelineBreakerResult().getResultMap().get(resultMapId);
+    List<TransferableBlock> transferableBlocks = 
context.getPipelineBreakerResult().getResultMap().getOrDefault(
+        resultMapId, Collections.emptyList());
     List<Object[]> resultDataContainer = new ArrayList<>();
     DataSchema dataSchema = dynamicSide.getDataSchema();
     for (TransferableBlock block : transferableBlocks) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to