Re: flink cdc 3.0 schema变更问题

2024-06-12 文章 Yanquan Lv
你好,DataStream 的方式需要设置 includeSchemaChanges(true) 参数,并且设置自定义的 deserializer,参考这个链接[1]。 如果不想使用 json 的方式,希望自定义 deserializer,从 SourceRecord 里提取 ddl 的方式可以参考这个链接[2]提供的方案。 [1] https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/faq/faq/#q6-i-want-to-get-ddl-events-in-the-database-what-should-i-d

Re: flink cdc 3.0 schema变更问题

2024-06-12 文章 Xiqian YU
Zapjone 好, 目前的 Schema Evolution 的实现依赖传递 CDC Event 事件的 Pipeline 连接器和框架。如果您希望插入自定义算子逻辑,建议参考 flink-cdc-composer 模块中的 FlinkPipelineComposer 类构建算子链作业的方式,并在其中插入自定义的 Operator 以实现您的业务逻辑。 另外,对于一些简单的处理逻辑,如果能够使用 YAML 作业的 Route(路由)、Transform(变换)功能表述的话,直接编写对应的 YAML 规则会更简单。 祝好! Regards, yux De : zapjone

flink cdc 3.0 schema变更问题

2024-06-12 文章 zapjone
大佬们好: 想请假下,在flink cdc3.0中支持schema变更,但看到是pipeline方式的,因业务问题需要使用datastream进行特殊处理,所以想请假下,在flink cdc 3.0中datastream api中怎么使用schema变更呢?或者相关文档呢?

Re:changelogstream删除问题

2024-06-12 文章 Xuyang
Hi, 你可以试一下用statement set[1],将这个query同时写入到print sink中吗? 在tm日志里可以查看到print sink的结果,看看里面有没有-D类型的数据。如果没有的话,证明是test_changelog源表可能就没有-D的数据;如果有的话,就需要后续进一步排查sink表在ds和sql上的行为差异。 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/#insert-into-multiple-tables [2]