CDC是自己编译的2.3,对应flink1.14的版本,还有一个问题是,可以读到变更数据。比如11点30写入mysql,flinkcdc读出来要慢几分钟,5~7分钟之后才能读到新写入或者变更的数据,第二个问题就行,变更数据插不到另外一张mysql表里

















在 2022-11-07 10:11:56,"Shengkai Fang" <fskm...@gmail.com> 写道:
>你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。
>
>Best,
>Shengkai
>
>左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> .print(); 去掉也不行,
>>  
>> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道:
>>
>> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
>> >    StatementSet statementSet = tenv.createStatementSet();
>> >    statementSet.addInsertSql(sql1);
>> >    statementSet.addInsertSql(sql2);
>> >    TableResult result = statementSet.execute();
>> >    result.getJobClient().get().getJobID().toString();
>> >
>> >
>> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
>> >// 查询
>> >tenv.executeSql("select * from flink_t_stu").print();
>>  --------------------这个任务给去掉
>> >
>> >
>> >
>> >yinghua...@163.com
>> >
>> >发件人: 左岩
>> >发送时间: 2022-11-04 14:34
>> >收件人: user-zh
>> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
>> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
>> >代码如下:控制台打印情况见附件
>> >public static void main(String[] args) throws Exception {
>> >Configuration conf = new Configuration();
>> >conf.setInteger("rest.port", 10041);
>> >StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> >
>> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>> >
>> >env.setParallelism(1);
>> >// 建表
>> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>> >"      userid INT, " +
>> >"      username string, " +
>> >"      age string, " +
>> >"      `partition` INT, " +
>> >"     PRIMARY KEY(userid) NOT ENFORCED " +
>> >"     ) WITH ( " +
>> >"     'connector' = 'mysql-cdc', " +
>> >"     'server-id' = '5401-5404', " +
>> >"     'scan.startup.mode' = 'latest-offset', " +
>> >//                "     'scan.startup.mode' = 'earliest-offset', " +
>> >"     'hostname' = '192.168.0.220', " +
>> >"     'port' = '3306', " +
>> >"     'username' = 'root', " +
>> >"     'password' = 'root', " +
>> >"     'database-name' = 'zy', " +
>> >"     'table-name' = 't_stu' " +
>> >")");
>> >
>> >// 查询
>> >tenv.executeSql("select * from flink_t_stu").print();
>> >
>> >
>> >// 建一个目标表,用来存放查询结果
>> >tenv.executeSql(
>> >"CREATE TABLE flink_t_stu2 ( " +
>> >"      userid INT, " +
>> >"      username string, " +
>> >"      age string, " +
>> >"      `partition` INT, " +
>> >"     PRIMARY KEY(userid) NOT ENFORCED " +
>> >"     ) WITH ( " +
>> >"  'connector' = 'jdbc', " +
>> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>> >"  'table-name' = 't_stu2', " +
>> >"  'username' = 'root', " +
>> >"  'password' = 'root'  " +
>> >")"
>> >);
>> >
>> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
>> >"SELECT * FROM flink_t_stu");
>> >env.execute();
>> >
>> >    }
>>

回复