This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87648546b [flink][cdc] Add operator and transformation name for cdc pipeline (#2135)
87648546b is described below
commit 87648546b47c332861778488eb3f075b54105d87
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 17 16:20:23 2023 +0800
[flink][cdc] Add operator and transformation name for cdc pipeline (#2135)
---
.../paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java | 3 ++-
.../paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java | 5 ++++-
.../paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 4 +++-
3 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index e7156dd3a..fc9fa30bf 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -173,7 +173,8 @@ public class KafkaSyncDatabaseAction extends ActionBase {
new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
.withInput(
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
- .flatMap(recordParser))
+ .flatMap(recordParser)
+ .name("Parse"))
.withParserFactory(parserFactory)
.withCatalogLoader(catalogLoader())
.withDatabase(database)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 0d9da6df1..cd8e34e9b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -100,7 +100,10 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
try {
catalog.createTable(identifier, schema, true);
} catch (Exception e) {
- LOG.error("create newly added paimon table error.", e);
+ LOG.error(
+ "Cannot create newly added Paimon table {}",
+ identifier.getFullName(),
+ e);
}
});
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 2d522355e..d4c630777 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -138,6 +138,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
.process(
new CdcDynamicTableParsingProcessFunction<>(
database, catalogLoader, parserFactory))
+ .name("Side Output")
.setParallelism(input.getParallelism());
// for newly-added tables, create a multiplexing operator that handles all their records
@@ -149,7 +150,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
SingleOutputStreamOperatorUtils.getSideOutput(
parsed,
CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG)
- .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader));
+ .process(new MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
+ .name("Schema Evolution");
DataStream<CdcMultiplexRecord> partitioned =
partition(