flink用的自己的序列化机制。从chk恢复的时候,在open方法里会进行状态数据的注入。
按我的理解,该transient标记有没有都可以从chk恢复,但一般加入transient可以明确只有open方法中的数据注入这一种方法。 至于不加上transient是否可能产生其他影响,就不太清楚了。 范超 <fanc...@mgtv.com> 于2020年9月10日周四 上午9:35写道: > Transient 都不参与序列化了,怎么可能从checkopont里恢复? > > -----邮件原件----- > 发件人: Yun Tang [mailto:myas...@live.com] > 发送时间: 2020年9月7日 星期一 12:50 > 收件人: user-zh@flink.apache.org > 主题: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 > > Hi > > 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1] > > 可以排查的思路 > > 1. 你的state是否开启了TTL呢 > 2. 能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。 > 3. 目前使用的state backend是什么,更换一种state backend,问题还能复现么 > > [1] > https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158 > > 祝好 > 唐云 > ________________________________ > From: Liu Rising <stockholm...@gmail.com> > Sent: Sunday, September 6, 2020 17:45 > To: user-zh@flink.apache.org <user-zh@flink.apache.org> > Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 > > Hi 唐云 > > 以下是state定义以及初始化的code > > public class FlinkKeyedProcessFunction extends > KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, > JsonNode>> { > > private static final Logger LOG = > LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); > > ... > > private final ParameterTool params; > private transient ListState<ObjectNode> unmatchedProbesState; > > ... > > FlinkKeyedProcessFunction(ParameterTool params) { > this.params = params; > } > > @Override > public void open(Configuration parameters) { > > ListStateDescriptor<ObjectNode> descriptor = new > ListStateDescriptor<>( > "unmatchedProbes", TypeInformation.of(ObjectNode.class) > ); > unmatchedProbesState = > getRuntimeContext().getListState(descriptor); > > 以下是往state里add内容的部分 > ... > > List<ObjectNode> unmatchedProbes = > mapMatching.getUnMatchedProbes(id); > unmatchedProbesState.clear(); > > if (unmatchedProbes.size() > 0) { > try { > unmatchedProbesState.addAll(unmatchedProbes); > } catch (Exception e) { > LOG.warn("Continue processing although failed to add > unmatchedProbes to ListState. ID: " + id, e); > } > } > > ... > > 以下是从state读取的code > > for (ObjectNode unmatchedProbe : > unmatchedProbesState.get()) { > LOG.info("Processing unmatched probe: " + > unmatchedProbe); > matchedValues.addAll(mapMatching.matchLocation(id, > unmatchedProbe)); > } > > > 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) > 去掉定义state那里的transient之后,上述问题不再出现。 > > 谢谢。 > Rising > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >