This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new b16152147 [FLINK-37572][pipeline-connector/paimon] Shuffle Event by bucket and tableid to avoid data skew
b16152147 is described below

commit b161521477a7c3d6ba41a5ee2d45ce71877e1780
Author: Kunni <[email protected]>
AuthorDate: Thu Apr 3 15:16:44 2025 +0800

    [FLINK-37572][pipeline-connector/paimon] Shuffle Event by bucket and tableid to avoid data skew
    
    This closes #3970
---
 .../cdc/connectors/paimon/sink/v2/PaimonEventSink.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
index ee789233f..eb2af9f43 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
 import org.apache.flink.cdc.common.event.Event;
 import 
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
+import 
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
 import 
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
 import 
org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -63,8 +64,16 @@ public class PaimonEventSink extends PaimonSink<Event> 
implements WithPreWriteTo
                 .name("Assign Bucket")
                 // All Events after BucketAssignOperator are decorated with 
BucketWrapper.
                 .partitionCustom(
-                        (bucket, numPartitions) -> bucket % numPartitions,
-                        (event) -> ((BucketWrapper) event).getBucket())
+                        Math::floorMod,
+                        (event) -> {
+                            if (event instanceof BucketWrapperChangeEvent) {
+                                // Add hash of tableId to avoid data skew.
+                                return ((BucketWrapperChangeEvent) 
event).getBucket()
+                                        + ((BucketWrapperChangeEvent) 
event).tableId().hashCode();
+                            } else {
+                                return ((BucketWrapper) event).getBucket();
+                            }
+                        })
                 // Avoid disorder of FlushEvent and DataChangeEvent.
                 .transform(
                         "FlushEventAlignment",

Reply via email to