github-advanced-security[bot] commented on code in PR #18402:
URL: https://github.com/apache/druid/pull/18402#discussion_r2275711471


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor;
+
+import org.apache.druid.indexing.template.BatchIndexingJob;
+import org.apache.druid.indexing.template.JobParams;
+
+import java.util.List;
+
+/**
+ * Supervisor to perform batch ingestion using {@link BatchIndexingJob}.
+ */
+public interface BatchIndexingSupervisor
+    <J extends BatchIndexingJob, P extends JobParams> extends Supervisor
+{
+  /**
+   * Checks if this supervisor is ready to create jobs in the current run.
+   *
+   * @param jobParams Parameters for the current run of the scheduler.
+   */
+  boolean shouldCreateJobs(P jobParams);

Review Comment:
   ## Useless parameter
   
   The parameter 'jobParams' is never used.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10178)



##########
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.compact;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.client.indexing.ClientTaskQuery;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.GlobalTaskLockbox;
+import org.apache.druid.indexing.template.BatchIndexingJob;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.server.compaction.CompactionCandidate;
+import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
+import org.apache.druid.server.compaction.CompactionSlotManager;
+import org.apache.druid.server.compaction.CompactionSnapshotBuilder;
+import org.apache.druid.server.compaction.CompactionStatus;
+import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+
+/**
+ * Iterates over all eligible compaction jobs in order of their priority.
+ * A fresh instance of this class must be used in every run of the
+ * {@link CompactionScheduler}.
+ */
+public class CompactionJobQueue
+{
+  private static final Logger log = new Logger(CompactionJobQueue.class);
+
+  private final CompactionJobParams jobParams;
+  private final CompactionCandidateSearchPolicy searchPolicy;
+
+  private final ObjectMapper objectMapper;
+  private final CompactionStatusTracker statusTracker;
+  private final TaskActionClientFactory taskActionClientFactory;
+  private final OverlordClient overlordClient;
+  private final GlobalTaskLockbox taskLockbox;
+
+  private final CompactionSnapshotBuilder snapshotBuilder;
+  private final PriorityQueue<CompactionJob> queue;
+  private final CoordinatorRunStats runStats;
+
+  private final CompactionSlotManager slotManager;
+
+  public CompactionJobQueue(
+      DataSourcesSnapshot dataSourcesSnapshot,
+      ClusterCompactionConfig clusterCompactionConfig,
+      CompactionStatusTracker statusTracker,
+      TaskActionClientFactory taskActionClientFactory,
+      GlobalTaskLockbox taskLockbox,
+      OverlordClient overlordClient,
+      ObjectMapper objectMapper
+  )
+  {
+    this.searchPolicy = clusterCompactionConfig.getCompactionPolicy();
+    this.queue = new PriorityQueue<>(
+        (o1, o2) -> searchPolicy.compareCandidates(o1.getCandidate(), 
o2.getCandidate())
+    );
+    this.jobParams = new CompactionJobParams(
+        DateTimes.nowUtc(),
+        objectMapper,
+        dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource()::get
+    );
+    this.slotManager = new CompactionSlotManager(
+        overlordClient,
+        statusTracker,
+        clusterCompactionConfig
+    );
+
+    this.runStats = new CoordinatorRunStats();
+    this.snapshotBuilder = new CompactionSnapshotBuilder(runStats);
+    this.taskActionClientFactory = taskActionClientFactory;
+    this.overlordClient = overlordClient;
+    this.statusTracker = statusTracker;
+    this.objectMapper = objectMapper;
+    this.taskLockbox = taskLockbox;
+
+    computeAvailableTaskSlots();
+  }
+
+  /**
+   * Adds a job to this queue.
+   */
+  public void add(CompactionJob job)
+  {
+    queue.add(job);
+  }
+
+  /**
+   * Creates jobs for the given {@link CompactionSupervisor} and adds them to
+   * the job queue.
+   */
+  public void createAndEnqueueJobs(CompactionSupervisor supervisor)
+  {
+    final String supervisorId = supervisor.getSpec().getId();
+    try {
+      if (supervisor.shouldCreateJobs(jobParams)) {
+        queue.addAll(supervisor.createJobs(jobParams));
+      } else {
+        log.debug("Skipping job creation for supervisor[%s]", supervisorId);
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Error while creating jobs for supervisor[%s]", 
supervisorId);
+    }
+  }
+
+  /**
+   * Submits jobs which are ready to either the Overlord or a Broker (if it is
+   * an MSQ SQL job).
+   */
+  public void runReadyJobs()
+  {
+    while (!queue.isEmpty()) {
+      final CompactionJob job = queue.poll();
+      final ClientTaskQuery task = 
Objects.requireNonNull(job.getNonNullTask());
+
+      if (startJobIfPendingAndReady(job, searchPolicy)) {
+        statusTracker.onTaskSubmitted(task.getId(), job.getCandidate());
+        runStats.add(Stats.Compaction.SUBMITTED_TASKS, 
RowKey.of(Dimension.DATASOURCE, task.getDataSource()), 1);
+      }
+    }
+  }
+
+  /**
+   * Builds and returns the compaction snapshots for all the datasources being
+   * tracked in this queue. Must be called after {@link #runReadyJobs()}.
+   */
+  public Map<String, AutoCompactionSnapshot> getCompactionSnapshots()
+  {
+    return snapshotBuilder.build();
+  }
+
+  public CoordinatorRunStats getRunStats()
+  {
+    return runStats;
+  }
+
+  private void computeAvailableTaskSlots()
+  {
+    // Do not cancel any currently running compaction tasks to be valid
+    // Future iterations can cancel a job if it doesn't match the given 
template
+    for (ClientCompactionTaskQuery task : 
slotManager.fetchRunningCompactionTasks()) {
+      slotManager.reserveTaskSlots(task);
+    }
+  }
+
+  /**
+   * Starts a job if it is ready and is not already in progress.
+   *
+   * @return true if the job was submitted successfully for execution
+   */
+  private boolean startJobIfPendingAndReady(CompactionJob job, 
CompactionCandidateSearchPolicy policy)
+  {
+    // Check if the job is a valid compaction job
+    final CompactionCandidate candidate = job.getCandidate();
+    final CompactionConfigValidationResult validationResult = 
validateCompactionJob(job);
+    if (!validationResult.isValid()) {
+      log.error("Compaction job[%s] is invalid due to reason[%s].", job, 
validationResult.getReason());
+      snapshotBuilder.addToSkipped(candidate);
+      return false;
+    }
+
+    // Check if the job is already running, completed or skipped
+    final CompactionStatus compactionStatus = getCurrentStatusForJob(job, 
policy);
+    switch (compactionStatus.getState()) {

Review Comment:
   ## Missing enum case in switch
   
   Switch statement does not have a case for [PENDING](1).
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10180)



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -271,333 +213,168 @@
     }
   }
 
-  /**
-   * Queries the Overlord for the status of all tasks that were submitted
-   * recently but are not active anymore. The statuses are then updated in the
-   * {@link #statusTracker}.
-   */
-  private void trackStatusOfCompletedTasks(Set<String> activeTaskIds)
-  {
-    final Set<String> finishedTaskIds = new 
HashSet<>(statusTracker.getSubmittedTaskIds());
-    finishedTaskIds.removeAll(activeTaskIds);
-
-    if (finishedTaskIds.isEmpty()) {
-      return;
-    }
-
-    final Map<String, TaskStatus> taskStatusMap
-        = 
FutureUtils.getUnchecked(overlordClient.taskStatuses(finishedTaskIds), true);
-    for (String taskId : finishedTaskIds) {
-      // Assume unknown task to have finished successfully
-      final TaskStatus taskStatus = taskStatusMap.getOrDefault(taskId, 
TaskStatus.success(taskId));
-      if (taskStatus.isComplete()) {
-        statusTracker.onTaskFinished(taskId, taskStatus);
-      }
-    }
-  }
-
-  /**
-   * Cancels a currently running compaction task if the segment granularity
-   * for this datasource has changed in the compaction config.
-   *
-   * @return true if the task was canceled, false otherwise.
-   */
-  private boolean cancelTaskIfGranularityChanged(
-      ClientCompactionTaskQuery compactionTaskQuery,
-      DataSourceCompactionConfig dataSourceCompactionConfig
-  )
-  {
-    if (dataSourceCompactionConfig == null
-        || dataSourceCompactionConfig.getGranularitySpec() == null
-        || compactionTaskQuery.getGranularitySpec() == null) {
-      return false;
-    }
-
-    Granularity configuredSegmentGranularity = 
dataSourceCompactionConfig.getGranularitySpec()
-                                                                         
.getSegmentGranularity();
-    Granularity taskSegmentGranularity = 
compactionTaskQuery.getGranularitySpec().getSegmentGranularity();
-    if (configuredSegmentGranularity == null || 
configuredSegmentGranularity.equals(taskSegmentGranularity)) {
-      return false;
-    }
-
-    LOG.info(
-        "Cancelling task[%s] as task segmentGranularity[%s] differs from 
compaction config segmentGranularity[%s].",
-        compactionTaskQuery.getId(), taskSegmentGranularity, 
configuredSegmentGranularity
-    );
-    overlordClient.cancelTask(compactionTaskQuery.getId());
-    return true;
-  }
-
-  /**
-   * Gets a List of Intervals locked by higher priority tasks for each 
datasource.
-   * However, when using a REPLACE lock for compaction, intervals locked with 
any APPEND lock will not be returned
-   * Since compaction tasks submitted for these Intervals would have to wait 
anyway,
-   * we skip these Intervals until the next compaction run.
-   * <p>
-   * For now, Segment Locks are being treated the same as Time Chunk Locks even
-   * though they lock only a Segment and not the entire Interval. Thus,
-   * a compaction task will not be submitted for an Interval if
-   * <ul>
-   *   <li>either the whole Interval is locked by a higher priority Task with 
an incompatible lock type</li>
-   *   <li>or there is at least one Segment in the Interval that is locked by a
-   *   higher priority Task</li>
-   * </ul>
-   */
-  private Map<String, List<Interval>> getLockedIntervals(
-      List<DataSourceCompactionConfig> compactionConfigs
-  )
-  {
-    final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs
-        .stream()
-        .map(config ->
-                 new LockFilterPolicy(config.getDataSource(), 
config.getTaskPriority(), null, config.getTaskContext()))
-        .collect(Collectors.toList());
-    final Map<String, List<Interval>> datasourceToLockedIntervals =
-        new 
HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies),
 true));
-    LOG.debug(
-        "Skipping the following intervals for Compaction as they are currently 
locked: %s",
-        datasourceToLockedIntervals
-    );
-
-    return datasourceToLockedIntervals;
-  }
-
-  /**
-   * Returns the maximum number of task slots used by one native compaction 
task at any time when the task is
-   * issued with the given tuningConfig.
-   */
-  public static int findMaxNumTaskSlotsUsedByOneNativeCompactionTask(
-      @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig
-  )
-  {
-    if (isParallelMode(tuningConfig)) {
-      @Nullable
-      Integer maxNumConcurrentSubTasks = 
tuningConfig.getMaxNumConcurrentSubTasks();
-      // Max number of task slots used in parallel mode = 
maxNumConcurrentSubTasks + 1 (supervisor task)
-      return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) 
+ 1;
-    } else {
-      return 1;
-    }
-  }
-
-  /**
-   * Returns the maximum number of task slots used by one MSQ compaction task 
at any time when the task is
-   * issued with the given context.
-   */
-  static int findMaxNumTaskSlotsUsedByOneMsqCompactionTask(@Nullable 
Map<String, Object> context)
-  {
-    return context == null
-           ? ClientMSQContext.DEFAULT_MAX_NUM_TASKS
-           : (int) context.getOrDefault(ClientMSQContext.CTX_MAX_NUM_TASKS, 
ClientMSQContext.DEFAULT_MAX_NUM_TASKS);
-  }
-
-
-  /**
-   * Returns true if the compaction task can run in the parallel mode with the 
given tuningConfig.
-   * This method should be synchronized with 
ParallelIndexSupervisorTask.isParallelMode(InputSource, 
ParallelIndexTuningConfig).
-   */
-  @VisibleForTesting
-  static boolean isParallelMode(@Nullable 
ClientCompactionTaskQueryTuningConfig tuningConfig)
-  {
-    if (null == tuningConfig) {
-      return false;
-    }
-    boolean useRangePartitions = useRangePartitions(tuningConfig);
-    int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
-    return tuningConfig.getMaxNumConcurrentSubTasks() != null
-           && tuningConfig.getMaxNumConcurrentSubTasks() >= 
minRequiredNumConcurrentSubTasks;
-  }
-
-  private static boolean 
useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig)
-  {
-    // dynamic partitionsSpec will be used if getPartitionsSpec() returns null
-    return tuningConfig.getPartitionsSpec() instanceof 
DimensionRangePartitionsSpec;
-  }
-
-  private int getCompactionTaskCapacity(DruidCompactionConfig dynamicConfig)
-  {
-    int totalWorkerCapacity = 
CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient);
-
-    return Math.min(
-        (int) (totalWorkerCapacity * 
dynamicConfig.getCompactionTaskSlotRatio()),
-        dynamicConfig.getMaxCompactionTaskSlots()
-    );
-  }
-
-  private int getAvailableCompactionTaskSlots(int compactionTaskCapacity, int 
busyCompactionTaskSlots)
-  {
-    final int availableCompactionTaskSlots;
-    if (busyCompactionTaskSlots > 0) {
-      availableCompactionTaskSlots = Math.max(0, compactionTaskCapacity - 
busyCompactionTaskSlots);
-    } else {
-      // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
-      // This guarantees that at least one slot is available if
-      // compaction is enabled and estimatedIncompleteCompactionTasks is 0.
-      availableCompactionTaskSlots = Math.max(1, compactionTaskCapacity);
-    }
-    LOG.debug(
-        "Found [%d] available task slots for compaction out of max compaction 
task capacity [%d]",
-        availableCompactionTaskSlots, compactionTaskCapacity
-    );
-
-    return availableCompactionTaskSlots;
-  }
-
   /**
    * Submits compaction tasks to the Overlord. Returns total number of tasks 
submitted.
    */
   private int submitCompactionTasks(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, AutoCompactionSnapshot.Builder> 
currentRunAutoCompactionSnapshotBuilders,
-      int numAvailableCompactionTaskSlots,
+      CompactionSnapshotBuilder snapshotBuilder,
+      CompactionSlotManager slotManager,
       CompactionSegmentIterator iterator,
+      CompactionCandidateSearchPolicy policy,
       CompactionEngine defaultEngine
   )
   {
-    if (numAvailableCompactionTaskSlots <= 0) {
+    if (slotManager.getNumAvailableTaskSlots() <= 0) {
       return 0;
     }
 
     int numSubmittedTasks = 0;
     int totalTaskSlotsAssigned = 0;
 
-    while (iterator.hasNext() && totalTaskSlotsAssigned < 
numAvailableCompactionTaskSlots) {
+    while (iterator.hasNext() && totalTaskSlotsAssigned < 
slotManager.getNumAvailableTaskSlots()) {
       final CompactionCandidate entry = iterator.next();
       final String dataSourceName = entry.getDataSource();
+      final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSourceName);
 
-      // As these segments will be compacted, we will aggregate the statistic 
to the Compacted statistics
-      currentRunAutoCompactionSnapshotBuilders
-          .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder)
-          .incrementCompactedStats(entry.getStats());
+      final CompactionStatus compactionStatus =
+          statusTracker.computeCompactionStatus(entry, policy);
+      final CompactionCandidate candidatesWithStatus = 
entry.withCurrentStatus(compactionStatus);
+      statusTracker.onCompactionStatusComputed(candidatesWithStatus, config);
 
-      final DataSourceCompactionConfig config = 
compactionConfigs.get(dataSourceName);
-      final List<DataSegment> segmentsToCompact = entry.getSegments();
-
-      // Create granularitySpec to send to compaction task
-      Granularity segmentGranularityToUse = null;
-      if (config.getGranularitySpec() == null || 
config.getGranularitySpec().getSegmentGranularity() == null) {
-        // Determines segmentGranularity from the segmentsToCompact
-        // Each batch of segmentToCompact from CompactionSegmentIterator will 
contain the same interval as
-        // segmentGranularity is not set in the compaction config
-        Interval interval = segmentsToCompact.get(0).getInterval();
-        if (segmentsToCompact.stream().allMatch(segment -> 
interval.overlaps(segment.getInterval()))) {
-          try {
-            segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
-          }
-          catch (IllegalArgumentException iae) {
-            // This case can happen if the existing segment interval results in complicated periods.
-            // Fall back to setting segmentGranularity as null
-            LOG.warn("Cannot determine segmentGranularity from interval[%s].", 
interval);
-          }
-        } else {
-          LOG.warn(
-              "Not setting 'segmentGranularity' for auto-compaction task as"
-              + " the segments to compact do not have the same interval."
-          );
-        }
+      if (compactionStatus.isComplete()) {
+        snapshotBuilder.addToComplete(candidatesWithStatus);
+      } else if (compactionStatus.isSkipped()) {
+        snapshotBuilder.addToSkipped(candidatesWithStatus);
       } else {
-        segmentGranularityToUse = 
config.getGranularitySpec().getSegmentGranularity();
+        // As these segments will be compacted, we will aggregate the 
statistic to the Compacted statistics
+        snapshotBuilder.addToComplete(entry);
       }
-      final ClientCompactionTaskGranularitySpec granularitySpec = new 
ClientCompactionTaskGranularitySpec(
-          segmentGranularityToUse,
-          config.getGranularitySpec() != null ? 
config.getGranularitySpec().getQueryGranularity() : null,
-          config.getGranularitySpec() != null ? 
config.getGranularitySpec().isRollup() : null
-      );
 
-      // Create dimensionsSpec to send to compaction task
-      final ClientCompactionTaskDimensionsSpec dimensionsSpec;
-      if (config.getDimensionsSpec() != null) {
-        dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
-            config.getDimensionsSpec().getDimensions()
-        );
-      } else {
-        dimensionsSpec = null;
-      }
+      final ClientCompactionTaskQuery taskPayload = 
createCompactionTask(entry, config, defaultEngine);
 
-      Boolean dropExisting = null;
-      if (config.getIoConfig() != null) {
-        dropExisting = config.getIoConfig().isDropExisting();
-      }
+      final String taskId = taskPayload.getId();
+      FutureUtils.getUnchecked(overlordClient.runTask(taskId, taskPayload), 
true);
+      statusTracker.onTaskSubmitted(taskId, entry);
 
-      // If all the segments found to be compacted are tombstones then 
dropExisting
-      // needs to be forced to true. This forcing needs to  happen in the case 
that
-      // the flag is null, or it is false. It is needed when it is null to 
avoid the
-      // possibility of the code deciding to default it to false later.
-      // Forcing the flag to true will enable the task ingestion code to 
generate new, compacted, tombstones to
-      // cover the tombstones found to be compacted as well as to mark them
-      // as compacted (update their lastCompactionState). If we don't force the
-      // flag then every time this compact duty runs it will find the same 
tombstones
-      // in the interval since their lastCompactionState
-      // was not set repeating this over and over and the duty will not make 
progress; it
-      // will become stuck on this set of tombstones.
-      // This forcing code should be revised
-      // when/if the autocompaction code policy to decide which segments to 
compact changes
-      if (dropExisting == null || !dropExisting) {
-        if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
-          dropExisting = true;
-          LOG.info("Forcing dropExisting to true since all segments to compact 
are tombstones.");
-        }
-      }
+      LOG.debug(
+          "Submitted a compaction task[%s] for [%d] segments in 
datasource[%s], umbrella interval[%s].",
+          taskId, entry.numSegments(), dataSourceName, 
entry.getUmbrellaInterval()
+      );
+      LOG.debugSegments(entry.getSegments(), "Compacting segments");
+      numSubmittedTasks++;
+      totalTaskSlotsAssigned += 
slotManager.computeSlotsRequiredForTask(taskPayload, config);
+    }
 
-      final CompactionEngine compactionEngine = config.getEngine() == null ? 
defaultEngine : config.getEngine();
-      final Map<String, Object> autoCompactionContext = 
newAutoCompactionContext(config.getTaskContext());
-      int slotsRequiredForCurrentTask;
-
-      if (compactionEngine == CompactionEngine.MSQ) {
-        if 
(autoCompactionContext.containsKey(ClientMSQContext.CTX_MAX_NUM_TASKS)) {
-          slotsRequiredForCurrentTask = (int) 
autoCompactionContext.get(ClientMSQContext.CTX_MAX_NUM_TASKS);
-        } else {
-          // Since MSQ needs all task slots for the calculated #tasks to be 
available upfront, allot all available
-          // compaction slots (upto a max of 
MAX_TASK_SLOTS_FOR_MSQ_COMPACTION) to current compaction task to avoid
-          // stalling. Setting "taskAssignment" to "auto" has the problem of 
not being able to determine the actual
-          // count, which is required for subsequent tasks.
-          slotsRequiredForCurrentTask = Math.min(
-              // Update the slots to 2 (min required for MSQ) if only 1 slot 
is available.
-              numAvailableCompactionTaskSlots == 1 ? 2 : 
numAvailableCompactionTaskSlots,
-              ClientMSQContext.MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK
-          );
-          autoCompactionContext.put(ClientMSQContext.CTX_MAX_NUM_TASKS, 
slotsRequiredForCurrentTask);
+    LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks);
+    return numSubmittedTasks;
+  }
+
+  /**
+   * Creates a {@link ClientCompactionTaskQuery} which can be submitted to an
+   * {@link OverlordClient} to start a compaction task.
+   */
+  public static ClientCompactionTaskQuery createCompactionTask(
+      CompactionCandidate candidate,
+      DataSourceCompactionConfig config,
+      CompactionEngine defaultEngine
+  )
+  {
+    final List<DataSegment> segmentsToCompact = candidate.getSegments();
+
+    // Create granularitySpec to send to compaction task
+    Granularity segmentGranularityToUse = null;
+    if (config.getGranularitySpec() == null || 
config.getGranularitySpec().getSegmentGranularity() == null) {
+      // Determines segmentGranularity from the segmentsToCompact
+      // Each batch of segmentToCompact from CompactionSegmentIterator will 
contain the same interval as
+      // segmentGranularity is not set in the compaction config
+      Interval interval = segmentsToCompact.get(0).getInterval();
+      if (segmentsToCompact.stream().allMatch(segment -> 
interval.overlaps(segment.getInterval()))) {
+        try {
+          segmentGranularityToUse = 
GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
+        }
+        catch (IllegalArgumentException iae) {
+          // This case can happen if the existing segment interval results in complicated periods.
+          // Fall back to setting segmentGranularity as null
+          LOG.warn("Cannot determine segmentGranularity from interval[%s].", 
interval);
         }
       } else {
-        slotsRequiredForCurrentTask = 
findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig());
+        LOG.warn(
+            "Not setting 'segmentGranularity' for auto-compaction task as"
+            + " the segments to compact do not have the same interval."
+        );
       }
+    } else {
+      segmentGranularityToUse = 
config.getGranularitySpec().getSegmentGranularity();
+    }
+    final ClientCompactionTaskGranularitySpec granularitySpec = new 
ClientCompactionTaskGranularitySpec(
+        segmentGranularityToUse,
+        config.getGranularitySpec() != null ? 
config.getGranularitySpec().getQueryGranularity() : null,
+        config.getGranularitySpec() != null ? 
config.getGranularitySpec().isRollup() : null
+    );
+
+    // Create dimensionsSpec to send to compaction task
+    final ClientCompactionTaskDimensionsSpec dimensionsSpec;
+    if (config.getDimensionsSpec() != null) {
+      dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
+          config.getDimensionsSpec().getDimensions()
+      );
+    } else {
+      dimensionsSpec = null;
+    }
 
-      if (entry.getCurrentStatus() != null) {
-        autoCompactionContext.put(COMPACTION_REASON_KEY, 
entry.getCurrentStatus().getReason());
+    Boolean dropExisting = null;
+    if (config.getIoConfig() != null) {
+      dropExisting = config.getIoConfig().isDropExisting();
+    }
+
+    // If all the segments found to be compacted are tombstones then 
dropExisting
+    // needs to be forced to true. This forcing needs to  happen in the case 
that
+    // the flag is null, or it is false. It is needed when it is null to avoid 
the
+    // possibility of the code deciding to default it to false later.
+    // Forcing the flag to true will enable the task ingestion code to 
generate new, compacted, tombstones to
+    // cover the tombstones found to be compacted as well as to mark them
+    // as compacted (update their lastCompactionState). If we don't force the
+    // flag then every time this compact duty runs it will find the same 
tombstones
+    // in the interval since their lastCompactionState
+    // was not set repeating this over and over and the duty will not make 
progress; it
+    // will become stuck on this set of tombstones.
+    // This forcing code should be revised
+    // when/if the autocompaction code policy to decide which segments to 
compact changes
+    if (dropExisting == null || !dropExisting) {
+      if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
+        dropExisting = true;
+        LOG.info("Forcing dropExisting to true since all segments to compact 
are tombstones.");
       }
+    }
 
-      final String taskId = compactSegments(
-          entry,
-          config.getTaskPriority(),
-          ClientCompactionTaskQueryTuningConfig.from(
-              config.getTuningConfig(),
-              config.getMaxRowsPerSegment(),
-              config.getMetricsSpec() != null
-          ),
-          granularitySpec,
-          dimensionsSpec,
-          config.getMetricsSpec(),
-          config.getTransformSpec(),
-          config.getProjections(),
-          dropExisting,
-          autoCompactionContext,
-          new ClientCompactionRunnerInfo(compactionEngine)
-      );
+    final CompactionEngine compactionEngine = config.getEngine() == null ? 
defaultEngine : config.getEngine();
+    final Map<String, Object> autoCompactionContext = 
newAutoCompactionContext(config.getTaskContext());
 
-      LOG.debug(
-          "Submitted a compaction task[%s] for [%d] segments in 
datasource[%s], umbrella interval[%s].",
-          taskId, segmentsToCompact.size(), dataSourceName, 
entry.getUmbrellaInterval()
-      );
-      LOG.debugSegments(segmentsToCompact, "Compacting segments");
-      numSubmittedTasks++;
-      totalTaskSlotsAssigned += slotsRequiredForCurrentTask;
+    if (candidate.getCurrentStatus() != null) {
+      autoCompactionContext.put(COMPACTION_REASON_KEY, 
candidate.getCurrentStatus().getReason());
     }
 
-    LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks);
-    return numSubmittedTasks;
+    return compactSegments(
+        candidate,
+        config.getTaskPriority(),
+        ClientCompactionTaskQueryTuningConfig.from(
+            config.getTuningConfig(),
+            config.getMaxRowsPerSegment(),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSourceCompactionConfig.getMaxRowsPerSegment](1) should be 
avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10179)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to