This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 70831894da [core] Introduce 'compaction.total-size-threshold' to do
full compaction (#5973)
70831894da is described below
commit 70831894da351b16f67a4821282f28054a80d186
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 31 10:12:09 2025 +0800
[core] Introduce 'compaction.total-size-threshold' to do full compaction
(#5973)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 21 +++-
.../mergetree/compact/ForceUpLevel0Compaction.java | 37 ++++++-
.../mergetree/compact/FullCompactTrigger.java | 98 +++++++++++++++++
.../paimon/mergetree/compact}/OffPeakHours.java | 52 ++++-----
.../mergetree/compact/UniversalCompaction.java | 117 ++++-----------------
.../paimon/operation/KeyValueFileStoreWrite.java | 42 ++++----
.../apache/paimon/mergetree/MergeTreeTestBase.java | 6 +-
.../compact/ForceUpLevel0CompactionTest.java | 3 +-
.../mergetree/compact}/OffPeakHoursTest.java | 32 +-----
.../mergetree/compact/UniversalCompactionTest.java | 66 ++++++++----
.../operation/KeyValueFileStoreWriteTest.java | 23 ++--
.../apache/paimon/flink/BatchFileStoreITCase.java | 60 ++++++++++-
13 files changed, 348 insertions(+), 215 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 311914ebb0..85fa54aa63 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -272,6 +272,12 @@ under the License.
<td>Integer</td>
<td>Percentage flexibility while comparing sorted run size for
changelog mode table. If the candidate sorted run(s) size is 1% smaller than
the next sorted run's size, then include next sorted run into this candidate
set.</td>
</tr>
+ <tr>
+ <td><h5>compaction.total-size-threshold</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>When total size is smaller than this threshold, force a full
compaction.</td>
+ </tr>
<tr>
<td><h5>consumer-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 56f6daf40b..ca2c67d0f4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -733,6 +733,13 @@ public class CoreOptions implements Serializable {
"Implying how often to perform an optimization
compaction, this configuration is used to "
+ "ensure the query timeliness of the
read-optimized system table.");
+ public static final ConfigOption<MemorySize>
COMPACTION_TOTAL_SIZE_THRESHOLD =
+ key("compaction.total-size-threshold")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ "When total size is smaller than this threshold,
force a full compaction.");
+
public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
key("compaction.min.file-num")
.intType()
@@ -2364,6 +2371,11 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
}
+ @Nullable
+ public MemorySize compactionTotalSizeThreshold() {
+ return options.get(COMPACTION_TOTAL_SIZE_THRESHOLD);
+ }
+
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
@@ -2416,9 +2428,12 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_SIZE_RATIO);
}
- public OffPeakHours offPeakHours() {
- return OffPeakHours.create(
- options.get(COMPACT_OFFPEAK_START_HOUR),
options.get(COMPACT_OFFPEAK_END_HOUR));
+ public int compactOffPeakStartHour() {
+ return options.get(COMPACT_OFFPEAK_START_HOUR);
+ }
+
+ public int compactOffPeakEndHour() {
+ return options.get(COMPACT_OFFPEAK_END_HOUR);
}
public int compactOffPeakRatio() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
index 350c5fb056..1f23f598c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java
@@ -21,16 +21,29 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.mergetree.LevelSortedRun;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
/** A {@link CompactStrategy} to force compacting level 0 files. */
public class ForceUpLevel0Compaction implements CompactStrategy {
private final UniversalCompaction universal;
+ @Nullable private final Integer maxCompactInterval;
+ @Nullable private final AtomicInteger compactTriggerCount;
- public ForceUpLevel0Compaction(UniversalCompaction universal) {
+ public ForceUpLevel0Compaction(
+ UniversalCompaction universal, @Nullable Integer
maxCompactInterval) {
this.universal = universal;
+ this.maxCompactInterval = maxCompactInterval;
+ this.compactTriggerCount = maxCompactInterval == null ? null : new
AtomicInteger(0);
+ }
+
+ @Nullable
+ public Integer maxCompactInterval() {
+ return maxCompactInterval;
}
@Override
@@ -40,6 +53,26 @@ public class ForceUpLevel0Compaction implements
CompactStrategy {
return pick;
}
- return universal.forcePickL0(numLevels, runs);
+ if (maxCompactInterval == null || compactTriggerCount == null) {
+ return universal.forcePickL0(numLevels, runs);
+ }
+
+ compactTriggerCount.getAndIncrement();
+ if (compactTriggerCount.compareAndSet(maxCompactInterval, 0)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Universal compaction due to max lookup compaction
interval {}.",
+ maxCompactInterval);
+ }
+ return universal.forcePickL0(numLevels, runs);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Skip universal compaction due to lookup compaction
trigger count {} is less than the max interval {}.",
+ compactTriggerCount.get(),
+ maxCompactInterval);
+ }
+ return Optional.empty();
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
new file mode 100644
index 0000000000..931b07630d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullCompactTrigger.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.options.MemorySize;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+
+/** Triggers full compaction by elapsed interval or by total size below a threshold. */
+public class FullCompactTrigger {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FullCompactTrigger.class);
+
+ @Nullable private final Long fullCompactionInterval;
+ @Nullable private final Long totalSizeThreshold;
+
+ @Nullable private Long lastFullCompaction;
+
+ public FullCompactTrigger(
+ @Nullable Long fullCompactionInterval, @Nullable Long
totalSizeThreshold) {
+ this.fullCompactionInterval = fullCompactionInterval;
+ this.totalSizeThreshold = totalSizeThreshold;
+ }
+
+ @Nullable
+ public static FullCompactTrigger create(CoreOptions options) {
+ Duration interval = options.optimizedCompactionInterval();
+ MemorySize threshold = options.compactionTotalSizeThreshold();
+ if (interval == null && threshold == null) {
+ return null;
+ }
+ return new FullCompactTrigger(
+ interval == null ? null : interval.toMillis(),
+ threshold == null ? null : threshold.getBytes());
+ }
+
+ public Optional<CompactUnit> tryFullCompact(int numLevels,
List<LevelSortedRun> runs) {
+ if (runs.size() == 1) {
+ return Optional.empty();
+ }
+
+ int maxLevel = numLevels - 1;
+ if (fullCompactionInterval != null) {
+ if (lastFullCompaction == null
+ || currentTimeMillis() - lastFullCompaction >
fullCompactionInterval) {
+ LOG.debug("Universal compaction due to full compaction
interval");
+ updateLastFullCompaction();
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
+ }
+ if (totalSizeThreshold != null) {
+ long totalSize = 0;
+ for (LevelSortedRun run : runs) {
+ totalSize += run.run().totalSize();
+ }
+ if (totalSize < totalSizeThreshold) {
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
+ }
+ return Optional.empty();
+ }
+
+ public void updateLastFullCompaction() {
+ lastFullCompaction = currentTimeMillis();
+ }
+
+ @VisibleForTesting
+ long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+}
diff --git a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
similarity index 63%
rename from paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
rename to
paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
index b33e27618b..203af06cf6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
@@ -16,55 +16,52 @@
* limitations under the License.
*/
-package org.apache.paimon;
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+
+import javax.annotation.Nullable;
import java.time.LocalDateTime;
-/** OffPeakHours. */
+/** OffPeakHours to control compaction ratio by hours. */
public abstract class OffPeakHours {
- public abstract boolean isOffPeak();
-
public abstract boolean isOffPeak(int targetHour);
- public static final OffPeakHours DISABLED =
- new OffPeakHours() {
- @Override
- public boolean isOffPeak() {
- return false;
- }
+ public abstract int currentRatio();
- @Override
- public boolean isOffPeak(int targetHour) {
- return false;
- }
- };
+ @Nullable
+ public static OffPeakHours create(CoreOptions options) {
+ return create(
+ options.compactOffPeakStartHour(),
+ options.compactOffPeakEndHour(),
+ options.compactOffPeakRatio());
+ }
- public static OffPeakHours create(int startHour, int endHour) {
+ @Nullable
+ public static OffPeakHours create(int startHour, int endHour, int
compactOffPeakRatio) {
if (startHour == -1 && endHour == -1) {
- return DISABLED;
+ return null;
}
if (startHour == endHour) {
- return DISABLED;
+ return null;
}
- return new OffPeakHoursImpl(startHour, endHour);
+ return new OffPeakHoursImpl(startHour, endHour, compactOffPeakRatio);
}
private static class OffPeakHoursImpl extends OffPeakHours {
private final int startHour;
private final int endHour;
+ private final int compactOffPeakRatio;
- OffPeakHoursImpl(int startHour, int endHour) {
+ OffPeakHoursImpl(int startHour, int endHour, int compactOffPeakRatio) {
this.startHour = startHour;
this.endHour = endHour;
- }
-
- @Override
- public boolean isOffPeak() {
- return isOffPeak(LocalDateTime.now().getHour());
+ this.compactOffPeakRatio = compactOffPeakRatio;
}
@Override
@@ -74,5 +71,10 @@ public abstract class OffPeakHours {
}
return targetHour < endHour || startHour <= targetHour;
}
+
+ @Override
+ public int currentRatio() {
+ return isOffPeak(LocalDateTime.now().getHour()) ?
compactOffPeakRatio : 0;
+ }
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 63e71354cf..5313682a90 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.OffPeakHours;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.mergetree.LevelSortedRun;
@@ -29,10 +28,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.time.Duration;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Universal Compaction Style is a compaction style, targeting the use cases
requiring lower write
@@ -48,81 +45,32 @@ public class UniversalCompaction implements CompactStrategy
{
private final int maxSizeAmp;
private final int sizeRatio;
private final int numRunCompactionTrigger;
- private final OffPeakHours offPeakHours;
- private final int compactOffPeakRatio;
- @Nullable private final Long opCompactionInterval;
- @Nullable private Long lastOptimizedCompaction;
-
- @Nullable private final Integer maxLookupCompactInterval;
- @Nullable private final AtomicInteger lookupCompactTriggerCount;
-
- public UniversalCompaction(int maxSizeAmp, int sizeRatio, int
numRunCompactionTrigger) {
- this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null,
OffPeakHours.DISABLED, 0);
- }
-
- public UniversalCompaction(
- int maxSizeAmp,
- int sizeRatio,
- int numRunCompactionTrigger,
- OffPeakHours offPeakHours,
- int compactOffPeakRatio) {
- this(
- maxSizeAmp,
- sizeRatio,
- numRunCompactionTrigger,
- null,
- offPeakHours,
- compactOffPeakRatio);
- }
-
- public UniversalCompaction(
- int maxSizeAmp,
- int sizeRatio,
- int numRunCompactionTrigger,
- @Nullable Duration opCompactionInterval,
- OffPeakHours offPeakHours,
- int compactOffPeakRatio) {
- this(
- maxSizeAmp,
- sizeRatio,
- numRunCompactionTrigger,
- opCompactionInterval,
- null,
- offPeakHours,
- compactOffPeakRatio);
- }
+ @Nullable private final FullCompactTrigger fullCompactTrigger;
+ @Nullable private final OffPeakHours offPeakHours;
public UniversalCompaction(
int maxSizeAmp,
int sizeRatio,
int numRunCompactionTrigger,
- @Nullable Duration opCompactionInterval,
- @Nullable Integer maxLookupCompactInterval,
- OffPeakHours offPeakHours,
- int compactOffPeakRatio) {
+ @Nullable FullCompactTrigger fullCompactTrigger,
+ @Nullable OffPeakHours offPeakHours) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
- this.offPeakHours = offPeakHours;
this.numRunCompactionTrigger = numRunCompactionTrigger;
- this.opCompactionInterval =
- opCompactionInterval == null ? null :
opCompactionInterval.toMillis();
- this.maxLookupCompactInterval = maxLookupCompactInterval;
- this.lookupCompactTriggerCount =
- maxLookupCompactInterval == null ? null : new AtomicInteger(0);
- this.compactOffPeakRatio = compactOffPeakRatio;
+ this.fullCompactTrigger = fullCompactTrigger;
+ this.offPeakHours = offPeakHours;
}
@Override
public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun>
runs) {
int maxLevel = numLevels - 1;
- if (opCompactionInterval != null) {
- if (lastOptimizedCompaction == null
- || currentTimeMillis() - lastOptimizedCompaction >
opCompactionInterval) {
- LOG.debug("Universal compaction due to optimized compaction
interval");
- updateLastOptimizedCompaction();
- return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ // 0 try full compaction by trigger
+ if (fullCompactTrigger != null) {
+ Optional<CompactUnit> unit =
fullCompactTrigger.tryFullCompact(numLevels, runs);
+ if (unit.isPresent()) {
+ return unit;
}
}
@@ -154,26 +102,6 @@ public class UniversalCompaction implements
CompactStrategy {
return Optional.ofNullable(pickForSizeRatio(maxLevel, runs,
candidateCount));
}
- // 4 checking if a forced L0 compact should be triggered
- if (maxLookupCompactInterval != null && lookupCompactTriggerCount !=
null) {
- lookupCompactTriggerCount.getAndIncrement();
- if
(lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Universal compaction due to max lookup compaction
interval {}.",
- maxLookupCompactInterval);
- }
- return forcePickL0(numLevels, runs);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Skip universal compaction due to lookup
compaction trigger count {} is less than the max interval {}.",
- lookupCompactTriggerCount.get(),
- maxLookupCompactInterval);
- }
- }
- }
-
return Optional.empty();
}
@@ -208,7 +136,9 @@ public class UniversalCompaction implements CompactStrategy
{
// size amplification = percentage of additional size
if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
- updateLastOptimizedCompaction();
+ if (fullCompactTrigger != null) {
+ fullCompactTrigger.updateLastFullCompaction();
+ }
return CompactUnit.fromLevelRuns(maxLevel, runs);
}
@@ -234,11 +164,8 @@ public class UniversalCompaction implements
CompactStrategy {
long candidateSize = candidateSize(runs, candidateCount);
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
- if (candidateSize
- * (100.0
- + sizeRatio
- + (offPeakHours.isOffPeak() ?
compactOffPeakRatio : 0))
- / 100.0
+ int offPeakRatio = offPeakHours == null ? 0 :
offPeakHours.currentRatio();
+ if (candidateSize * (100.0 + sizeRatio + offPeakRatio) / 100.0
< next.run().totalSize()) {
break;
}
@@ -285,18 +212,12 @@ public class UniversalCompaction implements
CompactStrategy {
}
if (runCount == runs.size()) {
- updateLastOptimizedCompaction();
+ if (fullCompactTrigger != null) {
+ fullCompactTrigger.updateLastFullCompaction();
+ }
outputLevel = maxLevel;
}
return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0,
runCount));
}
-
- private void updateLastOptimizedCompaction() {
- lastOptimizedCompaction = currentTimeMillis();
- }
-
- long currentTimeMillis() {
- return System.currentTimeMillis();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 15787ea5de..4e9e558d04 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -53,12 +53,14 @@ import org.apache.paimon.mergetree.compact.CompactRewriter;
import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
+import org.apache.paimon.mergetree.compact.FullCompactTrigger;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.MergeTreeCompactRewriter;
+import org.apache.paimon.mergetree.compact.OffPeakHours;
import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
@@ -220,25 +222,22 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private CompactStrategy createCompactStrategy(CoreOptions options) {
if (options.needLookup()) {
- if
(CoreOptions.LookupCompactMode.RADICAL.equals(options.lookupCompact())) {
- return new ForceUpLevel0Compaction(
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.optimizedCompactionInterval(),
- options.offPeakHours(),
- options.compactOffPeakRatio()));
- } else if
(CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
- return new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.optimizedCompactionInterval(),
- options.lookupCompactMaxInterval(),
- options.offPeakHours(),
- options.compactOffPeakRatio());
+ Integer compactMaxInterval = null;
+ switch (options.lookupCompact()) {
+ case GENTLE:
+ compactMaxInterval = options.lookupCompactMaxInterval();
+ break;
+ case RADICAL:
+ break;
}
+ return new ForceUpLevel0Compaction(
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ FullCompactTrigger.create(options),
+ OffPeakHours.create(options)),
+ compactMaxInterval);
}
UniversalCompaction universal =
@@ -246,11 +245,10 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- options.optimizedCompactionInterval(),
- options.offPeakHours(),
- options.compactOffPeakRatio());
+ FullCompactTrigger.create(options),
+ OffPeakHours.create(options));
if (options.compactionForceUpLevel0()) {
- return new ForceUpLevel0Compaction(universal);
+ return new ForceUpLevel0Compaction(universal, null);
} else {
return universal;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 95daf384e7..11d3ede213 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -46,7 +46,6 @@ import
org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
@@ -90,6 +89,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
+import static
org.apache.paimon.mergetree.compact.UniversalCompactionTest.ofTesting;
import static
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
import static org.assertj.core.api.Assertions.assertThat;
@@ -277,7 +277,7 @@ public abstract class MergeTreeTestBase {
new MockFailResultCompactionManager(
service,
new Levels(comparator, dataFileMetas,
options.numLevels()),
- new UniversalCompaction(
+ ofTesting(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger()),
@@ -438,7 +438,7 @@ public abstract class MergeTreeTestBase {
private MergeTreeCompactManager createCompactManager(
ExecutorService compactExecutor, List<DataFileMeta> files) {
CompactStrategy strategy =
- new UniversalCompaction(
+ ofTesting(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
index 07eedc9214..126ce000f0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ForceUpLevel0CompactionTest.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.Optional;
import static org.apache.paimon.mergetree.compact.UniversalCompactionTest.file;
+import static
org.apache.paimon.mergetree.compact.UniversalCompactionTest.ofTesting;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link ForceUpLevel0Compaction}. */
@@ -37,7 +38,7 @@ public class ForceUpLevel0CompactionTest {
@Test
public void testForceCompaction0() {
ForceUpLevel0Compaction compaction =
- new ForceUpLevel0Compaction(new UniversalCompaction(200, 1,
5));
+ new ForceUpLevel0Compaction(ofTesting(200, 1, 5), null);
Optional<CompactUnit> result = compaction.pick(3, Arrays.asList(run(0,
1), run(0, 1)));
assertThat(result).isPresent();
diff --git
a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
similarity index 77%
rename from
paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
index 699d02f06b..6978280e3c 100644
--- a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
@@ -16,9 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.offpeak;
-
-import org.apache.paimon.OffPeakHours;
+package org.apache.paimon.mergetree.compact;
import org.junit.jupiter.api.Test;
@@ -27,32 +25,10 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link OffPeakHours}. */
public class OffPeakHoursTest {
- @Test
- public void testDisabledInstance() {
- OffPeakHours disabled = OffPeakHours.DISABLED;
- for (int hour = 0; hour < 24; hour++) {
- assertThat(disabled.isOffPeak(hour)).isFalse();
- }
- }
-
- @Test
- public void testGetInstanceWithDisabledValues() {
- OffPeakHours offPeakHours = OffPeakHours.create(-1, -1);
- assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
- }
-
- @Test
- public void testGetInstanceWithSameStartAndEnd() {
- for (int hour = 0; hour < 24; hour++) {
- OffPeakHours offPeakHours = OffPeakHours.create(hour, hour);
- assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
- }
- }
-
@Test
public void testNormalRangeOffPeakHours() {
// Test normal range: 9 AM to 5 PM (9-17)
- OffPeakHours offPeakHours = OffPeakHours.create(9, 17);
+ OffPeakHours offPeakHours = OffPeakHours.create(9, 17, 0);
// Hours before start should not be off-peak
for (int hour = 0; hour < 9; hour++) {
@@ -78,7 +54,7 @@ public class OffPeakHoursTest {
@Test
public void testWrapAroundRangeOffPeakHours() {
- OffPeakHours offPeakHours = OffPeakHours.create(22, 6);
+ OffPeakHours offPeakHours = OffPeakHours.create(22, 6, 0);
// Hours before end (0-5) should be off-peak
for (int hour = 0; hour < 6; hour++) {
@@ -105,7 +81,7 @@ public class OffPeakHoursTest {
@Test
public void testSingleHourRange() {
// Test single hour range: 12 to 13
- OffPeakHours offPeakHours = OffPeakHours.create(12, 13);
+ OffPeakHours offPeakHours = OffPeakHours.create(12, 13, 0);
// Only hour 12 should be off-peak
for (int hour = 0; hour < 24; hour++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index dccd22b008..4c892e40d1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.OffPeakHours;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
@@ -27,9 +26,9 @@ import org.apache.paimon.mergetree.SortedRun;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,7 +41,7 @@ public class UniversalCompactionTest {
@Test
public void testOutputLevel() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+ UniversalCompaction compaction = ofTesting(25, 1, 3);
assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5,
1).outputLevel())
.isEqualTo(1);
assertThat(compaction.createUnit(createLevels(0, 0, 1, 3, 4), 5,
2).outputLevel())
@@ -57,7 +56,7 @@ public class UniversalCompactionTest {
@Test
public void testPick() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+ UniversalCompaction compaction = ofTesting(25, 1, 3);
// by size amplification
Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
@@ -86,14 +85,15 @@ public class UniversalCompactionTest {
@Test
public void testOptimizedCompactionInterval() {
AtomicLong time = new AtomicLong(0);
- UniversalCompaction compaction =
- new UniversalCompaction(
- 100, 1, 3, Duration.ofMillis(1000),
OffPeakHours.DISABLED, 0) {
+ FullCompactTrigger fullCompactTrigger =
+ new FullCompactTrigger(1000L, null) {
@Override
long currentTimeMillis() {
return time.get();
}
};
+ UniversalCompaction compaction =
+ new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);
// first time, force optimized compaction
Optional<CompactUnit> pick =
@@ -126,9 +126,31 @@ public class UniversalCompactionTest {
assertThat(pick.isPresent()).isFalse();
}
+ @Test
+ public void testTotalSizeThreshold() {
+ FullCompactTrigger fullCompactTrigger = new FullCompactTrigger(null,
10L);
+ UniversalCompaction compaction =
+ new UniversalCompaction(100, 1, 3, fullCompactTrigger, null);
+
+ // total size less than threshold
+ Optional<CompactUnit> pick =
+ compaction.pick(3, Arrays.asList(level(0, 1), level(1, 3),
level(2, 5)));
+ assertThat(pick.isPresent()).isTrue();
+ long[] results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+ assertThat(results).isEqualTo(new long[] {1, 3, 5});
+
+ // total size bigger than threshold
+ pick = compaction.pick(3, Arrays.asList(level(0, 2), level(1, 6),
level(2, 10)));
+ assertThat(pick.isPresent()).isFalse();
+
+ // only one sorted run, full compaction should not be triggered
+ pick = compaction.pick(3, Collections.singletonList(level(3, 5)));
+ assertThat(pick.isPresent()).isFalse();
+ }
+
@Test
public void testNoOutputLevel0() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);
+ UniversalCompaction compaction = ofTesting(25, 1, 3);
Optional<CompactUnit> pick =
compaction.pick(
@@ -148,7 +170,7 @@ public class UniversalCompactionTest {
@Test
public void testExtremeCaseNoOutputLevel0() {
- UniversalCompaction compaction = new UniversalCompaction(200, 1, 5);
+ UniversalCompaction compaction = ofTesting(200, 1, 5);
Optional<CompactUnit> pick =
compaction.pick(
@@ -168,7 +190,7 @@ public class UniversalCompactionTest {
@Test
public void testSizeAmplification() {
- UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
+ UniversalCompaction compaction = ofTesting(25, 0, 1);
long[] sizes = new long[] {1};
sizes = appendAndPickForSizeAmp(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {2});
@@ -208,7 +230,7 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatio() {
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
+ UniversalCompaction compaction = ofTesting(25, 1, 5);
long[] sizes = new long[] {1, 1, 1, 1};
sizes = appendAndPickForSizeRatio(compaction, sizes);
assertThat(sizes).isEqualTo(new long[] {5});
@@ -261,16 +283,13 @@ public class UniversalCompactionTest {
@Test
public void testSizeRatioThreshold() {
long[] sizes = new long[] {8, 9, 10};
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
- .isEqualTo(new long[] {8, 9, 10});
- assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
- .isEqualTo(new long[] {27});
+ assertThat(pickForSizeRatio(ofTesting(25, 10, 2),
sizes)).isEqualTo(new long[] {8, 9, 10});
+ assertThat(pickForSizeRatio(ofTesting(25, 20, 2),
sizes)).isEqualTo(new long[] {27});
}
@Test
public void testLookup() {
- ForceUpLevel0Compaction compaction =
- new ForceUpLevel0Compaction(new UniversalCompaction(25, 1, 3));
+ ForceUpLevel0Compaction compaction = new
ForceUpLevel0Compaction(ofTesting(25, 1, 3), null);
// level 0 to max level
Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 2, 2));
@@ -297,8 +316,9 @@ public class UniversalCompactionTest {
@Test
public void testForcePickL0() {
int maxInterval = 5;
- UniversalCompaction compaction =
- new UniversalCompaction(25, 1, 5, null, maxInterval,
OffPeakHours.DISABLED, 0);
+ ForceUpLevel0Compaction compaction =
+ new ForceUpLevel0Compaction(
+ new UniversalCompaction(25, 1, 5, null, null),
maxInterval);
// level 0 to max level
List<LevelSortedRun> level0ToMax = level0(1, 2, 2, 2);
@@ -324,16 +344,15 @@ public class UniversalCompactionTest {
List<LevelSortedRun> level0ForcePick = Arrays.asList(level(0, 2),
level(1, 2), level(2, 2));
for (int i = 1; i <= maxInterval; i++) {
+ pick = compaction.pick(3, level0ForcePick);
if (i == maxInterval) {
// level 0 force pick triggered
- pick = compaction.pick(3, level0ForcePick);
assertThat(pick.isPresent()).isTrue();
results =
pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
assertThat(results).isEqualTo(new long[] {2, 2, 2});
assertThat(pick.get().outputLevel()).isEqualTo(2);
} else {
// compact skipped
- pick = compaction.pick(3, level0ForcePick);
assertThat(pick.isPresent()).isFalse();
}
}
@@ -441,4 +460,9 @@ public class UniversalCompactionTest {
null,
null);
}
+
+ public static UniversalCompaction ofTesting(
+ int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) {
+ return new UniversalCompaction(maxSizeAmp, sizeRatio,
numRunCompactionTrigger, null, null);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
index 606a290bcb..baeb6e9b4e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreWriteTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import org.apache.paimon.mergetree.compact.MergeTreeCompactManager;
-import org.apache.paimon.mergetree.compact.UniversalCompaction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -65,7 +64,11 @@ public class KeyValueFileStoreWriteTest {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
options.put(CoreOptions.LOOKUP_COMPACT.key(), "radical");
- assertCompactStrategy(options, ForceUpLevel0Compaction.class);
+ CompactStrategy compactStrategy = createCompactStrategy(options);
+
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
+ Integer maxCompactInterval =
+ ((ForceUpLevel0Compaction)
compactStrategy).maxCompactInterval();
+ assertThat(maxCompactInterval).isNull();
}
@Test
@@ -73,19 +76,22 @@ public class KeyValueFileStoreWriteTest {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
options.put(CoreOptions.LOOKUP_COMPACT.key(), "gentle");
- assertCompactStrategy(options, UniversalCompaction.class);
+ CompactStrategy compactStrategy = createCompactStrategy(options);
+
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
+ Integer maxCompactInterval =
+ ((ForceUpLevel0Compaction)
compactStrategy).maxCompactInterval();
+ assertThat(maxCompactInterval).isEqualTo(10);
}
@Test
public void testForceUpLevel0CompactStrategy() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.COMPACTION_FORCE_UP_LEVEL_0.key(), "true");
- assertCompactStrategy(options, ForceUpLevel0Compaction.class);
+ CompactStrategy compactStrategy = createCompactStrategy(options);
+
assertThat(compactStrategy).isInstanceOf(ForceUpLevel0Compaction.class);
}
- private void assertCompactStrategy(
- Map<String, String> options, Class<? extends CompactStrategy>
expected)
- throws Exception {
+ private CompactStrategy createCompactStrategy(Map<String, String> options)
throws Exception {
KeyValueFileStoreWrite write = createWriteWithOptions(options);
write.withIOManager(ioManager);
TestKeyValueGenerator gen = new TestKeyValueGenerator();
@@ -96,8 +102,7 @@ public class KeyValueFileStoreWriteTest {
MergeTreeWriter writer = (MergeTreeWriter) writerContainer.writer;
try (MergeTreeCompactManager compactManager =
(MergeTreeCompactManager) writer.compactManager()) {
- CompactStrategy compactStrategy = compactManager.getStrategy();
- assertThat(compactStrategy).isInstanceOf(expected);
+ return compactManager.getStrategy();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 560a8fd8aa..ac57e7d874 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,7 +19,12 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
@@ -40,13 +45,13 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.math.BigDecimal;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import static java.util.Collections.singletonList;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,7 +61,56 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
@Override
protected List<String> ddl() {
- return Collections.singletonList("CREATE TABLE IF NOT EXISTS T (a INT,
b INT, c INT)");
+ return singletonList("CREATE TABLE IF NOT EXISTS T (a INT, b INT, c
INT)");
+ }
+
+ @Test
+ public void testFullCompactionNoDv() throws Catalog.TableNotExistException
{
+ sql(
+ "CREATE TEMPORARY TABLE GEN (a INT) WITH ("
+ + "'connector'='datagen', "
+ + "'number-of-rows'='1000', "
+ + "'fields.a.kind'='sequence', "
+ + "'fields.a.start'='0', "
+ + "'fields.a.end'='1000')");
+ sql(
+ "CREATE TABLE T1 (a INT PRIMARY KEY NOT ENFORCED, b STRING, c
STRING) WITH ("
+ + "'bucket' = '1', "
+ + "'file.format' = 'avro', "
+ + "'file.compression' = 'null', "
+ + "'deletion-vectors.enabled' = 'true')");
+ batchSql("INSERT INTO T1 SELECT a, 'unknown', 'unknown' FROM GEN");
+
+ // first insert, producing dv files
+ batchSql("INSERT INTO T1 VALUES (1, '22', '33')");
+ FileStoreTable table = paimonTable("T1");
+ Snapshot snapshot = table.latestSnapshot().get();
+ assertThat(deletionVectors(table, snapshot)).hasSize(1);
+ assertThat(batchSql("SELECT * FROM T1 WHERE a =
1")).containsExactly(Row.of(1, "22", "33"));
+
+ // second insert, producing no dv files
+ batchSql("ALTER TABLE T1 SET ('compaction.total-size-threshold' =
'1m')");
+ batchSql("INSERT INTO T1 VALUES (1, '44', '55')");
+ snapshot = table.latestSnapshot().get();
+ assertThat(deletionVectors(table, snapshot)).hasSize(0);
+ assertThat(batchSql("SELECT * FROM T1 WHERE a =
1")).containsExactly(Row.of(1, "44", "55"));
+
+ // third insert, producing no dv files, same index manifest
+ batchSql("INSERT INTO T1 VALUES (1, '66', '77')");
+ assertThat(table.latestSnapshot().get().indexManifest())
+ .isEqualTo(snapshot.indexManifest());
+ assertThat(batchSql("SELECT * FROM T1 WHERE a =
1")).containsExactly(Row.of(1, "66", "77"));
+ }
+
+ private Map<String, DeletionVector> deletionVectors(FileStoreTable table,
Snapshot snapshot) {
+ assertThat(snapshot.indexManifest()).isNotNull();
+ List<IndexManifestEntry> indexManifestEntries =
+ table.indexManifestFileReader().read(snapshot.indexManifest());
+ assertThat(indexManifestEntries.size()).isEqualTo(1);
+ IndexFileMeta indexFileMeta = indexManifestEntries.get(0).indexFile();
+ return table.store()
+ .newIndexFileHandler()
+ .readAllDeletionVectors(singletonList(indexFileMeta));
}
@Test
@@ -794,7 +848,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
// snapshot 5,6
String dataId =
TestValuesTableFactory.registerData(
- Collections.singletonList(Row.ofKind(RowKind.DELETE,
2, "B")));
+ singletonList(Row.ofKind(RowKind.DELETE, 2, "B")));
sEnv.executeSql(
"CREATE TEMPORARY TABLE source (id INT, v STRING) "
+ "WITH ('connector' = 'values', 'bounded' = 'true',
'data-id' = '"