kfaraz commented on code in PR #18098:
URL: https://github.com/apache/druid/pull/18098#discussion_r2138498330
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -257,42 +284,44 @@ private void processBatchesDue()
{
clearQueueIfNotLeader();
- // Process all the batches that are already due
- int numProcessedBatches = 0;
- AllocateRequestBatch nextBatch = processingQueue.peekFirst();
- while (nextBatch != null && nextBatch.isDue()) {
- // Process the next batch in the queue
- processingQueue.pollFirst();
- final AllocateRequestBatch currentBatch = nextBatch;
- boolean processed;
- try {
- processed = processBatch(currentBatch);
- }
- catch (Throwable t) {
- currentBatch.failPendingRequests(t);
- processed = true;
- log.error(t, "Error while processing batch[%s].", currentBatch.key);
- }
-
- // Requeue if not fully processed yet
- if (processed) {
- ++numProcessedBatches;
+ // Process the batches that are already due
+ int numSubmittedBatches = 0;
+ int numSkippedBatches = 0;
+
+ // Although thread-safe, this iterator might not see entries added by other
+ // concurrent threads. Those entries will be handled in the next processBatchesDue().
+ final Iterator<AllocateRequestBatch> queueIterator = processingQueue.iterator();
+ while (queueIterator.hasNext() && runningDatasources.size() < config.getBatchAllocationNumThreads()) {
+ final AllocateRequestBatch nextBatch = queueIterator.next();
+ final String dataSource = nextBatch.key.dataSource;
+ if (nextBatch.isDue()) {
+ if (runningDatasources.contains(dataSource)) {
+ // Skip this batch as another batch for the same datasource is in progress
Review Comment:
Updated. Added a small delay of 5 milliseconds in case any batch was skipped
or if all threads are busy.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java:
##########
@@ -80,13 +82,25 @@ public class SegmentAllocationQueue
private final ServiceEmitter emitter;
/**
- * Single-threaded executor to process allocation queue.
+ * Single-threaded executor to pick up jobs from allocation queue and assign
+ * to worker threads.
*/
- private final ScheduledExecutorService executor;
+ private final ScheduledExecutorService managerExec;
+
+ /**
+ * Multithreaded executor to process allocation jobs.
+ */
+ private final ScheduledExecutorService workerExec;
+
+ /**
+ * Thread-safe list of datasources for which a segment allocation is currently in-progress.
+ */
+ private final List<String> runningDatasources = Collections.synchronizedList(new ArrayList<>());
Review Comment:
Updated.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java:
##########
@@ -39,6 +39,9 @@ public class TaskLockConfig
@JsonProperty
private boolean batchAllocationReduceMetadataIO = true;
+ @JsonProperty
+ private int batchAllocationNumThreads = 5;
Review Comment:
Added.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]