This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 856a878a23 [core] Introduce 'compaction.incremental-size-threshold' to
force full compaction (#6754)
856a878a23 is described below
commit 856a878a23d240db7a9f0ae1247ba5160833ce41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 10 17:27:38 2025 +0800
[core] Introduce 'compaction.incremental-size-threshold' to force full
compaction (#6754)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 12 +++
...ompactTrigger.java => EarlyFullCompaction.java} | 37 ++++++---
.../mergetree/compact/UniversalCompaction.java | 18 ++---
.../paimon/operation/KeyValueFileStoreWrite.java | 6 +-
...iggerTest.java => EarlyFullCompactionTest.java} | 87 +++++++++++++++++-----
.../mergetree/compact/UniversalCompactionTest.java | 6 +-
7 files changed, 127 insertions(+), 45 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b50b9f2c9a..9b2bf644fc 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -272,6 +272,12 @@ under the License.
<td>Boolean</td>
<td>If set to true, compaction strategy will always include all
level 0 files in candidates.</td>
</tr>
+ <tr>
+ <td><h5>compaction.incremental-size-threshold</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>When incremental size is bigger than this threshold, force a
full compaction.</td>
+ </tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index b0862356f5..19b44d2a2a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -761,6 +761,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"When total size is smaller than this threshold,
force a full compaction.");
+ public static final ConfigOption<MemorySize>
COMPACTION_INCREMENTAL_SIZE_THRESHOLD =
+ key("compaction.incremental-size-threshold")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ "When incremental size is bigger than this
threshold, force a full compaction.");
+
public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
key("compaction.min.file-num")
.intType()
@@ -2584,6 +2591,11 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_TOTAL_SIZE_THRESHOLD);
}
+ @Nullable
+ public MemorySize compactionIncrementalSizeThreshold() {
+ return options.get(COMPACTION_INCREMENTAL_SIZE_THRESHOLD);
+ }
+
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
similarity index 67%
rename from
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
rename to
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
index 931b07630d..74fb003ed3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
@@ -33,32 +33,38 @@ import java.time.Duration;
import java.util.List;
import java.util.Optional;
-/** Trigger full compaction. */
-public class FullCompactTrigger {
+/** Early trigger full compaction. */
+public class EarlyFullCompaction {
- private static final Logger LOG =
LoggerFactory.getLogger(FullCompactTrigger.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(EarlyFullCompaction.class);
@Nullable private final Long fullCompactionInterval;
@Nullable private final Long totalSizeThreshold;
+ @Nullable private final Long incrementalSizeThreshold;
@Nullable private Long lastFullCompaction;
- public FullCompactTrigger(
- @Nullable Long fullCompactionInterval, @Nullable Long
totalSizeThreshold) {
+ public EarlyFullCompaction(
+ @Nullable Long fullCompactionInterval,
+ @Nullable Long totalSizeThreshold,
+ @Nullable Long incrementalSizeThreshold) {
this.fullCompactionInterval = fullCompactionInterval;
this.totalSizeThreshold = totalSizeThreshold;
+ this.incrementalSizeThreshold = incrementalSizeThreshold;
}
@Nullable
- public static FullCompactTrigger create(CoreOptions options) {
+ public static EarlyFullCompaction create(CoreOptions options) {
Duration interval = options.optimizedCompactionInterval();
- MemorySize threshold = options.compactionTotalSizeThreshold();
- if (interval == null && threshold == null) {
+ MemorySize totalThreshold = options.compactionTotalSizeThreshold();
+ MemorySize incrementalThreshold =
options.compactionIncrementalSizeThreshold();
+ if (interval == null && totalThreshold == null && incrementalThreshold
== null) {
return null;
}
- return new FullCompactTrigger(
+ return new EarlyFullCompaction(
interval == null ? null : interval.toMillis(),
- threshold == null ? null : threshold.getBytes());
+ totalThreshold == null ? null : totalThreshold.getBytes(),
+ incrementalThreshold == null ? null :
incrementalThreshold.getBytes());
}
public Optional<CompactUnit> tryFullCompact(int numLevels,
List<LevelSortedRun> runs) {
@@ -84,6 +90,17 @@ public class FullCompactTrigger {
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
}
}
+ if (incrementalSizeThreshold != null) {
+ long incrementalSize = 0;
+ for (LevelSortedRun run : runs) {
+ if (run.level() != maxLevel) {
+ incrementalSize += run.run().totalSize();
+ }
+ }
+ if (incrementalSize > incrementalSizeThreshold) {
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
+ }
return Optional.empty();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 6d938f0cae..a40e985e9d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -47,19 +47,19 @@ public class UniversalCompaction implements CompactStrategy
{
private final int sizeRatio;
private final int numRunCompactionTrigger;
- @Nullable private final FullCompactTrigger fullCompactTrigger;
+ @Nullable private final EarlyFullCompaction earlyFullCompact;
@Nullable private final OffPeakHours offPeakHours;
public UniversalCompaction(
int maxSizeAmp,
int sizeRatio,
int numRunCompactionTrigger,
- @Nullable FullCompactTrigger fullCompactTrigger,
+ @Nullable EarlyFullCompaction earlyFullCompact,
@Nullable OffPeakHours offPeakHours) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
this.numRunCompactionTrigger = numRunCompactionTrigger;
- this.fullCompactTrigger = fullCompactTrigger;
+ this.earlyFullCompact = earlyFullCompact;
this.offPeakHours = offPeakHours;
}
@@ -68,8 +68,8 @@ public class UniversalCompaction implements CompactStrategy {
int maxLevel = numLevels - 1;
// 0 try full compaction by trigger
- if (fullCompactTrigger != null) {
- Optional<CompactUnit> unit =
fullCompactTrigger.tryFullCompact(numLevels, runs);
+ if (earlyFullCompact != null) {
+ Optional<CompactUnit> unit =
earlyFullCompact.tryFullCompact(numLevels, runs);
if (unit.isPresent()) {
return unit;
}
@@ -137,8 +137,8 @@ public class UniversalCompaction implements CompactStrategy
{
// size amplification = percentage of additional size
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
- if (fullCompactTrigger != null) {
- fullCompactTrigger.updateLastFullCompaction();
+ if (earlyFullCompact != null) {
+ earlyFullCompact.updateLastFullCompaction();
}
return CompactUnit.fromLevelRuns(maxLevel, runs);
}
@@ -216,8 +216,8 @@ public class UniversalCompaction implements CompactStrategy
{
}
if (runCount == runs.size()) {
- if (fullCompactTrigger != null) {
- fullCompactTrigger.updateLastFullCompaction();
+ if (earlyFullCompact != null) {
+ earlyFullCompact.updateLastFullCompaction();
}
outputLevel = maxLevel;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 7d9295c700..177f093583 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -49,9 +49,9 @@ import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeWriter;
import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.EarlyFullCompaction;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
-import org.apache.paimon.mergetree.compact.FullCompactTrigger;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
@@ -254,7 +254,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- FullCompactTrigger.create(options),
+ EarlyFullCompaction.create(options),
OffPeakHours.create(options)),
compactMaxInterval);
}
@@ -264,7 +264,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- FullCompactTrigger.create(options),
+ EarlyFullCompaction.create(options),
OffPeakHours.create(options));
if (options.compactionForceUpLevel0()) {
return new ForceUpLevel0Compaction(universal, null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
similarity index 66%
rename from
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
index 54759e4389..35665ed690 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.options.Options;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
@@ -42,20 +44,20 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-/** Tests for {@link FullCompactTrigger}. */
-public class FullCompactTriggerTest {
+/** Tests for {@link EarlyFullCompaction}. */
+public class EarlyFullCompactionTest {
@Test
public void testCreateNoOptions() {
CoreOptions options = new CoreOptions(new Options());
- assertThat(FullCompactTrigger.create(options)).isNull();
+ assertThat(EarlyFullCompaction.create(options)).isNull();
}
@Test
public void testCreateWithInterval() {
Options options = new Options();
options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
- FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ EarlyFullCompaction trigger = EarlyFullCompaction.create(new
CoreOptions(options));
assertThat(trigger).isNotNull();
}
@@ -63,7 +65,7 @@ public class FullCompactTriggerTest {
public void testCreateWithThreshold() {
Options options = new Options();
options.set(COMPACTION_TOTAL_SIZE_THRESHOLD,
MemorySize.ofMebiBytes(100));
- FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ EarlyFullCompaction trigger = EarlyFullCompaction.create(new
CoreOptions(options));
assertThat(trigger).isNotNull();
}
@@ -72,26 +74,27 @@ public class FullCompactTriggerTest {
Options options = new Options();
options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
options.set(COMPACTION_TOTAL_SIZE_THRESHOLD,
MemorySize.ofMebiBytes(100));
- FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ EarlyFullCompaction trigger = EarlyFullCompaction.create(new
CoreOptions(options));
assertThat(trigger).isNotNull();
}
@Test
public void testSingleRun() {
- FullCompactTrigger trigger = new FullCompactTrigger(1000L, 1000L);
+ EarlyFullCompaction trigger = new EarlyFullCompaction(1000L, 1000L,
null);
assertThat(trigger.tryFullCompact(5, createRuns(100))).isEmpty();
}
@Test
public void testNoOptions() {
- FullCompactTrigger trigger = new FullCompactTrigger(null, null);
+ EarlyFullCompaction trigger = new EarlyFullCompaction(null, null,
null);
assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
}
@Test
public void testInterval() {
AtomicLong time = new AtomicLong(10_000);
- TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, null, time);
+ TestableEarlyFullCompaction trigger =
+ new TestableEarlyFullCompaction(1000L, null, null, time);
// First time, should trigger
Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
@@ -113,8 +116,8 @@ public class FullCompactTriggerTest {
}
@Test
- public void testThreshold() {
- FullCompactTrigger trigger = new FullCompactTrigger(null, 1000L);
+ public void testTotalSizeThreshold() {
+ EarlyFullCompaction trigger = new EarlyFullCompaction(null, 1000L,
null);
// total size 300 < 1000, should trigger
Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
@@ -129,11 +132,48 @@ public class FullCompactTriggerTest {
assertThat(trigger.tryFullCompact(5, createRuns(500, 1000))).isEmpty();
}
+ @Test
+ public void testIncrementalSizeThreshold() {
+ EarlyFullCompaction trigger = new EarlyFullCompaction(null, null,
500L);
+
+ // trigger, no max level
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(400, 200));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ assertThat(compactUnit.get().files()).hasSize(2);
+
+ // no trigger, no max level
+ compactUnit = trigger.tryFullCompact(5, createRuns(100, 200));
+ assertThat(compactUnit).isEmpty();
+
+ // no trigger, with max level
+ List<LevelSortedRun> runs =
+ Arrays.asList(
+ createLevelSortedRun(100),
+ createLevelSortedRun(300),
+ createLevelSortedRun(4, 500));
+ compactUnit = trigger.tryFullCompact(5, runs);
+ assertThat(compactUnit).isEmpty();
+
+ // trigger, with max level
+ runs =
+ Arrays.asList(
+ createLevelSortedRun(100),
+ createLevelSortedRun(300),
+ createLevelSortedRun(300),
+ createLevelSortedRun(4, 500));
+ compactUnit = trigger.tryFullCompact(5, runs);
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ assertThat(compactUnit.get().files()).hasSize(4);
+ }
+
@Test
public void testIntervalTriggersFirst() {
AtomicLong time = new AtomicLong(10_000);
// Interval will trigger, but size is > threshold
- TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, 500L, time);
+ TestableEarlyFullCompaction trigger =
+ new TestableEarlyFullCompaction(1000L, 500L, null, time);
// First time, interval should trigger even if size (600) > threshold
(500)
Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(300, 300));
@@ -144,7 +184,8 @@ public class FullCompactTriggerTest {
@Test
public void testThresholdTriggersWhenIntervalFails() {
AtomicLong time = new AtomicLong(10_000);
- TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, 500L, time);
+ TestableEarlyFullCompaction trigger =
+ new TestableEarlyFullCompaction(1000L, 500L, null, time);
// Trigger once to set last compaction time
assertThat(trigger.tryFullCompact(5, createRuns(10, 20))).isPresent();
@@ -162,12 +203,15 @@ public class FullCompactTriggerTest {
}
private LevelSortedRun createLevelSortedRun(long size) {
+ return createLevelSortedRun(0, size);
+ }
+
+ private LevelSortedRun createLevelSortedRun(int level, long size) {
SortedRun run = mock(SortedRun.class);
when(run.totalSize()).thenReturn(size);
DataFileMeta file = mock(DataFileMeta.class);
when(run.files()).thenReturn(singletonList(file));
- // Level does not matter for the trigger logic
- return new LevelSortedRun(0, run);
+ return new LevelSortedRun(level, run);
}
private List<LevelSortedRun> createRuns(long... sizes) {
@@ -176,14 +220,17 @@ public class FullCompactTriggerTest {
.collect(Collectors.toList());
}
- /** A {@link FullCompactTrigger} that allows controlling time for tests. */
- private static class TestableFullCompactTrigger extends FullCompactTrigger
{
+ /** A {@link EarlyFullCompaction} that allows controlling time for tests.
*/
+ private static class TestableEarlyFullCompaction extends
EarlyFullCompaction {
private final AtomicLong currentTime;
- public TestableFullCompactTrigger(
- Long fullCompactionInterval, Long totalSizeThreshold,
AtomicLong currentTime) {
- super(fullCompactionInterval, totalSizeThreshold);
+ public TestableEarlyFullCompaction(
+ @Nullable Long fullCompactionInterval,
+ @Nullable Long totalSizeThreshold,
+ @Nullable Long incrementalSizeThreshold,
+ AtomicLong currentTime) {
+ super(fullCompactionInterval, totalSizeThreshold,
incrementalSizeThreshold);
this.currentTime = currentTime;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 37e057c704..5ac14f9293 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -85,8 +85,8 @@ public class UniversalCompactionTest {
@Test
public void testOptimizedCompactionInterval() {
AtomicLong time = new AtomicLong(0);
- FullCompactTrigger fullCompactTrigger =
- new FullCompactTrigger(1000L, null) {
+ EarlyFullCompaction fullCompactTrigger =
+ new EarlyFullCompaction(1000L, null, null) {
@Override
long currentTimeMillis() {
return time.get();
@@ -128,7 +128,7 @@ public class UniversalCompactionTest {
@Test
public void testTotalSizeThreshold() {
- FullCompactTrigger fullCompactTrigger = new FullCompactTrigger(null,
10L);
+ EarlyFullCompaction fullCompactTrigger = new EarlyFullCompaction(null,
10L, null);
UniversalCompaction compaction =
new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);