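Hi everyone, I recently started using Flink CDC to consume the MySQL binlog. Every time I restart or stop the job, it starts consuming again from the first row of the table. Checkpointing is enabled and persisted, and the logs report that checkpoints complete successfully, so I don't understand why the application always starts over from the beginning after a restart. According to the official docs, .startupOptions(StartupOptions.initial()) should mean a snapshot of the existing data followed by incremental changes.

Flink version: 1.12.2
Flink CDC version: flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar

Relevant core code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend());
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// keep the externalized checkpoint when the job is cancelled
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint every 10 seconds
env.enableCheckpointing(10 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);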
DebeziumSourceFunction<String> sourceMilApplysLogStream = MySQLSource.<String>builder()
        .hostname(config.getProperty("datasource.db"))
        .port(Integer.parseInt(config.getProperty("datasource.port")))
        .username(config.getProperty("datasource.username"))
        .password(config.getProperty("datasource.password"))
        .databaseList(config.getProperty("datasource.databaseList"))
        .tableList(config.getProperty("datasource.tableList"))
        .deserializer(new DebeziumDeserialization())
        .serverId(Integer.parseInt(config.getProperty("datasource.server-id")))
        .startupOptions(StartupOptions.initial())
        .build();
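As far as I understand the 1.x connector, StartupOptions.initial() only decides what happens when the job starts with no restored state: if a binlog offset is restored from a checkpoint or savepoint, reading resumes from that offset; otherwise the table is snapshotted first. A retained checkpoint is not picked up automatically when the job is resubmitted; its path has to be passed explicitly (e.g. `flink run -s <checkpoint-dir>/chk-<n> ...`), otherwise the job starts fresh and re-snapshots. The Java sketch below is a simplified, hypothetical model of that decision only; `StartupDecision`, `StartupMode` and `startPosition` are made-up names, not the connector's API, and the offset string is invented example data.

```java
import java.util.Optional;

/**
 * Hypothetical, simplified model (not the real flink-cdc API) of how a
 * Debezium-based CDC source chooses where to start reading.
 */
public class StartupDecision {

    // Simplified stand-ins for the connector's startup modes (assumption).
    enum StartupMode { INITIAL, LATEST_OFFSET }

    /**
     * @param mode           the configured startup option
     * @param restoredOffset binlog offset restored from checkpoint/savepoint state, if any
     * @return a description of where reading starts
     */
    static String startPosition(StartupMode mode, Optional<String> restoredOffset) {
        // Restored state takes precedence over the startup option.
        if (restoredOffset.isPresent()) {
            return "resume binlog at " + restoredOffset.get();
        }
        // Fresh start (no restored state): only now does the startup option matter.
        switch (mode) {
            case INITIAL:
                return "snapshot table, then read binlog";
            case LATEST_OFFSET:
                return "read binlog from latest offset";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // Resubmitted without a checkpoint path: nothing restored, so INITIAL snapshots again.
        System.out.println(startPosition(StartupMode.INITIAL, Optional.empty()));
        // prints: snapshot table, then read binlog

        // Resubmitted from a retained checkpoint: the stored (hypothetical) offset is used.
        System.out.println(startPosition(StartupMode.INITIAL, Optional.of("mysql-bin.000003:154")));
        // prints: resume binlog at mysql-bin.000003:154
    }
}
```

The point of the model is the ordering of the two checks: if the symptom is "always starts from the first row", it is worth confirming whether the restarted job actually restored any state before looking at the startup option.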