This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 856a878a23 [core] Introduce 'compaction.incremental-size-threshold' to force full compaction (#6754)
856a878a23 is described below

commit 856a878a23d240db7a9f0ae1247ba5160833ce41
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 10 17:27:38 2025 +0800

    [core] Introduce 'compaction.incremental-size-threshold' to force full compaction (#6754)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 +++
 ...ompactTrigger.java => EarlyFullCompaction.java} | 37 ++++++---
 .../mergetree/compact/UniversalCompaction.java     | 18 ++---
 .../paimon/operation/KeyValueFileStoreWrite.java   |  6 +-
 ...iggerTest.java => EarlyFullCompactionTest.java} | 87 +++++++++++++++++-----
 .../mergetree/compact/UniversalCompactionTest.java |  6 +-
 7 files changed, 127 insertions(+), 45 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index b50b9f2c9a..9b2bf644fc 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -272,6 +272,12 @@ under the License.
             <td>Boolean</td>
             <td>If set to true, compaction strategy will always include all 
level 0 files in candidates.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.incremental-size-threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>When incremental size is bigger than this threshold, force a 
full compaction.</td>
+        </tr>
         <tr>
             <td><h5>compaction.max-size-amplification-percent</h5></td>
             <td style="word-wrap: break-word;">200</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index b0862356f5..19b44d2a2a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -761,6 +761,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "When total size is smaller than this threshold, 
force a full compaction.");
 
+    public static final ConfigOption<MemorySize> 
COMPACTION_INCREMENTAL_SIZE_THRESHOLD =
+            key("compaction.incremental-size-threshold")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When incremental size is bigger than this 
threshold, force a full compaction.");
+
     public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
             key("compaction.min.file-num")
                     .intType()
@@ -2584,6 +2591,11 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_TOTAL_SIZE_THRESHOLD);
     }
 
+    @Nullable
+    public MemorySize compactionIncrementalSizeThreshold() {
+        return options.get(COMPACTION_INCREMENTAL_SIZE_THRESHOLD);
+    }
+
     public int numSortedRunStopTrigger() {
         Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
         if (stopTrigger == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
similarity index 67%
rename from 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
rename to 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
index 931b07630d..74fb003ed3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/EarlyFullCompaction.java
@@ -33,32 +33,38 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 
-/** Trigger full compaction. */
-public class FullCompactTrigger {
+/** Early trigger full compaction. */
+public class EarlyFullCompaction {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FullCompactTrigger.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(EarlyFullCompaction.class);
 
     @Nullable private final Long fullCompactionInterval;
     @Nullable private final Long totalSizeThreshold;
+    @Nullable private final Long incrementalSizeThreshold;
 
     @Nullable private Long lastFullCompaction;
 
-    public FullCompactTrigger(
-            @Nullable Long fullCompactionInterval, @Nullable Long 
totalSizeThreshold) {
+    public EarlyFullCompaction(
+            @Nullable Long fullCompactionInterval,
+            @Nullable Long totalSizeThreshold,
+            @Nullable Long incrementalSizeThreshold) {
         this.fullCompactionInterval = fullCompactionInterval;
         this.totalSizeThreshold = totalSizeThreshold;
+        this.incrementalSizeThreshold = incrementalSizeThreshold;
     }
 
     @Nullable
-    public static FullCompactTrigger create(CoreOptions options) {
+    public static EarlyFullCompaction create(CoreOptions options) {
         Duration interval = options.optimizedCompactionInterval();
-        MemorySize threshold = options.compactionTotalSizeThreshold();
-        if (interval == null && threshold == null) {
+        MemorySize totalThreshold = options.compactionTotalSizeThreshold();
+        MemorySize incrementalThreshold = 
options.compactionIncrementalSizeThreshold();
+        if (interval == null && totalThreshold == null && incrementalThreshold 
== null) {
             return null;
         }
-        return new FullCompactTrigger(
+        return new EarlyFullCompaction(
                 interval == null ? null : interval.toMillis(),
-                threshold == null ? null : threshold.getBytes());
+                totalThreshold == null ? null : totalThreshold.getBytes(),
+                incrementalThreshold == null ? null : 
incrementalThreshold.getBytes());
     }
 
     public Optional<CompactUnit> tryFullCompact(int numLevels, 
List<LevelSortedRun> runs) {
@@ -84,6 +90,17 @@ public class FullCompactTrigger {
                 return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
             }
         }
+        if (incrementalSizeThreshold != null) {
+            long incrementalSize = 0;
+            for (LevelSortedRun run : runs) {
+                if (run.level() != maxLevel) {
+                    incrementalSize += run.run().totalSize();
+                }
+            }
+            if (incrementalSize > incrementalSizeThreshold) {
+                return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+            }
+        }
         return Optional.empty();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 6d938f0cae..a40e985e9d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -47,19 +47,19 @@ public class UniversalCompaction implements CompactStrategy 
{
     private final int sizeRatio;
     private final int numRunCompactionTrigger;
 
-    @Nullable private final FullCompactTrigger fullCompactTrigger;
+    @Nullable private final EarlyFullCompaction earlyFullCompact;
     @Nullable private final OffPeakHours offPeakHours;
 
     public UniversalCompaction(
             int maxSizeAmp,
             int sizeRatio,
             int numRunCompactionTrigger,
-            @Nullable FullCompactTrigger fullCompactTrigger,
+            @Nullable EarlyFullCompaction earlyFullCompact,
             @Nullable OffPeakHours offPeakHours) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
-        this.fullCompactTrigger = fullCompactTrigger;
+        this.earlyFullCompact = earlyFullCompact;
         this.offPeakHours = offPeakHours;
     }
 
@@ -68,8 +68,8 @@ public class UniversalCompaction implements CompactStrategy {
         int maxLevel = numLevels - 1;
 
         // 0 try full compaction by trigger
-        if (fullCompactTrigger != null) {
-            Optional<CompactUnit> unit = 
fullCompactTrigger.tryFullCompact(numLevels, runs);
+        if (earlyFullCompact != null) {
+            Optional<CompactUnit> unit = 
earlyFullCompact.tryFullCompact(numLevels, runs);
             if (unit.isPresent()) {
                 return unit;
             }
@@ -137,8 +137,8 @@ public class UniversalCompaction implements CompactStrategy 
{
 
         // size amplification = percentage of additional size
         if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
-            if (fullCompactTrigger != null) {
-                fullCompactTrigger.updateLastFullCompaction();
+            if (earlyFullCompact != null) {
+                earlyFullCompact.updateLastFullCompaction();
             }
             return CompactUnit.fromLevelRuns(maxLevel, runs);
         }
@@ -216,8 +216,8 @@ public class UniversalCompaction implements CompactStrategy 
{
         }
 
         if (runCount == runs.size()) {
-            if (fullCompactTrigger != null) {
-                fullCompactTrigger.updateLastFullCompaction();
+            if (earlyFullCompact != null) {
+                earlyFullCompact.updateLastFullCompaction();
             }
             outputLevel = maxLevel;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 7d9295c700..177f093583 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -49,9 +49,9 @@ import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.MergeTreeWriter;
 import org.apache.paimon.mergetree.compact.CompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactStrategy;
+import org.apache.paimon.mergetree.compact.EarlyFullCompaction;
 import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
 import 
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
-import org.apache.paimon.mergetree.compact.FullCompactTrigger;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
 import 
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
@@ -254,7 +254,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                             options.maxSizeAmplificationPercent(),
                             options.sortedRunSizeRatio(),
                             options.numSortedRunCompactionTrigger(),
-                            FullCompactTrigger.create(options),
+                            EarlyFullCompaction.create(options),
                             OffPeakHours.create(options)),
                     compactMaxInterval);
         }
@@ -264,7 +264,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
                         options.numSortedRunCompactionTrigger(),
-                        FullCompactTrigger.create(options),
+                        EarlyFullCompaction.create(options),
                         OffPeakHours.create(options));
         if (options.compactionForceUpLevel0()) {
             return new ForceUpLevel0Compaction(universal, null);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
similarity index 66%
rename from 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
index 54759e4389..35665ed690 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/EarlyFullCompactionTest.java
@@ -28,6 +28,8 @@ import org.apache.paimon.options.Options;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
@@ -42,20 +44,20 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-/** Tests for {@link FullCompactTrigger}. */
-public class FullCompactTriggerTest {
+/** Tests for {@link EarlyFullCompaction}. */
+public class EarlyFullCompactionTest {
 
     @Test
     public void testCreateNoOptions() {
         CoreOptions options = new CoreOptions(new Options());
-        assertThat(FullCompactTrigger.create(options)).isNull();
+        assertThat(EarlyFullCompaction.create(options)).isNull();
     }
 
     @Test
     public void testCreateWithInterval() {
         Options options = new Options();
         options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
-        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        EarlyFullCompaction trigger = EarlyFullCompaction.create(new 
CoreOptions(options));
         assertThat(trigger).isNotNull();
     }
 
@@ -63,7 +65,7 @@ public class FullCompactTriggerTest {
     public void testCreateWithThreshold() {
         Options options = new Options();
         options.set(COMPACTION_TOTAL_SIZE_THRESHOLD, 
MemorySize.ofMebiBytes(100));
-        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        EarlyFullCompaction trigger = EarlyFullCompaction.create(new 
CoreOptions(options));
         assertThat(trigger).isNotNull();
     }
 
@@ -72,26 +74,27 @@ public class FullCompactTriggerTest {
         Options options = new Options();
         options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
         options.set(COMPACTION_TOTAL_SIZE_THRESHOLD, 
MemorySize.ofMebiBytes(100));
-        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        EarlyFullCompaction trigger = EarlyFullCompaction.create(new 
CoreOptions(options));
         assertThat(trigger).isNotNull();
     }
 
     @Test
     public void testSingleRun() {
-        FullCompactTrigger trigger = new FullCompactTrigger(1000L, 1000L);
+        EarlyFullCompaction trigger = new EarlyFullCompaction(1000L, 1000L, 
null);
         assertThat(trigger.tryFullCompact(5, createRuns(100))).isEmpty();
     }
 
     @Test
     public void testNoOptions() {
-        FullCompactTrigger trigger = new FullCompactTrigger(null, null);
+        EarlyFullCompaction trigger = new EarlyFullCompaction(null, null, 
null);
         assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
     }
 
     @Test
     public void testInterval() {
         AtomicLong time = new AtomicLong(10_000);
-        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, null, time);
+        TestableEarlyFullCompaction trigger =
+                new TestableEarlyFullCompaction(1000L, null, null, time);
 
         // First time, should trigger
         Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
@@ -113,8 +116,8 @@ public class FullCompactTriggerTest {
     }
 
     @Test
-    public void testThreshold() {
-        FullCompactTrigger trigger = new FullCompactTrigger(null, 1000L);
+    public void testTotalSizeThreshold() {
+        EarlyFullCompaction trigger = new EarlyFullCompaction(null, 1000L, 
null);
 
         // total size 300 < 1000, should trigger
         Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
@@ -129,11 +132,48 @@ public class FullCompactTriggerTest {
         assertThat(trigger.tryFullCompact(5, createRuns(500, 1000))).isEmpty();
     }
 
+    @Test
+    public void testIncrementalSizeThreshold() {
+        EarlyFullCompaction trigger = new EarlyFullCompaction(null, null, 
500L);
+
+        // trigger, no max level
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(400, 200));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+        assertThat(compactUnit.get().files()).hasSize(2);
+
+        // no trigger, no max level
+        compactUnit = trigger.tryFullCompact(5, createRuns(100, 200));
+        assertThat(compactUnit).isEmpty();
+
+        // no trigger, with max level
+        List<LevelSortedRun> runs =
+                Arrays.asList(
+                        createLevelSortedRun(100),
+                        createLevelSortedRun(300),
+                        createLevelSortedRun(4, 500));
+        compactUnit = trigger.tryFullCompact(5, runs);
+        assertThat(compactUnit).isEmpty();
+
+        // trigger, with max level
+        runs =
+                Arrays.asList(
+                        createLevelSortedRun(100),
+                        createLevelSortedRun(300),
+                        createLevelSortedRun(300),
+                        createLevelSortedRun(4, 500));
+        compactUnit = trigger.tryFullCompact(5, runs);
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+        assertThat(compactUnit.get().files()).hasSize(4);
+    }
+
     @Test
     public void testIntervalTriggersFirst() {
         AtomicLong time = new AtomicLong(10_000);
         // Interval will trigger, but size is > threshold
-        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, 500L, time);
+        TestableEarlyFullCompaction trigger =
+                new TestableEarlyFullCompaction(1000L, 500L, null, time);
 
         // First time, interval should trigger even if size (600) > threshold 
(500)
         Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(300, 300));
@@ -144,7 +184,8 @@ public class FullCompactTriggerTest {
     @Test
     public void testThresholdTriggersWhenIntervalFails() {
         AtomicLong time = new AtomicLong(10_000);
-        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, 500L, time);
+        TestableEarlyFullCompaction trigger =
+                new TestableEarlyFullCompaction(1000L, 500L, null, time);
 
         // Trigger once to set last compaction time
         assertThat(trigger.tryFullCompact(5, createRuns(10, 20))).isPresent();
@@ -162,12 +203,15 @@ public class FullCompactTriggerTest {
     }
 
     private LevelSortedRun createLevelSortedRun(long size) {
+        return createLevelSortedRun(0, size);
+    }
+
+    private LevelSortedRun createLevelSortedRun(int level, long size) {
         SortedRun run = mock(SortedRun.class);
         when(run.totalSize()).thenReturn(size);
         DataFileMeta file = mock(DataFileMeta.class);
         when(run.files()).thenReturn(singletonList(file));
-        // Level does not matter for the trigger logic
-        return new LevelSortedRun(0, run);
+        return new LevelSortedRun(level, run);
     }
 
     private List<LevelSortedRun> createRuns(long... sizes) {
@@ -176,14 +220,17 @@ public class FullCompactTriggerTest {
                 .collect(Collectors.toList());
     }
 
-    /** A {@link FullCompactTrigger} that allows controlling time for tests. */
-    private static class TestableFullCompactTrigger extends FullCompactTrigger 
{
+    /** A {@link EarlyFullCompaction} that allows controlling time for tests. 
*/
+    private static class TestableEarlyFullCompaction extends 
EarlyFullCompaction {
 
         private final AtomicLong currentTime;
 
-        public TestableFullCompactTrigger(
-                Long fullCompactionInterval, Long totalSizeThreshold, 
AtomicLong currentTime) {
-            super(fullCompactionInterval, totalSizeThreshold);
+        public TestableEarlyFullCompaction(
+                @Nullable Long fullCompactionInterval,
+                @Nullable Long totalSizeThreshold,
+                @Nullable Long incrementalSizeThreshold,
+                AtomicLong currentTime) {
+            super(fullCompactionInterval, totalSizeThreshold, 
incrementalSizeThreshold);
             this.currentTime = currentTime;
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index 37e057c704..5ac14f9293 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -85,8 +85,8 @@ public class UniversalCompactionTest {
     @Test
     public void testOptimizedCompactionInterval() {
         AtomicLong time = new AtomicLong(0);
-        FullCompactTrigger fullCompactTrigger =
-                new FullCompactTrigger(1000L, null) {
+        EarlyFullCompaction fullCompactTrigger =
+                new EarlyFullCompaction(1000L, null, null) {
                     @Override
                     long currentTimeMillis() {
                         return time.get();
@@ -128,7 +128,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testTotalSizeThreshold() {
-        FullCompactTrigger fullCompactTrigger = new FullCompactTrigger(null, 
10L);
+        EarlyFullCompaction fullCompactTrigger = new EarlyFullCompaction(null, 
10L, null);
         UniversalCompaction compaction =
                 new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);
 

Reply via email to