This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e706fe8e9 [spark] Support spark compaction for postpone bucket table 
(#7169)
3e706fe8e9 is described below

commit 3e706fe8e967d7f8e714904199b2922beb1ee259
Author: Juntao Zhang <[email protected]>
AuthorDate: Mon Feb 2 22:07:57 2026 +0800

    [spark] Support spark compaction for postpone bucket table (#7169)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../generated/flink_connector_configuration.html   |   6 -
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +
 .../org/apache/paimon/postpone/BucketFiles.java    | 113 ++++++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java |   7 -
 .../apache/paimon/flink/action/CompactAction.java  |   2 +-
 .../PostponeBucketCommittableRewriter.java         |  84 +-------
 .../paimon/spark/procedure/CompactProcedure.java   |   5 +
 .../procedure/SparkPostponeCompactProcedure.scala  | 235 +++++++++++++++++++++
 .../paimon/spark/sql/PostponeBucketTableTest.scala | 103 +++++++++
 10 files changed, 475 insertions(+), 97 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9e4d3f0b33..592b82c58c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1031,6 +1031,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>Boolean</td>
             <td>Whether to write the data into fixed bucket for batch writing 
a postpone bucket table.</td>
         </tr>
+        <tr>
+            <td><h5>postpone.default-bucket-num</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>Bucket number for the partitions compacted for the first time 
in postpone bucket tables.</td>
+        </tr>
         <tr>
             <td><h5>primary-key</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 9f4058ba01..1003aff77c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,12 +128,6 @@ under the License.
             <td>Duration</td>
             <td>You can specify time interval for partition, for example, 
daily partition is '1 d', hourly partition is '1 h'.</td>
         </tr>
-        <tr>
-            <td><h5>postpone.default-bucket-num</h5></td>
-            <td style="word-wrap: break-word;">1</td>
-            <td>Integer</td>
-            <td>Bucket number for the partitions compacted for the first time 
in postpone bucket tables.</td>
-        </tr>
         <tr>
             <td><h5>precommit-compact</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 405098067d..54633b930c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2164,6 +2164,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to write the data into fixed bucket for 
batch writing a postpone bucket table.");
 
+    public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
+            key("postpone.default-bucket-num")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Bucket number for the partitions compacted for 
the first time in postpone bucket tables.");
+
     public static final ConfigOption<Long> GLOBAL_INDEX_ROW_COUNT_PER_SHARD =
             key("global-index.row-count-per-shard")
                     .longType()
@@ -3394,6 +3401,10 @@ public class CoreOptions implements Serializable {
         return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
     }
 
+    public int postponeDefaultBucketNum() {
+        return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
+    }
+
     public long globalIndexRowCountPerShard() {
         return options.get(GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java 
b/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java
new file mode 100644
index 0000000000..51212f708e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A utility class to track and manage file changes for a specific bucket in a 
postpone bucket
+ * table.
+ */
+public class BucketFiles {
+    private final DataFilePathFactory pathFactory;
+    private final FileIO fileIO;
+
+    private @Nullable Integer totalBuckets;
+    private final Map<String, DataFileMeta> newFiles;
+    private final List<DataFileMeta> compactBefore;
+    private final List<DataFileMeta> compactAfter;
+    private final List<DataFileMeta> changelogFiles;
+    private final List<IndexFileMeta> newIndexFiles;
+    private final List<IndexFileMeta> deletedIndexFiles;
+
+    public BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
+        this.pathFactory = pathFactory;
+        this.fileIO = fileIO;
+
+        this.newFiles = new LinkedHashMap<>();
+        this.compactBefore = new ArrayList<>();
+        this.compactAfter = new ArrayList<>();
+        this.changelogFiles = new ArrayList<>();
+        this.newIndexFiles = new ArrayList<>();
+        this.deletedIndexFiles = new ArrayList<>();
+    }
+
+    public void update(CommitMessageImpl message) {
+        totalBuckets = message.totalBuckets();
+
+        for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
+            newFiles.put(file.fileName(), file);
+        }
+
+        Map<String, Path> toDelete = new HashMap<>();
+        for (DataFileMeta file : message.compactIncrement().compactBefore()) {
+            if (newFiles.containsKey(file.fileName())) {
+                toDelete.put(file.fileName(), pathFactory.toPath(file));
+                newFiles.remove(file.fileName());
+            } else {
+                compactBefore.add(file);
+            }
+        }
+
+        for (DataFileMeta file : message.compactIncrement().compactAfter()) {
+            compactAfter.add(file);
+            toDelete.remove(file.fileName());
+        }
+
+        changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
+        changelogFiles.addAll(message.compactIncrement().changelogFiles());
+
+        newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
+        
deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
+
+        toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
+    }
+
+    public CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
+        List<DataFileMeta> realCompactAfter = new 
ArrayList<>(newFiles.values());
+        realCompactAfter.addAll(compactAfter);
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                totalBuckets,
+                DataIncrement.emptyIncrement(),
+                new CompactIncrement(
+                        compactBefore,
+                        realCompactAfter,
+                        changelogFiles,
+                        newIndexFiles,
+                        deletedIndexFiles));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index b5d40bf3c8..8febafc5e3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -447,13 +447,6 @@ public class FlinkConnectorOptions {
                             "Bounded mode for Paimon consumer. "
                                     + "By default, Paimon automatically 
selects bounded mode based on the mode of the Flink job.");
 
-    public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
-            key("postpone.default-bucket-num")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription(
-                            "Bucket number for the partitions compacted for 
the first time in postpone bucket tables.");
-
     public static final ConfigOption<Boolean> SCAN_DEDICATED_SPLIT_GENERATION =
             key("scan.dedicated-split-generation")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 3be60281c0..a58e8a393d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -290,7 +290,7 @@ public class CompactAction extends TableActionBase {
                 "Postpone bucket compaction currently does not support 
predicates");
 
         Options options = new Options(table.options());
-        int defaultBucketNum = 
options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+        int defaultBucketNum = 
options.get(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM);
 
         // change bucket to a positive value, so we can scan files from the 
bucket = -2 directory
         Map<String, String> bucketOptions = new HashMap<>(table.options());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index d26a637bc4..40c345ebb6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -20,22 +20,13 @@ package org.apache.paimon.flink.postpone;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.postpone.BucketFiles;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -84,77 +75,4 @@ public class PostponeBucketCommittableRewriter {
         buckets.clear();
         return result;
     }
-
-    private static class BucketFiles {
-
-        private final DataFilePathFactory pathFactory;
-        private final FileIO fileIO;
-
-        private @Nullable Integer totalBuckets;
-        private final Map<String, DataFileMeta> newFiles;
-        private final List<DataFileMeta> compactBefore;
-        private final List<DataFileMeta> compactAfter;
-        private final List<DataFileMeta> changelogFiles;
-        private final List<IndexFileMeta> newIndexFiles;
-        private final List<IndexFileMeta> deletedIndexFiles;
-
-        private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
-            this.pathFactory = pathFactory;
-            this.fileIO = fileIO;
-
-            this.newFiles = new LinkedHashMap<>();
-            this.compactBefore = new ArrayList<>();
-            this.compactAfter = new ArrayList<>();
-            this.changelogFiles = new ArrayList<>();
-            this.newIndexFiles = new ArrayList<>();
-            this.deletedIndexFiles = new ArrayList<>();
-        }
-
-        private void update(CommitMessageImpl message) {
-            totalBuckets = message.totalBuckets();
-
-            for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
-                newFiles.put(file.fileName(), file);
-            }
-
-            Map<String, Path> toDelete = new HashMap<>();
-            for (DataFileMeta file : 
message.compactIncrement().compactBefore()) {
-                if (newFiles.containsKey(file.fileName())) {
-                    toDelete.put(file.fileName(), pathFactory.toPath(file));
-                    newFiles.remove(file.fileName());
-                } else {
-                    compactBefore.add(file);
-                }
-            }
-
-            for (DataFileMeta file : 
message.compactIncrement().compactAfter()) {
-                compactAfter.add(file);
-                toDelete.remove(file.fileName());
-            }
-
-            
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
-            changelogFiles.addAll(message.compactIncrement().changelogFiles());
-
-            newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
-            
deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
-
-            toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
-        }
-
-        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) 
{
-            List<DataFileMeta> realCompactAfter = new 
ArrayList<>(newFiles.values());
-            realCompactAfter.addAll(compactAfter);
-            return new CommitMessageImpl(
-                    partition,
-                    bucket,
-                    totalBuckets,
-                    DataIncrement.emptyIncrement(),
-                    new CompactIncrement(
-                            compactBefore,
-                            realCompactAfter,
-                            changelogFiles,
-                            newIndexFiles,
-                            deletedIndexFiles));
-        }
-    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 316e65f3d1..0fe8a76b86 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -278,6 +278,11 @@ public class CompactProcedure extends BaseProcedure {
                                 table, partitionPredicate, partitionIdleTime, 
javaSparkContext);
                     }
                     break;
+                case POSTPONE_MODE:
+                    SparkPostponeCompactProcedure.apply(
+                                    table, spark(), partitionPredicate, 
relation)
+                            .execute();
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             "Spark compact with " + bucketMode + " is not 
support yet.");
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
new file mode 100644
index 0000000000..572e1cf96c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.BUCKET
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.postpone.BucketFiles
+import org.apache.paimon.spark.commands.{EncoderSerDeGroup, 
PostponeFixBucketProcessor}
+import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
+import org.apache.paimon.spark.util.{ScanPlanHelper, SparkRowUtils}
+import org.apache.paimon.spark.write.{PaimonDataWrite, WriteTaskResult}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, PostponeUtils}
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import javax.annotation.Nullable
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Spark procedure helper for compacting postpone bucket tables. This provides 
the implementation
+ * for Postpone Bucket mode compaction.
+ */
+case class SparkPostponeCompactProcedure(
+    table: FileStoreTable,
+    @transient spark: SparkSession,
+    @Nullable partitionPredicate: PartitionPredicate,
+    @transient relation: DataSourceV2Relation) {
+  private val LOG = LoggerFactory.getLogger(getClass)
+
+  // Unlike `PostponeUtils.tableForFixBucketWrite`, here explicitly set bucket 
to 1 without
+  // WRITE_ONLY to enable proper compaction logic.
+  private lazy val realTable = table.copy(Map(BUCKET.key -> "1").asJava)
+
+  // Create bucket computer to determine bucket count for each partition
+  private lazy val postponePartitionBucketComputer = {
+    val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
+    val defaultBucketNum =
+      if 
(table.coreOptions.toConfiguration.contains(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM))
 {
+        table.coreOptions.postponeDefaultBucketNum
+      } else {
+        spark.sparkContext.defaultParallelism
+      }
+    (p: BinaryRow) => knownNumBuckets.getOrDefault(p, defaultBucketNum)
+  }
+
+  private def partitionCols(df: DataFrame): Seq[Column] = {
+    val inputSchema = df.schema
+    val tableSchema = table.schema
+    tableSchema
+      .partitionKeys()
+      .asScala
+      .map(tableSchema.fieldNames().indexOf(_))
+      .map(x => col(inputSchema.fieldNames(x)))
+      .toSeq
+  }
+
+  private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
+    df.repartition(partitionCols(df) ++ Seq(col(BUCKET_COL)): _*)
+  }
+
+  // write data with the bucket processor
+  private def writeWithBucketProcessor(
+      repartitioned: DataFrame,
+      bucketColIdx: Int,
+      schema: StructType) = {
+    import spark.implicits._
+    val rowKindColIdx = SparkRowUtils.getFieldIndex(schema, ROW_KIND_COL)
+    val writeBuilder = realTable.newBatchWriteBuilder
+    val rowType = table.rowType()
+    val coreOptions = table.coreOptions()
+
+    def newWrite() = PaimonDataWrite(
+      writeBuilder,
+      rowType,
+      rowKindColIdx,
+      writeRowTracking = coreOptions.dataEvolutionEnabled(),
+      Option.apply(coreOptions.fullCompactionDeltaCommits()),
+      None,
+      coreOptions.blobAsDescriptor(),
+      table.catalogEnvironment().catalogContext(),
+      Some(postponePartitionBucketComputer)
+    )
+
+    repartitioned.mapPartitions {
+      iter =>
+        {
+          val write = newWrite()
+          try {
+            iter.foreach(row => write.write(row, row.getInt(bucketColIdx)))
+            Iterator.apply(write.commit)
+          } finally {
+            write.close()
+          }
+        }
+    }
+  }
+
+  /** Creates a new BucketFiles instance for tracking file changes */
+  private def createBucketFiles(partition: BinaryRow, bucket: Int): 
BucketFiles = {
+    new BucketFiles(
+      table.store.pathFactory.createDataFilePathFactory(partition, bucket),
+      table.fileIO()
+    )
+  }
+
+  def execute(): Unit = {
+    LOG.info("Starting postpone bucket compaction for table: {}.", 
table.name())
+    // Validate input parameters - no partition predicates supported yet, same 
behavior with flink
+    assert(
+      partitionPredicate == null,
+      "Postpone bucket compaction currently does not support specifying 
partitions")
+
+    // Read data splits from the POSTPONE_BUCKET (-2)
+    val splits =
+      table.newSnapshotReader
+        .withBucket(BucketMode.POSTPONE_BUCKET)
+        .read
+        .dataSplits
+        .asScala
+
+    if (splits.isEmpty) {
+      LOG.info("Partition bucket is empty, no compact job to execute.")
+      return
+    }
+
+    // Prepare dataset for writing by combining all partitions
+    val datasetForWrite: Dataset[Row] = splits
+      .groupBy(_.partition)
+      .values
+      .map(
+        split => {
+          PaimonUtils
+            .createDataset(spark, 
ScanPlanHelper.createNewScanPlan(split.toArray, relation))
+        })
+      .reduce((a, b) => a.union(b))
+
+    val withInitBucketCol = datasetForWrite.withColumn(BUCKET_COL, lit(-1))
+    val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, 
BUCKET_COL)
+    val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
+
+    // Create processor to handle the actual bucket assignment
+    val processor = PostponeFixBucketProcessor(
+      table,
+      bucketColIdx,
+      encoderGroupWithBucketCol,
+      postponePartitionBucketComputer
+    )
+
+    val dataFrame = withInitBucketCol
+      
.mapPartitions(processor.processPartition)(encoderGroupWithBucketCol.encoder)
+      .toDF()
+    val repartition = repartitionByPartitionsAndBucket(dataFrame)
+    val written = writeWithBucketProcessor(repartition, bucketColIdx, 
withInitBucketCol.schema)
+
+    // Create commit messages for removing old postpone bucket files
+    val removeMessages = splits.map {
+      split =>
+        new CommitMessageImpl(
+          split.partition(),
+          split.bucket(),
+          split.totalBuckets(),
+          DataIncrement.emptyIncrement,
+          new CompactIncrement(
+            split.dataFiles(),
+            Collections.emptyList[DataFileMeta],
+            Collections.emptyList[DataFileMeta]
+          )
+        )
+    }
+
+    // Combine remove messages with new file write messages
+    val commitMessages = removeMessages ++ 
WriteTaskResult.merge(written.collect().toSeq)
+
+    // Process all commit messages using BucketFiles to track file changes
+    val buckets = new mutable.HashMap[BinaryRow, mutable.HashMap[Int, 
BucketFiles]]()
+    commitMessages.foreach {
+      message =>
+        val commitMessage = message.asInstanceOf[CommitMessageImpl]
+        buckets
+          .getOrElseUpdate(commitMessage.partition(), 
mutable.HashMap.empty[Int, BucketFiles])
+          .getOrElseUpdate(
+            commitMessage.bucket(),
+            createBucketFiles(commitMessage.partition(), 
commitMessage.bucket()))
+          .update(commitMessage)
+    }
+
+    val finalCommitMessages: Seq[CommitMessage] = buckets.iterator.flatMap {
+      case (partition, bucketMap) =>
+        bucketMap.iterator.map {
+          case (bucket, bucketFiles) =>
+            bucketFiles.makeMessage(partition, 
bucket).asInstanceOf[CommitMessage]
+        }
+    }.toSeq
+
+    // Commit the final result
+    try {
+      assert(finalCommitMessages.nonEmpty, "No commit message to commit")
+      val commitUser = table.coreOptions.createCommitUser
+      val commit = realTable.newCommit(commitUser)
+      commit.commit(finalCommitMessages.asJava)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException("Failed to commit postpone bucket 
compaction result", e)
+    }
+    LOG.info("Successfully committed postpone bucket compaction for table: 
{}.", table.name())
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
index 43da68cd9e..07fe32308e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.sql.Row
@@ -188,4 +189,106 @@ class PostponeBucketTableTest extends PaimonSparkTestBase 
{
       }
     }
   }
+
+  test("Postpone bucket table: write postpone bucket then compact") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  k INT,
+            |  v STRING
+            |) TBLPROPERTIES (
+            |  'primary-key' = 'k',
+            |  'bucket' = '-2',
+            |  'postpone.batch-write-fixed-bucket' = 'false'
+            |)
+            |""".stripMargin)
+
+      // write postpone bucket
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(4) */
+            |id AS k,
+            |CAST(id AS STRING) AS v
+            |FROM range (0, 1000)
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(-2))
+      )
+
+      sql("SET spark.default.parallelism = 2")
+      // compact
+      sql("CALL sys.compact(table => 't')")
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+      checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row((0 until 1000).sum)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(0), Row(1))
+      )
+    }
+  }
+
+  test("Postpone partition bucket table: write postpone bucket then compact") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  k INT,
+            |  v STRING,
+            |  pt INT
+            |) PARTITIONED BY (pt)
+            |TBLPROPERTIES (
+            |  'primary-key' = 'k, pt',
+            |  'bucket' = '-2',
+            |  'postpone.default-bucket-num' = '3',
+            |  'changelog-producer' = 'lookup',
+            |  'snapshot.num-retained.min' = '5',
+            |  'snapshot.num-retained.max' = '5',
+            |  'postpone.batch-write-fixed-bucket' = 'false'
+            |)
+            |""".stripMargin)
+
+      // write postpone bucket
+      sql("""
+            |INSERT INTO t SELECT /*+ REPARTITION(4) */
+            |id AS k,
+            |CAST(id AS STRING) AS v,
+            |id % 2 AS pt
+            |FROM range (0, 1000)
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+      checkAnswer(sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY 
bucket"), Seq(Row(-2)))
+
+      // compact
+      sql("SET spark.default.parallelism = 2")
+      sql("CALL sys.compact(table => 't')")
+
+      checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+      checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row((0 until 1000).sum)))
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2))
+      )
+
+      val table = loadTable("t")
+      val files = table.fileIO.listStatus(new Path(table.location, 
"pt=0/bucket-0"))
+      assert(files.count(_.getPath.getName.startsWith("changelog-")) > 0)
+
+      for (i <- 2000 until 2020) {
+        spark.sql(s"INSERT INTO t (k, v, pt) VALUES ($i, '$i', ${i % 2})")
+      }
+
+      // Verify that snapshots are not automatically expired before compaction
+      checkAnswer(sql("SELECT count(1) FROM `t$snapshots`"), Seq(Row(22)))
+
+      sql("CALL sys.compact(table => 't')")
+      checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` where partition = '{0}' 
ORDER BY bucket"),
+        Seq(Row(0), Row(1), Row(2))
+      )
+      checkAnswer(sql("SELECT count(1) FROM `t$snapshots`"), Seq(Row(5)))
+    }
+  }
 }

Reply via email to