This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8484bb4a25 [flink] Optimizing parallelism for fixed bucekt and 
non-partitioned table (#4643)
8484bb4a25 is described below

commit 8484bb4a25e5b1873cad1716bb6076d6f60913ed
Author: HunterXHunter <[email protected]>
AuthorDate: Sat Dec 7 00:18:51 2024 +0800

    [flink] Optimizing parallelism for fixed bucekt and non-partitioned table 
(#4643)
---
 .../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 5703c40824..ecaa5678dd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -265,6 +265,16 @@ public class FlinkSinkBuilder {
     }
 
     protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> 
input) {
+        int bucketNums = table.bucketSpec().getNumBuckets();
+        if (parallelism == null
+                && bucketNums < input.getParallelism()
+                && table.partitionKeys().isEmpty()) {
+            // For non-partitioned table, if the bucketNums is less than job 
parallelism.
+            LOG.warn(
+                    "For non-partitioned table, if bucketNums is less than the 
parallelism of inputOperator,"
+                            + " then the parallelism of writerOperator will be 
set to bucketNums.");
+            parallelism = bucketNums;
+        }
         DataStream<InternalRow> partitioned =
                 partition(
                         input,

Reply via email to