Hi Chan,

After FLIP-6, the Flink ResourceManager dynamically allocates resources from
Yarn on demand.
Which Flink version are you running? On the current code base, once the number
of pending container requests in the ResourceManager drops to zero, it will
release all the excess containers. Could you please check the
"Remaining pending container requests" entries in your JobManager logs?

On the other hand, Flink should not be allocating that many resources. Have you
set `taskmanager.numberOfTaskSlots`?
The default value is 1, in which case Flink requests roughly one container per
unit of your job's max parallelism.
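
As a rough sketch of the arithmetic (illustrative numbers, not taken from your
job, and assuming the containers-per-parallelism relationship above):

    # flink-conf.yaml (illustrative values)
    taskmanager.numberOfTaskSlots: 4    # default is 1
    # containers requested ~= ceil(max parallelism / numberOfTaskSlots)
    # e.g. with max parallelism 60: ceil(60 / 4) = 15 containers instead of 60

Each slot then shares the TaskManager's memory, so you may also need to raise
the TaskManager memory (e.g. `taskmanager.heap.size`) to match.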


Best,
Yang

Chan, Regina <regina.c...@gs.com> wrote on Wed, Oct 23, 2019 at 12:40 AM:

> Hi,
>
> One of our Flink jobs has a lot of tiny Flink jobs (and some larger jobs)
> associated with it that request and release resources as needed, as per
> the FLIP-6 mode. Internally we track how much parallelism we’ve used before
> submitting the new job so that we’re bounded by the expected top cap. What
> we found was that the job intermittently holds onto 20-40x what is expected,
> thereby eating into our cluster’s overall resources. It seems as if
> Flink isn’t releasing the resources back to Yarn quickly enough for these jobs.
>
> As an immediate stopgap, I tried reverting to legacy mode, hoping that
> resource utilization would then at least be constant, as per the number of
> task managers + slots + memory allocated. However, we then ran into the issue
> below. Why would the client’s pending container requests still be 60 when
> Yarn shows they have been allocated? What can we do here?
>
> org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor failed with exception. Stopping it now.
> java.lang.IllegalStateException: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests 60 != Number RM's pending container requests 0.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217)
>     at org.apache.flink.yarn.YarnFlinkResourceManager.getPendingRequests(YarnFlinkResourceManager.java:520)
>     at org.apache.flink.yarn.YarnFlinkResourceManager.containersAllocated(YarnFlinkResourceManager.java:449)
>     at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:227)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> JobManager logs: (full logs also attached)
>
> 2019-10-22 11:36:52,733 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000002 - Remaining pending container requests: 118
> 2019-10-22 11:36:52,734 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612734: Container: [ContainerId: container_e102_1569128826219_23941567_01_000002, NodeId: d49111-041.dc.gs.com:45454, NodeHttpAddress: d49111-041.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.83.235:45454 }, ] on host d49111-041.dc.gs.com
> 2019-10-22 11:36:52,736 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-041.dc.gs.com:45454
> 2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000003 - Remaining pending container requests: 116
> 2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612784: Container: [ContainerId: container_e102_1569128826219_23941567_01_000003, NodeId: d49111-162.dc.gs.com:45454, NodeHttpAddress: d49111-162.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.72.254:45454 }, ] on host d49111-162.dc.gs.com
> ….
> Received new container: container_e102_1569128826219_23941567_01_000066 - Remaining pending container requests: 2
> 2019-10-22 11:36:53,409 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613409: Container: [ContainerId: container_e102_1569128826219_23941567_01_000066, NodeId: d49111-275.dc.gs.com:45454, NodeHttpAddress: d49111-275.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.50.199.239:45454 }, ] on host d49111-275.dc.gs.com
> 2019-10-22 11:36:53,411 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-275.dc.gs.com:45454
> 2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000067 - Remaining pending container requests: 0
> 2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613418: Container: [ContainerId: container_e102_1569128826219_23941567_01_000067, NodeId: d49111-409.dc.gs.com:45454, NodeHttpAddress: d49111-409.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.40.203:45454 }, ] on host d49111-409.dc.gs.com
> 2019-10-22 11:36:53,420 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-409.dc.gs.com:45454
> 2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000070 - Remaining pending container requests: 0
> 2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613430: Container: [ContainerId: container_e102_1569128826219_23941567_01_000070, NodeId: d49111-167.dc.gs.com:45454, NodeHttpAddress: d49111-167.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.138.251:45454 }, ] on host d49111-167.dc.gs.com
> 2019-10-22 11:36:53,432 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-167.dc.gs.com:45454
> 2019-10-22 11:36:53,439 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000072 - Remaining pending container requests: 0
> 2019-10-22 11:36:53,440 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613439: Container: [ContainerId: container_e102_1569128826219_23941567_01_000072, NodeId: d49111-436.dc.gs.com:45454, NodeHttpAddress: d49111-436.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.235.176:45454 }, ] on host d49111-436.dc.gs.com
> 2019-10-22 11:36:53,441 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-436.dc.gs.com:45454
> 2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000073 - Remaining pending container requests: 0
> 2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613449: Container: [ContainerId: container_e102_1569128826219_23941567_01_000073, NodeId: d49111-387.dc.gs.com:45454, NodeHttpAddress: d49111-387.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.136.247:45454 }, ] on host d49111-387.dc.gs.com
>
> …..
>
> Thanks,
>
> Regina
>
