Yes, I'm rescaling from a checkpoint.

> that behavior is not guaranteed yet

If restoring + rescaling a checkpoint is not supported properly, I don't
understand why Flink doesn't entirely refuse to restore in that case?

Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be
exact, the package was built at flink commit
8395508b0401353ed07375e22882e7581d46ac0e.

I also made this additional test:
- launch again from the checkpoint with the original parallelism=8
- cancel with savepoint (this was after ~7 minutes, during which the job
also created some new incremental checkpoints)
- restore the newly created savepoint with parallelism=16
- it works

Now I have restored from the original checkpoint multiple times, and it
consistently fails with that java.lang.NumberFormatException when trying
with parallelism=16.

I was able to enable trace-level logs with this change (nothing changed in
logback.xml):

Added these lines to flink-1.5-SNAPSHOT/conf/log4j.properties:

# To debug checkpoint restoring
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
log4j.logger.org.apache.flink.runtime.state=TRACE
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
log4j.logger.org.apache.flink.streaming.api.operators=TRACE
log4j.logger.org.apache.flink.streaming.runtime.tasks=TRACE

(This is confusing to me; I had understood that Flink had migrated
entirely to logback configuration instead of log4j?)

I modified the logs to remove sensitive information, but because I'm not
100% sure that I caught everything, I will share the logs personally with
Stefan only.

On Wed, May 16, 2018 at 5:22 AM, sihua zhou <summerle...@163.com> wrote:

> Hi Juho,
> if I'm not misunderstanding, you said you're rescaling the job from the
> checkpoint? If yes, I think that behavior is not guaranteed yet; you can
> find this in the docs: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints
> So I'm not sure whether this is a "bug" at the current stage (personally
> I'd like to dig into it, because currently we also use checkpoints the way
> you are) ...
>
> Best, Sihua
>
> On 05/16/2018 01:46, Juho Autio <juho.au...@rovio.com> wrote:
>
> I was able to reproduce this error.
>
> I just happened to notice an important detail about the original failure:
> - checkpoint was created with a 1-node cluster (parallelism=8)
> - restored on a 2-node cluster (parallelism=16), caused that null
> exception
>
> I tried restoring from the problematic checkpoint again:
> - restored on a 1-node cluster, no problems
> - restored on a 2-node cluster, getting the original error!
>
> So now I have a way to reproduce the bug. To me it seems like the
> checkpoint itself is fine. The bug seems to be in redistributing the state
> of a restored checkpoint to a higher parallelism. I only tested each
> cluster size once (as described above), so it could also be a coincidence,
> but it seems at least likely now that it's about the state redistribution.
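>
> To illustrate what I mean by redistribution, here is a small sketch using
> Flink's KeyGroupRangeAssignment (assuming the default maxParallelism of 128;
> the key and class name are made up for illustration). A key always maps to
> the same key group, but the subtask that owns that key group can change when
> the parallelism changes:
>
>     import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>
>     public class KeyGroupDemo {
>         public static void main(String[] args) {
>             int maxParallelism = 128;   // assuming the default for parallelism <= 128
>             String key = "example-key"; // made-up key
>
>             // The key group of a key depends only on the key and maxParallelism.
>             int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
>
>             // The subtask that owns the key group depends on the current parallelism,
>             // so it can change when the job is rescaled from 8 to 16.
>             int subtaskAtP8 = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 8, keyGroup);
>             int subtaskAtP16 = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 16, keyGroup);
>
>             System.out.println("key group " + keyGroup + " -> subtask " + subtaskAtP8
>                 + " at p=8, subtask " + subtaskAtP16 + " at p=16");
>         }
>     }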
>
> I'll try to follow up with those TRACE-level logs tomorrow. Today I tried
> adding these to the logback.xml, but I didn't get anything other than
> INFO-level logs:
>
>     <logger name="org.apache.flink.contrib.streaming.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.state" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.runtime.checkpoint" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.api.operators" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>     <logger name="org.apache.flink.streaming.runtime.tasks" level="TRACE">
>         <appender-ref ref="file"/>
>     </logger>
>
> Maybe I need to edit the log4j.properties instead(?). Indeed it's Flink
> 1.5-SNAPSHOT and the package has all of these in the conf/ dir:
>
> log4j-cli.properties
> log4j-console.properties
> log4j.properties
> log4j-yarn-session.properties
> logback-console.xml
> logback.xml
> logback-yarn.xml
>
> On Tue, May 15, 2018 at 11:49 AM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> On 15.05.2018 at 10:34, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Ok, that should be possible to provide. Are there any specific packages
>> to set on trace level? Maybe just go with org.apache.flink.* on TRACE?
>>
>>
>> The following packages would be helpful:
>>
>> org.apache.flink.contrib.streaming.state.*
>> org.apache.flink.runtime.state.*
>> org.apache.flink.runtime.checkpoint.*
>> org.apache.flink.streaming.api.operators.*
>> org.apache.flink.streaming.runtime.tasks.*
>>
>>
>> > did the „too many open files“ problem only happen with local recovery
>> (asking since it should actually not add to the amount of open files)
>>
>> I think it happened in various places, maybe not when restoring. Anyway,
>> if the situation is like that, the system is pretty much unusable (on the
>> OS level), so it shouldn't matter too much which operation of the
>> application it causes to fail? Anyway, I'll try to grab & share all log
>> lines that say "Too Many Open Files"..
>>
>> > and did you deactivate it on the second cluster for the restart or
>> changed your OS settings?
>>
>> No, I didn't change anything except for increasing the ulimit on the OS to
>> prevent this from happening again. Note that the system only ran out of
>> files after ~11 days of uptime. During that time there had been some local
>> recoveries. This makes me wonder, though: could it be that many local
>> recoveries eventually caused this, i.e. that on each local recovery some
>> "old" files are left open, making the system eventually run out of files?
>>
>>
>>
>> From the way local recovery works with incremental RocksDB
>> checkpoints, I would not assume that it is the cause of the problem. In
>> this particular case, the number of open files on a local FS should not
>> be higher than the number without local recovery. Maybe it is just a matter
>> of the OS limit, the number of operators with a RocksDB backend running
>> on the machine, and the amount of files managed by all those RocksDB
>> instances simply exceeding the limit. If you have an overview of how many
>> parallel operator instances with keyed state were running on the machine,
>> and assume some reasonable number of files per RocksDB instance and the
>> limit configured in your OS, could that be the case?
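>> For example (with purely illustrative numbers, not from your setup): 5 keyed
>> operators at parallelism 8 on one machine would be 40 RocksDB instances, and
>> at ~200 files per instance that is already ~8,000 open files, which exceeds a
>> typical default ulimit of 1024 or 4096.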
>>
>>
>> Thanks!
>>
>>
>> Thanks for your help!
>>
>>
>> On Tue, May 15, 2018 at 11:17 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Btw having a trace level log of a restart from a problematic checkpoint
>>> could actually be helpful if we cannot find the problem from the previous
>>> points. This can give a more detailed view of what checkpoint files are
>>> mapped to which operator.
>>>
>>> I have one more question: did the „too many open files“ problem
>>> only happen with local recovery (asking since it should actually not add
>>> to the amount of open files), and did you deactivate it on the second
>>> cluster for the restart or change your OS settings?
>>>
>>>
>>> On 15.05.2018 at 10:09, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>
>>> What I would like to see from the logs is (also depending a bit on your
>>> log level):
>>>
>>> - all exceptions.
>>> - in which context exactly the „too many open files“ problem occurred,
>>> because I think for checkpoint consistency it should not matter as a
>>> checkpoint with such a problem should never succeed.
>>> - files that are written for checkpoints/savepoints.
>>> - completed checkpoints/savepoints ids.
>>> - the restored checkpoint/savepoint id.
>>> - files that are loaded on restore.
>>>
>>> On 15.05.2018 at 10:02, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks all. I'll have to see about sharing the logs & configuration..
>>>
>>> Is there something special that you'd like to see from the logs? It may
>>> be easier for me to get specific lines and obfuscate sensitive information
>>> instead of trying to do that for the full logs.
>>>
>>> We basically have: RocksDBStateBackend with 
>>> enableIncrementalCheckpointing=true,
>>> external state path on s3.
>>>
>>> The code that we use is:
>>>
>>>     env.setStateBackend(getStateBackend(statePath,
>>>         new RocksDBStateBackend(statePath, true)));
>>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
>>>         params.getLong("checkpoint.minPause", 60 * 1000));
>>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(
>>>         params.getInt("checkpoint.maxConcurrent", 1));
>>>     env.getCheckpointConfig().setCheckpointTimeout(
>>>         params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>>>     env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> The problematic state that we tried to use was a checkpoint created with
>>> this conf.
>>>
>>> > Are you using the local recovery feature?
>>>
>>> Yes, and in this particular case the job was constantly
>>> failing/restarting because of Too Many Open Files. So we terminated the
>>> cluster entirely, created a new one, and launched a new job by specifying
>>> the latest checkpoint path to restore state from.
>>>
>>> This is the only time I have seen this error happen with timer state. I
>>> still have that bad checkpoint data on S3, so I might be able to try to
>>> restore it again if needed to debug it. But that would require some
>>> tweaking, because I don't want to tangle with the same Kafka consumer group
>>> offsets or send old data again to the production endpoint.
>>>
>>> Please keep in mind that there was that Too Many Open Files issue on the
>>> cluster that created the problematic checkpoint, if you think that's
>>> relevant.
>>>
>>> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I agree, this looks like a bug. Can you tell us your exact
>>>> configuration of the state backend, e.g. if you are using incremental
>>>> checkpoints or not. Are you using the local recovery feature? Are you
>>>> restarting the job from a checkpoint or a savepoint? Can you provide logs
>>>> for both the job that failed and the restarted job?
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>>
>>>> On 14.05.2018 at 13:00, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear
>>>> old state. After restoring state from a checkpoint, it seems like a timer
>>>> had been restored, but not the data that was expected to be in a related
>>>> MapState if such a timer had been added.
>>>>
>>>> The way I see this is that there's a bug, either of these:
>>>> - The writing of timers & map states to Flink state is not synchronized
>>>> (or maybe there are no such guarantees by design?)
>>>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>>>
>>>> Our code (simplified):
>>>>
>>>>     private MapState<String, String> mapState;
>>>>
>>>>     public void processElement(..) {
>>>>         mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>         ctx.timerService().registerEventTimeTimer(
>>>>             ctx.timestamp() + stateRetentionMillis);
>>>>     }
>>>>
>>>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>             mapState.clear();
>>>>         }
>>>>     }
>>>>
>>>> Normally this "just works". As you can see, it shouldn't be possible
>>>> that "lastUpdated" doesn't exist in state if a timer was registered and
>>>> onTimer gets called.
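>>>>
>>>> For completeness, here is a self-contained sketch of the same pattern
>>>> (the class, type, and state names are illustrative, not our actual job):
>>>>
>>>>     import org.apache.flink.api.common.state.MapState;
>>>>     import org.apache.flink.api.common.state.MapStateDescriptor;
>>>>     import org.apache.flink.configuration.Configuration;
>>>>     import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>>>     import org.apache.flink.util.Collector;
>>>>
>>>>     // Illustrative only: keyed by String, events passed through unchanged.
>>>>     public class TtlEnrichFunction extends KeyedProcessFunction<String, String, String> {
>>>>
>>>>         private final long stateRetentionMillis;
>>>>         private transient MapState<String, String> mapState;
>>>>
>>>>         public TtlEnrichFunction(long stateRetentionMillis) {
>>>>             this.stateRetentionMillis = stateRetentionMillis;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void open(Configuration parameters) {
>>>>             mapState = getRuntimeContext().getMapState(
>>>>                 new MapStateDescriptor<>("enrichState", String.class, String.class));
>>>>         }
>>>>
>>>>         @Override
>>>>         public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
>>>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>>>             out.collect(value);
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
>>>>             // After the problematic restore the timer fires, but "lastUpdated" is gone,
>>>>             // so parseLong(null) throws the NumberFormatException shown below.
>>>>             long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>>>             if (timestamp >= lastUpdated + stateRetentionMillis) {
>>>>                 mapState.clear();
>>>>             }
>>>>         }
>>>>     }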
>>>>
>>>> However, after restoring state from a checkpoint, the job kept failing
>>>> with this error:
>>>>
>>>> Caused by: java.lang.NumberFormatException: null
>>>> at java.lang.Long.parseLong(Long.java:552)
>>>> at java.lang.Long.parseLong(Long.java:631)
>>>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>>> ..
>>>>
>>>> So apparently onTimer was called but lastUpdated wasn't found in the
>>>> MapState.
>>>>
>>>> The background for restoring state in this case is not entirely clean.
>>>> There was an OS level issue "Too many open files" after running a job for
>>>> ~11 days. To fix that, we replaced the cluster with a new one and launched
>>>> the Flink job again. State was successfully restored from the latest
>>>> checkpoint that had been created by the "problematic execution". Now, I'm
>>>> assuming that if the state hadn't been created successfully, restoring
>>>> wouldn't succeed either, correct? This is just to rule out that the issue
>>>> with state happened because the checkpoint files were somehow corrupted
>>>> due to the "Too many open files" problem.
>>>>
>>>> Thank you all for your continued support!
>>>>
>>>> P.S. I would be very much interested to hear if there's some cleaner
>>>> way to achieve this kind of TTL for keyed state in Flink.
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
