Re: 怎么使用flink内存替代redis

2020-01-19 文章 zhisheng
可以使用 ValueState、ListState、MapState 等存储

sun <1392427...@qq.com> 于2020年1月16日周四 下午5:56写道:

>
> 以前我们使用flink的时候,用到了redis,将redis作为缓存的中间件,存放一些缓存的数据,好像flink可以把这些缓存的数据放在flink内存里面,请问怎么使用flink内存来替代redis


Re: Flink 1.6, increment Checkpoint, the shared dir stored the last year checkpoint state

2020-01-19 文章 Yun Tang
Hi Lake

A more suitable place for this mail should be in user-mail list.

There are three reasons why this could happen:

  1.  This file is orphan file e.g. file uploaded during one checkpoint but 
task manager exited unexpectedly leave that checkpoint not completed.
  2.  This file should be removed by checkpoint coordinator but takes too long 
to complete before job shut down.
  3.  This file is still useful. This is possible in theory because some 
specific rocksDB sst file might not be selected during compactions for a long 
time.

Best
Yun Tang

From: LakeShen 
Sent: Sunday, January 19, 2020 18:55
To: user ; user-zh@flink.apache.org 
; dev 
Subject: Flink 1.6, increment Checkpoint, the shared dir stored the last year 
checkpoint state

Hi community,
now I have a flink sql job, and I set the flink sql sate retention 
time, there are three dir in flink checkpoint dir :
1. chk -xx dir
2. shared dir
3. taskowned dir

I find the shared dir store the last year checkpoint state,the only reason I 
thought is that the latest
checkpoint retain reference of last year checkpoint state file.
Are there any other reason to lead this? Or is it a bug?

Thanks to your replay.

Best wishes,
Lake Shen



Flink 1.6, increment Checkpoint, the shared dir stored the last year checkpoint state

2020-01-19 文章 LakeShen
Hi community,
now I have a flink sql job, and I set the flink sql sate retention
time, there are three dir in flink checkpoint dir :
1. chk -xx dir
2. shared dir
3. taskowned dir

I find the shared dir store the last year checkpoint state,the only reason
I thought is that the latest
checkpoint retain reference of last year checkpoint state file.
Are there any other reason to lead this? Or is it a bug?

Thanks to your replay.

Best wishes,
Lake Shen


Re: blink planner的org.apache.flink.table.api.ValidationException报错

2020-01-19 文章 Kevin Liao
改用最新 master 代码编译(打包后版本 1.11-SNAPSHOT)

将这段

.withSchema(new Schema()
//.field("logger_name", Types.STRING)
//.field("host", Types.STRING)
//.field("@timestamp", Types.SQL_TIMESTAMP)
//.field("_rowtime", Types.SQL_TIMESTAMP)
//.rowtime(
//new
Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
.field("doc", Types.POJO(Doc.class))
)


改成使用 DataTypes 后可以跑通


Kevin Liao  于2020年1月14日周二 上午11:52写道:

> 我用的是
> https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.11.tgz
> 官网下载的
>
> 您说的 master 最新的版本我稍后试一下,谢谢
>
> JingsongLee  于2020年1月14日周二 上午11:51写道:
>
>> 谢谢,
>> 你可以试下最新的1.9版本或是1.10或是master吗?因为这里修了一些bug,不确定还存在不。
>>
>> Best,
>> Jingsong Lee
>>
>> --
>> From:Kevin Liao 
>> Send Time:2020年1月14日(星期二) 11:38
>> To:user-zh ; JingsongLee <
>> lzljs3620...@aliyun.com>
>> Subject:Re: blink planner的org.apache.flink.table.api.ValidationException报错
>>
>> flink 版本是 1.9.1 release
>>
>> Doc
>> 完整不太好给因为涉及到业务信息了,抱歉,但可以给一个片段,这就是一个普通Pojo,里面只有一层,所有类型都是基础类型(及衍生)+String,大约
>> 30 多个字段,我理解这跟字段数关系不大
>>
>> ```
>>
>> import org.apache.commons.lang3.builder.ToStringBuilder;
>> import 
>> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
>>
>> /**
>>  * @author liaoxu Date: 2020/1/13 Time: 12:03 下午.
>>  */
>> @JsonIgnoreProperties(ignoreUnknown = true)
>> public class Doc {
>>
>>   private String suv;
>>   private Float factor = 1F;
>>   private String st;
>>   private String agentId;
>>   private Long timestamp;
>>
>>   ... // omit some, omit getters and setters
>>
>> ```
>>
>> 希望有帮助,或者您可以在钉钉联系我(钉钉号 ib1x1zy)
>>
>> JingsongLee  于2020年1月14日周二 上午11:25写道:
>> Hi Kevin,
>>
>> 这是什么版本?
>> Doc类能完整提供下吗?方便我们复现。
>>
>> Best,
>> Jingsong Lee
>>
>>
>> --
>> From:Kevin Liao 
>> Send Time:2020年1月13日(星期一) 17:37
>> To:user-zh 
>> Subject:blink planner的org.apache.flink.table.api.ValidationException报错
>>
>> tEnv.connect(new Kafka()
>> .version("universal")
>> .topic("xxx")
>> .startFromLatest()
>> .property("bootstrap.servers",
>> "")
>> .property("group.id", ""))
>> .withFormat(new Json().failOnMissingField(false).deriveSchema())
>> .withSchema(new Schema()
>> //.field("logger_name", Types.STRING)
>> //.field("host", Types.STRING)
>> //.field("@timestamp", Types.SQL_TIMESTAMP)
>> //.field("_rowtime", Types.SQL_TIMESTAMP)
>> //.rowtime(
>> //new
>>
>> Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
>> .field("doc", Types.POJO(Doc.class))
>> )
>> .inAppendMode()
>> .registerTableSource("xxx");
>>
>> Table result = tEnv.sqlQuery(
>> "SELECT doc.xxx1, doc.xxx2,  ... , doc.xxxN as seq FROM xxx");
>>
>> //result.printSchema();
>> tEnv.toAppendStream(result,
>> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
>> STRING, STRING, STRING,
>> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
>> STRING, STRING, STRING,
>> STRING, LONG, STRING, INT, STRING, INT)).print();
>>
>>
>>
>> 以上代码在 flink planner 下可以运行,但切换到 blink planner 则报错如下:
>>
>>
>> 、、、
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type
>> LEGACY(PojoType) of
>> table field 'doc' does not match with type
>> PojoType of the field
>> 'doc' of the TableSource return type.
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
>>  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>>  at
>> org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>>  at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
>>  at
>> org.apache.flink.table.planner.plan.nodes.physical.s

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 Yun Tang
Hi

如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?
chk- 开头的目录,描述的是对应checkpoint独享的state,包括 operator 
state,rocksDB的非增量文件(例如rocksDB的VERSION等非SST文件)等等,所以只要新的checkpoint完成了,这些目录可以安全删除。

CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢
restoreLatestCheckpointedState 主要是给failover用,从checkpoint store获取last completed 
checkpoint进行恢复。
restoreSavepoint 是给作业启动时候用,从用户传入的Savepoint加载需要的state进行恢复。

祝好
唐云



From: LakeShen 
Sent: Sunday, January 19, 2020 16:48
To: user-zh@flink.apache.org 
Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢?

LakeShen  于2020年1月19日周日 下午4:30写道:

> 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
> 1. chk-id 的目录
> 2. shared 目录,其中状态非常大
> 3. taskowned
>
> 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
> chk-id目录。
> 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
> 占用的存储不大。下面是相关文件目录的大小:
> 1.3 Mhdfs:xxx/chk-94794
> 1.1 Thdfs:xxx/shared
> 0hdfs:xxx/taskowned
> 如果有什么理解错误,请指出,非常感谢。
>
> 祝好,
> 沈磊
>
> Yun Tang  于2020年1月19日周日 下午4:11写道:
>
>> Hi
>>
>> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
>> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
>> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
>> 开头的目录。
>>
>> 祝好
>> 唐云
>>
>> 
>> From: LakeShen 
>> Sent: Sunday, January 19, 2020 15:42
>> To: user-zh@flink.apache.org 
>> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>>
>> LakeShen  于2020年1月19日周日 下午3:30写道:
>>
>> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
>> > Savepoint 类似,如果不清理,就永久保留。
>> > 非常感谢
>> >
>> >
>> > Yun Tang  于2020年1月19日周日 下午2:06写道:
>> >
>> >> Hi
>> >>
>> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> >>
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>> >>
>> >> 希望这些解释能解答你的困惑
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> >> [2]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>> >>
>> >> 祝好
>> >> 唐云
>> >>
>> >>
>> >>
>> >>
>> >> 
>> >> From: LakeShen 
>> >> Sent: Friday, January 17, 2020 16:28
>> >> To: user-zh@flink.apache.org 
>> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>> >>
>> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
>> state.checkpoints.num-retained的数值时,会对之前的完成的
>> >> Checkpoint 状态做清理。
>> >>
>> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> >> Checkpoint 超时失败。
>> >>
>> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
>> state.checkpoints.num-retained又为1,完成的
>> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>> >>
>> >> 希望有大佬能帮我解惑,非常感谢
>> >>
>> >
>>
>


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 LakeShen
另外,我的容错恢复是 Flink 自身行为 Checkpoint 的容错恢复,我看 CheckpointCoordinator 有两个方法:
restoreLatestCheckpointedState 和 restoreSavepoint,所以这两个方法有什么区别呢?

LakeShen  于2020年1月19日周日 下午4:30写道:

> 非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
> 1. chk-id 的目录
> 2. shared 目录,其中状态非常大
> 3. taskowned
>
> 我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
> chk-id目录。
> 如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
> 占用的存储不大。下面是相关文件目录的大小:
> 1.3 Mhdfs:xxx/chk-94794
> 1.1 Thdfs:xxx/shared
> 0hdfs:xxx/taskowned
> 如果有什么理解错误,请指出,非常感谢。
>
> 祝好,
> 沈磊
>
> Yun Tang  于2020年1月19日周日 下午4:11写道:
>
>> Hi
>>
>> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
>> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
>> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
>> 开头的目录。
>>
>> 祝好
>> 唐云
>>
>> 
>> From: LakeShen 
>> Sent: Sunday, January 19, 2020 15:42
>> To: user-zh@flink.apache.org 
>> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>>
>> LakeShen  于2020年1月19日周日 下午3:30写道:
>>
>> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
>> > Savepoint 类似,如果不清理,就永久保留。
>> > 非常感谢
>> >
>> >
>> > Yun Tang  于2020年1月19日周日 下午2:06写道:
>> >
>> >> Hi
>> >>
>> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> >>
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>> >>
>> >> 希望这些解释能解答你的困惑
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> >> [2]
>> >>
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>> >>
>> >> 祝好
>> >> 唐云
>> >>
>> >>
>> >>
>> >>
>> >> 
>> >> From: LakeShen 
>> >> Sent: Friday, January 17, 2020 16:28
>> >> To: user-zh@flink.apache.org 
>> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>> >>
>> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
>> state.checkpoints.num-retained的数值时,会对之前的完成的
>> >> Checkpoint 状态做清理。
>> >>
>> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> >> Checkpoint 超时失败。
>> >>
>> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
>> state.checkpoints.num-retained又为1,完成的
>> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>> >>
>> >> 希望有大佬能帮我解惑,非常感谢
>> >>
>> >
>>
>


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 LakeShen
非常感谢唐云的答复,目前在每个 Job-id 的 Checkpoint 子目录,有三个目录:
1. chk-id 的目录
2. shared 目录,其中状态非常大
3. taskowned

我研究了一下源码,flink 算子 和 keystate 的 managed state 会上传到 shared 目录,raw state 会上传到
chk-id目录。
如果我只删除 chk-开头的目录,是不是在 shared 的 managed state 不会删除呢?因为我查看 HDFS 的 chk-id目录,
占用的存储不大。下面是相关文件目录的大小:
1.3 Mhdfs:xxx/chk-94794
1.1 Thdfs:xxx/shared
0hdfs:xxx/taskowned
如果有什么理解错误,请指出,非常感谢。

祝好,
沈磊

Yun Tang  于2020年1月19日周日 下午4:11写道:

> Hi
>
> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
> 如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk-
> 开头的目录。
>
> 祝好
> 唐云
>
> 
> From: LakeShen 
> Sent: Sunday, January 19, 2020 15:42
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>
> 是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。
>
> LakeShen  于2020年1月19日周日 下午3:30写道:
>
> > Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> > Savepoint 类似,如果不清理,就永久保留。
> > 非常感谢
> >
> >
> > Yun Tang  于2020年1月19日周日 下午2:06写道:
> >
> >> Hi
> >>
> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
> >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
> >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
> >> 因此,加载的checkpoint被赋予了savepoint的property [2]。
> >> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
> >>
> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
> >>
> >> 希望这些解释能解答你的困惑
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
> >> [2]
> >>
> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
> >>
> >> 祝好
> >> 唐云
> >>
> >>
> >>
> >>
> >> 
> >> From: LakeShen 
> >> Sent: Friday, January 17, 2020 16:28
> >> To: user-zh@flink.apache.org 
> >> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
> >>
> >> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
> >> 我看了一下源码,发现当完成的 Checkpoint 数大于
> state.checkpoints.num-retained的数值时,会对之前的完成的
> >> Checkpoint 状态做清理。
> >>
> >> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
> >> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
> >> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
> >> Checkpoint 超时失败。
> >>
> >> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是
> state.checkpoints.num-retained又为1,完成的
> >> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
> >> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
> >>
> >> 希望有大佬能帮我解惑,非常感谢
> >>
> >
>


Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 Yun Tang
Hi

目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。
如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。
如果开启了增量checkpoint:在确保当前作业的checkpoint有最新完成的情况下,可以直接删除 job-id 目录下的 chk- 开头的目录。

祝好
唐云


From: LakeShen 
Sent: Sunday, January 19, 2020 15:42
To: user-zh@flink.apache.org 
Subject: Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

是否有办法清理掉这种容错恢复,之前的 Checkpoint 状态呢,现在集群的 HDFS 存储占用较多。

LakeShen  于2020年1月19日周日 下午3:30写道:

> Hi 唐云 ,非常感谢你的回答,我有个疑问,那从之前的 Checkpoint 状态恢复,之后 Flink 还会对这些状态进行清理吗,是否和
> Savepoint 类似,如果不清理,就永久保留。
> 非常感谢
>
>
> Yun Tang  于2020年1月19日周日 下午2:06写道:
>
>> Hi
>>
>> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by
>> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码
>> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。
>> 因此,加载的checkpoint被赋予了savepoint的property [2]。
>> 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed
>> 属性是false,也就是当新的checkpoint完成时,在subsume阶段这个旧的checkpoint不会被discard掉,所以你restored的chk-94040是一直保留的。
>>
>> 希望这些解释能解答你的困惑
>>
>> [1]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1141
>> [2]
>> https://github.com/apache/flink/blob/ee3101a075f681501fbc8c7cc4119476d497e5f3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L214
>>
>> 祝好
>> 唐云
>>
>>
>>
>>
>> 
>> From: LakeShen 
>> Sent: Friday, January 17, 2020 16:28
>> To: user-zh@flink.apache.org 
>> Subject: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理
>>
>> 大家好,我现在有个任务,状态比较大,使用的增量的 Checkpoint,Flink 1.6 版本,默认的 Checkpoint 保留数为1,
>> 我看了一下源码,发现当完成的 Checkpoint 数大于 state.checkpoints.num-retained的数值时,会对之前的完成的
>> Checkpoint 状态做清理。
>>
>> 当时我早上八点40多任务 Checkpoint 成功,当时的 chk-id 为
>> 94040,然后到下午15点之间,Checkpoint都是超时失败的,然后15点容错恢复,从
>> chk-94040 恢复(八点40的状态),最新的 chk-id 为 94080。94080,94081,94082都是成功的,后面接着任务
>> Checkpoint 超时失败。
>>
>> 但此时 早上八点40多的任务的状态(也就是94040)还是没有清理,但是 state.checkpoints.num-retained又为1,完成的
>> Checkpoint 数大于1,所以我的理解,应该会清理掉 94040(早上八点40的状态),但是实际没有清理,状态文件还在 HDFS
>> 上面。这是为什么呢,难道说状态容错恢复,不会清理之前的状态吗?
>>
>> 希望有大佬能帮我解惑,非常感谢
>>
>