This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a591090f2c [multistage] Close OpChains That Return Error-Block (#10319)
a591090f2c is described below
commit a591090f2cb73ac7937ff6422b482580c71ba8dc
Author: Ankit Sultana <[email protected]>
AuthorDate: Thu Feb 23 11:33:33 2023 +0530
[multistage] Close OpChains That Return Error-Block (#10319)
---
.../runtime/executor/OpChainSchedulerService.java | 17 ++++++++------
.../executor/OpChainSchedulerServiceTest.java | 27 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 8 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 5c3e9fa29f..15ae722142 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -81,17 +81,19 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
result = operatorChain.getRoot().nextBlock();
}
- if (!result.isEndOfStreamBlock()) {
+ if (result.isNoOpBlock()) {
// TODO: There should be a waiting-for-data state in
OpChainStats.
operatorChain.getStats().queued();
_scheduler.yield(operatorChain);
- } else if (result.isEndOfStreamBlock()) {
- isFinished = true;
- LOGGER.error("({}): Completed erroneously {} {}", operatorChain,
operatorChain.getStats(),
- result.getDataBlock().getExceptions());
} else {
-
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
- LOGGER.debug("({}): Completed {}", operatorChain,
operatorChain.getStats());
+ isFinished = true;
+ if (result.isErrorBlock()) {
+ LOGGER.error("({}): Completed erroneously {} {}",
operatorChain, operatorChain.getStats(),
+ result.getDataBlock().getExceptions());
+ } else {
+
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+ LOGGER.debug("({}): Completed {}", operatorChain,
operatorChain.getStats());
+ }
}
} catch (Exception e) {
LOGGER.error("({}): Failed to execute operator chain! {}",
operatorChain, operatorChain.getStats(), e);
@@ -100,6 +102,7 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
if (isFinished) {
closeOpChain(operatorChain);
} else if (thrown != null) {
+ // TODO: It would make sense to cancel OpChains if they returned
an error-block.
cancelOpChain(operatorChain, thrown);
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index 9f0d9e8963..eef2740fe7 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -144,6 +144,7 @@ public class OpChainSchedulerServiceTest {
OpChain opChain = getChain(_operatorA);
CountDownLatch latch = new CountDownLatch(3);
AtomicBoolean returnedOpChain = new AtomicBoolean(false);
+
Mockito.when(_operatorA.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
Mockito.when(_scheduler.next(Mockito.anyLong(),
Mockito.any())).thenAnswer(inv -> {
latch.countDown();
if (latch.getCount() == 0 && returnedOpChain.compareAndSet(false, true))
{
@@ -160,7 +161,7 @@ public class OpChainSchedulerServiceTest {
}
@Test
- public void shouldCallCloseOnOperators()
+ public void shouldCallCloseOnOperatorsThatFinishSuccessfully()
throws InterruptedException {
initExecutor(1);
OpChain opChain = getChain(_operatorA);
@@ -181,6 +182,30 @@ public class OpChainSchedulerServiceTest {
scheduler.stopAsync().awaitTerminated();
}
+ @Test
+ public void shouldCallCloseOnOperatorsThatReturnErrorBlock()
+ throws InterruptedException {
+ initExecutor(1);
+ OpChain opChain = getChain(_operatorA);
+ Mockito.when(_scheduler.next(Mockito.anyLong(),
Mockito.any())).thenReturn(opChain).thenReturn(null);
+ OpChainSchedulerService scheduler = new
OpChainSchedulerService(_scheduler, _executor);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ Mockito.when(_operatorA.nextBlock()).thenReturn(
+ TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException("foo")));
+ Mockito.doAnswer(inv -> {
+ latch.countDown();
+ return null;
+ }).when(_operatorA).close();
+
+ scheduler.startAsync().awaitRunning();
+ scheduler.register(opChain);
+
+ Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be
called in less than 10 seconds");
+ scheduler.stopAsync().awaitTerminated();
+ }
+
+
@Test
public void shouldCallCancelOnOpChainsThatThrow()
throws InterruptedException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]