walterddr commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1110440128
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -79,128 +69,145 @@ public RoundRobinScheduler(long releaseTimeout) {
public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
_releaseTimeout = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ if (releaseTimeoutMs > 0) {
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ OpChainId opChainId = entry.getKey();
+ if (_ticker.get() + _releaseTimeout > entry.getValue()) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }
+ }, _releaseTimeout, _releaseTimeout, TimeUnit.MILLISECONDS);
+ }
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
+ public void register(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
_ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no
corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we
get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when
hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an
operator
- // chain gets scheduled for mail that it had already processed, in which
case
- // the operator chain will simply do nothing and get put back onto the
queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable
is
- // called with an EOS block _while_ the operator chain is executing, the
chain
- // will consume the EOS block and computeReady() will never remove the
mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
- }
-
- @Override
- public boolean hasNext() {
- if (!_ready.isEmpty()) {
- return true;
+ public void deregister(OpChain operatorChain) {
+ _lock.lock();
+ try {
+ _aliveChains.remove(operatorChain.getId());
+ // deregister can only be called if the OpChain is running, so remove
from _runningChains.
+ _runningChains.remove(operatorChain.getId());
+ // it could be that the onDataAvailable callback was called when the
OpChain was executing, in which case there
+ // could be a dangling entry in _seenMail.
+ _seenMail.remove(operatorChain.getId());
+ } finally {
+ _lock.unlock();
}
- computeReady();
- return !_ready.isEmpty();
- }
-
- @Override
- public OpChain next() {
- OpChain op = _ready.poll();
- trace("Polled " + op);
- return op;
}
@Override
- public int size() {
- return _ready.size() + _available.size();
+ public void yield(OpChain operatorChain) {
+ long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
+ _lock.lock();
+ try {
+ _runningChains.remove(operatorChain.getId());
+ // It could be that this OpChain received data before it could be
yielded completely. In that case, mark it ready
+ // to get it scheduled asap.
+ if (_seenMail.contains(operatorChain.getId())) {
+ _seenMail.remove(operatorChain.getId());
+ _ready.add(operatorChain);
+ return;
+ }
+ _available.put(operatorChain.getId(), releaseTs);
+ } finally {
+ _lock.unlock();
+ }
}
@Override
- public void shutDown() {
- if (_isShutDown) {
+ public void onDataAvailable(MailboxIdentifier mailbox) {
+ // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+ OpChainId opChainId = new
OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+ mailbox.getReceiverStageId());
+ // If this chain isn't alive as per the scheduler, don't do anything. If
the OpChain is registered after this, it
+ // will anyways be scheduled to run since new OpChains are run immediately.
+ if (!_aliveChains.containsKey(opChainId)) {
Review Comment:
Isn't it possible that the OpChain is not yet present in _aliveChains when this
callback fires (e.g. the downstream stage gets executed faster and starts
sending mail before the OpChain is registered with the current stage)? What
would happen then, say, if there is no further onDataAvailable trigger?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]