gianm commented on code in PR #18098:
URL: https://github.com/apache/druid/pull/18098#discussion_r2138522798


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -257,42 +284,44 @@ private void processBatchesDue()
   {
     clearQueueIfNotLeader();
 
-    // Process all the batches that are already due
-    int numProcessedBatches = 0;
-    AllocateRequestBatch nextBatch = processingQueue.peekFirst();
-    while (nextBatch != null && nextBatch.isDue()) {
-      // Process the next batch in the queue
-      processingQueue.pollFirst();
-      final AllocateRequestBatch currentBatch = nextBatch;
-      boolean processed;
-      try {
-        processed = processBatch(currentBatch);
-      }
-      catch (Throwable t) {
-        currentBatch.failPendingRequests(t);
-        processed = true;
-        log.error(t, "Error while processing batch[%s].", currentBatch.key);
-      }
-
-      // Requeue if not fully processed yet
-      if (processed) {
-        ++numProcessedBatches;
+    // Process the batches that are already due
+    int numSubmittedBatches = 0;
+    int numSkippedBatches = 0;
+
+    // Although thread-safe, this iterator might not see entries added by other
+    // concurrent threads. Those entries will be handled in the next processBatchesDue().
+    final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator();
+    while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) {
+      final AllocateRequestBatch nextBatch = queueIterator.next();
+      final String dataSource = nextBatch.key.dataSource;
+      if (nextBatch.isDue()) {
+        if (runningDatasources.contains(dataSource)) {
+          // Skip this batch as another batch for the same datasource is in progress

Review Comment:
   Hmm. I think this would still emit a ton of `task/action/batch/skipped` 
metrics if we have a long-running allocation in flight with another batch for 
the same datasource queued up behind it, since the queued batch would be 
skipped on every pass. Fixing that by extending the min wait time would be bad, 
because it would make us slower to respond to allocation requests. Delays are 
undesirable anyway; we want everything to be as reactive as possible.
   
   Is there an alternative approach you could go with? Maybe when we skip a 
batch, put it into a separate data structure keyed by datasource. Then, when 
the current batch for that datasource finishes, the worker thread running it 
could move the skipped batches back to the main queue.
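
   As a rough illustration of that idea, here is a minimal sketch. It is not 
code from this PR: the class `DatasourceGatedQueue` and its methods `add`, 
`pollRunnable` and `markFinished` are hypothetical names, the `isDue()` check 
is left out, and plain `synchronized` stands in for whatever concurrency 
scheme `SegmentAllocationQueue` actually needs. Parked batches are returned to 
the front of the main queue here, which is only one possible choice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: batches for a busy datasource are parked once,
 * instead of being revisited (and counted as skipped) on every pass.
 */
class DatasourceGatedQueue<B>
{
  /** Pairs a batch with its datasource while it sits in the main queue. */
  private static final class Entry<T>
  {
    final String dataSource;
    final T batch;

    Entry(String dataSource, T batch)
    {
      this.dataSource = dataSource;
      this.batch = batch;
    }
  }

  private final Deque<Entry<B>> mainQueue = new ArrayDeque<>();
  private final Map<String, Deque<B>> parkedByDatasource = new HashMap<>();
  private final Set<String> runningDatasources = new HashSet<>();

  synchronized void add(String dataSource, B batch)
  {
    mainQueue.addLast(new Entry<>(dataSource, batch));
  }

  /**
   * Returns the next batch whose datasource is idle and marks that
   * datasource as running. Batches for busy datasources are moved to the
   * parked map. Returns null if nothing can run right now.
   */
  synchronized B pollRunnable()
  {
    Entry<B> entry;
    while ((entry = mainQueue.pollFirst()) != null) {
      if (runningDatasources.add(entry.dataSource)) {
        return entry.batch;
      }
      parkedByDatasource
          .computeIfAbsent(entry.dataSource, k -> new ArrayDeque<>())
          .addLast(entry.batch);
    }
    return null;
  }

  /**
   * Called by the worker thread when its batch completes. Moves any parked
   * batches for that datasource back to the front of the main queue, in
   * their original order, and returns how many were moved.
   */
  synchronized int markFinished(String dataSource)
  {
    runningDatasources.remove(dataSource);
    final Deque<B> parked = parkedByDatasource.remove(dataSource);
    if (parked == null) {
      return 0;
    }
    final int moved = parked.size();
    B batch;
    while ((batch = parked.pollLast()) != null) {
      mainQueue.addFirst(new Entry<>(dataSource, batch));
    }
    return moved;
  }
}
```

   With this shape, a batch that has to wait is touched once when it is parked 
and once when it is moved back, so a skipped metric would be emitted at most 
once per batch and no extra wait time is needed.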


