kfaraz commented on code in PR #15941:
URL: https://github.com/apache/druid/pull/15941#discussion_r1501735595


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +156,128 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams 
params)
+  private DruidCoordinatorRuntimeParams runInternal(final 
DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
-    Collection<String> dataSourcesToKill =
-        
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
-        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
-        killTaskSlotRatio,
-        maxKillTaskSlots
-    );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
-        killTaskCapacity,
-        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
-    );
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
+    final int availableKillTaskSlots = 
getAvailableKillTaskSlots(dynamicConfig, stats);
+    Collection<String> dataSourcesToKill = 
dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
 
-    if (0 < availableKillTaskSlots) {
+    if (availableKillTaskSlots > 0) {
       // If no datasource has been specified, all are eligible for killing 
unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", 
dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should 
have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
+  /**
+   * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto 
{@code availableKillTaskSlots}.
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || 
availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
 
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
-  )
-  {
+    final Collection<String> remainingDatasourcesToKill = new 
ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));

Review Comment:
   ```suggestion
           log.info(
               "Submitted [%d] kill tasks and reached kill task slot limit 
[%d].",
               submittedTasks, availableKillTaskSlots
           );
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +156,128 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams 
params)
+  private DruidCoordinatorRuntimeParams runInternal(final 
DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
-    Collection<String> dataSourcesToKill =
-        
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
-        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
-        killTaskSlotRatio,
-        maxKillTaskSlots
-    );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
-        killTaskCapacity,
-        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
-    );
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
+    final int availableKillTaskSlots = 
getAvailableKillTaskSlots(dynamicConfig, stats);
+    Collection<String> dataSourcesToKill = 
dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
 
-    if (0 < availableKillTaskSlots) {
+    if (availableKillTaskSlots > 0) {
       // If no datasource has been specified, all are eligible for killing 
unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", 
dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should 
have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
+  /**
+   * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto 
{@code availableKillTaskSlots}.
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || 
availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
 
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
-  )
-  {
+    final Collection<String> remainingDatasourcesToKill = new 
ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
-          datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is 
interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
+        remainingDatasourcesToKill.remove(dataSource);
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is 
interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
-          submittedTasks,
-          dataSourcesToKill.size(),
-          availableKillTaskSlots < dataSourcesToKill.size()
-              ? StringUtils.format(
-              " Datasources skipped: %s",
-              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, 
dataSourcesToKill.size())
-          )
-              : ""
-      );
-    }
+    log.info(
+        "Submitted [%d] kill tasks for [%d] datasources. Remaining datasources 
to kill: %s",
+        submittedTasks,
+        dataSourcesToKill.size(),
+        remainingDatasourcesToKill
+    );
 
-    // report stats
-    return submittedTasks;
+    stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
   }
 
   /**
+   * <p>
    * Calculates the interval for which segments are to be killed in a 
datasource.
+   * Because this function uses string comparisons for date time, it doesn't 
find unused segments that are outside
+   * the range [{@link DateTimes#COMPARE_DATE_AS_STRING_MIN}, {@link 
DateTimes#COMPARE_DATE_AS_STRING_MAX}),
+   * such as {@link 
org.apache.druid.java.util.common.granularity.Granularities#ALL} partitioned 
segments and segments that
+   * end in {@link DateTimes#MAX}.

Review Comment:
   ```suggestion
      * such as {@link 
org.apache.druid.java.util.common.granularity.Granularities#ALL} partitioned 
segments
      * and segments that end in {@link DateTimes#MAX}.
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -87,29 +90,36 @@ public class KillUnusedSegments implements CoordinatorDuty
   private final OverlordClient overlordClient;
 
   public KillUnusedSegments(
-      SegmentsMetadataManager segmentsMetadataManager,
-      OverlordClient overlordClient,
-      DruidCoordinatorConfig config
+      final SegmentsMetadataManager segmentsMetadataManager,
+      final OverlordClient overlordClient,
+      final DruidCoordinatorConfig config
   )
   {
     if (config.getCoordinatorKillPeriod().getMillis() < 
config.getCoordinatorIndexingPeriod().getMillis()) {
-      throw InvalidInput.exception(
-          "druid.coordinator.kill.period[%s] must be >= 
druid.coordinator.period.indexingPeriod[%s]",
-          config.getCoordinatorKillPeriod(),
-          config.getCoordinatorIndexingPeriod()
-      );
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build(StringUtils.format(

Review Comment:
   Nit: would look neater like this:
   ```suggestion
                             .build(
                                 StringUtils.format(
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +156,128 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams 
params)
+  private DruidCoordinatorRuntimeParams runInternal(final 
DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
-    Collection<String> dataSourcesToKill =
-        
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
-        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
-        killTaskSlotRatio,
-        maxKillTaskSlots
-    );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
-        killTaskCapacity,
-        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
-    );
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
+    final int availableKillTaskSlots = 
getAvailableKillTaskSlots(dynamicConfig, stats);
+    Collection<String> dataSourcesToKill = 
dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
 
-    if (0 < availableKillTaskSlots) {
+    if (availableKillTaskSlots > 0) {
       // If no datasource has been specified, all are eligible for killing 
unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", 
dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should 
have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
+  /**
+   * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto 
{@code availableKillTaskSlots}.
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || 
availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
 
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
-  )
-  {
+    final Collection<String> remainingDatasourcesToKill = new 
ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
-          datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is 
interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
+        remainingDatasourcesToKill.remove(dataSource);
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is 
interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
-          submittedTasks,
-          dataSourcesToKill.size(),
-          availableKillTaskSlots < dataSourcesToKill.size()
-              ? StringUtils.format(
-              " Datasources skipped: %s",
-              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, 
dataSourcesToKill.size())
-          )
-              : ""
-      );
-    }
+    log.info(
+        "Submitted [%d] kill tasks for [%d] datasources. Remaining datasources 
to kill: %s",
+        submittedTasks,
+        dataSourcesToKill.size(),
+        remainingDatasourcesToKill

Review Comment:
   Super nit: I generally prefer this style, since the logging occupies fewer lines 
without compromising readability — but either way is fine, really.
   ```suggestion
           submittedTasks, dataSourcesToKill.size(), remainingDatasourcesToKill
   ```



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +156,128 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams 
params)
+  private DruidCoordinatorRuntimeParams runInternal(final 
DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
-    Collection<String> dataSourcesToKill =
-        
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
-        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
-        killTaskSlotRatio,
-        maxKillTaskSlots
-    );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
-        killTaskCapacity,
-        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
-    );
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
+    final int availableKillTaskSlots = 
getAvailableKillTaskSlots(dynamicConfig, stats);
+    Collection<String> dataSourcesToKill = 
dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
 
-    if (0 < availableKillTaskSlots) {
+    if (availableKillTaskSlots > 0) {
       // If no datasource has been specified, all are eligible for killing 
unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", 
dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should 
have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
+  /**
+   * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto 
{@code availableKillTaskSlots}.
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || 
availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
 
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
-  )
-  {
+    final Collection<String> remainingDatasourcesToKill = new 
ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
-          datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is 
interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
+        remainingDatasourcesToKill.remove(dataSource);
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is 
interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
-          submittedTasks,
-          dataSourcesToKill.size(),
-          availableKillTaskSlots < dataSourcesToKill.size()
-              ? StringUtils.format(
-              " Datasources skipped: %s",
-              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, 
dataSourcesToKill.size())
-          )
-              : ""
-      );
-    }
+    log.info(
+        "Submitted [%d] kill tasks for [%d] datasources. Remaining datasources 
to kill: %s",
+        submittedTasks,
+        dataSourcesToKill.size(),
+        remainingDatasourcesToKill
+    );
 
-    // report stats
-    return submittedTasks;
+    stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
   }
 
   /**
+   * <p>
    * Calculates the interval for which segments are to be killed in a 
datasource.
+   * Because this function uses string comparisons for date time, it doesn't 
find unused segments that are outside

Review Comment:
   ```suggestion
      * Since this method compares datetime as strings, it cannot find unused 
segments that are outside
   ```



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -221,7 +242,29 @@ public List<Interval> getUnusedSegmentIntervals(
       final DateTime maxUsedStatusLastUpdatedTime
   )
   {
-    return null;
+    final List<DataSegmentPlus> sortedUnusedSegmentPluses = new 
ArrayList<>(unusedSegments.values());
+    sortedUnusedSegmentPluses.sort(
+        Comparator.comparingLong(
+            dataSegmentPlus -> 
dataSegmentPlus.getDataSegment().getInterval().getStartMillis()
+        )
+    );
+
+    final List<Interval> unusedSegmentIntervals = new ArrayList<>();
+
+    for (final DataSegmentPlus unusedSegmentPlus : sortedUnusedSegmentPluses) {
+      final DataSegment unusedSegment = unusedSegmentPlus.getDataSegment();
+      if (dataSource.equals(unusedSegment.getDataSource())) {
+        final Interval interval = unusedSegment.getInterval();
+
+        if ((minStartTime == null || 
interval.getStart().isAfter(minStartTime)) &&
+            interval.getEnd().isBefore(maxEndTime) &&
+            
unusedSegmentPlus.getUsedStatusLastUpdatedDate().isBefore(maxUsedStatusLastUpdatedTime))
 {
+          unusedSegmentIntervals.add(interval);
+        }
+      }
+    }
+    log.info("Found [%d] unused segment intervals: [%s]", 
unusedSegmentIntervals.size(), unusedSegmentIntervals);

Review Comment:
   If this is really needed, maybe it should be a log line in 
`KillUnusedSegments` duty because `SqlSegmentsMetadataManager` doesn't seem to 
log this either.



##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java:
##########
@@ -124,8 +139,14 @@ public int markAsUnusedSegmentsInInterval(String 
dataSource, Interval interval)
   public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
   {
     int numModifiedSegments = 0;
+    final DateTime now = DateTimes.nowUtc();
+    final DateTime lastUpdatedDate = now.plus(10);

Review Comment:
   Please add a comment explaining why we add 10 here.



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/KillUnusedSegments.java:
##########
@@ -146,130 +156,128 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
     return runInternal(params);
   }
 
-  @VisibleForTesting
-  DruidCoordinatorRuntimeParams runInternal(DruidCoordinatorRuntimeParams 
params)
+  private DruidCoordinatorRuntimeParams runInternal(final 
DruidCoordinatorRuntimeParams params)
   {
-    TaskStats taskStats = new TaskStats();
-    Collection<String> dataSourcesToKill =
-        
params.getCoordinatorDynamicConfig().getSpecificDataSourcesToKillUnusedSegmentsIn();
-    double killTaskSlotRatio = 
params.getCoordinatorDynamicConfig().getKillTaskSlotRatio();
-    int maxKillTaskSlots = 
params.getCoordinatorDynamicConfig().getMaxKillTaskSlots();
-    int killTaskCapacity = getKillTaskCapacity(
-        CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient),
-        killTaskSlotRatio,
-        maxKillTaskSlots
-    );
-    int availableKillTaskSlots = getAvailableKillTaskSlots(
-        killTaskCapacity,
-        CoordinatorDutyUtils.getNumActiveTaskSlots(overlordClient, 
IS_AUTO_KILL_TASK).size()
-    );
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
 
-    taskStats.availableTaskSlots = availableKillTaskSlots;
-    taskStats.maxSlots = killTaskCapacity;
+    final int availableKillTaskSlots = 
getAvailableKillTaskSlots(dynamicConfig, stats);
+    Collection<String> dataSourcesToKill = 
dynamicConfig.getSpecificDataSourcesToKillUnusedSegmentsIn();
 
-    if (0 < availableKillTaskSlots) {
+    if (availableKillTaskSlots > 0) {
       // If no datasource has been specified, all are eligible for killing 
unused segments
       if (CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
         dataSourcesToKill = 
segmentsMetadataManager.retrieveAllDataSourceNames();
       }
 
-      log.debug("Killing unused segments for datasources[%s]", 
dataSourcesToKill);
       lastKillTime = DateTimes.nowUtc();
-      taskStats.submittedTasks = killUnusedSegments(dataSourcesToKill, 
availableKillTaskSlots);
+      killUnusedSegments(dataSourcesToKill, availableKillTaskSlots, stats);
     }
 
     // any datasources that are no longer being considered for kill should 
have their
     // last kill interval removed from map.
     datasourceToLastKillIntervalEnd.keySet().retainAll(dataSourcesToKill);
-    addStats(taskStats, stats);
     return params;
   }
 
-  private void addStats(
-      TaskStats taskStats,
-      CoordinatorRunStats stats
+  /**
+   * Spawn kill tasks for each datasource in {@code dataSourcesToKill} upto 
{@code availableKillTaskSlots}.
+   */
+  private void killUnusedSegments(
+      @Nullable final Collection<String> dataSourcesToKill,
+      final int availableKillTaskSlots,
+      final CoordinatorRunStats stats
   )
   {
-    stats.add(Stats.Kill.AVAILABLE_SLOTS, taskStats.availableTaskSlots);
-    stats.add(Stats.Kill.SUBMITTED_TASKS, taskStats.submittedTasks);
-    stats.add(Stats.Kill.MAX_SLOTS, taskStats.maxSlots);
-  }
+    if (CollectionUtils.isNullOrEmpty(dataSourcesToKill) || 
availableKillTaskSlots <= 0) {
+      stats.add(Stats.Kill.SUBMITTED_TASKS, 0);
+      return;
+    }
 
-  private int killUnusedSegments(
-      Collection<String> dataSourcesToKill,
-      int availableKillTaskSlots
-  )
-  {
+    final Collection<String> remainingDatasourcesToKill = new 
ArrayList<>(dataSourcesToKill);
     int submittedTasks = 0;
-    if (0 < availableKillTaskSlots && 
!CollectionUtils.isNullOrEmpty(dataSourcesToKill)) {
-      for (String dataSource : dataSourcesToKill) {
-        if (submittedTasks >= availableKillTaskSlots) {
-          log.debug(StringUtils.format(
-              "Submitted [%d] kill tasks and reached kill task slot limit 
[%d]. Will resume "
-              + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
-          break;
-        }
-        final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
-        final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime);
-        if (intervalToKill == null) {
-          datasourceToLastKillIntervalEnd.remove(dataSource);
-          continue;
-        }
+    for (String dataSource : dataSourcesToKill) {
+      if (submittedTasks >= availableKillTaskSlots) {
+        log.info(StringUtils.format(
+            "Submitted [%d] kill tasks and reached kill task slot limit [%d]. 
Will resume "
+            + "on the next coordinator cycle.", submittedTasks, 
availableKillTaskSlots));
+        break;
+      }
+      final DateTime maxUsedStatusLastUpdatedTime = 
DateTimes.nowUtc().minus(bufferPeriod);
+      final Interval intervalToKill = findIntervalForKill(dataSource, 
maxUsedStatusLastUpdatedTime, stats);
+      if (intervalToKill == null) {
+        datasourceToLastKillIntervalEnd.remove(dataSource);
+        continue;
+      }
 
-        try {
-          FutureUtils.getUnchecked(
-              overlordClient.runKillTask(
-                  TASK_ID_PREFIX,
-                  dataSource,
-                  intervalToKill,
-                  maxSegmentsToKill,
-                  maxUsedStatusLastUpdatedTime
-              ),
-              true
-          );
-          ++submittedTasks;
-          datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
-        }
-        catch (Exception ex) {
-          log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
-          if (Thread.currentThread().isInterrupted()) {
-            log.warn("Skipping kill task scheduling because thread is 
interrupted.");
-            break;
-          }
+      try {
+        FutureUtils.getUnchecked(
+            overlordClient.runKillTask(
+                TASK_ID_PREFIX,
+                dataSource,
+                intervalToKill,
+                maxSegmentsToKill,
+                maxUsedStatusLastUpdatedTime
+            ),
+            true
+        );
+        ++submittedTasks;
+        datasourceToLastKillIntervalEnd.put(dataSource, 
intervalToKill.getEnd());
+        remainingDatasourcesToKill.remove(dataSource);
+      }
+      catch (Exception ex) {
+        log.error(ex, "Failed to submit kill task for dataSource[%s] in 
interval[%s]", dataSource, intervalToKill);
+        if (Thread.currentThread().isInterrupted()) {
+          log.warn("Skipping kill task scheduling because thread is 
interrupted.");
+          break;
         }
       }
     }
 
-    if (log.isDebugEnabled()) {
-      log.debug(
-          "Submitted [%d] kill tasks for [%d] datasources.%s",
-          submittedTasks,
-          dataSourcesToKill.size(),
-          availableKillTaskSlots < dataSourcesToKill.size()
-              ? StringUtils.format(
-              " Datasources skipped: %s",
-              ImmutableList.copyOf(dataSourcesToKill).subList(submittedTasks, 
dataSourcesToKill.size())
-          )
-              : ""
-      );
-    }
+    log.info(
+        "Submitted [%d] kill tasks for [%d] datasources. Remaining datasources 
to kill: %s",
+        submittedTasks,
+        dataSourcesToKill.size(),
+        remainingDatasourcesToKill
+    );
 
-    // report stats
-    return submittedTasks;
+    stats.add(Stats.Kill.SUBMITTED_TASKS, submittedTasks);
   }
 
   /**
+   * <p>
    * Calculates the interval for which segments are to be killed in a 
datasource.
+   * Because this function uses string comparisons for date time, it doesn't 
find unused segments that are outside
+   * the range [{@link DateTimes#COMPARE_DATE_AS_STRING_MIN}, {@link 
DateTimes#COMPARE_DATE_AS_STRING_MAX}),
+   * such as {@link 
org.apache.druid.java.util.common.granularity.Granularities#ALL} partitioned 
segments and segments that
+   * end in {@link DateTimes#MAX}.
+   *</p><p>
+   * For more information, see <a 
href="https://github.com/apache/druid/issues/15951";> Issue#15951</a>.
+   * </p>
    */
   @Nullable
-  private Interval findIntervalForKill(String dataSource, DateTime 
maxUsedStatusLastUpdatedTime)
+  private Interval findIntervalForKill(
+      final String dataSource,
+      final DateTime maxUsedStatusLastUpdatedTime,
+      final CoordinatorRunStats stats
+  )
   {
     final DateTime maxEndTime = ignoreDurationToRetain
                                 ? DateTimes.COMPARE_DATE_AS_STRING_MAX
                                 : DateTimes.nowUtc().minus(durationToRetain);
-    List<Interval> unusedSegmentIntervals = segmentsMetadataManager
-        .getUnusedSegmentIntervals(dataSource, 
datasourceToLastKillIntervalEnd.get(dataSource), maxEndTime, maxSegmentsToKill, 
maxUsedStatusLastUpdatedTime);
+
+    final List<Interval> unusedSegmentIntervals = 
segmentsMetadataManager.getUnusedSegmentIntervals(
+        dataSource,
+        datasourceToLastKillIntervalEnd.get(dataSource),
+        maxEndTime,
+        maxSegmentsToKill,
+        maxUsedStatusLastUpdatedTime
+    );
+
+    // Each unused segment interval returned here has a 1:1 correspondence 
with an unused segment. So we can assume

Review Comment:
   How do we know this? A single interval can have many killable unused 
segments.



##########
docs/operations/metrics.md:
##########
@@ -342,6 +342,7 @@ These metrics are for the Druid Coordinator and are reset 
each time the Coordina
 |`killTask/availableSlot/count`| Number of available task slots that can be 
used for auto kill tasks in the auto kill run. This is the max number of task 
slots minus any currently running auto kill tasks.                              
                                                                                
                                                                                
                                                                                
                                       | |Varies|
 |`killTask/maxSlot/count`| Maximum number of task slots available for auto 
kill tasks in the auto kill run.                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                | |Varies|
 |`kill/task/count`| Number of tasks issued in the auto kill run.               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                     | |Varies|
+|`kill/candidateUnusedSegments/count`|Number of candidate unused segments to 
be deleted from the metadata store in an auto kill run.|`dataSource`|Varies|

Review Comment:
   Looking at this doc and the name of the metric, it's not very clear what it 
represents.
   Is it the number of segments that the coordinator _decided to kill_ in a 
datasource, or the number of segments that _should be killed_ in a datasource? 
Also, should it have `interval` as a dimension?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to