大家好,我最近采用了flink cdc 对接mysql binlog ,每次重启或者停止job后,都是从表的第一条数据开始消费。
有做checkpoint和持久化,并且日志提示checkpoint成功,不知道为何重启应用始终是从头开始消费?
按照官方定义
.startupOptions(StartupOptions.initial()) 应该是历史+增量才对
flink 版本:1.12.2
flink cdc  版本:flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar
相关核心代码:
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new 
RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend());
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.enableCheckpointing(10 * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


DebeziumSourceFunction<String> sourceMilApplysLogStream = 
MySQLSource.<String>builder()
                .hostname(config.getProperty("datasource.db"))
                .port(Integer.parseInt(config.getProperty("datasource.port")))
                .username(config.getProperty("datasource.username"))
                .password(config.getProperty("datasource.password"))
                .databaseList(config.getProperty("datasource.databaseList"))
                .tableList(config.getProperty("datasource.tableList"))
                .deserializer(new DebeziumDeserialization())
                
.serverId(Integer.parseInt(config.getProperty("datasource.server-id")))
                .startupOptions(StartupOptions.initial())
                .build();



回复