.print(); 去掉也不行,
跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢
在 2022-11-04 16:52:08,"yinghua...@163.com" <yinghua...@163.com> 写道:
>你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下
> StatementSet statementSet = tenv.createStatementSet();
> statementSet.addInsertSql(sql1);
> statementSet.addInsertSql(sql2);
> TableResult result = statementSet.execute();
> result.getJobClient().get().getJobID().toString();
>
>
>或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();
>--------------------这个任务给去掉
>
>
>
>yinghua...@163.com
>
>发件人: 左岩
>发送时间: 2022-11-04 14:34
>收件人: user-zh
>主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
>用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同
>代码如下:控制台打印情况见附件
>public static void main(String[] args) throws Exception {
>Configuration conf = new Configuration();
>conf.setInteger("rest.port", 10041);
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment(conf);
>StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>
>env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>
>env.setParallelism(1);
>// 建表
>tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>" userid INT, " +
>" username string, " +
>" age string, " +
>" `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>" 'connector' = 'mysql-cdc', " +
>" 'server-id' = '5401-5404', " +
>" 'scan.startup.mode' = 'latest-offset', " +
>// " 'scan.startup.mode' = 'earliest-offset', " +
>" 'hostname' = '192.168.0.220', " +
>" 'port' = '3306', " +
>" 'username' = 'root', " +
>" 'password' = 'root', " +
>" 'database-name' = 'zy', " +
>" 'table-name' = 't_stu' " +
>")");
>
>// 查询
>tenv.executeSql("select * from flink_t_stu").print();
>
>
>// 建一个目标表,用来存放查询结果
>tenv.executeSql(
>"CREATE TABLE flink_t_stu2 ( " +
>" userid INT, " +
>" username string, " +
>" age string, " +
>" `partition` INT, " +
>" PRIMARY KEY(userid) NOT ENFORCED " +
>" ) WITH ( " +
>" 'connector' = 'jdbc', " +
>" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>" 'table-name' = 't_stu2', " +
>" 'username' = 'root', " +
>" 'password' = 'root' " +
>")"
>);
>
>tenv.executeSql("INSERT INTO flink_t_stu2 " +
>"SELECT * FROM flink_t_stu");
>env.execute();
>
> }