kfaraz commented on code in PR #13369: URL: https://github.com/apache/druid/pull/13369#discussion_r1035524934
########## indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java: ########## @@ -0,0 +1,694 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.google.inject.Inject; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import 
org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Queue for {@link SegmentAllocateRequest}s. 
+ */ +@ManageLifecycle +public class SegmentAllocationQueue +{ + private static final Logger log = new Logger(SegmentAllocationQueue.class); + + private static final int MAX_QUEUE_SIZE = 2000; + + private final long maxWaitTimeMillis; + private final boolean enabled; + + private final TaskLockbox taskLockbox; + private final ScheduledExecutorService executor; + private final IndexerMetadataStorageCoordinator metadataStorage; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final ServiceEmitter emitter; + + private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>(); + private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + + @Inject + public SegmentAllocationQueue( + TaskLockbox taskLockbox, + TaskLockConfig taskLockConfig, + IndexerMetadataStorageCoordinator metadataStorage, + ServiceEmitter emitter, + ScheduledExecutorFactory executorFactory + ) + { + this.emitter = emitter; + this.taskLockbox = taskLockbox; + this.metadataStorage = metadataStorage; + this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime(); + this.enabled = taskLockConfig.isBatchSegmentAllocation(); + + this.executor = executorFactory.create(1, "SegmentAllocQueue-%s"); + } + + @LifecycleStart + public void start() + { + log.info("Initializing segment allocation queue."); + } + + @LifecycleStop + public void stop() + { + log.info("Tearing down segment allocation queue."); + executor.shutdownNow(); + } + + public boolean isEnabled() + { + return enabled; + } + + private void scheduleQueuePoll(long delay) + { + executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS); + } + + /** + * Gets the number of batches currently in the queue. + */ + public int size() + { + return processingQueue.size(); + } + + /** + * Queues a SegmentAllocateRequest. 
The returned future may complete successfully + * with a non-null value or with a non-null value. + */ + public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request) + { + if (!isLeader.get()) { + throw new ISE("Cannot allocate segment if not leader."); + } else if (!isEnabled()) { + throw new ISE("Batched segment allocation is disabled."); + } + + final AllocateRequestKey requestKey = new AllocateRequestKey(request, false); + final AtomicReference<Future<SegmentIdWithShardSpec>> requestFuture = new AtomicReference<>(); + + // Possible race condition: + // t1 -> new batch is added to queue or batch already exists in queue + // t2 -> executor pops batch, processes all requests in it + // t1 -> new request is added to dangling batch and is never picked up + // Solution: For existing batch, call keyToBatch.remove() on the key to + // wait on keyToBatch.compute() to finish before proceeding with processBatch(). + // For new batch, keyToBatch.remove() would not wait as key is not in map yet + // but a new batch is unlikely to be due immediately, so it won't get popped right away. + keyToBatch.compute(requestKey, (key, existingBatch) -> { + if (existingBatch == null) { + AllocateRequestBatch newBatch = new AllocateRequestBatch(key); + requestFuture.set(newBatch.add(request)); + return addBatchToQueue(newBatch) ? newBatch : null; + } else { + requestFuture.set(existingBatch.add(request)); + return existingBatch; + } + }); + + return requestFuture.get(); + } + + /** + * Tries to add the given batch to the processing queue. If the queue is full, + * marks the batch as completed failing all the pending requests. + */ + private boolean addBatchToQueue(AllocateRequestBatch batch) + { + batch.resetQueueTime(); + if (processingQueue.offer(batch)) { + log.debug("Added a new batch [%s] to queue.", batch.key); + return true; + } else { + log.warn("Cannot add batch [%s] as queue is full. 
Failing [%d] requests.", batch.key, batch.size()); Review Comment: It can also happen when things keep getting retried due to frequent updates to the set of used segments. It would be up to the client to retry in such cases. The suggested action is to look at the emitted metrics `batch/runTime` and `batch/attempts`: a high value of the first indicates metadata store slowness, while a high value of the second indicates frequent retries. The user can then determine whether the metadata store needs resizing. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
