This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b16152147 [FLINK-37572][pipeline-connector/paimon] Shuffle Event by bucket and tableid to avoid data skew
b16152147 is described below
commit b161521477a7c3d6ba41a5ee2d45ce71877e1780
Author: Kunni <[email protected]>
AuthorDate: Thu Apr 3 15:16:44 2025 +0800
[FLINK-37572][pipeline-connector/paimon] Shuffle Event by bucket and tableid to avoid data skew
This closes #3970
---
.../cdc/connectors/paimon/sink/v2/PaimonEventSink.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
index ee789233f..eb2af9f43 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
import org.apache.flink.cdc.common.event.Event;
import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
+import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
import
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -63,8 +64,16 @@ public class PaimonEventSink extends PaimonSink<Event>
implements WithPreWriteTo
.name("Assign Bucket")
// All Events after BucketAssignOperator are decorated with BucketWrapper.
.partitionCustom(
- (bucket, numPartitions) -> bucket % numPartitions,
- (event) -> ((BucketWrapper) event).getBucket())
+ Math::floorMod,
+ (event) -> {
+ if (event instanceof BucketWrapperChangeEvent) {
+ // Add hash of tableId to avoid data skew.
+ return ((BucketWrapperChangeEvent)
event).getBucket()
+ + ((BucketWrapperChangeEvent)
event).tableId().hashCode();
+ } else {
+ return ((BucketWrapper) event).getBucket();
+ }
+ })
// Avoid disorder of FlushEvent and DataChangeEvent.
.transform(
"FlushEventAlignment",