Thanks, everyone, for the valuable input and for sharing your experience
tackling this issue.

Regarding the suggestions:
- Provisioning some common jars on all cluster nodes *-->* this makes us
dependent on the Infra Team's schedule for handling/updating the common jars.
- Making the uberjar slimmer *-->* tried even with a 200 MB uberjar (half the
size); it did not improve much. Only about 100 containers could start in
time, and then we receive:

org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request
to start container.
This token is expired. current time is 1566422713305 found 1566422560552
Note: System times on machines may be out of sync. Check system time
and time zones.


- It would be nice to see FLINK-13184
<https://issues.apache.org/jira/browse/FLINK-13184>, but the expected version
it will land in is 1.10.
- Increasing the replication factor --> It would be nice to have a Flink
config for setting the replication factor for only the Flink job jars, but
not the output. It is also challenging to set replication for a
not-yet-existing directory; the new files will get the default replication
factor (see the sketch right after this list for raising it on an
already-uploaded jar). We will also explore the HDFS cache option.
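
For an already-uploaded job jar, a minimal sketch of raising its replication
via the Hadoop FileSystem API could look like the following (the staging path
and the factor of 20 are hypothetical placeholders; files created later still
fall back to the default replication factor):

    // Sketch: raise the replication factor of an already-uploaded uberjar so
    // that 800 NodeManagers can localize it from more than 3 datanodes.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RaiseJarReplication {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // Hypothetical staging path of the submitted uberjar.
            Path uberJar = new Path("/user/flink/staging/job-uberjar.jar");
            // e.g. 20 replicas instead of the cluster default of 3.
            fs.setReplication(uberJar, (short) 20);
        }
    }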

Maybe another option could be:
- Letting yet-to-be-started TaskManagers (or NodeManagers) download the jars
from already-started TaskManagers in a P2P fashion, so that HDFS replication
is not the bottleneck.

For comparison, a Spark job with the exact same jar size and 800 executors
starts without any issue on the same cluster in less than a minute, without
any tuning.

*Further questions:*

*@ SHI Xiaogang <shixiaoga...@gmail.com>:*

I see that all 800 container requests are sent concurrently:

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources <memory:16384, vCores:1>. Number pending requests
793.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.

2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
container with resources <memory:16384, vCores:1>. Number pending requests
794.
2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
 org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
nativeMemoryInMB=0, networkMemoryInMB=0} for job
e908cb4700d5127a0b67be035e4494f7 with allocation id
AllocationID{71bbb917374ade66df4c058c41b81f4e}.
...

Could you please elaborate on the part "As containers are launched and stopped
one after another"? Any pointer to the class/method in Flink?
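
From the stack trace in my original mail, NMClientImpl.startContainer appears
to be invoked from YarnResourceManager.onContainersAllocated on the RPC main
thread. Just to check that I understand the asynchronous-launch idea you
mention, a rough sketch (class and executor names are made up; this is not
Flink's actual code) could be:

    // Hypothetical sketch: offload the blocking NMClient.startContainer call
    // onto a dedicated executor instead of the resource manager's main thread.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.client.api.NMClient;

    public class AsyncContainerLauncher {

        private final NMClient nmClient;
        // Pool size is a guess; the right value depends on cluster limits.
        private final ExecutorService launcherExecutor =
                Executors.newFixedThreadPool(16);

        public AsyncContainerLauncher(NMClient nmClient) {
            this.nmClient = nmClient;
        }

        public void launchAsync(Container container, ContainerLaunchContext ctx) {
            launcherExecutor.submit(() -> {
                try {
                    // Blocking call; kept off the main thread so slow or lost
                    // NodeManagers do not stall the remaining launches.
                    nmClient.startContainer(container, ctx);
                } catch (Exception e) {
                    // A real implementation would report the failure so that
                    // a replacement container can be requested.
                    e.printStackTrace();
                }
            });
        }
    }

Is that roughly the direction of your change?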

*@ Zhu Zhu <reed...@gmail.com>:*

Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."

We are intending to use Flink Real-time pipeline for Replay from Hive/HDFS
(from offline source), to have 1 single pipeline for both batch and
real-time. So for batch Flink job, the containers will be released once the
job is done.
I guess your job is real-time flink, so  you can share the  jars from
already long-running jobs.
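
If I understand correctly, such reuse relies on YARN's NodeManager cache for
PUBLIC local resources, where a jar is downloaded once per node and reused by
later applications as long as it is unchanged. A rough sketch of registering
flink-dist that way, purely for illustration and not Flink's actual
submission code:

    // Sketch: register a jar as a PUBLIC YARN LocalResource so NodeManagers
    // can cache it across applications instead of downloading it per job.
    // The jar must be world-readable on HDFS for PUBLIC visibility to apply.
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class SharedDistJar {
        public static LocalResource register(FileSystem fs, Path distJar)
                throws Exception {
            FileStatus status = fs.getFileStatus(distJar);
            return LocalResource.newInstance(
                    ConverterUtils.getYarnUrlFromPath(distJar),
                    LocalResourceType.FILE,
                    LocalResourceVisibility.PUBLIC, // shared NM cache across apps
                    status.getLen(),
                    status.getModificationTime());
        }
    }

Is that the mechanism your optimization builds on?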

Thanks.


On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang <zjf...@gmail.com> wrote:

> I can think of 2 approaches:
>
> 1. Allow flink to specify the replication of the submitted uber jar.
> 2. Allow flink to specify a config flink.yarn.lib pointing to all the flink
> related jars that are hosted on hdfs. This way users don't need to build
> and submit a fat uber jar every time, and those flink jars hosted on hdfs
> can also have their replication specified separately.
>
>
>
> On Fri, Aug 30, 2019 at 3:33 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> For point 2. there exists already a JIRA issue [1] and a PR. I hope that
>> we can merge it during this release cycle.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-13184
>>
>> Cheers,
>> Till
>>
>> On Fri, Aug 30, 2019 at 4:06 AM SHI Xiaogang <shixiaoga...@gmail.com>
>> wrote:
>>
>>> Hi Dadashov,
>>>
>>> We faced similar problems in our production clusters.
>>>
>>> Now both launching and stopping of containers are performed in the main
>>> thread of YarnResourceManager. As containers are launched and stopped one
>>> after another, it usually takes a long time to bootstrap large jobs. Things
>>> get worse when some node managers get lost: Yarn will retry many times to
>>> communicate with them, leading to heartbeat timeouts of TaskManagers.
>>>
>>> Following are some efforts we made to help Flink deal with large jobs.
>>>
>>> 1. We provision some common jars in all cluster nodes and ask our users
>>> not to include these jars in their uberjar. When containers bootstrap,
>>> these jars are added to the classpath via JVM options. That way, we can
>>> efficiently reduce the size of uberjars.
>>>
>>> 2. We deploy some asynchronous threads to launch and stop containers in
>>> YarnResourceManager. The bootstrap time can be efficiently reduced when
>>> launching a large number of containers. We'd like to contribute it to the
>>> community very soon.
>>>
>>> 3. We deploy a timeout timer for each launching container. If a task
>>> manager does not register in time after its container has been launched, a
>>> new container will be allocated and launched. That leads to a certain
>>> waste of resources, but reduces the effects caused by slow or
>>> problematic nodes.
>>>
>>> Now the community is considering a refactoring of the ResourceManager. I
>>> think that will be the time to improve its efficiency.
>>>
>>> Regards,
>>> Xiaogang
>>>
>>> On Fri, Aug 30, 2019 at 7:10 AM Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:
>>>
>>>> Dear Flink developers,
>>>>
>>>> I am having difficulty getting a Flink job started.
>>>>
>>>> The job's uberjar/fat jar is around 400 MB, and I need to kick off 800+
>>>> containers.
>>>>
>>>> The default HDFS replication is 3.
>>>>
>>>> *The Yarn queue is empty, and 800 containers are allocated
>>>> almost immediately by the Yarn RM.*
>>>>
>>>> It takes a very long time until all 800 nodes (node managers) download
>>>> the uberjar from HDFS to their local machines.
>>>>
>>>> *Q1:*
>>>>
>>>> a) Do all those 800 nodes download in batches of 3 at a time?
>>>> (batch size = HDFS replication factor)
>>>>
>>>> b) Or can Flink TMs replicate from each other? Or do already-started
>>>> TMs replicate to not-yet-started nodes?
>>>>
>>>> The answer is most probably (a), but I want to confirm.
>>>>
>>>> *Q2:*
>>>>
>>>> What is the recommended way of handling a 400 MB+ uberjar with 800+
>>>> containers?
>>>>
>>>> Any specific params to tune?
>>>>
>>>> Thanks.
>>>>
>>>> Because downloading the uberjar takes a really long time, around
>>>> 15 minutes after the job is kicked off we face this exception:
>>>>
>>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>>>> start container.
>>>> This token is expired. current time is 1567116179193 found 1567116001610
>>>> Note: System times on machines may be out of sync. Check system time and 
>>>> time zones.
>>>>    at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown 
>>>> Source)
>>>>    at 
>>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>    at 
>>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
>>>>    at 
>>>> org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
>>>>    at 
>>>> org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:205)
>>>>    at 
>>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>>>    at 
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>>>    at 
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>>>    at 
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>>>    at 
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>    at 
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>    at 
>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>    at 
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>    at 
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>    at 
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>>
>>>>
>>>>
>
> --
> Best Regards
>
> Jeff Zhang
>
