Hi Juho,
In fact, from your code I can't see any way the MapState could become
inconsistent with the timer, so it looks like a bug to me. Once the checkpoint
has completed, and as long as you aren't querying the state asynchronously from
a custom thread, the result of the checkpoint should be consistent. The only
case I can see where the timer could be inconsistent with the state is when the
task is shutting down: in that case the backend may already be closed while the
timer service has failed to shut down, so the timer callback may access a
closed backend. But that shouldn't be the cause in your case. Could you please
provide more information, for example which state backend you are using? Is it
the RocksDB state backend? I think @Stefan may be able to tell more about this,
and please correct me if I'm wrong.


Best,
Sihua


On 05/15/2018 01:48, Bowen Li <bowenl...@gmail.com> wrote:
Hi Juho,


You are right, there's no transactional guarantee for timers and state in
processElement(). They may end up inconsistent if your job is cancelled in the
middle of processing an element.


To avoid this situation, the best programming practice is to always check
whether the state you're trying to get is null.
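A minimal plain-Java sketch of that advice, applied to the onTimer() logic from the original question further down in this thread. It is not Flink code: a HashMap stands in for MapState<String, String> of a single key (an assumption for illustration), and the class NullSafeRetention and its methods are hypothetical names that only mirror processElement() and onTimer().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of one key's state from the thread:
// a HashMap stands in for Flink's MapState<String, String>.
class NullSafeRetention {
    private final Map<String, String> mapState = new HashMap<>();
    private final long stateRetentionMillis;

    NullSafeRetention(long stateRetentionMillis) {
        this.stateRetentionMillis = stateRetentionMillis;
    }

    // Mirrors processElement(): store the element timestamp and return the
    // time at which the event-time timer would be registered.
    long processElement(long elementTimestamp) {
        mapState.put("lastUpdated", Long.toString(elementTimestamp));
        return elementTimestamp + stateRetentionMillis;
    }

    // Mirrors onTimer(), but checks for a missing value before parsing it.
    void onTimer(long timestamp) {
        String lastUpdated = mapState.get("lastUpdated");
        if (lastUpdated == null) {
            // Timer fired without the matching state entry (the situation
            // reported in this thread): nothing to compare against, so
            // clean up and return instead of throwing NumberFormatException.
            mapState.clear();
            return;
        }
        // Only clear if no newer element has refreshed "lastUpdated";
        // timers registered for earlier elements fall through here.
        if (timestamp >= Long.parseLong(lastUpdated) + stateRetentionMillis) {
            mapState.clear();
        }
    }

    boolean isEmpty() {
        return mapState.isEmpty();
    }
}
```

With a retention of 500 ms, an element at 1000 schedules a timer at 1500; if a second element arrives at 1300, the timer at 1500 leaves the state alone and the one at 1800 clears it. A timer that fires with no "lastUpdated" entry is simply treated as "nothing left to keep".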


I've also created https://issues.apache.org/jira/browse/FLINK-9362 to document 
this. 


Thanks
Bowen






On Mon, May 14, 2018 at 4:00 AM, Juho Autio <juho.au...@rovio.com> wrote:

We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems that a timer had been
restored, but not the data that should have been in the related MapState,
given that the timer had been registered.



The way I see it, there's a bug, one of these:
- The writing of timers & map states to Flink state is not synchronized (or 
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete


Our code (simplified):


    private MapState<String, String> mapState;


    public void processElement(..) {
        mapState.put("lastUpdated", ctx.timestamp().toString());
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }


    public void onTimer(long timestamp, OnTimerContext ctx, ..) {

        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }


Normally this "just works". As you can see, it shouldn't be possible for
"lastUpdated" to be missing from the state if a timer was registered and
onTimer gets called.


However, after restoring state from a checkpoint, the job kept failing with 
this error:


Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..


So apparently onTimer was called but lastUpdated wasn't found in the MapState.
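The "NumberFormatException: null" in the trace is what Long.parseLong produces when it is handed null, which is what a lookup of a never-written key returns. A small plain-Java reproduction, assuming a HashMap as a stand-in for MapState (both return null for an absent key); the class MissingValueDemo is a hypothetical name:

```java
import java.util.HashMap;
import java.util.Map;

// Reproduces the failure mode from the stack trace. A HashMap stands in for
// MapState: its get() returns null for a key that was never written.
class MissingValueDemo {
    // Returns the exception raised when the missing value is parsed,
    // or null if parsing unexpectedly succeeded.
    static NumberFormatException parseMissingLastUpdated() {
        Map<String, String> mapState = new HashMap<>();
        String lastUpdated = mapState.get("lastUpdated"); // null: no such entry
        try {
            Long.parseLong(lastUpdated);
            return null;
        } catch (NumberFormatException e) {
            // parseLong(null) throws NumberFormatException, not NullPointerException
            return e;
        }
    }
}
```

This is why the failure surfaces as a parse error inside onTimer rather than as a NullPointerException at the MapState lookup.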


The background for restoring state in this case is not entirely clean. There
was an OS-level issue, "Too many open files", after running the job for ~11
days. To fix that, we replaced the cluster with a new one and launched the
Flink job again. State was successfully restored from the latest checkpoint
that had been created by the "problematic execution". I'm assuming that if the
state had not been written successfully, restoring wouldn't have succeeded
either, correct? This is just to rule out that the issue with state happened
because the checkpoint files were somehow corrupted by the "Too many open
files" problem.



Thank you all for your continued support!


P.S. I would be very much interested to hear if there's some cleaner way to 
achieve this kind of TTL for keyed state in Flink.
