kfaraz commented on code in PR #18098:
URL: https://github.com/apache/druid/pull/18098#discussion_r2138419299
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -257,42 +284,44 @@ private void processBatchesDue()
{
clearQueueIfNotLeader();
- // Process all the batches that are already due
- int numProcessedBatches = 0;
- AllocateRequestBatch nextBatch = processingQueue.peekFirst();
- while (nextBatch != null && nextBatch.isDue()) {
- // Process the next batch in the queue
- processingQueue.pollFirst();
- final AllocateRequestBatch currentBatch = nextBatch;
- boolean processed;
- try {
- processed = processBatch(currentBatch);
- }
- catch (Throwable t) {
- currentBatch.failPendingRequests(t);
- processed = true;
- log.error(t, "Error while processing batch[%s].", currentBatch.key);
- }
-
- // Requeue if not fully processed yet
- if (processed) {
- ++numProcessedBatches;
+ // Process the batches that are already due
+ int numSubmittedBatches = 0;
+ int numSkippedBatches = 0;
+
+ // Although thread-safe, this iterator might not see entries added by other
+ // concurrent threads. Those entries will be handled in the next processBatchesDue().
+ final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator();
+ while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) {
+ final AllocateRequestBatch nextBatch = queueIterator.next();
+ final String dataSource = nextBatch.key.dataSource;
+ if (nextBatch.isDue()) {
+ if (runningDatasources.contains(dataSource)) {
+ // Skip this batch as another batch for the same datasource is in progress
Review Comment:
Yes, thanks for catching this! This was a mental TODO, but I forgot to mark it.
I will check what we can do here.
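For readers following the hunk, the new loop walks `processingQueue` with a thread-safe but weakly consistent iterator. It stops once `runningDatasources` reaches `config.getBatchAllocationNumThreads()` and skips a due batch whose datasource already has a batch in progress. Below is a minimal, self-contained sketch of that dispatch pattern. It is not the PR's implementation. The class `DueBatchDispatcher`, the stand-in `Batch` type, and the methods `enqueue`, `pickDueBatches` and `markDone` are hypothetical. It also assumes the queue is a `LinkedBlockingDeque`, that batches which are not yet due are simply left in place, and that picked batches are removed through the iterator. The hunk is truncated before showing how the PR handles any of these.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch of the dispatch loop in the hunk above; not Druid code.
final class DueBatchDispatcher {

  // Hypothetical stand-in for AllocateRequestBatch, reduced to what the loop reads.
  static final class Batch {
    final String dataSource;
    final long dueAtMillis;

    Batch(String dataSource, long dueAtMillis) {
      this.dataSource = dataSource;
      this.dueAtMillis = dueAtMillis;
    }

    boolean isDue(long nowMillis) {
      return nowMillis >= dueAtMillis;
    }
  }

  // Assumption: a concurrent deque whose iterator is weakly consistent.
  private final BlockingDeque<Batch> processingQueue = new LinkedBlockingDeque<>();
  // Datasources that currently have a batch being processed.
  private final Set<String> runningDatasources = ConcurrentHashMap.newKeySet();
  private final int numThreads;

  DueBatchDispatcher(int numThreads) {
    this.numThreads = numThreads;
  }

  void enqueue(Batch batch) {
    processingQueue.offerLast(batch);
  }

  // Removes and returns due batches, at most one per datasource and at most
  // numThreads running datasources in total. Batches added by other threads
  // during this pass may be missed; a later call picks them up.
  List<Batch> pickDueBatches(long nowMillis) {
    final List<Batch> picked = new ArrayList<>();
    final Iterator<Batch> it = processingQueue.iterator();
    while (it.hasNext() && runningDatasources.size() < numThreads) {
      final Batch batch = it.next();
      if (!batch.isDue(nowMillis)) {
        continue; // not due yet: leave it in the queue
      }
      // add() returns false if this datasource already has a batch in progress
      if (!runningDatasources.add(batch.dataSource)) {
        continue; // skip: it stays queued for a later pass
      }
      it.remove(); // LinkedBlockingDeque's iterator supports remove()
      picked.add(batch);
    }
    return picked;
  }

  // Called when a batch finishes, so its datasource can be scheduled again.
  void markDone(Batch batch) {
    runningDatasources.remove(batch.dataSource);
  }
}
```

In this sketch, checking the return value of `Set.add` combines the "already running?" test with claiming the datasource, which avoids a separate `contains` call. A skipped batch stays in the queue, but a batch for a different datasource further down the queue can overtake it within the same pass.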