This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8484bb4a25 [flink] Optimizing parallelism for fixed bucket and
non-partitioned table (#4643)
8484bb4a25 is described below
commit 8484bb4a25e5b1873cad1716bb6076d6f60913ed
Author: HunterXHunter <[email protected]>
AuthorDate: Sat Dec 7 00:18:51 2024 +0800
[flink] Optimizing parallelism for fixed bucket and non-partitioned table
(#4643)
---
.../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 5703c40824..ecaa5678dd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -265,6 +265,16 @@ public class FlinkSinkBuilder {
}
protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow>
input) {
+ int bucketNums = table.bucketSpec().getNumBuckets();
+ if (parallelism == null
+ && bucketNums < input.getParallelism()
+ && table.partitionKeys().isEmpty()) {
+ // For non-partitioned table, if the bucketNums is less than job
parallelism.
+ LOG.warn(
+ "For non-partitioned table, if bucketNums is less than the
parallelism of inputOperator,"
+ + " then the parallelism of writerOperator will be
set to bucketNums.");
+ parallelism = bucketNums;
+ }
DataStream<InternalRow> partitioned =
partition(
input,