This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a591090f2c [multistage] Close OpChains That Return Error-Block (#10319)
a591090f2c is described below

commit a591090f2cb73ac7937ff6422b482580c71ba8dc
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu Feb 23 11:33:33 2023 +0530

    [multistage] Close OpChains That Return Error-Block (#10319)
---
 .../runtime/executor/OpChainSchedulerService.java  | 17 ++++++++------
 .../executor/OpChainSchedulerServiceTest.java      | 27 +++++++++++++++++++++-
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 5c3e9fa29f..15ae722142 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -81,17 +81,19 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
               result = operatorChain.getRoot().nextBlock();
             }
 
-            if (!result.isEndOfStreamBlock()) {
+            if (result.isNoOpBlock()) {
               // TODO: There should be a waiting-for-data state in 
OpChainStats.
               operatorChain.getStats().queued();
               _scheduler.yield(operatorChain);
-            } else if (result.isEndOfStreamBlock()) {
-              isFinished = true;
-              LOGGER.error("({}): Completed erroneously {} {}", operatorChain, 
operatorChain.getStats(),
-                  result.getDataBlock().getExceptions());
             } else {
-              
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
-              LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
+              isFinished = true;
+              if (result.isErrorBlock()) {
+                LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
+                    result.getDataBlock().getExceptions());
+              } else {
+                
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+                LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
+              }
             }
           } catch (Exception e) {
             LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
@@ -100,6 +102,7 @@ public class OpChainSchedulerService extends 
AbstractExecutionThreadService {
             if (isFinished) {
               closeOpChain(operatorChain);
             } else if (thrown != null) {
+              // TODO: It would make sense to cancel OpChains if they returned 
an error-block.
               cancelOpChain(operatorChain, thrown);
             }
           }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 9f0d9e8963..eef2740fe7 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -144,6 +144,7 @@ public class OpChainSchedulerServiceTest {
     OpChain opChain = getChain(_operatorA);
     CountDownLatch latch = new CountDownLatch(3);
     AtomicBoolean returnedOpChain = new AtomicBoolean(false);
+    
Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
     Mockito.when(_scheduler.next(Mockito.anyLong(), 
Mockito.any())).thenAnswer(inv -> {
       latch.countDown();
       if (latch.getCount() == 0 && returnedOpChain.compareAndSet(false, true)) 
{
@@ -160,7 +161,7 @@ public class OpChainSchedulerServiceTest {
   }
 
   @Test
-  public void shouldCallCloseOnOperators()
+  public void shouldCallCloseOnOperatorsThatFinishSuccessfully()
       throws InterruptedException {
     initExecutor(1);
     OpChain opChain = getChain(_operatorA);
@@ -181,6 +182,30 @@ public class OpChainSchedulerServiceTest {
     scheduler.stopAsync().awaitTerminated();
   }
 
+  @Test
+  public void shouldCallCloseOnOperatorsThatReturnErrorBlock()
+      throws InterruptedException {
+    initExecutor(1);
+    OpChain opChain = getChain(_operatorA);
+    Mockito.when(_scheduler.next(Mockito.anyLong(), 
Mockito.any())).thenReturn(opChain).thenReturn(null);
+    OpChainSchedulerService scheduler = new 
OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenReturn(
+        TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException("foo")));
+    Mockito.doAnswer(inv -> {
+      latch.countDown();
+      return null;
+    }).when(_operatorA).close();
+
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(opChain);
+
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be 
called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+
   @Test
   public void shouldCallCancelOnOpChainsThatThrow()
       throws InterruptedException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to