ankitsultana commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1111432194


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -35,116 +33,80 @@
  * This class provides the implementation for scheduling multistage queries on 
a single node based
  * on the {@link OpChainScheduler} logic that is passed in. Multistage queries 
support partial execution
  * and will return a NOOP metadata block as a "yield" signal, indicating that 
the next operator
- * chain ({@link OpChainScheduler#next()} will be requested.
- *
- * <p>Note that a yielded operator chain will be re-registered with the 
underlying scheduler.
+ * chain ({@link OpChainScheduler#next} will be requested.
  */
 @SuppressWarnings("UnstableApiUsage")
 public class OpChainSchedulerService extends AbstractExecutionThreadService {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
-
-  private static final int TERMINATION_TIMEOUT_SEC = 60;
+  // Default time scheduler is allowed to wait for a runnable OpChain to be 
available
+  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
 
   private final OpChainScheduler _scheduler;
   private final ExecutorService _workerPool;
-  private final long _pollIntervalMs;
-
-  // anything that is guarded by this monitor should be non-blocking
-  private final Monitor _monitor = new Monitor();
-  private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
-    @Override
-    public boolean isSatisfied() {
-      return _scheduler.hasNext() || !isRunning();
-    }
-  };
 
-  // Note that workerPool is shut down in this class.
   public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool) {
-    this(scheduler, workerPool, -1);
-  }
-
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool, long pollIntervalMs) {
     _scheduler = scheduler;
     _workerPool = workerPool;
-    _pollIntervalMs = pollIntervalMs;
   }
 
   @Override
   protected void triggerShutdown() {
+    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
     LOGGER.info("Triggered shutdown on OpChainScheduler...");
-    // this will just notify all waiters that the scheduler is shutting down
-    _monitor.enter();
-    _monitor.leave();
-    if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, 
TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
-      LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
-    }
-    _scheduler.shutDown();
   }
 
   @Override
   protected void run()
       throws Exception {
     while (isRunning()) {
-      if (enterMonitor()) {
-        try {
-          if (!isRunning()) {
-            return;
-          }
-
-          OpChain operatorChain = _scheduler.next();
-          LOGGER.trace("({}): Scheduling", operatorChain);
-          _workerPool.submit(new TraceRunnable() {
-            @Override
-            public void runJob() {
-              try {
-                LOGGER.trace("({}): Executing", operatorChain);
-                operatorChain.getStats().executing();
-
-                // so long as there's work to be done, keep getting the next 
block
-                // when the operator chain returns a NOOP block, then yield 
the execution
-                // of this to another worker
-                TransferableBlock result = operatorChain.getRoot().nextBlock();
-                while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
-                  result = operatorChain.getRoot().nextBlock();
-                }
+      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, 
TimeUnit.MILLISECONDS);
+      if (operatorChain == null) {
+        continue;
+      }
+      LOGGER.trace("({}): Scheduling", operatorChain);
+      _workerPool.submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          boolean isFinished = false;
+          Throwable thrown = null;
+          try {
+            LOGGER.trace("({}): Executing", operatorChain);
+            operatorChain.getStats().executing();
+
+            // so long as there's work to be done, keep getting the next block
+            // when the operator chain returns a NOOP block, then yield the 
execution
+            // of this to another worker
+            TransferableBlock result = operatorChain.getRoot().nextBlock();
+            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+              result = operatorChain.getRoot().nextBlock();
+            }
 
-                if (!result.isEndOfStreamBlock()) {
-                  // not complete, needs to re-register for scheduling
-                  register(operatorChain, false);
-                } else {
-                  if (result.isErrorBlock()) {
-                    operatorChain.getRoot().toExplainString();
-                    LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
-                        result.getDataBlock().getExceptions());
-                  } else {
-                    operatorChain.getRoot().toExplainString();
-                    
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
-                    LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
-                  }
-                  operatorChain.close();
-                }
-              } catch (Exception e) {
-                operatorChain.close();
+            if (!result.isEndOfStreamBlock()) {
+              _scheduler.yield(operatorChain);
+            } else {
+              isFinished = true;
+              if (result.isErrorBlock()) {
                 operatorChain.getRoot().toExplainString();
-                LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
+                LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
+                    result.getDataBlock().getExceptions());
+              } else {
+                operatorChain.getRoot().toExplainString();
+                
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+                LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
               }
             }
-          });
-        } finally {
-          _monitor.leave();
+          } catch (Exception e) {
+            LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
+            thrown = e;
+          } finally {
+            if (isFinished) {

Review Comment:
   In the `else` block above we make a couple of calls: `toExplainString` and 
setting the operator stats map. Although neither of them throws an exception 
today, if either starts throwing in the future, I want to be sure that the 
OpChains are still de-registered and closed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to