Flink CDC 社区有提供1.14支持的,2.2.1版本即可。你这个好像是没有开启checkpoint, 开启下就好了。
// enable checkpoint
env.enableCheckpointing(1000);


祝好,
Leonard

> 2022年11月3日 上午11:34,左岩 <13520871...@163.com> 写道:
> 
> 我用的是flink1.14 
> ,因为官方没有匹配的版本,所以自己编译的flinkCDC,binlog也开启了,然后也没报错,读不到mysql的数据,idea控制台不报错也不输出数据,可能是什么原因呢(运行日志见附件)
> public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         conf.setInteger("rest.port", 10041);
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>         env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
>         
> env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
> 
> //        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> 
>         env.setParallelism(4);
> 
>         // 建表
>         tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>                 "      userid INT, " +
>                 "      username string, " +
>                 "      age string, " +
>                 "      `partition` INT, " +
>                 "     PRIMARY KEY(userid) NOT ENFORCED " +
>                 "     ) WITH ( " +
>                 "     'connector' = 'mysql-cdc', " +
>                 "     'server-id' = '5401-5404', " +
>                 "     'hostname' = '192.168.0.220', " +
>                 "     'port' = '3306', " +
>                 "     'username' = 'root', " +
>                 "     'password' = 'root', " +
>                 "     'database-name' = 'zy', " +
>                 "     'table-name' = 't_stu' " +
>                 ")");
> 
>         // 查询
>         tenv.executeSql("select * from flink_t_stu").print();
> 
>         env.execute();
> 
>     }
> <idealog2.txt>

回复