Hi Dadashov,

We faced similar problems in our production clusters.

Currently, both the launching and stopping of containers are performed in
the main thread of YarnResourceManager. As containers are launched and
stopped one after another, it usually takes a long time to bootstrap large
jobs. Things get worse when some node managers get lost: Yarn retries many
times to communicate with them, leading to heartbeat timeouts of
TaskManagers.

Here are some of the efforts we have made to help Flink deal with large jobs.

1. We provision some common jars on all cluster nodes and ask our users not
to include these jars in their uberjars. When containers bootstrap, these
jars are added to the classpath via JVM options. That way, we can
significantly reduce the size of the uberjars.
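
For illustration, a minimal sketch of how provisioned jars could be put on a
container's classpath. The /opt/flink/provided-lib path and the class name
are just assumptions for this example, and it uses the container's CLASSPATH
environment variable rather than our actual JVM options:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.util.Records;

public class ProvidedLibClasspath {

    /** Builds a launch context whose classpath prefers node-local jars. */
    public static ContainerLaunchContext withProvidedLibs() {
        Map<String, String> env = new HashMap<>();
        // /opt/flink/provided-lib is an assumed directory where the common
        // jars are pre-provisioned on every NodeManager host.
        env.put("CLASSPATH",
                "/opt/flink/provided-lib/*"
                        + ApplicationConstants.CLASS_PATH_SEPARATOR
                        + ApplicationConstants.Environment.PWD.$$() + "/*");

        ContainerLaunchContext ctx =
                Records.newRecord(ContainerLaunchContext.class);
        ctx.setEnvironment(env);
        return ctx;
    }
}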

2. We deploy some asynchronous threads in YarnResourceManager to launch and
stop containers. This significantly reduces the bootstrap time when
launching a large number of containers. We'd like to contribute it to the
community very soon.
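
Roughly, the idea looks like the sketch below. This is not our actual patch;
the pool size and names are illustrative:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;

public class AsyncContainerLauncher {

    // A dedicated pool so that NMClient#startContainer, which blocks on a
    // NodeManager RPC, no longer runs in YarnResourceManager's main thread.
    private final ExecutorService launcherPool =
            Executors.newFixedThreadPool(16);

    private final NMClient nmClient;

    public AsyncContainerLauncher(NMClient nmClient) {
        this.nmClient = nmClient;
    }

    public void launchAsync(Container container, ContainerLaunchContext ctx) {
        launcherPool.execute(() -> {
            try {
                nmClient.startContainer(container, ctx);
            } catch (Exception e) {
                // In a real implementation the failure would be reported
                // back to the ResourceManager so the container can be
                // released and a replacement requested.
            }
        });
    }
}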

3. We set a timeout timer for each launching container. If a task manager
does not register in time after its container has been launched, a new
container will be allocated and launched. This leads to a certain waste of
resources, but it reduces the impact of slow or problematic nodes.
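
A sketch of such a timer (the names and the replacement callback are
illustrative, not our actual implementation):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.yarn.api.records.ContainerId;

public class RegistrationTimeoutTracker {

    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Containers that have been launched but whose TaskManager has not
    // registered yet.
    private final Map<ContainerId, Boolean> pending = new ConcurrentHashMap<>();

    /** Called right after a container has been launched. */
    public void watch(ContainerId id, long timeoutMs, Runnable requestReplacement) {
        pending.put(id, Boolean.TRUE);
        timer.schedule(() -> {
            // If the TaskManager did not register in time, treat the
            // container as lost and ask for a new one.
            if (pending.remove(id) != null) {
                requestReplacement.run();
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Called when the TaskManager inside the container registers. */
    public void onRegistered(ContainerId id) {
        pending.remove(id);
    }
}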

The community is now considering a refactoring of the ResourceManager. I
think that will be a good time to improve its efficiency.

Regards,
Xiaogang

Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Fri, Aug 30, 2019 at 7:10 AM:

> Dear Flink developers,
>
> Having difficulty getting a Flink job started.
>
> The job's uberjar/fat jar is around 400MB, and I need to kick off 800+
> containers.
>
> The default HDFS replication is 3.
>
> *The Yarn queue is empty, and 800 containers are allocated
> almost immediately by the Yarn RM.*
>
> It takes a very long time until all 800 nodes (node managers) download the
> uberjar from HDFS to their local machines.
>
> *Q1:*
>
> a) Do all those 800 nodes download in batches of 3 at a time? (batch
> size = HDFS replication factor)
>
> b) Or can Flink TMs replicate from each other? Or do already-started
> TMs replicate to not-yet-started nodes?
>
> Most probably the answer is (a), but I want to confirm.
>
> *Q2:*
>
> What is the recommended way of handling a 400MB+ uberjar with 800+
> containers?
>
> Any specific params to tune?
>
> Thanks.
>
> Because downloading the uberjar takes a really long time, around 15
> minutes after the job was kicked off, I face this exception:
>
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
> start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time 
> zones.
>       at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
> Source)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>       at 
> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>       at 
> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>       at 
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>       at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>       at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>       at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
