This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 555c5928a8 [flink] Postpone mode should support 'partition.sink-strategy' (#5743)
555c5928a8 is described below
commit 555c5928a8d5389509a0c584de49755d7f04d753
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 13 19:41:10 2025 +0800
[flink] Postpone mode should support 'partition.sink-strategy' (#5743)
---
docs/layouts/shortcodes/generated/core_configuration.html | 2 +-
paimon-api/src/main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +++++++++--
.../flink/sink/RowDataHashPartitionChannelComputer.java | 10 ++++------
.../org/apache/paimon/flink/PostponeBucketTableITCase.java | 11 ++++++++++-
5 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 020b03c0c8..8cd61f61d6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -765,7 +765,7 @@ This config option does not affect the default filesystem metastore.</td>
<td><h5>partition.sink-strategy</h5></td>
<td style="word-wrap: break-word;">NONE</td>
<td><p>Enum</p></td>
- <td>This is only for partitioned unaware-buckets append table, and
the purpose is to reduce small files and improve write performance. Through
this repartitioning strategy to reduce the number of partitions written by each
task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning,
this is the default behavior, this strategy is suitable for the number of
partitions you write in a batch is much smaller than write
parallelism.</li><li>hash: Hash the partitions [...]
+ <td>This is only for partitioned append table or postpone pk
table, and the purpose is to reduce small files and improve write performance.
Through this repartitioning strategy to reduce the number of partitions written
by each task to as few as possible.<ul><li>none: Rebalanced or Forward
partitioning, this is the default behavior, this strategy is suitable for the
number of partitions you write in a batch is much smaller than write
parallelism.</li><li>hash: Hash the partit [...]
</tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e79d1593d5..1df7949acc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1325,7 +1325,7 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This is only for partitioned unaware-buckets append table, and the purpose is to reduce small files and improve write performance."
+                                            "This is only for partitioned append table or postpone pk table, and the purpose is to reduce small files and improve write performance."
                                                     + " Through this repartitioning strategy to reduce the number of partitions written by each task to as few as possible.")
                                     .list(
                                             text(
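The same option can also be fixed at DDL time instead of per query. A hedged sketch, reusing the tEnv from the example above; all table and column names are made up, and 'bucket' = '-2' is assumed here to select the postpone bucket mode (verify against the Paimon docs for your version):

    // Hypothetical postpone pk table with the hash strategy set statically.
    tEnv.executeSql(
            "CREATE TABLE T ("
                    + " pt INT,"
                    + " k INT,"
                    + " v INT,"
                    + " PRIMARY KEY (pt, k) NOT ENFORCED"
                    + ") PARTITIONED BY (pt) WITH ("
                    + " 'bucket' = '-2',"
                    + " 'partition.sink-strategy' = 'hash'"
                    + ")");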
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index e83bbeecbb..7f211870b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -30,6 +30,7 @@ import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.ChannelComputer;

 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -291,8 +292,14 @@ public class FlinkSinkBuilder {
     }

     private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
-        DataStream<InternalRow> partitioned =
-                partition(input, new PostponeBucketChannelComputer(table.schema()), parallelism);
+        ChannelComputer<InternalRow> channelComputer;
+        if (!table.partitionKeys().isEmpty()
+                && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
+            channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+        } else {
+            channelComputer = new PostponeBucketChannelComputer(table.schema());
+        }
+        DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
         FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null);
         return sink.sinkFrom(partitioned);
     }
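The branch above is the entire behavioral change: only the shuffle in front of the sink differs, the sink itself is untouched. A rough sketch of the DataStream-API path that reaches buildPostponeBucketSink, assuming 'table' is a postpone-bucket FileStoreTable with 'partition.sink-strategy' = 'hash' set and 'input' is an existing DataStream<RowData> (builder methods as in this file's public API):

    // build() now routes records through RowDataHashPartitionChannelComputer
    // instead of the default PostponeBucketChannelComputer.
    DataStreamSink<?> sink = new FlinkSinkBuilder(table).forRowData(input).build();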
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
index 73258e2966..8943c549ce 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
@@ -20,9 +20,8 @@ package org.apache.paimon.flink.sink;

 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.AppendTableRowKeyExtractor;
 import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;

 /** This is only for partitioned unaware-buckets Append only table. */
 public class RowDataHashPartitionChannelComputer implements ChannelComputer<InternalRow> {
@@ -32,7 +31,7 @@ public class RowDataHashPartitionChannelComputer implements ChannelComputer<Inte
     private final TableSchema schema;

     private transient int numChannels;
-    private transient KeyAndBucketExtractor<InternalRow> extractor;
+    private transient RowPartitionKeyExtractor extractor;

     public RowDataHashPartitionChannelComputer(TableSchema schema) {
         this.schema = schema;
@@ -41,13 +40,12 @@ public class RowDataHashPartitionChannelComputer implements ChannelComputer<Inte
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new AppendTableRowKeyExtractor(schema);
+        this.extractor = new RowPartitionKeyExtractor(schema);
     }

     @Override
     public int channel(InternalRow record) {
-        extractor.setRecord(record);
-        return ChannelComputer.select(extractor.partition(), 0, numChannels);
+        return ChannelComputer.select(extractor.partition(record), 0, numChannels);
     }

     @Override
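Net effect of the extractor swap: the channel index is now derived directly from each record's partition, with no bucket involved. A self-contained illustration of the routing idea, not the Paimon implementation (the hash below is a stand-in for Paimon's):

    import java.util.Arrays;

    final class PartitionRouting {
        // Records sharing a partition always map to the same sink channel, so
        // each writer task receives as few distinct partitions as possible.
        static int channelFor(Object[] partitionValues, int numChannels) {
            int hash = Arrays.hashCode(partitionValues); // stand-in partition hash
            return Math.floorMod(hash, numChannels);     // stable index in [0, numChannels)
        }
    }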
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index cf319ac1c2..f66c2f0998 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Timeout;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static org.assertj.core.api.Assertions.assertThat;
@@ -75,7 +76,15 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                 values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + j));
             }
         }
-        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (random.nextBoolean()) {
+            tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        } else {
+            tEnv.executeSql(
+                            "INSERT INTO T /*+ OPTIONS('partition.sink-strategy'='hash') */ VALUES "
+                                    + String.join(", ", values))
+                    .await();
+        }
         assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
         tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();