This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7448de9c77 [core] Support off-peak hours in universal compaction (#5928)
7448de9c77 is described below
commit 7448de9c77e079a442909dd6236f1ed2bd2479f6
Author: xiaochen <[email protected]>
AuthorDate: Mon Jul 21 21:12:09 2025 +0800
[core] Support off-peak hours in universal compaction (#5928)
---
.../shortcodes/generated/core_configuration.html | 18 +++
.../main/java/org/apache/paimon/CoreOptions.java | 43 +++++++
.../main/java/org/apache/paimon/OffPeakHours.java | 108 ++++++++++++++++
.../apache/paimon/offpeak/OffPeakHoursTest.java | 140 +++++++++++++++++++++
.../mergetree/compact/UniversalCompaction.java | 46 ++++++-
.../paimon/operation/KeyValueFileStoreWrite.java | 12 +-
.../mergetree/compact/UniversalCompactionTest.java | 24 +++-
7 files changed, 381 insertions(+), 10 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ed6ba5da13..f4276640e8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -242,6 +242,24 @@ under the License.
<td>Integer</td>
<td>For file set [f_0,...,f_N], the minimum file number to trigger a compaction for append-only table.</td>
</tr>
+ <tr>
+ <td><h5>compaction.offpeak-ratio</h5></td>
+ <td style="word-wrap: break-word;">0</td>
+ <td>Integer</td>
+ <td>Allows you to set a different (by default, more aggressive) percentage ratio for determining whether larger sorted run's size are included in compactions during off-peak hours. Works in the same way as compaction.size-ratio. Only applies if offpeak.start.hour and offpeak.end.hour are also enabled. <br /> For instance, if your cluster experiences low pressure between 2 AM and 6 PM , you can configure `compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to define this period as off-peak hours. During these hours, you can increase the off-peak compaction ratio (e.g. `compaction.offpeak-ratio=20`) to enable more aggressive data compaction</td>
+ </tr>
+ <tr>
+ <td><h5>compaction.offpeak.end.hour</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>The end of off-peak hours, expressed as an integer between 0 and 23, inclusive. Set to -1 to disable off-peak.</td>
+ </tr>
+ <tr>
+ <td><h5>compaction.offpeak.start.hour</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>The start of off-peak hours, expressed as an integer between 0 and 23, inclusive Set to -1 to disable off-peak</td>
+ </tr>
<tr>
<td><h5>compaction.optimization-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 19ca49737a..ba5cd54cad 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -683,6 +683,40 @@ public class CoreOptions implements Serializable {
+ "size is 1% smaller than the next sorted
run's size, then include next sorted run "
+ "into this candidate set.");
+ public static final ConfigOption<Integer> COMPACT_OFFPEAK_START_HOUR =
+ key("compaction.offpeak.start.hour")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The start of off-peak hours, expressed as an
integer between 0 and 23, inclusive"
+ + " Set to -1 to disable off-peak");
+
+ public static final ConfigOption<Integer> COMPACT_OFFPEAK_END_HOUR =
+ key("compaction.offpeak.end.hour")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The end of off-peak hours, expressed as an
integer between 0 and 23, inclusive. Set"
+ + " to -1 to disable off-peak.");
+
+ public static final ConfigOption<Integer> COMPACTION_OFFPEAK_RATIO =
+ key("compaction.offpeak-ratio")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "Allows you to set a different (by
default, more aggressive) percentage ratio for determining "
+ + " whether larger sorted
run's size are included in compactions during off-peak hours. Works in the "
+ + " same way as
compaction.size-ratio. Only applies if offpeak.start.hour and "
+ + " offpeak.end.hour are
also enabled. ")
+ .linebreak()
+ .text(
+ " For instance, if your cluster
experiences low pressure between 2 AM and 6 PM , "
+ + " you can configure
`compaction.offpeak.start.hour=2` and `compaction.offpeak.end.hour=18` to
define this period as off-peak hours. "
+ + " During these hours,
you can increase the off-peak compaction ratio (e.g.
`compaction.offpeak-ratio=20`) to enable more aggressive data compaction")
+ .build());
+
public static final ConfigOption<Duration>
COMPACTION_OPTIMIZATION_INTERVAL =
key("compaction.optimization-interval")
.durationType()
@@ -2344,6 +2378,15 @@ public class CoreOptions implements Serializable {
return options.get(COMPACTION_SIZE_RATIO);
}
+ public OffPeakHours offPeakHours() {
+ return OffPeakHours.getInstance(
+ options.get(COMPACT_OFFPEAK_START_HOUR),
options.get(COMPACT_OFFPEAK_END_HOUR));
+ }
+
+ public int compactOffPeakRatio() {
+ return options.get(COMPACTION_OFFPEAK_RATIO);
+ }
+
public int compactionMinFileNum() {
return options.get(COMPACTION_MIN_FILE_NUM);
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java b/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
new file mode 100644
index 0000000000..35e934cf49
--- /dev/null
+++ b/paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+/** OffPeakHours. */
+public abstract class OffPeakHours {
+ private static final Logger LOG =
LoggerFactory.getLogger(OffPeakHours.class);
+
+ public static final OffPeakHours DISABLED =
+ new OffPeakHours() {
+ @Override
+ public boolean isOffPeak() {
+ return false;
+ }
+
+ @Override
+ public boolean isOffPeak(int targetHour) {
+ return false;
+ }
+ };
+
+ /**
+ * @param startHour inclusive
+ * @param endHour exclusive
+ */
+ public static OffPeakHours getInstance(int startHour, int endHour) {
+ if (startHour == -1 && endHour == -1) {
+ return DISABLED;
+ }
+
+ if (!isValidHour(startHour) || !isValidHour(endHour)) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(
+ "Ignoring invalid start/end hour for peak hour : start
= "
+ + startHour
+ + " end = "
+ + endHour
+ + ". Valid numbers are [0-23]");
+ }
+ return DISABLED;
+ }
+
+ if (startHour == endHour) {
+ return DISABLED;
+ }
+
+ return new OffPeakHoursImpl(startHour, endHour);
+ }
+
+ private static boolean isValidHour(int hour) {
+ return 0 <= hour && hour <= 23;
+ }
+
+ /** Returns whether {@code targetHour} is off-peak hour. */
+ public abstract boolean isOffPeak(int targetHour);
+
+ /** Returns whether it is off-peak hour. */
+ public abstract boolean isOffPeak();
+
+ private static class OffPeakHoursImpl extends OffPeakHours {
+ final int startHour;
+ final int endHour;
+
+ /**
+ * @param startHour inclusive
+ * @param endHour exclusive
+ */
+ OffPeakHoursImpl(int startHour, int endHour) {
+ this.startHour = startHour;
+ this.endHour = endHour;
+ }
+
+ @Override
+ public boolean isOffPeak() {
+ return
isOffPeak(ZonedDateTime.now(ZoneId.systemDefault()).getHour());
+ }
+
+ @Override
+ public boolean isOffPeak(int targetHour) {
+ if (startHour <= endHour) {
+ return startHour <= targetHour && targetHour < endHour;
+ }
+ return targetHour < endHour || startHour <= targetHour;
+ }
+ }
+}
diff --git a/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java b/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
new file mode 100644
index 0000000000..92c786d35c
--- /dev/null
+++ b/paimon-api/src/test/java/org/apache/paimon/offpeak/OffPeakHoursTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.offpeak;
+
+import org.apache.paimon.OffPeakHours;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OffPeakHours}. */
+public class OffPeakHoursTest {
+
+ @Test
+ public void testDisabledInstance() {
+ OffPeakHours disabled = OffPeakHours.DISABLED;
+ assertThat(disabled.isOffPeak()).isFalse();
+ for (int hour = 0; hour < 24; hour++) {
+ assertThat(disabled.isOffPeak(hour)).isFalse();
+ }
+ }
+
+ @Test
+ public void testGetInstanceWithDisabledValues() {
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(-1, -1);
+ assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
+ }
+
+ @Test
+ public void testGetInstanceWithSameStartAndEnd() {
+ for (int hour = 0; hour < 24; hour++) {
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(hour, hour);
+ assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {-2, -1, 24, 25, 100})
+ public void testGetInstanceWithInvalidStartHour(int invalidHour) {
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(invalidHour, 10);
+ assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {-2, -1, 24, 25, 100})
+ public void testGetInstanceWithInvalidEndHour(int invalidHour) {
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(10, invalidHour);
+ assertThat(offPeakHours).isSameAs(OffPeakHours.DISABLED);
+ }
+
+ @Test
+ public void testNormalRangeOffPeakHours() {
+ // Test normal range: 9 AM to 5 PM (9-17)
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(9, 17);
+
+ // Hours before start should not be off-peak
+ for (int hour = 0; hour < 9; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should not be off-peak", hour)
+ .isFalse();
+ }
+
+ // Hours in range should be off-peak (start inclusive, end exclusive)
+ for (int hour = 9; hour < 17; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should be off-peak", hour)
+ .isTrue();
+ }
+
+ // Hours after end should not be off-peak
+ for (int hour = 17; hour < 24; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should not be off-peak", hour)
+ .isFalse();
+ }
+ }
+
+ @Test
+ public void testWrapAroundRangeOffPeakHours() {
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(22, 6);
+
+ // Hours before end (0-5) should be off-peak
+ for (int hour = 0; hour < 6; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should be off-peak", hour)
+ .isTrue();
+ }
+
+ // Hours between end and start (6-21) should not be off-peak
+ for (int hour = 6; hour < 22; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should not be off-peak", hour)
+ .isFalse();
+ }
+
+ // Hours from start to end of day (22-23) should be off-peak
+ for (int hour = 22; hour < 24; hour++) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should be off-peak", hour)
+ .isTrue();
+ }
+ }
+
+ @Test
+ public void testSingleHourRange() {
+ // Test single hour range: 12 to 13
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(12, 13);
+
+ // Only hour 12 should be off-peak
+ for (int hour = 0; hour < 24; hour++) {
+ if (hour == 12) {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should be off-peak", hour)
+ .isTrue();
+ } else {
+ assertThat(offPeakHours.isOffPeak(hour))
+ .as("Hour %d should not be off-peak", hour)
+ .isFalse();
+ }
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index c53d26ab4b..63e71354cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.OffPeakHours;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.mergetree.LevelSortedRun;
@@ -47,6 +48,8 @@ public class UniversalCompaction implements CompactStrategy {
private final int maxSizeAmp;
private final int sizeRatio;
private final int numRunCompactionTrigger;
+ private final OffPeakHours offPeakHours;
+ private final int compactOffPeakRatio;
@Nullable private final Long opCompactionInterval;
@Nullable private Long lastOptimizedCompaction;
@@ -55,15 +58,22 @@ public class UniversalCompaction implements CompactStrategy {
@Nullable private final AtomicInteger lookupCompactTriggerCount;
public UniversalCompaction(int maxSizeAmp, int sizeRatio, int
numRunCompactionTrigger) {
- this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null);
+ this(maxSizeAmp, sizeRatio, numRunCompactionTrigger, null,
OffPeakHours.DISABLED, 0);
}
public UniversalCompaction(
int maxSizeAmp,
int sizeRatio,
int numRunCompactionTrigger,
- @Nullable Duration opCompactionInterval) {
- this(maxSizeAmp, sizeRatio, numRunCompactionTrigger,
opCompactionInterval, null);
+ OffPeakHours offPeakHours,
+ int compactOffPeakRatio) {
+ this(
+ maxSizeAmp,
+ sizeRatio,
+ numRunCompactionTrigger,
+ null,
+ offPeakHours,
+ compactOffPeakRatio);
}
public UniversalCompaction(
@@ -71,15 +81,36 @@ public class UniversalCompaction implements CompactStrategy {
int sizeRatio,
int numRunCompactionTrigger,
@Nullable Duration opCompactionInterval,
- @Nullable Integer maxLookupCompactInterval) {
+ OffPeakHours offPeakHours,
+ int compactOffPeakRatio) {
+ this(
+ maxSizeAmp,
+ sizeRatio,
+ numRunCompactionTrigger,
+ opCompactionInterval,
+ null,
+ offPeakHours,
+ compactOffPeakRatio);
+ }
+
+ public UniversalCompaction(
+ int maxSizeAmp,
+ int sizeRatio,
+ int numRunCompactionTrigger,
+ @Nullable Duration opCompactionInterval,
+ @Nullable Integer maxLookupCompactInterval,
+ OffPeakHours offPeakHours,
+ int compactOffPeakRatio) {
this.maxSizeAmp = maxSizeAmp;
this.sizeRatio = sizeRatio;
+ this.offPeakHours = offPeakHours;
this.numRunCompactionTrigger = numRunCompactionTrigger;
this.opCompactionInterval =
opCompactionInterval == null ? null :
opCompactionInterval.toMillis();
this.maxLookupCompactInterval = maxLookupCompactInterval;
this.lookupCompactTriggerCount =
maxLookupCompactInterval == null ? null : new AtomicInteger(0);
+ this.compactOffPeakRatio = compactOffPeakRatio;
}
@Override
@@ -203,7 +234,12 @@ public class UniversalCompaction implements CompactStrategy {
long candidateSize = candidateSize(runs, candidateCount);
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
- if (candidateSize * (100.0 + sizeRatio) / 100.0 <
next.run().totalSize()) {
+ if (candidateSize
+ * (100.0
+ + sizeRatio
+ + (offPeakHours.isOffPeak() ?
compactOffPeakRatio : 0))
+ / 100.0
+ < next.run().totalSize()) {
break;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index e337db7e63..15787ea5de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -226,14 +226,18 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- options.optimizedCompactionInterval()));
+ options.optimizedCompactionInterval(),
+ options.offPeakHours(),
+ options.compactOffPeakRatio()));
} else if
(CoreOptions.LookupCompactMode.GENTLE.equals(options.lookupCompact())) {
return new UniversalCompaction(
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
options.optimizedCompactionInterval(),
- options.lookupCompactMaxInterval());
+ options.lookupCompactMaxInterval(),
+ options.offPeakHours(),
+ options.compactOffPeakRatio());
}
}
@@ -242,7 +246,9 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
options.maxSizeAmplificationPercent(),
options.sortedRunSizeRatio(),
options.numSortedRunCompactionTrigger(),
- options.optimizedCompactionInterval());
+ options.optimizedCompactionInterval(),
+ options.offPeakHours(),
+ options.compactOffPeakRatio());
if (options.compactionForceUpLevel0()) {
return new ForceUpLevel0Compaction(universal);
} else {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
index b5587e4752..e70034693b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.OffPeakHours;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileSource;
@@ -86,7 +87,8 @@ public class UniversalCompactionTest {
public void testOptimizedCompactionInterval() {
AtomicLong time = new AtomicLong(0);
UniversalCompaction compaction =
- new UniversalCompaction(100, 1, 3, Duration.ofMillis(1000)) {
+ new UniversalCompaction(
+ 100, 1, 3, Duration.ofMillis(1000),
OffPeakHours.DISABLED, 0) {
@Override
long currentTimeMillis() {
return time.get();
@@ -265,6 +267,23 @@ public class UniversalCompactionTest {
.isEqualTo(new long[] {27});
}
+ @Test
+ public void testOffPeakRatioThreshold() {
+
+ OffPeakHours offPeakHours = OffPeakHours.getInstance(0, 23);
+ long[] sizes = new long[] {8, 9, 10};
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2,
offPeakHours, 0), sizes))
+ .isEqualTo(new long[] {8, 9, 10});
+ assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2,
offPeakHours, 10), sizes))
+ .isEqualTo(new long[] {27});
+
+ assertThat(
+ pickForSizeRatio(
+ new UniversalCompaction(25, 10, 2,
OffPeakHours.DISABLED, 10),
+ sizes))
+ .isEqualTo(new long[] {8, 9, 10});
+ }
+
@Test
public void testLookup() {
ForceUpLevel0Compaction compaction =
@@ -295,7 +314,8 @@ public class UniversalCompactionTest {
@Test
public void testForcePickL0() {
int maxInterval = 5;
- UniversalCompaction compaction = new UniversalCompaction(25, 1, 5,
null, maxInterval);
+ UniversalCompaction compaction =
+ new UniversalCompaction(25, 1, 5, null, maxInterval,
OffPeakHours.DISABLED, 0);
// level 0 to max level
List<LevelSortedRun> level0ToMax = level0(1, 2, 2, 2);