ankitsultana commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1110441581


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -79,128 +69,145 @@ public RoundRobinScheduler(long releaseTimeout) {
   public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
     _releaseTimeout = releaseTimeoutMs;
     _ticker = ticker;
+    _availableOpChainReleaseService = 
Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r);
+      t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+      t.setDaemon(true);
+      return t;
+    });
+    if (releaseTimeoutMs > 0) {
+      _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+        for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+          if (Thread.interrupted()) {
+            LOGGER.warn("Thread={} interrupted. Scheduler may be shutting 
down.", AVAILABLE_RELEASE_THREAD_NAME);
+            break;
+          }
+          OpChainId opChainId = entry.getKey();
+          if (_ticker.get() + _releaseTimeout > entry.getValue()) {
+            _lock.lock();
+            try {
+              if (_available.containsKey(opChainId)) {
+                _available.remove(opChainId);
+                _ready.offer(_aliveChains.get(opChainId));
+              }
+            } finally {
+              _lock.unlock();
+            }
+          }
+        }
+      }, _releaseTimeout, _releaseTimeout, TimeUnit.MILLISECONDS);
+    }
   }
 
   @Override
-  public void register(OpChain operatorChain, boolean isNew) {
-    if (_isShutDown) {
-      return;
-    }
-    // the first time an operator chain is scheduled, it should
-    // immediately be considered ready in case it does not need
-    // read from any mailbox (e.g. with a LiteralValueOperator)
-    if (isNew) {
+  public void register(OpChain operatorChain) {
+    _lock.lock();
+    try {
+      _aliveChains.put(operatorChain.getId(), operatorChain);
       _ready.add(operatorChain);
-    } else {
-      long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() + 
_releaseTimeout;
-      _available.add(new AvailableEntry(operatorChain, releaseTs));
+    } finally {
+      _lock.unlock();
     }
     trace("registered " + operatorChain);
   }
 
   @Override
-  public void onDataAvailable(MailboxIdentifier mailbox) {
-    // it may be possible to receive this callback when there's no 
corresponding
-    // operator chain registered to the mailbox - this can happen when either
-    // (1) we get the callback before the first register is called or (2) we 
get
-    // the callback while the operator chain is executing. to account for this,
-    // we just store it in a set of seen mail and only check for it when 
hasNext
-    // is called.
-    //
-    // note that scenario (2) may cause a false-positive schedule where an 
operator
-    // chain gets scheduled for mail that it had already processed, in which 
case
-    // the operator chain will simply do nothing and get put back onto the 
queue.
-    // scenario (2) may additionally cause a memory leak - if onDataAvailable 
is
-    // called with an EOS block _while_ the operator chain is executing, the 
chain
-    // will consume the EOS block and computeReady() will never remove the 
mailbox
-    // from the _seenMail set.
-    //
-    // TODO: fix the memory leak by adding a close(opChain) callback
-    _seenMail.add(mailbox);
-    trace("got mail for " + mailbox);
-  }
-
-  @Override
-  public boolean hasNext() {
-    if (!_ready.isEmpty()) {
-      return true;
+  public void deregister(OpChain operatorChain) {
+    _lock.lock();
+    try {
+      _aliveChains.remove(operatorChain.getId());
+      // deregister can only be called if the OpChain is running, so remove 
from _runningChains.
+      _runningChains.remove(operatorChain.getId());
+      // it could be that the onDataAvailable callback was called when the 
OpChain was executing, in which case there
+      // could be a dangling entry in _seenMail.
+      _seenMail.remove(operatorChain.getId());
+    } finally {
+      _lock.unlock();
     }
-    computeReady();
-    return !_ready.isEmpty();
-  }
-
-  @Override
-  public OpChain next() {
-    OpChain op = _ready.poll();
-    trace("Polled " + op);
-    return op;
   }
 
   @Override
-  public int size() {
-    return _ready.size() + _available.size();
+  public void yield(OpChain operatorChain) {
+    long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() + 
_releaseTimeout;
+    _lock.lock();
+    try {
+      _runningChains.remove(operatorChain.getId());
+      // It could be that this OpChain received data before it could be 
yielded completely. In that case, mark it ready
+      // to get it scheduled asap.
+      if (_seenMail.contains(operatorChain.getId())) {
+        _seenMail.remove(operatorChain.getId());
+        _ready.add(operatorChain);
+        return;
+      }
+      _available.put(operatorChain.getId(), releaseTs);
+    } finally {
+      _lock.unlock();
+    }
   }
 
   @Override
-  public void shutDown() {
-    if (_isShutDown) {
+  public void onDataAvailable(MailboxIdentifier mailbox) {
+    // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+    OpChainId opChainId = new 
OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
+        mailbox.getReceiverStageId());
+    // If this chain isn't alive as per the scheduler, don't do anything. If 
the OpChain is registered after this, it
+    // will anyways be scheduled to run since new OpChains are run immediately.
+    if (!_aliveChains.containsKey(opChainId)) {

Review Comment:
   Yes — in that case, once the OpChain is finally registered on the 
receiver, it is added directly to the ready queue, so it will start executing 
as soon as possible.
   
   The way I think about the `onDataAvailable` callback in the context of this 
scheduler implementation is that it only signals that a yielded OpChain can be 
resumed. If the OpChain is not registered yet, we can safely ignore the 
callback: once the OpChain is registered, it will be scheduled to run anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to