Hi, 你可以试一下用statement set[1],将这个query同时写入到print sink中吗?




在tm日志里可以查看到print 
sink的结果,看看里面有没有-D类型的数据。如果没有的话,证明是test_changelog源表可能就没有-D的数据;如果有的话,就需要后续进一步排查sink表在ds和sql上的行为差异。







[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/#insert-into-multiple-tables

[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/print/




--

    Best!
    Xuyang





在 2024-06-11 10:39:10,"zapjone" <zapj...@163.com> 写道:
>大佬们好:
>   我使用datastream 
> api进行实时读取mysql数据时,通过tableEnv.fromChangelogStream将datastram转换成了变更表,在使用sql将变更表数据写入数据湖中。
> 但经过测试,insert、update都可以正常实现,但delete无法实现删除操作,使用sql进行测试时,可以实现删除操作。(因有些逻辑需要api操作,就没有使用sql方式实现)。
>代码:
>StreamExecutionEnvironment env = ...;
>StreamTableEnvironment tableEnv = ...;
>MySqlSource<String> mysqlSource = MysqlSouce.builder()....build();
>SingleOutputStramOperator<Row> datastream = 
>env.fromSource(mysqlSource).flatMap();// 手动解析信息,并转换为Row类型,带有RowKind
>tableEnv.createTemporaryView("test_changelog",tableEnv.fromChangelogStream(datastream));
>tableEnv.executeSql("insert into xxx select * from test_changelog");
>
>
>

回复