This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 70831894da [core] Introduce 'compaction.total-size-threshold' to do 
full compaction (#5973)
70831894da is described below

commit 70831894da351b16f67a4821282f28054a80d186
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 31 10:12:09 2025 +0800

    [core] Introduce 'compaction.total-size-threshold' to do full compaction 
(#5973)
---
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |  21 +++-
 .../mergetree/compact/ForceUpLevel0Compaction.java |  37 ++++++-
 .../mergetree/compact/FullCompactTrigger.java      |  98 +++++++++++++++++
 .../paimon/mergetree/compact}/OffPeakHours.java    |  52 ++++-----
 .../mergetree/compact/UniversalCompaction.java     | 117 ++++-----------------
 .../paimon/operation/KeyValueFileStoreWrite.java   |  42 ++++----
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   6 +-
 .../compact/ForceUpLevel0CompactionTest.java       |   3 +-
 .../mergetree/compact}/OffPeakHoursTest.java       |  32 +-----
 .../mergetree/compact/UniversalCompactionTest.java |  66 ++++++++----
 .../operation/KeyValueFileStoreWriteTest.java      |  23 ++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  60 ++++++++++-
 13 files changed, 348 insertions(+), 215 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 311914ebb0..85fa54aa63 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -272,6 +272,12 @@ under the License.
             <td>Integer</td>
             <td>Percentage flexibility while comparing sorted run size for 
changelog mode table. If the candidate sorted run(s) size is 1% smaller than 
the next sorted run's size, then include next sorted run into this candidate 
set.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.total-size-threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>When total size is smaller than this threshold, force a full 
compaction.</td>
+        </tr>
         <tr>
             <td><h5>consumer-id</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 56f6daf40b..ca2c67d0f4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -733,6 +733,13 @@ public class CoreOptions implements Serializable {
                             "Implying how often to perform an optimization 
compaction, this configuration is used to "
                                     + "ensure the query timeliness of the 
read-optimized system table.");
 
+    public static final ConfigOption<MemorySize> 
COMPACTION_TOTAL_SIZE_THRESHOLD =
+            key("compaction.total-size-threshold")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When total size is smaller than this threshold, 
force a full compaction.");
+
     public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
             key("compaction.min.file-num")
                     .intType()
@@ -2364,6 +2371,11 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
     }
 
+    @Nullable
+    public MemorySize compactionTotalSizeThreshold() {
+        return options.get(COMPACTION_TOTAL_SIZE_THRESHOLD);
+    }
+
     public int numSortedRunStopTrigger() {
         Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
         if (stopTrigger == null) {
@@ -2416,9 +2428,12 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_SIZE_RATIO);
     }
 
-    public OffPeakHours offPeakHours() {
-        return OffPeakHours.create(
-                options.get(COMPACT_OFFPEAK_START_HOUR), 
options.get(COMPACT_OFFPEAK_END_HOUR));
+    public int compactOffPeakStartHour() {
+        return options.get(COMPACT_OFFPEAK_START_HOUR);
+    }
+
+    public int compactOffPeakEndHour() {
+        return options.get(COMPACT_OFFPEAK_END_HOUR);
     }
 
     public int compactOffPeakRatio() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
index 350c5fb056..1f23f598c2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
@@ -21,16 +21,29 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.mergetree.LevelSortedRun;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** A {@link CompactStrategy} to force compacting level 0 files. */
 public class ForceUpLevel0Compaction implements CompactStrategy {
 
     private final UniversalCompaction universal;
+    @Nullable private final Integer maxCompactInterval;
+    @Nullable private final AtomicInteger compactTriggerCount;
 
-    public ForceUpLevel0Compaction(UniversalCompaction universal) {
+    public ForceUpLevel0Compaction(
+            UniversalCompaction universal, @Nullable Integer 
maxCompactInterval) {
         this.universal = universal;
+        this.maxCompactInterval = maxCompactInterval;
+        this.compactTriggerCount = maxCompactInterval == null ? null : new 
AtomicInteger(0);
+    }
+
+    @Nullable
+    public Integer maxCompactInterval() {
+        return maxCompactInterval;
     }
 
     @Override
@@ -40,6 +53,26 @@ public class ForceUpLevel0Compaction implements 
CompactStrategy {
             return pick;
         }
 
-        return universal.forcePickL0(numLevels, runs);
+        if (maxCompactInterval == null || compactTriggerCount == null) {
+            return universal.forcePickL0(numLevels, runs);
+        }
+
+        compactTriggerCount.getAndIncrement();
+        if (compactTriggerCount.compareAndSet(maxCompactInterval, 0)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Universal compaction due to max lookup compaction 
interval {}.",
+                        maxCompactInterval);
+            }
+            return universal.forcePickL0(numLevels, runs);
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Skip universal compaction due to lookup compaction 
trigger count {} is less than the max interval {}.",
+                        compactTriggerCount.get(),
+                        maxCompactInterval);
+            }
+            return Optional.empty();
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
new file mode 100644
index 0000000000..931b07630d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.options.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+/** Trigger full compaction. */
+public class FullCompactTrigger {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FullCompactTrigger.class);
+
+    @Nullable private final Long fullCompactionInterval;
+    @Nullable private final Long totalSizeThreshold;
+
+    @Nullable private Long lastFullCompaction;
+
+    public FullCompactTrigger(
+            @Nullable Long fullCompactionInterval, @Nullable Long 
totalSizeThreshold) {
+        this.fullCompactionInterval = fullCompactionInterval;
+        this.totalSizeThreshold = totalSizeThreshold;
+    }
+
+    @Nullable
+    public static FullCompactTrigger create(CoreOptions options) {
+        Duration interval = options.optimizedCompactionInterval();
+        MemorySize threshold = options.compactionTotalSizeThreshold();
+        if (interval == null && threshold == null) {
+            return null;
+        }
+        return new FullCompactTrigger(
+                interval == null ? null : interval.toMillis(),
+                threshold == null ? null : threshold.getBytes());
+    }
+
+    public Optional<CompactUnit> tryFullCompact(int numLevels, 
List<LevelSortedRun> runs) {
+        if (runs.size() == 1) {
+            return Optional.empty();
+        }
+
+        int maxLevel = numLevels - 1;
+        if (fullCompactionInterval != null) {
+            if (lastFullCompaction == null
+                    || currentTimeMillis() - lastFullCompaction > 
fullCompactionInterval) {
+                LOG.debug("Universal compaction due to full compaction 
interval");
+                updateLastFullCompaction();
+                return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+            }
+        }
+        if (totalSizeThreshold != null) {
+            long totalSize = 0;
+            for (LevelSortedRun run : runs) {
+                totalSize += run.run().totalSize();
+            }
+            if (totalSize < totalSizeThreshold) {
+                return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+            }
+        }
+        return Optional.empty();
+    }
+
+    public void updateLastFullCompaction() {
+        lastFullCompaction = currentTimeMillis();
+    }
+
+    @VisibleForTesting
+    long currentTimeMillis() {
+        return System.currentTimeMillis();
+    }
+}
diff --git a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
similarity index 63%
rename from paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
rename to 
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
index b33e27618b..203af06cf6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
@@ -16,55 +16,52 @@
  * limitations under the License.
  */
 
-package org.apache.paimon;
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+
+import javax.annotation.Nullable;
 
 import java.time.LocalDateTime;
 
-/** OffPeakHours. */
+/** OffPeakHours to control compaction ratio by hours. */
 public abstract class OffPeakHours {
 
-    public abstract boolean isOffPeak();
-
     public abstract boolean isOffPeak(int targetHour);
 
-    public static final OffPeakHours DISABLED =
-            new OffPeakHours() {
-                @Override
-                public boolean isOffPeak() {
-                    return false;
-                }
+    public abstract int currentRatio();
 
-                @Override
-                public boolean isOffPeak(int targetHour) {
-                    return false;
-                }
-            };
+    @Nullable
+    public static OffPeakHours create(CoreOptions options) {
+        return create(
+                options.compactOffPeakStartHour(),
+                options.compactOffPeakEndHour(),
+                options.compactOffPeakRatio());
+    }
 
-    public static OffPeakHours create(int startHour, int endHour) {
+    @Nullable
+    public static OffPeakHours create(int startHour, int endHour, int 
compactOffPeakRatio) {
         if (startHour == -1 && endHour == -1) {
-            return DISABLED;
+            return null;
         }
 
         if (startHour == endHour) {
-            return DISABLED;
+            return null;
         }
 
-        return new OffPeakHoursImpl(startHour, endHour);
+        return new OffPeakHoursImpl(startHour, endHour, compactOffPeakRatio);
     }
 
     private static class OffPeakHoursImpl extends OffPeakHours {
 
         private final int startHour;
         private final int endHour;
+        private final int compactOffPeakRatio;
 
-        OffPeakHoursImpl(int startHour, int endHour) {
+        OffPeakHoursImpl(int startHour, int endHour, int compactOffPeakRatio) {
             this.startHour = startHour;
             this.endHour = endHour;
-        }
-
-        @Override
-        public boolean isOffPeak() {
-            return isOffPeak(LocalDateTime.now().getHour());
+            this.compactOffPeakRatio = compactOffPeakRatio;
         }
 
         @Override
@@ -74,5 +71,10 @@ public abstract class OffPeakHours {
             }
             return targetHour < endHour || startHour <= targetHour;
         }
+
+        @Override
+        public int currentRatio() {
+            return isOffPeak(LocalDateTime.now().getHour()) ? 
compactOffPeakRatio : 0;
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 63e71354cf..5313682a90 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.OffPeakHours;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.mergetree.LevelSortedRun;
@@ -29,10 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Universal Compaction Style is a compaction style, targeting the use cases 
requiring lower write
@@ -48,81 +45,32 @@ public class UniversalCompaction implements CompactStrategy 
{
     private final int maxSizeAmp;
     private final int sizeRatio;
     private final int numRunCompactionTrigger;
-    private final OffPeakHours offPeakHours;
-    private final int compactOffPeakRatio;
 
-    @Nullable private final Long opCompactionInterval;
-    @Nullable private Long lastOptimizedCompaction;
-
-    @Nullable private final Integer maxLookupCompactInterval;
-    @Nullable private final AtomicInteger lookupCompactTriggerCount;
-
-    public UniversalCompaction(int maxSizeAmp, int sizeRatio, int 
numRunCompactionTrigger) {
-        this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null, 
OffPeakHours.DISABLED, 0);
-    }
-
-    public UniversalCompaction(
-            int maxSizeAmp,
-            int sizeRatio,
-            int numRunCompactionTrigger,
-            OffPeakHours offPeakHours,
-            int compactOffPeakRatio) {
-        this(
-                maxSizeAmp,
-                sizeRatio,
-                numRunCompactionTrigger,
-                null,
-                offPeakHours,
-                compactOffPeakRatio);
-    }
-
-    public UniversalCompaction(
-            int maxSizeAmp,
-            int sizeRatio,
-            int numRunCompactionTrigger,
-            @Nullable Duration opCompactionInterval,
-            OffPeakHours offPeakHours,
-            int compactOffPeakRatio) {
-        this(
-                maxSizeAmp,
-                sizeRatio,
-                numRunCompactionTrigger,
-                opCompactionInterval,
-                null,
-                offPeakHours,
-                compactOffPeakRatio);
-    }
+    @Nullable private final FullCompactTrigger fullCompactTrigger;
+    @Nullable private final OffPeakHours offPeakHours;
 
     public UniversalCompaction(
             int maxSizeAmp,
             int sizeRatio,
             int numRunCompactionTrigger,
-            @Nullable Duration opCompactionInterval,
-            @Nullable Integer maxLookupCompactInterval,
-            OffPeakHours offPeakHours,
-            int compactOffPeakRatio) {
+            @Nullable FullCompactTrigger fullCompactTrigger,
+            @Nullable OffPeakHours offPeakHours) {
         this.maxSizeAmp = maxSizeAmp;
         this.sizeRatio = sizeRatio;
-        this.offPeakHours = offPeakHours;
         this.numRunCompactionTrigger = numRunCompactionTrigger;
-        this.opCompactionInterval =
-                opCompactionInterval == null ? null : 
opCompactionInterval.toMillis();
-        this.maxLookupCompactInterval = maxLookupCompactInterval;
-        this.lookupCompactTriggerCount =
-                maxLookupCompactInterval == null ? null : new AtomicInteger(0);
-        this.compactOffPeakRatio = compactOffPeakRatio;
+        this.fullCompactTrigger = fullCompactTrigger;
+        this.offPeakHours = offPeakHours;
     }
 
     @Override
     public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> 
runs) {
         int maxLevel = numLevels - 1;
 
-        if (opCompactionInterval != null) {
-            if (lastOptimizedCompaction == null
-                    || currentTimeMillis() - lastOptimizedCompaction > 
opCompactionInterval) {
-                LOG.debug("Universal compaction due to optimized compaction 
interval");
-                updateLastOptimizedCompaction();
-                return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+        // 0 try full compaction by trigger
+        if (fullCompactTrigger != null) {
+            Optional<CompactUnit> unit = 
fullCompactTrigger.tryFullCompact(numLevels, runs);
+            if (unit.isPresent()) {
+                return unit;
             }
         }
 
@@ -154,26 +102,6 @@ public class UniversalCompaction implements 
CompactStrategy {
             return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, 
candidateCount));
         }
 
-        // 4 checking if a forced L0 compact should be triggered
-        if (maxLookupCompactInterval != null && lookupCompactTriggerCount != 
null) {
-            lookupCompactTriggerCount.getAndIncrement();
-            if 
(lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                            "Universal compaction due to max lookup compaction 
interval {}.",
-                            maxLookupCompactInterval);
-                }
-                return forcePickL0(numLevels, runs);
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                            "Skip universal compaction due to lookup 
compaction trigger count {} is less than the max interval {}.",
-                            lookupCompactTriggerCount.get(),
-                            maxLookupCompactInterval);
-                }
-            }
-        }
-
         return Optional.empty();
     }
 
@@ -208,7 +136,9 @@ public class UniversalCompaction implements CompactStrategy 
{
 
         // size amplification = percentage of additional size
         if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
-            updateLastOptimizedCompaction();
+            if (fullCompactTrigger != null) {
+                fullCompactTrigger.updateLastFullCompaction();
+            }
             return CompactUnit.fromLevelRuns(maxLevel, runs);
         }
 
@@ -234,11 +164,8 @@ public class UniversalCompaction implements 
CompactStrategy {
         long candidateSize = candidateSize(runs, candidateCount);
         for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
-            if (candidateSize
-                            * (100.0
-                                    + sizeRatio
-                                    + (offPeakHours.isOffPeak() ? 
compactOffPeakRatio : 0))
-                            / 100.0
+            int offPeakRatio = offPeakHours == null ? 0 : 
offPeakHours.currentRatio();
+            if (candidateSize * (100.0 + sizeRatio + offPeakRatio) / 100.0
                     < next.run().totalSize()) {
                 break;
             }
@@ -285,18 +212,12 @@ public class UniversalCompaction implements 
CompactStrategy {
         }
 
         if (runCount == runs.size()) {
-            updateLastOptimizedCompaction();
+            if (fullCompactTrigger != null) {
+                fullCompactTrigger.updateLastFullCompaction();
+            }
             outputLevel = maxLevel;
         }
 
         return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, 
runCount));
     }
-
-    private void updateLastOptimizedCompaction() {
-        lastOptimizedCompaction = currentTimeMillis();
-    }
-
-    long currentTimeMillis() {
-        return System.currentTimeMillis();
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 15787ea5de..4e9e558d04 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -53,12 +53,14 @@ import org.apache.paimon.mergetree.compact.CompactRewriter;
 import org.apache.paimon.mergetree.compact.CompactStrategy;
 import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
 import 
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
+import org.apache.paimon.mergetree.compact.FullCompactTrigger;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
 import 
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
 import 
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
+import org.apache.paimon.mergetree.compact.OffPeakHours;
 import org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -220,25 +222,22 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
 
     private CompactStrategy createCompactStrategy(CoreOptions options) {
         if (options.needLookup()) {
-            if 
(CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
-                return new ForceUpLevel0Compaction(
-                        new UniversalCompaction(
-                                options.maxSizeAmplificationPercent(),
-                                options.sortedRunSizeRatio(),
-                                options.numSortedRunCompactionTrigger(),
-                                options.optimizedCompactionInterval(),
-                                options.offPeakHours(),
-                                options.compactOffPeakRatio()));
-            } else if 
(CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
-                return new UniversalCompaction(
-                        options.maxSizeAmplificationPercent(),
-                        options.sortedRunSizeRatio(),
-                        options.numSortedRunCompactionTrigger(),
-                        options.optimizedCompactionInterval(),
-                        options.lookupCompactMaxInterval(),
-                        options.offPeakHours(),
-                        options.compactOffPeakRatio());
+            Integer compactMaxInterval = null;
+            switch (options.lookupCompact()) {
+                case GENTLE:
+                    compactMaxInterval = options.lookupCompactMaxInterval();
+                    break;
+                case RADICAL:
+                    break;
             }
+            return new ForceUpLevel0Compaction(
+                    new UniversalCompaction(
+                            options.maxSizeAmplificationPercent(),
+                            options.sortedRunSizeRatio(),
+                            options.numSortedRunCompactionTrigger(),
+                            FullCompactTrigger.create(options),
+                            OffPeakHours.create(options)),
+                    compactMaxInterval);
         }
 
         UniversalCompaction universal =
@@ -246,11 +245,10 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
                         options.numSortedRunCompactionTrigger(),
-                        options.optimizedCompactionInterval(),
-                        options.offPeakHours(),
-                        options.compactOffPeakRatio());
+                        FullCompactTrigger.create(options),
+                        OffPeakHours.create(options));
         if (options.compactionForceUpLevel0()) {
-            return new ForceUpLevel0Compaction(universal);
+            return new ForceUpLevel0Compaction(universal, null);
         } else {
             return universal;
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 95daf384e7..11d3ede213 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -46,7 +46,6 @@ import 
org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
@@ -90,6 +89,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.mergetree.compact.UniversalCompactionTest.ofTesting;
 import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -277,7 +277,7 @@ public abstract class MergeTreeTestBase {
                 new MockFailResultCompactionManager(
                         service,
                         new Levels(comparator, dataFileMetas, 
options.numLevels()),
-                        new UniversalCompaction(
+                        ofTesting(
                                 options.maxSizeAmplificationPercent(),
                                 options.sortedRunSizeRatio(),
                                 options.numSortedRunCompactionTrigger()),
@@ -438,7 +438,7 @@ public abstract class MergeTreeTestBase {
     private MergeTreeCompactManager createCompactManager(
             ExecutorService compactExecutor, List<DataFileMeta> files) {
         CompactStrategy strategy =
-                new UniversalCompaction(
+                ofTesting(
                         options.maxSizeAmplificationPercent(),
                         options.sortedRunSizeRatio(),
                         options.numSortedRunCompactionTrigger());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
index 07eedc9214..126ce000f0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.Optional;
 
 import static org.apache.paimon.mergetree.compact.UniversalCompactionTest.file;
+import static 
org.apache.paimon.mergetree.compact.UniversalCompactionTest.ofTesting;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link ForceUpLevel0Compaction}. */
@@ -37,7 +38,7 @@ public class ForceUpLevel0CompactionTest {
     @Test
     public void testForceCompaction0() {
         ForceUpLevel0Compaction compaction =
-                new ForceUpLevel0Compaction(new UniversalCompaction(200, 1, 
5));
+                new ForceUpLevel0Compaction(ofTesting(200, 1, 5), null);
 
         Optional<CompactUnit> result = compaction.pick(3, Arrays.asList(run(0, 
1), run(0, 1)));
         assertThat(result).isPresent();
diff --git 
a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
similarity index 77%
rename from 
paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
index 699d02f06b..6978280e3c 100644
--- a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.offpeak;
-
-import org.apache.paimon.OffPeakHours;
+package org.apache.paimon.mergetree.compact;
 
 import org.junit.jupiter.api.Test;
 
@@ -27,32 +25,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link OffPeakHours}. */
 public class OffPeakHoursTest {
 
-    @Test
-    public void testDisabledInstance() {
-        OffPeakHours disabled = OffPeakHours.DISABLED;
-        for (int hour = 0; hour < 24; hour++) {
-            assertThat(disabled.isOffPeak(hour)).isFalse();
-        }
-    }
-
-    @Test
-    public void testGetInstanceWithDisabledValues() {
-        OffPeakHours offPeakHours = OffPeakHours.create(-1, -1);
-        assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
-    }
-
-    @Test
-    public void testGetInstanceWithSameStartAndEnd() {
-        for (int hour = 0; hour < 24; hour++) {
-            OffPeakHours offPeakHours = OffPeakHours.create(hour, hour);
-            assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
-        }
-    }
-
     @Test
     public void testNormalRangeOffPeakHours() {
         // Test normal range: 9 AM to 5 PM (9-17)
-        OffPeakHours offPeakHours = OffPeakHours.create(9, 17);
+        OffPeakHours offPeakHours = OffPeakHours.create(9, 17, 0);
 
         // Hours before start should not be off-peak
         for (int hour = 0; hour < 9; hour++) {
@@ -78,7 +54,7 @@ public class OffPeakHoursTest {
 
     @Test
     public void testWrapAroundRangeOffPeakHours() {
-        OffPeakHours offPeakHours = OffPeakHours.create(22, 6);
+        OffPeakHours offPeakHours = OffPeakHours.create(22, 6, 0);
 
         // Hours before end (0-5) should be off-peak
         for (int hour = 0; hour < 6; hour++) {
@@ -105,7 +81,7 @@ public class OffPeakHoursTest {
     @Test
     public void testSingleHourRange() {
         // Test single hour range: 12 to 13
-        OffPeakHours offPeakHours = OffPeakHours.create(12, 13);
+        OffPeakHours offPeakHours = OffPeakHours.create(12, 13, 0);
 
         // Only hour 12 should be off-peak
         for (int hour = 0; hour < 24; hour++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index dccd22b008..4c892e40d1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.OffPeakHours;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileSource;
@@ -27,9 +26,9 @@ import org.apache.paimon.mergetree.SortedRun;
 
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -42,7 +41,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testOutputLevel() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+        UniversalCompaction compaction = ofTesting(25, 1, 3);
         assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
1).outputLevel())
                 .isEqualTo(1);
         assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5, 
2).outputLevel())
@@ -57,7 +56,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testPick() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+        UniversalCompaction compaction = ofTesting(25, 1, 3);
 
         // by size amplification
         Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -86,14 +85,15 @@ public class UniversalCompactionTest {
     @Test
     public void testOptimizedCompactionInterval() {
         AtomicLong time = new AtomicLong(0);
-        UniversalCompaction compaction =
-                new UniversalCompaction(
-                        100, 1, 3, Duration.ofMillis(1000), 
OffPeakHours.DISABLED, 0) {
+        FullCompactTrigger fullCompactTrigger =
+                new FullCompactTrigger(1000L, null) {
                     @Override
                     long currentTimeMillis() {
                         return time.get();
                     }
                 };
+        UniversalCompaction compaction =
+                new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);
 
         // first time, force optimized compaction
         Optional<CompactUnit> pick =
@@ -126,9 +126,31 @@ public class UniversalCompactionTest {
         assertThat(pick.isPresent()).isFalse();
     }
 
+    @Test
+    public void testTotalSizeThreshold() {
+        FullCompactTrigger fullCompactTrigger = new FullCompactTrigger(null, 
10L);
+        UniversalCompaction compaction =
+                new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);
+
+        // total size less than threshold
+        Optional<CompactUnit> pick =
+                compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3), 
level(2, 5)));
+        assertThat(pick.isPresent()).isTrue();
+        long[] results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+        // total size bigger than threshold
+        pick = compaction.pick(3, Arrays.asList(level(0, 2), level(1, 6), 
level(2, 10)));
+        assertThat(pick.isPresent()).isFalse();
+
+        // one sort run, not trigger
+        pick = compaction.pick(3, Collections.singletonList(level(3, 5)));
+        assertThat(pick.isPresent()).isFalse();
+    }
+
     @Test
     public void testNoOutputLevel0() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+        UniversalCompaction compaction = ofTesting(25, 1, 3);
 
         Optional<CompactUnit> pick =
                 compaction.pick(
@@ -148,7 +170,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testExtremeCaseNoOutputLevel0() {
-        UniversalCompaction compaction = new UniversalCompaction(200, 1, 5);
+        UniversalCompaction compaction = ofTesting(200, 1, 5);
 
         Optional<CompactUnit> pick =
                 compaction.pick(
@@ -168,7 +190,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testSizeAmplification() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
+        UniversalCompaction compaction = ofTesting(25, 0, 1);
         long[] sizes = new long[] {1};
         sizes = appendAndPickForSizeAmp(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {2});
@@ -208,7 +230,7 @@ public class UniversalCompactionTest {
 
     @Test
     public void testSizeRatio() {
-        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
+        UniversalCompaction compaction = ofTesting(25, 1, 5);
         long[] sizes = new long[] {1, 1, 1, 1};
         sizes = appendAndPickForSizeRatio(compaction, sizes);
         assertThat(sizes).isEqualTo(new long[] {5});
@@ -261,16 +283,13 @@ public class UniversalCompactionTest {
     @Test
     public void testSizeRatioThreshold() {
         long[] sizes = new long[] {8, 9, 10};
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
-                .isEqualTo(new long[] {8, 9, 10});
-        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
-                .isEqualTo(new long[] {27});
+        assertThat(pickForSizeRatio(ofTesting(25, 10, 2), 
sizes)).isEqualTo(new long[] {8, 9, 10});
+        assertThat(pickForSizeRatio(ofTesting(25, 20, 2), 
sizes)).isEqualTo(new long[] {27});
     }
 
     @Test
     public void testLookup() {
-        ForceUpLevel0Compaction compaction =
-                new ForceUpLevel0Compaction(new UniversalCompaction(25, 1, 3));
+        ForceUpLevel0Compaction compaction = new 
ForceUpLevel0Compaction(ofTesting(25, 1, 3), null);
 
         // level 0 to max level
         Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 2, 2));
@@ -297,8 +316,9 @@ public class UniversalCompactionTest {
     @Test
     public void testForcePickL0() {
         int maxInterval = 5;
-        UniversalCompaction compaction =
-                new UniversalCompaction(25, 1, 5, null, maxInterval, 
OffPeakHours.DISABLED, 0);
+        ForceUpLevel0Compaction compaction =
+                new ForceUpLevel0Compaction(
+                        new UniversalCompaction(25, 1, 5, null, null), 
maxInterval);
 
         // level 0 to max level
         List<LevelSortedRun> level0ToMax = level0(1, 2, 2, 2);
@@ -324,16 +344,15 @@ public class UniversalCompactionTest {
         List<LevelSortedRun> level0ForcePick = Arrays.asList(level(0, 2), 
level(1, 2), level(2, 2));
 
         for (int i = 1; i <= maxInterval; i++) {
+            pick = compaction.pick(3, level0ForcePick);
             if (i == maxInterval) {
                 // level 0 force pick triggered
-                pick = compaction.pick(3, level0ForcePick);
                 assertThat(pick.isPresent()).isTrue();
                 results = 
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
                 assertThat(results).isEqualTo(new long[] {2, 2, 2});
                 assertThat(pick.get().outputLevel()).isEqualTo(2);
             } else {
                 // compact skipped
-                pick = compaction.pick(3, level0ForcePick);
                 assertThat(pick.isPresent()).isFalse();
             }
         }
@@ -441,4 +460,9 @@ public class UniversalCompactionTest {
                 null,
                 null);
     }
+
+    public static UniversalCompaction ofTesting(
+            int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
+        return new UniversalCompaction(maxSizeAmp, sizeRatio, 
numRunCompactionTrigger, null, null);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
index 606a290bcb..baeb6e9b4e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.mergetree.compact.CompactStrategy;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
 import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -65,7 +64,11 @@ public class KeyValueFileStoreWriteTest {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
         options.put(CoreOptions.LOOKUP_COMPACT.key(), "radical");
-        assertCompactStrategy(options, ForceUpLevel0Compaction.class);
+        CompactStrategy compactStrategy = createCompactStrategy(options);
+        
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
+        Integer maxCompactInterval =
+                ((ForceUpLevel0Compaction) 
compactStrategy).maxCompactInterval();
+        assertThat(maxCompactInterval).isNull();
     }
 
     @Test
@@ -73,19 +76,22 @@ public class KeyValueFileStoreWriteTest {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
         options.put(CoreOptions.LOOKUP_COMPACT.key(), "gentle");
-        assertCompactStrategy(options, UniversalCompaction.class);
+        CompactStrategy compactStrategy = createCompactStrategy(options);
+        
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
+        Integer maxCompactInterval =
+                ((ForceUpLevel0Compaction) 
compactStrategy).maxCompactInterval();
+        assertThat(maxCompactInterval).isEqualTo(10);
     }
 
     @Test
     public void testForceUpLevel0CompactStrategy() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.COMPACTION_FORCE_UP_LEVEL_0.key(), "true");
-        assertCompactStrategy(options, ForceUpLevel0Compaction.class);
+        CompactStrategy compactStrategy = createCompactStrategy(options);
+        
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
     }
 
-    private void assertCompactStrategy(
-            Map<String, String> options, Class<? extends CompactStrategy> 
expected)
-            throws Exception {
+    private CompactStrategy createCompactStrategy(Map<String, String> options) 
throws Exception {
         KeyValueFileStoreWrite write = createWriteWithOptions(options);
         write.withIOManager(ioManager);
         TestKeyValueGenerator gen = new TestKeyValueGenerator();
@@ -96,8 +102,7 @@ public class KeyValueFileStoreWriteTest {
         MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
         try (MergeTreeCompactManager compactManager =
                 (MergeTreeCompactManager) writer.compactManager()) {
-            CompactStrategy compactStrategy = compactManager.getStrategy();
-            assertThat(compactStrategy).isInstanceOf(expected);
+            return compactManager.getStrategy();
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 560a8fd8aa..ac57e7d874 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,7 +19,12 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
@@ -40,13 +45,13 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,7 +61,56 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
 
     @Override
     protected List<String> ddl() {
-        return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT, 
b INT, c INT)");
+        return singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c 
INT)");
+    }
+
+    @Test
+    public void testFullCompactionNoDv() throws Catalog.TableNotExistException 
{
+        sql(
+                "CREATE TEMPORARY TABLE GEN (a INT) WITH ("
+                        + "'connector'='datagen', "
+                        + "'number-of-rows'='1000', "
+                        + "'fields.a.kind'='sequence', "
+                        + "'fields.a.start'='0', "
+                        + "'fields.a.end'='1000')");
+        sql(
+                "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING, c 
STRING) WITH ("
+                        + "'bucket' = '1', "
+                        + "'file.format' = 'avro', "
+                        + "'file.compression' = 'null', "
+                        + "'deletion-vectors.enabled' = 'true')");
+        batchSql("INSERT INTO T1 SELECT a, 'unknown', 'unknown' FROM GEN");
+
+        // first insert, producing dv files
+        batchSql("INSERT INTO T1 VALUES (1, '22', '33')");
+        FileStoreTable table = paimonTable("T1");
+        Snapshot snapshot = table.latestSnapshot().get();
+        assertThat(deletionVectors(table, snapshot)).hasSize(1);
+        assertThat(batchSql("SELECT * FROM T1 WHERE a = 
1")).containsExactly(Row.of(1, "22", "33"));
+
+        // second insert, producing no dv files
+        batchSql("ALTER TABLE T1 SET ('compaction.total-size-threshold' = 
'1m')");
+        batchSql("INSERT INTO T1 VALUES (1, '44', '55')");
+        snapshot = table.latestSnapshot().get();
+        assertThat(deletionVectors(table, snapshot)).hasSize(0);
+        assertThat(batchSql("SELECT * FROM T1 WHERE a = 
1")).containsExactly(Row.of(1, "44", "55"));
+
+        // third insert, producing no dv files, same index manifest
+        batchSql("INSERT INTO T1 VALUES (1, '66', '77')");
+        assertThat(table.latestSnapshot().get().indexManifest())
+                .isEqualTo(snapshot.indexManifest());
+        assertThat(batchSql("SELECT * FROM T1 WHERE a = 
1")).containsExactly(Row.of(1, "66", "77"));
+    }
+
+    private Map<String, DeletionVector> deletionVectors(FileStoreTable table, 
Snapshot snapshot) {
+        assertThat(snapshot.indexManifest()).isNotNull();
+        List<IndexManifestEntry> indexManifestEntries =
+                table.indexManifestFileReader().read(snapshot.indexManifest());
+        assertThat(indexManifestEntries.size()).isEqualTo(1);
+        IndexFileMeta indexFileMeta = indexManifestEntries.get(0).indexFile();
+        return table.store()
+                .newIndexFileHandler()
+                .readAllDeletionVectors(singletonList(indexFileMeta));
     }
 
     @Test
@@ -794,7 +848,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
         // snapshot 5,6
         String dataId =
                 TestValuesTableFactory.registerData(
-                        Collections.singletonList(Row.ofKind(RowKind.DELETE, 
2, "B")));
+                        singletonList(Row.ofKind(RowKind.DELETE, 2, "B")));
         sEnv.executeSql(
                 "CREATE TEMPORARY TABLE source (id INT, v STRING) "
                         + "WITH ('connector' = 'values', 'bounded' = 'true', 
'data-id' = '"

Reply via email to