This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1055e4b99b [core] Refactor scan and plan in DataEvolutionCompactCoordinator
1055e4b99b is described below

commit 1055e4b99b3a293211fb1a8e264b26c3eb59d005
Author: JingsongLi <[email protected]>
AuthorDate: Fri Dec 19 11:48:48 2025 +0800

    [core] Refactor scan and plan in DataEvolutionCompactCoordinator
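
    Before this change, CompactPlanner pulled its input through a
    Supplier<List<ManifestEntry>> wired to CompactScanner#fetchResult, and
    the scanner buffered entries between calls. Now scan() returns one
    batch directly (capped at FILES_BATCH = 100_000 entries rather than
    the previous hard-coded 1000) and plan() hands that batch to
    compactPlan(entries), mirroring the new plan() body:

        List<ManifestEntry> entries = scanner.scan();
        if (!entries.isEmpty()) {
            return planner.compactPlan(entries);
        }
        return Collections.emptyList();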
---
 .../DataEvolutionCompactCoordinator.java           | 40 +++++++---------------
 .../DataEvolutionCompactCoordinatorTest.java       | 26 +++++++-------
 2 files changed, 24 insertions(+), 42 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 892b88952a..e6a59da2b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -36,14 +36,15 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Queue;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Compact coordinator to cmpact data evolution table. */
+/** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
 
+    private static final int FILES_BATCH = 100_000;
+
     private final CompactScanner scanner;
     private final CompactPlanner planner;
 
@@ -55,19 +56,15 @@ public class DataEvolutionCompactCoordinator {
 
         this.scanner = new CompactScanner(table.newSnapshotReader());
         this.planner =
-                new CompactPlanner(
-                        scanner::fetchResult,
-                        compactBlob,
-                        targetFileSize,
-                        openFileCost,
-                        compactMinFileNum);
+                new CompactPlanner(compactBlob, targetFileSize, openFileCost, compactMinFileNum);
     }
 
     public List<DataEvolutionCompactTask> plan() {
         // scan files in snapshot
-        if (scanner.scan()) {
+        List<ManifestEntry> entries = scanner.scan();
+        if (!entries.isEmpty()) {
             // do plan compact tasks
-            return planner.compactPlan();
+            return planner.compactPlan(entries);
         }
 
         return Collections.emptyList();
@@ -79,8 +76,6 @@ public class DataEvolutionCompactCoordinator {
         private final SnapshotReader snapshotReader;
         private final Queue<List<ManifestFileMeta>> metas;
 
-        private List<ManifestEntry> result;
-
         private CompactScanner(SnapshotReader snapshotReader) {
             this.snapshotReader = snapshotReader;
             Snapshot snapshot = snapshotReader.snapshotManager().latestSnapshot();
@@ -90,13 +85,11 @@ public class DataEvolutionCompactCoordinator {
             RangeHelper<ManifestFileMeta> rangeHelper =
                     new RangeHelper<>(ManifestFileMeta::minRowId, ManifestFileMeta::maxRowId);
             this.metas = new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
-            this.result = new ArrayList<>();
         }
 
-        boolean scan() {
-            boolean scanResult = false;
-            while (metas.peek() != null && result.size() < 1000) {
-                scanResult = true;
+        List<ManifestEntry> scan() {
+            List<ManifestEntry> result = new ArrayList<>();
+            while (metas.peek() != null && result.size() < FILES_BATCH) {
                 List<ManifestFileMeta> currentMetas = metas.poll();
                 List<ManifestEntry> targetEntries =
                         currentMetas.stream()
@@ -110,12 +103,6 @@ public class DataEvolutionCompactCoordinator {
 
                 result.addAll(targetEntries);
             }
-            return scanResult;
-        }
-
-        List<ManifestEntry> fetchResult() {
-            List<ManifestEntry> result = new ArrayList<>(this.result);
-            this.result = new ArrayList<>();
             return result;
         }
     }
@@ -123,7 +110,6 @@ public class DataEvolutionCompactCoordinator {
     /** Generate compaction tasks. */
     static class CompactPlanner {
 
-        private final Supplier<List<ManifestEntry>> supplier;
         private final boolean compactBlob;
         private final long targetFileSize;
         private final long openFileCost;
@@ -138,20 +124,18 @@ public class DataEvolutionCompactCoordinator {
         private List<DataFileMeta> blobFiles = new ArrayList<>();
 
         CompactPlanner(
-                Supplier<List<ManifestEntry>> supplier,
                 boolean compactBlob,
                 long targetFileSize,
                 long openFileCost,
                 long compactMinFileNum) {
-            this.supplier = supplier;
             this.compactBlob = compactBlob;
             this.targetFileSize = targetFileSize;
             this.openFileCost = openFileCost;
             this.compactMinFileNum = compactMinFileNum;
         }
 
-        List<DataEvolutionCompactTask> compactPlan() {
-            for (ManifestEntry entry : supplier.get()) {
+        List<DataEvolutionCompactTask> compactPlan(Iterable<ManifestEntry> entries) {
+            for (ManifestEntry entry : entries) {
                 long rowId = entry.file().nonNullFirstRowId();
                 if (rowId < lastRowIdStart) {
                     throw new IllegalStateException(
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
index f7b3ab7839..1c66baa554 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -45,9 +45,9 @@ public class DataEvolutionCompactCoordinatorTest {
 
         DataEvolutionCompactCoordinator.CompactPlanner planner =
                 new DataEvolutionCompactCoordinator.CompactPlanner(
-                        () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+                        false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
 
-        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
         assertThat(tasks).isEmpty();
     }
@@ -62,17 +62,16 @@ public class DataEvolutionCompactCoordinatorTest {
 
         // Use small target file size to trigger compaction
         DataEvolutionCompactCoordinator.CompactPlanner planner =
-                new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 199, 1, 2);
+                new DataEvolutionCompactCoordinator.CompactPlanner(false, 199, 1, 2);
 
-        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
         assertThat(tasks).isNotEmpty();
         assertThat(tasks.get(0).compactBefore())
                 .containsExactly(entries.get(0).file(), entries.get(1).file());
 
-        planner =
-                new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 200, 1, 2);
-        tasks = planner.compactPlan();
+        planner = new DataEvolutionCompactCoordinator.CompactPlanner(false, 200, 1, 2);
+        tasks = planner.compactPlan(entries);
         assertThat(tasks).isNotEmpty();
         assertThat(tasks.get(0).compactBefore())
                 .containsExactly(
@@ -92,9 +91,9 @@ public class DataEvolutionCompactCoordinatorTest {
         // Use large target file size so compaction is triggered by gap, not size
         DataEvolutionCompactCoordinator.CompactPlanner planner =
                 new DataEvolutionCompactCoordinator.CompactPlanner(
-                        () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+                        false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
 
-        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
         // Gap should trigger compaction of the first group (file1 + file2)
         assertThat(tasks).hasSize(2);
@@ -122,9 +121,9 @@ public class DataEvolutionCompactCoordinatorTest {
 
         DataEvolutionCompactCoordinator.CompactPlanner planner =
                 new DataEvolutionCompactCoordinator.CompactPlanner(
-                        () -> entries, false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
+                        false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
 
-        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
         assertThat(tasks.size()).isEqualTo(4);
         assertThat(tasks.get(0).compactBefore())
@@ -150,10 +149,9 @@ public class DataEvolutionCompactCoordinatorTest {
 
         // Use small target to trigger compaction, with blob compaction enabled
         DataEvolutionCompactCoordinator.CompactPlanner planner =
-                new DataEvolutionCompactCoordinator.CompactPlanner(
-                        () -> entries, true, 1024, 1024, 2);
+                new DataEvolutionCompactCoordinator.CompactPlanner(true, 1024, 1024, 2);
 
-        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan(entries);
 
         // Should have compaction tasks for both data files and blob files
         assertThat(tasks.size()).isEqualTo(2);

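For readers applying the same refactoring outside Paimon, here is a minimal
self-contained sketch of the pattern (hypothetical PullPlanner/PushPlanner
names, not the Paimon API): replacing Supplier indirection with an explicit
parameter removes mutable buffering from the producer and lets tests call
the planner with a plain list, as the updated tests above now do.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    public class SupplierToParameterSketch {

        // Before: the planner pulls input lazily through a Supplier, so
        // the producer must buffer state and expose a fetch method.
        static class PullPlanner {
            private final Supplier<List<String>> supplier;

            PullPlanner(Supplier<List<String>> supplier) {
                this.supplier = supplier;
            }

            List<String> plan() {
                return new ArrayList<>(supplier.get());
            }
        }

        // After: the caller scans first and passes the batch in; the
        // planner holds no reference to the producer and is trivial to
        // unit test with a plain list.
        static class PushPlanner {
            List<String> plan(Iterable<String> entries) {
                List<String> tasks = new ArrayList<>();
                entries.forEach(tasks::add);
                return tasks;
            }
        }

        public static void main(String[] args) {
            List<String> batch = List.of("file-1.parquet", "file-2.parquet");
            System.out.println(new PullPlanner(() -> batch).plan());
            System.out.println(new PushPlanner().plan(batch));
        }
    }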