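Hello, I'd like to ask how to set the Flink configuration and job parameters so that no data is lost when a job is cancelled and redeployed, or when a failed Flink job is rerun.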
I currently have a job with checkpointing enabled. After cancelling it and starting it again, I find that a small portion of the data is lost.

1. flink.conf

    execution.checkpointing.interval: 10000
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    execution.checkpointing.checkpoints-after-tasks-finish.enabled: true
    state.backend: filesystem
    state.checkpoints.dir: hdfs://******:8020/flink/checkpoints
    state.savepoints.dir: hdfs://****:8020/flink/savepoints

2. source table

    CREATE TABLE source_kafka_nginxlog (
      ts BIGINT,
      ......
      pt AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'nginxlog',
      -- the Flink 1.15 patch for this (FLINK-24697) has been applied
      'scan.startup.mode' = 'group-offsets',
      'properties.auto.offset.reset' = 'latest',
      'properties.bootstrap.servers' = '***:9092',
      'properties.group.id' = 'zep',
      'format' = 'json'
    );

3. sink table

    CREATE TABLE sink_kafka_nginxlog_statistic (
      ts BIGINT,
      ......
      clt_rq BIGINT not null
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'nginxlog-statistic-flink',
      'sink.parallelism' = '20',
      'sink.delivery-guarantee' = 'exactly-once',
      'sink.transactional-id-prefix' = 'nginxlog-statistic-flink',
      'properties.transaction.timeout.ms' = '3600000',
      'scan.startup.mode' = 'group-offsets',
      'properties.auto.offset.reset' = 'latest',
      'properties.bootstrap.servers' = '***:9092',
      'properties.group.id' = 'zep',
      'value.format' = 'csv'
    )

Jerry Guo
wangyixuhongm...@163.com