This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 555c5928a8 [flink] Postpone mode should support 'partition.sink-strategy' (#5743)
555c5928a8 is described below

commit 555c5928a8d5389509a0c584de49755d7f04d753
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 13 19:41:10 2025 +0800

    [flink] Postpone mode should support 'partition.sink-strategy' (#5743)
---
 docs/layouts/shortcodes/generated/core_configuration.html     |  2 +-
 paimon-api/src/main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java   | 11 +++++++++--
 .../flink/sink/RowDataHashPartitionChannelComputer.java       | 10 ++++------
 .../org/apache/paimon/flink/PostponeBucketTableITCase.java    | 11 ++++++++++-
 5 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 020b03c0c8..8cd61f61d6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -765,7 +765,7 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><h5>partition.sink-strategy</h5></td>
             <td style="word-wrap: break-word;">NONE</td>
             <td><p>Enum</p></td>
-            <td>This is only for partitioned unaware-buckets append table, and 
the purpose is to reduce small files and improve write performance. Through 
this repartitioning strategy to reduce the number of partitions written by each 
task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning, 
this is the default behavior, this strategy is suitable for the number of 
partitions you write in a batch is much smaller than write 
parallelism.</li><li>hash: Hash the partitions  [...]
+            <td>This is only for partitioned append table or postpone pk 
table, and the purpose is to reduce small files and improve write performance. 
Through this repartitioning strategy to reduce the number of partitions written 
by each task to as few as possible.<ul><li>none: Rebalanced or Forward 
partitioning, this is the default behavior, this strategy is suitable for the 
number of partitions you write in a batch is much smaller than write 
parallelism.</li><li>hash: Hash the partit [...]
         </tr>
         <tr>
             <td><h5>partition.timestamp-formatter</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e79d1593d5..1df7949acc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1325,7 +1325,7 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This is only for partitioned 
unaware-buckets append table, and the purpose is to reduce small files and 
improve write performance."
+                                            "This is only for partitioned 
append table or postpone pk table, and the purpose is to reduce small files and 
improve write performance."
                                                     + " Through this 
repartitioning strategy to reduce the number of partitions written by each task 
to as few as possible.")
                                     .list(
                                             text(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index e83bbeecbb..7f211870b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -30,6 +30,7 @@ import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.ChannelComputer;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -291,8 +292,14 @@ public class FlinkSinkBuilder {
     }
 
     private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> 
input) {
-        DataStream<InternalRow> partitioned =
-                partition(input, new 
PostponeBucketChannelComputer(table.schema()), parallelism);
+        ChannelComputer<InternalRow> channelComputer;
+        if (!table.partitionKeys().isEmpty()
+                && table.coreOptions().partitionSinkStrategy() == 
PartitionSinkStrategy.HASH) {
+            channelComputer = new 
RowDataHashPartitionChannelComputer(table.schema());
+        } else {
+            channelComputer = new 
PostponeBucketChannelComputer(table.schema());
+        }
+        DataStream<InternalRow> partitioned = partition(input, 
channelComputer, parallelism);
         FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, 
null);
         return sink.sinkFrom(partitioned);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
index 73258e2966..8943c549ce 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
@@ -20,9 +20,8 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.AppendTableRowKeyExtractor;
 import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 
 /** This is only for partitioned unaware-buckets Append only table. */
 public class RowDataHashPartitionChannelComputer implements 
ChannelComputer<InternalRow> {
@@ -32,7 +31,7 @@ public class RowDataHashPartitionChannelComputer implements 
ChannelComputer<Inte
     private final TableSchema schema;
 
     private transient int numChannels;
-    private transient KeyAndBucketExtractor<InternalRow> extractor;
+    private transient RowPartitionKeyExtractor extractor;
 
     public RowDataHashPartitionChannelComputer(TableSchema schema) {
         this.schema = schema;
@@ -41,13 +40,12 @@ public class RowDataHashPartitionChannelComputer implements 
ChannelComputer<Inte
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new AppendTableRowKeyExtractor(schema);
+        this.extractor = new RowPartitionKeyExtractor(schema);
     }
 
     @Override
     public int channel(InternalRow record) {
-        extractor.setRecord(record);
-        return ChannelComputer.select(extractor.partition(), 0, numChannels);
+        return ChannelComputer.select(extractor.partition(record), 0, 
numChannels);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index cf319ac1c2..f66c2f0998 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -75,7 +76,15 @@ public class PostponeBucketTableITCase extends 
AbstractTestBase {
                 values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + 
j));
             }
         }
-        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", 
values)).await();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (random.nextBoolean()) {
+            tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", 
values)).await();
+        } else {
+            tEnv.executeSql(
+                            "INSERT INTO T /*+ 
OPTIONS('partition.sink-strategy'='hash') */ VALUES "
+                                    + String.join(", ", values))
+                    .await();
+        }
         assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
 
         tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();

Reply via email to