Increase the replication factor and/or use the HDFS centralized cache:
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
Also try to reduce the size of the jar, e.g. the Flink libraries do not need to be
included.
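
To see why the replication factor matters here, a back-of-envelope estimate may help. The sketch below is only a rough model, not a measurement: it assumes every one of the 800 containers pulls the full 400 MB jar, that reads are spread evenly over the replicas, and that each DataNode holding a replica can serve about 125 MB/s (a hypothetical figure for a 1 Gbit/s link). It ignores HDFS block placement and any per-node localization caching in YARN.

```python
def estimate_localization_seconds(jar_mb, downloads, replication, datanode_mb_per_s):
    """Rough time for all downloads to finish if the replica holders are the bottleneck.

    Simplifying assumption: total traffic is split evenly across `replication`
    DataNodes, each limited to `datanode_mb_per_s`.
    """
    total_mb = jar_mb * downloads            # total data that must leave HDFS
    per_replica_mb = total_mb / replication  # share served by each replica holder
    return per_replica_mb / datanode_mb_per_s


JAR_MB = 400               # from the original question
DOWNLOADS = 800            # one download per container (assumption)
ASSUMED_MB_PER_S = 125.0   # hypothetical per-DataNode throughput

for replication in (3, 10, 50):
    seconds = estimate_localization_seconds(JAR_MB, DOWNLOADS, replication, ASSUMED_MB_PER_S)
    print(f"replication={replication:>2}: ~{seconds / 60:.1f} min")
```

Under these assumptions a replication factor of 3 gives roughly 14 minutes, which is in the same range as the delay described below, and raising the replication factor of the jar shrinks the estimate proportionally. A smaller jar helps in the same linear way.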

> On 30.08.2019 at 01:09, Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
> 
> Dear Flink developers,
> 
> I am having difficulty getting a Flink job started.
> 
> The job's uberjar/fat jar is around 400 MB, and I need to kick off 800+
> containers.
> 
> The default HDFS replication is 3.
> 
> The YARN queue is empty, and the 800 containers are allocated almost
> immediately by the YARN RM.
> 
> It takes a very long time for all 800 nodes (node managers) to download
> the uberjar from HDFS to their local machines.
> 
> Q1:
> 
> a) Do all those 800 nodes download in batches of 3 at a time? (batch size
> = HDFS replication factor)
> 
> b) Or can Flink TMs replicate from each other? Or can already started
> TMs replicate to not-yet-started nodes?
> 
> Most probably the answer is (a), but I want to confirm.
> 
> Q2:
> 
> What is the recommended way of handling a 400 MB+ uberjar with 800+ containers?
> 
> Any specific params to tune?
> 
> Thanks.
> 
> Because downloading the uberjar takes a really long time, about 15
> minutes after the job was kicked off I am facing this exception:
> 
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
> This token is expired. current time is 1567116179193 found 1567116001610
> Note: System times on machines may be out of sync. Check system time and time zones.
>       at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source)
>       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>       at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>       at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>       at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>       at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>       at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
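
The two numbers in the exception message look like epoch timestamps in milliseconds, so the gap between them can be computed directly. The sketch below assumes that "found" is the expiry time carried by the container token and "current time" is the node manager's clock when the start request arrived; that reading is an interpretation of the message, not something stated in the log itself.

```python
from datetime import datetime, timezone

# Values copied from the exception message above.
CURRENT_TIME_MS = 1567116179193
FOUND_MS = 1567116001610  # assumed to be the token's expiry timestamp


def ms_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


overdue_ms = CURRENT_TIME_MS - FOUND_MS

print("token expiry (assumed):", ms_to_utc(FOUND_MS).isoformat())
print("start request seen at: ", ms_to_utc(CURRENT_TIME_MS).isoformat())
print(f"start request arrived {overdue_ms / 1000:.1f} s after expiry")
```

The difference is 177583 ms, so under this reading the start request reached the node manager about three minutes after the token had expired. That would be consistent with slow container start-up rather than with badly skewed clocks, although clock skew cannot be ruled out from these two numbers alone.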
