感谢您的答复,今天我使用cancelWithSavepoint时也还是删除了,通过你的答复知道机制如此,我再使用另外的参数来观察下。



yinghua...@163.com
 
发件人: Yun Tang
发送时间: 2021-01-15 11:02
收件人: user-zh
主题: Re: 回复: 请教个Flink checkpoint的问题
Hi
 
这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with 
savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain 
checkpoint的数量为1而被subsume掉了,也就是被删掉了。
 
如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
 
另外说一句,即使是已经deprecated的cancel with 
savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
 
 
[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
 
祝好
唐云
________________________________
From: yinghua...@163.com <yinghua...@163.com>
Sent: Thursday, January 14, 2021 19:00
To: user-zh <user-zh@flink.apache.org>
Subject: 回复: 回复: 请教个Flink checkpoint的问题
 
好的,感谢您的回复!
 
 
 
yinghua...@163.com
 
发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:
 
If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).
 
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention
 
如回答有误,请指正。
 
 
 
 
 
发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
    streamEnv.enableCheckpointing(5 * 60 * 1000);
    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
            
.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
    String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
    try {
      StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
      streamEnv.setStateBackend(rocksDBStateBackend);
 
 
 
yinghua...@163.com
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
yinghua...@163.com

回复