abhishekrb19 commented on code in PR #18028:
URL: https://github.com/apache/druid/pull/18028#discussion_r2114262487
##########
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java:
##########
@@ -39,14 +40,24 @@ public class SegmentsMetadataManagerConfig
@JsonProperty
private final SegmentMetadataCache.UsageMode useIncrementalCache;
+ @JsonProperty
+ private final UnusedSegmentKillerConfig killUnused;
+
@JsonCreator
public SegmentsMetadataManagerConfig(
@JsonProperty("pollDuration") Period pollDuration,
- @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache
+ @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache,
+ @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused
)
{
this.pollDuration = Configs.valueOrDefault(pollDuration,
Period.minutes(1));
this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.NEVER);
+ this.killUnused = Configs.valueOrDefault(killUnused, new
UnusedSegmentKillerConfig(null, null));
+ if (this.killUnused.isEnabled() && this.useIncrementalCache ==
SegmentMetadataCache.UsageMode.NEVER) {
+ throw InvalidInput.exception(
Review Comment:
1. Please include the runtime properties in the error message to make
corrective action easier; something like `Please set
"druid.manager.segments.useIncrementalCache=true" when ...`.
2. Should the target persona in this case be an operator rather than an end
user?
##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java:
##########
@@ -286,6 +287,37 @@ public List<DataSegmentPlus>
iterateAllUnusedSegmentsForDatasource(
}
}
+ /**
+ * Retrieves unused segments that are fully contained within the given
interval.
+ *
+ * @param interval Returned segments must be fully contained within
this
+ * interval
+ * @param versions Optional list of segment versions. If passed as
null,
+ * all segment versions are eligible.
+ * @param limit Maximum number of segments to return. If passed as
null,
+ * all segments are returned.
+ * @param maxUpdatedTime Returned segments must have a {@code
used_status_last_updated}
+ * which is either null or earlier than this value.
+ */
+ public List<DataSegment> findUnusedSegments(
+ String dataSource,
+ Interval interval,
+ @Nullable List<String> versions,
+ @Nullable Integer limit,
+ @Nullable DateTime maxUpdatedTime
+ )
+ {
+ try (
+ final CloseableIterator<DataSegment> iterator =
+ retrieveUnusedSegments(dataSource, List.of(interval), versions,
limit, null, null, maxUpdatedTime)
+ ) {
+ return ImmutableList.copyOf(iterator);
+ }
+ catch (IOException e) {
+ throw DruidException.defensive(e, "Error while reading unused segments");
+ }
Review Comment:
Hmm, I don't think an `IOException` should be a developer-facing defensive
exception. I think throwing a `RuntimeException` or an equivalent
DruidException would work better.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
Review Comment:
Given that this is a new feature, I think an info log here would be useful
when this duty is enabled on the Overlord.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
Review Comment:
```suggestion
"Cancelling kill task[%s] as it has been running for [%,d]
millis.",
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
+ * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+ */
+ private boolean shouldResetKillQueue()
+ {
+ final DateTime now = DateTimes.nowUtc().plus(1);
+
+ return killQueue.isEmpty()
+ || lastResetTime.get() == null
+ || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+ }
+
+ /**
+ * Resets the kill queue with fresh jobs.
+ * This method need not handle race conditions as it is always run on
+ * {@link #exec} which is single-threaded.
+ */
+ private void resetKillQueue()
+ {
+ final Stopwatch resetDuration = Stopwatch.createStarted();
+ try {
+ killQueue.clear();
+
+ final Set<String> dataSources =
storageCoordinator.retrieveAllDatasourceNames();
+
+ final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+ for (String dataSource : dataSources) {
+ storageCoordinator.retrieveUnusedSegmentIntervals(dataSource,
MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+ interval -> {
+ dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+ killQueue.offer(new KillCandidate(dataSource, interval));
+ }
+ );
+ }
+
+ lastResetTime.set(DateTimes.nowUtc());
+ log.info(
+ "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+ killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+ );
+ dataSourceToIntervalCounts.forEach(
+ (dataSource, intervalCount) -> emitMetric(
+ Metric.UNUSED_SEGMENT_INTERVALS,
+ intervalCount,
+ Map.of(DruidMetrics.DATASOURCE, dataSource)
+ )
+ );
+ emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while resetting kill queue.");
+ }
+ }
+
+ /**
+ * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+ * {@link KillCandidate} in the {@link #killQueue}. This method returns
+ * immediately.
+ */
+ public void startNextJobInKillQueue()
Review Comment:
Looks like this method can be `private` scoped.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionTestKit;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
+import org.apache.druid.indexing.test.TestDataSegmentKiller;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector;
+import
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnusedSegmentsKillerTest
+{
+ @Rule
+ public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
+
+ private static final List<DataSegment> WIKI_SEGMENTS_1X10D =
Review Comment:
The `1X10D` in the variable name got me thinking this was a Roman numeral of
sorts :) I see we use this convention in a few other places too.
##########
server/src/main/java/org/apache/druid/metadata/UnusedSegmentKillerConfig.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+
+/**
+ * Config for {@code UnusedSegmentKiller}.
+ */
+public class UnusedSegmentKillerConfig
+{
+ @JsonProperty("enabled")
+ private final boolean enabled;
+
+ @JsonProperty("bufferPeriod")
+ private final Period bufferPeriod;
+
+ @JsonCreator
+ public UnusedSegmentKillerConfig(
+ @JsonProperty("enabled") @Nullable Boolean enabled,
+ @JsonProperty("bufferPeriod") @Nullable Period bufferPeriod
+ )
+ {
+ this.enabled = Configs.valueOrDefault(enabled, false);
+ this.bufferPeriod = Configs.valueOrDefault(bufferPeriod, Period.days(90));
+ }
+
+ /**
+ * Period for which segments are retained even after being marked as unused.
+ * If this returns null, segments are never killed by the {@code
UnusedSegmentKiller}
+ * but they might still be killed by the Coordinator.
Review Comment:
Can this actually be null, given that the default in the constructor is set to 90 days?
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -65,22 +72,26 @@ public Void perform(Task task, TaskActionToolbox toolbox)
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
try {
+ final Set<Interval> intervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
toolbox.getTaskLockbox().doInCriticalSection(
task,
-
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
- CriticalAction.builder()
- .onValidLocks(
- () -> {
-
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
- return null;
- }
- )
- .onInvalidLocks(
- () -> {
- throw new ISE("Some locks for task[%s] are
already revoked", task.getId());
- }
- )
- .build()
+ intervals,
+ CriticalAction.builder().onValidLocks(
+ () -> {
+ int numDeletedSegments =
+
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
+ log.info(
+ "Deleted [%d] segments out of requested[%d] from"
+ + " metadata store for task[%s], datasource[%s],
intervals[%s].",
+ numDeletedSegments, segments.size(), task.getId(),
task.getDataSource(), intervals
Review Comment:
Also, in the interest of keeping critical sections as small as possible, I
think it would be better to move this log line outside the block alongside
where the metrics are emitted below.
##########
server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java:
##########
@@ -39,14 +40,24 @@ public class SegmentsMetadataManagerConfig
@JsonProperty
private final SegmentMetadataCache.UsageMode useIncrementalCache;
+ @JsonProperty
+ private final UnusedSegmentKillerConfig killUnused;
+
@JsonCreator
public SegmentsMetadataManagerConfig(
@JsonProperty("pollDuration") Period pollDuration,
- @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache
+ @JsonProperty("useIncrementalCache") SegmentMetadataCache.UsageMode
useIncrementalCache,
+ @JsonProperty("killUnused") UnusedSegmentKillerConfig killUnused
)
{
this.pollDuration = Configs.valueOrDefault(pollDuration,
Period.minutes(1));
this.useIncrementalCache = Configs.valueOrDefault(useIncrementalCache,
SegmentMetadataCache.UsageMode.NEVER);
+ this.killUnused = Configs.valueOrDefault(killUnused, new
UnusedSegmentKillerConfig(null, null));
+ if (this.killUnused.isEnabled() && this.useIncrementalCache ==
SegmentMetadataCache.UsageMode.NEVER) {
Review Comment:
I was wondering about how this overlord embedded kill tasks feature would
interact with the coordinator kill duty. @kfaraz I see your comment on this:
> When embedded kill tasks are running on the Overlord, it is recommended to
NOT launch kill tasks
manually or from the coordinator duty. The current implementation does not
do any validation around
this. But we can perhaps give a warning message when submitting a normal
kill task.
I think having a validation makes sense and we can:
- Fail fast if both the kill features on the Overlord and Coordinator are
enabled
- Log a warning in the kill task as you mention
For the validation, is it possible to bind the coordinator kill duty config
so that the Overlord knows about it (or vice-versa)?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
Review Comment:
Why reset the queue if it's already empty?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
Review Comment:
Should we return early from `run()` — similar to the `!isEnabled()` check — if it's
determined that this Overlord is not the leader, perhaps right after
`updateStateIfNewLeader()`?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
+ * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+ */
+ private boolean shouldResetKillQueue()
+ {
+ final DateTime now = DateTimes.nowUtc().plus(1);
+
+ return killQueue.isEmpty()
+ || lastResetTime.get() == null
+ || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+ }
+
+ /**
+ * Resets the kill queue with fresh jobs.
+ * This method need not handle race conditions as it is always run on
+ * {@link #exec} which is single-threaded.
+ */
+ private void resetKillQueue()
+ {
+ final Stopwatch resetDuration = Stopwatch.createStarted();
+ try {
+ killQueue.clear();
+
+ final Set<String> dataSources =
storageCoordinator.retrieveAllDatasourceNames();
+
+ final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+ for (String dataSource : dataSources) {
+ storageCoordinator.retrieveUnusedSegmentIntervals(dataSource,
MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+ interval -> {
+ dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+ killQueue.offer(new KillCandidate(dataSource, interval));
+ }
+ );
+ }
+
+ lastResetTime.set(DateTimes.nowUtc());
+ log.info(
+ "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+ killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+ );
+ dataSourceToIntervalCounts.forEach(
+ (dataSource, intervalCount) -> emitMetric(
+ Metric.UNUSED_SEGMENT_INTERVALS,
+ intervalCount,
+ Map.of(DruidMetrics.DATASOURCE, dataSource)
+ )
+ );
+ emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while resetting kill queue.");
+ }
+ }
+
+ /**
+ * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+ * {@link KillCandidate} in the {@link #killQueue}. This method returns
+ * immediately.
+ */
+ public void startNextJobInKillQueue()
+ {
+ if (!isEnabled() || !leaderSelector.isLeader()) {
+ return;
+ }
+
+ if (killQueue.isEmpty()) {
+ // If the last entry has been processed, emit the total processing time
and exit
+ final DateTime lastQueueResetTime = lastResetTime.get();
+ if (lastQueueResetTime != null) {
+ long processTimeMillis = DateTimes.nowUtc().getMillis() -
lastQueueResetTime.getMillis();
+ emitMetric(Metric.QUEUE_PROCESS_TIME, processTimeMillis, null);
+ }
+ return;
+ }
+
+ try {
+ final KillCandidate candidate = killQueue.poll();
+ if (candidate == null) {
+ return;
+ }
+
+ final String taskId = IdUtils.newTaskId(
+ TASK_ID_PREFIX,
+ KillUnusedSegmentsTask.TYPE,
+ candidate.dataSource,
+ candidate.interval
+ );
+
+ final Future<?> taskFuture = exec.submit(() -> {
+ runKillTask(candidate, taskId);
+ startNextJobInKillQueue();
+ });
+ currentTaskInfo.set(new TaskInfo(taskId, taskFuture));
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while processing kill queue.");
+ currentTaskInfo.set(null);
+ }
+ }
+
+ /**
+ * Launches an embedded kill task for the given candidate.
+ */
+ private void runKillTask(KillCandidate candidate, String taskId)
+ {
+ final Stopwatch taskRunTime = Stopwatch.createStarted();
+ final EmbeddedKillTask killTask = new EmbeddedKillTask(
+ taskId,
+ candidate,
+ DateTimes.nowUtc().minus(killConfig.getBufferPeriod())
+ );
+
+ final TaskActionClient taskActionClient =
taskActionClientFactory.create(killTask);
+ final TaskToolbox taskToolbox = KillTaskToolbox.create(taskActionClient,
dataSegmentKiller, emitter);
+
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, killTask);
+
+ try {
+ taskLockbox.add(killTask);
+ final boolean isReady = killTask.isReady(taskActionClient);
+ if (!isReady) {
+ emitter.emit(metricBuilder.setMetric(Metric.SKIPPED_INTERVALS, 1L));
+ return;
+ }
+
+ final TaskStatus status = killTask.runTask(taskToolbox);
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ catch (Throwable t) {
+ log.error(t, "Embedded kill task[%s] failed.", killTask.getId());
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder,
TaskStatus.failure(taskId, "Unknown error"));
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ finally {
+ cleanupLocksSilently(killTask);
+ emitMetric(Metric.PROCESSED_KILL_JOBS, 1L,
Map.of(DruidMetrics.DATASOURCE, candidate.dataSource));
+ }
+ }
+
+ private void cleanupLocksSilently(EmbeddedKillTask killTask)
+ {
+ try {
+ taskLockbox.remove(killTask);
+ }
+ catch (Throwable t) {
+ log.error(t, "Error while cleaning up locks for kill task[%s].",
killTask.getId());
+ }
+ }
+
+ private void emitMetric(String metricName, long value, Map<String, String>
dimensions)
+ {
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
+ if (dimensions != null) {
+ dimensions.forEach(builder::setDimension);
+ }
+ emitter.emit(builder.setMetric(metricName, value));
+ }
+
+ /**
+ * Represents a single candidate interval that contains unused segments.
+ */
+ private static class KillCandidate
+ {
+ private final String dataSource;
+ private final Interval interval;
+
+ private KillCandidate(String dataSource, Interval interval)
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ }
+ }
+
+ /**
+ * Info of the currently running task.
+ */
+ private static class TaskInfo
+ {
+ private final Stopwatch sinceTaskStarted = Stopwatch.createStarted();
Review Comment:
nit: consider moving the `Stopwatch.createStarted()` initialization into the
constructor instead of the field initializer
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java:
##########
@@ -65,22 +72,26 @@ public Void perform(Task task, TaskActionToolbox toolbox)
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(),
segments);
try {
+ final Set<Interval> intervals =
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
toolbox.getTaskLockbox().doInCriticalSection(
task,
-
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
- CriticalAction.builder()
- .onValidLocks(
- () -> {
-
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
- return null;
- }
- )
- .onInvalidLocks(
- () -> {
- throw new ISE("Some locks for task[%s] are
already revoked", task.getId());
- }
- )
- .build()
+ intervals,
+ CriticalAction.builder().onValidLocks(
+ () -> {
+ int numDeletedSegments =
+
toolbox.getIndexerMetadataStorageCoordinator().deleteSegments(segments);
+ log.info(
+ "Deleted [%d] segments out of requested[%d] from"
+ + " metadata store for task[%s], datasource[%s],
intervals[%s].",
+ numDeletedSegments, segments.size(), task.getId(),
task.getDataSource(), intervals
Review Comment:
Can `intervals` be too large and pollute the logs?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
+ * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+ */
+ private boolean shouldResetKillQueue()
+ {
+ final DateTime now = DateTimes.nowUtc().plus(1);
+
+ return killQueue.isEmpty()
+ || lastResetTime.get() == null
+ || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+ }
+
+ /**
+ * Resets the kill queue with fresh jobs.
+ * This method need not handle race conditions as it is always run on
+ * {@link #exec} which is single-threaded.
+ */
+ private void resetKillQueue()
Review Comment:
Ah, I see...the name `resetKillQueue` sort of suggests that the method only
clears the queue, but it also determines kill candidates and fills the queue
with new jobs. The javadoc clarifies this behavior briefly, but I think
renaming the method would help avoid confusion, something like
`clearAndUpdateKillQueue()` or `rebuildKillQueue()`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskMetrics.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+/**
+ * Task-related metrics emitted by the Druid cluster.
+ */
+public class TaskMetrics
+{
+ private TaskMetrics()
+ {
+ // no instantiation
+ }
+
+ public static final String RUN_DURATION = "task/run/time";
+
+ public static final String NUKED_SEGMENTS =
"segment/killed/metadataStore/count";
Review Comment:
To avoid ambiguity and for consistency, consider renaming this to
`SEGMENTS_DELETED_FROM_METADATA_STORE` or similar
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -150,6 +150,17 @@ List<DataSegment> retrieveUnusedSegmentsForInterval(
@Nullable DateTime maxUsedStatusLastUpdatedTime
);
+ /**
+ * Retrieves unused segments from the metadata store that match the given
+ * interval exactly.
Review Comment:
please also include some context on `maxUpdatedTime` and `limit` in the
javadoc
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -402,10 +434,25 @@ private List<DataSegment> getKillableSegments(
return parentIdToUnusedSegments.values()
.stream()
.flatMap(Set::stream)
- .filter(segment ->
!usedSegmentLoadSpecs.contains(segment.getLoadSpec()))
+ .filter(segment ->
!isSegmentLoadSpecPresentIn(segment, usedSegmentLoadSpecs))
.collect(Collectors.toList());
}
+ /**
+ * @return true if the load spec of the segment is present in the given set
of
+ * used load specs.
+ */
+ private boolean isSegmentLoadSpecPresentIn(
+ DataSegment segment,
+ Set<Map<String, Object>> usedSegmentLoadSpecs
+ )
+ {
+ boolean isPresent = usedSegmentLoadSpecs.contains(segment.getLoadSpec());
+ if (isPresent) {
+ LOG.warn("Skipping kill of segment[%s] as its load spec is also used by
other segments.", segment);
Review Comment:
Same comment re warn vs info
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java:
##########
@@ -385,6 +413,10 @@ private List<DataSegment> getKillableSegments(
response.getUpgradedToSegmentIds().forEach((parent, children) -> {
if (!CollectionUtils.isNullOrEmpty(children)) {
// Do not kill segment if its parent or any of its siblings still
exist in metadata store
+ LOG.warn(
+ "Skipping kill of segments[%s] as its load spec is also used
by segment IDs[%s].",
+ parentIdToUnusedSegments.get(parent), children
+ );
parentIdToUnusedSegments.remove(parent);
Review Comment:
If this is expected during normal operations when concurrent append/replace
is enabled, should this be logged at the info level instead of warn? We could
also consider emitting a "skipped" metric with the reason as a dimension, if
you think this is useful to track.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
+ * @see SegmentsMetadataManagerConfig to enable the cleanup
+ */
+public class UnusedSegmentsKiller implements OverlordDuty
+{
+ private static final EmittingLogger log = new
EmittingLogger(UnusedSegmentsKiller.class);
+
+ private static final String TASK_ID_PREFIX = "overlord-issued";
+
+ private static final int INITIAL_KILL_QUEUE_SIZE = 1000;
+ private static final int MAX_INTERVALS_TO_KILL_IN_DATASOURCE = 10_000;
+ private static final int MAX_SEGMENTS_TO_KILL_IN_INTERVAL = 1000;
+
+ /**
+ * Period after which the queue is reset even if there are existing jobs in
queue.
+ */
+ private static final Duration QUEUE_RESET_PERIOD = Duration.standardDays(1);
+
+ /**
+ * Duration for which a kill task is allowed to run.
+ */
+ private static final Duration MAX_TASK_DURATION =
Duration.standardMinutes(10);
+
+ private final ServiceEmitter emitter;
+ private final TaskLockbox taskLockbox;
+ private final DruidLeaderSelector leaderSelector;
+ private final DataSegmentKiller dataSegmentKiller;
+
+ private final UnusedSegmentKillerConfig killConfig;
+ private final TaskActionClientFactory taskActionClientFactory;
+ private final IndexerMetadataStorageCoordinator storageCoordinator;
+
+ /**
+ * Single-threaded executor to process kill jobs.
+ */
+ private final ScheduledExecutorService exec;
+ private int previousLeaderTerm;
+ private final AtomicReference<DateTime> lastResetTime = new
AtomicReference<>(null);
+
+ private final AtomicReference<TaskInfo> currentTaskInfo = new
AtomicReference<>(null);
+
+ /**
+ * Queue of kill candidates. Use a PriorityBlockingQueue to ensure
thread-safety
+ * since this queue is accessed by both {@link #run()} and {@link
#startNextJobInKillQueue}.
+ */
+ private final PriorityBlockingQueue<KillCandidate> killQueue;
+
+ @Inject
+ public UnusedSegmentsKiller(
+ SegmentsMetadataManagerConfig config,
+ TaskActionClientFactory taskActionClientFactory,
+ IndexerMetadataStorageCoordinator storageCoordinator,
+ @IndexingService DruidLeaderSelector leaderSelector,
+ ScheduledExecutorFactory executorFactory,
+ DataSegmentKiller dataSegmentKiller,
+ TaskLockbox taskLockbox,
+ ServiceEmitter emitter
+ )
+ {
+ this.emitter = emitter;
+ this.taskLockbox = taskLockbox;
+ this.leaderSelector = leaderSelector;
+ this.dataSegmentKiller = dataSegmentKiller;
+ this.storageCoordinator = storageCoordinator;
+ this.taskActionClientFactory = taskActionClientFactory;
+
+ this.killConfig = config.getKillUnused();
+
+ if (isEnabled()) {
+ this.exec = executorFactory.create(1, "UnusedSegmentsKiller-%s");
+ this.killQueue = new PriorityBlockingQueue<>(
+ INITIAL_KILL_QUEUE_SIZE,
+ Ordering.from(Comparators.intervalsByEndThenStart())
+ .onResultOf(candidate -> candidate.interval)
+ );
+ } else {
+ this.exec = null;
+ this.killQueue = null;
+ }
+ }
+
+ @Override
+ public boolean isEnabled()
+ {
+ return killConfig.isEnabled();
+ }
+
+ /**
+ * Ensures that things are moving along and the kill queue is not stuck.
+ * Updates the state if leadership changes or if the queue needs to be reset.
+ */
+ @Override
+ public void run()
+ {
+ if (!isEnabled()) {
+ return;
+ }
+
+ updateStateIfNewLeader();
+ if (shouldResetKillQueue()) {
+ // Clear the killQueue to stop further processing of already queued jobs
+ killQueue.clear();
+ exec.submit(() -> {
+ resetKillQueue();
+ startNextJobInKillQueue();
+ });
+ }
+
+ // Cancel the current task if it has been running for too long
+ final TaskInfo taskInfo = currentTaskInfo.get();
+ if (taskInfo != null && !taskInfo.future.isDone()
+ && taskInfo.sinceTaskStarted.hasElapsed(MAX_TASK_DURATION)) {
+ log.warn(
+ "Cancelling kill task[%s] as it has been running for [%d] millis.",
+ taskInfo.taskId, taskInfo.sinceTaskStarted.millisElapsed()
+ );
+ taskInfo.future.cancel(true);
+ }
+ }
+
+ @Override
+ public DutySchedule getSchedule()
+ {
+ if (isEnabled()) {
+ // Check every hour that the kill queue is being processed normally
+ return new DutySchedule(Duration.standardHours(1).getMillis(),
Duration.standardMinutes(1).getMillis());
+ } else {
+ return new DutySchedule(0, 0);
+ }
+ }
+
+ private void updateStateIfNewLeader()
+ {
+ final int currentLeaderTerm = leaderSelector.localTerm();
+ if (currentLeaderTerm != previousLeaderTerm) {
+ previousLeaderTerm = currentLeaderTerm;
+ killQueue.clear();
+ lastResetTime.set(null);
+ }
+ }
+
+ /**
+ * Returns true if the kill queue is empty or if the queue has not been reset
+ * yet or if {@code (lastResetTime + resetPeriod) < (now + 1)}.
+ */
+ private boolean shouldResetKillQueue()
+ {
+ final DateTime now = DateTimes.nowUtc().plus(1);
+
+ return killQueue.isEmpty()
+ || lastResetTime.get() == null
+ || lastResetTime.get().plus(QUEUE_RESET_PERIOD).isBefore(now);
+ }
+
+ /**
+ * Resets the kill queue with fresh jobs.
+ * This method need not handle race conditions as it is always run on
+ * {@link #exec} which is single-threaded.
+ */
+ private void resetKillQueue()
+ {
+ final Stopwatch resetDuration = Stopwatch.createStarted();
+ try {
+ killQueue.clear();
+
+ final Set<String> dataSources =
storageCoordinator.retrieveAllDatasourceNames();
+
+ final Map<String, Integer> dataSourceToIntervalCounts = new HashMap<>();
+ for (String dataSource : dataSources) {
+ storageCoordinator.retrieveUnusedSegmentIntervals(dataSource,
MAX_INTERVALS_TO_KILL_IN_DATASOURCE).forEach(
+ interval -> {
+ dataSourceToIntervalCounts.merge(dataSource, 1, Integer::sum);
+ killQueue.offer(new KillCandidate(dataSource, interval));
+ }
+ );
+ }
+
+ lastResetTime.set(DateTimes.nowUtc());
+ log.info(
+ "Queued [%d] kill jobs for [%d] datasources in [%d] millis.",
+ killQueue.size(), dataSources.size(), resetDuration.millisElapsed()
+ );
+ dataSourceToIntervalCounts.forEach(
+ (dataSource, intervalCount) -> emitMetric(
+ Metric.UNUSED_SEGMENT_INTERVALS,
+ intervalCount,
+ Map.of(DruidMetrics.DATASOURCE, dataSource)
+ )
+ );
+ emitMetric(Metric.QUEUE_RESET_TIME, resetDuration.millisElapsed(), null);
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while resetting kill queue.");
+ }
+ }
+
+ /**
+ * Launches an {@link EmbeddedKillTask} on the {@link #exec} for the next
+ * {@link KillCandidate} in the {@link #killQueue}. This method returns
+ * immediately.
+ */
+ public void startNextJobInKillQueue()
+ {
+ if (!isEnabled() || !leaderSelector.isLeader()) {
+ return;
+ }
+
+ if (killQueue.isEmpty()) {
+ // If the last entry has been processed, emit the total processing time
and exit
+ final DateTime lastQueueResetTime = lastResetTime.get();
+ if (lastQueueResetTime != null) {
+ long processTimeMillis = DateTimes.nowUtc().getMillis() -
lastQueueResetTime.getMillis();
+ emitMetric(Metric.QUEUE_PROCESS_TIME, processTimeMillis, null);
+ }
+ return;
+ }
+
+ try {
+ final KillCandidate candidate = killQueue.poll();
+ if (candidate == null) {
+ return;
+ }
+
+ final String taskId = IdUtils.newTaskId(
+ TASK_ID_PREFIX,
+ KillUnusedSegmentsTask.TYPE,
+ candidate.dataSource,
+ candidate.interval
+ );
+
+ final Future<?> taskFuture = exec.submit(() -> {
+ runKillTask(candidate, taskId);
+ startNextJobInKillQueue();
+ });
+ currentTaskInfo.set(new TaskInfo(taskId, taskFuture));
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while processing kill queue.");
+ currentTaskInfo.set(null);
+ }
+ }
+
+ /**
+ * Launches an embedded kill task for the given candidate.
+ */
+ private void runKillTask(KillCandidate candidate, String taskId)
+ {
+ final Stopwatch taskRunTime = Stopwatch.createStarted();
+ final EmbeddedKillTask killTask = new EmbeddedKillTask(
+ taskId,
+ candidate,
+ DateTimes.nowUtc().minus(killConfig.getBufferPeriod())
+ );
+
+ final TaskActionClient taskActionClient =
taskActionClientFactory.create(killTask);
+ final TaskToolbox taskToolbox = KillTaskToolbox.create(taskActionClient,
dataSegmentKiller, emitter);
+
+ final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, killTask);
+
+ try {
+ taskLockbox.add(killTask);
+ final boolean isReady = killTask.isReady(taskActionClient);
+ if (!isReady) {
+ emitter.emit(metricBuilder.setMetric(Metric.SKIPPED_INTERVALS, 1L));
+ return;
+ }
+
+ final TaskStatus status = killTask.runTask(taskToolbox);
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ catch (Throwable t) {
+ log.error(t, "Embedded kill task[%s] failed.", killTask.getId());
+
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder,
TaskStatus.failure(taskId, "Unknown error"));
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION,
taskRunTime.millisElapsed()));
+ }
+ finally {
+ cleanupLocksSilently(killTask);
+ emitMetric(Metric.PROCESSED_KILL_JOBS, 1L,
Map.of(DruidMetrics.DATASOURCE, candidate.dataSource));
+ }
+ }
+
+ private void cleanupLocksSilently(EmbeddedKillTask killTask)
+ {
+ try {
+ taskLockbox.remove(killTask);
+ }
+ catch (Throwable t) {
+ log.error(t, "Error while cleaning up locks for kill task[%s].",
killTask.getId());
+ }
+ }
+
+ private void emitMetric(String metricName, long value, Map<String, String>
dimensions)
+ {
+ final ServiceMetricEvent.Builder builder = new
ServiceMetricEvent.Builder();
+ if (dimensions != null) {
+ dimensions.forEach(builder::setDimension);
+ }
+ emitter.emit(builder.setMetric(metricName, value));
+ }
+
+ /**
+ * Represents a single candidate interval that contains unused segments.
+ */
+ private static class KillCandidate
+ {
+ private final String dataSource;
+ private final Interval interval;
+
+ private KillCandidate(String dataSource, Interval interval)
+ {
+ this.dataSource = dataSource;
+ this.interval = interval;
+ }
+ }
+
+ /**
+ * Info of the currently running task.
+ */
+ private static class TaskInfo
+ {
+ private final Stopwatch sinceTaskStarted = Stopwatch.createStarted();
+ private final String taskId;
+ private final Future<?> future;
+
+ private TaskInfo(String taskId, Future<?> future)
+ {
+ this.future = future;
+ this.taskId = taskId;
+ }
+ }
+
+ /**
+ * Embedded kill task. Unlike other task types, this task is not persisted
and
+ * does not run on a worker or indexer. Hence, it doesn't take up any task
slots.
+ * To ensure that locks are held very briefly over short segment intervals,
+ * this kill task processes:
+ * <ul>
+ * <li>only 1 unused segment interval</li>
+ * <li>only 1 batch of upto 1000 unused segments</li>
+ * </ul>
+ */
+ private class EmbeddedKillTask extends KillUnusedSegmentsTask
+ {
+ private EmbeddedKillTask(
+ String taskId,
+ KillCandidate candidate,
+ DateTime maxUpdatedTimeOfEligibleSegment
+ )
+ {
+ super(
+ taskId,
+ candidate.dataSource,
+ candidate.interval,
+ null,
+ Map.of(Tasks.PRIORITY_KEY, 25),
Review Comment:
The value 25 can be defined as a static constant in
`org.apache.druid.indexing.common.task.Tasks` where the default priorities for
a few other task types are defined
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.duty;
+
+import com.google.common.collect.Ordering;
+import com.google.inject.Inject;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.discovery.DruidLeaderSelector;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.IndexTaskUtils;
+import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
+import org.apache.druid.indexing.common.task.TaskMetrics;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
+import org.apache.druid.metadata.UnusedSegmentKillerConfig;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * {@link OverlordDuty} to delete unused segments from metadata store and the
+ * deep storage. Launches {@link EmbeddedKillTask}s to clean unused segments
+ * of a single datasource-interval.
+ *
Review Comment:
I think it would be good to cross-link the coordinator-based
`KillUnusedSegments` duty in this class and vice-versa.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]