你好,DataStream 的方式需要设置 includeSchemaChanges(true) 参数,并且设置自定义的
deserializer,参考这个链接[1]。
如果不想使用 json 的方式,希望自定义 deserializer,从 SourceRecord 里提取 ddl
的方式可以参考这个链接[2]提供的方案。
[1]
https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/faq/faq/#q6-i-want-to-get-ddl-events-in-the-database-what-should-i-d
Zapjone 好,
目前的 Schema Evolution 的实现依赖传递 CDC Event 事件的 Pipeline 连接器和框架。如果您希望插入自定义算子逻辑,建议参考
flink-cdc-composer 模块中的 FlinkPipelineComposer 类构建算子链作业的方式,并在其中插入自定义的 Operator
以实现您的业务逻辑。
另外,对于一些简单的处理逻辑,如果能够使用 YAML 作业的 Route(路由)、Transform(变换)功能表述的话,直接编写对应的 YAML
规则会更简单。
祝好!
Regards,
yux
De : zapjone
大佬们好:
想请假下,在flink
cdc3.0中支持schema变更,但看到是pipeline方式的,因业务问题需要使用datastream进行特殊处理,所以想请假下,在flink cdc
3.0中datastream api中怎么使用schema变更呢?或者相关文档呢?
Hi, 你可以试一下用statement set[1],将这个query同时写入到print sink中吗?
在tm日志里可以查看到print
sink的结果,看看里面有没有-D类型的数据。如果没有的话,证明是test_changelog源表可能就没有-D的数据;如果有的话,就需要后续进一步排查sink表在ds和sql上的行为差异。
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/#insert-into-multiple-tables
[2]