This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a6cdf8ccb0 [spark] Postpone bucket table supports to batch write fixed bucket (#6534)
a6cdf8ccb0 is described below
commit a6cdf8ccb0e7484a646fd766b981f66975ba3e0a
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Nov 5 17:41:02 2025 +0800
[spark] Postpone bucket table supports to batch write fixed bucket (#6534)
---
.../org/apache/paimon/table/PostponeUtils.java | 18 ++
.../paimon/table/sink/BatchWriteBuilder.java | 5 +
.../paimon/table/sink/BatchWriteBuilderImpl.java | 13 ++
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 9 +-
.../paimon/flink/sink/PostponeFixedBucketSink.java | 10 +-
.../paimon/flink/PostponeBucketTableITCase.java | 2 +-
.../apache/paimon/spark/PaimonSparkTableBase.scala | 3 +-
.../org/apache/paimon/spark/SparkTableWrite.scala | 23 ++-
.../paimon/spark/commands/BucketProcessor.scala | 41 ++++-
.../MergeIntoPaimonDataEvolutionTable.scala | 2 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 43 ++++-
.../paimon/spark/sql/PostponeBucketTableTest.scala | 191 +++++++++++++++++++++
.../apache/paimon/spark/sql/SparkWriteITCase.scala | 3 +-
13 files changed, 334 insertions(+), 29 deletions(-)
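For context, a minimal Spark SQL sketch of the behavior this commit enables, adapted from the PostponeBucketTableTest added below (the table name and data are illustrative only): on a postpone bucket table ('bucket' = '-2') with 'postpone.batch-write-fixed-bucket' = 'true', a batch INSERT now writes directly into fixed buckets, resolving the bucket number per partition at write time.

    CREATE TABLE t (
      k INT,
      v STRING
    ) TBLPROPERTIES (
      'primary-key' = 'k',
      'bucket' = '-2',
      'postpone.batch-write-fixed-bucket' = 'true'
    );

    -- Rows land in fixed buckets (0, 1, ...) instead of the postpone bucket (-2),
    -- so querying the `t$buckets` system table shows non-negative bucket ids after this write.
    INSERT INTO t SELECT id AS k, CAST(id AS STRING) AS v FROM range(0, 1000);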
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
index 280dd16f00..097921df54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
@@ -21,10 +21,14 @@ package org.apache.paimon.table;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.SimpleFileEntry;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
+
/** Utils for postpone table. */
public class PostponeUtils {
@@ -49,4 +53,18 @@ public class PostponeUtils {
}
return knownNumBuckets;
}
+
+ public static FileStoreTable tableForFixBucketWrite(FileStoreTable table) {
+ Map<String, String> batchWriteOptions = new HashMap<>();
+ batchWriteOptions.put(WRITE_ONLY.key(), "true");
+ // It's just used to create merge tree writer for writing files to fixed bucket.
+ // The real bucket number is determined at runtime.
+ batchWriteOptions.put(BUCKET.key(), "1");
+ return table.copy(batchWriteOptions);
+ }
+
+ public static FileStoreTable tableForCommit(FileStoreTable table) {
+ return table.copy(
+ Collections.singletonMap(BUCKET.key(), String.valueOf(BucketMode.POSTPONE_BUCKET)));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
index ee0091a7bf..a82c69b6ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
import javax.annotation.Nullable;
@@ -71,4 +72,8 @@ public interface BatchWriteBuilder extends WriteBuilder {
/** Create a {@link TableCommit} to commit {@link CommitMessage}s. */
@Override
BatchTableCommit newCommit();
+
+ default BatchWriteBuilder copyWithNewTable(Table newTable) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 445f48bafc..a0ff6e0e27 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -45,6 +46,13 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
this.commitUser = createCommitUser(new Options(table.options()));
}
+ private BatchWriteBuilderImpl(
+ InnerTable table, String commitUser, @Nullable Map<String, String> staticPartition) {
+ this.table = table;
+ this.commitUser = commitUser;
+ this.staticPartition = staticPartition;
+ }
+
@Override
public String tableName() {
return table.name();
@@ -80,4 +88,9 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
.orElse(true));
return commit;
}
+
+ @Override
+ public BatchWriteBuilder copyWithNewTable(Table newTable) {
+ return new BatchWriteBuilderImpl((InnerTable) newTable, commitUser, staticPartition);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index b7a3fc3ca1..01a034e78a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -56,8 +56,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.CoreOptions.BUCKET;
-import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.CoreOptions.clusteringStrategy;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -315,12 +313,7 @@ public class FlinkSinkBuilder {
new PostponeFixedBucketChannelComputer(table.schema(), knownNumBuckets),
parallelism);
- Map<String, String> batchWriteOptions = new HashMap<>();
- batchWriteOptions.put(WRITE_ONLY.key(), "true");
- // It's just used to create merge tree writer for writing files to fixed bucket.
- // The real bucket number is determined at runtime.
- batchWriteOptions.put(BUCKET.key(), "1");
- FileStoreTable tableForWrite = table.copy(batchWriteOptions);
+ FileStoreTable tableForWrite = PostponeUtils.tableForFixBucketWrite(table);
PostponeFixedBucketSink sink =
new PostponeFixedBucketSink(tableForWrite, overwritePartition, knownNumBuckets);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
index 92763c0cbe..3223ac50d3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PostponeUtils;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -30,11 +30,8 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.Map;
-import static org.apache.paimon.CoreOptions.BUCKET;
-
/** {@link FlinkSink} for writing records into fixed bucket of postpone table. */
public class PostponeFixedBucketSink extends FlinkWriteSink<InternalRow> {
@@ -76,10 +73,7 @@ public class PostponeFixedBucketSink extends FlinkWriteSink<InternalRow> {
} else {
// When overwriting, the postpone bucket files need to be deleted, so using a postpone
// bucket table commit here
- FileStoreTable tableForCommit =
- table.copy(
- Collections.singletonMap(
- BUCKET.key(), String.valueOf(BucketMode.POSTPONE_BUCKET)));
+ FileStoreTable tableForCommit = PostponeUtils.tableForCommit(table);
return context ->
new StoreCommitter(
tableForCommit,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index c6b832eebe..37b2096d56 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -1054,7 +1054,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
}
@Test
- public void testWriteWritePostponeBucketThenWriteFixedBucket() throws Exception {
+ public void testWritePostponeBucketThenWriteFixedBucket() throws Exception {
String warehouse = getTempDirPath();
TableEnvironment tEnv =
tableEnvironmentBuilder()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index addaaa0831..859becf546 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -56,7 +56,8 @@ abstract class PaimonSparkTableBase(val table: Table)
case storeTable: FileStoreTable =>
storeTable.bucketMode() match {
case HASH_FIXED => BucketFunction.supportsTable(storeTable)
- case BUCKET_UNAWARE | POSTPONE_MODE => true
+ case BUCKET_UNAWARE => true
+ case POSTPONE_MODE if !coreOptions.postponeBatchWriteFixedBucket() => true
case _ => false
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 2fcd1f847c..4ce281b836 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.data.BinaryRow
import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.util.SparkRowUtils
import org.apache.paimon.spark.write.DataWriteHelper
@@ -38,19 +39,23 @@ case class SparkTableWrite(
fullCompactionDeltaCommits: Option[Int],
batchId: Long,
blobAsDescriptor: Boolean,
- catalogContext: CatalogContext)
+ catalogContext: CatalogContext,
+ postponePartitionBucketComputer: Option[BinaryRow => Integer])
extends SparkTableWriteTrait
with DataWriteHelper {
private val ioManager: IOManager = SparkUtils.createIOManager
val write: TableWriteImpl[Row] = {
- val _write = writeBuilder.newWrite()
+ val _write = writeBuilder.newWrite().asInstanceOf[TableWriteImpl[Row]]
_write.withIOManager(ioManager)
if (writeRowTracking) {
_write.withWriteType(writeType)
}
- _write.asInstanceOf[TableWriteImpl[Row]]
+ if (postponePartitionBucketComputer.isDefined) {
+ _write.getWrite.withIgnoreNumBucketCheck(true)
+ }
+ _write
}
private val toPaimonRow = {
@@ -78,7 +83,17 @@ case class SparkTableWrite(
bytesWritten += dataFileMeta.fileSize()
recordsWritten += dataFileMeta.rowCount()
}
- commitMessages += serializer.serialize(message)
+ val finalMessage = if (postponePartitionBucketComputer.isDefined) {
+ new CommitMessageImpl(
+ message.partition(),
+ message.bucket(),
+ postponePartitionBucketComputer.get.apply(message.partition()),
+ message.newFilesIncrement(),
+ message.compactIncrement())
+ } else {
+ message
+ }
+ commitMessages += serializer.serialize(finalMessage)
}
reportOutputMetrics(bytesWritten, recordsWritten)
commitMessages.iterator
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
index 19494fc88d..e6cb3b55ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
@@ -18,10 +18,14 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.bucket.BucketFunction
+import org.apache.paimon.codegen.CodeGenUtils
import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow}
import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => PaimonInternalRow, JoinedRow}
import org.apache.paimon.disk.IOManager
-import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex}
+import org.apache.paimon.index.HashBucketAssigner
+import org.apache.paimon.schema.TableSchema
import org.apache.paimon.spark.{DataConverter, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.util.EncoderUtils
@@ -90,6 +94,41 @@ case class CommonBucketProcessor(
}
}
+case class PostponeFixBucketProcessor(
+ table: FileStoreTable,
+ bucketColIndex: Int,
+ encoderGroup: EncoderSerDeGroup,
+ postponePartitionBucketComputer: BinaryRow => Integer)
+ extends BucketProcessor[Row] {
+
+ def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
+ val rowType = table.rowType()
+ val bucketFunction =
+ BucketFunction.create(new CoreOptions(table.options), table.schema.logicalBucketKeyType)
+ val schema: TableSchema = table.schema
+ val partitionKeyExtractor = new RowPartitionKeyExtractor(schema)
+ val bucketKeyProjection =
+ CodeGenUtils.newProjection(schema.logicalRowType, schema.projection(schema.bucketKeys))
+
+ def getBucketId(row: PaimonInternalRow): Int = {
+ val partition = partitionKeyExtractor.partition(row)
+ val numBuckets = postponePartitionBucketComputer.apply(partition)
+ bucketFunction.bucket(bucketKeyProjection.apply(row), numBuckets)
+ }
+
+ new Iterator[Row] {
+ override def hasNext: Boolean = rowIterator.hasNext
+
+ override def next(): Row = {
+ val row = rowIterator.next
+ val sparkInternalRow = encoderGroup.rowToInternal(row)
+ sparkInternalRow.setInt(bucketColIndex, getBucketId(new SparkRow(rowType, row)))
+ encoderGroup.internalToRow(sparkInternalRow)
+ }
+ }
+ }
+}
+
case class DynamicBucketProcessor(
fileStoreTable: FileStoreTable,
bucketColIndex: Int,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index e29b765105..11cd875413 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -46,7 +46,7 @@ import scala.collection.Searching.{search, Found, InsertionPoint}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-/** Command for Merge Into for Data Evolution paimom table. */
+/** Command for Merge Into for Data Evolution paimon table. */
case class MergeIntoPaimonDataEvolutionTable(
v2Table: SparkTable,
targetTable: LogicalPlan,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index c88220988a..59faa0e03a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.{CoreOptions, Snapshot}
import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
import org.apache.paimon.codegen.CodeGenUtils
import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
+import org.apache.paimon.data.BinaryRow
import org.apache.paimon.data.serializer.InternalSerializers
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
@@ -36,7 +37,7 @@ import org.apache.paimon.spark.sort.TableSorter
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
import org.apache.paimon.spark.util.SparkRowUtils
import org.apache.paimon.spark.write.WriteHelper
-import org.apache.paimon.table.{FileStoreTable, SpecialFields}
+import org.apache.paimon.table.{FileStoreTable, PostponeUtils, SpecialFields}
import org.apache.paimon.table.BucketMode._
import org.apache.paimon.table.sink._
import org.apache.paimon.types.{RowKind, RowType}
@@ -75,7 +76,17 @@ case class PaimonSparkWriter(
}
}
- val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
+ val postponeBatchWriteFixedBucket: Boolean =
+ table.bucketMode() == POSTPONE_MODE && coreOptions.postponeBatchWriteFixedBucket()
+
+ val writeBuilder: BatchWriteBuilder = {
+ val tableForWrite = if (postponeBatchWriteFixedBucket) {
+ PostponeUtils.tableForFixBucketWrite(table)
+ } else {
+ table
+ }
+ tableForWrite.newBatchWriteBuilder()
+ }
def writeOnly(): PaimonSparkWriter = {
PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
@@ -104,6 +115,14 @@ case class PaimonSparkWriter(
val rowKindColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, ROW_KIND_COL)
val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
+ val postponePartitionBucketComputer: Option[BinaryRow => Integer] =
+ if (postponeBatchWriteFixedBucket) {
+ val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
+ val defaultPostponeNumBuckets = withInitBucketCol.rdd.getNumPartitions
+ Some((p: BinaryRow) => knownNumBuckets.getOrDefault(p, defaultPostponeNumBuckets))
+ } else {
+ None
+ }
def newWrite() = SparkTableWrite(
writeBuilder,
@@ -113,7 +132,8 @@ case class PaimonSparkWriter(
fullCompactionDeltaCommits,
batchId,
coreOptions.blobAsDescriptor(),
- table.catalogEnvironment().catalogContext()
+ table.catalogEnvironment().catalogContext(),
+ postponePartitionBucketComputer
)
def sparkParallelism = {
@@ -264,6 +284,16 @@ case class PaimonSparkWriter(
)
}
+ case POSTPONE_MODE if coreOptions.postponeBatchWriteFixedBucket() =>
+ // Topology: input -> bucket-assigner -> shuffle by partition & bucket
+ writeWithBucketProcessor(
+ withInitBucketCol,
+ PostponeFixBucketProcessor(
+ table,
+ bucketColIdx,
+ encoderGroupWithBucketCol,
+ postponePartitionBucketComputer.get))
+
case BUCKET_UNAWARE | POSTPONE_MODE =>
var input = data
if (tableSchema.partitionKeys().size() > 0) {
@@ -378,7 +408,12 @@ case class PaimonSparkWriter(
}
def commit(commitMessages: Seq[CommitMessage]): Unit = {
- val tableCommit = writeBuilder.newCommit()
+ val finalWriteBuilder = if (postponeBatchWriteFixedBucket) {
+ writeBuilder.copyWithNewTable(PostponeUtils.tableForCommit(table))
+ } else {
+ writeBuilder
+ }
+ val tableCommit = finalWriteBuilder.newCommit()
try {
tableCommit.commit(commitMessages.toList.asJava)
} catch {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
new file mode 100644
index 0000000000..43da68cd9e
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PostponeBucketTableTest.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class PostponeBucketTableTest extends PaimonSparkTestBase {
+
+ test("Postpone bucket table: write with different bucket number") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING,
+ | pt STRING
+ |) TBLPROPERTIES (
+ | 'primary-key' = 'k, pt',
+ | 'bucket' = '-2',
+ | 'postpone.batch-write-fixed-bucket' = 'true'
+ |)
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v,
+ |CAST((1 + FLOOR(RAND() * 4)) AS STRING) AS pt -- pt in [1, 4]
+ |FROM range (0, 1000)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2), Row(3))
+ )
+
+ // Write to existing partition, the bucket number should not change
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(6) */
+ |id AS k,
+ |CAST(id AS STRING) AS v,
+ |'3' AS pt
+ |FROM range (100, 800)
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` WHERE partition = '{3}' ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2), Row(3))
+ )
+
+ // Write to new partition, the bucket number should change
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(6) */
+ |id AS k,
+ |CAST(id AS STRING) AS v,
+ |'5' AS pt
+ |FROM range (100, 800)
+ |""".stripMargin)
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` WHERE partition = '{5}' ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+ )
+ }
+ }
+
+ test("Postpone bucket table: write fix bucket then write postpone bucket") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING
+ |) TBLPROPERTIES (
+ | 'primary-key' = 'k',
+ | 'bucket' = '-2',
+ | 'postpone.batch-write-fixed-bucket' = 'true'
+ |)
+ |""".stripMargin)
+
+ // write fix bucket
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 1000)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2), Row(3))
+ )
+
+ // write postpone bucket
+ withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> "false") {
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(6) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 1000)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(-2), Row(0), Row(1), Row(2), Row(3))
+ )
+ }
+ }
+ }
+
+ test("Postpone bucket table: write postpone bucket then write fix bucket") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | k INT,
+ | v STRING
+ |) TBLPROPERTIES (
+ | 'primary-key' = 'k',
+ | 'bucket' = '-2',
+ | 'postpone.batch-write-fixed-bucket' = 'false'
+ |)
+ |""".stripMargin)
+
+ // write postpone bucket
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(4) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 1000)
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(0)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(-2))
+ )
+
+ // write fix bucket
+ withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> "true") {
+ sql("""
+ |INSERT INTO t SELECT /*+ REPARTITION(6) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 1000)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(499500)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(-2), Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+ )
+ }
+
+ // overwrite fix bucket
+ withSparkSQLConf("spark.paimon.postpone.batch-write-fixed-bucket" -> "true") {
+ sql("""
+ |INSERT OVERWRITE t SELECT /*+ REPARTITION(8) */
+ |id AS k,
+ |CAST(id AS STRING) AS v
+ |FROM range (0, 500)
+ |""".stripMargin)
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(500)))
+ checkAnswer(sql("SELECT sum(k) FROM t"), Seq(Row(124750)))
+ checkAnswer(
+ sql("SELECT distinct(bucket) FROM `t$buckets` ORDER BY bucket"),
+ Seq(Row(0), Row(1), Row(2), Row(3), Row(4), Row(5))
+ )
+ }
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index 5c8f37f19e..96ada9d9eb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -59,7 +59,8 @@ class SparkWriteITCase extends PaimonSparkTestBase {
|) TBLPROPERTIES (
| 'bucket' = '-2',
| 'primary-key' = 'id',
- | 'file.format' = 'parquet'
+ | 'file.format' = 'parquet',
+ | 'postpone.batch-write-fixed-bucket' = 'false'
|)
|""".stripMargin)