This is an automated email from the ASF dual-hosted git repository.

shuyichen pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 2576076  [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
2576076 is described below

commit 2576076f36e75fa81896a7cc275315bd8cd848da
Author: Shuyi Chen <sh...@uber.com>
AuthorDate: Sat Nov 10 00:42:49 2018 -0800

    [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
    
    This closes #7078
---
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java        | 2 ++
 .../java/org/apache/flink/yarn/YarnFlinkResourceManager.java     | 2 ++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
 .../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++
 .../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++
 5 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..f1e6a3a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
                
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
                
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
                YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // 
memory is overwritten in the MiniYARNCluster.
+               
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+                               
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
                // so we have to change the number of cores for testing.
                
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); 
// 20 seconds expiry (to ensure we properly heartbeat with YARN).
        }
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bb..3327505 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYar
                        numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
                        LOG.info("Received new container: {} - Remaining 
pending container requests: {}",
                                container.getId(), numPendingContainerRequests);
+                       resourceManagerClient.removeContainerRequest(new 
AMRMClient.ContainerRequest(
+                                       container.getResource(), null, null, 
container.getPriority()));
 
                        // decide whether to return the container, or whether 
to start a TaskManager
                        if (numRegistered + containersInLaunch.size() < 
numRequired) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6ff5cd6..6669f16 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -361,7 +361,8 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
                                        "Received new container: {} - Remaining 
pending container requests: {}",
                                        container.getId(),
                                        numPendingContainerRequests);
-
+                               
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+                                               container.getResource(), null, 
null, container.getPriority()));
                                if (numPendingContainerRequests > 0) {
                                        numPendingContainerRequests--;
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce9..d665df6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends 
TestLogger {
                                                        1),
                                                i));
                                
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 
1234));
+                               
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+                               
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
                                containerList.add(mockContainer);
                        }
 
@@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends 
TestLogger {
 
                                int numberOfRegisteredResources = (Integer) 
Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
 
+                               verify(resourceManagerClient, 
times(numInitialTaskManagers)).removeContainerRequest(
+                                               
any(AMRMClient.ContainerRequest.class));
                                assertEquals(numInitialTaskManagers, 
numberOfRegisteredResources);
                        } finally {
                                if (resourceManager != null) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index d41d42d..ee325da 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger {
 
                                
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
                                
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               
verify(mockResourceManagerClient).removeContainerRequest(
+                                               
any(AMRMClient.ContainerRequest.class));
                                
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
 
                                // Remote task executor registers with 
YarnResourceManager.
@@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger {
 
                                
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
                                
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+                               
verify(mockResourceManagerClient).removeContainerRequest(
+                                               
any(AMRMClient.ContainerRequest.class));
                                
verify(mockNMClient).startContainer(eq(testingContainer), 
any(ContainerLaunchContext.class));
 
                                // Callback from YARN when container is 
Completed, pending request can not be fulfilled by pending

Reply via email to