[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623207#comment-16623207
 ] 

ASF GitHub Bot commented on FLINK-9567:
---------------------------------------

asfgit closed pull request #6669: [FLINK-9567][runtime][yarn] Fix the yarn 
container over allocation in…
URL: https://github.com/apache/flink/pull/6669
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index cf3588f6593..956e40fe61b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -334,7 +334,7 @@ public void onContainersCompleted(final 
List<ContainerStatus> statuses) {
                                        if (yarnWorkerNode != null) {
                                                // Container completed 
unexpectedly ~> start a new one
                                                final Container container = 
yarnWorkerNode.getContainer();
-                                               
internalRequestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
+                                               
requestYarnContainer(container.getResource(), 
yarnWorkerNode.getContainer().getPriority());
                                        }
                                        // Eagerly close the connection with 
task manager.
                                        closeTaskManagerConnection(resourceId, 
new Exception(containerStatus.getDiagnostics()));
@@ -443,17 +443,24 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
                return new Tuple2<>(host, Integer.valueOf(port));
        }
 
+       /**
+        * Request new container if pending containers cannot satisfies pending 
slot requests.
+        */
        private void requestYarnContainer(Resource resource, Priority priority) 
{
-               resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
+               int pendingSlotRequests = getNumberPendingSlotRequests();
+               int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
+               if (pendingSlotRequests > pendingSlotAllocation) {
+                       resourceManagerClient.addContainerRequest(new 
AMRMClient.ContainerRequest(resource, null, null, priority));
 
-               // make sure we transmit the request fast and receive fast news 
of granted allocations
-               
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+                       // make sure we transmit the request fast and receive 
fast news of granted allocations
+                       
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-               numPendingContainerRequests++;
+                       numPendingContainerRequests++;
 
-               log.info("Requesting new TaskExecutor container with resources 
{}. Number pending requests {}.",
-                       resource,
-                       numPendingContainerRequests);
+                       log.info("Requesting new TaskExecutor container with 
resources {}. Number pending requests {}.",
+                               resource,
+                               numPendingContainerRequests);
+               }
        }
 
        private ContainerLaunchContext createTaskExecutorLaunchContext(Resource 
resource, String containerId, String host)
@@ -510,15 +517,4 @@ private int generatePriority(ResourceProfile 
resourceProfile) {
                        return priority;
                }
        }
-
-       /**
-        * Request new container if pending containers cannot satisfies pending 
slot requests.
-        */
-       private void internalRequestYarnContainer(Resource resource, Priority 
priority) {
-               int pendingSlotRequests = getNumberPendingSlotRequests();
-               int pendingSlotAllocation = numPendingContainerRequests * 
numberOfTaskSlots;
-               if (pendingSlotRequests > pendingSlotAllocation) {
-                       requestYarnContainer(resource, priority);
-               }
-       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.1, 1.6.0
>
>         Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restart the Job Manager in Yarn Cluster mode, sometimes Flink does not 
> release task manager containers in some specific case. In the worst case, I 
> had a job configured to 5 task managers, but possess more than 100 containers 
> in the end. Although the task didn't failed, but it affect other jobs in the 
> Yarn Cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. As the container was killed before restart, but it 
> has not received the callback of *onContainerComplete* in 
> *YarnResourceManager* which should be called by *AMRMAsyncClient* of Yarn. 
> After restart, as we can see in line 347 of FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection of container 24 which is on bd-r1hdp69 machine. 
> When it try to call *closeTaskManagerConnection* in *onContainerComplete*, it 
> did not has the connection to TaskManager on container 24, so it just ignore 
> the close of TaskManger.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_000024. 
> Ignoring close TaskExecutor connection.
>  However, bafore calling *closeTaskManagerConnection,* it already called 
> *requestYarnContainer* which lead to *numPendingContainerRequests variable 
> in* *YarnResourceManager* increased by 1.
> As the excessive container return is determined by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, it cannot 
> return this container although it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for Task Managers, Flink will 
> possess the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 are running 
> TaskManagers.
> ps: Another strange thing I found is that when sometimes request for a yarn 
> container, it will return much more than requested. Is it a normal scenario 
> for AMRMAsyncClient?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to