This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a6cdf8ccb0 [spark] Postpone bucket table supports to batch write fixed 
bucket (#6534)
a6cdf8ccb0 is described below

commit a6cdf8ccb0e7484a646fd766b981f66975ba3e0a
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 5 17:41:02 2025 +0800

    [spark] Postpone bucket table supports to batch write fixed bucket (#6534)
---
 .../org/apache/paimon/table/PostponeUtils.java     |  18 ++
 .../paimon/table/sink/BatchWriteBuilder.java       |   5 +
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |  13 ++
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java |   9 +-
 .../paimon/flink/sink/PostponeFixedBucketSink.java |  10 +-
 .../paimon/flink/PostponeBucketTableITCase.java    |   2 +-
 .../apache/paimon/spark/PaimonSparkTableBase.scala |   3 +-
 .../org/apache/paimon/spark/SparkTableWrite.scala  |  23 ++-
 .../paimon/spark/commands/BucketProcessor.scala    |  41 ++++-
 .../MergeIntoPaimonDataEvolutionTable.scala        |   2 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  43 ++++-
 .../paimon/spark/sql/PostponeBucketTableTest.scala | 191 +++++++++++++++++++++
 .../apache/paimon/spark/sql/SparkWriteITCase.scala |   3 +-
 13 files changed, 334 insertions(+), 29 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
index 280dd16f00..097921df54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
@@ -21,10 +21,14 @@ package org.apache.paimon.table;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.SimpleFileEntry;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
+
 /** Utils for postpone table. */
 public class PostponeUtils {
 
@@ -49,4 +53,18 @@ public class PostponeUtils {
         }
         return knownNumBuckets;
     }
+
+    public static FileStoreTable tableForFixBucketWrite(FileStoreTable table) {
+        Map<String, String> batchWriteOptions = new HashMap<>();
+        batchWriteOptions.put(WRITE_ONLY.key(), "true");
+        // It's just used to create merge tree writer for writing files to 
fixed bucket.
+        // The real bucket number is determined at runtime.
+        batchWriteOptions.put(BUCKET.key(), "1");
+        return table.copy(batchWriteOptions);
+    }
+
+    public static FileStoreTable tableForCommit(FileStoreTable table) {
+        return table.copy(
+                Collections.singletonMap(BUCKET.key(), 
String.valueOf(BucketMode.POSTPONE_BUCKET)));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
index ee0091a7bf..a82c69b6ec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
 
 import javax.annotation.Nullable;
 
@@ -71,4 +72,8 @@ public interface BatchWriteBuilder extends WriteBuilder {
     /** Create a {@link TableCommit} to commit {@link CommitMessage}s. */
     @Override
     BatchTableCommit newCommit();
+
+    default BatchWriteBuilder copyWithNewTable(Table newTable) {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 445f48bafc..a0ff6e0e27 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.InnerTable;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
@@ -45,6 +46,13 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
         this.commitUser = createCommitUser(new Options(table.options()));
     }
 
+    private BatchWriteBuilderImpl(
+            InnerTable table, String commitUser, @Nullable Map<String, String> 
staticPartition) {
+        this.table = table;
+        this.commitUser = commitUser;
+        this.staticPartition = staticPartition;
+    }
+
     @Override
     public String tableName() {
         return table.name();
@@ -80,4 +88,9 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
                         .orElse(true));
         return commit;
     }
+
+    @Override
+    public BatchWriteBuilder copyWithNewTable(Table newTable) {
+        return new BatchWriteBuilderImpl((InnerTable) newTable, commitUser, 
staticPartition);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index b7a3fc3ca1..01a034e78a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -56,8 +56,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.CoreOptions.clusteringStrategy;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -315,12 +313,7 @@ public class FlinkSinkBuilder {
                             new 
PostponeFixedBucketChannelComputer(table.schema(), knownNumBuckets),
                             parallelism);
 
-            Map<String, String> batchWriteOptions = new HashMap<>();
-            batchWriteOptions.put(WRITE_ONLY.key(), "true");
-            // It's just used to create merge tree writer for writing files to 
fixed bucket.
-            // The real bucket number is determined at runtime.
-            batchWriteOptions.put(BUCKET.key(), "1");
-            FileStoreTable tableForWrite = table.copy(batchWriteOptions);
+            FileStoreTable tableForWrite = 
PostponeUtils.tableForFixBucketWrite(table);
 
             PostponeFixedBucketSink sink =
                     new PostponeFixedBucketSink(tableForWrite, 
overwritePartition, knownNumBuckets);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
index 92763c0cbe..3223ac50d3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PostponeUtils;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -30,11 +30,8 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.BUCKET;
-
 /** {@link FlinkSink} for writing records into fixed bucket of postpone table. 
*/
 public class PostponeFixedBucketSink extends FlinkWriteSink<InternalRow> {
 
@@ -76,10 +73,7 @@ public class PostponeFixedBucketSink extends 
FlinkWriteSink<InternalRow> {
         } else {
             // When overwriting, the postpone bucket files need to be deleted, 
so using a postpone
             // bucket table commit here
-            FileStoreTable tableForCommit =
-                    table.copy(
-                            Collections.singletonMap(
-                                    BUCKET.key(), 
String.valueOf(BucketMode.POSTPONE_BUCKET)));
+            FileStoreTable tableForCommit = 
PostponeUtils.tableForCommit(table);
             return context ->
                     new StoreCommitter(
                             tableForCommit,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index c6b832eebe..37b2096d56 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -1054,7 +1054,7 @@ public class PostponeBucketTableITCase extends 
AbstractTestBase {
     }
 
     @Test
-    public void testWriteWritePostponeBucketThenWriteFixedBucket() throws 
Exception {
+    public void testWritePostponeBucketThenWriteFixedBucket() throws Exception 
{
         String warehouse = getTempDirPath();
         TableEnvironment tEnv =
                 tableEnvironmentBuilder()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index addaaa0831..859becf546 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -56,7 +56,8 @@ abstract class PaimonSparkTableBase(val table: Table)
         case storeTable: FileStoreTable =>
           storeTable.bucketMode() match {
             case HASH_FIXED => BucketFunction.supportsTable(storeTable)
-            case BUCKET_UNAWARE | POSTPONE_MODE => true
+            case BUCKET_UNAWARE => true
+            case POSTPONE_MODE if !coreOptions.postponeBatchWriteFixedBucket() 
=> true
             case _ => false
           }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 2fcd1f847c..4ce281b836 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.disk.IOManager
 import org.apache.paimon.spark.util.SparkRowUtils
 import org.apache.paimon.spark.write.DataWriteHelper
@@ -38,19 +39,23 @@ case class SparkTableWrite(
     fullCompactionDeltaCommits: Option[Int],
     batchId: Long,
     blobAsDescriptor: Boolean,
-    catalogContext: CatalogContext)
+    catalogContext: CatalogContext,
+    postponePartitionBucketComputer: Option[BinaryRow => Integer])
   extends SparkTableWriteTrait
   with DataWriteHelper {
 
   private val ioManager: IOManager = SparkUtils.createIOManager
 
   val write: TableWriteImpl[Row] = {
-    val _write = writeBuilder.newWrite()
+    val _write = writeBuilder.newWrite().asInstanceOf[TableWriteImpl[Row]]
     _write.withIOManager(ioManager)
     if (writeRowTracking) {
       _write.withWriteType(writeType)
     }
-    _write.asInstanceOf[TableWriteImpl[Row]]
+    if (postponePartitionBucketComputer.isDefined) {
+      _write.getWrite.withIgnoreNumBucketCheck(true)
+    }
+    _write
   }
 
   private val toPaimonRow = {
@@ -78,7 +83,17 @@ case class SparkTableWrite(
             bytesWritten += dataFileMeta.fileSize()
             recordsWritten += dataFileMeta.rowCount()
         }
-        commitMessages += serializer.serialize(message)
+        val finalMessage = if (postponePartitionBucketComputer.isDefined) {
+          new CommitMessageImpl(
+            message.partition(),
+            message.bucket(),
+            postponePartitionBucketComputer.get.apply(message.partition()),
+            message.newFilesIncrement(),
+            message.compactIncrement())
+        } else {
+          message
+        }
+        commitMessages += serializer.serialize(finalMessage)
     }
     reportOutputMetrics(bytesWritten, recordsWritten)
     commitMessages.iterator
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
index 19494fc88d..e6cb3b55ab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
@@ -18,10 +18,14 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.bucket.BucketFunction
+import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow}
 import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => 
PaimonInternalRow, JoinedRow}
 import org.apache.paimon.disk.IOManager
-import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex}
+import org.apache.paimon.index.HashBucketAssigner
+import org.apache.paimon.schema.TableSchema
 import org.apache.paimon.spark.{DataConverter, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.util.EncoderUtils
@@ -90,6 +94,41 @@ case class CommonBucketProcessor(
   }
 }
 
+case class PostponeFixBucketProcessor(
+    table: FileStoreTable,
+    bucketColIndex: Int,
+    encoderGroup: EncoderSerDeGroup,
+    postponePartitionBucketComputer: BinaryRow => Integer)
+  extends BucketProcessor[Row] {
+
+  def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
+    val rowType = table.rowType()
+    val bucketFunction =
+      BucketFunction.create(new CoreOptions(table.options), 
table.schema.logicalBucketKeyType)
+    val schema: TableSchema = table.schema
+    val partitionKeyExtractor = new RowPartitionKeyExtractor(schema)
+    val bucketKeyProjection =
+      CodeGenUtils.newProjection(schema.logicalRowType, 
schema.projection(schema.bucketKeys))
+
+    def getBucketId(row: PaimonInternalRow): Int = {
+      val partition = partitionKeyExtractor.partition(row)
+      val numBuckets = postponePartitionBucketComputer.apply(partition)
+      bucketFunction.bucket(bucketKeyProjection.apply(row), numBuckets)
+    }
+
+    new Iterator[Row] {
+      override def hasNext: Boolean = rowIterator.hasNext
+
+      override def next(): Row = {
+        val row = rowIterator.next
+        val sparkInternalRow = encoderGroup.rowToInternal(row)
+        sparkInternalRow.setInt(bucketColIndex, getBucketId(new 
SparkRow(rowType, row)))
+        encoderGroup.internalToRow(sparkInternalRow)
+      }
+    }
+  }
+}
+
 case class DynamicBucketProcessor(
     fileStoreTable: FileStoreTable,
     bucketColIndex: Int,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index e29b765105..11cd875413 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -46,7 +46,7 @@ import scala.collection.Searching.{search, Found, 
InsertionPoint}
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
-/** Command for Merge Into for Data Evolution paimom table. */
+/** Command for Merge Into for Data Evolution paimon table. */
 case class MergeIntoPaimonDataEvolutionTable(
     v2Table: SparkTable,
     targetTable: LogicalPlan,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index c88220988a..59faa0e03a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
 import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
+import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
@@ -36,7 +37,7 @@ import org.apache.paimon.spark.sort.TableSorter
 import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
 import org.apache.paimon.spark.util.SparkRowUtils
 import org.apache.paimon.spark.write.WriteHelper
-import org.apache.paimon.table.{FileStoreTable, SpecialFields}
+import org.apache.paimon.table.{FileStoreTable, PostponeUtils, SpecialFields}
 import org.apache.paimon.table.BucketMode._
 import org.apache.paimon.table.sink._
 import org.apache.paimon.types.{RowKind, RowType}
@@ -75,7 +76,17 @@ case class PaimonSparkWriter(
     }
   }
 
-  val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
+  val postponeBatchWriteFixedBucket: Boolean =
+    table.bucketMode() == POSTPONE_MODE && 
coreOptions.postponeBatchWriteFixedBucket()
+
+  val writeBuilder: BatchWriteBuilder = {
+    val tableForWrite = if (postponeBatchWriteFixedBucket) {
+      PostponeUtils.tableForFixBucketWrite(table)
+    } else {
+      table
+    }
+    tableForWrite.newBatchWriteBuilder()
+  }
 
   def writeOnly(): PaimonSparkWriter = {
     PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
@@ -104,6 +115,14 @@ case class PaimonSparkWriter(
     val rowKindColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, 
ROW_KIND_COL)
     val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, 
BUCKET_COL)
     val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
+    val postponePartitionBucketComputer: Option[BinaryRow => Integer] =
+      if (postponeBatchWriteFixedBucket) {
+        val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
+        val defaultPostponeNumBuckets = withInitBucketCol.rdd.getNumPartitions
+        Some((p: BinaryRow) => knownNumBuckets.getOrDefault(p, 
defaultPostponeNumBuckets))
+      } else {
+        None
+      }
 
     def newWrite() = SparkTableWrite(
       writeBuilder,
@@ -113,7 +132,8 @@ case class PaimonSparkWriter(
       fullCompactionDeltaCommits,
       batchId,
       coreOptions.blobAsDescriptor(),
-      table.catalogEnvironment().catalogContext()
+      table.catalogEnvironment().catalogContext(),
+      postponePartitionBucketComputer
     )
 
     def sparkParallelism = {
@@ -264,6 +284,16 @@ case class PaimonSparkWriter(
           )
         }
 
+      case POSTPONE_MODE if coreOptions.postponeBatchWriteFixedBucket() =>
+        // Topology: input -> bucket-assigner -> shuffle by partition & bucket
+        writeWithBucketProcessor(
+          withInitBucketCol,
+          PostponeFixBucketProcessor(
+            table,
+            bucketColIdx,
+            encoderGroupWithBucketCol,
+            postponePartitionBucketComputer.get))
+
       case BUCKET_UNAWARE | POSTPONE_MODE =>
         var input = data
         if (tableSchema.partitionKeys().size() > 0) {
@@ -378,7 +408,12 @@ case class PaimonSparkWriter(
   }
 
   def commit(commitMessages: Seq[CommitMessage]): Unit = {
-    val tableCommit = writeBuilder.newCommit()
+    val finalWriteBuilder = if (postponeBatchWriteFixedBucket) {
+      writeBuilder.copyWithNewTable(PostponeUtils.tableForCommit(table))
+    } else {
+      writeBuilder
+    }
+    val tableCommit = finalWriteBuilder.newCommit()
     try {
       tableCommit.commit(commitMessages.toList.asJava)
     } catch {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
new file mode 100644
index 0000000000..43da68cd9e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class PostponeBucketTableTest extends PaimonSparkTestBase {
+
+  test("Postpone bucket table: write with different bucket number") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  k INT,
+            |  v STRING,
+            |  pt STRING
+            |) TBLPROPERTIES (
+            |  'primary-key' = 'k, pt',
+            |  'bucket' = '-2',
+            |  'postpone.batch-write-fixed-bucket' = 'true'
+            |)
+            |PARTITIONED BY (pt)
+            |""".stripMargin)
+
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(4) */
+            |id AS k,
+            |CAST(id AS STRING) AS v,
+            |CAST((1 + FLOOR(RAND() * 4)) AS STRING) AS pt -- pt in [1, 4]
+            |FROM range (0, 1000)
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+      checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2), Row(3))
+      )
+
+      // Write to existing partition, the bucket number should not change
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(6) */
+            |id AS k,
+            |CAST(id AS STRING) AS v,
+            |'3' AS pt
+            |FROM range (100, 800)
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` WHERE partition = '{3}' 
ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2), Row(3))
+      )
+
+      // Write to new partition, the bucket number should change
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(6) */
+            |id AS k,
+            |CAST(id AS STRING) AS v,
+            |'5' AS pt
+            |FROM range (100, 800)
+            |""".stripMargin)
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` WHERE partition = '{5}' 
ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+      )
+    }
+  }
+
+  test("Postpone bucket table: write fix bucket then write postpone bucket") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  k INT,
+            |  v STRING
+            |) TBLPROPERTIES (
+            |  'primary-key' = 'k',
+            |  'bucket' = '-2',
+            |  'postpone.batch-write-fixed-bucket' = 'true'
+            |)
+            |""".stripMargin)
+
+      // write fix bucket
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(4) */
+            |id AS k,
+            |CAST(id AS STRING) AS v
+            |FROM range (0, 1000)
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+      checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2), Row(3))
+      )
+
+      // write postpone bucket
+      withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> 
"false") {
+        sql("""
+              |INSERT INTO t SELECT /*+ REPARTITION(6) */
+              |id AS k,
+              |CAST(id AS STRING) AS v
+              |FROM range (0, 1000)
+              |""".stripMargin)
+        checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+        checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+        checkAnswer(
+          sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+          Seq(Row(-2), Row(0), Row(1), Row(2), Row(3))
+        )
+      }
+    }
+  }
+
+  test("Postpone bucket table: write postpone bucket then write fix bucket") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  k INT,
+            |  v STRING
+            |) TBLPROPERTIES (
+            |  'primary-key' = 'k',
+            |  'bucket' = '-2',
+            |  'postpone.batch-write-fixed-bucket' = 'false'
+            |)
+            |""".stripMargin)
+
+      // write postpone bucket
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(4) */
+            |id AS k,
+            |CAST(id AS STRING) AS v
+            |FROM range (0, 1000)
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(-2))
+      )
+
+      // write fix bucket
+      withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> 
"true") {
+        sql("""
+              |INSERT INTO t SELECT /*+ REPARTITION(6) */
+              |id AS k,
+              |CAST(id AS STRING) AS v
+              |FROM range (0, 1000)
+              |""".stripMargin)
+        checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+        checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+        checkAnswer(
+          sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+          Seq(Row(-2), Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+        )
+      }
+
+      // overwrite fix bucket
+      withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> 
"true") {
+        sql("""
+              |INSERT OVERWRITE t SELECT /*+ REPARTITION(8) */
+              |id AS k,
+              |CAST(id AS STRING) AS v
+              |FROM range (0, 500)
+              |""".stripMargin)
+        checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(500)))
+        checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(124750)))
+        checkAnswer(
+          sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+          Seq(Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+        )
+      }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index 5c8f37f19e..96ada9d9eb 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -59,7 +59,8 @@ class SparkWriteITCase extends PaimonSparkTestBase {
                   |) TBLPROPERTIES (
                   | 'bucket' = '-2',
                   | 'primary-key' = 'id',
-                  | 'file.format' = 'parquet'
+                  | 'file.format' = 'parquet',
+                  | 'postpone.batch-write-fixed-bucket' = 'false'
                   |)
                   |""".stripMargin)
 

Reply via email to