.print(); 去掉也不行,   
跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢








在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道:
>你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
>    StatementSet statementSet = tenv.createStatementSet();
>    statementSet.addInsertSql(sql1);
>    statementSet.addInsertSql(sql2);
>    TableResult result = statementSet.execute();
>    result.getJobClient().get().getJobID().toString();
>
>
>或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();   
>--------------------这个任务给去掉
>
>
>
>yinghua...@163.com
> 
>发件人: 左岩
>发送时间: 2022-11-04 14:34
>收件人: user-zh
>主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
>用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
>代码如下:控制台打印情况见附件
>public static void main(String[] args) throws Exception {
>Configuration conf = new Configuration();
>conf.setInteger("rest.port", 10041);
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment(conf);
>StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
>env.setParallelism(1);
>// 建表
>tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>"      userid INT, " +
>"      username string, " +
>"      age string, " +
>"      `partition` INT, " +
>"     PRIMARY KEY(userid) NOT ENFORCED " +
>"     ) WITH ( " +
>"     'connector' = 'mysql-cdc', " +
>"     'server-id' = '5401-5404', " +
>"     'scan.startup.mode' = 'latest-offset', " +
>//                "     'scan.startup.mode' = 'earliest-offset', " +
>"     'hostname' = '192.168.0.220', " +
>"     'port' = '3306', " +
>"     'username' = 'root', " +
>"     'password' = 'root', " +
>"     'database-name' = 'zy', " +
>"     'table-name' = 't_stu' " +
>")");
> 
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();
> 
> 
>// 建一个目标表,用来存放查询结果
>tenv.executeSql(
>"CREATE TABLE flink_t_stu2 ( " +
>"      userid INT, " +
>"      username string, " +
>"      age string, " +
>"      `partition` INT, " +
>"     PRIMARY KEY(userid) NOT ENFORCED " +
>"     ) WITH ( " +
>"  'connector' = 'jdbc', " +
>"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>"  'table-name' = 't_stu2', " +
>"  'username' = 'root', " +
>"  'password' = 'root'  " +
>")"
>);
> 
>tenv.executeSql("INSERT INTO flink_t_stu2 " +
>"SELECT * FROM flink_t_stu");
>env.execute();
> 
>    }

回复