This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f117d404 [cdc] Fix duplicate schema IDs when multiple parallel 
updates (#2797)
0f117d404 is described below

commit 0f117d404d4433f32bbb4d73037fc4b267c41608
Author: chenxi0599 <[email protected]>
AuthorDate: Fri Feb 2 18:02:34 2024 +0800

    [cdc] Fix duplicate schema IDs when multiple parallel updates (#2797)
---
 .../apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 4985f4829..6d9a227f1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -149,6 +149,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         SingleOutputStreamOperatorUtils.getSideOutput(
                         parsed,
                         
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
+                .keyBy(t -> t.f0)
                 .process(new 
MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
                 .name("Schema Evolution");
 

Reply via email to