Hi Robert,

increasing heap memory usage could be due to a memory leak in the user code. Have you analyzed a heap dump? Regarding the TM logs you shared: I don't see anything suspicious there, and nothing about memory problems. Are those the correct logs?
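You can take a heap dump from a running TaskManager from outside the process with the JDK's `jmap -dump:live,format=b,file=<file> <pid>`, run inside the TaskManager container. Alternatively, pass `-XX:+HeapDumpOnOutOfMemoryError` (optionally with `-XX:HeapDumpPath`) through Flink's `env.java.opts.taskmanager` option, so that the JVM writes a dump when the heap is exhausted. The same kind of dump can also be taken programmatically through `HotSpotDiagnosticMXBean`, as in this minimal sketch. It assumes a HotSpot-based JDK, and `HeapDumper` and `dumpToTempFile` are hypothetical names, not Flink API. You can open the resulting `.hprof` file in a heap analyzer such as Eclipse MAT to see which objects retain most of the heap.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink: writes a heap dump of the JVM it runs in.
final class HeapDumper {

    private HeapDumper() {}

    // Dumps the heap into a fresh temporary directory and returns the path of the .hprof file.
    // liveOnly = true keeps only reachable objects (HotSpot typically runs a full GC first),
    // which is usually what you want when looking for a leak.
    static Path dumpToTempFile(boolean liveOnly) {
        try {
            HotSpotDiagnosticMXBean bean =
                    ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
            // dumpHeap refuses to overwrite an existing file and, on recent JDKs, requires
            // the .hprof suffix, so we use a new directory and a fixed name.
            Path target = Files.createTempDirectory("heapdump").resolve("heap.hprof");
            bean.dumpHeap(target.toString(), liveOnly);
            return target;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        // getMax() returns -1 if no maximum is defined.
        System.out.printf("heap used: %d MiB, max: %d MiB%n",
                heap.getUsed() >> 20, heap.getMax() >> 20);
        System.out.println("heap dump written to " + dumpToTempFile(true));
    }
}
```

The call only dumps the JVM it runs in. For a TaskManager, that means the code must execute inside the TaskManager process, which is why `jmap` from outside is usually the more practical route.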
Best,
Matthias

On Thu, May 27, 2021 at 6:01 PM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:

> Hi Robert,
>
> that sounds like a case of either your application state ultimately being
> bigger than the available RAM or a memory leak in your application (e.g.,
> some state is not properly cleaned up once it is no longer needed).
>
> If you have the resources available, you could try increasing the
> TaskManager memory by a large amount and see where RAM usage tops out, if
> it ever does. In case of a memory leak, it would grow indefinitely. Then
> you could reason about how to fix the leak or how to achieve your goal
> with a smaller application state.
>
> A remedy for application state larger than your available RAM is to use
> the RocksDB state backend, which keeps state on local disk and therefore
> allows for state larger than your application RAM. But that requires your
> Kubernetes nodes to be equipped with a fast disk (optimally an SSD).
>
> That's how I would approach your problem...
>
> Hope that helps
>
> Jan
>
> On 27.05.21 17:51, Robert Cullen wrote:
>
> Hello Jan,
>
> My Flink cluster is running on a single Kubernetes node (RKE). I have the
> JVM Heap Size set at 2.08 GB and the Managed Memory at 2.93 GB. The
> TaskManager reaches the maximum JVM Heap Size after about one hour and then
> fails. Here is a snippet from the TaskManager log:
>
> 2021-05-27 15:36:36,040 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
> 2021-05-27 15:36:36,041 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
> 2021-05-27 15:36:36,042 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 85433366f8bf1c5efd3b88f634676764 for job c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 00000000000000000000000000000000.
> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 85433366f8bf1c5efd3b88f634676764.
> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
> 2021-05-27 15:36:36,043 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 00000000-0000-0000-0000-000000000000.
> 2021-05-27 15:36:36,044 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
> 2021-05-27 15:36:36,045 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
>
> I guess the simple resolution is to increase the JVM Heap Size?
>
> On Thu, May 27, 2021 at 10:51 AM Jan Brusch <jan.bru...@neuland-bfi.de> wrote:
>
>> Hi Robert,
>>
>> do you have some additional info? For example, the last log message of the
>> unreachable TaskManagers. Is the job running in Kubernetes? Which state
>> backend are you using?
>>
>> At first glance, I have seen this behaviour mostly in cases where one or
>> more TaskManagers shut down due to garbage collection issues or
>> OutOfMemoryErrors.
>>
>> Best regards
>>
>> Jan
>>
>> On 27.05.21 16:44, Robert Cullen wrote:
>>
>> I have a job that fails after about 1 hour due to a TaskManager timeout.
>> How can I prevent this from happening?
>>
>> 2021-05-27 10:24:21
>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
>>     at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
>>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
>>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
>>     at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
>>     at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
>>     at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
>>     at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
>>     at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
>>     at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
>>     at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
>>     at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
>>     at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
>>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
>>     at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
>>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
>>     at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
>>     at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 10.42.0.49:6122-e26293 timed out.
>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
>>     ... 27 more
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>> --
>> neuland – Büro für Informatik GmbH
>> Konsul-Smidt-Str. 8g, 28217 Bremen
>>
>> Phone (0421) 380107 57
>> Fax (0421) 380107 99
>> https://www.neuland-bfi.de
>> https://twitter.com/neuland
>> https://facebook.com/neulandbfi
>> https://xing.com/company/neulandbfi
>>
>> Managing directors: Thomas Gebauer, Jan Zander
>> Register court: Amtsgericht Bremen, HRB 23395 HB
>> VAT ID: DE 246585501
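Jan's suggestion above was to give the TaskManager much more memory and watch whether heap usage tops out. That can be turned into a rough check. In each time window, take the lowest heap reading, which approximates the level the heap drops back to after garbage collection. If that floor levels off, the state has probably just reached its working size. If it rises window after window, something is likely accumulating. The sketch below is a hypothetical heuristic, not a Flink or JVM API: `HeapTrend`, `floorKeepsRising`, the window count and the tolerance are all illustrative choices. For a real TaskManager, the samples would come from its `Status.JVM.Memory.Heap.Used` metric, not from the JVM running the sketch as `main` does here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.Arrays;

// Hypothetical heuristic, not a Flink or JVM API: does heap usage plateau or keep climbing?
final class HeapTrend {

    private HeapTrend() {}

    // Splits the samples into `windows` consecutive chunks and uses the minimum of each chunk
    // as an estimate of the post-GC floor. Returns true only if every floor exceeds the
    // previous one by more than `toleranceBytes`, i.e. usage never settles.
    static boolean floorKeepsRising(long[] usedBytes, int windows, long toleranceBytes) {
        if (windows < 2 || usedBytes.length < windows) {
            throw new IllegalArgumentException("need at least 2 windows and 1 sample per window");
        }
        int size = usedBytes.length / windows;
        long previousFloor = 0;
        for (int w = 0; w < windows; w++) {
            int from = w * size;
            // The last window also absorbs leftover samples when the length is not divisible.
            int to = (w == windows - 1) ? usedBytes.length : from + size;
            long floor = Arrays.stream(usedBytes, from, to).min().getAsLong();
            if (w > 0 && floor <= previousFloor + toleranceBytes) {
                return false; // floor flat (within tolerance) or falling: looks like a plateau
            }
            previousFloor = floor;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        // Sample this JVM's used heap once per second for 30 seconds.
        long[] samples = new long[30];
        for (int i = 0; i < samples.length; i++) {
            samples[i] = memory.getHeapMemoryUsage().getUsed();
            Thread.sleep(1_000);
        }
        // Three windows of ten samples; a floor must grow by more than 16 MiB to count as rising.
        System.out.println("heap floor keeps rising: " + floorKeepsRising(samples, 3, 16L << 20));
    }
}
```

Using the window minimum rather than the average keeps short allocation spikes between collections from looking like growth. In practice the windows would span minutes or hours, matching the roughly one hour it takes the job to fail.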