This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 072127dbea [core] Introduce compaction for data-evolution table (#6828)
072127dbea is described below

commit 072127dbea5673e5d9a1db3476f8886504d4e553
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 19 11:40:10 2025 +0800

    [core] Introduce compaction for data-evolution table (#6828)
---
 .../paimon/append/RollingBlobFileWriter.java       |   4 +-
 .../DataEvolutionCompactCoordinator.java           | 239 +++++++++++++++++++++
 .../dataevolution/DataEvolutionCompactTask.java    | 140 ++++++++++++
 .../globalindex/GlobalIndexScanBuilderImpl.java    |   1 -
 .../DataEvolutionCompactCoordinatorTest.java       | 239 +++++++++++++++++++++
 .../paimon/table/DataEvolutionTableTest.java       |  66 +++++-
 6 files changed, 685 insertions(+), 4 deletions(-)
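
For context, here is a minimal sketch of how the new coordinator and task are
meant to be driven, adapted from the DataEvolutionTableTest#testCompact case
added in this commit. The FileStoreTable handle is assumed to come from an
existing catalog lookup, the wrapper class name is illustrative only, and blob
compaction is disabled because DataEvolutionCompactTask#doCompact does not
support blob tasks yet:

    import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
    import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.sink.CommitMessage;

    import java.util.ArrayList;
    import java.util.List;

    public class DataEvolutionCompactExample {

        public static void compact(FileStoreTable table) throws Exception {
            // The coordinator scans manifests and groups row-id-contiguous files.
            DataEvolutionCompactCoordinator coordinator =
                    new DataEvolutionCompactCoordinator(table, false);

            List<CommitMessage> commitMessages = new ArrayList<>();
            List<DataEvolutionCompactTask> tasks;
            // Each plan() call drains one batch of scanned manifest entries.
            while (!(tasks = coordinator.plan()).isEmpty()) {
                for (DataEvolutionCompactTask task : tasks) {
                    // Rewrites the grouped files and records compactBefore/compactAfter.
                    commitMessages.add(task.doCompact(table));
                }
            }
            // Commit all compaction results as one batch commit.
            table.newBatchWriteBuilder().newCommit().commit(commitMessages);
        }
    }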

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 17928dd6fa..8d511c6b91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -67,8 +67,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  * Every time a file is rolled, the writer will close the current normal data file and blob data files,
  * so one normal data file may correspond to multiple blob data files.
  *
- * Normal file1: f1.parquet may including (b1.blob, b2.blob, b3.blob)
- * Normal file2: f1-2.parquet may including (b4.blob, b5.blob)
+ * Normal file1: f1.parquet may include (b1.blob, b2.blob, b3.blob)
+ * Normal file2: f1-2.parquet may include (b4.blob, b5.blob)
  *
  * </pre>
  */
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
new file mode 100644
index 0000000000..892b88952a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.RangeHelper;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Compact coordinator to compact a data evolution table. */
+public class DataEvolutionCompactCoordinator {
+
+    private final CompactScanner scanner;
+    private final CompactPlanner planner;
+
+    public DataEvolutionCompactCoordinator(FileStoreTable table, boolean compactBlob) {
+        CoreOptions options = table.coreOptions();
+        long targetFileSize = options.targetFileSize(false);
+        long openFileCost = options.splitOpenFileCost();
+        long compactMinFileNum = options.compactionMinFileNum();
+
+        this.scanner = new CompactScanner(table.newSnapshotReader());
+        this.planner =
+                new CompactPlanner(
+                        scanner::fetchResult,
+                        compactBlob,
+                        targetFileSize,
+                        openFileCost,
+                        compactMinFileNum);
+    }
+
+    public List<DataEvolutionCompactTask> plan() {
+        // scan files in snapshot
+        if (scanner.scan()) {
+            // do plan compact tasks
+            return planner.compactPlan();
+        }
+
+        return Collections.emptyList();
+    }
+
+    /** Scanner to generate sorted ManifestEntries. */
+    static class CompactScanner {
+
+        private final SnapshotReader snapshotReader;
+        private final Queue<List<ManifestFileMeta>> metas;
+
+        private List<ManifestEntry> result;
+
+        private CompactScanner(SnapshotReader snapshotReader) {
+            this.snapshotReader = snapshotReader;
+            Snapshot snapshot = snapshotReader.snapshotManager().latestSnapshot();
+
+            List<ManifestFileMeta> manifestFileMetas =
+                    snapshotReader.manifestsReader().read(snapshot, ScanMode.ALL).filteredManifests;
+            RangeHelper<ManifestFileMeta> rangeHelper =
+                    new RangeHelper<>(ManifestFileMeta::minRowId, ManifestFileMeta::maxRowId);
+            this.metas = new ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+            this.result = new ArrayList<>();
+        }
+
+        boolean scan() {
+            boolean scanResult = false;
+            while (metas.peek() != null && result.size() < 1000) {
+                scanResult = true;
+                List<ManifestFileMeta> currentMetas = metas.poll();
+                List<ManifestEntry> targetEntries =
+                        currentMetas.stream()
+                                .flatMap(meta -> snapshotReader.readManifest(meta).stream())
+                                .collect(Collectors.toList());
+                Comparator<ManifestEntry> comparator =
+                        Comparator.comparingLong((ManifestEntry a) -> a.file().nonNullFirstRowId())
+                                .thenComparingInt(
+                                        a -> BlobFileFormat.isBlobFile(a.fileName()) ? 1 : 0);
+                targetEntries.sort(comparator);
+
+                result.addAll(targetEntries);
+            }
+            return scanResult;
+        }
+
+        List<ManifestEntry> fetchResult() {
+            List<ManifestEntry> result = new ArrayList<>(this.result);
+            this.result = new ArrayList<>();
+            return result;
+        }
+    }
+
+    /** Generate compaction tasks. */
+    static class CompactPlanner {
+
+        private final Supplier<List<ManifestEntry>> supplier;
+        private final boolean compactBlob;
+        private final long targetFileSize;
+        private final long openFileCost;
+        private final long compactMinFileNum;
+        private long lastRowIdStart = -1;
+        private long nextRowIdExpected = -1;
+        private long weightSum = 0L;
+        private BinaryRow lastPartition = null;
+        private boolean skipFile = false;
+        private List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+        private List<DataFileMeta> groupFiles = new ArrayList<>();
+        private List<DataFileMeta> blobFiles = new ArrayList<>();
+
+        CompactPlanner(
+                Supplier<List<ManifestEntry>> supplier,
+                boolean compactBlob,
+                long targetFileSize,
+                long openFileCost,
+                long compactMinFileNum) {
+            this.supplier = supplier;
+            this.compactBlob = compactBlob;
+            this.targetFileSize = targetFileSize;
+            this.openFileCost = openFileCost;
+            this.compactMinFileNum = compactMinFileNum;
+        }
+
+        List<DataEvolutionCompactTask> compactPlan() {
+            for (ManifestEntry entry : supplier.get()) {
+                long rowId = entry.file().nonNullFirstRowId();
+                if (rowId < lastRowIdStart) {
+                    throw new IllegalStateException(
+                            "Files are not in order by rowId. Current file rowId: "
+                                    + rowId
+                                    + ", last file rowId: "
+                                    + lastRowIdStart);
+                } else if (rowId == lastRowIdStart) {
+                    checkArgument(
+                            lastPartition.equals(entry.partition()),
+                            "Inconsistent partition for the same rowId: " + rowId);
+                    if (!skipFile) {
+                        if (BlobFileFormat.isBlobFile(entry.fileName())) {
+                            blobFiles.add(entry.file());
+                        } else {
+                            groupFiles.add(entry.file());
+                            weightSum += Math.max(entry.file().fileSize(), openFileCost);
+                        }
+                    }
+                } else if (rowId < nextRowIdExpected) {
+                    checkArgument(
+                            lastPartition.equals(entry.partition()),
+                            "Inconsistent partition for the same rowId: " + rowId);
+                    checkArgument(
+                            BlobFileFormat.isBlobFile(entry.fileName()),
+                            "Data file found in the middle of blob files for rowId: " + rowId);
+                    if (!skipFile) {
+                        blobFiles.add(entry.file());
+                    }
+                } else {
+                    BinaryRow currentPartition = entry.partition();
+                    long currentWeight = Math.max(entry.file().fileSize(), openFileCost);
+                    // skip big file
+                    skipFile = currentWeight > targetFileSize;
+
+                    // If compaction condition meets, do compaction
+                    if (weightSum > targetFileSize
+                            || rowId > nextRowIdExpected
+                            || !currentPartition.equals(lastPartition)
+                            || skipFile) {
+                        flushAll();
+                    }
+
+                    if (!skipFile) {
+                        weightSum += currentWeight;
+                        groupFiles.add(entry.file());
+                    }
+                    lastRowIdStart = rowId;
+                    nextRowIdExpected = rowId + entry.file().rowCount();
+                    lastPartition = currentPartition;
+                }
+            }
+            // do compaction for the last group
+            flushAll();
+
+            List<DataEvolutionCompactTask> result = new ArrayList<>(tasks);
+            tasks = new ArrayList<>();
+            return result;
+        }
+
+        private void flushAll() {
+            if (!groupFiles.isEmpty()) {
+                if (groupFiles.size() >= compactMinFileNum) {
+                    tasks.add(
+                            new DataEvolutionCompactTask(
+                                    lastPartition, new ArrayList<>(groupFiles), false));
+
+                    if (compactBlob && blobFiles.size() > 1) {
+                        tasks.add(
+                                new DataEvolutionCompactTask(
+                                        lastPartition, new ArrayList<>(blobFiles), true));
+                    }
+                }
+
+                weightSum = 0L;
+                groupFiles = new ArrayList<>();
+                blobFiles = new ArrayList<>();
+            }
+
+            checkArgument(weightSum == 0L, "Weight sum should be zero after compaction.");
+            checkArgument(groupFiles.isEmpty(), "Group files should be empty.");
+            checkArgument(blobFiles.isEmpty(), "Blob files should be empty.");
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
new file mode 100644
index 0000000000..f1024395b6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.AppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Data evolution table compaction task. */
+public class DataEvolutionCompactTask {
+
+    private static final Map<String, String> DYNAMIC_WRITE_OPTIONS =
+            Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G");
+
+    private final BinaryRow partition;
+    private final List<DataFileMeta> compactBefore;
+    private final List<DataFileMeta> compactAfter;
+    private final boolean blobTask;
+
+    public DataEvolutionCompactTask(
+            BinaryRow partition, List<DataFileMeta> files, boolean blobTask) {
+        this.partition = partition;
+        this.compactBefore = new ArrayList<>(files);
+        this.compactAfter = new ArrayList<>();
+        this.blobTask = blobTask;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public List<DataFileMeta> compactBefore() {
+        return compactBefore;
+    }
+
+    public List<DataFileMeta> compactAfter() {
+        return compactAfter;
+    }
+
+    public boolean isBlobTask() {
+        return blobTask;
+    }
+
+    public CommitMessage doCompact(FileStoreTable table) throws Exception {
+        if (blobTask) {
+            // TODO: support blob file compaction
+            throw new UnsupportedOperationException("Blob task is not supported");
+        }
+
+        table = table.copy(DYNAMIC_WRITE_OPTIONS);
+        long firstRowId = compactBefore.get(0).nonNullFirstRowId();
+
+        RowType readWriteType =
+                new RowType(
+                        table.rowType().getFields().stream()
+                                .filter(f -> f.type().getTypeRoot() != DataTypeRoot.BLOB)
+                                .collect(Collectors.toList()));
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        AppendOnlyFileStore store = (AppendOnlyFileStore) table.store();
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withDataFiles(compactBefore)
+                        .withBucketPath(pathFactory.bucketPath(partition, 0).toString())
+                        .rawConvertible(false)
+                        .build();
+        RecordReader<InternalRow> reader =
+                store.newDataEvolutionRead().withReadType(readWriteType).createReader(dataSplit);
+        AppendFileStoreWrite storeWrite =
+                (AppendFileStoreWrite) store.newWrite("Compact-Data-Evolution");
+        storeWrite.withWriteType(readWriteType);
+        RecordWriter<InternalRow> writer = storeWrite.createWriter(partition, 0);
+
+        reader.forEachRemaining(
+                row -> {
+                    try {
+                        writer.write(row);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        List<DataFileMeta> writeResult = writer.prepareCommit(false).newFilesIncrement().newFiles();
+        checkArgument(
+                writeResult.size() == 1, "Data evolution compaction should produce one file.");
+
+        DataFileMeta dataFileMeta = writeResult.get(0).assignFirstRowId(firstRowId);
+        compactAfter.add(dataFileMeta);
+
+        CompactIncrement compactIncrement =
+                new CompactIncrement(
+                        compactBefore,
+                        compactAfter,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+        return new CommitMessageImpl(
+                partition, 0, null, DataIncrement.emptyIncrement(), compactIncrement);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
index 3bf35acc47..4619bb77d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
@@ -102,7 +102,6 @@ public class GlobalIndexScanBuilderImpl implements GlobalIndexScanBuilder {
 
     @Override
     public List<Range> shardList() {
-
         Map<String, List<Range>> indexRanges = new HashMap<>();
         for (IndexManifestEntry entry : scan()) {
             GlobalIndexMeta globalIndexMeta = entry.indexFile().globalIndexMeta();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
new file mode 100644
index 0000000000..f7b3ab7839
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.stats.StatsTestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionCompactCoordinator.CompactPlanner}. */
+public class DataEvolutionCompactCoordinatorTest {
+
+    @Test
+    public void testCompactPlannerSingleFile() {
+        // Single file should not produce compaction tasks
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(
+                        () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+        assertThat(tasks).isEmpty();
+    }
+
+    @Test
+    public void testCompactPlannerContiguousFiles() {
+        // Multiple contiguous files should be grouped together
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+        entries.add(makeEntry("file3.parquet", 200L, 100L, 100));
+
+        // Use small target file size to trigger compaction
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 199, 1, 2);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+        assertThat(tasks).isNotEmpty();
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(0).file(), entries.get(1).file());
+
+        planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(() -> entries, false, 200, 1, 2);
+        tasks = planner.compactPlan();
+        assertThat(tasks).isNotEmpty();
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(
+                        entries.get(0).file(), entries.get(1).file(), entries.get(2).file());
+    }
+
+    @Test
+    public void testCompactPlannerWithRowIdGap() {
+        // Files with a gap in row IDs should trigger compaction of the previous group
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+        // Gap: row IDs 200-999 are missing
+        entries.add(makeEntry("file3.parquet", 1000L, 100L, 100));
+        entries.add(makeEntry("file4.parquet", 1100L, 100L, 100));
+
+        // Use large target file size so compaction is triggered by gap, not size
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(
+                        () -> entries, false, 128 * 1024 * 1024, 4 * 1024 * 1024, 2);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+        // Gap should trigger compaction of the first group (file1 + file2)
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).compactBefore()).hasSize(2);
+        assertThat(tasks.get(1).compactBefore()).hasSize(2);
+    }
+
+    @Test
+    public void testCompactPlannerSkipsLargeFiles() {
+        // Files larger than target size should be skipped
+        List<ManifestEntry> entries = new ArrayList<>();
+        // This file is larger than target
+        entries.add(makeEntryWithSize("large.parquet", 0L, 100L, 100, 200 * 1024 * 1024));
+        entries.add(makeEntry("file1.parquet", 100L, 100L, 100));
+        entries.add(makeEntry("file2.parquet", 200L, 100L, 100));
+        entries.add(makeEntryWithSize("large2.parquet", 300L, 100L, 100, 200 * 1024 * 1024));
+        entries.add(makeEntryWithSize("large2-1.blob", 300L, 50L, 100, 200 * 1024 * 1024));
+        entries.add(makeEntryWithSize("large2-2.blob", 350L, 50L, 100, 200 * 1024 * 1024));
+        entries.add(makeEntry("file3.parquet", 400L, 100L, 100));
+        entries.add(makeEntry("file4.parquet", 500L, 100L, 100));
+        entries.add(makeEntry(BinaryRow.singleColumn(0), "file5.parquet", 600L, 100L, 100));
+        entries.add(makeEntry(BinaryRow.singleColumn(0), "file6.parquet", 700L, 100L, 100));
+        entries.add(makeEntry(BinaryRow.singleColumn(1), "file7.parquet", 800L, 100L, 100));
+        entries.add(makeEntry(BinaryRow.singleColumn(1), "file8.parquet", 900L, 100L, 100));
+
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(
+                        () -> entries, false, 100 * 1024 * 1024, 4 * 1024 * 1024, 2);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+        assertThat(tasks.size()).isEqualTo(4);
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(1).file(), entries.get(2).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(entries.get(6).file(), entries.get(7).file());
+        assertThat(tasks.get(2).compactBefore())
+                .containsExactly(entries.get(8).file(), entries.get(9).file());
+        assertThat(tasks.get(3).compactBefore())
+                .containsExactly(entries.get(10).file(), entries.get(11).file());
+    }
+
+    @Test
+    public void testCompactPlannerWithBlobFiles() {
+        // Test blob file compaction when enabled
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+        entries.add(makeBlobEntry("file1.blob", 0L, 100L, 100));
+        entries.add(makeBlobEntry("file1b.blob", 0L, 100L, 100));
+        entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+        entries.add(makeBlobEntry("file2.blob", 100L, 100L, 100));
+        entries.add(makeBlobEntry("file2b.blob", 100L, 100L, 100));
+
+        // Use small target to trigger compaction, with blob compaction enabled
+        DataEvolutionCompactCoordinator.CompactPlanner planner =
+                new DataEvolutionCompactCoordinator.CompactPlanner(
+                        () -> entries, true, 1024, 1024, 2);
+
+        List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+        // Should have compaction tasks for both data files and blob files
+        assertThat(tasks.size()).isEqualTo(2);
+
+        assertThat(tasks.get(0).compactBefore())
+                .containsExactly(entries.get(0).file(), entries.get(3).file());
+        assertThat(tasks.get(1).compactBefore())
+                .containsExactly(
+                        entries.get(1).file(),
+                        entries.get(2).file(),
+                        entries.get(4).file(),
+                        entries.get(5).file());
+    }
+
+    private ManifestEntry makeEntry(
+            String fileName, long firstRowId, long rowCount, long fileSize) {
+        return makeEntryWithSize(
+                BinaryRow.EMPTY_ROW, fileName, firstRowId, rowCount, fileSize, fileSize);
+    }
+
+    private ManifestEntry makeEntry(
+            BinaryRow partition, String fileName, long firstRowId, long rowCount, long fileSize) {
+        return makeEntryWithSize(partition, fileName, firstRowId, rowCount, fileSize, fileSize);
+    }
+
+    private ManifestEntry makeEntryWithSize(
+            String fileName, long firstRowId, long rowCount, long minSeq, long fileSize) {
+        return makeEntryWithSize(
+                BinaryRow.EMPTY_ROW, fileName, firstRowId, rowCount, minSeq, fileSize);
+    }
+
+    private ManifestEntry makeEntryWithSize(
+            BinaryRow partition,
+            String fileName,
+            long firstRowId,
+            long rowCount,
+            long minSeq,
+            long fileSize) {
+        return ManifestEntry.create(
+                FileKind.ADD,
+                partition,
+                0,
+                0,
+                createDataFileMeta(fileName, firstRowId, rowCount, minSeq, fileSize));
+    }
+
+    private ManifestEntry makeBlobEntry(
+            String fileName, long firstRowId, long rowCount, long fileSize) {
+        // Blob files have .blob extension
+        String blobFileName = fileName.endsWith(".blob") ? fileName : fileName + ".blob";
+        return ManifestEntry.create(
+                FileKind.ADD,
+                BinaryRow.EMPTY_ROW,
+                0,
+                0,
+                createDataFileMeta(blobFileName, firstRowId, rowCount, 0, fileSize));
+    }
+
+    private DataFileMeta createDataFileMeta(
+            String fileName, long firstRowId, long rowCount, long maxSeq, long fileSize) {
+        return DataFileMeta.create(
+                fileName,
+                fileSize,
+                rowCount,
+                BinaryRow.EMPTY_ROW,
+                BinaryRow.EMPTY_ROW,
+                StatsTestUtils.newEmptySimpleStats(),
+                StatsTestUtils.newEmptySimpleStats(),
+                0,
+                maxSeq,
+                0,
+                0,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(System.currentTimeMillis()),
+                0L,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 3bbe513a62..6a7efd013b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -40,6 +42,7 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -67,6 +70,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -896,6 +900,65 @@ public class DataEvolutionTableTest extends TableTestBase {
         Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
     }
 
+    @Test
+    public void testCompactCoordinator() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            write(100000L);
+        }
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+        // Create coordinator and call plan multiple times
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false);
+
+        // Each plan() call processes one manifest group
+        List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
+        List<DataEvolutionCompactTask> tasks;
+        while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
+            allTasks.addAll(tasks);
+            if (tasks.isEmpty()) {
+                break;
+            }
+        }
+
+        // Verify that exactly one compaction task covering all written files was planned
+        assertThat(allTasks).isNotNull();
+        assertThat(allTasks.size()).isEqualTo(1);
+        DataEvolutionCompactTask task = allTasks.get(0);
+        assertThat(task.compactBefore().size()).isEqualTo(20);
+    }
+
+    @Test
+    public void testCompact() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            write(100000L);
+        }
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+        // Create coordinator and call plan multiple times
+        DataEvolutionCompactCoordinator coordinator =
+                new DataEvolutionCompactCoordinator(table, false);
+
+        // Each plan() call processes one manifest group
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        List<DataEvolutionCompactTask> tasks;
+        while (!(tasks = coordinator.plan()).isEmpty()) {
+            for (DataEvolutionCompactTask task : tasks) {
+                commitMessages.add(task.doCompact(table));
+            }
+        }
+
+        table.newBatchWriteBuilder().newCommit().commit(commitMessages);
+
+        List<ManifestEntry> entries = new ArrayList<>();
+        Iterator<ManifestEntry> files = table.newSnapshotReader().readFileIterator();
+        while (files.hasNext()) {
+            entries.add(files.next());
+        }
+
+        assertThat(entries.size()).isEqualTo(1);
+        assertThat(entries.get(0).file().nonNullFirstRowId()).isEqualTo(0);
+        assertThat(entries.get(0).file().rowCount()).isEqualTo(500000L);
+    }
+
     private void write(long count) throws Exception {
         createTableDefault();
 
@@ -911,6 +974,7 @@ public class DataEvolutionTableTest extends TableTestBase {
             commit.commit(write0.prepareCommit());
         }
 
+        long rowId = getTableDefault().snapshotManager().latestSnapshot().nextRowId() - count;
         builder = getTableDefault().newBatchWriteBuilder();
         try (BatchTableWrite write1 = builder.newWrite().withWriteType(writeType1)) {
             for (int i = 0; i < count; i++) {
@@ -918,7 +982,7 @@ public class DataEvolutionTableTest extends TableTestBase {
             }
             BatchTableCommit commit = builder.newCommit();
             List<CommitMessage> commitables = write1.prepareCommit();
-            setFirstRowId(commitables, 0L);
+            setFirstRowId(commitables, rowId);
             commit.commit(commitables);
         }
 

