This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 766d08b672 [core] Limit max parallelism for postpone batch write fixed 
bucket to avoid too many buckets (#7212)
766d08b672 is described below

commit 766d08b67250d4cb97220f57e1b9790324bb77ce
Author: yuzelin <[email protected]>
AuthorDate: Wed Feb 4 21:46:41 2026 +0800

    [core] Limit max parallelism for postpone batch write fixed bucket to avoid 
too many buckets (#7212)
---
 .../shortcodes/generated/core_configuration.html       | 18 ++++++++++++------
 .../src/main/java/org/apache/paimon/CoreOptions.java   | 10 ++++++++++
 .../paimon/flink/sink/PostponeBatchWriteOperator.java  |  6 +++++-
 .../paimon/spark/commands/PaimonSparkWriter.scala      |  4 +++-
 4 files changed, 30 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 592b82c58c..2e97f9beef 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,12 +152,6 @@ under the License.
             <td>String</td>
             <td>Fields that are ignored for comparison while generating -U, +U 
changelog for the same record. This configuration is only valid for the 
changelog-producer.row-deduplicate is true.</td>
         </tr>
-        <tr>
-            <td><h5>table-read.sequence-number.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to include the _SEQUENCE_NUMBER field when reading the 
audit_log or binlog system tables. This is only valid for primary key 
tables.</td>
-        </tr>
         <tr>
             <td><h5>changelog.num-retained.max</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -1031,6 +1025,12 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>Boolean</td>
             <td>Whether to write the data into fixed bucket for batch writing 
a postpone bucket table.</td>
         </tr>
+        <tr>
+            <td><h5>postpone.batch-write-fixed-bucket.max-parallelism</h5></td>
+            <td style="word-wrap: break-word;">2048</td>
+            <td>Integer</td>
+            <td>The maximum parallelism (bucket count) used when batch writing into fixed buckets of a postpone bucket table, to avoid creating too many buckets.</td>
+        </tr>
         <tr>
             <td><h5>postpone.default-bucket-num</h5></td>
             <td style="word-wrap: break-word;">1</td>
@@ -1314,6 +1314,12 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td>Duration</td>
             <td>The delay duration of stream read when scan incremental 
snapshots.</td>
         </tr>
+        <tr>
+            <td><h5>table-read.sequence-number.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to include the _SEQUENCE_NUMBER field when reading the 
audit_log or binlog system tables. This is only valid for primary key 
tables.</td>
+        </tr>
         <tr>
             <td><h5>tag.automatic-completion</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 54633b930c..2b0112b2ad 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2164,6 +2164,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to write the data into fixed bucket for 
batch writing a postpone bucket table.");
 
+    public static final ConfigOption<Integer> 
POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM =
+            key("postpone.batch-write-fixed-bucket.max-parallelism")
+                    .intType()
+                    .defaultValue(2048)
+                    .withDescription("The maximum parallelism (bucket count) used 
when batch writing into fixed buckets of a postpone bucket table, to avoid 
creating too many buckets.");
+
     public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
             key("postpone.default-bucket-num")
                     .intType()
@@ -3401,6 +3407,10 @@ public class CoreOptions implements Serializable {
         return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
     }
 
+    public int postponeBatchWriteFixedBucketMaxParallelism() {
+        return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM);
+    }
+
     public int postponeDefaultBucketNum() {
         return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
index 9d166d0c25..5fc50542f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
@@ -72,7 +72,11 @@ public class PostponeBatchWriteOperator extends 
StatelessRowDataStoreWriteOperat
         super.open();
 
         int sinkParallelism = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
-        this.defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;
+        sinkParallelism = sinkParallelism <= 0 ? 1 : sinkParallelism;
+        this.defaultNumBuckets =
+                Math.min(
+                        sinkParallelism,
+                        
table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism());
 
         TableSchema schema = table.schema();
         this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d81d70f55b..de5804cc72 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -123,7 +123,9 @@ case class PaimonSparkWriter(
     val postponePartitionBucketComputer: Option[BinaryRow => Integer] =
       if (postponeBatchWriteFixedBucket) {
         val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
-        val defaultPostponeNumBuckets = withInitBucketCol.rdd.getNumPartitions
+        val defaultPostponeNumBuckets = Math.min(
+          withInitBucketCol.rdd.getNumPartitions,
+          table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism)
         Some((p: BinaryRow) => knownNumBuckets.getOrDefault(p, 
defaultPostponeNumBuckets))
       } else {
         None

Reply via email to