This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 36f3413913d Add new compaction policy to prioritize fragmented
intervals (#18802)
36f3413913d is described below
commit 36f3413913d6ab34e409983d87664d9157fb4893
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Dec 5 08:43:54 2025 +0530
Add new compaction policy to prioritize fragmented intervals (#18802)
---
.../compaction/BaseCandidateSearchPolicy.java | 5 +-
.../server/compaction/CompactionCandidate.java | 14 ++
.../CompactionCandidateSearchPolicy.java | 75 ++++++-
.../server/compaction/CompactionStatistics.java | 10 +
.../druid/server/compaction/CompactionStatus.java | 243 +++++++++++++++++----
.../server/compaction/CompactionStatusTracker.java | 12 +-
.../compaction/FixedIntervalOrderPolicy.java | 7 +-
.../MostFragmentedIntervalFirstPolicy.java | 180 +++++++++++++++
.../compaction/CompactionStatusTrackerTest.java | 2 +-
.../MostFragmentedIntervalFirstPolicyTest.java | 214 ++++++++++++++++++
10 files changed, 697 insertions(+), 65 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
index 7d7d117f08f..0a68f6a51c9 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
@@ -64,13 +64,12 @@ public abstract class BaseCandidateSearchPolicy implements
CompactionCandidateSe
}
@Override
- public boolean isEligibleForCompaction(
+ public Eligibility checkEligibilityForCompaction(
CompactionCandidate candidate,
- CompactionStatus currentCompactionStatus,
CompactionTaskStatus latestTaskStatus
)
{
- return true;
+ return Eligibility.OK;
}
/**
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
index f936f3d49a9..af8b32ebe6d 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidate.java
@@ -137,6 +137,20 @@ public class CompactionCandidate
return CompactionStatistics.create(totalBytes, numSegments(),
numIntervals);
}
+ @Nullable
+ public CompactionStatistics getCompactedStats()
+ {
+ return (currentStatus == null || currentStatus.getCompactedStats() == null)
+ ? null : currentStatus.getCompactedStats();
+ }
+
+ @Nullable
+ public CompactionStatistics getUncompactedStats()
+ {
+ return (currentStatus == null || currentStatus.getUncompactedStats() ==
null)
+ ? null : currentStatus.getUncompactedStats();
+ }
+
/**
* Current compaction status of the time chunk corresponding to this
candidate.
*/
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
index cc99e03bf21..bfb69787dd8 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
@@ -21,15 +21,19 @@ package org.apache.druid.server.compaction;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordinator.duty.CompactSegments;
+import java.util.Objects;
+
/**
* Policy used by {@link CompactSegments} duty to pick segments for compaction.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "newestSegmentFirst", value =
NewestSegmentFirstPolicy.class),
- @JsonSubTypes.Type(name = "fixedIntervalOrder", value =
FixedIntervalOrderPolicy.class)
+ @JsonSubTypes.Type(name = "fixedIntervalOrder", value =
FixedIntervalOrderPolicy.class),
+ @JsonSubTypes.Type(name = "mostFragmentedFirst", value =
MostFragmentedIntervalFirstPolicy.class)
})
public interface CompactionCandidateSearchPolicy
{
@@ -37,8 +41,8 @@ public interface CompactionCandidateSearchPolicy
* Compares between two compaction candidates. Used to determine the
* order in which segments and intervals should be picked for compaction.
*
- * @return A positive value if {@code candidateA} should be picked first, a
- * negative value if {@code candidateB} should be picked first or zero if the
+ * @return A negative value if {@code candidateA} should be picked first, a
+ * positive value if {@code candidateB} should be picked first or zero if the
* order does not matter.
*/
int compareCandidates(CompactionCandidate candidateA, CompactionCandidate
candidateB);
@@ -47,10 +51,71 @@ public interface CompactionCandidateSearchPolicy
* Checks if the given {@link CompactionCandidate} is eligible for compaction
* in the current iteration. A policy may implement this method to skip
* compacting intervals or segments that do not fulfil some required
criteria.
+ *
+ * @return {@link Eligibility#OK} only if eligible.
*/
- boolean isEligibleForCompaction(
+ Eligibility checkEligibilityForCompaction(
CompactionCandidate candidate,
- CompactionStatus currentCompactionStatus,
CompactionTaskStatus latestTaskStatus
);
+
+ /**
+ * Describes the eligibility of an interval for compaction.
+ */
+ class Eligibility
+ {
+ public static final Eligibility OK = new Eligibility(true, null);
+
+ private final boolean eligible;
+ private final String reason;
+
+ private Eligibility(boolean eligible, String reason)
+ {
+ this.eligible = eligible;
+ this.reason = reason;
+ }
+
+ public boolean isEligible()
+ {
+ return eligible;
+ }
+
+ public String getReason()
+ {
+ return reason;
+ }
+
+ public static Eligibility fail(String messageFormat, Object... args)
+ {
+ return new Eligibility(false, StringUtils.format(messageFormat, args));
+ }
+
+ @Override
+ public boolean equals(Object object)
+ {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ Eligibility that = (Eligibility) object;
+ return eligible == that.eligible && Objects.equals(reason, that.reason);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(eligible, reason);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Eligibility{" +
+ "eligible=" + eligible +
+ ", reason='" + reason + '\'' +
+ '}';
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
index d7e51655861..7d43a09aed8 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatistics.java
@@ -65,4 +65,14 @@ public class CompactionStatistics
numIntervals -= other.getNumIntervals();
numSegments -= other.getNumSegments();
}
+
+ @Override
+ public String toString()
+ {
+ return "CompactionStatistics{" +
+ "totalBytes=" + totalBytes +
+ ", numSegments=" + numSegments +
+ ", numIntervals=" + numIntervals +
+ '}';
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index be4acd00e21..cc52513b16c 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -36,11 +36,17 @@ import
org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.timeline.CompactionState;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
+import org.joda.time.Interval;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -49,7 +55,7 @@ import java.util.stream.Collectors;
*/
public class CompactionStatus
{
- private static final CompactionStatus COMPLETE = new
CompactionStatus(State.COMPLETE, null);
+ private static final CompactionStatus COMPLETE = new
CompactionStatus(State.COMPLETE, null, null, null);
public enum State
{
@@ -62,9 +68,7 @@ public class CompactionStatus
* The order of the checks must be honored while evaluating them.
*/
private static final List<Function<Evaluator, CompactionStatus>> CHECKS =
Arrays.asList(
- Evaluator::inputBytesAreWithinLimit,
Evaluator::segmentsHaveBeenCompactedAtLeastOnce,
- Evaluator::allCandidatesHaveSameCompactionState,
Evaluator::partitionsSpecIsUpToDate,
Evaluator::indexSpecIsUpToDate,
Evaluator::segmentGranularityIsUpToDate,
@@ -78,11 +82,20 @@ public class CompactionStatus
private final State state;
private final String reason;
-
- private CompactionStatus(State state, String reason)
+ private final CompactionStatistics compactedStats;
+ private final CompactionStatistics uncompactedStats;
+
+ private CompactionStatus(
+ State state,
+ String reason,
+ CompactionStatistics compactedStats,
+ CompactionStatistics uncompactedStats
+ )
{
this.state = state;
this.reason = reason;
+ this.compactedStats = compactedStats;
+ this.uncompactedStats = uncompactedStats;
}
public boolean isComplete()
@@ -105,18 +118,45 @@ public class CompactionStatus
return state;
}
+ public CompactionStatistics getCompactedStats()
+ {
+ return compactedStats;
+ }
+
+ public CompactionStatistics getUncompactedStats()
+ {
+ return uncompactedStats;
+ }
+
@Override
public String toString()
{
return "CompactionStatus{" +
"state=" + state +
", reason=" + reason +
+ ", compactedStats=" + compactedStats +
+ ", uncompactedStats=" + uncompactedStats +
'}';
}
public static CompactionStatus pending(String reasonFormat, Object... args)
{
- return new CompactionStatus(State.PENDING,
StringUtils.format(reasonFormat, args));
+ return new CompactionStatus(State.PENDING,
StringUtils.format(reasonFormat, args), null, null);
+ }
+
+ public static CompactionStatus pending(
+ CompactionStatistics compactedStats,
+ CompactionStatistics uncompactedStats,
+ String reasonFormat,
+ Object... args
+ )
+ {
+ return new CompactionStatus(
+ State.PENDING,
+ StringUtils.format(reasonFormat, args),
+ compactedStats,
+ uncompactedStats
+ );
}
/**
@@ -193,34 +233,26 @@ public class CompactionStatus
public static CompactionStatus skipped(String reasonFormat, Object... args)
{
- return new CompactionStatus(State.SKIPPED,
StringUtils.format(reasonFormat, args));
+ return new CompactionStatus(State.SKIPPED,
StringUtils.format(reasonFormat, args), null, null);
}
public static CompactionStatus running(String message)
{
- return new CompactionStatus(State.RUNNING, message);
- }
-
- public static CompactionStatus complete(String message)
- {
- return new CompactionStatus(State.COMPLETE, message);
+ return new CompactionStatus(State.RUNNING, message, null, null);
}
/**
* Determines the CompactionStatus of the given candidate segments by
evaluating
* the {@link #CHECKS} one by one. If any check returns an incomplete status,
- * further checks are not performed and the incomplete status is returned.
+ * further checks are still performed to determine the number of uncompacted
+ * segments but only the first incomplete status is returned.
*/
static CompactionStatus compute(
CompactionCandidate candidateSegments,
DataSourceCompactionConfig config
)
{
- final Evaluator evaluator = new Evaluator(candidateSegments, config);
- return CHECKS.stream()
- .map(f -> f.apply(evaluator))
- .filter(status -> !status.isComplete())
- .findFirst().orElse(COMPLETE);
+ return new Evaluator(candidateSegments, config).evaluate();
}
@Nullable
@@ -288,58 +320,127 @@ public class CompactionStatus
}
/**
- * Evaluates {@link #CHECKS} to determine the compaction status.
+ * Evaluates {@link #CHECKS} to determine the compaction status of a
+ * {@link CompactionCandidate}.
*/
private static class Evaluator
{
private final DataSourceCompactionConfig compactionConfig;
private final CompactionCandidate candidateSegments;
- private final CompactionState lastCompactionState;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
- private final UserCompactionTaskGranularityConfig existingGranularitySpec;
private final UserCompactionTaskGranularityConfig
configuredGranularitySpec;
+ private final List<DataSegment> uncompactedSegments = new ArrayList<>();
+ private final Map<CompactionState, List<DataSegment>>
unknownStateToSegments = new HashMap<>();
+
private Evaluator(
CompactionCandidate candidateSegments,
DataSourceCompactionConfig compactionConfig
)
{
this.candidateSegments = candidateSegments;
- this.lastCompactionState =
candidateSegments.getSegments().get(0).getLastCompactionState();
this.compactionConfig = compactionConfig;
this.tuningConfig =
ClientCompactionTaskQueryTuningConfig.from(compactionConfig);
this.configuredGranularitySpec = compactionConfig.getGranularitySpec();
- if (lastCompactionState == null) {
- this.existingGranularitySpec = null;
+ }
+
+ private CompactionStatus evaluate()
+ {
+ final CompactionStatus inputBytesCheck = inputBytesAreWithinLimit();
+ if (inputBytesCheck.isSkipped()) {
+ return inputBytesCheck;
+ }
+
+ final List<String> reasonsForCompaction =
+ CHECKS.stream()
+ .map(f -> f.apply(this))
+ .filter(status -> !status.isComplete())
+ .map(CompactionStatus::getReason)
+ .collect(Collectors.toList());
+
+ // Consider segments which have passed all checks to be compacted
+ final List<DataSegment> compactedSegments = unknownStateToSegments
+ .values()
+ .stream()
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+
+ if (reasonsForCompaction.isEmpty()) {
+ return COMPLETE;
} else {
- this.existingGranularitySpec =
UserCompactionTaskGranularityConfig.from(
- lastCompactionState.getGranularitySpec()
+ return CompactionStatus.pending(
+ createStats(compactedSegments),
+ createStats(uncompactedSegments),
+ reasonsForCompaction.get(0)
);
}
}
private CompactionStatus segmentsHaveBeenCompactedAtLeastOnce()
{
- if (lastCompactionState == null) {
- return CompactionStatus.pending("not compacted yet");
- } else {
- return COMPLETE;
+ // Identify the compaction states of all the segments
+ for (DataSegment segment : candidateSegments.getSegments()) {
+ final CompactionState segmentState = segment.getLastCompactionState();
+ if (segmentState == null) {
+ uncompactedSegments.add(segment);
+ } else {
+ unknownStateToSegments.computeIfAbsent(segmentState, s -> new
ArrayList<>()).add(segment);
+ }
}
- }
- private CompactionStatus allCandidatesHaveSameCompactionState()
- {
- final boolean allHaveSameCompactionState =
candidateSegments.getSegments().stream().allMatch(
- segment ->
lastCompactionState.equals(segment.getLastCompactionState())
- );
- if (allHaveSameCompactionState) {
+ if (uncompactedSegments.isEmpty()) {
return COMPLETE;
} else {
- return CompactionStatus.pending("segments have different last
compaction states");
+ return CompactionStatus.pending("not compacted yet");
}
}
private CompactionStatus partitionsSpecIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::partitionsSpecIsUpToDate);
+ }
+
+ private CompactionStatus indexSpecIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::indexSpecIsUpToDate);
+ }
+
+ private CompactionStatus projectionsAreUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::projectionsAreUpToDate);
+ }
+
+ private CompactionStatus segmentGranularityIsUpToDate()
+ {
+ return
evaluateForAllCompactionStates(this::segmentGranularityIsUpToDate);
+ }
+
+ private CompactionStatus rollupIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::rollupIsUpToDate);
+ }
+
+ private CompactionStatus queryGranularityIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::queryGranularityIsUpToDate);
+ }
+
+ private CompactionStatus dimensionsSpecIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::dimensionsSpecIsUpToDate);
+ }
+
+ private CompactionStatus metricsSpecIsUpToDate()
+ {
+ return evaluateForAllCompactionStates(this::metricsSpecIsUpToDate);
+ }
+
+ private CompactionStatus transformSpecFilterIsUpToDate()
+ {
+ return
evaluateForAllCompactionStates(this::transformSpecFilterIsUpToDate);
+ }
+
+ private CompactionStatus partitionsSpecIsUpToDate(CompactionState
lastCompactionState)
{
PartitionsSpec existingPartionsSpec =
lastCompactionState.getPartitionsSpec();
if (existingPartionsSpec instanceof DimensionRangePartitionsSpec) {
@@ -357,7 +458,7 @@ public class CompactionStatus
);
}
- private CompactionStatus indexSpecIsUpToDate()
+ private CompactionStatus indexSpecIsUpToDate(CompactionState
lastCompactionState)
{
return CompactionStatus.completeIfNullOrEqual(
"indexSpec",
@@ -367,7 +468,7 @@ public class CompactionStatus
);
}
- private CompactionStatus projectionsAreUpToDate()
+ private CompactionStatus projectionsAreUpToDate(CompactionState
lastCompactionState)
{
return CompactionStatus.completeIfNullOrEqual(
"projections",
@@ -390,7 +491,7 @@ public class CompactionStatus
}
}
- private CompactionStatus segmentGranularityIsUpToDate()
+ private CompactionStatus segmentGranularityIsUpToDate(CompactionState
lastCompactionState)
{
if (configuredGranularitySpec == null
|| configuredGranularitySpec.getSegmentGranularity() == null) {
@@ -398,6 +499,7 @@ public class CompactionStatus
}
final Granularity configuredSegmentGranularity =
configuredGranularitySpec.getSegmentGranularity();
+ final UserCompactionTaskGranularityConfig existingGranularitySpec =
getGranularitySpec(lastCompactionState);
final Granularity existingSegmentGranularity
= existingGranularitySpec == null ? null :
existingGranularitySpec.getSegmentGranularity();
@@ -406,7 +508,8 @@ public class CompactionStatus
} else if (existingSegmentGranularity == null) {
// Candidate segments were compacted without segment granularity
specified
// Check if the segments already have the desired segment granularity
- boolean needsCompaction =
candidateSegments.getSegments().stream().anyMatch(
+ final List<DataSegment> segmentsForState =
unknownStateToSegments.get(lastCompactionState);
+ boolean needsCompaction = segmentsForState.stream().anyMatch(
segment ->
!configuredSegmentGranularity.isAligned(segment.getInterval())
);
if (needsCompaction) {
@@ -427,11 +530,13 @@ public class CompactionStatus
return COMPLETE;
}
- private CompactionStatus rollupIsUpToDate()
+ private CompactionStatus rollupIsUpToDate(CompactionState
lastCompactionState)
{
if (configuredGranularitySpec == null) {
return COMPLETE;
} else {
+ final UserCompactionTaskGranularityConfig existingGranularitySpec
+ = getGranularitySpec(lastCompactionState);
return CompactionStatus.completeIfNullOrEqual(
"rollup",
configuredGranularitySpec.isRollup(),
@@ -441,11 +546,13 @@ public class CompactionStatus
}
}
- private CompactionStatus queryGranularityIsUpToDate()
+ private CompactionStatus queryGranularityIsUpToDate(CompactionState
lastCompactionState)
{
if (configuredGranularitySpec == null) {
return COMPLETE;
} else {
+ final UserCompactionTaskGranularityConfig existingGranularitySpec
+ = getGranularitySpec(lastCompactionState);
return CompactionStatus.completeIfNullOrEqual(
"queryGranularity",
configuredGranularitySpec.getQueryGranularity(),
@@ -460,7 +567,7 @@ public class CompactionStatus
* which can create a mismatch between expected and actual order of
dimensions. Partition dimensions are separately
* covered in {@link Evaluator#partitionsSpecIsUpToDate()} check.
*/
- private CompactionStatus dimensionsSpecIsUpToDate()
+ private CompactionStatus dimensionsSpecIsUpToDate(CompactionState
lastCompactionState)
{
if (compactionConfig.getDimensionsSpec() == null) {
return COMPLETE;
@@ -488,7 +595,7 @@ public class CompactionStatus
}
}
- private CompactionStatus metricsSpecIsUpToDate()
+ private CompactionStatus metricsSpecIsUpToDate(CompactionState
lastCompactionState)
{
final AggregatorFactory[] configuredMetricsSpec =
compactionConfig.getMetricsSpec();
if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
@@ -512,7 +619,7 @@ public class CompactionStatus
}
}
- private CompactionStatus transformSpecFilterIsUpToDate()
+ private CompactionStatus transformSpecFilterIsUpToDate(CompactionState
lastCompactionState)
{
if (compactionConfig.getTransformSpec() == null) {
return COMPLETE;
@@ -526,5 +633,45 @@ public class CompactionStatus
String::valueOf
);
}
+
+ /**
+ * Evaluates the given check for each entry in the {@link
#unknownStateToSegments}.
+ * If any entry fails the given check by returning a status which is not
+ * COMPLETE, all the segments with that state are moved to {@link
#uncompactedSegments}.
+ *
+ * @return The first status which is not COMPLETE.
+ */
+ private CompactionStatus evaluateForAllCompactionStates(
+ Function<CompactionState, CompactionStatus> check
+ )
+ {
+ CompactionStatus firstIncompleteStatus = null;
+ for (CompactionState state :
List.copyOf(unknownStateToSegments.keySet())) {
+ final CompactionStatus status = check.apply(state);
+ if (!status.isComplete()) {
+ uncompactedSegments.addAll(unknownStateToSegments.remove(state));
+ if (firstIncompleteStatus == null) {
+ firstIncompleteStatus = status;
+ }
+ }
+ }
+
+ return firstIncompleteStatus == null ? COMPLETE : firstIncompleteStatus;
+ }
+
+ private static UserCompactionTaskGranularityConfig getGranularitySpec(
+ CompactionState compactionState
+ )
+ {
+ return
UserCompactionTaskGranularityConfig.from(compactionState.getGranularitySpec());
+ }
+
+ private static CompactionStatistics createStats(List<DataSegment> segments)
+ {
+ final Set<Interval> segmentIntervals =
+
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet());
+ final long totalBytes =
segments.stream().mapToLong(DataSegment::getSize).sum();
+ return CompactionStatistics.create(totalBytes, segments.size(),
segmentIntervals.size());
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
index 401f413e7fd..1dc409e7361 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
@@ -96,17 +96,19 @@ public class CompactionStatusTracker
if (lastTaskStatus != null
&& lastTaskStatus.getState() == TaskState.SUCCESS
&& snapshotTime != null &&
snapshotTime.isBefore(lastTaskStatus.getUpdatedTime())) {
- return CompactionStatus.complete(
+ return CompactionStatus.skipped(
"Segment timeline not updated since last compaction task succeeded"
);
}
// Skip intervals that have been filtered out by the policy
- if (!searchPolicy.isEligibleForCompaction(candidate,
CompactionStatus.pending(""), lastTaskStatus)) {
- return CompactionStatus.skipped("Rejected by search policy");
+ final CompactionCandidateSearchPolicy.Eligibility eligibility
+ = searchPolicy.checkEligibilityForCompaction(candidate,
lastTaskStatus);
+ if (eligibility.isEligible()) {
+ return CompactionStatus.pending("Not compacted yet");
+ } else {
+ return CompactionStatus.skipped("Rejected by search policy: %s",
eligibility.getReason());
}
-
- return CompactionStatus.pending("Not compacted yet");
}
/**
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
index 3e8726471b1..24a2f001afe 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
@@ -56,13 +56,14 @@ public class FixedIntervalOrderPolicy implements
CompactionCandidateSearchPolicy
}
@Override
- public boolean isEligibleForCompaction(
+ public Eligibility checkEligibilityForCompaction(
CompactionCandidate candidate,
- CompactionStatus currentCompactionStatus,
CompactionTaskStatus latestTaskStatus
)
{
- return findIndex(candidate) < Integer.MAX_VALUE;
+ return findIndex(candidate) < Integer.MAX_VALUE
+ ? Eligibility.OK
+ : Eligibility.fail("Datasource/Interval is not in the list of
'eligibleCandidates'");
}
private int findIndex(CompactionCandidate candidate)
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
new file mode 100644
index 00000000000..38e534c8273
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+
+import javax.annotation.Nullable;
+import java.util.Comparator;
+
+/**
+ * Experimental {@link CompactionCandidateSearchPolicy} which prioritizes
compaction
+ * of intervals with the largest number of small uncompacted segments.
+ * <p>
+ * This policy favors cluster stability (by prioritizing reduction of segment
+ * count) over performance of queries on newer intervals. For the latter, use
+ * {@link NewestSegmentFirstPolicy}.
+ */
+@UnstableApi
+public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
+{
+ private static final HumanReadableBytes SIZE_2_GB = new
HumanReadableBytes("2GiB");
+ private static final HumanReadableBytes SIZE_10_MB = new
HumanReadableBytes("10MiB");
+
+ private final int minUncompactedCount;
+ private final HumanReadableBytes minUncompactedBytes;
+ private final HumanReadableBytes maxAverageUncompactedBytesPerSegment;
+
+ @JsonCreator
+ public MostFragmentedIntervalFirstPolicy(
+ @JsonProperty("minUncompactedCount") @Nullable Integer
minUncompactedCount,
+ @JsonProperty("minUncompactedBytes") @Nullable HumanReadableBytes
minUncompactedBytes,
+ @JsonProperty("maxAverageUncompactedBytesPerSegment") @Nullable
+ HumanReadableBytes maxAverageUncompactedBytesPerSegment,
+ @JsonProperty("priorityDatasource") @Nullable String priorityDatasource
+ )
+ {
+ super(priorityDatasource);
+
+ InvalidInput.conditionalException(
+ minUncompactedCount == null || minUncompactedCount > 0,
+ "'minUncompactedCount'[%s] must be greater than 0",
+ minUncompactedCount
+ );
+ InvalidInput.conditionalException(
+ maxAverageUncompactedBytesPerSegment == null ||
maxAverageUncompactedBytesPerSegment.getBytes() > 0,
+      "'maxAverageUncompactedBytesPerSegment'[%s] must be greater than 0",
+ maxAverageUncompactedBytesPerSegment
+ );
+
+ this.minUncompactedCount = Configs.valueOrDefault(minUncompactedCount,
100);
+ this.minUncompactedBytes = Configs.valueOrDefault(minUncompactedBytes,
SIZE_10_MB);
+ this.maxAverageUncompactedBytesPerSegment
+ = Configs.valueOrDefault(maxAverageUncompactedBytesPerSegment,
SIZE_2_GB);
+ }
+
+ /**
+ * Minimum number of uncompacted segments that must be present in an interval
+   * to make it eligible for compaction. Default value is 100.
+ */
+ @JsonProperty
+ public int getMinUncompactedCount()
+ {
+ return minUncompactedCount;
+ }
+
+ /**
+ * Minimum total bytes of uncompacted segments that must be present in an
+ * interval to make it eligible for compaction. Default value is {@link
#SIZE_10_MB}.
+ */
+ @JsonProperty
+ public HumanReadableBytes getMinUncompactedBytes()
+ {
+ return minUncompactedBytes;
+ }
+
+ /**
+ * Maximum average size of uncompacted segments in an interval eligible for
+ * compaction. Default value is {@link #SIZE_2_GB}.
+ */
+ @JsonProperty
+ public HumanReadableBytes getMaxAverageUncompactedBytesPerSegment()
+ {
+ return maxAverageUncompactedBytesPerSegment;
+ }
+
+ @Override
+ protected Comparator<CompactionCandidate> getSegmentComparator()
+ {
+ return this::compare;
+ }
+
+ private int compare(CompactionCandidate candidateA, CompactionCandidate
candidateB)
+ {
+ final double fragmentationDiff
+ = computeFragmentationIndex(candidateB) -
computeFragmentationIndex(candidateA);
+ return (int) fragmentationDiff;
+ }
+
+ @Override
+ public Eligibility checkEligibilityForCompaction(
+ CompactionCandidate candidate,
+ CompactionTaskStatus latestTaskStatus
+ )
+ {
+ final CompactionStatistics uncompacted = candidate.getUncompactedStats();
+ if (uncompacted == null) {
+ return Eligibility.OK;
+ } else if (uncompacted.getNumSegments() < 1) {
+ return Eligibility.fail("No uncompacted segments in interval");
+ } else if (uncompacted.getNumSegments() < minUncompactedCount) {
+ return Eligibility.fail(
+ "Uncompacted segments[%,d] in interval must be at least [%,d]",
+ uncompacted.getNumSegments(), minUncompactedCount
+ );
+ } else if (uncompacted.getTotalBytes() < minUncompactedBytes.getBytes()) {
+ return Eligibility.fail(
+ "Uncompacted bytes[%,d] in interval must be at least [%,d]",
+ uncompacted.getTotalBytes(), minUncompactedBytes.getBytes()
+ );
+ }
+
+ final long avgSegmentSize = (uncompacted.getTotalBytes() /
uncompacted.getNumSegments());
+ if (avgSegmentSize > maxAverageUncompactedBytesPerSegment.getBytes()) {
+ return Eligibility.fail(
+ "Average size[%,d] of uncompacted segments in interval must be at
most [%,d]",
+ avgSegmentSize, maxAverageUncompactedBytesPerSegment.getBytes()
+ );
+ } else {
+ return Eligibility.OK;
+ }
+ }
+
+ /**
+ * Computes the degree of fragmentation in the interval of the given
compaction
+ * candidate. Calculated as the number of uncompacted segments plus an
additional
+ * term that captures the "smallness" of segments in that interval.
+ * A higher fragmentation index causes the candidate to be higher in priority
+ * for compaction.
+ */
+ private double computeFragmentationIndex(CompactionCandidate candidate)
+ {
+ final CompactionStatistics uncompacted = candidate.getUncompactedStats();
+ if (uncompacted == null || uncompacted.getNumSegments() < 1 ||
uncompacted.getTotalBytes() < 1) {
+ return 0;
+ }
+
+ final long avgUncompactedSize = Math.max(1, uncompacted.getTotalBytes() /
uncompacted.getNumSegments());
+
+ // Fragmentation index increases as uncompacted segment count increases
+ double segmentCountTerm = uncompacted.getNumSegments();
+
+ // Fragmentation index increases as avg uncompacted segment size decreases
+ double segmentSizeTerm =
+ (1.0f * minUncompactedCount *
maxAverageUncompactedBytesPerSegment.getBytes()) / avgUncompactedSize;
+
+ return segmentCountTerm + segmentSizeTerm;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
index c0496b0705c..1314a1a0bc7 100644
---
a/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
+++
b/server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTrackerTest.java
@@ -120,7 +120,7 @@ public class CompactionStatusTrackerTest
statusTracker.onTaskFinished("task1", TaskStatus.success("task1"));
status = statusTracker.computeCompactionStatus(candidateSegments, policy);
- Assert.assertEquals(CompactionStatus.State.COMPLETE, status.getState());
+ Assert.assertEquals(CompactionStatus.State.SKIPPED, status.getState());
Assert.assertEquals(
"Segment timeline not updated since last compaction task succeeded",
status.getReason()
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
new file mode 100644
index 00000000000..594fe91020b
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+public class MostFragmentedIntervalFirstPolicyTest
+{
+ private static final DataSegment SEGMENT =
+
CreateDataSegments.ofDatasource(TestDataSource.WIKI).eachOfSizeInMb(100).get(0);
+
+ @Test
+ public void test_thresholdValues_ofDefaultPolicy()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(null, null, null, null);
+ Assertions.assertEquals(100, policy.getMinUncompactedCount());
+ Assertions.assertEquals(new HumanReadableBytes("10MiB"),
policy.getMinUncompactedBytes());
+ Assertions.assertEquals(new HumanReadableBytes("2GiB"),
policy.getMaxAverageUncompactedBytesPerSegment());
+ Assertions.assertNull(policy.getPriorityDatasource());
+ }
+
+ @Test
+ public void
test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanCutoff()
+ {
+ final int minUncompactedCount = 10_000;
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ minUncompactedCount,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ null
+ );
+
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.fail(
+ "Uncompacted segments[1] in interval must be at least [10,000]"
+ ),
+ policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
+ );
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.OK,
+ policy.checkEligibilityForCompaction(createCandidate(10_001, 100L),
null)
+ );
+ }
+
+ @Test
+ public void
test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanCutoff()
+ {
+ final HumanReadableBytes minUncompactedBytes =
HumanReadableBytes.valueOf(10_000);
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ minUncompactedBytes,
+ HumanReadableBytes.valueOf(10_000),
+ null
+ );
+
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.fail(
+ "Uncompacted bytes[100] in interval must be at least [10,000]"
+ ),
+ policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
+ );
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.OK,
+ policy.checkEligibilityForCompaction(createCandidate(100, 10_000L),
null)
+ );
+ }
+
+ @Test
+ public void
test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThanCutoff()
+ {
+ final HumanReadableBytes maxAvgSegmentSize =
HumanReadableBytes.valueOf(100);
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(100),
+ maxAvgSegmentSize,
+ null
+ );
+
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.fail(
+ "Average size[10,000] of uncompacted segments in interval must be
at most [100]"
+ ),
+ policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null)
+ );
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.OK,
+ policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
+ );
+ }
+
+ @Test
+ public void
test_policy_favorsIntervalWithMoreUncompactedSegments_ifTotalBytesIsEqual()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ null
+ );
+
+ final CompactionCandidate candidateA = createCandidate(1, 1000L);
+ final CompactionCandidate candidateB = createCandidate(2, 500L);
+
+ verifyCandidateIsEligible(candidateA, policy);
+ verifyCandidateIsEligible(candidateB, policy);
+
+ Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) >
0);
+ Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) <
0);
+ }
+
+ @Test
+ public void
test_policy_favorsIntervalWithMoreUncompactedSegments_ifAverageSizeIsEqual()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ null
+ );
+
+ final CompactionCandidate candidateA = createCandidate(1, 1000L);
+ final CompactionCandidate candidateB = createCandidate(2, 1000L);
+
+ verifyCandidateIsEligible(candidateA, policy);
+ verifyCandidateIsEligible(candidateB, policy);
+
+ Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) >
0);
+ Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) <
0);
+ }
+
+ @Test
+ public void test_policy_favorsIntervalWithSmallerSegments_ifCountIsEqual()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 1,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(10_000),
+ null
+ );
+
+ final CompactionCandidate candidateA = createCandidate(10, 500L);
+ final CompactionCandidate candidateB = createCandidate(10, 1000L);
+
+ verifyCandidateIsEligible(candidateA, policy);
+ verifyCandidateIsEligible(candidateB, policy);
+
+ Assertions.assertTrue(policy.compareCandidates(candidateA, candidateB) <
0);
+ Assertions.assertTrue(policy.compareCandidates(candidateB, candidateA) >
0);
+ }
+
+ @Test
+ public void
test_compareCandidates_returnsZeroIfSegmentCountAndAvgSizeScaleEquivalently()
+ {
+ final MostFragmentedIntervalFirstPolicy policy = new
MostFragmentedIntervalFirstPolicy(
+ 100,
+ HumanReadableBytes.valueOf(1),
+ HumanReadableBytes.valueOf(100),
+ null
+ );
+
+ final CompactionCandidate candidateA = createCandidate(100, 25);
+ final CompactionCandidate candidateB = createCandidate(400, 100);
+
+ verifyCandidateIsEligible(candidateA, policy);
+ verifyCandidateIsEligible(candidateB, policy);
+
+ Assertions.assertEquals(0, policy.compareCandidates(candidateA,
candidateB));
+ Assertions.assertEquals(0, policy.compareCandidates(candidateB,
candidateA));
+ }
+
+ private CompactionCandidate createCandidate(int numSegments, long
avgSizeBytes)
+ {
+ final CompactionStatistics dummyCompactedStats =
CompactionStatistics.create(1L, 1L, 1L);
+ final CompactionStatistics uncompactedStats = CompactionStatistics.create(
+ avgSizeBytes * numSegments,
+ numSegments,
+ 1L
+ );
+ return CompactionCandidate.from(List.of(SEGMENT), null)
+ .withCurrentStatus(CompactionStatus.pending(dummyCompactedStats,
uncompactedStats, ""));
+ }
+
+ private void verifyCandidateIsEligible(CompactionCandidate candidate,
MostFragmentedIntervalFirstPolicy policy)
+ {
+ Assertions.assertEquals(
+ CompactionCandidateSearchPolicy.Eligibility.OK,
+ policy.checkEligibilityForCompaction(candidate, null)
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]