Hi Juho,

As Sihua said, this shouldn't happen and indicates a bug. Did you only encounter this once, or can you easily reproduce the problem?

Best,
Aljoscha

> On 15. May 2018, at 05:57, sihua zhou <summerle...@163.com> wrote:
>
> Hi Juho,
>
> In fact, from your code I can't see any way that the MapState could be
> inconsistent with the timer. It looks like a bug to me, because once the
> checkpoint is complete, and as long as you haven't queried the state
> asynchronously from a custom thread, the result of the checkpoint should be
> consistent. The only case I can see where the timer could be inconsistent
> with the state is when the task is shutting down: the backend may already be
> closed while the timer failed to shut down, so the timer callback function
> may access a closed backend. But that shouldn't be the reason in your case.
> Could you please give us more information, such as what type of backend you
> are using? Are you using the RocksDBBackend? I think @Stefan may be able to
> tell more about this, and please correct me if I'm wrong.
>
> Best,
> Sihua
>
> On 05/15/2018 01:48, Bowen Li <bowenl...@gmail.com> wrote:
> Hi Juho,
>
> You are right, there's no transactional guarantee on timers and state in
> processElement(). They may end up inconsistent if your job was cancelled in
> the middle of processing an element.
>
> To avoid this situation, the best programming practice is to always check
> whether the state you're trying to get is null.
>
> I've also created https://issues.apache.org/jira/browse/FLINK-9362 to
> document this.
>
> Thanks
> Bowen
>
> On Mon, May 14, 2018 at 4:00 AM, Juho Autio <juho.au...@rovio.com> wrote:
> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
> state. After restoring state from a checkpoint, it seems that a timer had
> been restored, but not the data that is expected to be in a related MapState
> whenever such a timer has been added.
>
> The way I see it, there's a bug, one of these:
> - The writing of timers & map states to Flink state is not synchronized (or
>   maybe there are no such guarantees by design?)
> - Flink may restore a checkpoint that is actually corrupted/incomplete
>
> Our code (simplified):
>
>     private MapState<String, String> mapState;
>
>     public void processElement(..) {
>         mapState.put("lastUpdated", ctx.timestamp().toString());
>         ctx.timerService().registerEventTimeTimer(
>             ctx.timestamp() + stateRetentionMillis);
>     }
>
>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>             mapState.clear();
>         }
>     }
>
> Normally this "just works". As you can see, it shouldn't be possible for
> "lastUpdated" to be missing from state if a timer was registered and onTimer
> gets called.
>
> However, after restoring state from a checkpoint, the job kept failing with
> this error:
>
> Caused by: java.lang.NumberFormatException: null
>     at java.lang.Long.parseLong(Long.java:552)
>     at java.lang.Long.parseLong(Long.java:631)
>     at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>     ..
>
> So apparently onTimer was called but lastUpdated wasn't found in the
> MapState.
>
> The background for restoring state in this case is not entirely clean. There
> was an OS-level issue, "Too many open files", after running a job for ~11
> days. To fix that, we replaced the cluster with a new one and launched the
> Flink job again. State was successfully restored from the latest checkpoint
> that had been created by the "problematic execution". Now, I'm assuming that
> if the state hadn't been created successfully, restoring wouldn't succeed
> either, correct? I ask just to rule out the possibility that the issue with
> state happened because the checkpoint files were somehow corrupted due to
> the "Too many open files" problem.
>
> Thank you all for your continued support!
>
> P.S. I would be very much interested to hear if there's some cleaner way to
> achieve this kind of TTL for keyed state in Flink.
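
For reference, the null check that Bowen recommends in the quoted text could look roughly like the sketch below. It is a Flink-free illustration under stated assumptions, not the actual fix for Juho's job: a plain java.util.Map stands in for MapState<String, String>, and the names TimerStateGuard and shouldClear are hypothetical. The comparison itself is the one from the quoted onTimer.

```java
import java.util.Map;

/**
 * Sketch only. A plain Map stands in for Flink's MapState<String, String>;
 * TimerStateGuard and shouldClear are hypothetical names for illustration.
 */
final class TimerStateGuard {

    private TimerStateGuard() {
    }

    /**
     * Decides whether the state should be cleared when a timer fires.
     * Returns false instead of throwing when "lastUpdated" is missing,
     * which is the situation that caused NumberFormatException: null.
     */
    static boolean shouldClear(Map<String, String> state,
                               long timerTimestamp,
                               long stateRetentionMillis) {
        String raw = state.get("lastUpdated");
        if (raw == null) {
            // The timer fired but there is no matching entry: nothing to clear.
            return false;
        }
        long lastUpdated = Long.parseLong(raw);
        // Same retention check as in the quoted onTimer.
        return timerTimestamp >= lastUpdated + stateRetentionMillis;
    }
}
```

In an actual ProcessFunction the same guard would sit at the top of onTimer, reading from mapState instead of a Map. Note that it only stops the job from failing on a missing entry; it does not explain why the timer and the state diverged in the first place.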