This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6d54260aea [spark][hive] Spark & Hive support writing to postpone bucket (#5441)
6d54260aea is described below
commit 6d54260aea6703bab0d39f76a6b01fe85aef04da
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 12:32:35 2025 +0800
[spark][hive] Spark & Hive support writing to postpone bucket (#5441)
This closes #5441.
---
.../content/primary-key-table/data-distribution.md | 2 --
.../org/apache/paimon/hive/HiveWriteITCase.java | 41 ++++++++++++++++++++++
.../paimon/spark/commands/PaimonSparkWriter.scala | 4 ++-
.../apache/paimon/spark/sql/SparkWriteITCase.scala | 31 ++++++++++++++++
4 files changed, 75 insertions(+), 3 deletions(-)
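For context, here is a minimal sketch of the write path this commit enables, mirroring the new SparkWriteITCase test below; the table and column names are illustrative, and it assumes a Spark SQL session with a Paimon catalog already configured:

CREATE TABLE postpone_demo (
  id INT,
  v1 INT,
  v2 INT
) TBLPROPERTIES (
  'bucket' = '-2',
  'primary-key' = 'id'
);

INSERT INTO postpone_demo VALUES (1, 1, 1);

With 'bucket' = '-2' (postpone bucket mode), the Spark writer skips the bucket shuffle and writes the records unbucketed, as exercised by the Hive and Spark tests in this patch.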
diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index edfb0133f8..1ae880786b 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -93,8 +93,6 @@ Postpone bucket mode is configured by `'bucket' = '-2'`.
This mode aims to solve the difficulty to determine a fixed number of buckets
and support different buckets for different partitions.
-Currently, only Flink supports this mode.
-
When writing records into the table,
all records will first be stored in the `bucket-postpone` directory of each partition
and are not available to readers.
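As the context lines above note, records written this way are not yet available to readers. A rough illustration, reusing the hypothetical table from the sketch earlier in this mail: a query issued right after the insert would be expected to come back empty until the postponed data is later rewritten into real buckets (a step this hunk does not cover):

-- assuming the postpone_demo table from the earlier sketch
SELECT * FROM postpone_demo;   -- expected: no rows yet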
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index c99eb9cd1f..586afa8274 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
@@ -28,8 +29,11 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -1171,4 +1175,41 @@ public class HiveWriteITCase extends HiveTestBase {
List<Object[]> expect = hiveShell.executeStatement("select * from " + tableName);
assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
}
+
+ @Test
+ public void testInsertIntoPostponeBucket() throws Exception {
+ Options options = new Options();
+ options.set(CatalogOptions.WAREHOUSE, folder.newFolder().toURI().toString());
+ options.set(CoreOptions.BUCKET, -2);
+ options.set(CoreOptions.FILE_FORMAT, "parquet");
+ Identifier identifier = Identifier.create(DATABASE_NAME, "postpone_bucket");
+ FileStoreTable table =
+ (FileStoreTable)
+ FileStoreTestUtils.createFileStoreTable(
+ options,
+ RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.INT()),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ identifier);
+ String tableName =
+ writeData(
+ table,
+ table.location().toString(),
+ Collections.singletonList(GenericRow.of(1, 2, 3)));
+ hiveShell.execute("insert into " + tableName + " values (1,2,3),(4,5,6)");
+
+ Snapshot snapshot = table.latestSnapshot().get();
+ ManifestEntry manifestEntry =
+ table.manifestFileReader()
+ .read(
+ table.manifestListReader()
+ .read(snapshot.deltaManifestList())
+ .get(0)
+ .fileName())
+ .get(0);
+ DataFileMeta file = manifestEntry.file();
+ assertThat(manifestEntry.bucket()).isEqualTo(-2);
+ // default format for postpone bucket is avro
+ assertThat(file.fileName()).endsWith(".avro");
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index b8a3cccf08..2366ff9e02 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -233,7 +233,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
writeWithoutBucket(data)
case HASH_FIXED =>
- if (!paimonExtensionEnabled) {
+ if (table.bucketSpec().getNumBuckets == -2) {
+ writeWithoutBucket(data)
+ } else if (!paimonExtensionEnabled) {
// Topology: input -> bucket-assigner -> shuffle by partition & bucket
writeWithBucketProcessor(
withInitBucketCol,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index e6c113d75d..d1cdea66cc 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -18,10 +18,13 @@
package org.apache.paimon.spark.sql
+import org.apache.paimon.Snapshot
+import org.apache.paimon.io.DataFileMeta
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import java.sql.Timestamp
@@ -38,6 +41,34 @@ class SparkWriteWithNoExtensionITCase extends SparkWriteITCase {
class SparkWriteITCase extends PaimonSparkTestBase {
+ test("Paimon Write : Postpone Bucket") {
+ withTable("PostponeTable") {
+ spark.sql("""
+ |CREATE TABLE PostponeTable (
+ | id INT,
+ | v1 INT,
+ | v2 INT
+ |) TBLPROPERTIES (
+ | 'bucket' = '-2',
+ | 'primary-key' = 'id',
+ | 'file.format' = 'parquet'
+ |)
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO PostponeTable VALUES (1, 1, 1)")
+
+ val table = loadTable("PostponeTable")
+ val snapshot = table.latestSnapshot.get
+ val manifestEntry = table.manifestFileReader
+ .read(table.manifestListReader.read(snapshot.deltaManifestList).get(0).fileName)
+ .get(0)
+ val file = manifestEntry.file
+ assertThat(manifestEntry.bucket()).isEqualTo(-2)
+ // default format for postpone bucket is avro
+ assertThat(file.fileName).endsWith(".avro")
+ }
+ }
+
test("Paimon Write: AllTypes") {
withTable("AllTypesTable") {
val createTableSQL =