Hi,

This Task Manager log is suggesting that problems lays on the Job Manager side 
(no visible gap in the logs, GC Time reported is accumulated and 31 seconds 
accumulated over 963 gc collections is low value). Could you show the Job 
Manager log itself? Probably it’s the own that’s causing the TaskManager to 
timeout.

On the other hand, I see that Task Manager max heap size is ~5GB and I assume 
this is the same setting for the Job manager. A Stefan pointed out, there is 
some memory overhead on the Job Manager for retaining the checkpoint and it is 
around couple of hundred bytes (maybe even 1KB) per operator instance. By doing 
quick math:

2880 checkpoints * 10 task managers * 10 operators in the job * 8 parallelism 
per task manager * 500 bytes = ~1GB

The answer might be that you just need to increase the Job Manager max heap to 
retain 2880 checkpoints.

Piotrek

> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez 
> <jose.fernan...@rovio.com> wrote:
> 
> Hi,
> 
> I wonder what reason you might have that you ever want such a huge number of 
> retained checkpoints? 
> 
> The Flink jobs running on EMR cluster require a checkpoint at midnight. (In 
> our use case we need to synch a loaded delta to our a third party partner 
> with the streamed data). The delta load the whole day data and that's why we 
> wanted to have available the midnight's checkpoint to start from there.
> We could also make a savepoint at midnight, but it’s not as handy (we would 
> need to build our own tooling to do it), and it can’t benefit from the 
> smaller latency of an incremental checkpoint. Another thining is that 
> implementing our own savepoint tool is a bit hard to monitor. Besides, 
> retaining several having checkpoints created every minute is that it would 
> also allow us to load a delta at any time. Please, if there are better ways 
> of achieving this, let me know.
> 
> From where does the log trace come from?  
> 
> It comes from the TaskManager.  
> 
> Please search on the opposite side of the time outing connection for possible 
> root cause of the timeout including:
> - possible error/exceptions/warnings
> - long GC pauses or other blocking operations (possibly long unnatural gaps 
> in the logs)
> - machine health (CPU usage, disks usage, network connections)
> 
> It seems that TaskManager disconnect from JobManager and then cannot reach it 
> again and I cannot tell the reason. I think machine health metrics mentioned 
> above seems to be OK. Would you say Direct memory stats usage is correct? 
> What is the way to check the GC pauses?
> Those are some traces from the TaskManager log, before/after it detached from 
> JobManager
> 
> 2018-01-08 22:26:37,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Garbage 
> collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS 
> MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage 
> stats: [HEAP: 868/5597/5597 MB, NON HEAP: 116/119/-1 MB (used/committed/max)]
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Direct memory 
> stats: Count: 100, Total Capacity: 29942814, Used Memory: 29942815
> 2018-01-08 22:26:42,263 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap pool 
> stats: [Code Cache: 42/42/240 MB (used/committed/max)], [Metaspace: 66/68/-1 
> MB (used/committed/max)], [Compressed Class Space: 8/8/1024 MB 
> (used/committed/max)]
> 2018-01-08 22:26:42,264 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Garbage 
> collector stats: [PS Scavenge, GC TIME (ms): 31476, GC COUNT: 923], [PS 
> MarkSweep, GC TIME (ms): 10999, GC COUNT: 36]
> 2018-01-08 22:26:42,999 WARN  akka.remote.RemoteWatcher                       
>               - Detected unreachable: 
> [akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341 
> <http://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/>]
> 2018-01-08 22:26:43,034 INFO  org.apache.flink.yarn.YarnTaskManager           
>               - TaskManager akka://flink/user/taskmanager disconnects from 
> JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager
>  
> <http://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: 
> JobManager is no longer reachable
> 2018-01-08 22:26:43,035 INFO  org.apache.flink.yarn.YarnTaskManager           
>               - Cancelling all computations and discarding all cached data.
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,037 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Sink: Discarded events (4/4) 
> (50b6fc8908a4b13dbbe73f4686beda7d) switched from RUNNING to FAILED.
> java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects 
> from JobManager 
> akka.tcp://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager
>  
> <http://fl...@ip-10-1-51-209.cloud-internal.rovio.com:35341/user/jobmanager>: 
> JobManager is no longer reachable
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1095)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:311)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>       at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:120)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>       at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>       at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-01-08 22:26:43,069 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Triggering cancellation of task code Sink: Discarded events 
> (4/4) (50b6fc8908a4b13dbbe73f4686beda7d).
> 2018-01-08 22:26:43,087 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Sink: CounterSink (async 
> call completed) (3/4) (b9f2b35e1f9822320cded759c2daea1e).
> 
> 
> José Miguel Tejedor Fernández
> Server developer
> jose.fernan...@rovio.com <mailto:jose.fernan...@rovio.com>
> Rovio Entertainment Ltd.
> Keilaranta 7, FIN - 02150 Espoo, Finland
> www.rovio.com <http://www.rovio.com/>
> 
> 
> On Wed, Jan 10, 2018 at 10:50 AM, Stefan Richter <s.rich...@data-artisans.com 
> <mailto:s.rich...@data-artisans.com>> wrote:
> Hi,
> 
> there is no known limitation in the strict sense, but you might run out of 
> dfs space or job manager memory if you keep around a huge number checkpoints. 
> I wonder what reason you might have that you ever want such a huge number of 
> retained checkpoints? Usually keeping one checkpoint should do the job, maybe 
> a couple more if you are very afraid about corruption that goes beyond your 
> DFSs capabilities to handle it. Is there any reason for that or maybe a 
> misconception about increasing the number of retained checkpoints is good for?
> 
> Best,
> Stefan 
> 
> 
>> Am 10.01.2018 um 08:54 schrieb Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>>:
>> 
>> Hi,
>> 
>> Increasing akka’s timeouts is rarely a solution for any problems - it either 
>> do not help, or just mask the issue making it less visible. But yes, it is 
>> possible to bump the limits: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka
>>  
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#distributed-coordination-via-akka>
>> 
>> I don’t think that state.checkpoints.num-retained was thought to handle such 
>> large numbers of retained checkpoint so maybe there are some known/unknown 
>> limitations. Stefan, do you know something in this regard?
>> 
>> Parallel thing to do is that like for any other akka timeout, you should 
>> track down the root cause of it. This one warning line doesn’t tell much. 
>> From where does it come from? Client log? Job manager log? Task manager log? 
>> Please search on the opposite side of the time outing connection for 
>> possible root cause of the timeout including:
>> - possible error/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural gaps 
>> in the logs)
>> - machine health (CPU usage, disks usage, network connections)
>> 
>> Piotrek
>> 
>>> On 9 Jan 2018, at 16:38, Jose Miguel Tejedor Fernandez 
>>> <jose.fernan...@rovio.com <mailto:jose.fernan...@rovio.com>> wrote:
>>> 
>>> Hello,
>>> 
>>> I have several stream jobs running (v. 1.3.1 ) in production which always 
>>> fails after a fixed period of around 30h after being executing. That's the 
>>> WARN trace before failing:
>>> 
>>> Association with remote system 
>>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
>>> <http://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876/>] has failed, 
>>> address is now gated for [5000] ms. Reason: [Association failed with 
>>> [akka.tcp://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876 
>>> <http://fl...@ip-10-1-51-134.cloud-internal.acme.com:39876/>]] Caused by: 
>>> [No response from remote for outbound association. Handshake timed out 
>>> after [20000 ms].
>>> 
>>> The main change done in the job configuration was to increase the 
>>> state.checkpoints.num-retained from 1 to 2880. I am using asynchronous 
>>> RocksDB to persists to snapshot the state. (I attach some screenshots with 
>>> the  checkpoint conf from webUI)
>>> 
>>> May my assumption be correct that the increase of checkpoints.num-retained 
>>> is causing the problem? Any known issue regarding this?
>>> Besides, Is there any way to increase the Akka handshake timeout from the 
>>> current 20000 ms to a higher value? I considered that it may be convenient 
>>> to increase the timeout to 1 minute instead.
>>> 
>>> BR
>>> 
>>> 
>>> <Screen Shot 2018-01-09 at 17.35.25.png><Screen Shot 2018-01-09 at 
>>> 17.35.18.png><Screen Shot 2018-01-09 at 17.35.00.png>
>> 
> 
> 

Reply via email to