This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 3e706fe8e9 [spark] Support spark compaction for postpone bucket table (#7169)
3e706fe8e9 is described below
commit 3e706fe8e967d7f8e714904199b2922beb1ee259
Author: Juntao Zhang <[email protected]>
AuthorDate: Mon Feb 2 22:07:57 2026 +0800
[spark] Support spark compaction for postpone bucket table (#7169)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../generated/flink_connector_configuration.html | 6 -
.../main/java/org/apache/paimon/CoreOptions.java | 11 +
.../org/apache/paimon/postpone/BucketFiles.java | 113 ++++++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 -
.../apache/paimon/flink/action/CompactAction.java | 2 +-
.../PostponeBucketCommittableRewriter.java | 84 +-------
.../paimon/spark/procedure/CompactProcedure.java | 5 +
.../procedure/SparkPostponeCompactProcedure.scala | 235 +++++++++++++++++++++
.../paimon/spark/sql/PostponeBucketTableTest.scala | 103 +++++++++
10 files changed, 475 insertions(+), 97 deletions(-)
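For review context, the Spark-side flow this commit enables, as exercised by the new
PostponeBucketTableTest below, looks roughly like the following sketch. It assumes `spark` is a
SparkSession with the Paimon catalog configured; the table name and property values are illustrative.

    // Minimal sketch mirroring the new test, not an exact excerpt of this commit.
    spark.sql(
      """CREATE TABLE t (k INT, v STRING) TBLPROPERTIES (
        |  'primary-key' = 'k',
        |  'bucket' = '-2',
        |  'postpone.batch-write-fixed-bucket' = 'false'
        |)""".stripMargin)
    spark.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")  // rows land in the bucket = -2 directory
    spark.sql("CALL sys.compact(table => 't')")           // new: Spark assigns real buckets and compacts
    spark.sql("SELECT count(*) FROM t").show()            // data becomes readable after compaction

The bucket count chosen for a partition compacted for the first time comes from
'postpone.default-bucket-num' when set, otherwise from Spark's default parallelism.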
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9e4d3f0b33..592b82c58c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1031,6 +1031,12 @@ This config option does not affect the default filesystem metastore.</td>
<td>Boolean</td>
        <td>Whether to write the data into fixed bucket for batch writing a postpone bucket table.</td>
</tr>
+ <tr>
+ <td><h5>postpone.default-bucket-num</h5></td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+        <td>Bucket number for the partitions compacted for the first time in postpone bucket tables.</td>
+ </tr>
<tr>
<td><h5>primary-key</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 9f4058ba01..1003aff77c 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -128,12 +128,6 @@ under the License.
<td>Duration</td>
        <td>You can specify time interval for partition, for example, daily partition is '1 d', hourly partition is '1 h'.</td>
</tr>
- <tr>
- <td><h5>postpone.default-bucket-num</h5></td>
- <td style="word-wrap: break-word;">1</td>
- <td>Integer</td>
-        <td>Bucket number for the partitions compacted for the first time in postpone bucket tables.</td>
- </tr>
<tr>
<td><h5>precommit-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 405098067d..54633b930c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2164,6 +2164,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to write the data into fixed bucket for
batch writing a postpone bucket table.");
+ public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
+ key("postpone.default-bucket-num")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+                        "Bucket number for the partitions compacted for the first time in postpone bucket tables.");
+
public static final ConfigOption<Long> GLOBAL_INDEX_ROW_COUNT_PER_SHARD =
key("global-index.row-count-per-shard")
.longType()
@@ -3394,6 +3401,10 @@ public class CoreOptions implements Serializable {
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
}
+ public int postponeDefaultBucketNum() {
+ return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
+ }
+
public long globalIndexRowCountPerShard() {
return options.get(GLOBAL_INDEX_ROW_COUNT_PER_SHARD);
}
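Since the option now lives in CoreOptions rather than FlinkConnectorOptions, both engines can read
it the same way. A minimal sketch, where `table` stands for an already loaded FileStoreTable and is
not part of this commit:

    import org.apache.paimon.CoreOptions
    import org.apache.paimon.options.Options

    // Read the raw option, as CompactAction does below.
    val options = new Options(table.options())
    val defaultBucketNum = options.get(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM)
    // Or use the typed accessor added above.
    val viaAccessor = table.coreOptions().postponeDefaultBucketNum()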
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java b/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java
new file mode 100644
index 0000000000..51212f708e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/BucketFiles.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.postpone;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A utility class to track and manage file changes for a specific bucket in a postpone bucket
+ * table.
+ */
+public class BucketFiles {
+ private final DataFilePathFactory pathFactory;
+ private final FileIO fileIO;
+
+ private @Nullable Integer totalBuckets;
+ private final Map<String, DataFileMeta> newFiles;
+ private final List<DataFileMeta> compactBefore;
+ private final List<DataFileMeta> compactAfter;
+ private final List<DataFileMeta> changelogFiles;
+ private final List<IndexFileMeta> newIndexFiles;
+ private final List<IndexFileMeta> deletedIndexFiles;
+
+ public BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
+ this.pathFactory = pathFactory;
+ this.fileIO = fileIO;
+
+ this.newFiles = new LinkedHashMap<>();
+ this.compactBefore = new ArrayList<>();
+ this.compactAfter = new ArrayList<>();
+ this.changelogFiles = new ArrayList<>();
+ this.newIndexFiles = new ArrayList<>();
+ this.deletedIndexFiles = new ArrayList<>();
+ }
+
+ public void update(CommitMessageImpl message) {
+ totalBuckets = message.totalBuckets();
+
+ for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
+ newFiles.put(file.fileName(), file);
+ }
+
+ Map<String, Path> toDelete = new HashMap<>();
+ for (DataFileMeta file : message.compactIncrement().compactBefore()) {
+ if (newFiles.containsKey(file.fileName())) {
+ toDelete.put(file.fileName(), pathFactory.toPath(file));
+ newFiles.remove(file.fileName());
+ } else {
+ compactBefore.add(file);
+ }
+ }
+
+ for (DataFileMeta file : message.compactIncrement().compactAfter()) {
+ compactAfter.add(file);
+ toDelete.remove(file.fileName());
+ }
+
+ changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
+ changelogFiles.addAll(message.compactIncrement().changelogFiles());
+
+ newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
+        deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
+
+ toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
+ }
+
+ public CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
+        List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
+ realCompactAfter.addAll(compactAfter);
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ totalBuckets,
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(
+ compactBefore,
+ realCompactAfter,
+ changelogFiles,
+ newIndexFiles,
+ deletedIndexFiles));
+ }
+}
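BucketFiles is extracted here from the Flink-only PostponeBucketCommittableRewriter (see the diff
further below) so that the new Spark procedure can reuse it. A rough sketch of the intended calling
pattern; `mergeBucket` and its parameters are illustrative and not part of this commit:

    import org.apache.paimon.data.BinaryRow
    import org.apache.paimon.fs.FileIO
    import org.apache.paimon.io.DataFilePathFactory
    import org.apache.paimon.postpone.BucketFiles
    import org.apache.paimon.table.sink.CommitMessageImpl

    // Consolidates all commit messages of one (partition, bucket) into a single message.
    def mergeBucket(
        pathFactory: DataFilePathFactory,
        fileIO: FileIO,
        partition: BinaryRow,
        bucket: Int,
        messages: Seq[CommitMessageImpl]): CommitMessageImpl = {
      val bucketFiles = new BucketFiles(pathFactory, fileIO)
      // update() tracks new files, compact-before/after and index files; a file that is added
      // and then compacted away within the same batch is deleted from the filesystem eagerly.
      messages.foreach(m => bucketFiles.update(m))
      // The surviving new files are folded into compactAfter of the resulting message.
      bucketFiles.makeMessage(partition, bucket)
    }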
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index b5d40bf3c8..8febafc5e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -447,13 +447,6 @@ public class FlinkConnectorOptions {
"Bounded mode for Paimon consumer. "
+ "By default, Paimon automatically
selects bounded mode based on the mode of the Flink job.");
- public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
- key("postpone.default-bucket-num")
- .intType()
- .defaultValue(1)
- .withDescription(
-                            "Bucket number for the partitions compacted for the first time in postpone bucket tables.");
-
public static final ConfigOption<Boolean> SCAN_DEDICATED_SPLIT_GENERATION =
key("scan.dedicated-split-generation")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 3be60281c0..a58e8a393d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -290,7 +290,7 @@ public class CompactAction extends TableActionBase {
"Postpone bucket compaction currently does not support
predicates");
Options options = new Options(table.options());
-        int defaultBucketNum = options.get(FlinkConnectorOptions.POSTPONE_DEFAULT_BUCKET_NUM);
+        int defaultBucketNum = options.get(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM);
        // change bucket to a positive value, so we can scan files from the bucket = -2 directory
Map<String, String> bucketOptions = new HashMap<>(table.options());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index d26a637bc4..40c345ebb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -20,22 +20,13 @@ package org.apache.paimon.flink.postpone;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.postpone.BucketFiles;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
-import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -84,77 +75,4 @@ public class PostponeBucketCommittableRewriter {
buckets.clear();
return result;
}
-
- private static class BucketFiles {
-
- private final DataFilePathFactory pathFactory;
- private final FileIO fileIO;
-
- private @Nullable Integer totalBuckets;
- private final Map<String, DataFileMeta> newFiles;
- private final List<DataFileMeta> compactBefore;
- private final List<DataFileMeta> compactAfter;
- private final List<DataFileMeta> changelogFiles;
- private final List<IndexFileMeta> newIndexFiles;
- private final List<IndexFileMeta> deletedIndexFiles;
-
- private BucketFiles(DataFilePathFactory pathFactory, FileIO fileIO) {
- this.pathFactory = pathFactory;
- this.fileIO = fileIO;
-
- this.newFiles = new LinkedHashMap<>();
- this.compactBefore = new ArrayList<>();
- this.compactAfter = new ArrayList<>();
- this.changelogFiles = new ArrayList<>();
- this.newIndexFiles = new ArrayList<>();
- this.deletedIndexFiles = new ArrayList<>();
- }
-
- private void update(CommitMessageImpl message) {
- totalBuckets = message.totalBuckets();
-
- for (DataFileMeta file : message.newFilesIncrement().newFiles()) {
- newFiles.put(file.fileName(), file);
- }
-
- Map<String, Path> toDelete = new HashMap<>();
-            for (DataFileMeta file : message.compactIncrement().compactBefore()) {
- if (newFiles.containsKey(file.fileName())) {
- toDelete.put(file.fileName(), pathFactory.toPath(file));
- newFiles.remove(file.fileName());
- } else {
- compactBefore.add(file);
- }
- }
-
-            for (DataFileMeta file : message.compactIncrement().compactAfter()) {
- compactAfter.add(file);
- toDelete.remove(file.fileName());
- }
-
-            changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
- changelogFiles.addAll(message.compactIncrement().changelogFiles());
-
- newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
-            deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
-
- toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
- }
-
-        private CommitMessageImpl makeMessage(BinaryRow partition, int bucket) {
-            List<DataFileMeta> realCompactAfter = new ArrayList<>(newFiles.values());
- realCompactAfter.addAll(compactAfter);
- return new CommitMessageImpl(
- partition,
- bucket,
- totalBuckets,
- DataIncrement.emptyIncrement(),
- new CompactIncrement(
- compactBefore,
- realCompactAfter,
- changelogFiles,
- newIndexFiles,
- deletedIndexFiles));
- }
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 316e65f3d1..0fe8a76b86 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -278,6 +278,11 @@ public class CompactProcedure extends BaseProcedure {
                            table, partitionPredicate, partitionIdleTime, javaSparkContext);
}
break;
+ case POSTPONE_MODE:
+ SparkPostponeCompactProcedure.apply(
+                            table, spark(), partitionPredicate, relation)
+ .execute();
+ break;
default:
throw new UnsupportedOperationException(
"Spark compact with " + bucketMode + " is not
support yet.");
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
new file mode 100644
index 0000000000..572e1cf96c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.BUCKET
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.postpone.BucketFiles
+import org.apache.paimon.spark.commands.{EncoderSerDeGroup, PostponeFixBucketProcessor}
+import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
+import org.apache.paimon.spark.util.{ScanPlanHelper, SparkRowUtils}
+import org.apache.paimon.spark.write.{PaimonDataWrite, WriteTaskResult}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, PostponeUtils}
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+import javax.annotation.Nullable
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Spark procedure helper for compacting postpone bucket tables. This provides the implementation
+ * for Postpone Bucket mode compaction.
+ */
+case class SparkPostponeCompactProcedure(
+ table: FileStoreTable,
+ @transient spark: SparkSession,
+ @Nullable partitionPredicate: PartitionPredicate,
+ @transient relation: DataSourceV2Relation) {
+ private val LOG = LoggerFactory.getLogger(getClass)
+
+  // Unlike `PostponeUtils.tableForFixBucketWrite`, here we explicitly set bucket to 1 without
+ // WRITE_ONLY to enable proper compaction logic.
+ private lazy val realTable = table.copy(Map(BUCKET.key -> "1").asJava)
+
+ // Create bucket computer to determine bucket count for each partition
+ private lazy val postponePartitionBucketComputer = {
+ val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
+ val defaultBucketNum =
+      if (table.coreOptions.toConfiguration.contains(CoreOptions.POSTPONE_DEFAULT_BUCKET_NUM)) {
+ table.coreOptions.postponeDefaultBucketNum
+ } else {
+ spark.sparkContext.defaultParallelism
+ }
+ (p: BinaryRow) => knownNumBuckets.getOrDefault(p, defaultBucketNum)
+ }
+
+ private def partitionCols(df: DataFrame): Seq[Column] = {
+ val inputSchema = df.schema
+ val tableSchema = table.schema
+ tableSchema
+ .partitionKeys()
+ .asScala
+ .map(tableSchema.fieldNames().indexOf(_))
+ .map(x => col(inputSchema.fieldNames(x)))
+ .toSeq
+ }
+
+ private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
+ df.repartition(partitionCols(df) ++ Seq(col(BUCKET_COL)): _*)
+ }
+
+ // write data with the bucket processor
+ private def writeWithBucketProcessor(
+ repartitioned: DataFrame,
+ bucketColIdx: Int,
+ schema: StructType) = {
+ import spark.implicits._
+ val rowKindColIdx = SparkRowUtils.getFieldIndex(schema, ROW_KIND_COL)
+ val writeBuilder = realTable.newBatchWriteBuilder
+ val rowType = table.rowType()
+ val coreOptions = table.coreOptions()
+
+ def newWrite() = PaimonDataWrite(
+ writeBuilder,
+ rowType,
+ rowKindColIdx,
+ writeRowTracking = coreOptions.dataEvolutionEnabled(),
+ Option.apply(coreOptions.fullCompactionDeltaCommits()),
+ None,
+ coreOptions.blobAsDescriptor(),
+ table.catalogEnvironment().catalogContext(),
+ Some(postponePartitionBucketComputer)
+ )
+
+ repartitioned.mapPartitions {
+ iter =>
+ {
+ val write = newWrite()
+ try {
+ iter.foreach(row => write.write(row, row.getInt(bucketColIdx)))
+ Iterator.apply(write.commit)
+ } finally {
+ write.close()
+ }
+ }
+ }
+ }
+
+ /** Creates a new BucketFiles instance for tracking file changes */
+  private def createBucketFiles(partition: BinaryRow, bucket: Int): BucketFiles = {
+ new BucketFiles(
+ table.store.pathFactory.createDataFilePathFactory(partition, bucket),
+ table.fileIO()
+ )
+ }
+
+ def execute(): Unit = {
+    LOG.info("Starting postpone bucket compaction for table: {}.", table.name())
+    // Validate input parameters - no partition predicates supported yet, same behavior as Flink
+ assert(
+ partitionPredicate == null,
+      "Postpone bucket compaction currently does not support specifying partitions")
+
+ // Read data splits from the POSTPONE_BUCKET (-2)
+ val splits =
+ table.newSnapshotReader
+ .withBucket(BucketMode.POSTPONE_BUCKET)
+ .read
+ .dataSplits
+ .asScala
+
+ if (splits.isEmpty) {
+ LOG.info("Partition bucket is empty, no compact job to execute.")
+ return
+ }
+
+ // Prepare dataset for writing by combining all partitions
+ val datasetForWrite: Dataset[Row] = splits
+ .groupBy(_.partition)
+ .values
+ .map(
+ split => {
+ PaimonUtils
+            .createDataset(spark, ScanPlanHelper.createNewScanPlan(split.toArray, relation))
+ })
+ .reduce((a, b) => a.union(b))
+
+ val withInitBucketCol = datasetForWrite.withColumn(BUCKET_COL, lit(-1))
+    val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
+ val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
+
+ // Create processor to handle the actual bucket assignment
+ val processor = PostponeFixBucketProcessor(
+ table,
+ bucketColIdx,
+ encoderGroupWithBucketCol,
+ postponePartitionBucketComputer
+ )
+
+ val dataFrame = withInitBucketCol
+      .mapPartitions(processor.processPartition)(encoderGroupWithBucketCol.encoder)
+ .toDF()
+ val repartition = repartitionByPartitionsAndBucket(dataFrame)
+    val written = writeWithBucketProcessor(repartition, bucketColIdx, withInitBucketCol.schema)
+
+ // Create commit messages for removing old postpone bucket files
+ val removeMessages = splits.map {
+ split =>
+ new CommitMessageImpl(
+ split.partition(),
+ split.bucket(),
+ split.totalBuckets(),
+ DataIncrement.emptyIncrement,
+ new CompactIncrement(
+ split.dataFiles(),
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta]
+ )
+ )
+ }
+
+ // Combine remove messages with new file write messages
+    val commitMessages = removeMessages ++ WriteTaskResult.merge(written.collect().toSeq)
+
+ // Process all commit messages using BucketFiles to track file changes
+    val buckets = new mutable.HashMap[BinaryRow, mutable.HashMap[Int, BucketFiles]]()
+ commitMessages.foreach {
+ message =>
+ val commitMessage = message.asInstanceOf[CommitMessageImpl]
+ buckets
+          .getOrElseUpdate(commitMessage.partition(), mutable.HashMap.empty[Int, BucketFiles])
+ .getOrElseUpdate(
+ commitMessage.bucket(),
+            createBucketFiles(commitMessage.partition(), commitMessage.bucket()))
+ .update(commitMessage)
+ }
+
+ val finalCommitMessages: Seq[CommitMessage] = buckets.iterator.flatMap {
+ case (partition, bucketMap) =>
+ bucketMap.iterator.map {
+ case (bucket, bucketFiles) =>
+              bucketFiles.makeMessage(partition, bucket).asInstanceOf[CommitMessage]
+ }
+ }.toSeq
+
+ // Commit the final result
+ try {
+ assert(finalCommitMessages.nonEmpty, "No commit message to commit")
+ val commitUser = table.coreOptions.createCommitUser
+ val commit = realTable.newCommit(commitUser)
+ commit.commit(finalCommitMessages.asJava)
+ } catch {
+ case e: Exception =>
+        throw new RuntimeException("Failed to commit postpone bucket compaction result", e)
+ }
+    LOG.info("Successfully committed postpone bucket compaction for table: {}.", table.name())
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
index 43da68cd9e..07fe32308e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
@@ -188,4 +189,106 @@ class PostponeBucketTableTest extends PaimonSparkTestBase {
}
}
}
+
+ test("Postpone bucket table: write postpone bucket then compact") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING
+ |) TBLPROPERTIES (
+ | 'primary-key' = 'k',
+ | 'bucket' = '-2',
+ | 'postpone.batch-write-fixed-bucket' = 'false'
+ |)
+ |""".stripMargin)
+
+ // write postpone bucket
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 1000)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(-2))
+ )
+
+ sql("SET spark.default.parallelism = 2")
+ // compact
+ sql("CALL sys.compact(table => 't')")
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row((0 until 1000).sum)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(0), Row(1))
+ )
+ }
+ }
+
+ test("Postpone partition bucket table: write postpone bucket then compact") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING,
+ | pt INT
+ |) PARTITIONED BY (pt)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'k, pt',
+ | 'bucket' = '-2',
+ | 'postpone.default-bucket-num' = '3',
+ | 'changelog-producer' = 'lookup',
+ | 'snapshot.num-retained.min' = '5',
+ | 'snapshot.num-retained.max' = '5',
+ | 'postpone.batch-write-fixed-bucket' = 'false'
+ |)
+ |""".stripMargin)
+
+ // write postpone bucket
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v,
+ |id % 2 AS pt
+ |FROM range (0, 1000)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+      checkAnswer(sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"), Seq(Row(-2)))
+
+ // compact
+ sql("SET spark.default.parallelism = 2")
+ sql("CALL sys.compact(table => 't')")
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row((0 until 1000).sum)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2))
+ )
+
+ val table = loadTable("t")
+      val files = table.fileIO.listStatus(new Path(table.location, "pt=0/bucket-0"))
+ assert(files.count(_.getPath.getName.startsWith("changelog-")) > 0)
+
+ for (i <- 2000 until 2020) {
+ spark.sql(s"INSERT INTO t (k, v, pt) VALUES ($i, '$i', ${i % 2})")
+ }
+
+ // Verify that snapshots are not automatically expired before compaction
+ checkAnswer(sql("SELECT count(1) FROM `t$snapshots`"), Seq(Row(22)))
+
+ sql("CALL sys.compact(table => 't')")
+ checkAnswer(
+        sql("SELECT distinct(bucket) FROM `t$buckets` where partition = '{0}' ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2))
+ )
+ checkAnswer(sql("SELECT count(1) FROM `t$snapshots`"), Seq(Row(5)))
+ }
+ }
}