This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 766d08b672 [core] Limit max parallelism for postpone batch write fixed bucket to avoid too many buckets (#7212)
766d08b672 is described below
commit 766d08b67250d4cb97220f57e1b9790324bb77ce
Author: yuzelin <[email protected]>
AuthorDate: Wed Feb 4 21:46:41 2026 +0800
[core] Limit max parallelism for postpone batch write fixed bucket to avoid too many buckets (#7212)
---
.../shortcodes/generated/core_configuration.html | 18 ++++++++++++------
.../src/main/java/org/apache/paimon/CoreOptions.java | 10 ++++++++++
.../paimon/flink/sink/PostponeBatchWriteOperator.java | 6 +++++-
.../paimon/spark/commands/PaimonSparkWriter.scala | 4 +++-
4 files changed, 30 insertions(+), 8 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 592b82c58c..2e97f9beef 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -152,12 +152,6 @@ under the License.
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
</tr>
- <tr>
- <td><h5>table-read.sequence-number.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
- </tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -1031,6 +1025,12 @@ This config option does not affect the default filesystem metastore.</td>
<td>Boolean</td>
<td>Whether to write the data into fixed bucket for batch writing a postpone bucket table.</td>
</tr>
+ <tr>
+ <td><h5>postpone.batch-write-fixed-bucket.max-parallelism</h5></td>
+ <td style="word-wrap: break-word;">2048</td>
+ <td>Integer</td>
+ <td>The maximum parallelism for batch writing a postpone bucket table into fixed buckets, to avoid creating too many buckets.</td>
+ </tr>
<tr>
<td><h5>postpone.default-bucket-num</h5></td>
<td style="word-wrap: break-word;">1</td>
@@ -1314,6 +1314,12 @@ If the data size allocated for the sorting task is uneven, which may lead to perf
<td>Duration</td>
<td>The delay duration of stream read when scan incremental snapshots.</td>
</tr>
+ <tr>
+ <td><h5>table-read.sequence-number.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to include the _SEQUENCE_NUMBER field when reading the audit_log or binlog system tables. This is only valid for primary key tables.</td>
+ </tr>
<tr>
<td><h5>tag.automatic-completion</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 54633b930c..2b0112b2ad 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2164,6 +2164,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to write the data into fixed bucket for
batch writing a postpone bucket table.");
+ public static final ConfigOption<Integer> POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM =
+ key("postpone.batch-write-fixed-bucket.max-parallelism")
+ .intType()
+ .defaultValue(2048)
+ .withDescription("The number of partitions for global
index.");
+
public static final ConfigOption<Integer> POSTPONE_DEFAULT_BUCKET_NUM =
key("postpone.default-bucket-num")
.intType()
@@ -3401,6 +3407,10 @@ public class CoreOptions implements Serializable {
return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET);
}
+ public int postponeBatchWriteFixedBucketMaxParallelism() {
+ return options.get(POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM);
+ }
+
public int postponeDefaultBucketNum() {
return options.get(POSTPONE_DEFAULT_BUCKET_NUM);
}
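For reference, the new option can be set and read back through the usual Options/CoreOptions wiring; a minimal sketch, assuming that wiring (the value 512 is an illustrative choice, not from this commit):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    // Sketch: lower the cap for one table, then read it back the way the
    // writers below do via CoreOptions.
    Options options = new Options();
    options.set(CoreOptions.POSTPONE_BATCH_WRITE_FIXED_BUCKET_MAX_PARALLELISM, 512);
    int maxParallelism =
            new CoreOptions(options).postponeBatchWriteFixedBucketMaxParallelism(); // 512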
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
index 9d166d0c25..5fc50542f0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBatchWriteOperator.java
@@ -72,7 +72,11 @@ public class PostponeBatchWriteOperator extends StatelessRowDataStoreWriteOperat
super.open();
int sinkParallelism =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
- this.defaultNumBuckets = sinkParallelism <= 0 ? 1 : sinkParallelism;
+ sinkParallelism = sinkParallelism <= 0 ? 1 : sinkParallelism;
+ this.defaultNumBuckets =
+ Math.min(
+ sinkParallelism,
+ table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism());
TableSchema schema = table.schema();
this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
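In isolation, the Flink-side change clamps the default bucket count at the configured maximum instead of taking the raw sink parallelism. A standalone sketch of that rule (the helper name is hypothetical, only for illustration):

    // Default bucket count: follow sink parallelism, guard non-positive
    // values, and never exceed the configured maximum (default 2048).
    static int defaultNumBuckets(int sinkParallelism, int maxParallelism) {
        int parallelism = sinkParallelism <= 0 ? 1 : sinkParallelism;
        return Math.min(parallelism, maxParallelism);
    }

    // defaultNumBuckets(4096, 2048) -> 2048 (capped; previously 4096 buckets)
    // defaultNumBuckets(256, 2048)  -> 256  (below the cap, unchanged)
    // defaultNumBuckets(-1, 2048)   -> 1    (unknown-parallelism guard)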
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d81d70f55b..de5804cc72 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -123,7 +123,9 @@ case class PaimonSparkWriter(
val postponePartitionBucketComputer: Option[BinaryRow => Integer] =
if (postponeBatchWriteFixedBucket) {
val knownNumBuckets = PostponeUtils.getKnownNumBuckets(table)
- val defaultPostponeNumBuckets = withInitBucketCol.rdd.getNumPartitions
+ val defaultPostponeNumBuckets = Math.min(
+ withInitBucketCol.rdd.getNumPartitions,
+ table.coreOptions().postponeBatchWriteFixedBucketMaxParallelism)
Some((p: BinaryRow) => knownNumBuckets.getOrDefault(p, defaultPostponeNumBuckets))
} else {
None
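The Spark path applies the same cap to the fallback bucket count used for partitions that have no recorded bucket number yet. A minimal Java sketch of that lookup (string partition keys stand in for the real BinaryRow values; the numbers are illustrative assumptions):

    import java.util.HashMap;
    import java.util.Map;

    // Partitions with a known bucket count keep it; unknown partitions fall
    // back to min(RDD partitions, configured max parallelism).
    Map<String, Integer> knownNumBuckets = new HashMap<>();
    knownNumBuckets.put("dt=2026-02-04", 64);

    int defaultPostponeNumBuckets = Math.min(4096, 2048); // RDD partitions vs. cap -> 2048

    int known = knownNumBuckets.getOrDefault("dt=2026-02-04", defaultPostponeNumBuckets); // 64
    int fresh = knownNumBuckets.getOrDefault("dt=2026-02-05", defaultPostponeNumBuckets); // 2048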