walterddr commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1110321326
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -35,115 +33,78 @@
* This class provides the implementation for scheduling multistage queries on a single node based
* on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution
* and will return a NOOP metadata block as a "yield" signal, indicating that the next operator
- * chain ({@link OpChainScheduler#next()} will be requested.
- *
- * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler.
+ * chain ({@link OpChainScheduler#next} will be requested.
*/
@SuppressWarnings("UnstableApiUsage")
public class OpChainSchedulerService extends AbstractExecutionThreadService {
-
private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);
-
- private static final int TERMINATION_TIMEOUT_SEC = 60;
+ // Default time scheduler is allowed to wait for a runnable OpChain to be available
+ private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
private final OpChainScheduler _scheduler;
private final ExecutorService _workerPool;
- private final long _pollIntervalMs;
-
- // anything that is guarded by this monitor should be non-blocking
- private final Monitor _monitor = new Monitor();
- private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
- @Override
- public boolean isSatisfied() {
- return _scheduler.hasNext() || !isRunning();
- }
- };
- // Note that workerPool is shut down in this class.
public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) {
- this(scheduler, workerPool, -1);
- }
-
- public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool, long pollIntervalMs) {
_scheduler = scheduler;
_workerPool = workerPool;
- _pollIntervalMs = pollIntervalMs;
}
@Override
protected void triggerShutdown() {
+ // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
LOGGER.info("Triggered shutdown on OpChainScheduler...");
- // this will just notify all waiters that the scheduler is shutting down
- _monitor.enter();
- _monitor.leave();
- if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool, TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
- LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
- }
- _scheduler.shutDown();
}
@Override
protected void run()
throws Exception {
while (isRunning()) {
- if (enterMonitor()) {
- try {
- if (!isRunning()) {
- return;
- }
-
- OpChain operatorChain = _scheduler.next();
- LOGGER.trace("({}): Scheduling", operatorChain);
- _workerPool.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- LOGGER.trace("({}): Executing", operatorChain);
- operatorChain.getStats().executing();
-
- // so long as there's work to be done, keep getting the next block
- // when the operator chain returns a NOOP block, then yield the execution
- // of this to another worker
- TransferableBlock result = operatorChain.getRoot().nextBlock();
- while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
- result = operatorChain.getRoot().nextBlock();
- }
+ OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
+ if (operatorChain == null) {
+ continue;
+ }
Review Comment:
So this effectively polls `_scheduler.next` in a loop, with no overall
timeout, until shutdown, yes?
If I understand correctly, the bounded wait is mostly there so that shutdown
takes no longer than `DEFAULT_SCHEDULER_NEXT_WAIT_MS`. Is my understanding correct?
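
To make the question concrete, here is a minimal sketch of the timed-poll pattern being asked about. It is not Pinot code: `TimedPollLoop`, `ready`, `running`, and `NEXT_WAIT_MS` are hypothetical names, and a plain `BlockingQueue` stands in for the `OpChainScheduler`. The assumption is that the loop re-checks its running flag after every timed wait, so a shutdown request is noticed within roughly one wait interval when no work is available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the scheduler service loop; not the PR's implementation.
class TimedPollLoop {
  // Assumed upper bound on a single wait for runnable work, in milliseconds.
  static final long NEXT_WAIT_MS = 100;

  // Stand-in for the scheduler's queue of runnable operator chains.
  final BlockingQueue<Runnable> ready = new LinkedBlockingQueue<>();

  // Stand-in for isRunning(); flipped to false to request shutdown.
  final AtomicBoolean running = new AtomicBoolean(true);

  void run() throws InterruptedException {
    while (running.get()) {
      // Wait at most NEXT_WAIT_MS for work; poll returns null on timeout.
      Runnable task = ready.poll(NEXT_WAIT_MS, TimeUnit.MILLISECONDS);
      if (task == null) {
        // Nothing became runnable: loop around and re-check the running flag.
        continue;
      }
      task.run();
    }
  }
}
```

With this shape, the loop as a whole has no deadline, but an idle loop exits at most about `NEXT_WAIT_MS` after `running` is set to false.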