This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 87648546b [flink][cdc] Add operator and transformation name for cdc 
pipeline (#2135)
87648546b is described below

commit 87648546b47c332861778488eb3f075b54105d87
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 17 16:20:23 2023 +0800

    [flink][cdc] Add operator and transformation name for cdc pipeline (#2135)
---
 .../paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java       | 3 ++-
 .../paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java | 5 ++++-
 .../paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java       | 4 +++-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index e7156dd3a..fc9fa30bf 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -173,7 +173,8 @@ public class KafkaSyncDatabaseAction extends ActionBase {
         new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                 .withInput(
                         env.fromSource(source, 
WatermarkStrategy.noWatermarks(), "Kafka Source")
-                                .flatMap(recordParser))
+                                .flatMap(recordParser)
+                                .name("Parse"))
                 .withParserFactory(parserFactory)
                 .withCatalogLoader(catalogLoader())
                 .withDatabase(database)
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 0d9da6df1..cd8e34e9b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -100,7 +100,10 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
                             try {
                                 catalog.createTable(identifier, schema, true);
                             } catch (Exception e) {
-                                LOG.error("create newly added paimon table 
error.", e);
+                                LOG.error(
+                                        "Cannot create newly added Paimon 
table {}",
+                                        identifier.getFullName(),
+                                        e);
                             }
                         });
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 2d522355e..d4c630777 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -138,6 +138,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                         .process(
                                 new CdcDynamicTableParsingProcessFunction<>(
                                         database, catalogLoader, 
parserFactory))
+                        .name("Side Output")
                         .setParallelism(input.getParallelism());
 
         // for newly-added tables, create a multiplexing operator that handles 
all their records
@@ -149,7 +150,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         SingleOutputStreamOperatorUtils.getSideOutput(
                         parsed,
                         
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
-                .process(new 
MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));
+                .process(new 
MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
+                .name("Schema Evolution");
 
         DataStream<CdcMultiplexRecord> partitioned =
                 partition(

Reply via email to