Thanks everyone for the valuable input and for sharing your experience tackling this issue.
Regarding the suggestions:

- We provision some common jars on all cluster nodes --> but this creates a dependency on the Infra Team's schedule for provisioning/updating the common jars.

- Making the uberjar slimmer --> tried even with a 200 MB uberjar (half the size); it did not improve things much. Only ~100 containers could be started in time, and then we receive:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
This token is expired. current time is 1566422713305 found 1566422560552
Note: System times on machines may be out of sync. Check system time and time zones.

- It would be nice to see FLINK-13184 <https://issues.apache.org/jira/browse/FLINK-13184>, but the version it is expected to land in is 1.10.

- Increase the replication factor --> It would be nice to have a Flink config for setting the replication factor for only the Flink job jars, but not the job output. It is also challenging to set replication on a directory that does not exist yet; the new files will get the default replication factor. Will explore the HDFS cache option. (A rough sketch of bumping replication on the already-uploaded jars is right below this list.)
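On that last point: since replication cannot be pre-set on a directory that does not exist yet, one thing I may try is bumping the replication of the job jars right after they are uploaded to the staging directory, before the node managers start localizing them. A rough sketch with the plain Hadoop FileSystem API (the staging path layout and the replication value are just assumptions on my side, not something Flink does today):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class BumpJarReplication {
    public static void main(String[] args) throws Exception {
        // args[0]: staging dir of the submitted application, e.g. something like
        // /user/<user>/.flink/<applicationId> -- the exact layout is an assumption.
        // args[1]: target replication, e.g. 20 when ~800 node managers must localize the jars.
        Path stagingDir = new Path(args[0]);
        short replication = Short.parseShort(args[1]);

        try (FileSystem fs = FileSystem.get(new Configuration())) {
            RemoteIterator<LocatedFileStatus> files = fs.listFiles(stagingDir, true);
            while (files.hasNext()) {
                LocatedFileStatus status = files.next();
                // Only bump the jars; leave all other files at the default replication.
                if (status.isFile() && status.getPath().getName().endsWith(".jar")) {
                    fs.setReplication(status.getPath(), replication);
                }
            }
        }
    }
}

The obvious caveat is the race with container allocation: containers are allocated almost immediately, so the first wave of node managers may still pull from only 3 replicas while the re-replication is in progress.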
Maybe another option could be:

- Letting yet-to-be-started TaskManagers (or NodeManagers) download the jars from already-started TaskManagers in a P2P fashion, so that HDFS replication is not a blocker.

For comparison, a Spark job with the exact same size jar and 800 executors starts on the same cluster in less than a minute, without any issue and without any tuning.

*Further questions:*

*@ SHI Xiaogang <shixiaoga...@gmail.com>:*

I see that all 800 requests are sent concurrently:

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:16384, vCores:1>. Number pending requests 793.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job e908cb4700d5127a0b67be035e4494f7 with allocation id AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources <memory:16384, vCores:1>. Number pending requests 794.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job e908cb4700d5127a0b67be035e4494f7 with allocation id AllocationID{71bbb917374ade66df4c058c41b81f4e}.
...

Can you please elaborate on the part "As containers are launched and stopped one after another"? Any pointer to the class/method in Flink?
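To make the question more concrete, here is roughly what I imagine "launching containers asynchronously" could look like -- purely a sketch on top of the public YARN NMClient API, not Flink's actual code (the pool size and error handling are made up):

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

class AsyncContainerLauncher {
    // Hypothetical pool; the point is only to keep the blocking
    // NMClient#startContainer calls off the ResourceManager's main thread.
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(16);
    private final NMClient nmClient;

    AsyncContainerLauncher(NMClient nmClient) {
        this.nmClient = nmClient;
    }

    void launchAll(Map<Container, ContainerLaunchContext> launches) {
        for (Map.Entry<Container, ContainerLaunchContext> e : launches.entrySet()) {
            launcherPool.submit(() -> {
                try {
                    // Each launch now blocks a pool thread instead of the main thread.
                    nmClient.startContainer(e.getKey(), e.getValue());
                } catch (Exception ex) {
                    // A real implementation would request a replacement container here.
                    ex.printStackTrace();
                }
            });
        }
    }
}

Is that roughly what your change in point 2 does?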
*@ Zhu Zhu <reed...@gmail.com>*:

Regarding "One optimization that we take is letting yarn to reuse the flink-dist jar which was localized when running previous jobs":

We intend to use the Flink real-time pipeline for replay from Hive/HDFS (an offline source), so that we have a single pipeline for both batch and real-time. For a batch Flink job, the containers are released once the job is done. I guess your jobs are long-running real-time Flink jobs, so they can share the jars localized by the jobs that are already running.

Thanks.

On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:

> I can think of 2 approaches:
>
> 1. Allow flink to specify the replication of the submitted uber jar.
>
> 2. Allow flink to specify a config flink.yarn.lib which points to all the
> flink-related jars hosted on hdfs. This way users don't need to build and
> submit a fat uber jar every time, and the flink jars hosted on hdfs can
> also have their replication specified separately.
>
> Till Rohrmann <trohrm...@apache.org> 于2019年8月30日周五 下午3:33写道:
>
>> For point 2 there already exists a JIRA issue [1] and a PR. I hope that
>> we can merge it during this release cycle.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com> wrote:
>>
>>> Hi Datashov,
>>>
>>> We faced similar problems in our production clusters.
>>>
>>> Currently both launching and stopping of containers are performed in the
>>> main thread of YarnResourceManager. As containers are launched and stopped
>>> one after another, it usually takes a long time to bootstrap large jobs.
>>> Things get worse when some node managers get lost: Yarn will retry many
>>> times to communicate with them, leading to heartbeat timeouts of
>>> TaskManagers.
>>>
>>> Following are some efforts we made to help Flink deal with large jobs.
>>>
>>> 1. We provision some common jars on all cluster nodes and ask our users
>>> not to include these jars in their uberjars. When containers bootstrap,
>>> these jars are added to the classpath via JVM options. That way, we can
>>> effectively reduce the size of the uberjars.
>>>
>>> 2. We deploy some asynchronous threads to launch and stop containers in
>>> YarnResourceManager. The bootstrap time can be reduced significantly when
>>> launching a large number of containers. We'd like to contribute this to
>>> the community very soon.
>>>
>>> 3. We deploy a timeout timer for each launching container. If a task
>>> manager does not register in time after its container has been launched,
>>> a new container will be allocated and launched. That leads to a certain
>>> waste of resources, but reduces the effects caused by slow or problematic
>>> nodes.
>>>
>>> The community is now considering a refactoring of the ResourceManager. I
>>> think that will be the time to improve its efficiency.
>>>
>>> Regards,
>>> Xiaogang
>>>
>>> Elkhan Dadashov <elkhan.dadas...@gmail.com> 于2019年8月30日周五 上午7:10写道:
>>>
>>>> Dear Flink developers,
>>>>
>>>> I am having difficulty getting a Flink job started.
>>>>
>>>> The job's uberjar/fat jar is around 400 MB, and I need to kick off 800+
>>>> containers.
>>>>
>>>> The default HDFS replication is 3.
>>>>
>>>> *The Yarn queue is empty, and 800 containers are allocated almost
>>>> immediately by the Yarn RM.*
>>>>
>>>> It takes a very long time until all 800 nodes (node managers) have
>>>> downloaded the uberjar from HDFS to the local machines.
>>>>
>>>> *Q1:*
>>>>
>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>> (batch size = HDFS replication factor)
>>>>
>>>> b) Or can Flink TMs replicate from each other? Or do already-started
>>>> TMs replicate to not-yet-started nodes?
>>>>
>>>> Most probably the answer is (a), but I want to confirm.
>>>>
>>>> *Q2:*
>>>>
>>>> What is the recommended way of handling a 400 MB+ uberjar with 800+
>>>> containers?
>>>>
>>>> Any specific params to tune?
>>>>
>>>> Thanks.
>>>>
>>>> Because downloading the uberjar takes a really long time, around 15
>>>> minutes after the job is kicked off we face this exception:
>>>>
>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>> Note: System times on machines may be out of sync. Check system time and time zones.
>>>>     at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>     at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>     at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>     at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> --
> Best Regards
>
> Jeff Zhang