By the way, a trace-level log of a restart from the problematic checkpoint 
could be helpful if we cannot find the problem from the previous points. 
It can give a more detailed view of which checkpoint files are mapped to which 
operator.

I have one more question: did the "too many open files" problem only 
happen with local recovery (I am asking because local recovery should not 
actually add to the number of open files), and did you deactivate it on the 
second cluster for the restart, or did you change your OS settings?
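
To compare descriptor usage with and without local recovery, something like the following could be run inside the affected JVM. This is only a minimal sketch, not part of Flink: the class name OpenFileCheck and the method descriptorCounts() are made up for illustration, and it relies on com.sun.management.UnixOperatingSystemMXBean, which is only available on Unix-like JVMs.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper (not Flink API): reports how many file descriptors
// the current JVM has open and what its limit is.
public class OpenFileCheck {

    // Returns {open, max}, or null if the JVM does not expose these counters.
    public static long[] descriptorCounts() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] counts = descriptorCounts();
        if (counts == null) {
            System.out.println("File descriptor counts are not available on this JVM");
        } else {
            System.out.println("open=" + counts[0] + " max=" + counts[1]);
        }
    }
}
```

Logging these two numbers periodically on both clusters would show whether the count keeps growing over the ~11 days or jumps only when local recovery is enabled.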

> On 15.05.2018 at 10:09, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
> What I would like to see from the logs is (also depending a bit on your log 
> level):
> 
> - all exceptions.
> - in which context exactly the "too many open files" problem occurred, 
> because I think for checkpoint consistency it should not matter as a 
> checkpoint with such a problem should never succeed.
> - files that are written for checkpoints/savepoints.
> - completed checkpoints/savepoints ids.
> - the restored checkpoint/savepoint id.
> - files that are loaded on restore.
> 
>> On 15.05.2018 at 10:02, Juho Autio <juho.au...@rovio.com 
>> <mailto:juho.au...@rovio.com>> wrote:
>> 
>> Thanks all. I'll have to see about sharing the logs & configuration..
>> 
>> Is there something special that you'd like to see from the logs? It may be 
>> easier for me to get specific lines and obfuscate sensitive information 
>> instead of trying to do that for the full logs.
>> 
>> We basically have: RocksDBStateBackend with 
>> enableIncrementalCheckpointing=true, external state path on s3.
>> 
>> The code that we use is:
>> 
>>     env.setStateBackend(getStateBackend(statePath,
>>         new RocksDBStateBackend(statePath, true)));
>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
>>         params.getLong("checkpoint.minPause", 60 * 1000));
>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(
>>         params.getInt("checkpoint.maxConcurrent", 1));
>>     env.getCheckpointConfig().setCheckpointTimeout(
>>         params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>     env.getCheckpointConfig().enableExternalizedCheckpoints(
>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> 
>> The problematic state that we tried to use was a checkpoint created with 
>> this conf.
>> 
>> > Are you using the local recovery feature?
>> 
>> Yes, and in this particular case the job was constantly failing/restarting 
>> because of Too Many Open Files. So we terminated the cluster entirely, 
>> created a new one, and launched a new job by specifying the latest 
>> checkpoint path to restore state from.
>> 
>> This is the only time I have seen this error happen with timer state. I 
>> still have that bad checkpoint data on s3, so I might be able to try to 
>> restore it again if needed to debug it. But that would require some 
>> tweaking, because I don't want to interfere with the same Kafka consumer 
>> group offsets or send old data to the production endpoint again.
>> 
>> Please keep in mind that there was that Too Many Open Files issue on the 
>> cluster that created the problematic checkpoint, if you think that's 
>> relevant.
>> 
>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter 
>> <s.rich...@data-artisans.com <mailto:s.rich...@data-artisans.com>> wrote:
>> Hi,
>> 
>> I agree, this looks like a bug. Can you tell us your exact configuration of 
>> the state backend, e.g. if you are using incremental checkpoints or not. Are 
>> you using the local recovery feature? Are you restarting the job from a 
>> checkpoint or a savepoint? Can you provide logs for both the job that failed 
>> and the restarted job?
>> 
>> Best,
>> Stefan
>> 
>> 
>>> On 14.05.2018 at 13:00, Juho Autio <juho.au...@rovio.com 
>>> <mailto:juho.au...@rovio.com>> wrote:
>>> 
>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old 
>>> state. After restoring state from a checkpoint, it seems that a timer was 
>>> restored, but not the data that should be in the related MapState whenever 
>>> such a timer has been registered.
>>> 
>>> The way I see this is that there's a bug, either of these:
>>> - The writing of timers & map states to Flink state is not synchronized (or 
>>> maybe there are no such guarantees by design?)
>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>> 
>>> Our code (simplified):
>>> 
>>>     private MapState<String, String> mapState;
>>> 
>>>     public void processElement(..) {
>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>             ctx.timerService().registerEventTimeTimer(
>>>                     ctx.timestamp() + stateRetentionMillis);
>>>     }
>>> 
>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>             mapState.clear();
>>>         }
>>>     }
>>> 
>>> Normally this "just works". As you can see, it shouldn't be possible that 
>>> "lastUpdated" doesn't exist in state if timer was registered and onTimer 
>>> gets called.
>>> 
>>> However, after restoring state from a checkpoint, the job kept failing with 
>>> this error:
>>> 
>>> Caused by: java.lang.NumberFormatException: null
>>> at java.lang.Long.parseLong(Long.java:552)
>>> at java.lang.Long.parseLong(Long.java:631)
>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>> ..
>>> 
>>> So apparently onTimer was called but lastUpdated wasn't found in the 
>>> MapState.
>>> 
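Until the cause of the restore problem is found, onTimer could guard against the missing entry instead of letting Long.parseLong throw on null. The following is only a sketch in plain Java under stated assumptions: a HashMap stands in for MapState, and the class OnTimerGuardSketch, the Action enum, and decide() are made-up names, not Flink API. It does not repair the inconsistency, it only makes it visible without a restart loop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the onTimer logic; a plain Map replaces MapState.
public class OnTimerGuardSketch {

    public enum Action { CLEAR, KEEP, MISSING_STATE }

    // Decides what a fired timer should do, without throwing when
    // "lastUpdated" is absent from the state.
    public static Action decide(Map<String, String> state,
                                long timerTimestamp,
                                long stateRetentionMillis) {
        String raw = state.get("lastUpdated");
        if (raw == null) {
            // Timer was restored but its state entry was not: report, don't crash.
            return Action.MISSING_STATE;
        }
        long lastUpdated = Long.parseLong(raw);
        return timerTimestamp >= lastUpdated + stateRetentionMillis
                ? Action.CLEAR
                : Action.KEEP;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        // No entry yet: the guard reports the missing state.
        System.out.println(decide(state, 2000L, 1000L));
        state.put("lastUpdated", "1000");
        // Retention period has elapsed for this timer.
        System.out.println(decide(state, 2000L, 1000L));
        // An older timer fires before the retention period is over.
        System.out.println(decide(state, 1500L, 1000L));
    }
}
```

In the real function, the MISSING_STATE branch would be the place to log the key and timer timestamp, which would also show how many keys are affected after the restore.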
>>> The background for restoring state in this case is not entirely clean. 
>>> There was an OS level issue "Too many open files" after running a job for 
>>> ~11 days. To fix that, we replaced the cluster with a new one and launched 
>>> the Flink job again. State was successfully restored from the latest 
>>> checkpoint that had been created by the "problematic execution". Now, I'm 
>>> assuming that if the state hadn't been created successfully, restoring 
>>> wouldn't succeed either, correct? I'm asking to rule out that the state 
>>> issue happened because the checkpoint files were somehow corrupted by the 
>>> Too many open files problem.
>>> 
>>> Thank you all for your continued support!
>>> 
>>> P.S. I would be very much interested to hear if there's some cleaner way to 
>>> achieve this kind of TTL for keyed state in Flink.
>> 
>> 
>> 
> 
