Author: jianhe
Date: Wed Jul 9 18:28:00 2014
New Revision: 1609255
URL: http://svn.apache.org/r1609255
Log:
Merge r1609254 from trunk. YARN-1366. Changed AMRMClient to re-register with RM
and send outstanding requests back to RM on work-preserving RM restart.
Contributed by Rohith
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
- copied unchanged from r1609254,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
- copied unchanged from r1609254,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/core-site.xml
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Jul 9 18:28:00 2014
@@ -57,6 +57,9 @@ Release 2.5.0 - UNRELEASED
YARN-1367. Changed NM to not kill containers on NM resync if RM work-preserving
restart is enabled. (Anubhav Dhoot via jianhe)
+ YARN-1366. Changed AMRMClient to re-register with RM and send outstanding requests
+ back to RM on work-preserving RM restart. (Rohith via jianhe)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml Wed Jul 9 18:28:00 2014
@@ -123,6 +123,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Wed Jul 9 18:28:00 2014
@@ -207,14 +207,21 @@ public abstract class AMRMClient<T exten
/**
* Request additional containers and receive new container allocations.
- * Requests made via <code>addContainerRequest</code> are sent to the
- * <code>ResourceManager</code>. New containers assigned to the master are
- * retrieved. Status of completed containers and node health updates are
- * also retrieved.
- * This also doubles up as a heartbeat to the ResourceManager and must be
- * made periodically.
- * The call may not always return any new allocations of containers.
- * App should not make concurrent allocate requests. May cause request loss.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are also
+ * retrieved. This also doubles up as a heartbeat to the ResourceManager and
+ * must be made periodically. The call may not always return any new
+ * allocations of containers. App should not make concurrent allocate
+ * requests. May cause request loss.
+ *
+ * <p>
+ * Note : If the user has not removed container requests that have already
+ * been satisfied, then the re-register may end up sending the entire
+ * container requests to the RM (including matched requests). Which would mean
+ * the RM could end up giving it a lot of new allocated containers.
+ * </p>
+ *
* @param progressIndicator Indicates progress made by the master
* @return the response of the allocate request
* @throws YarnException
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Wed Jul 9 18:28:00 2014
@@ -234,8 +234,7 @@ extends AMRMClientAsync<T> {
while (true) {
try {
responseQueue.put(response);
- if (response.getAMCommand() == AMCommand.AM_RESYNC
- || response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
+ if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) {
return;
}
break;
@@ -280,7 +279,6 @@ extends AMRMClientAsync<T> {
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
- case AM_RESYNC:
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java Wed Jul 9 18:28:00 2014
@@ -47,7 +47,9 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -59,6 +61,7 @@ import org.apache.hadoop.yarn.client.api
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -77,10 +80,18 @@ public class AMRMClientImpl<T extends Co
private int lastResponseId = 0;
+ protected String appHostName;
+ protected int appHostPort;
+ protected String appTrackingUrl;
+
protected ApplicationMasterProtocol rmClient;
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
+ // blacklistedNodes is required for keeping history of blacklisted nodes that
+ // are sent to RM. On RESYNC command from RM, blacklistedNodes are used to get
+ // current blacklisted nodes and send back to RM.
+ protected final Set<String> blacklistedNodes = new HashSet<String>();
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
@@ -150,6 +161,10 @@ public class AMRMClientImpl<T extends Co
protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
protected final Set<ContainerId> release = new TreeSet<ContainerId>();
+ // pendingRelease holds history or release requests.request is removed only if
+ // RM sends completedContainer.
+ // How it different from release? --> release is for per allocate() request.
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
public AMRMClientImpl() {
super(AMRMClientImpl.class.getName());
@@ -185,19 +200,27 @@ public class AMRMClientImpl<T extends Co
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl)
throws YarnException, IOException {
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
Preconditions.checkArgument(appHostName != null,
"The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host"
+ " should be any integers larger than or equal to -1");
- // do this only once ???
+
+ return registerApplicationMaster();
+ }
+
+ private RegisterApplicationMasterResponse registerApplicationMaster()
+ throws YarnException, IOException {
RegisterApplicationMasterRequest request =
- RegisterApplicationMasterRequest.newInstance(appHostName, appHostPort,
- appTrackingUrl);
+ RegisterApplicationMasterRequest.newInstance(this.appHostName,
+ this.appHostPort, this.appTrackingUrl);
RegisterApplicationMasterResponse response =
rmClient.registerApplicationMaster(request);
-
synchronized (this) {
- if(!response.getNMTokensFromPreviousAttempts().isEmpty()) {
+ lastResponseId = 0;
+ if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
}
@@ -249,6 +272,25 @@ public class AMRMClientImpl<T extends Co
}
allocateResponse = rmClient.allocate(allocateRequest);
+ if (isResyncCommand(allocateResponse)) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ synchronized (this) {
+ release.addAll(this.pendingRelease);
+ blacklistAdditions.addAll(this.blacklistedNodes);
+ for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
+ .values()) {
+ for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
+ for (ResourceRequestInfo request : capabalities.values()) {
+ addResourceRequestToAsk(request.remoteRequest);
+ }
+ }
+ }
+ }
+ // re register with RM
+ registerApplicationMaster();
+ return allocate(progressIndicator);
+ }
synchronized (this) {
// update these on successful RPC
@@ -258,6 +300,11 @@ public class AMRMClientImpl<T extends Co
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
+ if (!pendingRelease.isEmpty()
+ && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
+ removePendingReleaseRequests(allocateResponse
+ .getCompletedContainersStatuses());
+ }
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
@@ -288,6 +335,18 @@ public class AMRMClientImpl<T extends Co
return allocateResponse;
}
+ protected void removePendingReleaseRequests(
+ List<ContainerStatus> completedContainersStatuses) {
+ for (ContainerStatus containerStatus : completedContainersStatuses) {
+ pendingRelease.remove(containerStatus.getContainerId());
+ }
+ }
+
+ private boolean isResyncCommand(AllocateResponse allocateResponse) {
+ return allocateResponse.getAMCommand() != null
+ && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
+ }
+
@Private
@VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -324,6 +383,12 @@ public class AMRMClientImpl<T extends Co
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for application"
+ " to be removed from RMStateStore");
+ } catch (ApplicationMasterNotRegisteredException e) {
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ + " hence resyncing.");
+ // re register with RM
+ registerApplicationMaster();
+ unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
}
}
@@ -414,6 +479,7 @@ public class AMRMClientImpl<T extends Co
public synchronized void releaseAssignedContainer(ContainerId containerId) {
Preconditions.checkArgument(containerId != null,
"ContainerId can not be null.");
+ pendingRelease.add(containerId);
release.add(containerId);
}
@@ -655,6 +721,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
+ this.blacklistedNodes.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
@@ -662,6 +729,7 @@ public class AMRMClientImpl<T extends Co
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
+ this.blacklistedNodes.removeAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java?rev=1609255&r1=1609254&r2=1609255&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java Wed Jul 9 18:28:00 2014
@@ -203,39 +203,6 @@ public class TestAMRMClientAsync {
Assert.assertTrue(callbackHandler.callbackCount == 0);
}
- @Test//(timeout=10000)
- public void testAMRMClientAsyncReboot() throws Exception {
- Configuration conf = new Configuration();
- TestCallbackHandler callbackHandler = new TestCallbackHandler();
- @SuppressWarnings("unchecked")
- AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
-
- final AllocateResponse rebootResponse = createAllocateResponse(
- new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
- rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
- when(client.allocate(anyFloat())).thenReturn(rebootResponse);
-
- AMRMClientAsync<ContainerRequest> asyncClient =
- AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
- asyncClient.init(conf);
- asyncClient.start();
-
- synchronized (callbackHandler.notifier) {
- asyncClient.registerApplicationMaster("localhost", 1234, null);
- while(callbackHandler.reboot == false) {
- try {
- callbackHandler.notifier.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- asyncClient.stop();
- // stopping should have joined all threads and completed all callbacks
- Assert.assertTrue(callbackHandler.callbackCount == 0);
- }
-
@Test (timeout = 10000)
public void testAMRMClientAsyncShutDown() throws Exception {
Configuration conf = new Configuration();