答复: Flink Datastream实现删除操作

2024-06-04 文章 Xiqian YU
您好,

Iceberg 为 Flink 实现的 connector 同时支持 DataStream API 和 Table API[1]。其 DataStream 
API 提供 Append(默认行为)、Overwrite、Upsert 三种可选的模式,您可以使用下面的 Java 代码片段实现:

首先创建对应数据行 Schema 格式的反序列化器,例如,可以使用 RowDataDebeziumDeserializeSchema 的生成器来快速构造一个:


private RowDataDebeziumDeserializeSchema getDeserializer(
DataType dataType) {
LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType);
return RowDataDebeziumDeserializeSchema.newBuilder()
.setPhysicalRowType((RowType) dataType.getLogicalType())
.setResultTypeInfo(typeInfo)
.build();
}

然后,您可以使用该反序列化器创建 MySQL 数据源:

MySqlSource mySqlSource =
MySqlSource.builder()
// 其他参数配置略
.deserializer(getDeserializer({{ ROW_DATA_TYPE_HERE }}))
.build();

并创建一个 Iceberg 数据源:

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = 
TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(mysqlSource)
.tableLoader(tableLoader)
// 此处可以追加 .overwrite(true) 或 .upsert(true)
// 来配置 Overwrite 或 Upsert 行为
.append();

P.S. 在接下来的 Flink CDC 版本中,预计会为 3.0 版本新增的 Pipeline 作业[2]提供写入 Iceberg 
的能力,使用起来更方便快捷。如果能够满足您的需求,也请多多尝试。

祝好!

Regards,
yux

[1] https://iceberg.apache.org/docs/1.5.2/flink-writes/#writing-with-datastream
[2] 
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/get-started/introduction/



发件人: zapjone 
日期: 星期二, 2024年6月4日 18:34
收件人: user-zh@flink.apache.org 
主题: Flink Datastream实现删除操作
各位大佬好:
想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream 
api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?


Flink Datastream实现删除操作

2024-06-04 文章 zapjone
各位大佬好:
想请教下,在使用mysql-cdc到iceberg,通过sql方式可以实现自动更新和删除功能。但在使用datastream 
api进行处理后,注册成临时表,怎么实现类似于sql方式的自动更新和删除呢?