gortiz commented on code in PR #11205:
URL: https://github.com/apache/pinot/pull/11205#discussion_r1289954302


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -31,151 +32,73 @@
 import org.slf4j.LoggerFactory;
 
 
-/**
- * This class provides the implementation for scheduling multistage queries on 
a single node based
- * on the {@link OpChainScheduler} logic that is passed in. Multistage queries 
support partial execution
- * and will return a NOOP metadata block as a "yield" signal, indicating that 
the next operator
- * chain ({@link OpChainScheduler#next} will be requested.
- */
-@SuppressWarnings("UnstableApiUsage")
-public class OpChainSchedulerService extends AbstractExecutionThreadService {
+public class OpChainSchedulerService implements SchedulerService {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(OpChainSchedulerService.class);
-  /**
-   * Default time scheduler is allowed to wait for a runnable OpChain to be 
available.
-   */
-  private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
-  /**
-   * Default cancel signal retention, this should be set to several times 
larger than
-   * {@link 
org.apache.pinot.query.service.QueryConfig#DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS}.
-   */
-  private static final long SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS = 
60_000L;
 
-  private final OpChainScheduler _scheduler;
-  private final ExecutorService _workerPool;
-  private final Cache<Long, Long> _cancelledRequests = 
CacheBuilder.newBuilder()
-      .expireAfterWrite(SCHEDULER_CANCELLATION_SIGNAL_RETENTION_MS, 
TimeUnit.MILLISECONDS).build();
+  private final ExecutorService _executorService;
+  private final ConcurrentHashMap<OpChainId, Future<?>> _submittedOpChainMap;
 
-  public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService 
workerPool) {
-    _scheduler = scheduler;
-    _workerPool = workerPool;
+  public OpChainSchedulerService(ExecutorService executorService) {
+    _executorService = executorService;
+    _submittedOpChainMap = new ConcurrentHashMap<>();
   }
 
   @Override
-  protected void triggerShutdown() {
-    // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
-    LOGGER.info("Triggered shutdown on OpChainScheduler...");
-  }
-
-  @Override
-  protected void run()
-      throws Exception {
-    while (isRunning()) {
-      OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, 
TimeUnit.MILLISECONDS);
-      if (operatorChain == null) {
-        continue;
-      }
-      LOGGER.trace("({}): Scheduling", operatorChain);
-      _workerPool.submit(new TraceRunnable() {
-        @Override
-        public void runJob() {
-          boolean isFinished = false;
-          boolean returnedErrorBlock = false;
-          Throwable thrown = null;
-          try {
-            LOGGER.trace("({}): Executing", operatorChain);
-            // throw if the operatorChain is cancelled.
-            if 
(_cancelledRequests.asMap().containsKey(operatorChain.getId().getRequestId())) {
-              throw new InterruptedException("Query was cancelled!");
-            }
-            operatorChain.getStats().executing();
-            // so long as there's work to be done, keep getting the next block
-            // when the operator chain returns a NOOP block, then yield the 
execution
-            // of this to another worker
-            TransferableBlock result = operatorChain.getRoot().nextBlock();
-            while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
-              result = operatorChain.getRoot().nextBlock();
-            }
-
-            if (result.isNoOpBlock()) {
-              // TODO: There should be a waiting-for-data state in 
OpChainStats.
-              operatorChain.getStats().queued();
-              _scheduler.yield(operatorChain);
-            } else {
-              isFinished = true;
-              if (result.isErrorBlock()) {
-                returnedErrorBlock = true;
-                LOGGER.error("({}): Completed erroneously {} {}", 
operatorChain, operatorChain.getStats(),
-                    result.getDataBlock().getExceptions());
-              } else {
-                LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
-              }
-            }
-          } catch (Exception e) {
-            LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
-            thrown = e;
-          } finally {
-            if (returnedErrorBlock || thrown != null) {
-              cancelOpChain(operatorChain, thrown);
-            } else if (isFinished) {
-              closeOpChain(operatorChain);
+  public void register(OpChain operatorChain) {
+    Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
+      @Override
+      public void runJob() {
+        boolean isFinished = false;
+        TransferableBlock returnedErrorBlock = null;
+        Throwable thrown = null;
+        try {
+          LOGGER.trace("({}): Executing", operatorChain);
+          operatorChain.getStats().executing();
+          TransferableBlock result = operatorChain.getRoot().nextBlock();
+          while (!result.isEndOfStreamBlock()) {
+            result = operatorChain.getRoot().nextBlock();
+          }
+          isFinished = true;
+          if (result.isErrorBlock()) {
+            returnedErrorBlock = result;
+            LOGGER.error("({}): Completed erroneously {} {}", operatorChain, 
operatorChain.getStats(),
+                result.getDataBlock().getExceptions());
+          } else {
+            LOGGER.debug("({}): Completed {}", operatorChain, 
operatorChain.getStats());
+          }
+        } catch (Exception e) {
+          LOGGER.error("({}): Failed to execute operator chain! {}", 
operatorChain, operatorChain.getStats(), e);
+          thrown = e;
+        } finally {
+          _submittedOpChainMap.remove(operatorChain.getId());
+          if (returnedErrorBlock != null || thrown != null) {
+            if (thrown == null) {
+              thrown = new RuntimeException("Error block " + 
returnedErrorBlock.getDataBlock().getExceptions());
             }
+            operatorChain.cancel(thrown);
+          } else if (isFinished) {
+            operatorChain.close();
           }
         }
-      });
-    }
-  }
-
-  /**
-   * Register a new operator chain with the scheduler.
-   *
-   * @param operatorChain the chain to register
-   */
-  public final void register(OpChain operatorChain) {
-    operatorChain.getStats().queued();
-    _scheduler.register(operatorChain);
-    LOGGER.debug("({}): Scheduler is now handling operator chain listening to 
mailboxes {}. "
-            + "There are a total of {} chains awaiting execution.", 
operatorChain,
-        operatorChain.getReceivingMailboxIds(),
-        _scheduler.size());
-  }
-
-  /**
-   * Async cancel a request. Request will not be fully cancelled until the 
next time opChain is being polled.
-   *
-   * @param requestId requestId to be cancelled.
-   */
-  public final void cancel(long requestId) {
-    _cancelledRequests.put(requestId, requestId);
-  }
-
-  /**
-   * This method should be called whenever data is available for an {@link 
OpChain} to consume.
-   * Implementations of this method should be idempotent, it may be called in 
the scenario that no data is available.
-   *
-   * @param opChainId the identifier of the operator chain
-   */
-  public final void onDataAvailable(OpChainId opChainId) {
-    _scheduler.onDataAvailable(opChainId);
-  }
-
-  // TODO: remove this method after we pipe down the proper executor pool to 
the v1 engine
-  public ExecutorService getWorkerPool() {
-    return _workerPool;
-  }
-
-  private void closeOpChain(OpChain opChain) {
-    try {
-      opChain.close();
-    } finally {
-      _scheduler.deregister(opChain);
-    }
+      }
+    });
+    _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
-  private void cancelOpChain(OpChain opChain, Throwable e) {
-    try {
-      opChain.cancel(e);
-    } finally {
-      _scheduler.deregister(opChain);
+  @Override
+  public void cancel(long requestId) {
+    // simple cancellation. for leaf stage this cannot be a dangling opchain 
b/c they will eventually be cleared up
+    // via query timeout.
+    List<Map.Entry<OpChainId, Future<?>>> entries =
+        _submittedOpChainMap.entrySet().stream().filter(entry -> 
entry.getKey().getRequestId() == requestId)
+            .collect(Collectors.toList());
+    for (Map.Entry<OpChainId, Future<?>> entry : entries) {
+      OpChainId key = entry.getKey();
+      Future<?> future = entry.getValue();
+      if (future != null) {
+        future.cancel(true);
+      }
+      _submittedOpChainMap.remove(key);

Review Comment:
   I didn't want to use that approach because Map.entrySet says:
   
   ```If the map is modified while an iteration over the set is in progress 
(except through the iterator's own remove operation, or through the setValue 
operation on a map entry returned by the iterator) the results of the iteration 
are undefined```
   
   On the other hand, ConcurrentHashMap.entrySet says:
   ```
   The view's iterators and spliterators are [weakly 
consistent](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly).
   ```
   Which is defined in 
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly
 as:
   ```
   Most concurrent Collection implementations (including most Queues) also 
differ from the usual java.util conventions in that their 
[Iterators](https://docs.oracle.com/javase/8/docs/api/java/util/Iterator.html) 
and 
[Spliterators](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)
 provide weakly consistent rather than fast-fail traversal:
   
   - they may proceed concurrently with other operations
   - they will never throw 
[ConcurrentModificationException](https://docs.oracle.com/javase/8/docs/api/java/util/ConcurrentModificationException.html)
   - they are guaranteed to traverse elements as they existed upon construction 
exactly once, and may (but are not guaranteed to) reflect any modifications 
subsequent to construction.
   ```
   
   So your code should be safe, but it seems difficult to reason about. We 
also don't know the blocking implications, although they don't seem to be 
that important (we assume they are correct, but we don't care that much if we 
block other accesses to the map for some extra microseconds).
   
   That is why I thought the current code is easier to reason about, but I'm 
open to using yours if you prefer that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to