Hi Pawel

First of all, I don't think the Akka timeout exception is related to checkpoints 
taking a long time. Also, both RocksDBStateBackend and FsStateBackend have an 
asynchronous part of the checkpoint, which in general uploads the data to the 
DFS. That's why the async part takes more time than the sync part of the 
checkpoint in most cases.
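
For illustration, here is a minimal sketch (Java; the s3a path, the interval and 
the trivial pipeline are placeholders, not your actual job) of an FsStateBackend 
configured with asynchronous snapshots explicitly enabled:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncFsBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The second constructor argument enables asynchronous snapshots, so the
        // upload to the DFS (the "async" part of the checkpoint) runs off the
        // task's main processing thread. The path is a placeholder.
        env.setStateBackend(new FsStateBackend("s3a://my-bucket/checkpoints", true));

        // Placeholder interval: trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // Trivial placeholder pipeline so the sketch runs end to end.
        env.fromElements(1, 2, 3).print();
        env.execute("async-fs-backend-sketch");
    }
}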

You could check whether the checkpoint alignment time is much longer than 
before. Back pressure in a job causes downstream tasks to receive checkpoint 
barriers later, and a task must receive barriers from all of its inputs before 
it triggers its checkpoint [1]. If long alignment time is what mainly drives 
the overall checkpoint duration, you should look for the tasks that cause the 
back pressure.
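
While you investigate, a rough sketch (Java; all values and the trivial pipeline 
are placeholders) of the checkpoint settings that keep slow checkpoints from 
piling up while alignment is long:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);           // placeholder checkpoint interval

        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setCheckpointTimeout(10 * 60_000L);     // fail checkpoints that cannot finish in 10 min
        cfg.setMinPauseBetweenCheckpoints(30_000L); // leave breathing room between checkpoints
        cfg.setMaxConcurrentCheckpoints(1);         // do not let slow checkpoints overlap

        // Trivial placeholder pipeline so the sketch runs end to end.
        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-tuning-sketch");
    }
}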

The long checkpoint time might also be caused by low write performance of the 
DFS.
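
To rule that out, here is a rough standalone sketch (assuming the Hadoop s3a 
client jars are on the classpath; the bucket and sizes are placeholders) that 
measures raw write throughput to the checkpoint directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class DfsWriteCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder URI; point it at the real checkpoint directory.
        URI target = URI.create("s3a://my-bucket/checkpoint-write-test");
        FileSystem fs = FileSystem.get(target, new Configuration());

        byte[] chunk = new byte[4 * 1024 * 1024];   // 4 MiB per write
        int chunks = 64;                            // 256 MiB total (placeholder)

        long start = System.nanoTime();
        try (FSDataOutputStream out = fs.create(new Path(target))) {
            for (int i = 0; i < chunks; i++) {
                out.write(chunk);                   // timing includes the final flush on close
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        double mib = chunks * chunk.length / (1024.0 * 1024.0);
        System.out.printf("Wrote %.0f MiB in %d ms (%.1f MiB/s)%n",
                mib, elapsedMs, mib * 1000.0 / elapsedMs);
    }
}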

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#barriers

Best
Yun Tang

________________________________
From: Pawel Bartoszek <pawelbartosze...@gmail.com>
Sent: Wednesday, October 24, 2018 23:11
To: User
Subject: Flink weird checkpointing behaviour

Hi,

We have just upgraded to Flink 1.5.2 on EMR from Flink 1.3.2. We have noticed 
that some checkpoints are taking a very long time to complete, and some of them 
even fail with an exception:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/jobmanager_0#-665361795]] after [60000 ms].

We have noticed that Checkpoint Duration (Async) takes up most of the checkpoint 
time compared to Checkpoint Duration (Sync). I thought that async checkpoints 
were only offered by the RocksDB state backend. We use the filesystem state 
backend.

We didn't have such problems on Flink 1.3.2.

Thanks,
Pawel

Flink configuration

akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://....
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true
