
Good that we are more or less on track with this problem :) But the problem 
here is not that the heap size is too small, but that your kernel is running 
out of memory and starts killing processes. Your options are:

1. Check whether some other process is using the available memory.
2. Increase the memory allocation on your machine/virtual machine/container/cgroup.
3. Decrease the heap size of Flink’s JVM or its non-heap size (for example, 
shrink the network memory buffer pool). Of course, for any given job/state 
size/configuration/cluster size there is some minimal reasonable memory size 
that you have to assign to Flink; otherwise you will have poor performance 
and/or constant garbage collection and/or you will start getting OOM errors 
from the JVM (don’t confuse those with the OS/kernel's OOM errors - those two 
are on a different level).
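
To make the difference between the two levels concrete, here is a rough 
sketch that compares what the kernel saw with what the JVM reported, using 
only the log lines quoted below. The helper names (kernel_rss_mb, 
jvm_committed_mb) are made up for this example, and the comparison is only 
approximate: the memory stats were logged about 90 seconds before the kill, 
and committed heap + non-heap + direct buffers is an assumption about what 
to add up, not a complete accounting of the process.

```python
import re

# Lines copied from the logs quoted in this thread.
KERNEL_LINE = (
    "Killed process 2305 (java) "
    "total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB"
)
FLINK_LINE = (
    "Memory usage stats: [HEAP: 2329/3072/3072 MB, "
    "NON HEAP: 154/197/-1 MB (used/committed/max)]"
)
DIRECT_BYTES = 38101792  # "Total Capacity" from the direct memory stats line


def kernel_rss_mb(line):
    """Resident memory (anon-rss + file-rss) from an OOM-killer line, in MB."""
    anon_kb = int(re.search(r"anon-rss:(\d+)kB", line).group(1))
    file_kb = int(re.search(r"file-rss:(\d+)kB", line).group(1))
    return (anon_kb + file_kb) / 1024


def jvm_committed_mb(line, direct_bytes):
    """Committed heap + committed non-heap + direct buffers, in MB."""
    # Each group is used/committed/max; index 2 is the committed value.
    heap = re.search(r"\[HEAP: (\d+)/(\d+)/(-?\d+) MB", line)
    non_heap = re.search(r"NON HEAP: (\d+)/(\d+)/(-?\d+) MB", line)
    committed = int(heap.group(2)) + int(non_heap.group(2))
    return committed + direct_bytes / (1024 * 1024)


rss = kernel_rss_mb(KERNEL_LINE)
jvm = jvm_committed_mb(FLINK_LINE, DIRECT_BYTES)
print(f"kernel RSS:        {rss:.0f} MB")
print(f"JVM-reported sum:  {jvm:.0f} MB")
print(f"not accounted for: {rss - jvm:.0f} MB")
```

With these numbers the process was holding a few hundred MB more than the 
JVM-reported pools add up to, and well over the 3 GB max heap. The max heap 
setting caps only the Java heap; thread stacks, GC bookkeeping and native 
allocations sit on top of it, and the kernel counts all of them.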


> On 14 Aug 2018, at 07:36, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> Hi Piotrek,
> Thanks for your reply. I checked through the syslogs for that time, and I see 
> this:
> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
> process 2305 (java) score 468 or sacrifice child
> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) 
> total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
> As you pointed out, kernel killed the task manager process.
> If I had already set the max heap size for the JVM (to 3GB in this case), and 
> the memory usage stats showed 2329MB being used 90 seconds earlier, it seems 
> a bit unlikely for operators to consume 700 MB heap space in that short time, 
> because our events ingestion rate is not that high (close to 10 events per 
> minute).
> 2018-08-08 13:19:23,341 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage 
> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
> Is it possible to log individual operator's memory consumption? This would 
> help in narrowing down on the root cause. There were around 50 operators 
> running (~8 kafka source/sink, ~8 Window operators, and the rest CEP 
> operators).
> Thanks,
> Shailesh
> On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> Hi,
> Please post full TaskManager logs, including stderr and stdout. (Have you 
> checked the stderr/stdout for some messages?)
> I can think of a couple of reasons:
> 1. process segfault
> 2. process killed by OS
> 3. OS failure
> 1. Should be visible as some message in the stderr/stdout file and can be 
> caused by, for example, a JVM, RocksDB or some other native library/code bug. 
> 2. Is your system maybe running out of memory? The kernel might kill the 
> process if that’s happening. You can also check the system (Linux?) logs for 
> errors that correlate in time. Where those logs are depends on your OS. 
> 3. This might be tricky, but I have seen kernel failures that prevented any 
> messages from being logged, for example. Besides this TaskManager failure, is 
> your machine operating normally without any other problems/crashes/restarts?
> Piotrek
>> On 10 Aug 2018, at 06:59, Shailesh Jain <shailesh.j...@stellapps.com 
>> <mailto:shailesh.j...@stellapps.com>> wrote:
>> Hi,
>> I hit a similar issue yesterday, the task manager died suspiciously, no 
>> error logs in the task manager logs, but I see the following exceptions in 
>> the job manager logs:
>> 2018-08-05 18:03:28,322 ERROR akka.remote.Remoting                           
>>                - Association to [akka.tcp://flink@localhost:34483] with 
>> UID [328996232] irrecoverably failed. Quarantining address.
>> java.util.concurrent.TimeoutException: Remote system has been silent for too 
>> long. (more than 48.0 hours)
>>         at 
>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at 
>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> but almost 3 days later it hit this:
>> 2018-08-08 13:22:00,061 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal 
>> state machine job (1057c13d169dae609466210174e2cc8b) switched from state 
>> java.lang.Exception: TaskManager was lost/killed: 
>> 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
>>         at 
>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>         at 
>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>         at 
>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>         at 
>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>         at 
>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>         at 
>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>         at 
>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>         at 
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at 
>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>         at 
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>         at 
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>         at 
>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>         at 
>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>         at 
>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>         at 
>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>         at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at 
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> followed by:
>> 2018-08-08 13:22:20,090 INFO  
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Internal 
>> state machine job (1057c13d169dae609466210174e2cc8b) switched from state 
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
>> Not enough free slots available to run the job. You can decrease the 
>> operator parallelism or increase the number of slots per TaskManager in the 
>> configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> 
>> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < 
>> fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup 
>> [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 
>> 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: 
>> Number of instances=0, total number of slots=0, available slots=0
>>         at 
>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
>>         at 
>> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
>>         at 
>> org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
>>         at 
>> java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
>>         at 
>> java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
>>         at 
>> org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
>>         at 
>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
>>         at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
>>         at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
>>         at 
>> org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
>>         at 
>> org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>>         at 
>> org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>>         at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>         at java.lang.Thread.run(Thread.java:748)
>> There are no error logs in task manager, and following is the last memory 
>> consumption log by task manager:
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager              - Memory usage 
>> stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB 
>> (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager              - Direct 
>> memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager              - Off-heap 
>> pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 
>> 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB 
>> (used/committed/max)]
>> 2018-08-08 13:19:23,341 INFO  
>> org.apache.flink.runtime.taskmanager.TaskManager              - Garbage 
>> collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 
>> 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]
>> So I think it rules out OOM as a cause for this crash.
>> Any ideas/leads to debug this would be really helpful. The cluster is 
>> running on version 1.4.2.
>> Thanks,
>> Shailesh
>> On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov 
>> <alexander.smirn...@gmail.com <mailto:alexander.smirn...@gmail.com>> wrote:
>> Hi Piotr,
>> I didn't find anything special in the logs before the failure. 
>> Here are the logs, please take a look:
>> https://drive.google.com/drive/folders/1zlUDMpbO9xZjjJzf28lUX-bkn_x7QV59?usp=sharing
>> The configuration is:
>> 3 task managers:
>> qafdsflinkw011.scl 
>> qafdsflinkw012.scl 
>> qafdsflinkw013.scl - lost connection
>> 3 job  managers:
>> qafdsflinkm011.scl - the leader
>> qafdsflinkm012.scl 
>> qafdsflinkm013.scl 
>> 3 zookeepers:
>> qafdsflinkzk011.scl
>> qafdsflinkzk012.scl
>> qafdsflinkzk013.scl
>> Thank you,
>> Alex
>> On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> Does the issue really happen after 48 hours? 
>> Is there some indication of a failure in TaskManager log?
>> If you are still unable to solve the problem, please provide full 
>> TaskManager and JobManager logs.
>> Piotrek
>>> On 21 Mar 2018, at 16:00, Alexander Smirnov <alexander.smirn...@gmail.com 
>>> <mailto:alexander.smirn...@gmail.com>> wrote:
>>> One more question - I see a lot of lines like the following in the logs
>>> [2018-03-21 00:30:35,975] ERROR Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320] with UID 
>>> [1500204560] irrecoverably failed. Quarantining address. 
>>> (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,208] WARN Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is 
>>> irrecoverably failed. Address cannot be quarantined without knowing the 
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,235] WARN Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is 
>>> irrecoverably failed. Address cannot be quarantined without knowing the 
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is 
>>> irrecoverably failed. Address cannot be quarantined without knowing the 
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,256] WARN Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is 
>>> irrecoverably failed. Address cannot be quarantined without knowing the 
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> [2018-03-21 00:34:15,266] WARN Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is 
>>> irrecoverably failed. Address cannot be quarantined without knowing the 
>>> UID, gating instead for 5000 ms. (akka.remote.Remoting)
>>> The host is available, but I don't understand where the port numbers come 
>>> from. The Task Manager uses another port (which is printed in the logs on 
>>> startup).
>>> Could you please help me understand why this happens?
>>> Thank you,
>>> Alex
>>> On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov 
>>> <alexander.smirn...@gmail.com <mailto:alexander.smirn...@gmail.com>> wrote:
>>> Hello,
>>> I've assembled a standalone cluster of 3 task managers and 3 job 
>>> managers (and 3 ZK) following the instructions at 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
>>>  and 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html
>>> It works OK, but task managers randomly become unavailable. The JobManager 
>>> has exceptions like the ones below in its logs:
>>> [2018-03-19 00:33:10,211] WARN Association with remote system 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413] has failed, address 
>>> is now gated for [5000] ms. Reason: [Association failed with 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:42413]] Caused by: 
>>> [Connection refused: qafdsflinkw811.nn.five9lab.com/] 
>>> (akka.remote.ReliableDeliverySupervisor)
>>> [2018-03-21 00:30:35,975] ERROR Association to 
>>> [akka.tcp://fl...@qafdsflinkw811.nn.five9lab.com:35320] with UID 
>>> [1500204560] irrecoverably failed. Quarantining address. 
>>> (akka.remote.Remoting)
>>> java.util.concurrent.TimeoutException: Remote system has been silent for 
>>> too long. (more than 48.0 hours)
>>>         at 
>>> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>         at 
>>> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>         at 
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>         at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>         at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>         at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> I can't find a reason for this exception, any ideas?
>>> Thank you,
>>> Alex
