Hi Robert,
increasing heap memory usage could be due to some memory leak in the user
code. Have you analyzed a heap dump? About the TM logs you shared. I don't
see anything suspicious there. Nothing about memory problems. Are those the
correct logs?

Best,
Matthias

On Thu, May 27, 2021 at 6:01 PM Jan Brusch <jan.bru...@neuland-bfi.de>
wrote:

> Hi Robert,
>
> that sounds like a case of either your application state ultimately being
> bigger than the available RAM or a memory leak in your application (e.g.,
> some states are not properly cleaned out after they are not needed anymore).
>
> If you have the available resources you could try and increase the
> TaskManager RAM size by a large amount and see where RAM usage tops out. If
> it ever does... in case of a memory leak it would grow indefinitely. Then
> you could reason about how to fix the memory leak or how to achieve your
> goal with a smaller application state.
>
> A remedy for application states larger than your available RAM is to use
> the RocksDB State backend, which allows for states larger than your
> application RAM. But that requires your kubernetes nodes to be equipped
> with a fast hard drive (SSD, optimally).
>
> That's how I would approach your problem...
>
>
> Hope that helps
>
> Jan
> On 27.05.21 17:51, Robert Cullen wrote:
>
> Hello Jan,
>
> My flink cluster is running on a kubernetes single node (rke). I have the
> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
> TaskManger reaches the max JVM Heap Size after about one hour then fails.
> Here is a snippet from the TaskManager log:
>
> 2021-05-27 15:36:36,040 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,041 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
> reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot 
> TaskSlot(index:2, state:ALLOCATED, resource profile: 
> ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb 
> (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb 
> (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 
> 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 
> c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
> 2021-05-27 15:36:36,042 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close 
> JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive 
> slot request 85433366f8bf1c5efd3b88f634676764 for job 
> c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 
> 00000000000000000000000000000000.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
> slot for 85433366f8bf1c5efd3b88f634676764.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
> c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
> 2021-05-27 15:36:36,043 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
> register at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 
> 00000000-0000-0000-0000-000000000000.
> 2021-05-27 15:36:36,044 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
> JobManager address, beginning registration
> 2021-05-27 15:36:36,045 INFO  
> org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
> registration at job manager 
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job 
> c5ff9686e944f62a24c10c6bf20a5a55.
>
> I guess the simple resolution is to increase the JVM Heap Size?
>
> On Thu, May 27, 2021 at 10:51 AM Jan Brusch <jan.bru...@neuland-bfi.de>
> wrote:
>
>> Hi Robert,
>>
>> do you have some additional info? For example the last log message of the
>> unreachable TaskManagers. Is the Job running in kubernetes? What backend
>> are you using?
>>
>> From the first looks of it, I have seen this behaviour mostly in cases
>> where one or more taskmanagers shut down due to GarbageCollection issues or
>> OutOfMemory-Errors.
>>
>>
>> Best regards
>>
>> Jan
>> On 27.05.21 16:44, Robert Cullen wrote:
>>
>> I have a job that fails after @1 hour due to a TaskManager Timeout. How
>> can I prevent this from happening?
>>
>> 2021-05-27 10:24:21
>> org.apache.flink.runtime.JobException: Recovery is suppressed by 
>> NoRestartBackoffTimeStrategy
>>     at 
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>     at 
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>     at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>>     at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>>     at 
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>>     at 
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>>     at 
>> org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>>     at 
>> org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>>     at 
>> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>>     at 
>> org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>>     at 
>> org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>>     at 
>> org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>>     at 
>> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>>     at 
>> java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>>     at 
>> java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>>     at 
>> org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>>     at 
>> org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>>     at 
>> org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
>>     at 
>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
>>     at 
>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>>     at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>     at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>     at 
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>     at 
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at 
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at 
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager 
>> with id 10.42.0.49:6122-e26293 timed out.
>>     at 
>> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
>>     ... 27 more
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>> --
>> neuland  – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Telefon (0421) 380107 57
>> Fax (0421) 380107 99https://www.neuland-bfi.de
>> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>>
>>
>> Geschäftsführer: Thomas Gebauer, Jan Zander
>> Registergericht: Amtsgericht Bremen, HRB 23395 HB
>> USt-ID. DE 246585501
>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99https://www.neuland-bfi.de
> https://twitter.com/neulandhttps://facebook.com/neulandbfihttps://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>

Reply via email to