This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch bucket-stats in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 4772101a86fc34be8cd0a1b0e3c6a90112d721fb Author: tsreaper <[email protected]> AuthorDate: Wed Mar 26 11:14:10 2025 +0800 [fix] Fix comments --- .../org/apache/paimon/manifest/ManifestFile.java | 8 ++++- .../apache/paimon/manifest/ManifestFileMeta.java | 34 ++++++++++++++++++---- .../manifest/ManifestFileMetaSerializer.java | 8 +++-- .../paimon/operation/AbstractFileStoreScan.java | 19 ++++++------ .../apache/paimon/operation/ManifestsReader.java | 13 +++++---- .../manifest/ManifestFileMetaSerializerTest.java | 11 ++++++- .../paimon/manifest/ManifestTestDataGenerator.java | 14 +++++++-- .../apache/paimon/flink/action/CompactAction.java | 14 +-------- 8 files changed, 81 insertions(+), 40 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 2977b01b19..5453fbc107 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -131,6 +131,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> { private long schemaId = Long.MIN_VALUE; private int minBucket = Integer.MAX_VALUE; private int maxBucket = Integer.MIN_VALUE; + private int minLevel = Integer.MAX_VALUE; + private int maxLevel = Integer.MIN_VALUE; ManifestEntryWriter(FormatWriterFactory factory, Path path, String fileCompression) { super( @@ -161,6 +163,8 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> { schemaId = Math.max(schemaId, entry.file().schemaId()); minBucket = Math.min(minBucket, entry.bucket()); maxBucket = Math.max(maxBucket, entry.bucket()); + minLevel = Math.min(minLevel, entry.level()); + maxLevel = Math.max(maxLevel, entry.level()); partitionStatsCollector.collect(entry.partition()); } @@ -177,7 +181,9 @@ public class ManifestFile extends ObjectsFile<ManifestEntry> { ? schemaId : schemaManager.latest().get().id(), minBucket, - maxBucket); + maxBucket, + minLevel, + maxLevel); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java index 59fa0d2127..aa0156360a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java @@ -52,7 +52,9 @@ public class ManifestFileMeta { new DataField(4, "_PARTITION_STATS", SimpleStats.SCHEMA), new DataField(5, "_SCHEMA_ID", new BigIntType(false)), new DataField(6, "_MIN_BUCKET", new IntType(true)), - new DataField(7, "_MAX_BUCKET", new IntType(true)))); + new DataField(7, "_MAX_BUCKET", new IntType(true)), + new DataField(8, "_MIN_LEVEL", new IntType(true)), + new DataField(9, "_MAX_LEVEL", new IntType(true)))); private final String fileName; private final long fileSize; @@ -62,6 +64,8 @@ public class ManifestFileMeta { private final long schemaId; private final @Nullable Integer minBucket; private final @Nullable Integer maxBucket; + private final @Nullable Integer minLevel; + private final @Nullable Integer maxLevel; public ManifestFileMeta( String fileName, @@ -71,7 +75,9 @@ public class ManifestFileMeta { SimpleStats partitionStats, long schemaId, @Nullable Integer minBucket, - @Nullable Integer maxBucket) { + @Nullable Integer maxBucket, + @Nullable Integer minLevel, + @Nullable Integer maxLevel) { this.fileName = fileName; this.fileSize = fileSize; this.numAddedFiles = numAddedFiles; @@ -80,6 +86,8 @@ public class ManifestFileMeta { this.schemaId = schemaId; this.minBucket = minBucket; this.maxBucket = maxBucket; + this.minLevel = minLevel; + this.maxLevel = maxLevel; } public String fileName() { @@ -114,6 +122,14 @@ public class ManifestFileMeta { return maxBucket; } + public @Nullable Integer minLevel() { + return minLevel; + } + + public @Nullable Integer maxLevel() { + return maxLevel; + } + @Override public boolean equals(Object o) { if (!(o instanceof ManifestFileMeta)) { @@ -127,7 +143,9 @@ public class ManifestFileMeta { && Objects.equals(partitionStats, that.partitionStats) && schemaId == that.schemaId && Objects.equals(minBucket, that.minBucket) - && Objects.equals(maxBucket, that.maxBucket); + && Objects.equals(maxBucket, that.maxBucket) + && Objects.equals(minLevel, that.minLevel) + && Objects.equals(maxLevel, that.maxLevel); } @Override @@ -140,13 +158,15 @@ public class ManifestFileMeta { partitionStats, schemaId, minBucket, - maxBucket); + maxBucket, + minLevel, + maxLevel); } @Override public String toString() { return String.format( - "{%s, %d, %d, %d, %s, %d, %s, %s}", + "{%s, %d, %d, %d, %s, %d, %s, %s, %s, %s}", fileName, fileSize, numAddedFiles, @@ -154,7 +174,9 @@ public class ManifestFileMeta { partitionStats, schemaId, minBucket, - maxBucket); + maxBucket, + minLevel, + maxLevel); } // ----------------------- Serialization ----------------------------- diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java index 3bd242ebf1..b4c8ac2d9b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMetaSerializer.java @@ -48,7 +48,9 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife meta.partitionStats().toRow(), meta.schemaId(), meta.minBucket(), - meta.maxBucket()); + meta.maxBucket(), + meta.minLevel(), + meta.maxLevel()); } @Override @@ -68,6 +70,8 @@ public class ManifestFileMetaSerializer extends VersionedObjectSerializer<Manife SimpleStats.fromRow(row.getRow(4, 3)), row.getLong(5), version >= 3 ? row.getInt(6) : null, - version >= 3 ? row.getInt(7) : null); + version >= 3 ? row.getInt(7) : null, + version >= 3 ? row.getInt(8) : null, + version >= 3 ? row.getInt(9) : null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 534d1c237b..47a46518f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -78,8 +78,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final TableSchema schema; private Snapshot specifiedSnapshot = null; - private boolean onlyReadRealBuckets = false; - private Integer specifiedBucket = null; + private final Pair<Integer, Integer> specifiedBucketRange = + Pair.of(Integer.MIN_VALUE, Integer.MAX_VALUE); private Filter<Integer> bucketFilter = null; private BiFilter<Integer, Integer> totalAwareBucketFilter = null; protected ScanMode scanMode = ScanMode.ALL; @@ -134,14 +134,16 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { @Override public FileStoreScan onlyReadRealBuckets() { - this.onlyReadRealBuckets = true; + specifiedBucketRange.setLeft(Math.max(specifiedBucketRange.getLeft(), 0)); + manifestsReader.withBucketRange(specifiedBucketRange); return this; } @Override public FileStoreScan withBucket(int bucket) { - manifestsReader.withBucket(bucket); - specifiedBucket = bucket; + specifiedBucketRange.setLeft(Math.max(specifiedBucketRange.getLeft(), bucket)); + specifiedBucketRange.setRight(Math.min(specifiedBucketRange.getRight(), bucket)); + manifestsReader.withBucketRange(specifiedBucketRange); return this; } @@ -506,11 +508,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { } int bucket = bucketGetter.apply(row); - if (onlyReadRealBuckets && bucket < 0) { - return false; - } - - if (specifiedBucket != null && bucket != specifiedBucket) { + if (bucket < specifiedBucketRange.getLeft() + || bucket > specifiedBucketRange.getRight()) { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java index 8a5cdf50e3..faf3f1362f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java @@ -27,6 +27,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import javax.annotation.Nullable; @@ -49,7 +50,8 @@ public class ManifestsReader { private final ManifestList.Factory manifestListFactory; @Nullable private PartitionPredicate partitionFilter = null; - @Nullable private Integer specifiedBucket = null; + private Pair<Integer, Integer> specifiedBucketRange = + Pair.of(Integer.MIN_VALUE, Integer.MAX_VALUE); public ManifestsReader( RowType partitionType, @@ -62,8 +64,8 @@ public class ManifestsReader { this.manifestListFactory = manifestListFactory; } - public ManifestsReader withBucket(int bucket) { - this.specifiedBucket = bucket; + public ManifestsReader withBucketRange(Pair<Integer, Integer> bucketRange) { + this.specifiedBucketRange = bucketRange; return this; } @@ -131,8 +133,9 @@ public class ManifestsReader { private boolean filterManifestFileMeta(ManifestFileMeta manifest) { Integer minBucket = manifest.minBucket(); Integer maxBucket = manifest.maxBucket(); - if (specifiedBucket != null && minBucket != null && maxBucket != null) { - if (specifiedBucket < minBucket || specifiedBucket > maxBucket) { + if (minBucket != null && maxBucket != null) { + if (maxBucket < specifiedBucketRange.getLeft() + || minBucket > specifiedBucketRange.getRight()) { return false; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java index ae1a610be0..cfaf788d2c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaSerializerTest.java @@ -71,7 +71,16 @@ public class ManifestFileMetaSerializerTest extends ObjectSerializerTestBase<Man SimpleStats partitionStats = new SimpleStats(minFields, maxFields, nullCounts); ManifestFileMeta meta = new ManifestFileMeta( - "test-manifest-file", 1024, 5, 2, partitionStats, 1, null, null); + "test-manifest-file", + 1024, + 5, + 2, + partitionStats, + 1, + null, + null, + null, + null); byte[] v2Bytes = IOUtils.readFully( diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java index 83f82869ff..959d3f9e3e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestTestDataGenerator.java @@ -99,6 +99,10 @@ public class ManifestTestDataGenerator { long numAddedFiles = 0; long numDeletedFiles = 0; + int minBucket = Integer.MAX_VALUE; + int maxBucket = Integer.MIN_VALUE; + int minLevel = Integer.MAX_VALUE; + int maxLevel = Integer.MIN_VALUE; for (ManifestEntry entry : entries) { collector.collect(entry.partition()); if (entry.kind() == FileKind.ADD) { @@ -106,6 +110,10 @@ public class ManifestTestDataGenerator { } else { numDeletedFiles++; } + minBucket = Math.min(minBucket, entry.bucket()); + maxBucket = Math.max(maxBucket, entry.bucket()); + minLevel = Math.min(minLevel, entry.level()); + maxLevel = Math.max(maxLevel, entry.level()); } return new ManifestFileMeta( @@ -115,8 +123,10 @@ public class ManifestTestDataGenerator { numDeletedFiles, serializer.toBinaryAllMode(collector.extract()), 0, - 0, - 0); + minBucket, + maxBucket, + minLevel, + maxLevel); } private void mergeLevelsIfNeeded(BinaryRow partition, int bucket) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 79961e4340..900543cb5e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -266,10 +266,8 @@ public class CompactAction extends TableActionBase { int bucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM); Iterator<ManifestEntry> it = - fileStoreTable - .newSnapshotReader() + table.newSnapshotReader() .withPartitionFilter(Collections.singletonList(partition)) - .withBucketFilter(new NormalBucketFilter()) .readFileIterator(); if (it.hasNext()) { bucketNum = it.next().totalBuckets(); @@ -314,16 +312,6 @@ public class CompactAction extends TableActionBase { return true; } - private static class NormalBucketFilter implements Filter<Integer>, Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public boolean test(Integer bucket) { - return bucket >= 0; - } - } - @Override public void run() throws Exception { if (buildImpl()) {
