github-advanced-security[bot] commented on code in PR #18402: URL: https://github.com/apache/druid/pull/18402#discussion_r2275711471
########## indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.indexing.template.BatchIndexingJob; +import org.apache.druid.indexing.template.JobParams; + +import java.util.List; + +/** + * Supervisor to perform batch ingestion using {@link BatchIndexingJob}. + */ +public interface BatchIndexingSupervisor + <J extends BatchIndexingJob, P extends JobParams> extends Supervisor +{ + /** + * Checks if this supervisor is ready to create jobs in the current run. + * + * @param jobParams Parameters for the current run of the scheduler. + */ + boolean shouldCreateJobs(P jobParams); Review Comment: ## Useless parameter The parameter 'jobParams' is never used. [Show more details](https://github.com/apache/druid/security/code-scanning/10178) ########## indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java: ########## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.compact; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.indexing.ClientCompactionTaskQuery; +import org.apache.druid.client.indexing.ClientTaskQuery; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; +import org.apache.druid.indexing.template.BatchIndexingJob; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; +import org.apache.druid.server.compaction.CompactionSlotManager; +import org.apache.druid.server.compaction.CompactionSnapshotBuilder; +import org.apache.druid.server.compaction.CompactionStatus; +import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.coordinator.ClusterCompactionConfig; +import org.apache.druid.server.coordinator.CompactionConfigValidationResult; +import org.apache.druid.server.coordinator.stats.CoordinatorRunStats; +import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.coordinator.stats.RowKey; +import org.apache.druid.server.coordinator.stats.Stats; + +import java.util.Map; +import java.util.Objects; +import java.util.PriorityQueue; + +/** + * Iterates over all eligible compaction jobs in order of their priority. + * A fresh instance of this class must be used in every run of the + * {@link CompactionScheduler}. + */ +public class CompactionJobQueue +{ + private static final Logger log = new Logger(CompactionJobQueue.class); + + private final CompactionJobParams jobParams; + private final CompactionCandidateSearchPolicy searchPolicy; + + private final ObjectMapper objectMapper; + private final CompactionStatusTracker statusTracker; + private final TaskActionClientFactory taskActionClientFactory; + private final OverlordClient overlordClient; + private final GlobalTaskLockbox taskLockbox; + + private final CompactionSnapshotBuilder snapshotBuilder; + private final PriorityQueue<CompactionJob> queue; + private final CoordinatorRunStats runStats; + + private final CompactionSlotManager slotManager; + + public CompactionJobQueue( + DataSourcesSnapshot dataSourcesSnapshot, + ClusterCompactionConfig clusterCompactionConfig, + CompactionStatusTracker statusTracker, + TaskActionClientFactory taskActionClientFactory, + GlobalTaskLockbox taskLockbox, + OverlordClient overlordClient, + ObjectMapper objectMapper + ) + { + this.searchPolicy = clusterCompactionConfig.getCompactionPolicy(); + this.queue = new PriorityQueue<>( + (o1, o2) -> searchPolicy.compareCandidates(o1.getCandidate(), o2.getCandidate()) + ); + this.jobParams = new CompactionJobParams( + DateTimes.nowUtc(), + objectMapper, + dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource()::get + ); + this.slotManager = new CompactionSlotManager( + overlordClient, + statusTracker, + clusterCompactionConfig + ); + + this.runStats = new CoordinatorRunStats(); + this.snapshotBuilder = new CompactionSnapshotBuilder(runStats); + this.taskActionClientFactory = taskActionClientFactory; + this.overlordClient = overlordClient; + this.statusTracker = statusTracker; + this.objectMapper = objectMapper; + this.taskLockbox = taskLockbox; + + computeAvailableTaskSlots(); + } + + /** + * Adds a job to this queue. + */ + public void add(CompactionJob job) + { + queue.add(job); + } + + /** + * Creates jobs for the given {@link CompactionSupervisor} and adds them to + * the job queue. + */ + public void createAndEnqueueJobs(CompactionSupervisor supervisor) + { + final String supervisorId = supervisor.getSpec().getId(); + try { + if (supervisor.shouldCreateJobs(jobParams)) { + queue.addAll(supervisor.createJobs(jobParams)); + } else { + log.debug("Skipping job creation for supervisor[%s]", supervisorId); + } + } + catch (Exception e) { + log.error(e, "Error while creating jobs for supervisor[%s]", supervisorId); + } + } + + /** + * Submits jobs which are ready to either the Overlord or a Broker (if it is + * an MSQ SQL job). + */ + public void runReadyJobs() + { + while (!queue.isEmpty()) { + final CompactionJob job = queue.poll(); + final ClientTaskQuery task = Objects.requireNonNull(job.getNonNullTask()); + + if (startJobIfPendingAndReady(job, searchPolicy)) { + statusTracker.onTaskSubmitted(task.getId(), job.getCandidate()); + runStats.add(Stats.Compaction.SUBMITTED_TASKS, RowKey.of(Dimension.DATASOURCE, task.getDataSource()), 1); + } + } + } + + /** + * Builds and returns the compaction snapshots for all the datasources being + * tracked in this queue. Must be called after {@link #runReadyJobs()}. + */ + public Map<String, AutoCompactionSnapshot> getCompactionSnapshots() + { + return snapshotBuilder.build(); + } + + public CoordinatorRunStats getRunStats() + { + return runStats; + } + + private void computeAvailableTaskSlots() + { + // Do not cancel any currently running compaction tasks to be valid + // Future iterations can cancel a job if it doesn't match the given template + for (ClientCompactionTaskQuery task : slotManager.fetchRunningCompactionTasks()) { + slotManager.reserveTaskSlots(task); + } + } + + /** + * Starts a job if it is ready and is not already in progress. + * + * @return true if the job was submitted successfully for execution + */ + private boolean startJobIfPendingAndReady(CompactionJob job, CompactionCandidateSearchPolicy policy) + { + // Check if the job is a valid compaction job + final CompactionCandidate candidate = job.getCandidate(); + final CompactionConfigValidationResult validationResult = validateCompactionJob(job); + if (!validationResult.isValid()) { + log.error("Compaction job[%s] is invalid due to reason[%s].", job, validationResult.getReason()); + snapshotBuilder.addToSkipped(candidate); + return false; + } + + // Check if the job is already running, completed or skipped + final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy); + switch (compactionStatus.getState()) { Review Comment: ## Missing enum case in switch Switch statement does not have a case for [PENDING](1). [Show more details](https://github.com/apache/druid/security/code-scanning/10180) ########## server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java: ########## @@ -271,333 +213,168 @@ } } - /** - * Queries the Overlord for the status of all tasks that were submitted - * recently but are not active anymore. The statuses are then updated in the - * {@link #statusTracker}. - */ - private void trackStatusOfCompletedTasks(Set<String> activeTaskIds) - { - final Set<String> finishedTaskIds = new HashSet<>(statusTracker.getSubmittedTaskIds()); - finishedTaskIds.removeAll(activeTaskIds); - - if (finishedTaskIds.isEmpty()) { - return; - } - - final Map<String, TaskStatus> taskStatusMap - = FutureUtils.getUnchecked(overlordClient.taskStatuses(finishedTaskIds), true); - for (String taskId : finishedTaskIds) { - // Assume unknown task to have finished successfully - final TaskStatus taskStatus = taskStatusMap.getOrDefault(taskId, TaskStatus.success(taskId)); - if (taskStatus.isComplete()) { - statusTracker.onTaskFinished(taskId, taskStatus); - } - } - } - - /** - * Cancels a currently running compaction task if the segment granularity - * for this datasource has changed in the compaction config. - * - * @return true if the task was canceled, false otherwise. - */ - private boolean cancelTaskIfGranularityChanged( - ClientCompactionTaskQuery compactionTaskQuery, - DataSourceCompactionConfig dataSourceCompactionConfig - ) - { - if (dataSourceCompactionConfig == null - || dataSourceCompactionConfig.getGranularitySpec() == null - || compactionTaskQuery.getGranularitySpec() == null) { - return false; - } - - Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec() - .getSegmentGranularity(); - Granularity taskSegmentGranularity = compactionTaskQuery.getGranularitySpec().getSegmentGranularity(); - if (configuredSegmentGranularity == null || configuredSegmentGranularity.equals(taskSegmentGranularity)) { - return false; - } - - LOG.info( - "Cancelling task[%s] as task segmentGranularity[%s] differs from compaction config segmentGranularity[%s].", - compactionTaskQuery.getId(), taskSegmentGranularity, configuredSegmentGranularity - ); - overlordClient.cancelTask(compactionTaskQuery.getId()); - return true; - } - - /** - * Gets a List of Intervals locked by higher priority tasks for each datasource. - * However, when using a REPLACE lock for compaction, intervals locked with any APPEND lock will not be returned - * Since compaction tasks submitted for these Intervals would have to wait anyway, - * we skip these Intervals until the next compaction run. - * <p> - * For now, Segment Locks are being treated the same as Time Chunk Locks even - * though they lock only a Segment and not the entire Interval. Thus, - * a compaction task will not be submitted for an Interval if - * <ul> - * <li>either the whole Interval is locked by a higher priority Task with an incompatible lock type</li> - * <li>or there is atleast one Segment in the Interval that is locked by a - * higher priority Task</li> - * </ul> - */ - private Map<String, List<Interval>> getLockedIntervals( - List<DataSourceCompactionConfig> compactionConfigs - ) - { - final List<LockFilterPolicy> lockFilterPolicies = compactionConfigs - .stream() - .map(config -> - new LockFilterPolicy(config.getDataSource(), config.getTaskPriority(), null, config.getTaskContext())) - .collect(Collectors.toList()); - final Map<String, List<Interval>> datasourceToLockedIntervals = - new HashMap<>(FutureUtils.getUnchecked(overlordClient.findLockedIntervals(lockFilterPolicies), true)); - LOG.debug( - "Skipping the following intervals for Compaction as they are currently locked: %s", - datasourceToLockedIntervals - ); - - return datasourceToLockedIntervals; - } - - /** - * Returns the maximum number of task slots used by one native compaction task at any time when the task is - * issued with the given tuningConfig. - */ - public static int findMaxNumTaskSlotsUsedByOneNativeCompactionTask( - @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig - ) - { - if (isParallelMode(tuningConfig)) { - @Nullable - Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks(); - // Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task) - return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1; - } else { - return 1; - } - } - - /** - * Returns the maximum number of task slots used by one MSQ compaction task at any time when the task is - * issued with the given context. - */ - static int findMaxNumTaskSlotsUsedByOneMsqCompactionTask(@Nullable Map<String, Object> context) - { - return context == null - ? ClientMSQContext.DEFAULT_MAX_NUM_TASKS - : (int) context.getOrDefault(ClientMSQContext.CTX_MAX_NUM_TASKS, ClientMSQContext.DEFAULT_MAX_NUM_TASKS); - } - - - /** - * Returns true if the compaction task can run in the parallel mode with the given tuningConfig. - * This method should be synchronized with ParallelIndexSupervisorTask.isParallelMode(InputSource, ParallelIndexTuningConfig). - */ - @VisibleForTesting - static boolean isParallelMode(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) - { - if (null == tuningConfig) { - return false; - } - boolean useRangePartitions = useRangePartitions(tuningConfig); - int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2; - return tuningConfig.getMaxNumConcurrentSubTasks() != null - && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks; - } - - private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig) - { - // dynamic partitionsSpec will be used if getPartitionsSpec() returns null - return tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec; - } - - private int getCompactionTaskCapacity(DruidCompactionConfig dynamicConfig) - { - int totalWorkerCapacity = CoordinatorDutyUtils.getTotalWorkerCapacity(overlordClient); - - return Math.min( - (int) (totalWorkerCapacity * dynamicConfig.getCompactionTaskSlotRatio()), - dynamicConfig.getMaxCompactionTaskSlots() - ); - } - - private int getAvailableCompactionTaskSlots(int compactionTaskCapacity, int busyCompactionTaskSlots) - { - final int availableCompactionTaskSlots; - if (busyCompactionTaskSlots > 0) { - availableCompactionTaskSlots = Math.max(0, compactionTaskCapacity - busyCompactionTaskSlots); - } else { - // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. - // This guarantees that at least one slot is available if - // compaction is enabled and estimatedIncompleteCompactionTasks is 0. - availableCompactionTaskSlots = Math.max(1, compactionTaskCapacity); - } - LOG.debug( - "Found [%d] available task slots for compaction out of max compaction task capacity [%d]", - availableCompactionTaskSlots, compactionTaskCapacity - ); - - return availableCompactionTaskSlots; - } - /** * Submits compaction tasks to the Overlord. Returns total number of tasks submitted. */ private int submitCompactionTasks( Map<String, DataSourceCompactionConfig> compactionConfigs, - Map<String, AutoCompactionSnapshot.Builder> currentRunAutoCompactionSnapshotBuilders, - int numAvailableCompactionTaskSlots, + CompactionSnapshotBuilder snapshotBuilder, + CompactionSlotManager slotManager, CompactionSegmentIterator iterator, + CompactionCandidateSearchPolicy policy, CompactionEngine defaultEngine ) { - if (numAvailableCompactionTaskSlots <= 0) { + if (slotManager.getNumAvailableTaskSlots() <= 0) { return 0; } int numSubmittedTasks = 0; int totalTaskSlotsAssigned = 0; - while (iterator.hasNext() && totalTaskSlotsAssigned < numAvailableCompactionTaskSlots) { + while (iterator.hasNext() && totalTaskSlotsAssigned < slotManager.getNumAvailableTaskSlots()) { final CompactionCandidate entry = iterator.next(); final String dataSourceName = entry.getDataSource(); + final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); - // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics - currentRunAutoCompactionSnapshotBuilders - .computeIfAbsent(dataSourceName, AutoCompactionSnapshot::builder) - .incrementCompactedStats(entry.getStats()); + final CompactionStatus compactionStatus = + statusTracker.computeCompactionStatus(entry, policy); + final CompactionCandidate candidatesWithStatus = entry.withCurrentStatus(compactionStatus); + statusTracker.onCompactionStatusComputed(candidatesWithStatus, config); - final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); - final List<DataSegment> segmentsToCompact = entry.getSegments(); - - // Create granularitySpec to send to compaction task - Granularity segmentGranularityToUse = null; - if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) { - // Determines segmentGranularity from the segmentsToCompact - // Each batch of segmentToCompact from CompactionSegmentIterator will contain the same interval as - // segmentGranularity is not set in the compaction config - Interval interval = segmentsToCompact.get(0).getInterval(); - if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) { - try { - segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(); - } - catch (IllegalArgumentException iae) { - // This case can happen if the existing segment interval result in complicated periods. - // Fall back to setting segmentGranularity as null - LOG.warn("Cannot determine segmentGranularity from interval[%s].", interval); - } - } else { - LOG.warn( - "Not setting 'segmentGranularity' for auto-compaction task as" - + " the segments to compact do not have the same interval." - ); - } + if (compactionStatus.isComplete()) { + snapshotBuilder.addToComplete(candidatesWithStatus); + } else if (compactionStatus.isSkipped()) { + snapshotBuilder.addToSkipped(candidatesWithStatus); } else { - segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity(); + // As these segments will be compacted, we will aggregate the statistic to the Compacted statistics + snapshotBuilder.addToComplete(entry); } - final ClientCompactionTaskGranularitySpec granularitySpec = new ClientCompactionTaskGranularitySpec( - segmentGranularityToUse, - config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null, - config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null - ); - // Create dimensionsSpec to send to compaction task - final ClientCompactionTaskDimensionsSpec dimensionsSpec; - if (config.getDimensionsSpec() != null) { - dimensionsSpec = new ClientCompactionTaskDimensionsSpec( - config.getDimensionsSpec().getDimensions() - ); - } else { - dimensionsSpec = null; - } + final ClientCompactionTaskQuery taskPayload = createCompactionTask(entry, config, defaultEngine); - Boolean dropExisting = null; - if (config.getIoConfig() != null) { - dropExisting = config.getIoConfig().isDropExisting(); - } + final String taskId = taskPayload.getId(); + FutureUtils.getUnchecked(overlordClient.runTask(taskId, taskPayload), true); + statusTracker.onTaskSubmitted(taskId, entry); - // If all the segments found to be compacted are tombstones then dropExisting - // needs to be forced to true. This forcing needs to happen in the case that - // the flag is null, or it is false. It is needed when it is null to avoid the - // possibility of the code deciding to default it to false later. - // Forcing the flag to true will enable the task ingestion code to generate new, compacted, tombstones to - // cover the tombstones found to be compacted as well as to mark them - // as compacted (update their lastCompactionState). If we don't force the - // flag then every time this compact duty runs it will find the same tombstones - // in the interval since their lastCompactionState - // was not set repeating this over and over and the duty will not make progress; it - // will become stuck on this set of tombstones. - // This forcing code should be revised - // when/if the autocompaction code policy to decide which segments to compact changes - if (dropExisting == null || !dropExisting) { - if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) { - dropExisting = true; - LOG.info("Forcing dropExisting to true since all segments to compact are tombstones."); - } - } + LOG.debug( + "Submitted a compaction task[%s] for [%d] segments in datasource[%s], umbrella interval[%s].", + taskId, entry.numSegments(), dataSourceName, entry.getUmbrellaInterval() + ); + LOG.debugSegments(entry.getSegments(), "Compacting segments"); + numSubmittedTasks++; + totalTaskSlotsAssigned += slotManager.computeSlotsRequiredForTask(taskPayload, config); + } - final CompactionEngine compactionEngine = config.getEngine() == null ? defaultEngine : config.getEngine(); - final Map<String, Object> autoCompactionContext = newAutoCompactionContext(config.getTaskContext()); - int slotsRequiredForCurrentTask; - - if (compactionEngine == CompactionEngine.MSQ) { - if (autoCompactionContext.containsKey(ClientMSQContext.CTX_MAX_NUM_TASKS)) { - slotsRequiredForCurrentTask = (int) autoCompactionContext.get(ClientMSQContext.CTX_MAX_NUM_TASKS); - } else { - // Since MSQ needs all task slots for the calculated #tasks to be available upfront, allot all available - // compaction slots (upto a max of MAX_TASK_SLOTS_FOR_MSQ_COMPACTION) to current compaction task to avoid - // stalling. Setting "taskAssignment" to "auto" has the problem of not being able to determine the actual - // count, which is required for subsequent tasks. - slotsRequiredForCurrentTask = Math.min( - // Update the slots to 2 (min required for MSQ) if only 1 slot is available. - numAvailableCompactionTaskSlots == 1 ? 2 : numAvailableCompactionTaskSlots, - ClientMSQContext.MAX_TASK_SLOTS_FOR_MSQ_COMPACTION_TASK - ); - autoCompactionContext.put(ClientMSQContext.CTX_MAX_NUM_TASKS, slotsRequiredForCurrentTask); + LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks); + return numSubmittedTasks; + } + + /** + * Creates a {@link ClientCompactionTaskQuery} which can be submitted to an + * {@link OverlordClient} to start a compaction task. + */ + public static ClientCompactionTaskQuery createCompactionTask( + CompactionCandidate candidate, + DataSourceCompactionConfig config, + CompactionEngine defaultEngine + ) + { + final List<DataSegment> segmentsToCompact = candidate.getSegments(); + + // Create granularitySpec to send to compaction task + Granularity segmentGranularityToUse = null; + if (config.getGranularitySpec() == null || config.getGranularitySpec().getSegmentGranularity() == null) { + // Determines segmentGranularity from the segmentsToCompact + // Each batch of segmentToCompact from CompactionSegmentIterator will contain the same interval as + // segmentGranularity is not set in the compaction config + Interval interval = segmentsToCompact.get(0).getInterval(); + if (segmentsToCompact.stream().allMatch(segment -> interval.overlaps(segment.getInterval()))) { + try { + segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(); + } + catch (IllegalArgumentException iae) { + // This case can happen if the existing segment interval result in complicated periods. + // Fall back to setting segmentGranularity as null + LOG.warn("Cannot determine segmentGranularity from interval[%s].", interval); } } else { - slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig()); + LOG.warn( + "Not setting 'segmentGranularity' for auto-compaction task as" + + " the segments to compact do not have the same interval." + ); } + } else { + segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity(); + } + final ClientCompactionTaskGranularitySpec granularitySpec = new ClientCompactionTaskGranularitySpec( + segmentGranularityToUse, + config.getGranularitySpec() != null ? config.getGranularitySpec().getQueryGranularity() : null, + config.getGranularitySpec() != null ? config.getGranularitySpec().isRollup() : null + ); + + // Create dimensionsSpec to send to compaction task + final ClientCompactionTaskDimensionsSpec dimensionsSpec; + if (config.getDimensionsSpec() != null) { + dimensionsSpec = new ClientCompactionTaskDimensionsSpec( + config.getDimensionsSpec().getDimensions() + ); + } else { + dimensionsSpec = null; + } - if (entry.getCurrentStatus() != null) { - autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason()); + Boolean dropExisting = null; + if (config.getIoConfig() != null) { + dropExisting = config.getIoConfig().isDropExisting(); + } + + // If all the segments found to be compacted are tombstones then dropExisting + // needs to be forced to true. This forcing needs to happen in the case that + // the flag is null, or it is false. It is needed when it is null to avoid the + // possibility of the code deciding to default it to false later. + // Forcing the flag to true will enable the task ingestion code to generate new, compacted, tombstones to + // cover the tombstones found to be compacted as well as to mark them + // as compacted (update their lastCompactionState). If we don't force the + // flag then every time this compact duty runs it will find the same tombstones + // in the interval since their lastCompactionState + // was not set repeating this over and over and the duty will not make progress; it + // will become stuck on this set of tombstones. + // This forcing code should be revised + // when/if the autocompaction code policy to decide which segments to compact changes + if (dropExisting == null || !dropExisting) { + if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) { + dropExisting = true; + LOG.info("Forcing dropExisting to true since all segments to compact are tombstones."); } + } - final String taskId = compactSegments( - entry, - config.getTaskPriority(), - ClientCompactionTaskQueryTuningConfig.from( - config.getTuningConfig(), - config.getMaxRowsPerSegment(), - config.getMetricsSpec() != null - ), - granularitySpec, - dimensionsSpec, - config.getMetricsSpec(), - config.getTransformSpec(), - config.getProjections(), - dropExisting, - autoCompactionContext, - new ClientCompactionRunnerInfo(compactionEngine) - ); + final CompactionEngine compactionEngine = config.getEngine() == null ? defaultEngine : config.getEngine(); + final Map<String, Object> autoCompactionContext = newAutoCompactionContext(config.getTaskContext()); - LOG.debug( - "Submitted a compaction task[%s] for [%d] segments in datasource[%s], umbrella interval[%s].", - taskId, segmentsToCompact.size(), dataSourceName, entry.getUmbrellaInterval() - ); - LOG.debugSegments(segmentsToCompact, "Compacting segments"); - numSubmittedTasks++; - totalTaskSlotsAssigned += slotsRequiredForCurrentTask; + if (candidate.getCurrentStatus() != null) { + autoCompactionContext.put(COMPACTION_REASON_KEY, candidate.getCurrentStatus().getReason()); } - LOG.info("Submitted a total of [%d] compaction tasks.", numSubmittedTasks); - return numSubmittedTasks; + return compactSegments( + candidate, + config.getTaskPriority(), + ClientCompactionTaskQueryTuningConfig.from( + config.getTuningConfig(), + config.getMaxRowsPerSegment(), Review Comment: ## Deprecated method or constructor invocation Invoking [DataSourceCompactionConfig.getMaxRowsPerSegment](1) should be avoided because it has been deprecated. [Show more details](https://github.com/apache/druid/security/code-scanning/10179) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
