This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 98ffa777c6 [core] Introduce Tests for OffPeakHours and
FullCompactTrigger (#6044)
98ffa777c6 is described below
commit 98ffa777c629d879a78a8813c71e4974307ffb59
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 11 11:40:55 2025 +0800
[core] Introduce Tests for OffPeakHours and FullCompactTrigger (#6044)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../paimon/mergetree/compact/OffPeakHours.java | 55 +++---
.../mergetree/compact/UniversalCompaction.java | 8 +-
.../mergetree/compact/FullCompactTriggerTest.java | 195 +++++++++++++++++++++
.../paimon/mergetree/compact/OffPeakHoursTest.java | 132 +++++++-------
6 files changed, 292 insertions(+), 102 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6f5153f594..29d110d495 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -252,7 +252,7 @@ under the License.
<td><h5>compaction.offpeak.end.hour</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
- <td>The end of off-peak hours, expressed as an integer between 0
and 23, inclusive. Set to -1 to disable off-peak.</td>
+ <td>The end of off-peak hours, expressed as an integer between 0
and 23, exclusive. Set to -1 to disable off-peak.</td>
</tr>
<tr>
<td><h5>compaction.offpeak.start.hour</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 32eff542a4..a25608ebb7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -704,7 +704,7 @@ public class CoreOptions implements Serializable {
.intType()
.defaultValue(-1)
.withDescription(
- "The end of off-peak hours, expressed as an
integer between 0 and 23, inclusive. Set"
+ "The end of off-peak hours, expressed as an
integer between 0 and 23, exclusive. Set"
+ " to -1 to disable off-peak.");
public static final ConfigOption<Integer> COMPACTION_OFFPEAK_RATIO =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
index 203af06cf6..3351e0ba0b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
@@ -22,14 +22,28 @@ import org.apache.paimon.CoreOptions;
import javax.annotation.Nullable;
-import java.time.LocalDateTime;
-
/** OffPeakHours to control compaction ratio by hours. */
-public abstract class OffPeakHours {
+public class OffPeakHours {
+
+ private final int startHour;
+ private final int endHour;
+ private final int compactOffPeakRatio;
- public abstract boolean isOffPeak(int targetHour);
+ private OffPeakHours(int startHour, int endHour, int compactOffPeakRatio) {
+ this.startHour = startHour;
+ this.endHour = endHour;
+ this.compactOffPeakRatio = compactOffPeakRatio;
+ }
- public abstract int currentRatio();
+ public int currentRatio(int targetHour) {
+ boolean isOffPeak;
+ if (startHour <= endHour) {
+ isOffPeak = startHour <= targetHour && targetHour < endHour;
+ } else {
+ isOffPeak = targetHour < endHour || startHour <= targetHour;
+ }
+ return isOffPeak ? compactOffPeakRatio : 0;
+ }
@Nullable
public static OffPeakHours create(CoreOptions options) {
@@ -39,9 +53,8 @@ public abstract class OffPeakHours {
options.compactOffPeakRatio());
}
- @Nullable
public static OffPeakHours create(int startHour, int endHour, int
compactOffPeakRatio) {
- if (startHour == -1 && endHour == -1) {
+ if (startHour == -1 || endHour == -1) {
return null;
}
@@ -49,32 +62,6 @@ public abstract class OffPeakHours {
return null;
}
- return new OffPeakHoursImpl(startHour, endHour, compactOffPeakRatio);
- }
-
- private static class OffPeakHoursImpl extends OffPeakHours {
-
- private final int startHour;
- private final int endHour;
- private final int compactOffPeakRatio;
-
- OffPeakHoursImpl(int startHour, int endHour, int compactOffPeakRatio) {
- this.startHour = startHour;
- this.endHour = endHour;
- this.compactOffPeakRatio = compactOffPeakRatio;
- }
-
- @Override
- public boolean isOffPeak(int targetHour) {
- if (startHour <= endHour) {
- return startHour <= targetHour && targetHour < endHour;
- }
- return targetHour < endHour || startHour <= targetHour;
- }
-
- @Override
- public int currentRatio() {
- return isOffPeak(LocalDateTime.now().getHour()) ?
compactOffPeakRatio : 0;
- }
+ return new OffPeakHours(startHour, endHour, compactOffPeakRatio);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 5313682a90..6d938f0cae 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
@@ -164,8 +165,7 @@ public class UniversalCompaction implements CompactStrategy
{
long candidateSize = candidateSize(runs, candidateCount);
for (int i = candidateCount; i < runs.size(); i++) {
LevelSortedRun next = runs.get(i);
- int offPeakRatio = offPeakHours == null ? 0 :
offPeakHours.currentRatio();
- if (candidateSize * (100.0 + sizeRatio + offPeakRatio) / 100.0
+ if (candidateSize * (100.0 + sizeRatio + ratioForOffPeak()) / 100.0
< next.run().totalSize()) {
break;
}
@@ -181,6 +181,10 @@ public class UniversalCompaction implements
CompactStrategy {
return null;
}
+ private int ratioForOffPeak() {
+ return offPeakHours == null ? 0 :
offPeakHours.currentRatio(LocalDateTime.now().getHour());
+ }
+
private long candidateSize(List<LevelSortedRun> runs, int candidateCount) {
long size = 0;
for (int i = 0; i < candidateCount; i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
new file mode 100644
index 0000000000..54759e4389
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.COMPACTION_OPTIMIZATION_INTERVAL;
+import static org.apache.paimon.CoreOptions.COMPACTION_TOTAL_SIZE_THRESHOLD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link FullCompactTrigger}. */
+public class FullCompactTriggerTest {
+
+ @Test
+ public void testCreateNoOptions() {
+ CoreOptions options = new CoreOptions(new Options());
+ assertThat(FullCompactTrigger.create(options)).isNull();
+ }
+
+ @Test
+ public void testCreateWithInterval() {
+ Options options = new Options();
+ options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+ FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ assertThat(trigger).isNotNull();
+ }
+
+ @Test
+ public void testCreateWithThreshold() {
+ Options options = new Options();
+ options.set(COMPACTION_TOTAL_SIZE_THRESHOLD,
MemorySize.ofMebiBytes(100));
+ FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ assertThat(trigger).isNotNull();
+ }
+
+ @Test
+ public void testCreateWithBoth() {
+ Options options = new Options();
+ options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+ options.set(COMPACTION_TOTAL_SIZE_THRESHOLD,
MemorySize.ofMebiBytes(100));
+ FullCompactTrigger trigger = FullCompactTrigger.create(new
CoreOptions(options));
+ assertThat(trigger).isNotNull();
+ }
+
+ @Test
+ public void testSingleRun() {
+ FullCompactTrigger trigger = new FullCompactTrigger(1000L, 1000L);
+ assertThat(trigger.tryFullCompact(5, createRuns(100))).isEmpty();
+ }
+
+ @Test
+ public void testNoOptions() {
+ FullCompactTrigger trigger = new FullCompactTrigger(null, null);
+ assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
+ }
+
+ @Test
+ public void testInterval() {
+ AtomicLong time = new AtomicLong(10_000);
+ TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, null, time);
+
+ // First time, should trigger
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ assertThat(compactUnit.get().files()).hasSize(2);
+
+ // Last compaction time is now 10_000.
+ // Advance time, but not enough for interval to trigger.
+ time.addAndGet(500); // now 10_500
+ assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
+
+ // Advance time to be greater than interval.
+ time.addAndGet(501); // now 11_001, diff is 1001 > 1000
+ Optional<CompactUnit> compactUnit2 = trigger.tryFullCompact(5,
createRuns(100, 200));
+ assertThat(compactUnit2).isPresent();
+ assertThat(compactUnit2.get().outputLevel()).isEqualTo(4);
+ assertThat(compactUnit2.get().files()).hasSize(2);
+ }
+
+ @Test
+ public void testThreshold() {
+ FullCompactTrigger trigger = new FullCompactTrigger(null, 1000L);
+
+ // total size 300 < 1000, should trigger
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(100, 200));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ assertThat(compactUnit.get().files()).hasSize(2);
+
+ // total size 1000 == 1000, should not trigger
+ assertThat(trigger.tryFullCompact(5, createRuns(500, 500))).isEmpty();
+
+ // total size 1500 > 1000, should not trigger
+ assertThat(trigger.tryFullCompact(5, createRuns(500, 1000))).isEmpty();
+ }
+
+ @Test
+ public void testIntervalTriggersFirst() {
+ AtomicLong time = new AtomicLong(10_000);
+ // Interval will trigger, but size is > threshold
+ TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, 500L, time);
+
+ // First time, interval should trigger even if size (600) > threshold
(500)
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(300, 300));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+ }
+
+ @Test
+ public void testThresholdTriggersWhenIntervalFails() {
+ AtomicLong time = new AtomicLong(10_000);
+ TestableFullCompactTrigger trigger = new
TestableFullCompactTrigger(1000L, 500L, time);
+
+ // Trigger once to set last compaction time
+ assertThat(trigger.tryFullCompact(5, createRuns(10, 20))).isPresent();
+
+ // Advance time, but not enough for interval to trigger
+ time.addAndGet(500); // now 10_500
+
+ // Size (60) < threshold (500), should trigger
+ Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5,
createRuns(30, 30));
+ assertThat(compactUnit).isPresent();
+ assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+
+ // Size (600) > threshold (500), should not trigger
+ assertThat(trigger.tryFullCompact(5, createRuns(300, 300))).isEmpty();
+ }
+
+ private LevelSortedRun createLevelSortedRun(long size) {
+ SortedRun run = mock(SortedRun.class);
+ when(run.totalSize()).thenReturn(size);
+ DataFileMeta file = mock(DataFileMeta.class);
+ when(run.files()).thenReturn(singletonList(file));
+ // Level does not matter for the trigger logic
+ return new LevelSortedRun(0, run);
+ }
+
+ private List<LevelSortedRun> createRuns(long... sizes) {
+ return Arrays.stream(sizes)
+ .mapToObj(this::createLevelSortedRun)
+ .collect(Collectors.toList());
+ }
+
+ /** A {@link FullCompactTrigger} that allows controlling time for tests. */
+ private static class TestableFullCompactTrigger extends FullCompactTrigger
{
+
+ private final AtomicLong currentTime;
+
+ public TestableFullCompactTrigger(
+ Long fullCompactionInterval, Long totalSizeThreshold,
AtomicLong currentTime) {
+ super(fullCompactionInterval, totalSizeThreshold);
+ this.currentTime = currentTime;
+ }
+
+ @Override
+ long currentTimeMillis() {
+ return currentTime.get();
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
index 6978280e3c..f367db5b85 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
@@ -18,82 +18,86 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link OffPeakHours}. */
-public class OffPeakHoursTest {
+class OffPeakHoursTest {
+
+ private static final int RATIO = 10;
@Test
- public void testNormalRangeOffPeakHours() {
- // Test normal range: 9 AM to 5 PM (9-17)
- OffPeakHours offPeakHours = OffPeakHours.create(9, 17, 0);
-
- // Hours before start should not be off-peak
- for (int hour = 0; hour < 9; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should not be off-peak", hour)
- .isFalse();
- }
-
- // Hours in range should be off-peak (start inclusive, end exclusive)
- for (int hour = 9; hour < 17; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should be off-peak", hour)
- .isTrue();
- }
-
- // Hours after end should not be off-peak
- for (int hour = 17; hour < 24; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should not be off-peak", hour)
- .isFalse();
- }
+ void testCreateFromOptions() {
+ Options options = new Options();
+ options.set(CoreOptions.COMPACT_OFFPEAK_START_HOUR, 22);
+ options.set(CoreOptions.COMPACT_OFFPEAK_END_HOUR, 6);
+ options.set(CoreOptions.COMPACTION_OFFPEAK_RATIO, RATIO);
+ CoreOptions coreOptions = new CoreOptions(options);
+
+ OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+
+ assertThat(offPeakHours).isNotNull();
+ assertThat(offPeakHours.currentRatio(23)).isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(7)).isEqualTo(0);
}
@Test
- public void testWrapAroundRangeOffPeakHours() {
- OffPeakHours offPeakHours = OffPeakHours.create(22, 6, 0);
-
- // Hours before end (0-5) should be off-peak
- for (int hour = 0; hour < 6; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should be off-peak", hour)
- .isTrue();
- }
-
- // Hours between end and start (6-21) should not be off-peak
- for (int hour = 6; hour < 22; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should not be off-peak", hour)
- .isFalse();
- }
-
- // Hours from start to end of day (22-23) should be off-peak
- for (int hour = 22; hour < 24; hour++) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should be off-peak", hour)
- .isTrue();
- }
+ void testCreateFromOptionsWithDefault() {
+ Options options = new Options();
+ CoreOptions coreOptions = new CoreOptions(options);
+ OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+ assertThat(offPeakHours).isNull();
+ }
+
+ @Test
+ void testCreateFromOptionsWithSameHour() {
+ Options options = new Options();
+ options.set(CoreOptions.COMPACT_OFFPEAK_START_HOUR, 5);
+ options.set(CoreOptions.COMPACT_OFFPEAK_END_HOUR, 5);
+ options.set(CoreOptions.COMPACTION_OFFPEAK_RATIO, RATIO);
+ CoreOptions coreOptions = new CoreOptions(options);
+
+ OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+
+ assertThat(offPeakHours).isNull();
}
@Test
- public void testSingleHourRange() {
- // Test single hour range: 12 to 13
- OffPeakHours offPeakHours = OffPeakHours.create(12, 13, 0);
-
- // Only hour 12 should be off-peak
- for (int hour = 0; hour < 24; hour++) {
- if (hour == 12) {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should be off-peak", hour)
- .isTrue();
- } else {
- assertThat(offPeakHours.isOffPeak(hour))
- .as("Hour %d should not be off-peak", hour)
- .isFalse();
- }
- }
+ void testCreateWithInvalidHours() {
+ assertThat(OffPeakHours.create(-1, -1, RATIO)).isNull();
+ assertThat(OffPeakHours.create(5, 5, RATIO)).isNull();
+ assertThat(OffPeakHours.create(2, -1, RATIO)).isNull();
+ assertThat(OffPeakHours.create(-1, 2, RATIO)).isNull();
+ }
+
+ @Test
+ void testCurrentRatioNormalHours() {
+ OffPeakHours offPeakHours = OffPeakHours.create(2, 8, RATIO);
+ assertThat(offPeakHours).isNotNull();
+
+ assertThat(offPeakHours.currentRatio(1)).as("Before
start").isEqualTo(0);
+ assertThat(offPeakHours.currentRatio(2)).as("At
start").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(5)).as("In
between").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(7)).as("Before
end").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(8)).as("At end
(exclusive)").isEqualTo(0);
+ assertThat(offPeakHours.currentRatio(9)).as("After end").isEqualTo(0);
+ }
+
+ @Test
+ void testCurrentRatioOvernightHours() {
+ OffPeakHours offPeakHours = OffPeakHours.create(22, 6, RATIO);
+ assertThat(offPeakHours).isNotNull();
+
+ assertThat(offPeakHours.currentRatio(21)).as("Before
start").isEqualTo(0);
+ assertThat(offPeakHours.currentRatio(22)).as("At
start").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(23)).as("After
start").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(0)).as("After midnight, before
end").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(5)).as("Before
end").isEqualTo(RATIO);
+ assertThat(offPeakHours.currentRatio(6)).as("At end
(exclusive)").isEqualTo(0);
+ assertThat(offPeakHours.currentRatio(10)).as("After end, before next
start").isEqualTo(0);
}
}