This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85eeda8d72 [spark] Restrict bucket creation to not exceed max bucket limit (#5661)
85eeda8d72 is described below

commit 85eeda8d726e8a7ead2313ad3d4ad2dd08c89fd3
Author: Zouxxyy <[email protected]>
AuthorDate: Tue May 27 07:52:30 2025 +0800

    [spark] Restrict bucket creation to not exceed max bucket limit (#5661)
---
 .../org/apache/paimon/index/PartitionIndex.java    | 21 +++++--------
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 35 +++++++++++++---------
 .../paimon/spark/sql/DynamicBucketTableTest.scala  | 22 ++++++++++++++
 3 files changed, 51 insertions(+), 27 deletions(-)
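
For context, the option this change enforces can be set as a table property at creation
time. A minimal usage sketch, mirroring the new test added below (table and column names
are illustrative, not part of this patch):

    // Assumes a SparkSession `spark` with a Paimon catalog configured.
    spark.sql("""
      CREATE TABLE t (pk STRING, v STRING, pt STRING)
      TBLPROPERTIES (
        'primary-key' = 'pk, pt',
        'bucket' = '-1',                     -- dynamic bucket mode
        'dynamic-bucket.max-buckets' = '2'   -- cap enforced by this change
      )
      PARTITIONED BY (pt)
    """)
    // With the cap in place, writes keep filling existing buckets instead of
    // creating bucket ids beyond the configured limit.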

diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index decc1f12f1..8241270a87 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -91,21 +91,16 @@ public class PartitionIndex {
             }
         }
 
-        if (-1 == maxBucketsNum || totalBucketSet.isEmpty() || maxBucketId < maxBucketsNum - 1) {
+        int globalMaxBucketId = (maxBucketsNum == -1 ? Short.MAX_VALUE : maxBucketsNum) - 1;
+        if (totalBucketSet.isEmpty() || maxBucketId < globalMaxBucketId) {
             // 3. create a new bucket
-            for (int i = 0; i < Short.MAX_VALUE; i++) {
+            for (int i = 0; i <= globalMaxBucketId; i++) {
                 if (bucketFilter.test(i) && !totalBucketSet.contains(i)) {
-                    // The new bucketId may still be larger than the upper bound
-                    if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
-                        nonFullBucketInformation.put(i, 1L);
-                        totalBucketSet.add(i);
-                        totalBucketArray.add(i);
-                        hash2Bucket.put(hash, (short) i);
-                        return i;
-                    } else {
-                        // No need to enter the next iteration when upper bound exceeded
-                        break;
-                    }
+                    nonFullBucketInformation.put(i, 1L);
+                    totalBucketSet.add(i);
+                    totalBucketArray.add(i);
+                    hash2Bucket.put(hash, (short) i);
+                    return i;
                 }
             }
             if (-1 == maxBucketsNum) {
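
The PartitionIndex change above, paraphrased as a standalone sketch (written in Scala for
consistency with the rest of the patch; an illustration of the new bound, not the actual
implementation):

    // When max buckets is unlimited (-1), the previous Short.MAX_VALUE scan bound
    // still applies; otherwise the scan stops at maxBucketsNum - 1, so a newly
    // created bucket id can never exceed the configured limit.
    def assignNewBucket(
        maxBucketsNum: Int,
        totalBucketSet: scala.collection.mutable.Set[Int],
        bucketFilter: Int => Boolean): Option[Int] = {
      val globalMaxBucketId =
        (if (maxBucketsNum == -1) Short.MaxValue.toInt else maxBucketsNum) - 1
      (0 to globalMaxBucketId)
        .find(i => bucketFilter(i) && !totalBucketSet.contains(i))
        .map { i =>
          totalBucketSet.add(i) // record the newly created bucket
          i
        }
    }
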
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 23e90df92b..ca1158e69a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -58,12 +58,13 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
   private lazy val bucketMode = table.bucketMode
 
+  private lazy val coreOptions = table.coreOptions()
+
   private lazy val disableReportStats = {
-    val options = table.coreOptions()
-    val config = options.toConfiguration
+    val config = coreOptions.toConfiguration
     config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
     table.partitionKeys.isEmpty ||
-    !options.partitionedTableInMetastore ||
+    !coreOptions.partitionedTableInMetastore ||
     table.catalogEnvironment.partitionHandler() == null
   }
 
@@ -163,7 +164,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       case CROSS_PARTITION =>
         // Topology: input -> bootstrap -> shuffle by key hash -> bucket-assigner -> shuffle by partition & bucket
         val rowType = SparkTypeUtils.toPaimonType(withInitBucketCol.schema).asInstanceOf[RowType]
-        val assignerParallelism = Option(table.coreOptions.dynamicBucketAssignerParallelism)
+        val assignerParallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
           .map(_.toInt)
           .getOrElse(sparkParallelism)
         val bootstrapped = bootstrapAndRepartitionByKeyHash(
@@ -186,10 +187,17 @@ case class PaimonSparkWriter(table: FileStoreTable) {
         writeWithBucket(repartitioned)
 
       case HASH_DYNAMIC =>
-        val assignerParallelism = Option(table.coreOptions.dynamicBucketAssignerParallelism)
-          .map(_.toInt)
-          .getOrElse(sparkParallelism)
-        val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets)
+        val assignerParallelism = {
+          val parallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
+            .map(_.toInt)
+            .getOrElse(sparkParallelism)
+          if (coreOptions.dynamicBucketMaxBuckets() != -1) {
+            Math.min(coreOptions.dynamicBucketMaxBuckets().toInt, parallelism)
+          } else {
+            parallelism
+          }
+        }
+        val numAssigners = Option(coreOptions.dynamicBucketInitialBuckets)
           .map(initialBuckets => Math.min(initialBuckets.toInt, assignerParallelism))
           .getOrElse(assignerParallelism)
 
@@ -212,8 +220,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
                 new SimpleHashBucketAssigner(
                   numAssigners,
                   TaskContext.getPartitionId(),
-                  table.coreOptions.dynamicBucketTargetRowNum,
-                  table.coreOptions.dynamicBucketMaxBuckets
+                  coreOptions.dynamicBucketTargetRowNum,
+                  coreOptions.dynamicBucketMaxBuckets
                 )
               row => {
                 val sparkRow = new SparkRow(rowType, row)
@@ -242,7 +250,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       case HASH_FIXED =>
         if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
           // Topology: input -> shuffle by partition & bucket
-          val bucketNumber = table.coreOptions().bucket()
+          val bucketNumber = coreOptions.bucket()
           val bucketKeyCol = tableSchema
             .bucketKeys()
             .asScala
@@ -333,12 +341,11 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       return
     }
 
-    val options = table.coreOptions()
     val partitionComputer = new InternalRowPartitionComputer(
-      options.partitionDefaultName,
+      coreOptions.partitionDefaultName,
       table.schema.logicalPartitionType,
       table.partitionKeys.toArray(new Array[String](0)),
-      options.legacyPartitionName()
+      coreOptions.legacyPartitionName()
     )
     val hmsReporter = new PartitionStatisticsReporter(
       table,
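
The writer-side effect of the same option, condensed from the HASH_DYNAMIC branch above
into a sketch (assumes the option values and sparkParallelism are already resolved; a
paraphrase, not the actual code):

    // The assigner parallelism is now also capped by dynamic-bucket.max-buckets,
    // so Spark never runs more bucket assigners than there can be buckets; the
    // number of initial buckets is in turn capped by that parallelism.
    def resolveAssigners(
        assignerParallelismOpt: Option[Int], // dynamic-bucket.assigner-parallelism
        initialBucketsOpt: Option[Int],      // dynamic-bucket.initial-buckets
        maxBuckets: Long,                    // dynamic-bucket.max-buckets, -1 = unlimited
        sparkParallelism: Int): (Int, Int) = {
      val parallelism = assignerParallelismOpt.getOrElse(sparkParallelism)
      val assignerParallelism =
        if (maxBuckets != -1) math.min(maxBuckets.toInt, parallelism) else parallelism
      val numAssigners = initialBucketsOpt
        .map(initialBuckets => math.min(initialBuckets, assignerParallelism))
        .getOrElse(assignerParallelism)
      (assignerParallelism, numAssigners)
    }
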
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
index 023ab16646..f005156182 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -86,6 +86,28 @@ class DynamicBucketTableTest extends PaimonSparkTestBase {
       Row(0) :: Row(1) :: Row(2) :: Nil)
   }
 
+  test(s"Paimon dynamic bucket table: write with max buckets") {
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |  pk STRING,
+                 |  v STRING,
+                 |  pt STRING)
+                 |TBLPROPERTIES (
+                 |  'primary-key' = 'pk, pt',
+                 |  'bucket' = '-1',
+                 |  'dynamic-bucket.max-buckets'='2'
+                 |)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql(
+      "INSERT INTO T VALUES ('1', 'a', 'p'), ('2', 'b', 'p'), ('3', 'c', 'p'), ('4', 'd', 'p'), ('5', 'e', 'p'), ('6', 'e', 'p')")
+    spark.sql(
+      "INSERT INTO T VALUES ('11', 'a', 'p'), ('22', 'b', 'p'), ('33', 'c', 'p'), ('44', 'd', 'p'), ('55', 'e', 'p'), ('66', 'e', 'p')")
+
+    checkAnswer(spark.sql("SELECT DISTINCT bucket FROM `T$FILES`"), Seq(Row(0), Row(1)))
+  }
+
   test(s"Paimon cross partition table: write with partition change") {
     sql(s"""
            |CREATE TABLE T (

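After a write, the effect can be checked with the same system-table query the test uses
(assumes the table above; `T$FILES` is Paimon's files system table):

    // With 'dynamic-bucket.max-buckets' = '2', only buckets 0 and 1 should appear.
    spark.sql("SELECT DISTINCT bucket FROM `T$FILES`").show()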