Flink CDC 社区有提供1.14支持的,2.2.1版本即可。你这个好像是没有开启checkpoint, 开启下就好了。 // enable checkpoint env.enableCheckpointing(1000);
祝好, Leonard > 2022年11月3日 上午11:34,左岩 <13520871...@163.com> 写道: > > 我用的是flink1.14 > ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件) > public static void main(String[] args) throws Exception { > Configuration conf = new Configuration(); > conf.setInteger("rest.port", 10041); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(conf); > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); > > env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3"); > > // StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > > env.setParallelism(4); > > // 建表 > tenv.executeSql("CREATE TABLE flink_t_stu ( " + > " userid INT, " + > " username string, " + > " age string, " + > " `partition` INT, " + > " PRIMARY KEY(userid) NOT ENFORCED " + > " ) WITH ( " + > " 'connector' = 'mysql-cdc', " + > " 'server-id' = '5401-5404', " + > " 'hostname' = '192.168.0.220', " + > " 'port' = '3306', " + > " 'username' = 'root', " + > " 'password' = 'root', " + > " 'database-name' = 'zy', " + > " 'table-name' = 't_stu' " + > ")"); > > // 查询 > tenv.executeSql("select * from flink_t_stu").print(); > > env.execute(); > > } > <idealog2.txt>