Hi Till,

Thanks for your support on the task manager issue. In continuation of the email above: we increased the TM and JM memory to run the job with increased parallelism, but the job now fails after about *one day* with the exception below.
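For context on how the parallelism maps to TaskManagers (my own back-of-the-envelope arithmetic, not something confirmed in this thread): with 14 slots per TM (-s 14) and job parallelism 28 (-p 28), the session should only need to request ceil(28 / 14) = 2 TaskManagers, assuming full slot sharing across operators. A minimal sketch (the class and method names are illustrative, not Flink API):

```java
// Back-of-the-envelope sizing sketch: how many TaskManagers a Flink
// session should request for a given parallelism and slots per TM.
// Assumes all operators share slots (Flink's default slot sharing).
public class TmSizing {

    // ceil(parallelism / slotsPerTm) using integer arithmetic.
    static int taskManagersNeeded(int parallelism, int slotsPerTm) {
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        // The thread's settings: -p 28 with -s 14 slots per TM.
        System.out.println(taskManagersNeeded(28, 14)); // prints 2
    }
}
```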
Logs:

*org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.*

The new session configuration is below; we pass *-p 28* to the job so that it gets more TMs to process the heavy load:

*sudo flink-yarn-session -Djobmanager.memory.process.size=16000m -Dtaskmanager.memory.process.size=40000m -s 14 -d*

*Note:* we have the retry logic mentioned below:

environment.setRestartStrategy(RestartStrategies.failureRateRestart(
        5,                              // max failures per interval
        Time.of(30, TimeUnit.MINUTES),  // time interval for measuring failure rate
        Time.of(60, TimeUnit.SECONDS)   // delay
));

*Below is the exception which I am getting:*

org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=1800000,backoffTimeMS=60000,maxFailuresPerInterval=5)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
	at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:49)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(ExecutionGraph.java:1710) at
org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1287)
	at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1255)
	at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:954)
	at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:173)
	at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:165)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$SingleTaskSlot.release(SlotSharingManager.java:732)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager$MultiTaskSlot.release(SlotSharingManager.java:537)
	at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:149)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.tryFailingAllocatedSlot(SlotPoolImpl.java:733)
	at org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl.failAllocation(SlotPoolImpl.java:713)
	at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:562)
	at org.apache.flink.runtime.jobmaster.JobMaster.notifyAllocationFailure(JobMaster.java:700)
	at sun.reflect.GeneratedMethodAccessor99.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	... 22 more

2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2021-01-10 20:07:48,978 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job a7cffc31c4aeb01356c5132c908be314 reached globally terminal state FAILED.
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Gas Job Runner V2(a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 39a33f865ba12bac16dd21b834527750: JobManager is shutting down..
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 00000000000000000000000000000...@akka.tcp://flink@ip-10-6-0-231.ec2.internal:39039/user/rpc/jobmanager_2 for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.

Could you please help us understand why the job is failing now that we have increased the parallelism?

Thanks & Regards,
-Deep

On Mon, Jan 4, 2021 at 8:12 PM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:

> Thanks Till, for the detailed explanation. I tried it and it is working fine.
>
> Once again, thanks for your quick response.
>
> Regards,
> -Deep
>
> On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann, <trohrm...@apache.org> wrote:
>
>> Hi Deep,
>>
>> Flink has dropped support for specifying the number of TMs via -n since
>> the introduction of Flip-6. Since then, Flink will automatically start
>> TMs depending on the required resources. Hence, there is no need to
>> specify the -n parameter anymore. Instead, you should specify the
>> parallelism with which you would like to run your job via the -p option.
>>
>> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
>> limit the upper limit of slots a cluster is allowed to allocate [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16605
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
>>
>> > Hi Guys,
>> >
>> > I'm struggling to start the task manager with Flink 1.11.0 in AWS EMR,
>> > though with older versions I am not. Let me put the full context here.
>> >
>> > *When using Flink 1.9.1 and EMR 5.29.0:*
>> >
>> > To create a long-running session, we used the command below:
>> >
>> > *sudo flink-yarn-session -n <Number of TM> -s <Number of slot> -jm
>> > <memory> -tm <memory> -d*
>> >
>> > followed by the command below to run the final job:
>> >
>> > *flink run -m yarn-cluster -yid <flink sessionId> -yn <Number of TM>
>> > -ys <Number of slot> -yjm <memory> -ytm <memory> -c <ClassName> <Jar
>> > Path>*
>> >
>> > If "n" is 6, then 6 task managers are created to start the job;
>> > whatever "n" is configured to, that is the number of TMs the job starts
>> > with.
>> >
>> > But now that we have scaled up the configuration (*i.e. Flink 1.11.0
>> > and EMR 6.1.0*), we are unable to achieve the desired number of TMs.
>> >
>> > Please find the session command for the new configuration:
>> >
>> > *sudo flink-yarn-session -Djobmanager.memory.process.size=<Memory in
>> > GB> -Dtaskmanager.memory.process.size=<Memory in GB> -n <no of TM> -s
>> > <No of slot/core> -d*
>> >
>> > And the final job command:
>> >
>> > *flink run -m yarn-cluster -yid <Flink sessionId> -c <ClassName> <Jar
>> > Path>*
>> >
>> > I have tried a lot of combinations, but nothing has worked so far. I
>> > request your help in this regard, as we plan to have this configuration
>> > in *PRODUCTION* soon.
>> >
>> > Thanks in advance.
>> >
>> > Regards,
>> >
>> > -Deep
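For reference, the failure-rate restart strategy from the first message and the slot cap Till mentions can also be expressed in flink-conf.yaml. This is a sketch, not verified against this cluster; the key names should be checked against the Flink 1.11 configuration reference before use:

```yaml
# Equivalent of the failureRateRestart(5, 30 min, 60 s) call in code.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 5
restart-strategy.failure-rate.failure-rate-interval: 30 min
restart-strategy.failure-rate.delay: 60 s

# Upper bound on the slots the session may allocate (Flink >= 1.11.0).
# 28 here matches the job parallelism used in this thread; adjust to taste.
slotmanager.number-of-slots.max: 28
```

Note that a per-job restart strategy set via environment.setRestartStrategy() overrides the cluster-level setting, so either place works, but only one will take effect for a given job.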