This is an automated email from the ASF dual-hosted git repository.

shuyichen pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 2576076  [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
2576076 is described below

commit 2576076f36e75fa81896a7cc275315bd8cd848da
Author: Shuyi Chen <sh...@uber.com>
AuthorDate: Sat Nov 10 00:42:49 2018 -0800

    [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success

    This closes #7078
---
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java        | 2 ++
 .../java/org/apache/flink/yarn/YarnFlinkResourceManager.java     | 2 ++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
 .../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++
 .../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++
 5 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..f1e6a3a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
 		YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
 		YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
+		YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+			"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
 		// so we have to change the number of cores for testing.
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 8e686bb..3327505 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); LOG.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); // decide whether to return the container, or whether to start a TaskManager if (numRegistered + containersInLaunch.size() < numRequired) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6ff5cd6..6669f16 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -361,7 +361,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme "Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); - + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); if (numPendingContainerRequests > 0) { numPendingContainerRequests--; diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java index 10b2ce9..d665df6 100644 --- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { 1), i)); when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); containerList.add(mockContainer); } @@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger { int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); assertEquals(numInitialTaskManagers, numberOfRegisteredResources); } finally { if (resourceManager != null) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index d41d42d..ee325da 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Remote task executor registers with YarnResourceManager. @@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger { resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending