YARN-5411. Create a proxy chain for ApplicationClientProtocol in the Router. (Giovanni Matteo Fumarola via Subru).
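This change wires an interceptor pipeline (a proxy chain) in front of ApplicationClientProtocol in the Router: RouterClientRMService builds a request-processing chain per user from yarn.router.clientrm.interceptor-class.pipeline and keeps the chains in a bounded LRU cache (the new LRUCacheHashMap), whose size is controlled by yarn.router.clientrm.cache-max-size. The snippet below is an illustrative sketch only and is not part of this patch; the class name RouterPipelineIllustration and the cached placeholder strings are hypothetical, while the configuration keys and LRUCacheHashMap are the ones introduced by the diff that follows.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.LRUCacheHashMap;

public class RouterPipelineIllustration {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();

    // Comma-separated interceptor chain; the last interceptor is expected to
    // forward the client calls to the ResourceManager.
    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
        "org.apache.hadoop.yarn.server.router.clientrm."
            + "DefaultClientRequestInterceptor");

    // Upper bound on the number of per-user pipelines kept by the Router.
    conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE, 25);

    // The cache added by this patch: access-ordered, evicting the least
    // recently used entry once the configured maximum size is exceeded.
    LRUCacheHashMap<String, String> cache =
        new LRUCacheHashMap<String, String>(2, true);
    cache.put("alice", "pipeline-for-alice");
    cache.put("bob", "pipeline-for-bob");
    cache.get("alice");                       // touch alice; bob is now eldest
    cache.put("carol", "pipeline-for-carol"); // evicts bob
    System.out.println(cache.containsKey("bob")); // prints false
  }
}

In the real service the cached values are interceptor chains keyed by user name; plain strings are used above only to keep the sketch self-contained.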
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/469a5c93 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/469a5c93 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/469a5c93 Branch: refs/heads/YARN-2915 Commit: 469a5c932d9add09fdb9b42a626963918c73080d Parents: c199075 Author: Subru Krishnan <su...@apache.org> Authored: Wed May 3 18:26:15 2017 -0700 Committer: Subru Krishnan <su...@apache.org> Committed: Tue Jul 25 16:56:32 2017 -0700 ---------------------------------------------------------------------- hadoop-project/pom.xml | 7 + .../hadoop/yarn/conf/YarnConfiguration.java | 21 + .../hadoop/yarn/util/LRUCacheHashMap.java | 49 ++ .../src/main/resources/yarn-default.xml | 18 + .../hadoop/yarn/util/TestLRUCacheHashMap.java | 74 +++ .../hadoop-yarn-server-common/pom.xml | 11 + .../yarn/server/MockResourceManagerFacade.java | 511 +++++++++++++++++ .../hadoop-yarn-server-nodemanager/pom.xml | 7 + .../amrmproxy/MockRequestInterceptor.java | 14 +- .../amrmproxy/MockResourceManagerFacade.java | 514 ----------------- .../hadoop-yarn-server-router/pom.xml | 19 + .../hadoop/yarn/server/router/Router.java | 98 +++- .../AbstractClientRequestInterceptor.java | 89 +++ .../clientrm/ClientRequestInterceptor.java | 65 +++ .../DefaultClientRequestInterceptor.java | 334 +++++++++++ .../router/clientrm/RouterClientRMService.java | 544 ++++++++++++++++++ .../server/router/clientrm/package-info.java | 20 + .../hadoop/yarn/server/router/TestRouter.java | 26 - .../router/clientrm/BaseRouterClientRMTest.java | 574 +++++++++++++++++++ .../clientrm/MockClientRequestInterceptor.java | 36 ++ .../PassThroughClientRequestInterceptor.java | 267 +++++++++ .../clientrm/TestRouterClientRMService.java | 210 +++++++ 22 files changed, 2960 insertions(+), 548 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 15bd1fa..7301e90 100755 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -324,6 +324,13 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-server-tests</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7bcb123..cf9c237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2637,6 +2637,27 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + public static final String ROUTER_PREFIX = 
YARN_PREFIX + "router."; + + public static final String ROUTER_CLIENTRM_PREFIX = + ROUTER_PREFIX + "clientrm."; + + public static final String ROUTER_CLIENTRM_ADDRESS = + ROUTER_CLIENTRM_PREFIX + ".address"; + public static final int DEFAULT_ROUTER_CLIENTRM_PORT = 8050; + public static final String DEFAULT_ROUTER_CLIENTRM_ADDRESS = + "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PORT; + + public static final String ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE = + ROUTER_CLIENTRM_PREFIX + "interceptor-class.pipeline"; + public static final String DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS = + "org.apache.hadoop.yarn.server.router.clientrm." + + "DefaultClientRequestInterceptor"; + + public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = + ROUTER_CLIENTRM_PREFIX + "cache-max-size"; + public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25; + //////////////////////////////// // Other Configs //////////////////////////////// http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java new file mode 100644 index 0000000..7cb4e1b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java @@ -0,0 +1,49 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.util; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * LRU cache with a configurable maximum cache size and access order. + */ +public class LRUCacheHashMap<K, V> extends LinkedHashMap<K, V> { + + private static final long serialVersionUID = 1L; + + // Maximum size of the cache + private int maxSize; + + /** + * Constructor. 
+ * + * @param maxSize max size of the cache + * @param accessOrder true for access-order, false for insertion-order + */ + public LRUCacheHashMap(int maxSize, boolean accessOrder) { + super(maxSize, 0.75f, accessOrder); + this.maxSize = maxSize; + } + + @Override + protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { + return size() > maxSize; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 6af7321..94dccd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3167,6 +3167,24 @@ <property> <description> + The comma separated list of class names that implement the + RequestInterceptor interface. This is used by the RouterClientRMService + to create the request processing pipeline for users. + </description> + <name>yarn.router.clientrm.interceptor-class.pipeline</name> + <value>org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor</value> + </property> + + <property> + <description> + Size of LRU cache for Router ClientRM Service. + </description> + <name>yarn.router.clientrm.cache-max-size</name> + <value>25</value> + </property> + + <property> + <description> Comma-separated list of PlacementRules to determine how applications submitted by certain users get mapped to certain queues. Default is user-group, which corresponds to UserGroupMappingPlacementRule. http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java new file mode 100644 index 0000000..1cbb56c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java @@ -0,0 +1,74 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.util; + +import java.io.IOException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class to validate the correctness of the LRUCacheHashMap. + * + */ +public class TestLRUCacheHashMap { + + /** + * Test if the different entries are generated, and LRU cache is working as + * expected. + */ + @Test + public void testLRUCache() + throws YarnException, IOException, InterruptedException { + + int mapSize = 5; + + LRUCacheHashMap<String, Integer> map = + new LRUCacheHashMap<String, Integer>(mapSize, true); + + map.put("1", 1); + map.put("2", 2); + map.put("3", 3); + map.put("4", 4); + map.put("5", 5); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 1 to 5 + for (int i = 1; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + map.put("6", 6); + map.put("3", 3); + map.put("7", 7); + map.put("8", 8); + + Assert.assertEquals(mapSize, map.size()); + + // Check if all the elements in the map are from 5 to 8 and the 3 + for (int i = 5; i < mapSize; i++) { + Assert.assertTrue(map.containsKey(Integer.toString(i))); + } + + Assert.assertTrue(map.containsKey("3")); + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index c9f6d79..5f85097 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -209,6 +209,17 @@ </excludes> </configuration> </plugin> + <plugin> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <phase>test-compile</phase> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java new file mode 100644 index 0000000..e302c70 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -0,0 +1,511 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.ReservationAllocationState; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import 
org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +/** + * Mock Resource Manager facade implementation that exposes all the methods + * implemented by the YARN RM. The behavior and the values returned by this mock + * implementation is expected by the Router/AMRMProxy unit test cases. So please + * change the implementation with care. + */ +public class MockResourceManagerFacade + implements ApplicationClientProtocol, ApplicationMasterProtocol { + + private static final Logger LOG = + LoggerFactory.getLogger(MockResourceManagerFacade.class); + + private HashMap<String, List<ContainerId>> applicationContainerIdMap = + new HashMap<String, List<ContainerId>>(); + private HashMap<ContainerId, Container> allocatedContainerMap = + new HashMap<ContainerId, Container>(); + private AtomicInteger containerIndex = new AtomicInteger(0); + private Configuration conf; + + public MockResourceManagerFacade(Configuration conf, + int startContainerIndex) { + this.conf = conf; + this.containerIndex.set(startContainerIndex); + } + + private static String getAppIdentifier() throws IOException { + AMRMTokenIdentifier result = null; + UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); + Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); + for (TokenIdentifier tokenId : tokenIds) { + if (tokenId instanceof AMRMTokenIdentifier) { + result = (AMRMTokenIdentifier) tokenId; + break; + } + } + return result != null ? result.getApplicationAttemptId().toString() : ""; + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + LOG.info("Registering application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + Assert.assertFalse( + "The application id is already registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + // Keep track of the containers that are returned to this application + applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>()); + } + + return RegisterApplicationMasterResponse.newInstance(null, null, null, null, + null, request.getHost(), null); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + String amrmToken = getAppIdentifier(); + LOG.info("Finishing application attempt: " + amrmToken); + + synchronized (applicationContainerIdMap) { + // Remove the containers that were being tracked for this application + Assert.assertTrue("The application id is NOT registered: " + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken); + for (ContainerId c : ids) { + allocatedContainerMap.remove(c); + } + } + + return FinishApplicationMasterResponse.newInstance( + request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED + ? 
true : false); + } + + protected ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(12345, id); + } + + protected ApplicationAttemptId getApplicationAttemptId(int id) { + return ApplicationAttemptId.newInstance(getApplicationId(id), 1); + } + + @SuppressWarnings("deprecation") + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + if (request.getAskList() != null && request.getAskList().size() > 0 + && request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + Assert.fail("The mock RM implementation does not support receiving " + + "askList and releaseList in the same heartbeat"); + } + + String amrmToken = getAppIdentifier(); + + ArrayList<Container> containerList = new ArrayList<Container>(); + if (request.getAskList() != null) { + for (ResourceRequest rr : request.getAskList()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + ContainerId containerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); + Container container = Records.newRecord(Container.class); + container.setId(containerId); + container.setPriority(rr.getPriority()); + + // We don't use the node for running containers in the test cases. So + // it is OK to hard code it to some dummy value + NodeId nodeId = + NodeId.newInstance(!Strings.isNullOrEmpty(rr.getResourceName()) + ? rr.getResourceName() : "dummy", 1000); + container.setNodeId(nodeId); + container.setResource(rr.getCapability()); + containerList.add(container); + + synchronized (applicationContainerIdMap) { + // Keep track of the containers returned to this application. We + // will need it in future + Assert.assertTrue( + "The application id is Not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); + ids.add(containerId); + this.allocatedContainerMap.put(containerId, container); + } + } + } + } + + if (request.getReleaseList() != null + && request.getReleaseList().size() > 0) { + LOG.info("Releasing containers: " + request.getReleaseList().size()); + synchronized (applicationContainerIdMap) { + Assert + .assertTrue( + "The application id is not registered before allocate(): " + + amrmToken, + applicationContainerIdMap.containsKey(amrmToken)); + List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); + + for (ContainerId id : request.getReleaseList()) { + boolean found = false; + for (ContainerId c : ids) { + if (c.equals(id)) { + found = true; + break; + } + } + + Assert.assertTrue("ContainerId " + id + + " being released is not valid for application: " + + conf.get("AMRMTOKEN"), found); + + ids.remove(id); + + // Return the released container back to the AM with new fake Ids. The + // test case does not care about the IDs. The IDs are faked because + // otherwise the LRM will throw duplication identifier exception. 
This + // returning of fake containers is ONLY done for testing purpose - for + // the test code to get confirmation that the sub-cluster resource + // managers received the release request + ContainerId fakeContainerId = ContainerId.newInstance( + getApplicationAttemptId(1), containerIndex.incrementAndGet()); + Container fakeContainer = allocatedContainerMap.get(id); + fakeContainer.setId(fakeContainerId); + containerList.add(fakeContainer); + } + } + } + + LOG.info("Allocating containers: " + containerList.size() + + " for application attempt: " + conf.get("AMRMTOKEN")); + + // Always issue a new AMRMToken as if RM rolled master key + Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); + + return AllocateResponse.newInstance(0, new ArrayList<ContainerStatus>(), + containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, + 1, null, new ArrayList<NMToken>(), newAMRMToken, + new ArrayList<UpdatedContainer>()); + } + + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + GetApplicationReportResponse response = + Records.newRecord(GetApplicationReportResponse.class); + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(request.getApplicationId()); + report.setCurrentApplicationAttemptId( + ApplicationAttemptId.newInstance(request.getApplicationId(), 1)); + response.setApplicationReport(report); + return response; + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + + GetApplicationAttemptReportResponse response = + Records.newRecord(GetApplicationAttemptReportResponse.class); + ApplicationAttemptReport report = + Records.newRecord(ApplicationAttemptReport.class); + report.setApplicationAttemptId(request.getApplicationAttemptId()); + report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); + response.setApplicationAttemptReport(report); + return response; + } + + @Override + public GetNewApplicationResponse getNewApplication( + GetNewApplicationRequest request) throws YarnException, IOException { + return GetNewApplicationResponse.newInstance(null, null, null); + } + + @Override + public SubmitApplicationResponse submitApplication( + SubmitApplicationRequest request) throws YarnException, IOException { + return SubmitApplicationResponse.newInstance(); + } + + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + return KillApplicationResponse.newInstance(true); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + return GetClusterMetricsResponse.newInstance(null); + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + return GetApplicationsResponse.newInstance(null); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + return GetClusterNodesResponse.newInstance(null); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + return GetQueueInfoResponse.newInstance(null); + } + + @Override + public 
GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + return GetQueueUserAclsInfoResponse.newInstance(null); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + return GetDelegationTokenResponse.newInstance(null); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + return RenewDelegationTokenResponse.newInstance(0); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + return CancelDelegationTokenResponse.newInstance(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + return MoveApplicationAcrossQueuesResponse.newInstance(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + return GetApplicationAttemptsResponse.newInstance(null); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + return GetContainerReportResponse.newInstance(null); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + return GetContainersResponse.newInstance(null); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + return ReservationSubmissionResponse.newInstance(); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + return ReservationListResponse + .newInstance(new ArrayList<ReservationAllocationState>()); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + return ReservationUpdateResponse.newInstance(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + return ReservationDeleteResponse.newInstance(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + return GetNodesToLabelsResponse + .newInstance(new HashMap<NodeId, Set<String>>()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>()); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + GetLabelsToNodesRequest request) throws YarnException, IOException { + return GetLabelsToNodesResponse.newInstance(null); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + return GetNewReservationResponse + .newInstance(ReservationId.newInstance(0, 0)); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + return 
FailApplicationAttemptResponse.newInstance(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + return UpdateApplicationPriorityResponse.newInstance(null); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + return new SignalContainerResponsePBImpl(); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return UpdateApplicationTimeoutsResponse.newInstance(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 094519a..28ee0d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -172,6 +172,13 @@ <groupId>org.fusesource.leveldbjni</groupId> <artifactId>leveldbjni-all</artifactId> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java index c962f97..1cbb237 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MockResourceManagerFacade; public class MockRequestInterceptor extends AbstractRequestInterceptor { @@ -38,22 +39,21 @@ public class MockRequestInterceptor extends AbstractRequestInterceptor { public void init(AMRMProxyApplicationContext appContext) { super.init(appContext); - mockRM = - new MockResourceManagerFacade(new YarnConfiguration( - super.getConf()), 0); + mockRM = new MockResourceManagerFacade( + new YarnConfiguration(super.getConf()), 0); } @Override public 
RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { + RegisterApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.registerApplicationMaster(request); } @Override public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { + FinishApplicationMasterRequest request) + throws YarnException, IOException { return mockRM.finishApplicationMaster(request); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java deleted file mode 100644 index f584c94..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ /dev/null @@ -1,514 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Strings; -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; -import 
org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; -import org.apache.hadoop.yarn.api.ApplicationClientProtocol; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.records.AMCommand; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.UpdatedContainer; -import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.util.Records; -import org.junit.Assert; -import org.eclipse.jetty.util.log.Log; - -/** - * Mock Resource Manager facade implementation that exposes all the methods - * implemented by the 
YARN RM. The behavior and the values returned by this mock - * implementation is expected by the unit test cases. So please change the - * implementation with care. - */ -public class MockResourceManagerFacade implements - ApplicationMasterProtocol, ApplicationClientProtocol { - - private HashMap<String, List<ContainerId>> applicationContainerIdMap = - new HashMap<String, List<ContainerId>>(); - private HashMap<ContainerId, Container> allocatedContainerMap = - new HashMap<ContainerId, Container>(); - private AtomicInteger containerIndex = new AtomicInteger(0); - private Configuration conf; - - public MockResourceManagerFacade(Configuration conf, - int startContainerIndex) { - this.conf = conf; - this.containerIndex.set(startContainerIndex); - } - - private static String getAppIdentifier() throws IOException { - AMRMTokenIdentifier result = null; - UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); - Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers(); - for (TokenIdentifier tokenId : tokenIds) { - if (tokenId instanceof AMRMTokenIdentifier) { - result = (AMRMTokenIdentifier) tokenId; - break; - } - } - return result != null ? result.getApplicationAttemptId().toString() - : ""; - } - - @Override - public RegisterApplicationMasterResponse registerApplicationMaster( - RegisterApplicationMasterRequest request) throws YarnException, - IOException { - String amrmToken = getAppIdentifier(); - Log.getLog().info("Registering application attempt: " + amrmToken); - - synchronized (applicationContainerIdMap) { - Assert.assertFalse("The application id is already registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - // Keep track of the containers that are returned to this application - applicationContainerIdMap.put(amrmToken, - new ArrayList<ContainerId>()); - } - - return RegisterApplicationMasterResponse.newInstance(null, null, null, - null, null, request.getHost(), null); - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster( - FinishApplicationMasterRequest request) throws YarnException, - IOException { - String amrmToken = getAppIdentifier(); - Log.getLog().info("Finishing application attempt: " + amrmToken); - - synchronized (applicationContainerIdMap) { - // Remove the containers that were being tracked for this application - Assert.assertTrue("The application id is NOT registered: " - + amrmToken, applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken); - for (ContainerId c : ids) { - allocatedContainerMap.remove(c); - } - } - - return FinishApplicationMasterResponse - .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? 
true - : false); - } - - protected ApplicationId getApplicationId(int id) { - return ApplicationId.newInstance(12345, id); - } - - protected ApplicationAttemptId getApplicationAttemptId(int id) { - return ApplicationAttemptId.newInstance(getApplicationId(id), 1); - } - - @SuppressWarnings("deprecation") - @Override - public AllocateResponse allocate(AllocateRequest request) - throws YarnException, IOException { - if (request.getAskList() != null && request.getAskList().size() > 0 - && request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Assert.fail("The mock RM implementation does not support receiving " - + "askList and releaseList in the same heartbeat"); - } - - String amrmToken = getAppIdentifier(); - - ArrayList<Container> containerList = new ArrayList<Container>(); - if (request.getAskList() != null) { - for (ResourceRequest rr : request.getAskList()) { - for (int i = 0; i < rr.getNumContainers(); i++) { - ContainerId containerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); - Container container = Records.newRecord(Container.class); - container.setId(containerId); - container.setPriority(rr.getPriority()); - - // We don't use the node for running containers in the test cases. So - // it is OK to hard code it to some dummy value - NodeId nodeId = - NodeId.newInstance( - !Strings.isNullOrEmpty(rr.getResourceName()) ? rr - .getResourceName() : "dummy", 1000); - container.setNodeId(nodeId); - container.setResource(rr.getCapability()); - containerList.add(container); - - synchronized (applicationContainerIdMap) { - // Keep track of the containers returned to this application. We - // will need it in future - Assert.assertTrue( - "The application id is Not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = - applicationContainerIdMap.get(amrmToken); - ids.add(containerId); - this.allocatedContainerMap.put(containerId, container); - } - } - } - } - - if (request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Log.getLog().info("Releasing containers: " - + request.getReleaseList().size()); - synchronized (applicationContainerIdMap) { - Assert.assertTrue( - "The application id is not registered before allocate(): " - + amrmToken, - applicationContainerIdMap.containsKey(amrmToken)); - List<ContainerId> ids = applicationContainerIdMap.get(amrmToken); - - for (ContainerId id : request.getReleaseList()) { - boolean found = false; - for (ContainerId c : ids) { - if (c.equals(id)) { - found = true; - break; - } - } - - Assert.assertTrue( - "ContainerId " + id - + " being released is not valid for application: " - + conf.get("AMRMTOKEN"), found); - - ids.remove(id); - - // Return the released container back to the AM with new fake Ids. The - // test case does not care about the IDs. The IDs are faked because - // otherwise the LRM will throw duplication identifier exception. 
This - // returning of fake containers is ONLY done for testing purpose - for - // the test code to get confirmation that the sub-cluster resource - // managers received the release request - ContainerId fakeContainerId = - ContainerId.newInstance(getApplicationAttemptId(1), - containerIndex.incrementAndGet()); - Container fakeContainer = allocatedContainerMap.get(id); - fakeContainer.setId(fakeContainerId); - containerList.add(fakeContainer); - } - } - } - - Log.getLog().info("Allocating containers: " + containerList.size() - + " for application attempt: " + conf.get("AMRMTOKEN")); - - // Always issue a new AMRMToken as if RM rolled master key - Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], ""); - - return AllocateResponse.newInstance(0, - new ArrayList<ContainerStatus>(), containerList, - new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null, - new ArrayList<NMToken>(), newAMRMToken, - new ArrayList<UpdatedContainer>()); - } - - @Override - public GetApplicationReportResponse getApplicationReport( - GetApplicationReportRequest request) throws YarnException, - IOException { - - GetApplicationReportResponse response = - Records.newRecord(GetApplicationReportResponse.class); - ApplicationReport report = Records.newRecord(ApplicationReport.class); - report.setYarnApplicationState(YarnApplicationState.ACCEPTED); - report.setApplicationId(request.getApplicationId()); - report.setCurrentApplicationAttemptId(ApplicationAttemptId - .newInstance(request.getApplicationId(), 1)); - response.setApplicationReport(report); - return response; - } - - @Override - public GetApplicationAttemptReportResponse getApplicationAttemptReport( - GetApplicationAttemptReportRequest request) throws YarnException, - IOException { - GetApplicationAttemptReportResponse response = - Records.newRecord(GetApplicationAttemptReportResponse.class); - ApplicationAttemptReport report = - Records.newRecord(ApplicationAttemptReport.class); - report.setApplicationAttemptId(request.getApplicationAttemptId()); - report - .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED); - response.setApplicationAttemptReport(report); - return response; - } - - @Override - public GetNewApplicationResponse getNewApplication( - GetNewApplicationRequest request) throws YarnException, IOException { - return null; - } - - @Override - public SubmitApplicationResponse submitApplication( - SubmitApplicationRequest request) throws YarnException, IOException { - return null; - } - - @Override - public KillApplicationResponse forceKillApplication( - KillApplicationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterMetricsResponse getClusterMetrics( - GetClusterMetricsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetApplicationsResponse getApplications( - GetApplicationsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterNodesResponse getClusterNodes( - GetClusterNodesRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) - throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetQueueUserAclsInfoResponse getQueueUserAcls( - GetQueueUserAclsInfoRequest request) throws YarnException, - IOException { - throw new 
NotImplementedException(); - } - - @Override - public GetDelegationTokenResponse getDelegationToken( - GetDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public RenewDelegationTokenResponse renewDelegationToken( - RenewDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public CancelDelegationTokenResponse cancelDelegationToken( - CancelDelegationTokenRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetApplicationAttemptsResponse getApplicationAttempts( - GetApplicationAttemptsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetContainerReportResponse getContainerReport( - GetContainerReportRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetContainersResponse getContainers(GetContainersRequest request) - throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetNewReservationResponse getNewReservation( - GetNewReservationRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationSubmissionResponse submitReservation( - ReservationSubmissionRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationListResponse listReservations( - ReservationListRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationUpdateResponse updateReservation( - ReservationUpdateRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public ReservationDeleteResponse deleteReservation( - ReservationDeleteRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetNodesToLabelsResponse getNodeToLabels( - GetNodesToLabelsRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public GetClusterNodeLabelsResponse getClusterNodeLabels( - GetClusterNodeLabelsRequest request) throws YarnException, - IOException { - throw new NotImplementedException(); - } - - @Override - public GetLabelsToNodesResponse getLabelsToNodes( - GetLabelsToNodesRequest request) throws YarnException, IOException { - return null; - } - - @Override - public UpdateApplicationPriorityResponse updateApplicationPriority( - UpdateApplicationPriorityRequest request) throws YarnException, - IOException { - return null; - } - - @Override - public SignalContainerResponse signalToContainer( - SignalContainerRequest request) throws IOException { -return null; -} - - @Override - public FailApplicationAttemptResponse failApplicationAttempt( - FailApplicationAttemptRequest request) throws YarnException, IOException { - throw new NotImplementedException(); - } - - @Override - public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( - UpdateApplicationTimeoutsRequest request) - throws YarnException, IOException { - throw new 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index 25afa5c..89813de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -50,12 +50,31 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 
   <build>
     <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
     </plugins>
   </build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 7be8a59..7cfabf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -18,6 +18,20 @@
 
 package org.apache.hadoop.yarn.server.router;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * The router is a stateless YARN component which is the entry point to the
  * cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@@ -33,6 +47,88 @@ package org.apache.hadoop.yarn.server.router;
  * This provides a placeholder for throttling mis-behaving clients (YARN-1546)
  * and masks the access to multiple RMs (YARN-3659).
  */
-public class Router{
+public class Router extends CompositeService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Router.class);
+  private static CompositeServiceShutdownHook routerShutdownHook;
+  private Configuration conf;
+  private AtomicBoolean isStopping = new AtomicBoolean(false);
+  private RouterClientRMService clientRMProxyService;
+
+  /**
+   * Priority of the Router shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  public Router() {
+    super(Router.class.getName());
+  }
+
+  protected void doSecureLogin() throws IOException {
+    // TODO YARN-6539 Create SecureLogin inside Router
+  }
+
+  @Override
+  protected void serviceInit(Configuration config) throws Exception {
+    this.conf = config;
+    clientRMProxyService = createClientRMProxyService();
+    addService(clientRMProxyService);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    try {
+      doSecureLogin();
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed Router login", e);
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (isStopping.getAndSet(true)) {
+      return;
+    }
+    super.serviceStop();
+  }
+
+  protected void shutDown() {
+    new Thread() {
+      @Override
+      public void run() {
+        Router.this.stop();
+      }
+    }.start();
+  }
+
+  protected RouterClientRMService createClientRMProxyService() {
+    return new RouterClientRMService();
+  }
+
+  public static void main(String[] argv) {
+    Configuration conf = new YarnConfiguration();
+    Thread
+        .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(Router.class, argv, LOG);
+    Router router = new Router();
+    try {
+
+      // Remove the old hook if we are rebooting.
+      if (null != routerShutdownHook) {
+        ShutdownHookManager.get().removeShutdownHook(routerShutdownHook);
+      }
+
+      routerShutdownHook = new CompositeServiceShutdownHook(router);
+      ShutdownHookManager.get().addShutdownHook(routerShutdownHook,
+          SHUTDOWN_HOOK_PRIORITY);
+      router.init(conf);
+      router.start();
+    } catch (Throwable t) {
+      LOG.error("Error starting Router", t);
+      System.exit(-1);
+    }
+  }
 }
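Router above is a plain CompositeService: serviceInit() builds the client RM pipeline through the protected createClientRMProxyService() factory and registers it as a child service, while main() installs the shutdown hook and drives init()/start(). Because the factory is protected, a test or an alternative deployment could swap in its own RouterClientRMService without touching the lifecycle or shutdown-hook logic. A minimal sketch, with a hypothetical CustomRouter class that is not part of this commit:

package org.apache.hadoop.yarn.server.router;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;

public class CustomRouter extends Router {

  @Override
  protected RouterClientRMService createClientRMProxyService() {
    // A real override would return a subclass of RouterClientRMService;
    // returning the default keeps the behavior of the stock Router.
    return new RouterClientRMService();
  }

  public static void main(String[] args) {
    CustomRouter router = new CustomRouter();
    router.init(new YarnConfiguration()); // serviceInit adds the proxy service
    router.start();                       // serviceStart performs doSecureLogin
  }
}

The override point leaves the rest of the class untouched, which is what CompositeService is for: child services are added in serviceInit() and are started, stopped and cleaned up by the parent lifecycle.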
http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
new file mode 100644
index 0000000..fc6a118
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the RequestInterceptor interface and provides common functionality
+ * which can be used and/or extended by other concrete interceptor classes.
+ *
+ */
+public abstract class AbstractClientRequestInterceptor
+    implements ClientRequestInterceptor {
+  private Configuration conf;
+  private ClientRequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the {@code RequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration}.
+   */
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration}.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Initializes the {@code ClientRequestInterceptor}.
+   */
+  @Override
+  public void init(String user) {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(user);
+    }
+  }
+
+  /**
+   * Disposes the {@code ClientRequestInterceptor}.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link ClientRequestInterceptor} in the chain.
+   */
+  @Override
+  public ClientRequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+}
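AbstractClientRequestInterceptor above only provides the chain plumbing: it stores the Configuration and the next interceptor, and forwards setConf(), init() and shutdown() down the pipeline. A concrete interceptor still has to supply the ApplicationClientProtocol methods itself. The fragment below is a hedged sketch with a hypothetical class name; it is kept abstract and shows only one protocol method so it stays short, whereas a complete interceptor would implement every ApplicationClientProtocol call the same way.

package org.apache.hadoop.yarn.server.router.clientrm;

import java.io.IOException;

import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;

public abstract class PassThroughSketchInterceptor
    extends AbstractClientRequestInterceptor {

  @Override
  public GetNewApplicationResponse getNewApplication(
      GetNewApplicationRequest request) throws YarnException, IOException {
    // Inspect or decorate the message here, then hand it to the next element;
    // the last interceptor in the chain is the one that talks to the RM.
    return getNextInterceptor().getNewApplication(request);
  }
}

Wiring a chain is then just a matter of calling setNextInterceptor() on each element and setConf()/init() on the head; the abstract class propagates both calls to the rest of the pipeline.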
http://git-wip-us.apache.org/repos/asf/hadoop/blob/469a5c93/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
new file mode 100644
index 0000000..2f8fb93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+
+/**
+ * Defines the contract to be implemented by the request interceptor classes
+ * that can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface ClientRequestInterceptor
+    extends ApplicationClientProtocol, Configurable {
+  /**
+   * This method is called for initializing the interceptor. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param user the name of the client
+   */
+  void init(String user);
+
+  /**
+   * This method is called to release the resources held by the interceptor.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose the resources and forward the
+   * request to the next interceptor, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next interceptor in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last interceptor in the chain is responsible
+   * for sending the messages to the resource manager service and so the last
+   * interceptor will not receive this method call.
+   *
+   * @param nextInterceptor the ClientRequestInterceptor to set in the pipeline
+   */
+  void setNextInterceptor(ClientRequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next interceptor in the chain.
+   *
+   * @return the next interceptor in the chain
+   */
+  ClientRequestInterceptor getNextInterceptor();
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org