This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d54260aea [spark][hive] Spark & Hive support writing to postpone bucket (#5441)
6d54260aea is described below

commit 6d54260aea6703bab0d39f76a6b01fe85aef04da
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 12:32:35 2025 +0800

    [spark][hive] Spark & Hive support writing to postpone bucket (#5441)
    
    This closes #5441.
---
 .../content/primary-key-table/data-distribution.md |  2 --
 .../org/apache/paimon/hive/HiveWriteITCase.java    | 41 ++++++++++++++++++++++
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  4 ++-
 .../apache/paimon/spark/sql/SparkWriteITCase.scala | 31 ++++++++++++++++
 4 files changed, 75 insertions(+), 3 deletions(-)

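For illustration, here is a minimal sketch of what this commit enables from the Spark side, modeled on the new SparkWriteITCase test further below (the table name, object name, and session setup are assumptions for the sketch, not part of the commit); the Hive side is exercised with a plain INSERT INTO ... VALUES statement in HiveWriteITCase below.

import org.apache.spark.sql.SparkSession

object PostponeBucketWriteSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a SparkSession whose catalog is already configured for Paimon.
    val spark = SparkSession.builder().appName("postpone-bucket-sketch").getOrCreate()

    // 'bucket' = '-2' selects postpone bucket mode: records are first written to the
    // bucket-postpone directory of each partition and are not yet visible to readers.
    spark.sql("""
      CREATE TABLE postpone_t (
        id INT,
        v1 INT,
        v2 INT
      ) TBLPROPERTIES (
        'bucket' = '-2',
        'primary-key' = 'id'
      )
    """)

    // Before this commit only Flink could write such tables; now the Spark writer
    // routes them through the without-bucket path.
    spark.sql("INSERT INTO postpone_t VALUES (1, 1, 1)")

    spark.stop()
  }
}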
diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index edfb0133f8..1ae880786b 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -93,8 +93,6 @@ Postpone bucket mode is configured by `'bucket' = '-2'`.
 This mode aims to solve the difficulty to determine a fixed number of buckets
 and support different buckets for different partitions.
 
-Currently, only Flink supports this mode.
-
 When writing records into the table,
 all records will first be stored in the `bucket-postpone` directory of each partition
 and are not available to readers.
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
index c99eb9cd1f..586afa8274 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveWriteITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
@@ -28,8 +29,11 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.hive.mapred.PaimonOutputFormat;
 import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
@@ -1171,4 +1175,41 @@ public class HiveWriteITCase extends HiveTestBase {
         List<Object[]> expect = hiveShell.executeStatement("select * from " + tableName);
         assertThat(select.toArray()).containsExactlyInAnyOrder(expect.toArray());
     }
+
+    @Test
+    public void testInsertIntoPostponeBucket() throws Exception {
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE, folder.newFolder().toURI().toString());
+        options.set(CoreOptions.BUCKET, -2);
+        options.set(CoreOptions.FILE_FORMAT, "parquet");
+        Identifier identifier = Identifier.create(DATABASE_NAME, "postpone_bucket");
+        FileStoreTable table =
+                (FileStoreTable)
+                        FileStoreTestUtils.createFileStoreTable(
+                                options,
+                                RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.INT()),
+                                Collections.emptyList(),
+                                Collections.singletonList("f0"),
+                                identifier);
+        String tableName =
+                writeData(
+                        table,
+                        table.location().toString(),
+                        Collections.singletonList(GenericRow.of(1, 2, 3)));
+        hiveShell.execute("insert into " + tableName + " values (1,2,3),(4,5,6)");
+
+        Snapshot snapshot = table.latestSnapshot().get();
+        ManifestEntry manifestEntry =
+                table.manifestFileReader()
+                        .read(
+                                table.manifestListReader()
+                                        .read(snapshot.deltaManifestList())
+                                        .get(0)
+                                        .fileName())
+                        .get(0);
+        DataFileMeta file = manifestEntry.file();
+        assertThat(manifestEntry.bucket()).isEqualTo(-2);
+        // default format for postpone bucket is avro
+        assertThat(file.fileName()).endsWith(".avro");
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index b8a3cccf08..2366ff9e02 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -233,7 +233,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
         writeWithoutBucket(data)
 
       case HASH_FIXED =>
-        if (!paimonExtensionEnabled) {
+        if (table.bucketSpec().getNumBuckets == -2) {
+          writeWithoutBucket(data)
+        } else if (!paimonExtensionEnabled) {
           // Topology: input -> bucket-assigner -> shuffle by partition & bucket
           writeWithBucketProcessor(
             withInitBucketCol,
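For context, the HASH_FIXED branch above now short-circuits for postpone-bucket tables. A minimal sketch of that dispatch, with the method names taken from the diff, the extension-enabled branch elided, and POSTPONE_BUCKET an assumed constant name for the magic value -2 (this is not the real PaimonSparkWriter API):

object WritePathSketch {
  val POSTPONE_BUCKET: Int = -2 // assumed constant name for 'bucket' = '-2'

  def chooseWritePath(numBuckets: Int, paimonExtensionEnabled: Boolean): String =
    if (numBuckets == POSTPONE_BUCKET) {
      // Postpone-bucket tables defer bucket assignment, so Spark writes them
      // without a bucket shuffle, like bucket-unaware tables.
      "writeWithoutBucket"
    } else if (!paimonExtensionEnabled) {
      // Topology: input -> bucket-assigner -> shuffle by partition & bucket
      "writeWithBucketProcessor"
    } else {
      "extension-enabled path (not shown in this hunk)"
    }
}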
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index e6c113d75d..d1cdea66cc 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.Snapshot
+import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Assertions
 
 import java.sql.Timestamp
@@ -38,6 +41,34 @@ class SparkWriteWithNoExtensionITCase extends SparkWriteITCase {
 
 class SparkWriteITCase extends PaimonSparkTestBase {
 
+  test("Paimon Write : Postpone Bucket") {
+    withTable("PostponeTable") {
+      spark.sql("""
+                  |CREATE TABLE PostponeTable (
+                  |  id INT,
+                  |  v1 INT,
+                  |  v2 INT
+                  |) TBLPROPERTIES (
+                  | 'bucket' = '-2',
+                  | 'primary-key' = 'id',
+                  | 'file.format' = 'parquet'
+                  |)
+                  |""".stripMargin)
+
+      spark.sql("INSERT INTO PostponeTable VALUES (1, 1, 1)")
+
+      val table = loadTable("PostponeTable")
+      val snapshot = table.latestSnapshot.get
+      val manifestEntry = table.manifestFileReader
+        .read(table.manifestListReader.read(snapshot.deltaManifestList).get(0).fileName)
+        .get(0)
+      val file = manifestEntry.file
+      assertThat(manifestEntry.bucket()).isEqualTo(-2)
+      // default format for postpone bucket is avro
+      assertThat(file.fileName).endsWith(".avro")
+    }
+  }
+
   test("Paimon Write: AllTypes") {
     withTable("AllTypesTable") {
       val createTableSQL =
