Re: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
> On Nov 4, 2022, at 2:34 PM, 左岩 <13520871...@163.com> wrote: > > tenv.executeSql("xxx); > env.execute(); 这样使用是不对的,你可以看下这两个方法的java doc 祝好, Leonard
Re:Re: 回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
CDC是自己编译的2.3,对应flink1.14的版本,还有一个问题是,可以读到变更数据。比如11点30写入mysql,flinkcdc读出来要慢几分钟,5~7分钟之后才能读到新写入或者变更的数据,第二个问题就行,变更数据插不到另外一张mysql表里 在 2022-11-07 10:11:56,"Shengkai Fang" 写道: >你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。 > >Best, >Shengkai > >左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道: > >> >> >> >> >> >> >> >> >> >> .print(); 去掉也不行, >> >> 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢 >> >> >> >> >> >> >> >> >> 在 2022-11-04 16:52:08,"yinghua...@163.com" 写道: >> >> >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下 >> >StatementSet statementSet = tenv.createStatementSet(); >> >statementSet.addInsertSql(sql1); >> >statementSet.addInsertSql(sql2); >> >TableResult result = statementSet.execute(); >> >result.getJobClient().get().getJobID().toString(); >> > >> > >> >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。 >> >// 查询 >> >tenv.executeSql("select * from flink_t_stu").print(); >> 这个任务给去掉 >> > >> > >> > >> >yinghua...@163.com >> > >> >发件人: 左岩 >> >发送时间: 2022-11-04 14:34 >> >收件人: user-zh >> >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里 >> >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 >> >代码如下:控制台打印情况见附件 >> >public static void main(String[] args) throws Exception { >> >Configuration conf = new Configuration(); >> >conf.setInteger("rest.port", 10041); >> >StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(conf); >> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env); >> > >> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); >> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); >> > >> >env.setParallelism(1); >> >// 建表 >> >tenv.executeSql("CREATE TABLE flink_t_stu ( " + >> >" userid INT, " + >> >" username string, " + >> >" age string, " + >> >" `partition` INT, " + >> >" PRIMARY KEY(userid) NOT ENFORCED " + >> >" ) WITH ( " + >> >" 'connector' = 'mysql-cdc', " + >> >" 'server-id' = '5401-5404', " + >> >" 'scan.startup.mode' = 'latest-offset', " + >> >//" 'scan.startup.mode' = 'earliest-offset', " + >> >" 'hostname' = '192.168.0.220', " + >> >" 'port' = '3306', " + >> >" 'username' = 'root', " + >> >" 'password' = 'root', " + >> >" 'database-name' = 'zy', " + >> >" 'table-name' = 't_stu' " + >> >")"); >> > >> >// 查询 >> >tenv.executeSql("select * from flink_t_stu").print(); >> > >> > >> >// 建一个目标表,用来存放查询结果 >> >tenv.executeSql( >> >"CREATE TABLE flink_t_stu2 ( " + >> >" userid INT, " + >> >" username string, " + >> >" age string, " + >> >" `partition` INT, " + >> >" PRIMARY KEY(userid) NOT ENFORCED " + >> >" ) WITH ( " + >> >" 'connector' = 'jdbc', " + >> >" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + >> >" 'table-name' = 't_stu2', " + >> >" 'username' = 'root', " + >> >" 'password' = 'root' " + >> >")" >> >); >> > >> >tenv.executeSql("INSERT INTO flink_t_stu2 " + >> >"SELECT * FROM flink_t_stu"); >> >env.execute(); >> > >> >} >>
Re: 回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
你用的是哪个版本的 cdc,增量部分的数据需要全量的部分读完才能进行。 Best, Shengkai 左岩 <13520871...@163.com> 于2022年11月4日周五 17:58写道: > > > > > > > > > > .print(); 去掉也不行, > > 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢 > > > > > > > > > 在 2022-11-04 16:52:08,"yinghua...@163.com" 写道: > > >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下 > >StatementSet statementSet = tenv.createStatementSet(); > >statementSet.addInsertSql(sql1); > >statementSet.addInsertSql(sql2); > >TableResult result = statementSet.execute(); > >result.getJobClient().get().getJobID().toString(); > > > > > >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。 > >// 查询 > >tenv.executeSql("select * from flink_t_stu").print(); > --------这个任务给去掉 > > > > > > > >yinghua...@163.com > > > >发件人: 左岩 > >发送时间: 2022-11-04 14:34 > >收件人: user-zh > >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里 > >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 > >代码如下:控制台打印情况见附件 > >public static void main(String[] args) throws Exception { > >Configuration conf = new Configuration(); > >conf.setInteger("rest.port", 10041); > >StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > >StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > > >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); > >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); > > > >env.setParallelism(1); > >// 建表 > >tenv.executeSql("CREATE TABLE flink_t_stu ( " + > >" userid INT, " + > >" username string, " + > >" age string, " + > >" `partition` INT, " + > >" PRIMARY KEY(userid) NOT ENFORCED " + > >" ) WITH ( " + > >" 'connector' = 'mysql-cdc', " + > >" 'server-id' = '5401-5404', " + > >" 'scan.startup.mode' = 'latest-offset', " + > >//" 'scan.startup.mode' = 'earliest-offset', " + > >" 'hostname' = '192.168.0.220', " + > >" 'port' = '3306', " + > >" 'username' = 'root', " + > >" 'password' = 'root', " + > >" 'database-name' = 'zy', " + > >" 'table-name' = 't_stu' " + > >")"); > > > >// 查询 > >tenv.executeSql("select * from flink_t_stu").print(); > > > > > >// 建一个目标表,用来存放查询结果 > >tenv.executeSql( > >"CREATE TABLE flink_t_stu2 ( " + > >" userid INT, " + > >" username string, " + > >" age string, " + > >" `partition` INT, " + > >" PRIMARY KEY(userid) NOT ENFORCED " + > >" ) WITH ( " + > >" 'connector' = 'jdbc', " + > >" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + > >" 'table-name' = 't_stu2', " + > >" 'username' = 'root', " + > >" 'password' = 'root' " + > >")" > >); > > > >tenv.executeSql("INSERT INTO flink_t_stu2 " + > >"SELECT * FROM flink_t_stu"); > >env.execute(); > > > >} >
Re:回复: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
.print(); 去掉也不行, 跟这个也没有关系(一个tableEnv要执行2个DML),而且现在有一个现象,就是现在改了mysql表数据,flinkcdc得等好久才能读到变化数据,这是什么情况呢 在 2022-11-04 16:52:08,"yinghua...@163.com" 写道: >你这个是在一个tableEnv要执行2个DML,要使用StatementSet保存2个DML语句,在StatementSet上执行exuectute,如下 >StatementSet statementSet = tenv.createStatementSet(); >statementSet.addInsertSql(sql1); >statementSet.addInsertSql(sql2); >TableResult result = statementSet.execute(); >result.getJobClient().get().getJobID().toString(); > > >或者你把打印的那个任务给去掉,看能否将数据插入到目的的mysql中。 >// 查询 >tenv.executeSql("select * from flink_t_stu").print(); >这个任务给去掉 > > > >yinghua...@163.com > >发件人: 左岩 >发送时间: 2022-11-04 14:34 >收件人: user-zh >主题: FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里 >用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 >代码如下:控制台打印情况见附件 >public static void main(String[] args) throws Exception { >Configuration conf = new Configuration(); >conf.setInteger("rest.port", 10041); >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(conf); >StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); > >env.setParallelism(1); >// 建表 >tenv.executeSql("CREATE TABLE flink_t_stu ( " + >" userid INT, " + >" username string, " + >" age string, " + >" `partition` INT, " + >" PRIMARY KEY(userid) NOT ENFORCED " + >" ) WITH ( " + >" 'connector' = 'mysql-cdc', " + >" 'server-id' = '5401-5404', " + >" 'scan.startup.mode' = 'latest-offset', " + >//" 'scan.startup.mode' = 'earliest-offset', " + >" 'hostname' = '192.168.0.220', " + >" 'port' = '3306', " + >" 'username' = 'root', " + >" 'password' = 'root', " + >" 'database-name' = 'zy', " + >" 'table-name' = 't_stu' " + >")"); > >// 查询 >tenv.executeSql("select * from flink_t_stu").print(); > > >// 建一个目标表,用来存放查询结果 >tenv.executeSql( >"CREATE TABLE flink_t_stu2 ( " + >" userid INT, " + >" username string, " + >" age string, " + >" `partition` INT, " + >" PRIMARY KEY(userid) NOT ENFORCED " + >" ) WITH ( " + >" 'connector' = 'jdbc', " + >" 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + >" 'table-name' = 't_stu2', " + >" 'username' = 'root', " + >" 'password' = 'root' " + >")" >); > >tenv.executeSql("INSERT INTO flink_t_stu2 " + >"SELECT * FROM flink_t_stu"); >env.execute(); > >}
FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里
用FlinkCDC可以读到MySQL变更数据,但是插不到新的MySQL表里,两个表都有主键,表结构相同 代码如下:控制台打印情况见附件 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.setInteger("rest.port", 10041); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); env.setParallelism(1); // 建表 tenv.executeSql("CREATE TABLE flink_t_stu ( " + " userid INT, " + " username string, " + " age string, " + " `partition` INT, " + " PRIMARY KEY(userid) NOT ENFORCED " + " ) WITH ( " + " 'connector' = 'mysql-cdc', " + " 'server-id' = '5401-5404', " + " 'scan.startup.mode' = 'latest-offset', " + //" 'scan.startup.mode' = 'earliest-offset', " + " 'hostname' = '192.168.0.220', " + " 'port' = '3306', " + " 'username' = 'root', " + " 'password' = 'root', " + " 'database-name' = 'zy', " + " 'table-name' = 't_stu' " + ")"); // 查询 tenv.executeSql("select * from flink_t_stu").print(); // 建一个目标表,用来存放查询结果 tenv.executeSql( "CREATE TABLE flink_t_stu2 ( " + " userid INT, " + " username string, " + " age string, " + " `partition` INT, " + " PRIMARY KEY(userid) NOT ENFORCED " + " ) WITH ( " + " 'connector' = 'jdbc', " + " 'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " + " 'table-name' = 't_stu2', " + " 'username' = 'root', " + " 'password' = 'root' " + ")" ); tenv.executeSql("INSERT INTO flink_t_stu2 " + "SELECT * FROM flink_t_stu"); env.execute(); }