This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 072127dbea [core] Introduce compaction for data-evolution table
(#6828)
072127dbea is described below
commit 072127dbea5673e5d9a1db3476f8886504d4e553
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 19 11:40:10 2025 +0800
[core] Introduce compaction for data-evolution table (#6828)
---
.../paimon/append/RollingBlobFileWriter.java | 4 +-
.../DataEvolutionCompactCoordinator.java | 239 +++++++++++++++++++++
.../dataevolution/DataEvolutionCompactTask.java | 140 ++++++++++++
.../globalindex/GlobalIndexScanBuilderImpl.java | 1 -
.../DataEvolutionCompactCoordinatorTest.java | 239 +++++++++++++++++++++
.../paimon/table/DataEvolutionTableTest.java | 66 +++++-
6 files changed, 685 insertions(+), 4 deletions(-)
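
Note on usage: the new coordinator scans the latest snapshot, groups row-id-contiguous data files, and emits compaction tasks; each task rewrites its group into a single file and returns a commit message. A minimal sketch, assembled from the testCompact case added below (how the FileStoreTable is obtained, e.g. a catalog lookup, is assumed and not shown):

    import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
    import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.sink.CommitMessage;

    import java.util.ArrayList;
    import java.util.List;

    // Plan and execute data-evolution compaction for a table, then commit.
    static void compact(FileStoreTable table) throws Exception {
        DataEvolutionCompactCoordinator coordinator =
                new DataEvolutionCompactCoordinator(table, /* compactBlob */ false);
        List<CommitMessage> commitMessages = new ArrayList<>();
        List<DataEvolutionCompactTask> tasks;
        // Each plan() call drains one batch of scanned manifest entries.
        while (!(tasks = coordinator.plan()).isEmpty()) {
            for (DataEvolutionCompactTask task : tasks) {
                // Rewrites the group and records compactBefore -> compactAfter.
                commitMessages.add(task.doCompact(table));
            }
        }
        table.newBatchWriteBuilder().newCommit().commit(commitMessages);
    }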
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 17928dd6fa..8d511c6b91 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -67,8 +67,8 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* Every time a file is rolled, the writer will close the current normal data
file and blob data files,
* so one normal data file may correspond to multiple blob data files.
*
- * Normal file1: f1.parquet may including (b1.blob, b2.blob, b3.blob)
- * Normal file2: f1-2.parquet may including (b4.blob, b5.blob)
+ * Normal file1: f1.parquet may include (b1.blob, b2.blob, b3.blob)
+ * Normal file2: f1-2.parquet may include (b4.blob, b5.blob)
*
* </pre>
*/
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
new file mode 100644
index 0000000000..892b88952a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.format.blob.BlobFileFormat;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.utils.RangeHelper;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Compact coordinator to compact a data evolution table. */
+public class DataEvolutionCompactCoordinator {
+
+ private final CompactScanner scanner;
+ private final CompactPlanner planner;
+
+ public DataEvolutionCompactCoordinator(FileStoreTable table, boolean
compactBlob) {
+ CoreOptions options = table.coreOptions();
+ long targetFileSize = options.targetFileSize(false);
+ long openFileCost = options.splitOpenFileCost();
+ long compactMinFileNum = options.compactionMinFileNum();
+
+ this.scanner = new CompactScanner(table.newSnapshotReader());
+ this.planner =
+ new CompactPlanner(
+ scanner::fetchResult,
+ compactBlob,
+ targetFileSize,
+ openFileCost,
+ compactMinFileNum);
+ }
+
+ public List<DataEvolutionCompactTask> plan() {
+ // scan files in snapshot
+ if (scanner.scan()) {
+ // do plan compact tasks
+ return planner.compactPlan();
+ }
+
+ return Collections.emptyList();
+ }
+
+ /** Scanner to generate sorted ManifestEntries. */
+ static class CompactScanner {
+
+ private final SnapshotReader snapshotReader;
+ private final Queue<List<ManifestFileMeta>> metas;
+
+ private List<ManifestEntry> result;
+
+ private CompactScanner(SnapshotReader snapshotReader) {
+ this.snapshotReader = snapshotReader;
+ Snapshot snapshot =
snapshotReader.snapshotManager().latestSnapshot();
+
+ List<ManifestFileMeta> manifestFileMetas =
+ snapshotReader.manifestsReader().read(snapshot,
ScanMode.ALL).filteredManifests;
+ RangeHelper<ManifestFileMeta> rangeHelper =
+ new RangeHelper<>(ManifestFileMeta::minRowId,
ManifestFileMeta::maxRowId);
+ this.metas = new
ArrayDeque<>(rangeHelper.mergeOverlappingRanges(manifestFileMetas));
+ this.result = new ArrayList<>();
+ }
+
+ boolean scan() {
+ boolean scanResult = false;
+ while (metas.peek() != null && result.size() < 1000) {
+ scanResult = true;
+ List<ManifestFileMeta> currentMetas = metas.poll();
+ List<ManifestEntry> targetEntries =
+ currentMetas.stream()
+ .flatMap(meta ->
snapshotReader.readManifest(meta).stream())
+ .collect(Collectors.toList());
+ Comparator<ManifestEntry> comparator =
+ Comparator.comparingLong((ManifestEntry a) ->
a.file().nonNullFirstRowId())
+ .thenComparingInt(
+ a ->
BlobFileFormat.isBlobFile(a.fileName()) ? 1 : 0);
+ targetEntries.sort(comparator);
+
+ result.addAll(targetEntries);
+ }
+ return scanResult;
+ }
+
+ List<ManifestEntry> fetchResult() {
+ List<ManifestEntry> result = new ArrayList<>(this.result);
+ this.result = new ArrayList<>();
+ return result;
+ }
+ }
+
+ /** Generate compaction tasks. */
+ static class CompactPlanner {
+
+ private final Supplier<List<ManifestEntry>> supplier;
+ private final boolean compactBlob;
+ private final long targetFileSize;
+ private final long openFileCost;
+ private final long compactMinFileNum;
+ private long lastRowIdStart = -1;
+ private long nextRowIdExpected = -1;
+ private long weightSum = 0L;
+ private BinaryRow lastPartition = null;
+ private boolean skipFile = false;
+ private List<DataEvolutionCompactTask> tasks = new ArrayList<>();
+ private List<DataFileMeta> groupFiles = new ArrayList<>();
+ private List<DataFileMeta> blobFiles = new ArrayList<>();
+
+ CompactPlanner(
+ Supplier<List<ManifestEntry>> supplier,
+ boolean compactBlob,
+ long targetFileSize,
+ long openFileCost,
+ long compactMinFileNum) {
+ this.supplier = supplier;
+ this.compactBlob = compactBlob;
+ this.targetFileSize = targetFileSize;
+ this.openFileCost = openFileCost;
+ this.compactMinFileNum = compactMinFileNum;
+ }
+
+ List<DataEvolutionCompactTask> compactPlan() {
+ for (ManifestEntry entry : supplier.get()) {
+ long rowId = entry.file().nonNullFirstRowId();
+ if (rowId < lastRowIdStart) {
+ throw new IllegalStateException(
+ "Files are not in order by rowId. Current file
rowId: "
+ + rowId
+ + ", last file rowId: "
+ + lastRowIdStart);
+ } else if (rowId == lastRowIdStart) {
+ checkArgument(
+ lastPartition.equals(entry.partition()),
+ "Inconsistent partition for the same rowId: " +
rowId);
+ if (!skipFile) {
+ if (BlobFileFormat.isBlobFile(entry.fileName())) {
+ blobFiles.add(entry.file());
+ } else {
+ groupFiles.add(entry.file());
+ weightSum += Math.max(entry.file().fileSize(),
openFileCost);
+ }
+ }
+ } else if (rowId < nextRowIdExpected) {
+ checkArgument(
+ lastPartition.equals(entry.partition()),
+ "Inconsistent partition for the same rowId: " +
rowId);
+ checkArgument(
+ BlobFileFormat.isBlobFile(entry.fileName()),
+ "Data file found in the middle of blob files for
rowId: " + rowId);
+ if (!skipFile) {
+ blobFiles.add(entry.file());
+ }
+ } else {
+ BinaryRow currentPartition = entry.partition();
+ long currentWeight = Math.max(entry.file().fileSize(),
openFileCost);
+ // skip big file
+ skipFile = currentWeight > targetFileSize;
+
+ // If a compaction condition is met, do compaction
+ if (weightSum > targetFileSize
+ || rowId > nextRowIdExpected
+ || !currentPartition.equals(lastPartition)
+ || skipFile) {
+ flushAll();
+ }
+
+ if (!skipFile) {
+ weightSum += currentWeight;
+ groupFiles.add(entry.file());
+ }
+ lastRowIdStart = rowId;
+ nextRowIdExpected = rowId + entry.file().rowCount();
+ lastPartition = currentPartition;
+ }
+ }
+ // do compaction for the last group
+ flushAll();
+
+ List<DataEvolutionCompactTask> result = new ArrayList<>(tasks);
+ tasks = new ArrayList<>();
+ return result;
+ }
+
+ private void flushAll() {
+ if (!groupFiles.isEmpty()) {
+ if (groupFiles.size() >= compactMinFileNum) {
+ tasks.add(
+ new DataEvolutionCompactTask(
+ lastPartition, new
ArrayList<>(groupFiles), false));
+
+ if (compactBlob && blobFiles.size() > 1) {
+ tasks.add(
+ new DataEvolutionCompactTask(
+ lastPartition, new
ArrayList<>(blobFiles), true));
+ }
+ }
+
+ weightSum = 0L;
+ groupFiles = new ArrayList<>();
+ blobFiles = new ArrayList<>();
+ }
+
+ checkArgument(weightSum == 0L, "Weight sum should be zero after
compaction.");
+ checkArgument(groupFiles.isEmpty(), "Group files should be
empty.");
+ checkArgument(blobFiles.isEmpty(), "Blob files should be empty.");
+ }
+ }
+}
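
How the planner groups files: each data file is weighted by max(fileSize, openFileCost) and files are accumulated while row ids stay contiguous within one partition; the current group is flushed when the accumulated weight exceeds targetFileSize, a row-id gap appears, the partition changes, or an oversized file (weight > targetFileSize) is skipped, and a flushed group only becomes a task when it holds at least compactMinFileNum files. A sketch of driving the planner directly (CompactPlanner is package-private, so this assumes the same package as the coordinator), assuming `entries` holds the three 100-byte entries at row ids 0, 100 and 200 from the contiguous-files unit test below:

    // With targetFileSize = 199 and openFileCost = 1, file1 + file2 weigh
    // 200 > 199, so they are flushed as one task when file3 arrives;
    // file3 alone stays below compactMinFileNum = 2 and yields no task.
    DataEvolutionCompactCoordinator.CompactPlanner planner =
            new DataEvolutionCompactCoordinator.CompactPlanner(
                    () -> entries,
                    /* compactBlob */ false,
                    /* targetFileSize */ 199,
                    /* openFileCost */ 1,
                    /* compactMinFileNum */ 2);
    List<DataEvolutionCompactTask> tasks = planner.compactPlan();
    // tasks.get(0).compactBefore() -> [file1, file2]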
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
new file mode 100644
index 0000000000..f1024395b6
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.AppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.RecordWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Data evolution table compaction task. */
+public class DataEvolutionCompactTask {
+
+ private static final Map<String, String> DYNAMIC_WRITE_OPTIONS =
+ Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(),
"99999 G");
+
+ private final BinaryRow partition;
+ private final List<DataFileMeta> compactBefore;
+ private final List<DataFileMeta> compactAfter;
+ private final boolean blobTask;
+
+ public DataEvolutionCompactTask(
+ BinaryRow partition, List<DataFileMeta> files, boolean blobTask) {
+ this.partition = partition;
+ this.compactBefore = new ArrayList<>(files);
+ this.compactAfter = new ArrayList<>();
+ this.blobTask = blobTask;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public List<DataFileMeta> compactBefore() {
+ return compactBefore;
+ }
+
+ public List<DataFileMeta> compactAfter() {
+ return compactAfter;
+ }
+
+ public boolean isBlobTask() {
+ return blobTask;
+ }
+
+ public CommitMessage doCompact(FileStoreTable table) throws Exception {
+ if (blobTask) {
+ // TODO: support blob file compaction
+ throw new UnsupportedOperationException("Blob task is not
supported");
+ }
+
+ table = table.copy(DYNAMIC_WRITE_OPTIONS);
+ long firstRowId = compactBefore.get(0).nonNullFirstRowId();
+
+ RowType readWriteType =
+ new RowType(
+ table.rowType().getFields().stream()
+ .filter(f -> f.type().getTypeRoot() !=
DataTypeRoot.BLOB)
+ .collect(Collectors.toList()));
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ AppendOnlyFileStore store = (AppendOnlyFileStore) table.store();
+
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(0)
+ .withDataFiles(compactBefore)
+ .withBucketPath(pathFactory.bucketPath(partition,
0).toString())
+ .rawConvertible(false)
+ .build();
+ RecordReader<InternalRow> reader =
+
store.newDataEvolutionRead().withReadType(readWriteType).createReader(dataSplit);
+ AppendFileStoreWrite storeWrite =
+ (AppendFileStoreWrite)
store.newWrite("Compact-Data-Evolution");
+ storeWrite.withWriteType(readWriteType);
+ RecordWriter<InternalRow> writer = storeWrite.createWriter(partition,
0);
+
+ reader.forEachRemaining(
+ row -> {
+ try {
+ writer.write(row);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<DataFileMeta> writeResult =
writer.prepareCommit(false).newFilesIncrement().newFiles();
+ checkArgument(
+ writeResult.size() == 1, "Data evolution compaction should
produce one file.");
+
+ DataFileMeta dataFileMeta =
writeResult.get(0).assignFirstRowId(firstRowId);
+ compactAfter.add(dataFileMeta);
+
+ CompactIncrement compactIncrement =
+ new CompactIncrement(
+ compactBefore,
+ compactAfter,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+ return new CommitMessageImpl(
+ partition, 0, null, DataIncrement.emptyIncrement(),
compactIncrement);
+ }
+}
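
How a task executes: doCompact reads the grouped files through the data-evolution read path with blob columns filtered out, forces a single output file via the 99999 G dynamic target-file-size, re-assigns the group's first row id to the rewritten file, and wraps the swap as a compact increment. Blob tasks can be planned but are not yet executable. A small sketch of consuming tasks when compactBlob is enabled, assuming the coordinator and table from the sketch above and a caller that declares throws Exception:

    List<CommitMessage> messages = new ArrayList<>();
    for (DataEvolutionCompactTask task : coordinator.plan()) {
        // Blob tasks are planned when compactBlob is true, but doCompact
        // still rejects them, so skip them until blob rewriting lands.
        if (task.isBlobTask()) {
            continue;
        }
        // Each message swaps compactBefore for compactAfter in bucket 0
        // of task.partition().
        messages.add(task.doCompact(table));
    }
    // messages are then committed as in the earlier sketch.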
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
index 3bf35acc47..4619bb77d8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanBuilderImpl.java
@@ -102,7 +102,6 @@ public class GlobalIndexScanBuilderImpl implements
GlobalIndexScanBuilder {
@Override
public List<Range> shardList() {
-
Map<String, List<Range>> indexRanges = new HashMap<>();
for (IndexManifestEntry entry : scan()) {
GlobalIndexMeta globalIndexMeta =
entry.indexFile().globalIndexMeta();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
new file mode 100644
index 0000000000..f7b3ab7839
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinatorTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.dataevolution;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.stats.StatsTestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionCompactCoordinator.CompactPlanner}. */
+public class DataEvolutionCompactCoordinatorTest {
+
+ @Test
+ public void testCompactPlannerSingleFile() {
+ // Single file should not produce compaction tasks
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(
+ () -> entries, false, 128 * 1024 * 1024, 4 * 1024 *
1024, 2);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+ assertThat(tasks).isEmpty();
+ }
+
+ @Test
+ public void testCompactPlannerContiguousFiles() {
+ // Multiple contiguous files should be grouped together
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+ entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+ entries.add(makeEntry("file3.parquet", 200L, 100L, 100));
+
+ // Use small target file size to trigger compaction
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(() ->
entries, false, 199, 1, 2);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+ assertThat(tasks).isNotEmpty();
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(0).file(), entries.get(1).file());
+
+ planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(() ->
entries, false, 200, 1, 2);
+ tasks = planner.compactPlan();
+ assertThat(tasks).isNotEmpty();
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(
+ entries.get(0).file(), entries.get(1).file(),
entries.get(2).file());
+ }
+
+ @Test
+ public void testCompactPlannerWithRowIdGap() {
+ // Files with a gap in row IDs should trigger compaction of previous
group
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+ entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+ // Gap: row IDs 200-999 are missing
+ entries.add(makeEntry("file3.parquet", 1000L, 100L, 100));
+ entries.add(makeEntry("file3.parquet", 1100L, 100L, 100));
+
+ // Use large target file size so compaction is triggered by gap, not
size
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(
+ () -> entries, false, 128 * 1024 * 1024, 4 * 1024 *
1024, 2);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+ // Gap should trigger compaction of the first group (file1 + file2)
+ assertThat(tasks).hasSize(2);
+ assertThat(tasks.get(0).compactBefore()).hasSize(2);
+ assertThat(tasks.get(1).compactBefore()).hasSize(2);
+ }
+
+ @Test
+ public void testCompactPlannerSkipsLargeFiles() {
+ // Files larger than target size should be skipped
+ List<ManifestEntry> entries = new ArrayList<>();
+ // This file is larger than target
+ entries.add(makeEntryWithSize("large.parquet", 0L, 100L, 100, 200 *
1024 * 1024));
+ entries.add(makeEntry("file1.parquet", 100L, 100L, 100));
+ entries.add(makeEntry("file2.parquet", 200L, 100L, 100));
+ entries.add(makeEntryWithSize("large2.parquet", 300L, 100L, 100, 200 *
1024 * 1024));
+ entries.add(makeEntryWithSize("large2-1.blob", 300L, 50L, 100, 200 *
1024 * 1024));
+ entries.add(makeEntryWithSize("large2-2.blob", 350L, 50L, 100, 200 *
1024 * 1024));
+ entries.add(makeEntry("file3.parquet", 400L, 100L, 100));
+ entries.add(makeEntry("file4.parquet", 500L, 100L, 100));
+ entries.add(makeEntry(BinaryRow.singleColumn(0), "file5.parquet",
600L, 100L, 100));
+ entries.add(makeEntry(BinaryRow.singleColumn(0), "file6.parquet",
700L, 100L, 100));
+ entries.add(makeEntry(BinaryRow.singleColumn(1), "file7.parquet",
800L, 100L, 100));
+ entries.add(makeEntry(BinaryRow.singleColumn(1), "file8.parquet",
900L, 100L, 100));
+
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(
+ () -> entries, false, 100 * 1024 * 1024, 4 * 1024 *
1024, 2);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+ assertThat(tasks.size()).isEqualTo(4);
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(1).file(), entries.get(2).file());
+ assertThat(tasks.get(1).compactBefore())
+ .containsExactly(entries.get(6).file(), entries.get(7).file());
+ assertThat(tasks.get(2).compactBefore())
+ .containsExactly(entries.get(8).file(), entries.get(9).file());
+ assertThat(tasks.get(3).compactBefore())
+ .containsExactly(entries.get(10).file(),
entries.get(11).file());
+ }
+
+ @Test
+ public void testCompactPlannerWithBlobFiles() {
+ // Test blob file compaction when enabled
+ List<ManifestEntry> entries = new ArrayList<>();
+ entries.add(makeEntry("file1.parquet", 0L, 100L, 100));
+ entries.add(makeBlobEntry("file1.blob", 0L, 100L, 100));
+ entries.add(makeBlobEntry("file1b.blob", 0L, 100L, 100));
+ entries.add(makeEntry("file2.parquet", 100L, 100L, 100));
+ entries.add(makeBlobEntry("file2.blob", 100L, 100L, 100));
+ entries.add(makeBlobEntry("file2b.blob", 100L, 100L, 100));
+
+ // Use small target to trigger compaction, with blob compaction enabled
+ DataEvolutionCompactCoordinator.CompactPlanner planner =
+ new DataEvolutionCompactCoordinator.CompactPlanner(
+ () -> entries, true, 1024, 1024, 2);
+
+ List<DataEvolutionCompactTask> tasks = planner.compactPlan();
+
+ // Should have compaction tasks for both data files and blob files
+ assertThat(tasks.size()).isEqualTo(2);
+
+ assertThat(tasks.get(0).compactBefore())
+ .containsExactly(entries.get(0).file(), entries.get(3).file());
+ assertThat(tasks.get(1).compactBefore())
+ .containsExactly(
+ entries.get(1).file(),
+ entries.get(2).file(),
+ entries.get(4).file(),
+ entries.get(5).file());
+ }
+
+ private ManifestEntry makeEntry(
+ String fileName, long firstRowId, long rowCount, long fileSize) {
+ return makeEntryWithSize(
+ BinaryRow.EMPTY_ROW, fileName, firstRowId, rowCount, fileSize,
fileSize);
+ }
+
+ private ManifestEntry makeEntry(
+ BinaryRow partition, String fileName, long firstRowId, long
rowCount, long fileSize) {
+ return makeEntryWithSize(partition, fileName, firstRowId, rowCount,
fileSize, fileSize);
+ }
+
+ private ManifestEntry makeEntryWithSize(
+ String fileName, long firstRowId, long rowCount, long minSeq, long
fileSize) {
+ return makeEntryWithSize(
+ BinaryRow.EMPTY_ROW, fileName, firstRowId, rowCount, minSeq,
fileSize);
+ }
+
+ private ManifestEntry makeEntryWithSize(
+ BinaryRow partition,
+ String fileName,
+ long firstRowId,
+ long rowCount,
+ long minSeq,
+ long fileSize) {
+ return ManifestEntry.create(
+ FileKind.ADD,
+ partition,
+ 0,
+ 0,
+ createDataFileMeta(fileName, firstRowId, rowCount, minSeq,
fileSize));
+ }
+
+ private ManifestEntry makeBlobEntry(
+ String fileName, long firstRowId, long rowCount, long fileSize) {
+ // Blob files have .blob extension
+ String blobFileName = fileName.endsWith(".blob") ? fileName : fileName
+ ".blob";
+ return ManifestEntry.create(
+ FileKind.ADD,
+ BinaryRow.EMPTY_ROW,
+ 0,
+ 0,
+ createDataFileMeta(blobFileName, firstRowId, rowCount, 0,
fileSize));
+ }
+
+ private DataFileMeta createDataFileMeta(
+ String fileName, long firstRowId, long rowCount, long maxSeq, long
fileSize) {
+ return DataFileMeta.create(
+ fileName,
+ fileSize,
+ rowCount,
+ BinaryRow.EMPTY_ROW,
+ BinaryRow.EMPTY_ROW,
+ StatsTestUtils.newEmptySimpleStats(),
+ StatsTestUtils.newEmptySimpleStats(),
+ 0,
+ maxSeq,
+ 0,
+ 0,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(System.currentTimeMillis()),
+ 0L,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ firstRowId,
+ null);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 3bbe513a62..6a7efd013b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -19,6 +19,8 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -40,6 +42,7 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -67,6 +70,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -896,6 +900,65 @@ public class DataEvolutionTableTest extends TableTestBase {
Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
}
+ @Test
+ public void testCompactCoordinator() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ write(100000L);
+ }
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+ // Create coordinator and call plan multiple times
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false);
+
+ // Each plan() call processes one manifest group
+ List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
+ List<DataEvolutionCompactTask> tasks;
+ while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
+ allTasks.addAll(tasks);
+ if (tasks.isEmpty()) {
+ break;
+ }
+ }
+
+ // Verify planning produced exactly one task covering all written files
+ assertThat(allTasks).isNotNull();
+ assertThat(allTasks.size()).isEqualTo(1);
+ DataEvolutionCompactTask task = allTasks.get(0);
+ assertThat(task.compactBefore().size()).isEqualTo(20);
+ }
+
+ @Test
+ public void testCompact() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ write(100000L);
+ }
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+ // Create coordinator and call plan multiple times
+ DataEvolutionCompactCoordinator coordinator =
+ new DataEvolutionCompactCoordinator(table, false);
+
+ // Each plan() call processes one manifest group
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ List<DataEvolutionCompactTask> tasks;
+ while (!(tasks = coordinator.plan()).isEmpty()) {
+ for (DataEvolutionCompactTask task : tasks) {
+ commitMessages.add(task.doCompact(table));
+ }
+ }
+
+ table.newBatchWriteBuilder().newCommit().commit(commitMessages);
+
+ List<ManifestEntry> entries = new ArrayList<>();
+ Iterator<ManifestEntry> files =
table.newSnapshotReader().readFileIterator();
+ while (files.hasNext()) {
+ entries.add(files.next());
+ }
+
+ assertThat(entries.size()).isEqualTo(1);
+ assertThat(entries.get(0).file().nonNullFirstRowId()).isEqualTo(0);
+ assertThat(entries.get(0).file().rowCount()).isEqualTo(500000L);
+ }
+
private void write(long count) throws Exception {
createTableDefault();
@@ -911,6 +974,7 @@ public class DataEvolutionTableTest extends TableTestBase {
commit.commit(write0.prepareCommit());
}
+ long rowId =
getTableDefault().snapshotManager().latestSnapshot().nextRowId() - count;
builder = getTableDefault().newBatchWriteBuilder();
try (BatchTableWrite write1 =
builder.newWrite().withWriteType(writeType1)) {
for (int i = 0; i < count; i++) {
@@ -918,7 +982,7 @@ public class DataEvolutionTableTest extends TableTestBase {
}
BatchTableCommit commit = builder.newCommit();
List<CommitMessage> commitables = write1.prepareCommit();
- setFirstRowId(commitables, 0L);
+ setFirstRowId(commitables, rowId);
commit.commit(commitables);
}