kfaraz commented on code in PR #18098:
URL: https://github.com/apache/druid/pull/18098#discussion_r2138951141


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -257,42 +284,44 @@ private void processBatchesDue()
   {
     clearQueueIfNotLeader();
 
-    // Process all the batches that are already due
-    int numProcessedBatches = 0;
-    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
-    while (nextBatch != null && nextBatch.isDue()) {
-      // Process the next batch in the queue
-      processingQueue.pollFirst();
-      final AllocateRequestBatch currentBatch = nextBatch;
-      boolean processed;
-      try {
-        processed = processBatch(currentBatch);
-      }
-      catch (Throwable t) {
-        currentBatch.failPendingRequests(t);
-        processed = true;
-        log.error(t, "Error while processing batch[%s].", currentBatch.key);
-      }
-
-      // Requeue if not fully processed yet
-      if (processed) {
-        ++numProcessedBatches;
+    // Process the batches that are already due
+    int numSubmittedBatches = 0;
+    int numSkippedBatches = 0;
+
+    // Although thread-safe, this iterator might not see entries added by other
+    // concurrent threads. Those entries will be handled in the next processBatchesDue().
+    final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator();
+    while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) {
+      final AllocateRequestBatch nextBatch = queueIterator.next();
+      final String dataSource = nextBatch.key.dataSource;
+      if (nextBatch.isDue()) {
+        if (runningDatasources.contains(dataSource)) {
+          // Skip this batch as another batch for the same datasource is in progress

Review Comment:
   Thanks for the suggestion, @gianm!
   
   I have updated the approach in the PR, with some modifications that seemed to fit
   the current design of the class better:
   
   - Remove the delay.
   - When skipping a batch, mark it as "skipped" and emit the metric. Do not emit the metric again if the batch was already skipped.
   - Do not reschedule the queue poll if all workers are busy, if the queue is empty, or if all batches were skipped.
   - When a worker finishes, schedule a queue poll.
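
   The points above can be sketched roughly as follows. This is a simplified, single-threaded model and not the code in the PR: the class `SkipAwarePollSketch`, its fields, the `due` flag, and the `skipMetrics` list (standing in for metric emission) are all hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical single-threaded model of the queue poll; not the Druid class.
class SkipAwarePollSketch
{
  // Hypothetical stand-in for a queued batch of allocation requests.
  static final class Batch
  {
    final String dataSource;
    final boolean due;
    boolean skipped; // set the first time this batch is passed over

    Batch(String dataSource, boolean due)
    {
      this.dataSource = dataSource;
      this.due = due;
    }
  }

  final Deque<Batch> queue = new ArrayDeque<>();
  final Set<String> runningDatasources = new HashSet<>();
  final List<String> skipMetrics = new ArrayList<>(); // stands in for metric emission
  final int numThreads;

  SkipAwarePollSketch(int numThreads)
  {
    this.numThreads = numThreads;
  }

  void add(String dataSource, boolean due)
  {
    queue.addLast(new Batch(dataSource, due));
  }

  // Polls the queue once. Returns true if the poll should be rescheduled.
  boolean poll()
  {
    int numSkipped = 0;
    final Iterator<Batch> iterator = queue.iterator();
    while (iterator.hasNext() && runningDatasources.size() < numThreads) {
      final Batch batch = iterator.next();
      if (!batch.due) {
        continue;
      }
      if (runningDatasources.contains(batch.dataSource)) {
        // Emit the skip metric only the first time this batch is skipped
        if (!batch.skipped) {
          batch.skipped = true;
          skipMetrics.add(batch.dataSource);
        }
        ++numSkipped;
      } else {
        // "Submit" the batch: a worker now owns this datasource
        iterator.remove();
        runningDatasources.add(batch.dataSource);
      }
    }

    final boolean allWorkersBusy = runningDatasources.size() >= numThreads;
    final boolean allSkipped = numSkipped == queue.size(); // also true when the queue is empty
    return !allWorkersBusy && !allSkipped;
  }

  // Called when a worker finishes a batch: frees the datasource and polls again.
  boolean onWorkerFinished(String dataSource)
  {
    runningDatasources.remove(dataSource);
    return poll();
  }
}
```

   In this model, a skipped batch stays in the queue and is only picked up by the poll that `onWorkerFinished` triggers, so no timer-driven retry is needed for it.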



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
