This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b88e9f8f4f YARN-11509. The FederationInterceptor#launchUAM Added
retry logic. (#5727)
8b88e9f8f4f is described below
commit 8b88e9f8f4fa6a98fba71f1fb5bd8a674cc8a400
Author: slfan1989 <[email protected]>
AuthorDate: Wed Jul 12 09:47:07 2023 +0800
YARN-11509. The FederationInterceptor#launchUAM Added retry logic. (#5727)
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 14 ++
.../src/main/resources/yarn-default.xml | 18 +++
.../amrmproxy/FederationInterceptor.java | 180 +++++++++++++--------
.../amrmproxy/TokenAndRegisterResponse.java | 45 ++++++
.../amrmproxy/TestFederationInterceptor.java | 65 ++++++--
.../amrmproxy/TestableFederationInterceptor.java | 19 +++
6 files changed, 258 insertions(+), 83 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e1fbb59939..648fddbbbe9 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4058,6 +4058,20 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
60000; // one minute
+ // AMRMProxy Register UAM Retry-Num
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";
+ // When registering a UAM, we will retry a maximum of 3 times.
+ public static final int
DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+ 3;
+
+ // AMRMProxy Register UAM Retry Interval
+ public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ FEDERATION_PREFIX + "amrmproxy.register.uam.interval";
+ // Retry Interval, default 100 ms
+ public static final long
DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+ TimeUnit.MILLISECONDS.toMillis(100);
+
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
+ "policy-manager";
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 132f08f6b07..f722af852f4 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5408,4 +5408,22 @@
</description>
</property>
+ <property>
+ <description>
+ The number of retries when registering a UAM.
+ The default value is 3.
+ </description>
+ <name>yarn.federation.amrmproxy.register.uam.retry-count</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <description>
+ Interval between retries when registering a UAM.
+ The default value is 100ms.
+ </description>
+ <name>yarn.federation.amrmproxy.register.uam.interval</name>
+ <value>100ms</value>
+ </property>
+
</configuration>
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 14a2d60c2b5..32c5bf217e2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -87,6 +88,7 @@ import
org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import
org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import
org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
@@ -251,6 +253,10 @@ public class FederationInterceptor extends
AbstractRequestInterceptor {
// the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs;
+ private int registerUamRetryNum;
+
+ private long registerUamRetryInterval;
+
private boolean waitUamRegisterDone;
private MonotonicClock clock = new MonotonicClock();
@@ -355,6 +361,24 @@ public class FederationInterceptor extends
AbstractRequestInterceptor {
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
+
+    // Read the maximum number of attempts for launching/registering a UAM.
+    this.registerUamRetryNum = conf.getInt(
+        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+    if (this.registerUamRetryNum <= 0) {
+      // Log the rejected retry count itself (not the sub-cluster timeout) so the
+      // message shows the value that was actually configured for this key.
+      LOG.info("{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+          this.registerUamRetryNum,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+      this.registerUamRetryNum =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
+    }
+
+ this.registerUamRetryInterval = conf.getTimeDuration(
+ YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+ TimeUnit.MILLISECONDS);
+
this.waitUamRegisterDone =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
}
@@ -701,7 +725,7 @@ public class FederationInterceptor extends
AbstractRequestInterceptor {
if (this.finishAMCalled) {
LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
- + "processing and return dummy response" + this.attemptId);
+ + "processing and return dummy response.", this.attemptId);
return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
}
@@ -1255,85 +1279,77 @@ public class FederationInterceptor extends
AbstractRequestInterceptor {
// Check to see if there are any new sub-clusters in this request
// list and create and register Unmanaged AM instance for the new ones
List<SubClusterId> newSubClusters = new ArrayList<>();
- for (SubClusterId subClusterId : requests.keySet()) {
- if (!subClusterId.equals(this.homeSubClusterId)
- && !this.uamPool.hasUAMId(subClusterId.getId())) {
- newSubClusters.add(subClusterId);
+ requests.keySet().stream().forEach(subClusterId -> {
+ String id = subClusterId.getId();
+ if (!subClusterId.equals(this.homeSubClusterId) &&
!this.uamPool.hasUAMId(id)) {
+ newSubClusters.add(subClusterId);
// Set sub-cluster to be timed out initially
- lastSCResponseTime.put(subClusterId,
- clock.getTime() - subClusterTimeOut);
+ lastSCResponseTime.put(subClusterId, clock.getTime() -
subClusterTimeOut);
}
- }
+ });
this.uamRegisterFutures.clear();
+
for (final SubClusterId scId : newSubClusters) {
- Future<?> future = this.threadpool.submit(new Runnable() {
- @Override
- public void run() {
- String subClusterId = scId.getId();
-
- // Create a config loaded with federation on and subclusterId
- // for each UAM
- YarnConfiguration config = new YarnConfiguration(getConf());
- FederationProxyProviderUtil.updateConfForFederation(config,
- subClusterId);
-
- RegisterApplicationMasterResponse uamResponse = null;
- Token<AMRMTokenIdentifier> token = null;
- try {
- ApplicationId applicationId = attemptId.getApplicationId();
- ApplicationSubmissionContext originalSubmissionContext =
-
federationFacade.getApplicationSubmissionContext(applicationId);
-
- // For appNameSuffix, use subClusterId of the home sub-cluster
- token = uamPool.launchUAM(subClusterId, config,
- applicationId, amRegistrationResponse.getQueue(),
- getApplicationContext().getUser(), homeSubClusterId.toString(),
- true, subClusterId, originalSubmissionContext);
-
- secondaryRelayers.put(subClusterId,
- uamPool.getAMRMClientRelayer(subClusterId));
-
- uamResponse = uamPool.registerApplicationMaster(subClusterId,
- amRegistrationRequest);
- } catch (Throwable e) {
- LOG.error("Failed to register application master: " + subClusterId
- + " Application: " + attemptId, e);
- // TODO: UAM registration for this sub-cluster RM
- // failed. For now, we ignore the resource requests and continue
- // but we need to fix this and handle this situation. One way would
- // be to send the request to another RM by consulting the policy.
- return;
- }
- uamRegistrations.put(scId, uamResponse);
- LOG.info("Successfully registered unmanaged application master: "
- + subClusterId + " ApplicationId: " + attemptId);
- try {
- uamPool.allocateAsync(subClusterId, requests.get(scId),
- new HeartbeatCallBack(scId, true));
- } catch (Throwable e) {
- LOG.error("Failed to allocate async to " + subClusterId
- + " Application: " + attemptId, e);
- }
+ Future<?> future = this.threadpool.submit(() -> {
- // Save the UAM token in registry or NMSS
- try {
- if (registryClient != null) {
- registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
- subClusterId, token);
- } else if (getNMStateStore() != null) {
- getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
- NMSS_SECONDARY_SC_PREFIX + subClusterId,
- token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
- }
- } catch (Throwable e) {
- LOG.error("Failed to persist UAM token from " + subClusterId
- + " Application: " + attemptId, e);
+ String subClusterId = scId.getId();
+
+ // Create a config loaded with federation on and subclusterId
+ // for each UAM
+ YarnConfiguration config = new YarnConfiguration(getConf());
+ FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId);
+ ApplicationId applicationId = attemptId.getApplicationId();
+
+ RegisterApplicationMasterResponse uamResponse;
+ Token<AMRMTokenIdentifier> token;
+
+ // LaunchUAM And RegisterApplicationMaster
+ try {
+ TokenAndRegisterResponse result =
+ ((FederationActionRetry<TokenAndRegisterResponse>) (retryCount)
->
+ launchUAMAndRegisterApplicationMaster(config, subClusterId,
applicationId)).
+ runWithRetries(registerUamRetryNum, registerUamRetryInterval);
+
+ token = result.getToken();
+ uamResponse = result.getResponse();
+ } catch (Throwable e) {
+ LOG.error("Failed to register application master: {} Application:
{}.",
+ subClusterId, attemptId, e);
+ return;
+ }
+
+ uamRegistrations.put(scId, uamResponse);
+
+ LOG.info("Successfully registered unmanaged application master: {} " +
+ "ApplicationId: {}.", subClusterId, attemptId);
+
+ // Allocate Request
+ try {
+ uamPool.allocateAsync(subClusterId, requests.get(scId),
+ new HeartbeatCallBack(scId, true));
+ } catch (Throwable e) {
+ LOG.error("Failed to allocate async to {} Application: {}.",
+ subClusterId, attemptId, e);
+ }
+
+ // Save the UAM token in registry or NMSS
+ try {
+ if (registryClient != null) {
+ registryClient.writeAMRMTokenForUAM(applicationId, subClusterId,
token);
+ } else if (getNMStateStore() != null) {
+ getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+ NMSS_SECONDARY_SC_PREFIX + subClusterId,
+ token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
}
+ } catch (Throwable e) {
+ LOG.error("Failed to persist UAM token from {} Application {}",
+ subClusterId, attemptId, e);
}
});
+
this.uamRegisterFutures.put(scId, future);
}
@@ -1347,10 +1363,34 @@ public class FederationInterceptor extends
AbstractRequestInterceptor {
}
}
-
return newSubClusters;
}
+  /**
+   * Launches the UAM for the given sub-cluster through the UAM pool, records the
+   * pool's AMRMClientRelayer for that sub-cluster, and then registers the UAM.
+   *
+   * @param config configuration handed to the UAM pool when launching the UAM.
+   * @param subClusterId id of the sub-cluster the UAM is launched and registered in.
+   * @param applicationId id of the application the UAM is launched for.
+   * @return the token returned by the launch paired with the register response.
+   * @throws IOException if one of the underlying calls fails with an IOException.
+   * @throws YarnException if one of the underlying calls fails with a YarnException.
+   */
+  protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
+      YarnConfiguration config, String subClusterId, ApplicationId applicationId)
+      throws IOException, YarnException {
+
+    // Collect the launch arguments before touching the UAM pool.
+    ApplicationSubmissionContext submissionContext =
+        federationFacade.getApplicationSubmissionContext(applicationId);
+    String user = getApplicationContext().getUser();
+    String homeSubCluster = homeSubClusterId.toString();
+    String queueName = amRegistrationResponse.getQueue();
+
+    // For appNameSuffix, use subClusterId of the home sub-cluster
+    Token<AMRMTokenIdentifier> uamToken = uamPool.launchUAM(subClusterId, config,
+        applicationId, queueName, user, homeSubCluster, true, subClusterId,
+        submissionContext);
+
+    // Remember which AMRMClientRelayer serves this sub-cluster.
+    secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId));
+
+    // Register the freshly launched UAM and hand back both results together.
+    return new TokenAndRegisterResponse(uamToken,
+        uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest));
+  }
+
/**
* Prepare the base allocation response. Use lastSCResponse and
* lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g.
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
new file mode 100644
index 00000000000..d67ecab9a99
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * Immutable holder for an AMRM token and the
+ * RegisterApplicationMasterResponse that belongs with it.
+ */
+public class TokenAndRegisterResponse {
+  // Both values are set once in the constructor and never reassigned.
+  private final Token<AMRMTokenIdentifier> token;
+  private final RegisterApplicationMasterResponse response;
+
+  /**
+   * Creates a holder for the given token and register response.
+   *
+   * @param pToken the AMRM token to hold.
+   * @param pResponse the register response to hold.
+   */
+  public TokenAndRegisterResponse(Token<AMRMTokenIdentifier> pToken,
+      RegisterApplicationMasterResponse pResponse) {
+    this.token = pToken;
+    this.response = pResponse;
+  }
+
+  /**
+   * @return the AMRM token passed to the constructor.
+   */
+  public Token<AMRMTokenIdentifier> getToken() {
+    return token;
+  }
+
+  /**
+   * @return the register response passed to the constructor.
+   */
+  public RegisterApplicationMasterResponse getResponse() {
+    return response;
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 9e875437234..9e3e73f7f99 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -38,7 +38,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -179,9 +178,8 @@ public class TestFederationInterceptor extends
BaseAMRMProxyTest {
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);
- // Wait UAM Register Down
- conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
-
+ // Register UAM Retry Interval 1ms
+
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
1);
return conf;
}
@@ -597,10 +595,6 @@ public class TestFederationInterceptor extends
BaseAMRMProxyTest {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size()
== 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
@@ -859,7 +853,7 @@ public class TestFederationInterceptor extends
BaseAMRMProxyTest {
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
- LOG.info("Allocated container {}", c.getId());
+ LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
@@ -893,10 +887,6 @@ public class TestFederationInterceptor extends
BaseAMRMProxyTest {
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
- // Waiting for SC-1 to time out.
- GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size()
== 1, 100, 1000);
-
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
@@ -1444,4 +1434,53 @@ public class TestFederationInterceptor extends
BaseAMRMProxyTest {
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
}
+
+  /**
+   * Runs a register / allocate / release / finish cycle after arming the testable
+   * interceptor with two simulated launch failures, then checks that the failure
+   * counter has dropped back to zero.
+   */
+  @Test
+  public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {
+
+    UserGroupInformation appUgi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+    // The testable interceptor fails launchUAMAndRegisterApplicationMaster while
+    // this counter is positive, decrementing it on each failure.
+    interceptor.setRetryCount(2);
+
+    appUgi.doAs((PrivilegedExceptionAction<Object>) () -> {
+      // Register the application
+      RegisterApplicationMasterRequest registerRequest =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerRequest.setHost(Integer.toString(testAppId));
+      registerRequest.setRpcPort(0);
+      registerRequest.setTrackingUrl("");
+
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerRequest);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      // The UAM pool is expected to be empty at this point.
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int containerCount = 3;
+      List<Container> allocated = getContainersAndAssert(containerCount, containerCount);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+      // Release all containers
+      releaseContainersAndAssert(allocated);
+
+      // Finish the application
+      FinishApplicationMasterRequest finishRequest =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishRequest.setDiagnostics("");
+      finishRequest.setTrackingUrl("");
+      finishRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResponse =
+          interceptor.finishApplicationMaster(finishRequest);
+      Assert.assertNotNull(finishResponse);
+      Assert.assertTrue(finishResponse.getIsUnregistered());
+
+      return null;
+    });
+
+    // A counter of zero means both simulated failures were consumed.
+    Assert.assertEquals(0, interceptor.getRetryCount());
+  }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
index a1558744283..070131ef39a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java
@@ -55,6 +55,7 @@ public class TestableFederationInterceptor extends
FederationInterceptor {
private MockResourceManagerFacade mockRm;
private boolean isClientRPC = false;
+ private int retryCount = 0;
public TestableFederationInterceptor() {
}
@@ -258,6 +259,24 @@ public class TestableFederationInterceptor extends
FederationInterceptor {
}
}
+  /**
+   * Test hook: while the simulated-failure counter is positive, decrements it and
+   * throws a YarnException; otherwise delegates to the real implementation.
+   */
+  @Override
+  protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
+      YarnConfiguration config, String subClusterId, ApplicationId applicationId)
+      throws IOException, YarnException {
+    // No simulated failures left: behave exactly like the parent class.
+    if (retryCount <= 0) {
+      return super.launchUAMAndRegisterApplicationMaster(config, subClusterId,
+          applicationId);
+    }
+    // Consume one simulated failure and fail this attempt.
+    retryCount--;
+    throw new YarnException("launchUAMAndRegisterApplicationMaster will retry");
+  }
+
+  /**
+   * Sets how many upcoming launchUAMAndRegisterApplicationMaster calls should fail.
+   *
+   * @param retryCount number of simulated failures to inject.
+   */
+  public void setRetryCount(int retryCount) {
+    this.retryCount = retryCount;
+  }
+
+  /**
+   * @return the number of simulated failures that have not been consumed yet.
+   */
+  public int getRetryCount() {
+    return this.retryCount;
+  }
+
/**
* Wrap the handler thread, so it calls from the same user.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]