This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit e72f3d7fcf5982c606147dd6f426d8c2008be22e Author: Jingsong Lee <[email protected]> AuthorDate: Tue May 13 10:03:40 2025 +0800 [core] Postpone bucket should introduce a new BucketMode (#5592) --- .../main/java/org/apache/paimon/table/BucketMode.java | 10 +++++++++- .../main/java/org/apache/paimon/KeyValueFileStore.java | 18 ++++++++++++------ .../apache/paimon/table/AbstractFileStoreTable.java | 4 ++++ .../apache/paimon/table/PrimaryKeyFileStoreTable.java | 11 ----------- .../flink/lookup/PrimaryKeyPartialLookupTable.java | 5 +++-- .../apache/paimon/flink/sink/CompactorSinkBuilder.java | 1 - .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 8 +++----- .../apache/paimon/flink/FlinkJobRecoveryITCase.java | 3 +++ .../scala/org/apache/paimon/spark/SparkTable.scala | 3 +++ .../paimon/spark/commands/PaimonSparkWriter.scala | 3 +-- 10 files changed, 38 insertions(+), 28 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java index 74ff34613e..277c4740c1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java +++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java @@ -57,7 +57,15 @@ public enum BucketMode { * Ignoring bucket concept, although all data is written to bucket-0, the parallelism of reads * and writes is unrestricted. This mode only works for append-only table. */ - BUCKET_UNAWARE; + BUCKET_UNAWARE, + + /** + * Configured by 'bucket' = '-2' (postpone bucket) for primary key table. This mode aims to + * solve the difficulty to determine a fixed number of buckets and support different buckets for + * different partitions. The bucket will be adaptively adjusted to the appropriate value in the + * background. + */ + POSTPONE_MODE; public static final int UNAWARE_BUCKET = 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 19ba34aa95..57f8db5f2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -99,11 +99,15 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { @Override public BucketMode bucketMode() { - if (options.bucket() == -1) { - return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : BucketMode.HASH_DYNAMIC; - } else { - checkArgument(!crossPartitionUpdate); - return BucketMode.HASH_FIXED; + int bucket = options.bucket(); + switch (bucket) { + case -2: + return BucketMode.POSTPONE_MODE; + case -1: + return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : BucketMode.HASH_DYNAMIC; + default: + checkArgument(!crossPartitionUpdate); + return BucketMode.HASH_FIXED; } } @@ -206,9 +210,11 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> { @Override protected KeyValueFileStoreScan newScan(ScanType scanType) { + BucketMode bucketMode = bucketMode(); BucketSelectConverter bucketSelectConverter = keyFilter -> { - if (bucketMode() != BucketMode.HASH_FIXED) { + if (bucketMode != BucketMode.HASH_FIXED + && bucketMode != BucketMode.POSTPONE_MODE) { return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index f68bd9615e..edd7a94db9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -39,6 +39,7 @@ import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor; import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor; import org.apache.paimon.table.sink.FixedBucketWriteSelector; +import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor; import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.TableCommitImpl; @@ -219,6 +220,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable { case HASH_FIXED: return Optional.of(new FixedBucketWriteSelector(schema())); case BUCKET_UNAWARE: + case POSTPONE_MODE: return Optional.empty(); default: throw new UnsupportedOperationException( @@ -240,6 +242,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable { return new DynamicBucketRowKeyExtractor(schema()); case BUCKET_UNAWARE: return new UnawareBucketRowKeyExtractor(schema()); + case POSTPONE_MODE: + return new PostponeBucketRowKeyExtractor(schema()); default: throw new UnsupportedOperationException("Unsupported mode: " + bucketMode()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 99a4122ca2..c375bb2900 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -33,8 +33,6 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; -import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor; -import org.apache.paimon.table.sink.RowKeyExtractor; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; @@ -189,13 +187,4 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable { return super.newExpireRunnable(); } } - - @Override - public RowKeyExtractor createRowKeyExtractor() { - if (coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) { - return new PostponeBucketRowKeyExtractor(schema()); - } else { - return super.createRowKeyExtractor(); - } - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 24fd87a213..43aeefc26a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -72,9 +72,10 @@ public class PrimaryKeyPartialLookupTable implements LookupTable { private PrimaryKeyPartialLookupTable( QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) { this.executorFactory = executorFactory; - if (table.bucketMode() != BucketMode.HASH_FIXED) { + BucketMode bucketMode = table.bucketMode(); + if (bucketMode != BucketMode.HASH_FIXED && bucketMode != BucketMode.POSTPONE_MODE) { throw new UnsupportedOperationException( - "Unsupported mode for partial lookup: " + table.bucketMode()); + "Unsupported mode for partial lookup: " + bucketMode); } TableSchema schema = table.schema(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java index 2d84ae6726..8e1321a9f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java @@ -55,7 +55,6 @@ public class CompactorSinkBuilder { case HASH_FIXED: case HASH_DYNAMIC: return buildForBucketAware(); - case BUCKET_UNAWARE: default: throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 19203294be..e19096c581 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -235,12 +235,10 @@ public class FlinkSinkBuilder { BucketMode bucketMode = table.bucketMode(); switch (bucketMode) { + case POSTPONE_MODE: + return buildPostponeBucketSink(input); case HASH_FIXED: - if (table.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) { - return buildPostponeBucketSink(input); - } else { - return buildForFixedBucket(input); - } + return buildForFixedBucket(input); case HASH_DYNAMIC: return buildDynamicBucketSink(input, false); case CROSS_PARTITION: diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java index 8df379a71b..bad5998e39 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java @@ -89,6 +89,9 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase { @EnumSource(BucketMode.class) @Timeout(300) public void testRestoreFromSavepointWithJobGraphChange(BucketMode bucketMode) throws Exception { + if (bucketMode == BucketMode.POSTPONE_MODE) { + return; + } createTargetTable("target_table", bucketMode); String beforeRecoverSql = "INSERT INTO `target_table` /*+ OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1 /*+ OPTIONS('source.operator-uid.suffix'='test-uid') */"; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala index b9a90d8b5b..90e68353b2 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala @@ -21,7 +21,10 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.options.Options import org.apache.paimon.spark.schema.PaimonMetadataColumn +import org.apache.paimon.spark.util.OptionUtils +import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, KnownSplitsTable, Table} import org.apache.paimon.table.{DataTable, FileStoreTable, KnownSplitsTable, Table} +import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE} import org.apache.paimon.utils.StringUtils import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, TableCatalog} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index a87668311e..780bbae25c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -235,8 +235,7 @@ case class PaimonSparkWriter(table: FileStoreTable) { ) } - case BUCKET_UNAWARE => - // Topology: input -> + case BUCKET_UNAWARE | POSTPONE_MODE => writeWithoutBucket(data) case HASH_FIXED =>
