大佬们好:
   我使用datastream 
api进行实时读取mysql数据时,通过tableEnv.fromChangelogStream将datastram转换成了变更表,在使用sql将变更表数据写入数据湖中。
 
但经过测试,insert、update都可以正常实现,但delete无法实现删除操作,使用sql进行测试时,可以实现删除操作。(因有些逻辑需要api操作,就没有使用sql方式实现)。
代码:
StreamExecutionEnvironment env = ...;
StreamTableEnvironment tableEnv = ...;
MySqlSource<String> mysqlSource = MysqlSouce.builder()....build();
SingleOutputStramOperator<Row> datastream = 
env.fromSource(mysqlSource).flatMap();// 手动解析信息,并转换为Row类型,带有RowKind
tableEnv.createTemporaryView("test_changelog",tableEnv.fromChangelogStream(datastream));
tableEnv.executeSql("insert into xxx select * from test_changelog");



回复