xiangfu0 commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1110703495
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java:
##########
@@ -35,116 +33,80 @@
* This class provides the implementation for scheduling multistage queries on
a single node based
* on the {@link OpChainScheduler} logic that is passed in. Multistage queries
support partial execution
* and will return a NOOP metadata block as a "yield" signal, indicating that
the next operator
- * chain ({@link OpChainScheduler#next()} will be requested.
- *
- * <p>Note that a yielded operator chain will be re-registered with the
underlying scheduler.
+ * chain ({@link OpChainScheduler#next} will be requested.
*/
@SuppressWarnings("UnstableApiUsage")
public class OpChainSchedulerService extends AbstractExecutionThreadService {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(OpChainSchedulerService.class);
-
- private static final int TERMINATION_TIMEOUT_SEC = 60;
+ // Default time scheduler is allowed to wait for a runnable OpChain to be
available
+ private static final long DEFAULT_SCHEDULER_NEXT_WAIT_MS = 100;
private final OpChainScheduler _scheduler;
private final ExecutorService _workerPool;
- private final long _pollIntervalMs;
-
- // anything that is guarded by this monitor should be non-blocking
- private final Monitor _monitor = new Monitor();
- private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
- @Override
- public boolean isSatisfied() {
- return _scheduler.hasNext() || !isRunning();
- }
- };
- // Note that workerPool is shut down in this class.
public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService
workerPool) {
- this(scheduler, workerPool, -1);
- }
-
- public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService
workerPool, long pollIntervalMs) {
_scheduler = scheduler;
_workerPool = workerPool;
- _pollIntervalMs = pollIntervalMs;
}
@Override
protected void triggerShutdown() {
+ // TODO: Figure out shutdown lifecycle with graceful shutdown in mind.
LOGGER.info("Triggered shutdown on OpChainScheduler...");
- // this will just notify all waiters that the scheduler is shutting down
- _monitor.enter();
- _monitor.leave();
- if (!MoreExecutors.shutdownAndAwaitTermination(_workerPool,
TERMINATION_TIMEOUT_SEC, TimeUnit.SECONDS)) {
- LOGGER.error("Failed to shut down and terminate OpChainScheduler.");
- }
- _scheduler.shutDown();
}
@Override
protected void run()
throws Exception {
while (isRunning()) {
- if (enterMonitor()) {
- try {
- if (!isRunning()) {
- return;
- }
-
- OpChain operatorChain = _scheduler.next();
- LOGGER.trace("({}): Scheduling", operatorChain);
- _workerPool.submit(new TraceRunnable() {
- @Override
- public void runJob() {
- try {
- LOGGER.trace("({}): Executing", operatorChain);
- operatorChain.getStats().executing();
-
- // so long as there's work to be done, keep getting the next
block
- // when the operator chain returns a NOOP block, then yield
the execution
- // of this to another worker
- TransferableBlock result = operatorChain.getRoot().nextBlock();
- while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
- result = operatorChain.getRoot().nextBlock();
- }
+ OpChain operatorChain = _scheduler.next(DEFAULT_SCHEDULER_NEXT_WAIT_MS,
TimeUnit.MILLISECONDS);
+ if (operatorChain == null) {
+ continue;
+ }
+ LOGGER.trace("({}): Scheduling", operatorChain);
+ _workerPool.submit(new TraceRunnable() {
+ @Override
+ public void runJob() {
+ boolean isFinished = false;
+ Throwable thrown = null;
+ try {
+ LOGGER.trace("({}): Executing", operatorChain);
+ operatorChain.getStats().executing();
+
+ // so long as there's work to be done, keep getting the next block
+ // when the operator chain returns a NOOP block, then yield the
execution
+ // of this to another worker
+ TransferableBlock result = operatorChain.getRoot().nextBlock();
+ while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) {
+ result = operatorChain.getRoot().nextBlock();
+ }
- if (!result.isEndOfStreamBlock()) {
- // not complete, needs to re-register for scheduling
- register(operatorChain, false);
- } else {
- if (result.isErrorBlock()) {
- operatorChain.getRoot().toExplainString();
- LOGGER.error("({}): Completed erroneously {} {}",
operatorChain, operatorChain.getStats(),
- result.getDataBlock().getExceptions());
- } else {
- operatorChain.getRoot().toExplainString();
-
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
- LOGGER.debug("({}): Completed {}", operatorChain,
operatorChain.getStats());
- }
- operatorChain.close();
- }
- } catch (Exception e) {
- operatorChain.close();
+ if (!result.isEndOfStreamBlock()) {
+ _scheduler.yield(operatorChain);
+ } else {
+ isFinished = true;
+ if (result.isErrorBlock()) {
operatorChain.getRoot().toExplainString();
- LOGGER.error("({}): Failed to execute operator chain! {}",
operatorChain, operatorChain.getStats(), e);
+ LOGGER.error("({}): Completed erroneously {} {}",
operatorChain, operatorChain.getStats(),
+ result.getDataBlock().getExceptions());
+ } else {
+ operatorChain.getRoot().toExplainString();
+
operatorChain.getStats().setOperatorStatsMap(result.getResultMetadata());
+ LOGGER.debug("({}): Completed {}", operatorChain,
operatorChain.getStats());
}
}
- });
- } finally {
- _monitor.leave();
+ } catch (Exception e) {
+ LOGGER.error("({}): Failed to execute operator chain! {}",
operatorChain, operatorChain.getStats(), e);
+ thrown = e;
+ } finally {
+ if (isFinished) {
Review Comment:
Why put this logic in a finally block instead of splitting it between the
else block and the catch block?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,204 @@
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for a OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an
OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to
{@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a
no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId} for
these OpChains are stored in the _available
+ * map, where the value of the map is the
release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it
is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the
{@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain
returns a No-op block, following which a
+ * yield is called and there's no entry
for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When
yield is called but _seenMail has an
+ * entry for the corresponding OpChainId,
which means there was some data received
+ * by MailboxReceiveOperator after the last
poll. (2) When a sender has died or hasn't
+ * sent any data in the last
_releaseTimeoutMs milliseconds.
+ *
+ * The OpChain is considered "alive" from the time it is registered until it
is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive.
The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this
scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the
_ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it),
as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the
scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT =
TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME =
"round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new
ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for
round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() + _releaseTimeoutMs > entry.getValue()) {
Review Comment:
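Should this be `if (_ticker.get() > entry.getValue()) {`? The value stored in
`_available` by `yield` is already the release timestamp
(`_ticker.get() + _releaseTimeoutMs`), so adding `_releaseTimeoutMs` again here
makes the condition true for essentially every entry.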
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,204 @@
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for a OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an
OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to
{@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a
no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId} for
these OpChains are stored in the _available
+ * map, where the value of the map is the
release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it
is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the
{@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain
returns a No-op block, following which a
+ * yield is called and there's no entry
for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When
yield is called but _seenMail has an
+ * entry for the corresponding OpChainId,
which means there was some data received
+ * by MailboxReceiveOperator after the last
poll. (2) When a sender has died or hasn't
+ * sent any data in the last
_releaseTimeoutMs milliseconds.
+ *
+ * The OpChain is considered "alive" from the time it is registered until it
is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive.
The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this
scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the
_ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it),
as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the
scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT =
TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME =
"round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new
ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for
round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() + _releaseTimeoutMs > entry.getValue()) {
+ timedOutWaiting.add(entry.getKey());
+ }
+ }
+ for (OpChainId opChainId : timedOutWaiting) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
+ public void register(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
_ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no
corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we
get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when
hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an
operator
- // chain gets scheduled for mail that it had already processed, in which
case
- // the operator chain will simply do nothing and get put back onto the
queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable
is
- // called with an EOS block _while_ the operator chain is executing, the
chain
- // will consume the EOS block and computeReady() will never remove the
mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
+ public void deregister(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.remove(operatorChain.getId());
+ // it could be that the onDataAvailable callback was called when the
OpChain was executing, in which case there
+ // could be a dangling entry in _seenMail.
+ _seenMail.remove(operatorChain.getId());
+ } finally {
+ _lock.unlock();
+ }
}
@Override
- public boolean hasNext() {
- if (!_ready.isEmpty()) {
- return true;
+ public void yield(OpChain operatorChain) {
+ long releaseTs = _ticker.get() + _releaseTimeoutMs;
+ _lock.lock();
+ try {
+ // It could be that this OpChain received data before it could be
yielded completely. In that case, mark it ready
+ // to get it scheduled asap.
+ if (_seenMail.contains(operatorChain.getId())) {
+ _seenMail.remove(operatorChain.getId());
+ _ready.add(operatorChain);
+ return;
+ }
+ _available.put(operatorChain.getId(), releaseTs);
+ } finally {
+ _lock.unlock();
}
- computeReady();
- return !_ready.isEmpty();
}
@Override
- public OpChain next() {
- OpChain op = _ready.poll();
- trace("Polled " + op);
+ public void onDataAvailable(MailboxIdentifier mailbox) {
+ // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+ OpChainId opChainId = new
OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+ mailbox.getReceiverStageId());
+ // If this chain isn't alive as per the scheduler, don't do anything. If
the OpChain is registered after this, it
+ // will anyways be scheduled to run since new OpChains are run immediately.
+ if (!_aliveChains.containsKey(opChainId)) {
+ trace("got mail, but the OpChain is not registered so ignoring the event
" + mailbox);
+ return;
+ }
+ _lock.lock();
+ try {
+ if (!_aliveChains.containsKey(opChainId)) {
+ return;
+ }
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ } else {
+ // There are two cases here:
+ // 1. OpChain is in the _ready queue: the next time it gets polled,
we'll remove the _seenMail entry.
+ // 2. OpChain is running: the next time yield is called for it, we'll
check against _seenMail and put it back
+ // in the _ready queue again.
+ _seenMail.add(opChainId);
+ }
+ } finally {
+ _lock.unlock();
+ }
+ trace("got mail for " + mailbox);
+ }
+
+ @Override
+ public OpChain next(long time, TimeUnit timeUnit) throws
InterruptedException {
+ OpChain op = _ready.poll(time, timeUnit);
return op;
}
@Override
public int size() {
- return _ready.size() + _available.size();
+ return _aliveChains.size();
}
@Override
- public void shutDown() {
- if (_isShutDown) {
- return;
- }
- while (!_ready.isEmpty()) {
- _ready.poll().close();
- }
- while (!_available.isEmpty()) {
- _available.poll()._opChain.close();
- }
- _isShutDown = true;
+ public void shutdownNow() {
+ // TODO: Figure out shutdown flow in context of graceful shutdown.
+ _availableOpChainReleaseService.shutdownNow();
}
- private void computeReady() {
- Iterator<AvailableEntry> availableChains = _available.iterator();
-
- // the algorithm here iterates through all available chains and checks
- // to see whether or not any of the available chains have seen mail for
- // at least one of the mailboxes they receive from - if they do, then
- // we should make that chain available and remove any mail from the
- // mailboxes that it would consume from (after it is scheduled, all
- // mail available to it will have been consumed).
- while (availableChains.hasNext()) {
- AvailableEntry chain = availableChains.next();
- Sets.SetView<MailboxIdentifier> intersect =
Sets.intersection(chain._opChain.getReceivingMailbox(), _seenMail);
-
- if (!intersect.isEmpty()) {
- // use an immutable copy because set views use the underlying sets
- // directly, which would cause a concurrent modification exception
- // when removing data from _seenMail
- _seenMail.removeAll(intersect.immutableCopy());
- _ready.add(chain._opChain);
- availableChains.remove();
- } else if (_ticker.get() > chain._releaseTs) {
- LOGGER.warn("({}) Scheduling operator chain reading from {} after
timeout. Ready: {}, Available: {}, Mail: {}.",
- chain._opChain, chain._opChain.getReceivingMailbox(), _ready,
_available, _seenMail);
- _ready.add(chain._opChain);
- availableChains.remove();
- }
- }
+ @VisibleForTesting
+ int readySize() {
+ return _ready.size();
}
- private void trace(String operation) {
- LOGGER.trace("({}) Ready: {}, Available: {}, Mail: {}", operation, _ready,
_available, _seenMail);
+ @VisibleForTesting
+ int availableSize() {
+ return _available.size();
}
- private static class AvailableEntry {
-
- final OpChain _opChain;
- final long _releaseTs;
+ @VisibleForTesting
+ int seenMailSize() {
+ return _seenMail.size();
+ }
- private AvailableEntry(OpChain opChain, long releaseTs) {
- _opChain = opChain;
- _releaseTs = releaseTs;
- }
+ @VisibleForTesting
+ int aliveChainsSize() {
+ return _aliveChains.size();
Review Comment:
Isn't this a duplicate of `size()`, which already returns
`_aliveChains.size()`?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,204 @@
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for a OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an
OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to
{@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a
no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId} for
these OpChains are stored in the _available
+ * map, where the value of the map is the
release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it
is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the
{@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain
returns a No-op block, following which a
+ * yield is called and there's no entry
for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When
yield is called but _seenMail has an
+ * entry for the corresponding OpChainId,
which means there was some data received
+ * by MailboxReceiveOperator after the last
poll. (2) When a sender has died or hasn't
+ * sent any data in the last
_releaseTimeoutMs milliseconds.
+ *
+ * The OpChain is considered "alive" from the time it is registered until it
is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive.
The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this
scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the
_ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it),
as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the
scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT =
TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME =
"round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new
ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for
round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() + _releaseTimeoutMs > entry.getValue()) {
+ timedOutWaiting.add(entry.getKey());
+ }
+ }
+ for (OpChainId opChainId : timedOutWaiting) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
+ public void register(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
_ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no
corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we
get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when
hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an
operator
- // chain gets scheduled for mail that it had already processed, in which
case
- // the operator chain will simply do nothing and get put back onto the
queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable
is
- // called with an EOS block _while_ the operator chain is executing, the
chain
- // will consume the EOS block and computeReady() will never remove the
mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
+ public void deregister(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.remove(operatorChain.getId());
Review Comment:
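nit: `operatorChain.getId()` is called twice in this method; consider
extracting a local variable, e.g.
`OpChainId operatorChainId = operatorChain.getId();`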
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java:
##########
@@ -18,52 +18,67 @@
*/
package org.apache.pinot.query.runtime.executor;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.query.mailbox.MailboxIdentifier;
import org.apache.pinot.query.runtime.operator.OpChain;
/**
- * An interface that defines different scheduling strategies to work with the
- * {@link OpChainSchedulerService}. All methods are thread safe and can be
guaranteed
- * to never be called concurrently - therefore all implementations may use data
- * structures that are not concurrent.
+ * An interface that defines different scheduling strategies to work with the
{@link OpChainSchedulerService}.
*/
+@ThreadSafe
public interface OpChainScheduler {
-
/**
+ * Registers a new OpChain with the scheduler.
* @param operatorChain the operator chain to register
- * @param isNew whether or not this is the first time the operator
is scheduled
*/
- void register(OpChain operatorChain, boolean isNew);
+ void register(OpChain operatorChain);
/**
- * This method is called whenever {@code mailbox} has new data available to
consume,
- * this can be useful for advanced scheduling algorithms
- *
- * @param mailbox the mailbox ID
+ * When the OpChain is finished, error or otherwise, deregister is called
for it so the scheduler can do any required
+ * cleanup. After an OpChain is de-registered, the scheduler service will
never call any other method for it.
+ * However, the {@link #onDataAvailable} callback may be called even after
an OpChain is de-registered, and the
+ * scheduler should handle that scenario.
+ * @param operatorChain an operator chain that is finished (error or
otherwise).
*/
- void onDataAvailable(MailboxIdentifier mailbox);
+ void deregister(OpChain operatorChain);
/**
- * This method is called when scheduler is terminating. It should clean up
all of the resources if there are any.
- * register() and onDataAvailable() shouldn't be called anymore after
shutDown is called.
+ * Used by {@link OpChainSchedulerService} to indicate that a given OpChain
can be suspended until it receives some
+ * data. Note that this method is only used by the scheduler service to
"indicate" that an OpChain can be suspended.
+ * The decision on whether to actually suspend or not can be taken by the
scheduler.
*/
- void shutDown();
+ void yield(OpChain opChain);
/**
- * @return whether or not there is any work for the scheduler to do
+ * A callback called whenever data is received for the given mailbox. This
can be used by the scheduler
+ * implementations to re-schedule suspended OpChains. This method may be
called for an OpChain that has not yet
+ * been scheduled, or an OpChain that has already been de-registered.
+ * @param mailbox the mailbox ID
*/
- boolean hasNext();
+ void onDataAvailable(MailboxIdentifier mailbox);
/**
- * @return the next operator chain to process
- * @throws java.util.NoSuchElementException if {@link #hasNext()} returns
false
- * prior to this call
+ * Returns an OpChain that is ready to be run by {@link
OpChainSchedulerService}, waiting for the given time if
+ * there are no such OpChains ready yet. Will return null if there are no
ready OpChains even after the specified time.
+ *
+ * @param time non-negative value that determines the time the scheduler
will wait for new OpChains to be ready.
+ * @param timeUnit TimeUnit for the await time.
+ * @return a non-null OpChain that's ready to be run, or null if there's no
OpChain ready even after waiting for the
+ * given time.
+ * @throws InterruptedException if the wait for a ready OpChain was
interrupted.
*/
- OpChain next();
+ OpChain next(long time, TimeUnit timeUnit)
Review Comment:
How does the caller distinguish between "no OpChain is ready yet" and "no OpChain exists at all" when this returns null?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,204 @@
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for an OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an
OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to
{@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a
no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId} for
these OpChains are stored in the _available
+ * map, where the value of the map is the
release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it
is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the
{@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain
returns a No-op block, following which a
+ * yield is called and there's no entry
for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When
yield is called but _seenMail has an
+ * entry for the corresponding OpChainId,
which means there was some data received
+ * by MailboxReceiveOperator after the last
poll. (2) When a sender has died or hasn't
+ * sent any data in the last
_releaseTimeoutMs milliseconds.
+ *
+ * The OpChain is considered "alive" from the time it is registered until it
is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive.
The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this
scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the
_ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it),
as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the
scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT =
TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME =
"round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new
ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for
round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() + _releaseTimeoutMs > entry.getValue()) {
+ timedOutWaiting.add(entry.getKey());
+ }
+ }
+ for (OpChainId opChainId : timedOutWaiting) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
+ public void register(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
_ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no
corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we
get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when
hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an
operator
- // chain gets scheduled for mail that it had already processed, in which
case
- // the operator chain will simply do nothing and get put back onto the
queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable
is
- // called with an EOS block _while_ the operator chain is executing, the
chain
- // will consume the EOS block and computeReady() will never remove the
mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
+ public void deregister(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.remove(operatorChain.getId());
+ // it could be that the onDataAvailable callback was called when the
OpChain was executing, in which case there
+ // could be a dangling entry in _seenMail.
+ _seenMail.remove(operatorChain.getId());
+ } finally {
+ _lock.unlock();
+ }
}
@Override
- public boolean hasNext() {
- if (!_ready.isEmpty()) {
- return true;
+ public void yield(OpChain operatorChain) {
+ long releaseTs = _ticker.get() + _releaseTimeoutMs;
+ _lock.lock();
+ try {
+ // It could be that this OpChain received data before it could be
yielded completely. In that case, mark it ready
+ // to get it scheduled asap.
+ if (_seenMail.contains(operatorChain.getId())) {
+ _seenMail.remove(operatorChain.getId());
+ _ready.add(operatorChain);
+ return;
+ }
+ _available.put(operatorChain.getId(), releaseTs);
+ } finally {
+ _lock.unlock();
}
- computeReady();
- return !_ready.isEmpty();
}
@Override
- public OpChain next() {
- OpChain op = _ready.poll();
- trace("Polled " + op);
+ public void onDataAvailable(MailboxIdentifier mailbox) {
+ // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+ OpChainId opChainId = new
OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+ mailbox.getReceiverStageId());
+ // If this chain isn't alive as per the scheduler, don't do anything. If
the OpChain is registered after this, it
+ // will anyways be scheduled to run since new OpChains are run immediately.
+ if (!_aliveChains.containsKey(opChainId)) {
+ trace("got mail, but the OpChain is not registered so ignoring the event
" + mailbox);
+ return;
+ }
+ _lock.lock();
+ try {
+ if (!_aliveChains.containsKey(opChainId)) {
+ return;
+ }
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ } else {
+ // There are two cases here:
+ // 1. OpChain is in the _ready queue: the next time it gets polled,
we'll remove the _seenMail entry.
+ // 2. OpChain is running: the next time yield is called for it, we'll
check against _seenMail and put it back
+ // in the _ready queue again.
+ _seenMail.add(opChainId);
+ }
+ } finally {
+ _lock.unlock();
+ }
+ trace("got mail for " + mailbox);
+ }
+
+ @Override
+ public OpChain next(long time, TimeUnit timeUnit) throws
InterruptedException {
+ OpChain op = _ready.poll(time, timeUnit);
Review Comment:
Shall we check if the OpChain is still registered in _aliveChains?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]