Repository: hadoop Updated Branches: refs/heads/trunk 85d81ae58 -> ca669f9f8
YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru). Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca669f9f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca669f9f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca669f9f Branch: refs/heads/trunk Commit: ca669f9f8bc7abe5b7d4648c589aa1756bd336d1 Parents: 85d81ae5 Author: Subru Krishnan <[email protected]> Authored: Thu Sep 28 13:04:03 2017 -0700 Committer: Subru Krishnan <[email protected]> Committed: Thu Sep 28 13:04:03 2017 -0700 ---------------------------------------------------------------------- .../amrmproxy/FederationInterceptor.java | 86 +++++++++++++------- .../amrmproxy/TestFederationInterceptor.java | 54 ++++++++++++ 2 files changed, 111 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca669f9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 28724aa..33cfca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( - request.getResourceBlacklistRequest().getBlacklistAdditions())) { - for (String resourceName : request.getResourceBlacklistRequest() - .getBlacklistAdditions()) { - SubClusterId subClusterId = getSubClusterForNode(resourceName); - if (subClusterId != null) { - AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( - subClusterId, request, requestMap); - newRequest.getResourceBlacklistRequest().getBlacklistAdditions() - .add(resourceName); + if (request.getResourceBlacklistRequest() != null) { + if (!isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistAdditions())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistAdditions()) { + SubClusterId subClusterId = getSubClusterForNode(resourceName); + if (subClusterId != null) { + AllocateRequest newRequest = + findOrCreateAllocateRequestForSubCluster(subClusterId, request, + requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistAdditions() + .add(resourceName); + } } } - } - - if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty( - request.getResourceBlacklistRequest().getBlacklistRemovals())) { - for (String resourceName : request.getResourceBlacklistRequest() - .getBlacklistRemovals()) { - SubClusterId subClusterId = getSubClusterForNode(resourceName); - if (subClusterId != null) { - AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster( - subClusterId, request, requestMap); - newRequest.getResourceBlacklistRequest().getBlacklistRemovals() - .add(resourceName); + if (!isNullOrEmpty( + request.getResourceBlacklistRequest().getBlacklistRemovals())) { + for (String resourceName : request.getResourceBlacklistRequest() + .getBlacklistRemovals()) { + SubClusterId subClusterId = getSubClusterForNode(resourceName); + if (subClusterId != null) { + AllocateRequest newRequest = + findOrCreateAllocateRequestForSubCluster(subClusterId, request, + requestMap); + newRequest.getResourceBlacklistRequest().getBlacklistRemovals() + .add(resourceName); + } } } } @@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { } } - if (!isNullOrEmpty(otherResponse.getNMTokens())) { - if (!isNullOrEmpty(homeResponse.getNMTokens())) { - homeResponse.getNMTokens().addAll(otherResponse.getNMTokens()); - } else { - homeResponse.setNMTokens(otherResponse.getNMTokens()); - } - } + homeResponse.setNumClusterNodes( + homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes()); PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage(); PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage(); @@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor { spar1.getContainers().addAll(spar2.getContainers()); } } + + if (!isNullOrEmpty(otherResponse.getNMTokens())) { + if (!isNullOrEmpty(homeResponse.getNMTokens())) { + homeResponse.getNMTokens().addAll(otherResponse.getNMTokens()); + } else { + homeResponse.setNMTokens(otherResponse.getNMTokens()); + } + } + + if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) { + if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) { + homeResponse.getUpdatedContainers() + .addAll(otherResponse.getUpdatedContainers()); + } else { + homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers()); + } + } + + if (!isNullOrEmpty(otherResponse.getUpdateErrors())) { + if (!isNullOrEmpty(homeResponse.getUpdateErrors())) { + homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors()); + } else { + homeResponse.setUpdateErrors(otherResponse.getUpdateErrors()); + } + } } /** @@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return this.uamPool.getAllUAMIds().size(); } + @VisibleForTesting + public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() { + return this.asyncResponseSink; + } + /** * Private structure for encapsulating SubClusterId and * RegisterApplicationMasterResponse instances. http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca669f9f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 34b0741..3db0e35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; @@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateContainerError; +import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { } catch (YarnException e) { } } + + @Test + public void testAllocateResponse() throws Exception { + interceptor.registerApplicationMaster( + RegisterApplicationMasterRequest.newInstance(null, 0, null)); + AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); + + Map<SubClusterId, List<AllocateResponse>> asyncResponseSink = + interceptor.getAsyncResponseSink(); + + ContainerId cid = ContainerId.newContainerId(attemptId, 0); + ContainerStatus cStatus = Records.newRecord(ContainerStatus.class); + cStatus.setContainerId(cid); + Container container = + Container.newInstance(cid, null, null, null, null, null); + + AllocateResponse response = Records.newRecord(AllocateResponse.class); + response.setAllocatedContainers(Collections.singletonList(container)); + response.setCompletedContainersStatuses(Collections.singletonList(cStatus)); + response.setUpdatedNodes( + Collections.singletonList(Records.newRecord(NodeReport.class))); + response.setNMTokens( + Collections.singletonList(Records.newRecord(NMToken.class))); + response.setUpdatedContainers( + Collections.singletonList(Records.newRecord(UpdatedContainer.class))); + response.setUpdateErrors(Collections + .singletonList(Records.newRecord(UpdateContainerError.class))); + response.setAvailableResources(Records.newRecord(Resource.class)); + response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class)); + + List<AllocateResponse> list = new ArrayList<>(); + list.add(response); + asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list); + + response = interceptor.allocate(allocateRequest); + + Assert.assertEquals(1, response.getAllocatedContainers().size()); + Assert.assertNotNull(response.getAvailableResources()); + Assert.assertEquals(1, response.getCompletedContainersStatuses().size()); + Assert.assertEquals(1, response.getUpdatedNodes().size()); + Assert.assertNotNull(response.getPreemptionMessage()); + Assert.assertEquals(1, response.getNMTokens().size()); + Assert.assertEquals(1, response.getUpdatedContainers().size()); + Assert.assertEquals(1, response.getUpdateErrors().size()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
