Hi, 你可以试一下用statement set[1],将这个query同时写入到print sink中吗?
在tm日志里可以查看到print sink的结果,看看里面有没有-D类型的数据。如果没有的话,证明是test_changelog源表可能就没有-D的数据;如果有的话,就需要后续进一步排查sink表在ds和sql上的行为差异。 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/#insert-into-multiple-tables [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/print/ -- Best! Xuyang 在 2024-06-11 10:39:10,"zapjone" <zapj...@163.com> 写道: >大佬们好: > 我使用datastream > api进行实时读取mysql数据时,通过tableEnv.fromChangelogStream将datastram转换成了变更表,在使用sql将变更表数据写入数据湖中。 > 但经过测试,insert、update都可以正常实现,但delete无法实现删除操作,使用sql进行测试时,可以实现删除操作。(因有些逻辑需要api操作,就没有使用sql方式实现)。 >代码: >StreamExecutionEnvironment env = ...; >StreamTableEnvironment tableEnv = ...; >MySqlSource<String> mysqlSource = MysqlSouce.builder()....build(); >SingleOutputStramOperator<Row> datastream = >env.fromSource(mysqlSource).flatMap();// 手动解析信息,并转换为Row类型,带有RowKind >tableEnv.createTemporaryView("test_changelog",tableEnv.fromChangelogStream(datastream)); >tableEnv.executeSql("insert into xxx select * from test_changelog"); > > >