Author: jianhe
Date: Wed Jul 9 18:25:45 2014
New Revision: 1609254
URL: http://svn.apache.org/r1609254
Log:
YARN-1366. Changed AMRMClient to re-register with RM and send outstanding
requests back to RM on work-preserving RM restart. Contributed by Rohith
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 9 18:25:45 2014
@@ -75,6 +75,9 @@ Release 2.5.0 - UNRELEASED
YARN-1367. Changed NM to not kill containers on NM resync if RM
work-preserving restart is enabled. (Anubhav Dhoot via jianhe)
+ YARN-1366. Changed AMRMClient to re-register with RM and send outstanding
+ requests back to RM on work-preserving RM restart. (Rohith via jianhe)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
Wed Jul 9 18:25:45 2014
@@ -105,6 +105,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
Wed Jul 9 18:25:45 2014
@@ -207,14 +207,21 @@ public abstract class AMRMClient<T exten
/**
* Request additional containers and receive new container allocations.
- * Requests made via <code>addContainerRequest</code> are sent to the
- * <code>ResourceManager</code>. New containers assigned to the master are
- * retrieved. Status of completed containers and node health updates are
- * also retrieved.
- * This also doubles up as a heartbeat to the ResourceManager and must be
- * made periodically.
- * The call may not always return any new allocations of containers.
- * App should not make concurrent allocate requests. May cause request loss.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are also
+ * retrieved. This also doubles up as a heartbeat to the ResourceManager and
+ * must be made periodically. The call may not always return any new
+ * allocations of containers. App should not make concurrent allocate
+ * requests. May cause request loss.
+ *
+ * <p>
+ * Note: If the user has not removed container requests that have already
+ * been satisfied, then the re-register may end up sending the entire set of
+ * container requests to the RM (including matched requests), which would
+ * mean the RM could end up giving it a lot of newly allocated containers.
+ * </p>
+ *
* @param progressIndicator Indicates progress made by the master
* @return the response of the allocate request
* @throws YarnException
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
Wed Jul 9 18:25:45 2014
@@ -234,8 +234,7 @@ extends AMRMClientAsync<T> {
while (true) {
try {
responseQueue.put(response);
- if (response.getAMCommand() == AMCommand.AM_RESYNC
- || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+ if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
return;
}
break;
@@ -280,7 +279,6 @@ extends AMRMClientAsync<T> {
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
- case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
Wed Jul 9 18:25:45 2014
@@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protoc
import
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import
org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -77,10 +80,18 @@ public class AMRMClientImpl<T extends Co
private int lastResponseId = 0;
+ protected String appHostName;
+ protected int appHostPort;
+ protected String appTrackingUrl;
+
protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ // blacklistedNodes keeps the history of blacklisted nodes that have been
+ // sent to the RM. On a RESYNC command from the RM, blacklistedNodes is used
+ // to resend the current set of blacklisted nodes to the RM.
+ protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
@@ -150,6 +161,10 @@ public class AMRMClientImpl<T extends Co
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new
org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+ // pendingRelease holds the history of release requests; a request is removed
+ // only when the RM reports the container as completed.
+ // How is it different from release? release only holds the requests for the
+ // next allocate() call, while pendingRelease is resent after a RESYNC.
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@@ -185,19 +200,27 @@ public class AMRMClientImpl<T extends Co
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
Preconditions.checkArgument(appHostName != null,
"The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
+ " should be any integers larger than or equal to -1");
- // do this only once ???
+
+ return registerApplicationMaster();
+ }
+
+ private RegisterApplicationMasterResponse registerApplicationMaster()
+ throws YarnException, IOException {
RegisterApplicationMasterRequest request =
- RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
- appTrackingUrl);
+ RegisterApplicationMasterRequest.newInstance(this.appHostName,
+ this.appHostPort, this.appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
-
synchronized (this) {
- if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+ lastResponseId = 0;
+ if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
@@ -249,6 +272,25 @@ public class AMRMClientImpl<T extends Co
}
allocateResponse = rmClient.allocate(allocateRequest);
+ if (isResyncCommand(allocateResponse)) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ synchronized (this) {
+ release.addAll(this.pendingRelease);
+ blacklistAdditions.addAll(this.blacklistedNodes);
+ for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr :
remoteRequestsTable
+ .values()) {
+ for (Map<Resource, ResourceRequestInfo> capabalities :
rr.values()) {
+ for (ResourceRequestInfo request : capabalities.values()) {
+ addResourceRequestToAsk(request.remoteRequest);
+ }
+ }
+ }
+ }
+ // re register with RM
+ registerApplicationMaster();
+ return allocate(progressIndicator);
+ }
synchronized (this) {
// update these on successful RPC
@@ -258,6 +300,11 @@ public class AMRMClientImpl<T extends Co
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
+ if (!pendingRelease.isEmpty()
+ && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
+ removePendingReleaseRequests(allocateResponse
+ .getCompletedContainersStatuses());
+ }
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -288,6 +335,18 @@ public class AMRMClientImpl<T extends Co
return allocateResponse;
}
+ protected void removePendingReleaseRequests(
+ List<ContainerStatus> completedContainersStatuses) {
+ for (ContainerStatus containerStatus : completedContainersStatuses) {
+ pendingRelease.remove(containerStatus.getContainerId());
+ }
+ }
+
+ private boolean isResyncCommand(AllocateResponse allocateResponse) {
+ return allocateResponse.getAMCommand() != null
+ && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+ }
+
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -324,6 +383,12 @@ public class AMRMClientImpl<T extends Co
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for application"
+ " to be removed from RMStateStore");
+ } catch (ApplicationMasterNotRegisteredException e) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ // re register with RM
+ registerApplicationMaster();
+ unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
}
@@ -414,6 +479,7 @@ public class AMRMClientImpl<T extends Co
public synchronized void releaseAssignedContainer(ContainerId containerId) {
Preconditions.checkArgument(containerId != null,
"ContainerId can not be null.");
+ pendingRelease.add(containerId);
release.add(containerId);
}
@@ -655,6 +721,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
+ this.blacklistedNodes.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
@@ -662,6 +729,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
+ this.blacklistedNodes.removeAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1609254&r1=1609253&r2=1609254&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Wed Jul 9 18:25:45 2014
@@ -203,39 +203,6 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
- @Test//(timeout=10000)
- public void testAMRMClientAsyncReboot() throws Exception {
- Configuration conf = new Configuration();
- TestCallbackHandler callbackHandler = new TestCallbackHandler();
- @SuppressWarnings("unchecked")
- AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-
- final AllocateResponse rebootResponse = createAllocateResponse(
- new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
- rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
- when(client.allocate(anyFloat())).thenReturn(rebootResponse);
-
- AMRMClientAsync<ContainerRequest> asyncClient =
- AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
- asyncClient.init(conf);
- asyncClient.start();
-
- synchronized (callbackHandler.notifier) {
- asyncClient.registerApplicationMaster("localhost", 1234, null);
- while(callbackHandler.reboot == false) {
- try {
- callbackHandler.notifier.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- asyncClient.stop();
- // stopping should have joined all threads and completed all callbacks
- Assert.assertTrue(callbackHandler.callbackCount == 0);
- }
-
@Test (timeout = 10000)
public void testAMRMClientAsyncShutDown() throws Exception {
Configuration conf = new Configuration();
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java?rev=1609254&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
(added)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
Wed Jul 9 18:25:45 2014
@@ -0,0 +1,499 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAMRMClientOnRMRestart {
+ static Configuration conf = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ conf = new Configuration();
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+ YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
true);
+ }
+
+  // The test verifies the following 6 steps:
+  // Step-1 : AMRMClient sends an allocate request for 2 container requests.
+  // Step-2 : RM allocates 2 containers.
+  // Step-3 : AM sends 1 containerRequest (cRequest3) and 1 release request to
+  //          RM.
+  // Step-4 : On RM restart, the AM (which does not know the RM restarted)
+  //          sends an additional containerRequest (cRequest4) and blacklisted
+  //          nodes. In turn, the RM sends a resync command.
+  // Step-5 : Allocate after the resync command with a new containerRequest
+  //          (cRequest5).
+  // Step-6 : RM allocates containers for cRequest3, cRequest4 and cRequest5.
+  @Test(timeout = 60000)
+  public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
+
+    UserGroupInformation.setLoginUser(null);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // Phase-1 Start 1st RM
+    MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+    rm1.start();
+    MyResourceManager rm2 = null;
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      DrainDispatcher dispatcher =
+          (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+      // Submit the application
+      RMApp app = rm1.submitApp(1024);
+      dispatcher.await();
+
+      MockNM nm1 =
+          new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+      nm1.nodeHeartbeat(true); // Node heartbeat
+      dispatcher.await();
+
+      ApplicationAttemptId appAttemptId =
+          app.getCurrentAppAttempt().getAppAttemptId();
+      rm1.sendAMLaunched(appAttemptId);
+      dispatcher.await();
+
+      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+          rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
+              .getRMAppAttempt(appAttemptId).getAMRMToken();
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      ugi.addTokenIdentifier(token.decodeIdentifier());
+
+      // Step-1 : AMRMClient sends an allocate request for 2 ContainerRequests
+      // cRequest1 = h1 and cRequest2 = h1,h2
+      // blacklisted nodes = h2
+      amClient = new MyAMRMClientImpl(rm1);
+      amClient.init(conf);
+      amClient.start();
+
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      ContainerRequest cRequest1 = createReq(1, 1024, new String[] { "h1" });
+      amClient.addContainerRequest(cRequest1);
+
+      ContainerRequest cRequest2 =
+          createReq(1, 1024, new String[] { "h1", "h2" });
+      amClient.addContainerRequest(cRequest2);
+
+      List<String> blacklistAdditions = new ArrayList<String>();
+      List<String> blacklistRemoval = new ArrayList<String>();
+      blacklistAdditions.add("h2");
+      blacklistRemoval.add("h10");
+      amClient.updateBlacklist(blacklistAdditions, blacklistRemoval);
+      blacklistAdditions.remove("h2"); // remove from local list
+
+      AllocateResponse allocateResponse = amClient.allocate(0.1f);
+      dispatcher.await();
+      Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+          .getAllocatedContainers().size());
+
+      // Why 4 asks and not 3, even though h2 is blacklisted? On blacklisting
+      // a host, the ApplicationMaster itself has to remove the matching asks
+      // from the remote request table; this test deliberately does not.
+      assertAsksAndReleases(4, 0, rm1);
+      assertBlacklistAdditionsAndRemovals(1, 1, rm1);
+
+      // Step-2 : NM heartbeat is sent.
+      // On the 2nd AM allocate request, RM allocates 2 containers to the AM.
+      nm1.nodeHeartbeat(true); // Node heartbeat
+      dispatcher.await();
+
+      allocateResponse = amClient.allocate(0.2f);
+      dispatcher.await();
+      // 2 containers are allocated i.e. for cRequest1 and cRequest2.
+      Assert.assertEquals("No of assignments must be 2", 2, allocateResponse
+          .getAllocatedContainers().size());
+      assertAsksAndReleases(0, 0, rm1);
+      assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+      List<Container> allocatedContainers =
+          allocateResponse.getAllocatedContainers();
+      // remove the already-allocated container requests
+      amClient.removeContainerRequest(cRequest1);
+      amClient.removeContainerRequest(cRequest2);
+
+      allocateResponse = amClient.allocate(0.2f);
+      dispatcher.await();
+      Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+          .getAllocatedContainers().size());
+      assertAsksAndReleases(4, 0, rm1);
+      assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+
+      // Step-3 : Send 1 containerRequest and 1 release request to RM
+      ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
+      amClient.addContainerRequest(cRequest3);
+
+      int pendingRelease = 0;
+      Iterator<Container> it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        amClient.releaseAssignedContainer(it.next().getId());
+        pendingRelease++;
+        it.remove();
+        break; // release only one container
+      }
+
+      allocateResponse = amClient.allocate(0.3f);
+      dispatcher.await();
+      Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+          .getAllocatedContainers().size());
+      assertAsksAndReleases(3, pendingRelease, rm1);
+      assertBlacklistAdditionsAndRemovals(0, 0, rm1);
+      int completedContainer =
+          allocateResponse.getCompletedContainersStatuses().size();
+      pendingRelease -= completedContainer;
+
+      // Phase-2 start 2nd RM is up
+      rm2 = new MyResourceManager(conf, memStore);
+      rm2.start();
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
+      dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+      // NM should be asked to resync on heartbeat, even on its first
+      // heartbeat to rm2
+      NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+      Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+      // new NM to represent NM re-register
+      nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+      nm1.registerNode();
+      nm1.nodeHeartbeat(true);
+      dispatcher.await();
+
+      blacklistAdditions.add("h3");
+      amClient.updateBlacklist(blacklistAdditions, null);
+      blacklistAdditions.remove("h3");
+
+      it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        amClient.releaseAssignedContainer(it.next().getId());
+        pendingRelease++;
+        it.remove();
+      }
+
+      ContainerRequest cRequest4 =
+          createReq(1, 1024, new String[] { "h1", "h2" });
+      amClient.addContainerRequest(cRequest4);
+
+      // Step-4 : On RM restart, the AM (which does not know the RM restarted)
+      // sends an additional containerRequest and blacklisted nodes.
+      // In turn, the RM sends a resync command and AMRMClient resends the
+      // allocate request.
+      allocateResponse = amClient.allocate(0.3f);
+      dispatcher.await();
+
+      completedContainer =
+          allocateResponse.getCompletedContainersStatuses().size();
+      pendingRelease -= completedContainer;
+
+      assertAsksAndReleases(4, pendingRelease, rm2);
+      assertBlacklistAdditionsAndRemovals(2, 0, rm2);
+
+      ContainerRequest cRequest5 =
+          createReq(1, 1024, new String[] { "h1", "h2", "h3" });
+      amClient.addContainerRequest(cRequest5);
+
+      // Step-5 : Allocate after the resync command
+      allocateResponse = amClient.allocate(0.5f);
+      dispatcher.await();
+      Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
+          .getAllocatedContainers().size());
+
+      assertAsksAndReleases(5, 0, rm2);
+      assertBlacklistAdditionsAndRemovals(0, 0, rm2);
+
+      int noAssignedContainer = 0;
+      int count = 5;
+      while (count-- > 0) {
+        nm1.nodeHeartbeat(true);
+        dispatcher.await();
+
+        allocateResponse = amClient.allocate(0.5f);
+        dispatcher.await();
+        noAssignedContainer += allocateResponse.getAllocatedContainers().size();
+        if (noAssignedContainer == 3) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+
+      // Step-6 : RM allocates containers i.e. cRequest3, cRequest4 and
+      // cRequest5
+      Assert.assertEquals("Number of container should be 3", 3,
+          noAssignedContainer);
+    } finally {
+      // Always stop the client and both RMs so that a failed assertion does
+      // not leave running services behind for later tests.
+      if (amClient != null) {
+        amClient.stop();
+      }
+      rm1.stop();
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
+ // Test verify for
+ // 1. AM try to unregister without registering
+ // 2. AM register to RM, and try to unregister immediately after RM restart
+ @Test(timeout = 60000)
+ public void testAMRMClientForUnregisterAMOnRMRestart() throws Exception {
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // Phase-1 Start 1st RM
+ MyResourceManager rm1 = new MyResourceManager(conf, memStore);
+ rm1.start();
+ DrainDispatcher dispatcher =
+ (DrainDispatcher) rm1.getRMContext().getDispatcher();
+
+ // Submit the application
+ RMApp app = rm1.submitApp(1024);
+ dispatcher.await();
+
+ MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ nm1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId =
+ app.getCurrentAppAttempt().getAppAttemptId();
+ rm1.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+ rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
+ .getRMAppAttempt(appAttemptId).getAMRMToken();
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ ugi.addTokenIdentifier(token.decodeIdentifier());
+
+ AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("h1", 10000, "");
+ amClient.allocate(0.1f);
+
+ // Phase-2 start 2nd RM is up
+ MyResourceManager rm2 = new MyResourceManager(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ ((MyAMRMClientImpl) amClient).updateRMProxy(rm2);
+ dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher();
+
+ // NM should be rebooted on heartbeat, even first heartbeat for nm2
+ NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
+
+ // new NM to represent NM re-register
+ nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
+
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
+ NMContainerStatus containerReport =
+ NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
+ Resource.newInstance(1024, 1), "recover container", 0,
+ Priority.newInstance(0), 0);
+ nm1.registerNode(Arrays.asList(containerReport), null);
+ nm1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+ null, null);
+ rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHING);
+ nm1.nodeHeartbeat(appAttemptId, 1, ContainerState.COMPLETE);
+ rm2.waitForState(appAttemptId, RMAppAttemptState.FINISHED);
+ rm2.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+
+ amClient.stop();
+ rm1.stop();
+ rm2.stop();
+
+ }
+
+  /**
+   * FifoScheduler that remembers the arguments of the most recent
+   * {@link #allocate} call so they can be inspected after an AM heartbeat.
+   */
+  private static class MyFifoScheduler extends FifoScheduler {
+
+    /**
+     * Creates the scheduler and initializes it with a fresh Configuration.
+     *
+     * @param rmContext context of the RM that owns this scheduler
+     * @throws RuntimeException if reinitialization fails
+     */
+    public MyFifoScheduler(RMContext rmContext) {
+      super();
+      try {
+        // Fresh config on purpose; named so it does not shadow the outer
+        // static test configuration.
+        Configuration schedulerConf = new Configuration();
+        reinitialize(schedulerConf, rmContext);
+      } catch (IOException ie) {
+        // Previously "assert (false)", which is a no-op unless the JVM runs
+        // with -ea and silently left a half-initialized scheduler behind.
+        throw new RuntimeException("Failed to initialize MyFifoScheduler", ie);
+      }
+    }
+
+    // Arguments of the most recent allocate() call.
+    // NOTE(review): presumably read by the test's assertion helpers, which
+    // are outside this view.
+    List<ResourceRequest> lastAsk = null;
+    List<ContainerId> lastRelease = null;
+    List<String> lastBlacklistAdditions;
+    List<String> lastBlacklistRemovals;
+
+    // Overridden to pass copies of the asks to the scheduler. Otherwise
+    // FifoScheduler updates numContainers in the same ResourceRequest objects
+    // that the caller still holds. lastAsk records the caller's original list,
+    // not the copy.
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release, List<String> blacklistAdditions,
+        List<String> blacklistRemovals) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy =
+            ResourceRequest.newInstance(req.getPriority(),
+                req.getResourceName(), req.getCapability(),
+                req.getNumContainers(), req.getRelaxLocality());
+        askCopy.add(reqCopy);
+      }
+      lastAsk = ask;
+      lastRelease = release;
+      lastBlacklistAdditions = blacklistAdditions;
+      lastBlacklistRemovals = blacklistRemovals;
+      return super.allocate(applicationAttemptId, askCopy, release,
+          blacklistAdditions, blacklistRemovals);
+    }
+  }
+
+  /**
+   * MockRM configured for deterministic testing: it uses a DrainDispatcher,
+   * hands scheduler events straight to the scheduler, pins the cluster
+   * timestamp after start, and installs a {@link MyFifoScheduler}.
+   */
+  private static class MyResourceManager extends MockRM {
+
+    // Captured once when the class is initialised, so every RM instance in
+    // this test (including a restarted one) is pinned to the same value.
+    private static long fakeClusterTimeStamp = System.currentTimeMillis();
+
+    public MyResourceManager(Configuration conf, RMStateStore store) {
+      super(conf, store);
+    }
+
+    /** Returns the scheduler created by {@link #createScheduler()}. */
+    MyFifoScheduler getMyFifoScheduler() {
+      return (MyFifoScheduler) scheduler;
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new MyFifoScheduler(getRMContext());
+    }
+
+    @Override
+    protected Dispatcher createDispatcher() {
+      return new DrainDispatcher();
+    }
+
+    @Override
+    protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+      // Deliver each scheduler event synchronously on the calling thread
+      // rather than routing it through a separate dispatcher.
+      return new EventHandler<SchedulerEvent>() {
+        @Override
+        public void handle(SchedulerEvent event) {
+          scheduler.handle(event);
+        }
+      };
+    }
+
+    @Override
+    public void serviceStart() throws Exception {
+      super.serviceStart();
+      // Keep application attempt IDs identical across the RM instances used
+      // by these tests by forcing a shared cluster timestamp.
+      setClusterTimeStamp(fakeClusterTimeStamp);
+    }
+  }
+
+  /**
+   * AMRMClientImpl wired directly to a MyResourceManager's
+   * ApplicationMasterService, which can be re-pointed at another RM instance
+   * to simulate an RM restart.
+   */
+  private static class MyAMRMClientImpl extends
+      AMRMClientImpl<ContainerRequest> {
+    // RM whose ApplicationMasterService is bound in serviceStart().
+    private MyResourceManager rm;
+
+    public MyAMRMClientImpl(MyResourceManager rm) {
+      this.rm = rm;
+    }
+
+    /**
+     * Makes subsequent protocol calls go to {@code newRm}'s
+     * ApplicationMasterService. The {@code rm} field is left unchanged.
+     */
+    public void updateRMProxy(MyResourceManager newRm) {
+      rmClient = newRm.getApplicationMasterService();
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      // super.serviceStart() is not invoked; the client is bound to the
+      // in-process ApplicationMasterService of the mock RM instead.
+      rmClient = rm.getApplicationMasterService();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      rmClient = null;
+      super.serviceStop();
+    }
+  }
+
+  /**
+   * Asserts how many blacklist additions and removals were passed to the
+   * scheduler of {@code rm} in its most recent allocate() call.
+   */
+  private static void assertBlacklistAdditionsAndRemovals(
+      int expectedAdditions, int expectedRemovals, MyResourceManager rm) {
+    MyFifoScheduler fifo = rm.getMyFifoScheduler();
+    Assert.assertEquals(expectedAdditions,
+        fifo.lastBlacklistAdditions.size());
+    Assert.assertEquals(expectedRemovals,
+        fifo.lastBlacklistRemovals.size());
+  }
+
+  /**
+   * Asserts how many resource asks and container releases were passed to the
+   * scheduler of {@code rm} in its most recent allocate() call.
+   */
+  private static void assertAsksAndReleases(int expectedAsk,
+      int expectedRelease, MyResourceManager rm) {
+    MyFifoScheduler fifo = rm.getMyFifoScheduler();
+    Assert.assertEquals(expectedAsk, fifo.lastAsk.size());
+    Assert.assertEquals(expectedRelease, fifo.lastRelease.size());
+  }
+
+  /**
+   * Builds a ContainerRequest with capability {@code Resource(memory, 1)} at
+   * the given priority, for the given hosts, listing
+   * {@code NetworkTopology.DEFAULT_RACK} as its only rack.
+   */
+  private ContainerRequest createReq(int priority, int memory, String[] hosts) {
+    Priority requestPriority = Priority.newInstance(priority);
+    Resource capability = Resource.newInstance(memory, 1);
+    String[] racks = new String[] { NetworkTopology.DEFAULT_RACK };
+    return new ContainerRequest(capability, hosts, racks, requestPriority);
+  }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml?rev=1609254&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml Wed Jul 9 18:25:45 2014
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+ <property>
+ <name>hadoop.security.token.service.use_ip</name>
+ <value>false</value>
+ </property>
+
+</configuration>