This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit e72f3d7fcf5982c606147dd6f426d8c2008be22e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 13 10:03:40 2025 +0800

    [core] Postpone bucket should introduce a new BucketMode (#5592)
---
 .../main/java/org/apache/paimon/table/BucketMode.java  | 10 +++++++++-
 .../main/java/org/apache/paimon/KeyValueFileStore.java | 18 ++++++++++++------
 .../apache/paimon/table/AbstractFileStoreTable.java    |  4 ++++
 .../apache/paimon/table/PrimaryKeyFileStoreTable.java  | 11 -----------
 .../flink/lookup/PrimaryKeyPartialLookupTable.java     |  5 +++--
 .../apache/paimon/flink/sink/CompactorSinkBuilder.java |  1 -
 .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java |  8 +++-----
 .../apache/paimon/flink/FlinkJobRecoveryITCase.java    |  3 +++
 .../scala/org/apache/paimon/spark/SparkTable.scala     |  3 +++
 .../paimon/spark/commands/PaimonSparkWriter.scala      |  3 +--
 10 files changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java 
b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
index 74ff34613e..277c4740c1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-common/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -57,7 +57,15 @@ public enum BucketMode {
      * Ignoring bucket concept, although all data is written to bucket-0, the 
parallelism of reads
      * and writes is unrestricted. This mode only works for append-only table.
      */
-    BUCKET_UNAWARE;
+    BUCKET_UNAWARE,
+
+    /**
+     * Configured by 'bucket' = '-2' (postpone bucket) for primary key table. This mode aims to
+     * solve the difficulty of determining a fixed number of buckets and to support different
+     * bucket numbers for different partitions. The bucket number will be adaptively adjusted to
+     * an appropriate value in the background.
+     */
+    POSTPONE_MODE;
 
     public static final int UNAWARE_BUCKET = 0;
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 19ba34aa95..57f8db5f2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -99,11 +99,15 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public BucketMode bucketMode() {
-        if (options.bucket() == -1) {
-            return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : 
BucketMode.HASH_DYNAMIC;
-        } else {
-            checkArgument(!crossPartitionUpdate);
-            return BucketMode.HASH_FIXED;
+        int bucket = options.bucket();
+        switch (bucket) {
+            case -2:
+                return BucketMode.POSTPONE_MODE;
+            case -1:
+                return crossPartitionUpdate ? BucketMode.CROSS_PARTITION : 
BucketMode.HASH_DYNAMIC;
+            default:
+                checkArgument(!crossPartitionUpdate);
+                return BucketMode.HASH_FIXED;
         }
     }
 
@@ -206,9 +210,11 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     protected KeyValueFileStoreScan newScan(ScanType scanType) {
+        BucketMode bucketMode = bucketMode();
         BucketSelectConverter bucketSelectConverter =
                 keyFilter -> {
-                    if (bucketMode() != BucketMode.HASH_FIXED) {
+                    if (bucketMode != BucketMode.HASH_FIXED
+                            && bucketMode != BucketMode.POSTPONE_MODE) {
                         return Optional.empty();
                     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index f68bd9615e..edd7a94db9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -39,6 +39,7 @@ import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketWriteSelector;
+import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -219,6 +220,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
             case HASH_FIXED:
                 return Optional.of(new FixedBucketWriteSelector(schema()));
             case BUCKET_UNAWARE:
+            case POSTPONE_MODE:
                 return Optional.empty();
             default:
                 throw new UnsupportedOperationException(
@@ -240,6 +242,8 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 return new DynamicBucketRowKeyExtractor(schema());
             case BUCKET_UNAWARE:
                 return new UnawareBucketRowKeyExtractor(schema());
+            case POSTPONE_MODE:
+                return new PostponeBucketRowKeyExtractor(schema());
             default:
                 throw new UnsupportedOperationException("Unsupported mode: " + 
bucketMode());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 99a4122ca2..c375bb2900 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -33,8 +33,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.PostponeBucketRowKeyExtractor;
-import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.KeyValueTableRead;
@@ -189,13 +187,4 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
             return super.newExpireRunnable();
         }
     }
-
-    @Override
-    public RowKeyExtractor createRowKeyExtractor() {
-        if (coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
-            return new PostponeBucketRowKeyExtractor(schema());
-        } else {
-            return super.createRowKeyExtractor();
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 24fd87a213..43aeefc26a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -72,9 +72,10 @@ public class PrimaryKeyPartialLookupTable implements 
LookupTable {
     private PrimaryKeyPartialLookupTable(
             QueryExecutorFactory executorFactory, FileStoreTable table, 
List<String> joinKey) {
         this.executorFactory = executorFactory;
-        if (table.bucketMode() != BucketMode.HASH_FIXED) {
+        BucketMode bucketMode = table.bucketMode();
+        if (bucketMode != BucketMode.HASH_FIXED && bucketMode != 
BucketMode.POSTPONE_MODE) {
             throw new UnsupportedOperationException(
-                    "Unsupported mode for partial lookup: " + 
table.bucketMode());
+                    "Unsupported mode for partial lookup: " + bucketMode);
         }
 
         TableSchema schema = table.schema();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index 2d84ae6726..8e1321a9f7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -55,7 +55,6 @@ public class CompactorSinkBuilder {
             case HASH_FIXED:
             case HASH_DYNAMIC:
                 return buildForBucketAware();
-            case BUCKET_UNAWARE:
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 19203294be..e19096c581 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -235,12 +235,10 @@ public class FlinkSinkBuilder {
 
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
+            case POSTPONE_MODE:
+                return buildPostponeBucketSink(input);
             case HASH_FIXED:
-                if (table.coreOptions().bucket() == 
BucketMode.POSTPONE_BUCKET) {
-                    return buildPostponeBucketSink(input);
-                } else {
-                    return buildForFixedBucket(input);
-                }
+                return buildForFixedBucket(input);
             case HASH_DYNAMIC:
                 return buildDynamicBucketSink(input, false);
             case CROSS_PARTITION:
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
index 8df379a71b..bad5998e39 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkJobRecoveryITCase.java
@@ -89,6 +89,9 @@ public class FlinkJobRecoveryITCase extends CatalogITCaseBase 
{
     @EnumSource(BucketMode.class)
     @Timeout(300)
     public void testRestoreFromSavepointWithJobGraphChange(BucketMode 
bucketMode) throws Exception {
+        if (bucketMode == BucketMode.POSTPONE_MODE) {
+            return;
+        }
         createTargetTable("target_table", bucketMode);
         String beforeRecoverSql =
                 "INSERT INTO `target_table` /*+ 
OPTIONS('sink.operator-uid.suffix'='test-uid') */ SELECT * FROM source_table1 
/*+ OPTIONS('source.operator-uid.suffix'='test-uid') */";
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index b9a90d8b5b..90e68353b2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -21,7 +21,10 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, 
KnownSplitsTable, Table}
 import org.apache.paimon.table.{DataTable, FileStoreTable, KnownSplitsTable, 
Table}
+import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, 
POSTPONE_MODE}
 import org.apache.paimon.utils.StringUtils
 
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, 
TableCatalog}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a87668311e..780bbae25c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -235,8 +235,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
           )
         }
 
-      case BUCKET_UNAWARE =>
-        // Topology: input ->
+      case BUCKET_UNAWARE | POSTPONE_MODE =>
         writeWithoutBucket(data)
 
       case HASH_FIXED =>

Reply via email to