flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。

按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。
至于不加上transient是否可能产生其他影响,就不太清楚了。

范超 <fanc...@mgtv.com> 于2020年9月10日周四 上午9:35写道:

> Transient 都不参与序列化了,怎么可能从checkopont里恢复?
>
> -----邮件原件-----
> 发件人: Yun Tang [mailto:myas...@live.com]
> 发送时间: 2020年9月7日 星期一 12:50
> 收件人: user-zh@flink.apache.org
> 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi
>
> 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]
>
> 可以排查的思路
>
>   1.  你的state是否开启了TTL呢
>   2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
>   3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么
>
> [1]
> https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158
>
> 祝好
> 唐云
> ________________________________
> From: Liu Rising <stockholm...@gmail.com>
> Sent: Sunday, September 6, 2020 17:45
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取
>
> Hi 唐云
>
> 以下是state定义以及初始化的code
>
> public class FlinkKeyedProcessFunction extends
> KeyedProcessFunction<String, Tuple2&lt;String, ObjectNode>, Tuple2<String,
> JsonNode>> {
>
>     private static final Logger LOG =
> LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);
>
> ...
>
>     private final ParameterTool params;
>     private transient ListState<ObjectNode> unmatchedProbesState;
>
>     ...
>
>     FlinkKeyedProcessFunction(ParameterTool params) {
>         this.params = params;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>
>         ListStateDescriptor<ObjectNode> descriptor = new
> ListStateDescriptor<>(
>                 "unmatchedProbes", TypeInformation.of(ObjectNode.class)
>         );
>         unmatchedProbesState =
> getRuntimeContext().getListState(descriptor);
>
> 以下是往state里add内容的部分
> ...
>
>         List<ObjectNode> unmatchedProbes =
> mapMatching.getUnMatchedProbes(id);
>         unmatchedProbesState.clear();
>
>         if (unmatchedProbes.size() > 0) {
>             try {
>                 unmatchedProbesState.addAll(unmatchedProbes);
>             } catch (Exception e) {
>                 LOG.warn("Continue processing although failed to add
> unmatchedProbes to ListState. ID: " + id, e);
>             }
>         }
>
>        ...
>
> 以下是从state读取的code
>
>                     for (ObjectNode unmatchedProbe :
> unmatchedProbesState.get()) {
>                         LOG.info("Processing unmatched probe: " +
> unmatchedProbe);
>                         matchedValues.addAll(mapMatching.matchLocation(id,
> unmatchedProbe));
>                     }
>
>
> 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
> 去掉定义state那里的transient之后,上述问题不再出现。
>
> 谢谢。
> Rising
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复