This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 654c130b78 [core] Add bucket & level statistics in ManifestFileMeta (#5345)
654c130b78 is described below
commit 654c130b78d590d8cb2c137d9e540038716315d6
Author: tsreaper <[email protected]>
AuthorDate: Thu Mar 27 15:53:07 2025 +0800
[core] Add bucket & level statistics in ManifestFileMeta (#5345)
---
.../org/apache/paimon/manifest/ManifestFile.java | 14 +++-
.../apache/paimon/manifest/ManifestFileMeta.java | 69 ++++++++++++++--
.../manifest/ManifestFileMetaSerializer.java | 13 ++-
.../org/apache/paimon/manifest/ManifestList.java | 7 +-
.../paimon/operation/AbstractFileStoreScan.java | 15 +++-
.../apache/paimon/operation/ManifestsReader.java | 24 ++++++
.../paimon/table/source/AbstractDataTableScan.java | 3 +-
.../apache/paimon/table/source/InnerTableScan.java | 4 +
.../apache/paimon/table/source/ReadBuilder.java | 2 +
.../paimon/table/source/ReadBuilderImpl.java | 10 +++
.../LegacyManifestFileMetaSerializerPaimon10.java} | 34 ++++++--
.../apache/paimon/manifest/ManifestListTest.java | 96 ++++++++++++++++++----
.../paimon/manifest/ManifestTestDataGenerator.java | 16 +++-
.../apache/paimon/flink/action/CompactAction.java | 34 ++------
14 files changed, 273 insertions(+), 68 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 3bb1ddf480..5453fbc107 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -129,6 +129,10 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private long numAddedFiles = 0;
private long numDeletedFiles = 0;
private long schemaId = Long.MIN_VALUE;
+ private int minBucket = Integer.MAX_VALUE;
+ private int maxBucket = Integer.MIN_VALUE;
+ private int minLevel = Integer.MAX_VALUE;
+ private int maxLevel = Integer.MIN_VALUE;
ManifestEntryWriter(FormatWriterFactory factory, Path path, String
fileCompression) {
super(
@@ -157,6 +161,10 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
throw new UnsupportedOperationException("Unknown entry
kind: " + entry.kind());
}
schemaId = Math.max(schemaId, entry.file().schemaId());
+ minBucket = Math.min(minBucket, entry.bucket());
+ maxBucket = Math.max(maxBucket, entry.bucket());
+ minLevel = Math.min(minLevel, entry.level());
+ maxLevel = Math.max(maxLevel, entry.level());
partitionStatsCollector.collect(entry.partition());
}
@@ -171,7 +179,11 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
partitionStatsSerializer.toBinaryAllMode(partitionStatsCollector.extract()),
numAddedFiles + numDeletedFiles > 0
? schemaId
- : schemaManager.latest().get().id());
+ : schemaManager.latest().get().id(),
+ minBucket,
+ maxBucket,
+ minLevel,
+ maxLevel);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 242abb1897..aa0156360a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -22,9 +22,12 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;
@@ -47,7 +50,11 @@ public class ManifestFileMeta {
new DataField(2, "_NUM_ADDED_FILES", new
BigIntType(false)),
new DataField(3, "_NUM_DELETED_FILES", new
BigIntType(false)),
new DataField(4, "_PARTITION_STATS",
SimpleStats.SCHEMA),
- new DataField(5, "_SCHEMA_ID", new
BigIntType(false))));
+ new DataField(5, "_SCHEMA_ID", new
BigIntType(false)),
+ new DataField(6, "_MIN_BUCKET", new IntType(true)),
+ new DataField(7, "_MAX_BUCKET", new IntType(true)),
+ new DataField(8, "_MIN_LEVEL", new IntType(true)),
+ new DataField(9, "_MAX_LEVEL", new
IntType(true))));
private final String fileName;
private final long fileSize;
@@ -55,6 +62,10 @@ public class ManifestFileMeta {
private final long numDeletedFiles;
private final SimpleStats partitionStats;
private final long schemaId;
+ private final @Nullable Integer minBucket;
+ private final @Nullable Integer maxBucket;
+ private final @Nullable Integer minLevel;
+ private final @Nullable Integer maxLevel;
public ManifestFileMeta(
String fileName,
@@ -62,13 +73,21 @@ public class ManifestFileMeta {
long numAddedFiles,
long numDeletedFiles,
SimpleStats partitionStats,
- long schemaId) {
+ long schemaId,
+ @Nullable Integer minBucket,
+ @Nullable Integer maxBucket,
+ @Nullable Integer minLevel,
+ @Nullable Integer maxLevel) {
this.fileName = fileName;
this.fileSize = fileSize;
this.numAddedFiles = numAddedFiles;
this.numDeletedFiles = numDeletedFiles;
this.partitionStats = partitionStats;
this.schemaId = schemaId;
+ this.minBucket = minBucket;
+ this.maxBucket = maxBucket;
+ this.minLevel = minLevel;
+ this.maxLevel = maxLevel;
}
public String fileName() {
@@ -95,6 +114,22 @@ public class ManifestFileMeta {
return schemaId;
}
+ public @Nullable Integer minBucket() {
+ return minBucket;
+ }
+
+ public @Nullable Integer maxBucket() {
+ return maxBucket;
+ }
+
+ public @Nullable Integer minLevel() {
+ return minLevel;
+ }
+
+ public @Nullable Integer maxLevel() {
+ return maxLevel;
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof ManifestFileMeta)) {
@@ -106,20 +141,42 @@ public class ManifestFileMeta {
&& numAddedFiles == that.numAddedFiles
&& numDeletedFiles == that.numDeletedFiles
&& Objects.equals(partitionStats, that.partitionStats)
- && schemaId == that.schemaId;
+ && schemaId == that.schemaId
+ && Objects.equals(minBucket, that.minBucket)
+ && Objects.equals(maxBucket, that.maxBucket)
+ && Objects.equals(minLevel, that.minLevel)
+ && Objects.equals(maxLevel, that.maxLevel);
}
@Override
public int hashCode() {
return Objects.hash(
- fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats, schemaId);
+ fileName,
+ fileSize,
+ numAddedFiles,
+ numDeletedFiles,
+ partitionStats,
+ schemaId,
+ minBucket,
+ maxBucket,
+ minLevel,
+ maxLevel);
}
@Override
public String toString() {
return String.format(
- "{%s, %d, %d, %d, %s, %d}",
- fileName, fileSize, numAddedFiles, numDeletedFiles,
partitionStats, schemaId);
+ "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}",
+ fileName,
+ fileSize,
+ numAddedFiles,
+ numDeletedFiles,
+ partitionStats,
+ schemaId,
+ minBucket,
+ maxBucket,
+ minLevel,
+ maxLevel);
}
// ----------------------- Serialization -----------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
index fe791e5638..b3819e1277 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
@@ -46,7 +46,11 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
meta.numAddedFiles(),
meta.numDeletedFiles(),
meta.partitionStats().toRow(),
- meta.schemaId());
+ meta.schemaId(),
+ meta.minBucket(),
+ meta.maxBucket(),
+ meta.minLevel(),
+ meta.maxLevel());
}
@Override
@@ -60,12 +64,17 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
}
throw new IllegalArgumentException("Unsupported version: " +
version);
}
+
return new ManifestFileMeta(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
row.getLong(3),
SimpleStats.fromRow(row.getRow(4, 3)),
- row.getLong(5));
+ row.getLong(5),
+ row.isNullAt(6) ? null : row.getInt(6),
+ row.isNullAt(7) ? null : row.getInt(7),
+ row.isNullAt(8) ? null : row.getInt(8),
+ row.isNullAt(9) ? null : row.getInt(9));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index ab2751ab70..1f2b235dfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -19,6 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
@@ -26,6 +27,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ObjectSerializer;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
@@ -44,9 +46,10 @@ import java.util.List;
*/
public class ManifestList extends ObjectsFile<ManifestFileMeta> {
- private ManifestList(
+ @VisibleForTesting
+ public ManifestList(
FileIO fileIO,
- ManifestFileMetaSerializer serializer,
+ ObjectSerializer<ManifestFileMeta> serializer,
RowType schema,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 861128155d..ffa88bb259 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -79,6 +79,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Snapshot specifiedSnapshot = null;
private boolean onlyReadRealBuckets = false;
+ private Integer specifiedBucket = null;
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
protected ScanMode scanMode = ScanMode.ALL;
@@ -132,14 +133,16 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
@Override
- public FileStoreScan withBucket(int bucket) {
- this.bucketFilter = i -> i == bucket;
+ public FileStoreScan onlyReadRealBuckets() {
+ manifestsReader.onlyReadRealBuckets();
+ this.onlyReadRealBuckets = true;
return this;
}
@Override
- public FileStoreScan onlyReadRealBuckets() {
- this.onlyReadRealBuckets = true;
+ public FileStoreScan withBucket(int bucket) {
+ manifestsReader.withBucket(bucket);
+ specifiedBucket = bucket;
return this;
}
@@ -508,6 +511,10 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
return false;
}
+ if (specifiedBucket != null && bucket != specifiedBucket) {
+ return false;
+ }
+
if (bucketFilter != null && !bucketFilter.test(bucket)) {
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index 2eaa3646f7..afcbb82b73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -48,6 +48,8 @@ public class ManifestsReader {
private final SnapshotManager snapshotManager;
private final ManifestList.Factory manifestListFactory;
+ private boolean onlyReadRealBuckets = false;
+ @Nullable private Integer specifiedBucket = null;
@Nullable private PartitionPredicate partitionFilter = null;
public ManifestsReader(
@@ -61,6 +63,16 @@ public class ManifestsReader {
this.manifestListFactory = manifestListFactory;
}
+ public ManifestsReader onlyReadRealBuckets() {
+ this.onlyReadRealBuckets = true;
+ return this;
+ }
+
+ public ManifestsReader withBucket(int bucket) {
+ this.specifiedBucket = bucket;
+ return this;
+ }
+
public ManifestsReader withPartitionFilter(Predicate predicate) {
this.partitionFilter = PartitionPredicate.fromPredicate(partitionType,
predicate);
return this;
@@ -123,6 +135,18 @@ public class ManifestsReader {
/** Note: Keep this thread-safe. */
private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
+ Integer minBucket = manifest.minBucket();
+ Integer maxBucket = manifest.maxBucket();
+ if (minBucket != null && maxBucket != null) {
+ if (onlyReadRealBuckets && maxBucket < 0) {
+ return false;
+ }
+ if (specifiedBucket != null
+ && (specifiedBucket < minBucket || specifiedBucket >
maxBucket)) {
+ return false;
+ }
+ }
+
if (partitionFilter == null) {
return true;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 17a5b161b0..c6de102581 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
@@ -78,7 +77,7 @@ abstract class AbstractDataTableScan implements DataTableScan
{
this.snapshotReader = snapshotReader;
}
- @VisibleForTesting
+ @Override
public AbstractDataTableScan withBucket(int bucket) {
snapshotReader.withBucket(bucket);
return this;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index f7d609187d..1c7b153184 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -47,6 +47,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withBucket(int bucket) {
+ return this;
+ }
+
default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index d12de1211b..212d08d907 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -100,6 +100,8 @@ public interface ReadBuilder extends Serializable {
/** Push partition filter. */
ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);
+ ReadBuilder withBucket(int bucket);
+
/**
* Push bucket filter. Note that this method cannot be used simultaneously
with {@link
* #withShard(int, int)}.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 95bfe6f24b..f86e742269 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -47,6 +47,7 @@ public class ReadBuilderImpl implements ReadBuilder {
private Map<String, String> partitionSpec;
+ private @Nullable Integer specifiedBucket = null;
private Filter<Integer> bucketFilter;
private @Nullable RowType readType;
@@ -120,6 +121,12 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
+ @Override
+ public ReadBuilder withBucket(int bucket) {
+ this.specifiedBucket = bucket;
+ return this;
+ }
+
@Override
public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
this.bucketFilter = bucketFilter;
@@ -161,6 +168,9 @@ public class ReadBuilderImpl implements ReadBuilder {
"Unsupported table scan type for shard configuring,
the scan is: " + scan);
}
}
+ if (specifiedBucket != null) {
+ scan.withBucket(specifiedBucket);
+ }
if (bucketFilter != null) {
scan.withBucketFilter(bucketFilter);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
similarity index 64%
copy from
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
copy to
paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
index fe791e5638..10a31bd67b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/LegacyManifestFileMetaSerializerPaimon10.java
@@ -22,15 +22,34 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.VersionedObjectSerializer;
-/** Serializer for {@link ManifestFileMeta}. */
-public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<ManifestFileMeta> {
+import java.util.Arrays;
+
+/** Legacy serializer for {@link ManifestFileMeta} in Paimon 1.0. */
+public class LegacyManifestFileMetaSerializerPaimon10
+ extends VersionedObjectSerializer<ManifestFileMeta> {
private static final long serialVersionUID = 1L;
- public ManifestFileMetaSerializer() {
- super(ManifestFileMeta.SCHEMA);
+ public static final RowType SCHEMA =
+ new RowType(
+ false,
+ Arrays.asList(
+ new DataField(
+ 0, "_FILE_NAME", new VarCharType(false,
Integer.MAX_VALUE)),
+ new DataField(1, "_FILE_SIZE", new
BigIntType(false)),
+ new DataField(2, "_NUM_ADDED_FILES", new
BigIntType(false)),
+ new DataField(3, "_NUM_DELETED_FILES", new
BigIntType(false)),
+ new DataField(4, "_PARTITION_STATS",
SimpleStats.SCHEMA),
+ new DataField(5, "_SCHEMA_ID", new
BigIntType(false))));
+
+ public LegacyManifestFileMetaSerializerPaimon10() {
+ super(SCHEMA);
}
@Override
@@ -60,12 +79,17 @@ public class ManifestFileMetaSerializer extends
VersionedObjectSerializer<Manife
}
throw new IllegalArgumentException("Unsupported version: " +
version);
}
+
return new ManifestFileMeta(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
row.getLong(3),
SimpleStats.fromRow(row.getRow(4, 3)),
- row.getLong(5));
+ row.getLong(5),
+ null,
+ null,
+ null,
+ null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index a6aaf3530f..875aa7a2b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -25,8 +25,10 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.VersionedObjectSerializer;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
@@ -84,6 +86,67 @@ public class ManifestListTest {
assertThat(manifestListName.startsWith("manifest-list-")).isTrue();
}
+ // ============================ Compatibility tests
===================================
+
+ @Test
+ public void testCanReadOldMetaPaimon10() throws Exception {
+ ManifestList legacyManifestList = createLegacyManifestListPaimon10();
+ List<ManifestFileMeta> metas = generateData();
+ String manifestListName = legacyManifestList.write(metas).getKey();
+
+ ManifestList manifestList = createManifestList(tempDir.toString());
+ List<ManifestFileMeta> actualMetas =
manifestList.read(manifestListName);
+ assertThat(actualMetas).isEqualTo(getLegacyMetaPaimon10(metas));
+ }
+
+ @Test
+ public void testOldReaderCanReadNewMetaPaimon10() throws Exception {
+ ManifestList manifestList = createManifestList(tempDir.toString());
+ List<ManifestFileMeta> metas = generateData();
+ String manifestListName = manifestList.write(metas).getKey();
+
+ ManifestList legacyManifestList = createLegacyManifestListPaimon10();
+ List<ManifestFileMeta> actualMetas =
legacyManifestList.read(manifestListName);
+ assertThat(actualMetas).isEqualTo(getLegacyMetaPaimon10(metas));
+ }
+
+ private ManifestList createLegacyManifestListPaimon10() {
+ FileStorePathFactory pathFactory =
createPathFactory(tempDir.toString());
+ RowType legacyMetaType =
+ VersionedObjectSerializer.versionType(
+ LegacyManifestFileMetaSerializerPaimon10.SCHEMA);
+ return new ManifestList(
+ LocalFileIO.create(),
+ new LegacyManifestFileMetaSerializerPaimon10(),
+ legacyMetaType,
+ avro.createReaderFactory(legacyMetaType),
+ avro.createWriterFactory(legacyMetaType),
+ "zstd",
+ pathFactory.manifestListFactory(),
+ null);
+ }
+
+ private List<ManifestFileMeta>
getLegacyMetaPaimon10(List<ManifestFileMeta> metas) {
+ List<ManifestFileMeta> result = new ArrayList<>();
+ for (ManifestFileMeta meta : metas) {
+ result.add(
+ new ManifestFileMeta(
+ meta.fileName(),
+ meta.fileSize(),
+ meta.numAddedFiles(),
+ meta.numDeletedFiles(),
+ meta.partitionStats(),
+ meta.schemaId(),
+ null,
+ null,
+ null,
+ null));
+ }
+ return result;
+ }
+
+ // ============================ Test utils
===================================
+
private List<ManifestFileMeta> generateData() {
Random random = new Random();
List<ManifestFileMeta> metas = new ArrayList<>();
@@ -97,22 +160,25 @@ public class ManifestListTest {
return metas;
}
+ private FileStorePathFactory createPathFactory(String pathStr) {
+ return new FileStorePathFactory(
+ new Path(pathStr),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ "default",
+ CoreOptions.FILE_FORMAT.defaultValue().toString(),
+ CoreOptions.DATA_FILE_PREFIX.defaultValue(),
+ CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+ CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+ CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+ CoreOptions.FILE_COMPRESSION.defaultValue(),
+ null,
+ null);
+ }
+
private ManifestList createManifestList(String pathStr) {
- Path path = new Path(pathStr);
- FileStorePathFactory pathFactory =
- new FileStorePathFactory(
- path,
- TestKeyValueGenerator.DEFAULT_PART_TYPE,
- "default",
- CoreOptions.FILE_FORMAT.defaultValue().toString(),
- CoreOptions.DATA_FILE_PREFIX.defaultValue(),
- CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
-
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
- CoreOptions.FILE_COMPRESSION.defaultValue(),
- null,
- null);
- return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd",
pathFactory, null)
+ FileStorePathFactory pathFactory = createPathFactory(pathStr);
+ return new ManifestList.Factory(
+ FileIOFinder.find(new Path(pathStr)), avro, "zstd",
pathFactory, null)
.create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
index 0283eda853..959d3f9e3e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java
@@ -63,7 +63,7 @@ public class ManifestTestDataGenerator {
}
public ManifestEntry next() {
- if (bufferedResults.size() > 0) {
+ if (!bufferedResults.isEmpty()) {
return bufferedResults.poll();
}
@@ -99,6 +99,10 @@ public class ManifestTestDataGenerator {
long numAddedFiles = 0;
long numDeletedFiles = 0;
+ int minBucket = Integer.MAX_VALUE;
+ int maxBucket = Integer.MIN_VALUE;
+ int minLevel = Integer.MAX_VALUE;
+ int maxLevel = Integer.MIN_VALUE;
for (ManifestEntry entry : entries) {
collector.collect(entry.partition());
if (entry.kind() == FileKind.ADD) {
@@ -106,6 +110,10 @@ public class ManifestTestDataGenerator {
} else {
numDeletedFiles++;
}
+ minBucket = Math.min(minBucket, entry.bucket());
+ maxBucket = Math.max(maxBucket, entry.bucket());
+ minLevel = Math.min(minLevel, entry.level());
+ maxLevel = Math.max(maxLevel, entry.level());
}
return new ManifestFileMeta(
@@ -114,7 +122,11 @@ public class ManifestTestDataGenerator {
numAddedFiles,
numDeletedFiles,
serializer.toBinaryAllMode(collector.extract()),
- 0);
+ 0,
+ minBucket,
+ maxBucket,
+ minLevel,
+ maxLevel);
}
private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 9722847852..91bb30b45b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,7 +41,6 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
@@ -58,7 +57,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
@@ -248,9 +246,9 @@ public class CompactAction extends TableActionBase {
List<BinaryRow> partitions =
fileStoreTable
- .newScan()
- .withBucketFilter(new PostponeBucketFilter())
- .listPartitions();
+ .newSnapshotReader()
+ .withBucket(BucketMode.POSTPONE_BUCKET)
+ .partitions();
if (partitions.isEmpty()) {
return false;
}
@@ -266,10 +264,8 @@ public class CompactAction extends TableActionBase {
int bucketNum =
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
Iterator<ManifestEntry> it =
- fileStoreTable
- .newSnapshotReader()
+ table.newSnapshotReader()
.withPartitionFilter(Collections.singletonList(partition))
- .withBucketFilter(new NormalBucketFilter())
.readFileIterator();
if (it.hasNext()) {
bucketNum = it.next().totalBuckets();
@@ -289,7 +285,7 @@ public class CompactAction extends TableActionBase {
realTable
.newReadBuilder()
.withPartitionFilter(partitionSpec)
- .withBucketFilter(new
PostponeBucketFilter()),
+ .withBucket(BucketMode.POSTPONE_BUCKET),
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
DataStream<InternalRow> partitioned =
@@ -314,26 +310,6 @@ public class CompactAction extends TableActionBase {
return true;
}
- private static class PostponeBucketFilter implements Filter<Integer>,
Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean test(Integer bucket) {
- return bucket == BucketMode.POSTPONE_BUCKET;
- }
- }
-
- private static class NormalBucketFilter implements Filter<Integer>,
Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean test(Integer bucket) {
- return bucket >= 0;
- }
- }
-
@Override
public void run() throws Exception {
if (buildImpl()) {