walterddr commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1113292966


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,208 @@
  * but will only schedule them when there is work to be done. The availability
  * of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
  * callback.
+ * <p>
+ *   Design: There are 3 states for a OpChain:
+ *
+ *   1. Ready: This state means that an OpChain is ready to be run. If an OpChain is in this state, then it would be in
+ *             the _ready queue. The _ready queue is polled on every call to {@link #next}.
+ *   2. Available/Yielded: This is when the OpChain was run and it returned a no-op block, indicating that it has no
+ *                         new data to process. The {@link OpChainId} for these OpChains are stored in the _available
+ *                         map, where the value of the map is the release-timeout.
+ *   3. Running: When the OpChain is returned by the {@link #next} method, it is considered to be in the Running state.
+ *
+ *   The following state transitions are possible:
+ *   1. Ready ==> Running: This happens when the OpChain is returned by the {@link #next} method.
+ *   2. Running ==> Available/Yielded: This happens when a running OpChain returns a No-op block, following which a
+ *                                     yield is called and there's no entry for the OpChain in _seenMail.
+ *   3. Available/Yielded ==> Ready: This can happen in two cases: (1) When yield is called but _seenMail has an
+ *                                   entry for the corresponding OpChainId, which means there was some data received
+ *                                   by MailboxReceiveOperator after the last poll. (2) When a sender has died or hasn't
+ *                                   sent any data in the last _releaseTimeoutMs milliseconds.
+ *
+ *   The OpChain is considered "alive" from the time it is registered until it is de-registered. Any reference to the
+ *   OpChain or its related metadata is kept only while the OpChain is alive. The {@link #onDataAvailable} callback
+ *   can be called before an OpChain was ever registered. In that case, this scheduler will simply ignore the callback,
+ *   since once the OpChain gets registered it will anyways be put into the _ready queue immediately. In case the
+ *   OpChain never gets registered (e.g. if the broker couldn't dispatch it), as long as the sender cleans up all
+ *   resources that it has acquired, there will be no leak, since the scheduler doesn't hold any references for
+ *   non-alive OpChains.
+ * </p>
  */
-@NotThreadSafe
+@ThreadSafe
 public class RoundRobinScheduler implements OpChainScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
-  private static final long DEFAULT_RELEASE_TIMEOUT = TimeUnit.MINUTES.toMillis(1);
+  private static final String AVAILABLE_RELEASE_THREAD_NAME = "round-robin-scheduler-release-thread";
 
-  private final long _releaseTimeout;
+  private final long _releaseTimeoutMs;
   private final Supplier<Long> _ticker;
 
-  // the _available queue contains operator chains that are available
-  // to this scheduler but do not have any data available to schedule
-  // while the _ready queue contains the operator chains that are ready
-  // to be scheduled (have data, or are first-time scheduled)
-  private final Queue<AvailableEntry> _available = new LinkedList<>();
-  private final Queue<OpChain> _ready = new LinkedList<>();
-
-  private boolean _isShutDown = false;
-
-  // using a Set here is acceptable because calling hasNext() and
-  // onDataAvailable() cannot occur concurrently - that means that
-  // anytime we schedule a new operator based on the presence of
-  // mail we can be certain that it will consume all of the mail
-  // form that mailbox, even if there are multiple items in it. If,
-  // during execution of that operator, more mail appears, then the
-  // operator will be rescheduled immediately potentially resulting
-  // in a false-positive schedule
-  @VisibleForTesting
-  final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+  private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
+  final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+  private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
 
-  public RoundRobinScheduler() {
-    this(DEFAULT_RELEASE_TIMEOUT);
-  }
+  private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+  private final Lock _lock = new ReentrantLock();
+  private final ScheduledExecutorService _availableOpChainReleaseService;
 
-  public RoundRobinScheduler(long releaseTimeout) {
-    this(releaseTimeout, System::currentTimeMillis);
+  public RoundRobinScheduler(long releaseTimeoutMs) {
+    this(releaseTimeoutMs, System::currentTimeMillis);
   }
 
-  public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
-    _releaseTimeout = releaseTimeoutMs;
+  RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+    Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for round-robin scheduler should be > 0ms");
+    _releaseTimeoutMs = releaseTimeoutMs;
     _ticker = ticker;
+    _availableOpChainReleaseService = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r);
+      t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+      t.setDaemon(true);
+      return t;
+    });
+    _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+      List<OpChainId> timedOutWaiting = new ArrayList<>();
+      for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+        if (Thread.interrupted()) {
+          LOGGER.warn("Thread={} interrupted. Scheduler may be shutting down.", AVAILABLE_RELEASE_THREAD_NAME);
+          break;
+        }
+        if (_ticker.get() > entry.getValue()) {
+          timedOutWaiting.add(entry.getKey());
+        }
+      }
+      for (OpChainId opChainId : timedOutWaiting) {
+        _lock.lock();
+        try {
+          if (_available.containsKey(opChainId)) {
+            _available.remove(opChainId);
+            _ready.offer(_aliveChains.get(opChainId));

Review Comment:
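   This `offer` returns `false` without enqueuing if the queue is full (it neither blocks nor throws), so the
   OpChain would be silently dropped. But since `_ready` is an unbounded `LinkedBlockingQueue`, I guess this is
   currently not a concern?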
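
   For reference, a minimal standalone sketch of how `BlockingQueue.offer` behaves on a full queue. It does not
   use any Pinot classes: `OfferDemo`, `offerTwo`, and the string items are hypothetical names standing in for
   the scheduler's `_ready` queue and its OpChains.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the PR.
class OfferDemo {
  // Offers two items and reports whether each one was accepted.
  static boolean[] offerTwo(BlockingQueue<String> queue) {
    boolean first = queue.offer("opChain-1");
    boolean second = queue.offer("opChain-2");
    return new boolean[] {first, second};
  }

  public static void main(String[] args) {
    // Bounded queue with capacity 1: the second offer is rejected
    // by returning false; it neither blocks nor throws.
    BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
    boolean[] boundedResult = offerTwo(bounded);
    System.out.println(boundedResult[0] + " " + boundedResult[1] + " size=" + bounded.size());

    // The no-arg constructor (as used for _ready in the diff) has a
    // capacity of Integer.MAX_VALUE, so both offers are accepted.
    BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
    boolean[] unboundedResult = offerTwo(unbounded);
    System.out.println(unboundedResult[0] + " " + unboundedResult[1] + " size=" + unbounded.size());
  }
}
```

   If `_ready` ever becomes bounded, checking the return value of `offer` (or switching to `put`, which blocks)
   would be one way to avoid losing an OpChain.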



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

