This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98ffa777c6 [core] Introduce Tests for OffPeakHours and 
FullCompactTrigger (#6044)
98ffa777c6 is described below

commit 98ffa777c629d879a78a8813c71e4974307ffb59
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 11 11:40:55 2025 +0800

    [core] Introduce Tests for OffPeakHours and FullCompactTrigger (#6044)
---
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   2 +-
 .../paimon/mergetree/compact/OffPeakHours.java     |  55 +++---
 .../mergetree/compact/UniversalCompaction.java     |   8 +-
 .../mergetree/compact/FullCompactTriggerTest.java  | 195 +++++++++++++++++++++
 .../paimon/mergetree/compact/OffPeakHoursTest.java | 132 +++++++-------
 6 files changed, 292 insertions(+), 102 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 6f5153f594..29d110d495 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -252,7 +252,7 @@ under the License.
             <td><h5>compaction.offpeak.end.hour</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>Integer</td>
-            <td>The end of off-peak hours, expressed as an integer between 0 
and 23, inclusive. Set to -1 to disable off-peak.</td>
+            <td>The end of off-peak hours, expressed as an integer between 0 
and 23, exclusive. Set to -1 to disable off-peak.</td>
         </tr>
         <tr>
             <td><h5>compaction.offpeak.start.hour</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 32eff542a4..a25608ebb7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -704,7 +704,7 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .defaultValue(-1)
                     .withDescription(
-                            "The end of off-peak hours, expressed as an 
integer between 0 and 23, inclusive. Set"
+                            "The end of off-peak hours, expressed as an 
integer between 0 and 23, exclusive. Set"
                                     + " to -1 to disable off-peak.");
 
     public static final ConfigOption<Integer> COMPACTION_OFFPEAK_RATIO =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
index 203af06cf6..3351e0ba0b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java
@@ -22,14 +22,28 @@ import org.apache.paimon.CoreOptions;
 
 import javax.annotation.Nullable;
 
-import java.time.LocalDateTime;
-
 /** OffPeakHours to control compaction ratio by hours. */
-public abstract class OffPeakHours {
+public class OffPeakHours {
+
+    private final int startHour;
+    private final int endHour;
+    private final int compactOffPeakRatio;
 
-    public abstract boolean isOffPeak(int targetHour);
+    private OffPeakHours(int startHour, int endHour, int compactOffPeakRatio) {
+        this.startHour = startHour;
+        this.endHour = endHour;
+        this.compactOffPeakRatio = compactOffPeakRatio;
+    }
 
-    public abstract int currentRatio();
+    public int currentRatio(int targetHour) {
+        boolean isOffPeak;
+        if (startHour <= endHour) {
+            isOffPeak = startHour <= targetHour && targetHour < endHour;
+        } else {
+            isOffPeak = targetHour < endHour || startHour <= targetHour;
+        }
+        return isOffPeak ? compactOffPeakRatio : 0;
+    }
 
     @Nullable
     public static OffPeakHours create(CoreOptions options) {
@@ -39,9 +53,8 @@ public abstract class OffPeakHours {
                 options.compactOffPeakRatio());
     }
 
-    @Nullable
     public static OffPeakHours create(int startHour, int endHour, int 
compactOffPeakRatio) {
-        if (startHour == -1 && endHour == -1) {
+        if (startHour == -1 || endHour == -1) {
             return null;
         }
 
@@ -49,32 +62,6 @@ public abstract class OffPeakHours {
             return null;
         }
 
-        return new OffPeakHoursImpl(startHour, endHour, compactOffPeakRatio);
-    }
-
-    private static class OffPeakHoursImpl extends OffPeakHours {
-
-        private final int startHour;
-        private final int endHour;
-        private final int compactOffPeakRatio;
-
-        OffPeakHoursImpl(int startHour, int endHour, int compactOffPeakRatio) {
-            this.startHour = startHour;
-            this.endHour = endHour;
-            this.compactOffPeakRatio = compactOffPeakRatio;
-        }
-
-        @Override
-        public boolean isOffPeak(int targetHour) {
-            if (startHour <= endHour) {
-                return startHour <= targetHour && targetHour < endHour;
-            }
-            return targetHour < endHour || startHour <= targetHour;
-        }
-
-        @Override
-        public int currentRatio() {
-            return isOffPeak(LocalDateTime.now().getHour()) ? 
compactOffPeakRatio : 0;
-        }
+        return new OffPeakHours(startHour, endHour, compactOffPeakRatio);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
index 5313682a90..6d938f0cae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/UniversalCompaction.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Optional;
 
@@ -164,8 +165,7 @@ public class UniversalCompaction implements CompactStrategy 
{
         long candidateSize = candidateSize(runs, candidateCount);
         for (int i = candidateCount; i < runs.size(); i++) {
             LevelSortedRun next = runs.get(i);
-            int offPeakRatio = offPeakHours == null ? 0 : 
offPeakHours.currentRatio();
-            if (candidateSize * (100.0 + sizeRatio + offPeakRatio) / 100.0
+            if (candidateSize * (100.0 + sizeRatio + ratioForOffPeak()) / 100.0
                     < next.run().totalSize()) {
                 break;
             }
@@ -181,6 +181,10 @@ public class UniversalCompaction implements 
CompactStrategy {
         return null;
     }
 
+    private int ratioForOffPeak() {
+        return offPeakHours == null ? 0 : 
offPeakHours.currentRatio(LocalDateTime.now().getHour());
+    }
+
     private long candidateSize(List<LevelSortedRun> runs, int candidateCount) {
         long size = 0;
         for (int i = 0; i < candidateCount; i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
new file mode 100644
index 0000000000..54759e4389
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullCompactTriggerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.COMPACTION_OPTIMIZATION_INTERVAL;
+import static org.apache.paimon.CoreOptions.COMPACTION_TOTAL_SIZE_THRESHOLD;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link FullCompactTrigger}. */
+public class FullCompactTriggerTest {
+
+    @Test
+    public void testCreateNoOptions() {
+        CoreOptions options = new CoreOptions(new Options());
+        assertThat(FullCompactTrigger.create(options)).isNull();
+    }
+
+    @Test
+    public void testCreateWithInterval() {
+        Options options = new Options();
+        options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        assertThat(trigger).isNotNull();
+    }
+
+    @Test
+    public void testCreateWithThreshold() {
+        Options options = new Options();
+        options.set(COMPACTION_TOTAL_SIZE_THRESHOLD, 
MemorySize.ofMebiBytes(100));
+        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        assertThat(trigger).isNotNull();
+    }
+
+    @Test
+    public void testCreateWithBoth() {
+        Options options = new Options();
+        options.set(COMPACTION_OPTIMIZATION_INTERVAL, Duration.ofHours(1));
+        options.set(COMPACTION_TOTAL_SIZE_THRESHOLD, 
MemorySize.ofMebiBytes(100));
+        FullCompactTrigger trigger = FullCompactTrigger.create(new 
CoreOptions(options));
+        assertThat(trigger).isNotNull();
+    }
+
+    @Test
+    public void testSingleRun() {
+        FullCompactTrigger trigger = new FullCompactTrigger(1000L, 1000L);
+        assertThat(trigger.tryFullCompact(5, createRuns(100))).isEmpty();
+    }
+
+    @Test
+    public void testNoOptions() {
+        FullCompactTrigger trigger = new FullCompactTrigger(null, null);
+        assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
+    }
+
+    @Test
+    public void testInterval() {
+        AtomicLong time = new AtomicLong(10_000);
+        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, null, time);
+
+        // First time, should trigger
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+        assertThat(compactUnit.get().files()).hasSize(2);
+
+        // Last compaction time is now 10_000.
+        // Advance time, but not enough for interval to trigger.
+        time.addAndGet(500); // now 10_500
+        assertThat(trigger.tryFullCompact(5, createRuns(100, 200))).isEmpty();
+
+        // Advance time to be greater than interval.
+        time.addAndGet(501); // now 11_001, diff is 1001 > 1000
+        Optional<CompactUnit> compactUnit2 = trigger.tryFullCompact(5, 
createRuns(100, 200));
+        assertThat(compactUnit2).isPresent();
+        assertThat(compactUnit2.get().outputLevel()).isEqualTo(4);
+        assertThat(compactUnit2.get().files()).hasSize(2);
+    }
+
+    @Test
+    public void testThreshold() {
+        FullCompactTrigger trigger = new FullCompactTrigger(null, 1000L);
+
+        // total size 300 < 1000, should trigger
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(100, 200));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+        assertThat(compactUnit.get().files()).hasSize(2);
+
+        // total size 1000 == 1000, should not trigger
+        assertThat(trigger.tryFullCompact(5, createRuns(500, 500))).isEmpty();
+
+        // total size 1500 > 1000, should not trigger
+        assertThat(trigger.tryFullCompact(5, createRuns(500, 1000))).isEmpty();
+    }
+
+    @Test
+    public void testIntervalTriggersFirst() {
+        AtomicLong time = new AtomicLong(10_000);
+        // Interval will trigger, but size is > threshold
+        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, 500L, time);
+
+        // First time, interval should trigger even if size (600) > threshold 
(500)
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(300, 300));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+    }
+
+    @Test
+    public void testThresholdTriggersWhenIntervalFails() {
+        AtomicLong time = new AtomicLong(10_000);
+        TestableFullCompactTrigger trigger = new 
TestableFullCompactTrigger(1000L, 500L, time);
+
+        // Trigger once to set last compaction time
+        assertThat(trigger.tryFullCompact(5, createRuns(10, 20))).isPresent();
+
+        // Advance time, but not enough for interval to trigger
+        time.addAndGet(500); // now 10_500
+
+        // Size (60) < threshold (500), should trigger
+        Optional<CompactUnit> compactUnit = trigger.tryFullCompact(5, 
createRuns(30, 30));
+        assertThat(compactUnit).isPresent();
+        assertThat(compactUnit.get().outputLevel()).isEqualTo(4);
+
+        // Size (600) > threshold (500), should not trigger
+        assertThat(trigger.tryFullCompact(5, createRuns(300, 300))).isEmpty();
+    }
+
+    private LevelSortedRun createLevelSortedRun(long size) {
+        SortedRun run = mock(SortedRun.class);
+        when(run.totalSize()).thenReturn(size);
+        DataFileMeta file = mock(DataFileMeta.class);
+        when(run.files()).thenReturn(singletonList(file));
+        // Level does not matter for the trigger logic
+        return new LevelSortedRun(0, run);
+    }
+
+    private List<LevelSortedRun> createRuns(long... sizes) {
+        return Arrays.stream(sizes)
+                .mapToObj(this::createLevelSortedRun)
+                .collect(Collectors.toList());
+    }
+
+    /** A {@link FullCompactTrigger} that allows controlling time for tests. */
+    private static class TestableFullCompactTrigger extends FullCompactTrigger 
{
+
+        private final AtomicLong currentTime;
+
+        public TestableFullCompactTrigger(
+                Long fullCompactionInterval, Long totalSizeThreshold, 
AtomicLong currentTime) {
+            super(fullCompactionInterval, totalSizeThreshold);
+            this.currentTime = currentTime;
+        }
+
+        @Override
+        long currentTimeMillis() {
+            return currentTime.get();
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
index 6978280e3c..f367db5b85 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/OffPeakHoursTest.java
@@ -18,82 +18,86 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link OffPeakHours}. */
-public class OffPeakHoursTest {
+class OffPeakHoursTest {
+
+    private static final int RATIO = 10;
 
     @Test
-    public void testNormalRangeOffPeakHours() {
-        // Test normal range: 9 AM to 5 PM (9-17)
-        OffPeakHours offPeakHours = OffPeakHours.create(9, 17, 0);
-
-        // Hours before start should not be off-peak
-        for (int hour = 0; hour < 9; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should not be off-peak", hour)
-                    .isFalse();
-        }
-
-        // Hours in range should be off-peak (start inclusive, end exclusive)
-        for (int hour = 9; hour < 17; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should be off-peak", hour)
-                    .isTrue();
-        }
-
-        // Hours after end should not be off-peak
-        for (int hour = 17; hour < 24; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should not be off-peak", hour)
-                    .isFalse();
-        }
+    void testCreateFromOptions() {
+        Options options = new Options();
+        options.set(CoreOptions.COMPACT_OFFPEAK_START_HOUR, 22);
+        options.set(CoreOptions.COMPACT_OFFPEAK_END_HOUR, 6);
+        options.set(CoreOptions.COMPACTION_OFFPEAK_RATIO, RATIO);
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+
+        assertThat(offPeakHours).isNotNull();
+        assertThat(offPeakHours.currentRatio(23)).isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(7)).isEqualTo(0);
     }
 
     @Test
-    public void testWrapAroundRangeOffPeakHours() {
-        OffPeakHours offPeakHours = OffPeakHours.create(22, 6, 0);
-
-        // Hours before end (0-5) should be off-peak
-        for (int hour = 0; hour < 6; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should be off-peak", hour)
-                    .isTrue();
-        }
-
-        // Hours between end and start (6-21) should not be off-peak
-        for (int hour = 6; hour < 22; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should not be off-peak", hour)
-                    .isFalse();
-        }
-
-        // Hours from start to end of day (22-23) should be off-peak
-        for (int hour = 22; hour < 24; hour++) {
-            assertThat(offPeakHours.isOffPeak(hour))
-                    .as("Hour %d should be off-peak", hour)
-                    .isTrue();
-        }
+    void testCreateFromOptionsWithDefault() {
+        Options options = new Options();
+        CoreOptions coreOptions = new CoreOptions(options);
+        OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+        assertThat(offPeakHours).isNull();
+    }
+
+    @Test
+    void testCreateFromOptionsWithSameHour() {
+        Options options = new Options();
+        options.set(CoreOptions.COMPACT_OFFPEAK_START_HOUR, 5);
+        options.set(CoreOptions.COMPACT_OFFPEAK_END_HOUR, 5);
+        options.set(CoreOptions.COMPACTION_OFFPEAK_RATIO, RATIO);
+        CoreOptions coreOptions = new CoreOptions(options);
+
+        OffPeakHours offPeakHours = OffPeakHours.create(coreOptions);
+
+        assertThat(offPeakHours).isNull();
     }
 
     @Test
-    public void testSingleHourRange() {
-        // Test single hour range: 12 to 13
-        OffPeakHours offPeakHours = OffPeakHours.create(12, 13, 0);
-
-        // Only hour 12 should be off-peak
-        for (int hour = 0; hour < 24; hour++) {
-            if (hour == 12) {
-                assertThat(offPeakHours.isOffPeak(hour))
-                        .as("Hour %d should be off-peak", hour)
-                        .isTrue();
-            } else {
-                assertThat(offPeakHours.isOffPeak(hour))
-                        .as("Hour %d should not be off-peak", hour)
-                        .isFalse();
-            }
-        }
+    void testCreateWithInvalidHours() {
+        assertThat(OffPeakHours.create(-1, -1, RATIO)).isNull();
+        assertThat(OffPeakHours.create(5, 5, RATIO)).isNull();
+        assertThat(OffPeakHours.create(2, -1, RATIO)).isNull();
+        assertThat(OffPeakHours.create(-1, 2, RATIO)).isNull();
+    }
+
+    @Test
+    void testCurrentRatioNormalHours() {
+        OffPeakHours offPeakHours = OffPeakHours.create(2, 8, RATIO);
+        assertThat(offPeakHours).isNotNull();
+
+        assertThat(offPeakHours.currentRatio(1)).as("Before 
start").isEqualTo(0);
+        assertThat(offPeakHours.currentRatio(2)).as("At 
start").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(5)).as("In 
between").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(7)).as("Before 
end").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(8)).as("At end 
(exclusive)").isEqualTo(0);
+        assertThat(offPeakHours.currentRatio(9)).as("After end").isEqualTo(0);
+    }
+
+    @Test
+    void testCurrentRatioOvernightHours() {
+        OffPeakHours offPeakHours = OffPeakHours.create(22, 6, RATIO);
+        assertThat(offPeakHours).isNotNull();
+
+        assertThat(offPeakHours.currentRatio(21)).as("Before 
start").isEqualTo(0);
+        assertThat(offPeakHours.currentRatio(22)).as("At 
start").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(23)).as("After 
start").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(0)).as("After midnight, before 
end").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(5)).as("Before 
end").isEqualTo(RATIO);
+        assertThat(offPeakHours.currentRatio(6)).as("At end 
(exclusive)").isEqualTo(0);
+        assertThat(offPeakHours.currentRatio(10)).as("After end, before next 
start").isEqualTo(0);
     }
 }

Reply via email to