Repository: hadoop Updated Branches: refs/heads/trunk ee21b13cb -> a3d9934f9
YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across ResourceManager work-preserving-restart or failover. Contributed by Jian He. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a3d9934f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a3d9934f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a3d9934f Branch: refs/heads/trunk Commit: a3d9934f916471a845dc679449d08f94dead550d Parents: ee21b13 Author: Vinod Kumar Vavilapalli <[email protected]> Authored: Thu Sep 18 10:16:18 2014 -0700 Committer: Vinod Kumar Vavilapalli <[email protected]> Committed: Thu Sep 18 10:16:18 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../TestUnmanagedAMLauncher.java | 2 +- .../yarn/client/api/impl/AMRMClientImpl.java | 1 + .../TestApplicationMasterServiceOnHA.java | 2 +- .../yarn/client/api/impl/TestAMRMClient.java | 2 + .../hadoop/yarn/client/ClientRMProxy.java | 56 +++++++++++--------- .../hadoop/yarn/security/AMRMTokenSelector.java | 9 +++- .../hadoop/yarn/client/TestClientRMProxy.java | 30 +++++++++++ 8 files changed, 77 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5a23814..643fb14 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -383,6 +383,9 @@ Release 2.6.0 - UNRELEASED YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving FinalApplicationStatus. (Zhijie Shen via jianhe) + YARN-1779. Fixed AMRMClient to handle AMRMTokens correctly across + ResourceManager work-preserving-restart or failover. (Jian He via vinodkv) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index 08cacee..3aba01a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -53,7 +53,7 @@ public class TestUnmanagedAMLauncher { .getLog(TestUnmanagedAMLauncher.class); protected static MiniYARNCluster yarnCluster = null; - protected static Configuration conf = new Configuration(); + protected static Configuration conf = new YarnConfiguration(); @BeforeClass public static void setup() throws InterruptedException, IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index e36d7ad..88b2f45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -756,6 +756,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token .getIdentifier().array(), token.getPassword().array(), new Text( token.getKind()), new Text(token.getService())); + amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig())); UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); if (UserGroupInformation.isSecurityEnabled()) { currentUGI = UserGroupInformation.getLoginUser(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java index 0b42ac3..5b12940 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceOnHA.java @@ -57,7 +57,7 @@ public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{ Token<AMRMTokenIdentifier> appToken = this.cluster.getResourceManager().getRMContext() .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId); - appToken.setService(new Text("appToken service")); + appToken.setService(ClientRMProxy.getAMRMTokenService(conf)); UserGroupInformation.setLoginUser(UserGroupInformation .createRemoteUser(UserGroupInformation.getCurrentUser() .getUserName())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 38dbf79..a434e35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.NMTokenCache; @@ -196,6 +197,7 @@ public class TestAMRMClient { // of testing. UserGroupInformation.setLoginUser(UserGroupInformation .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); + appAttempt.getAMRMToken().setService(ClientRMProxy.getAMRMTokenService(conf)); UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index 3434755..b29263e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -22,11 +22,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; -import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.SecurityUtil; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @InterfaceAudience.Public @@ -70,23 +72,17 @@ public class ClientRMProxy<T> extends RMProxy<T> { return createRMProxy(configuration, protocol, INSTANCE); } - private static void setupTokens(InetSocketAddress resourceManagerAddress) + private static void setAMRMTokenService(final Configuration conf) throws IOException { - // It is assumed for now that the only AMRMToken in AM's UGI is for this - // cluster/RM. TODO: Fix later when we have some kind of cluster-ID as - // default service-address, see YARN-1779. for (Token<? extends TokenIdentifier> token : UserGroupInformation .getCurrentUser().getTokens()) { if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - // This token needs to be directly provided to the AMs, so set the - // appropriate service-name. We'll need more infrastructure when we - // need to set it in HA case. - SecurityUtil.setTokenService(token, resourceManagerAddress); + token.setService(getAMRMTokenService(conf)); } } } - @InterfaceAudience.Private + @Private @Override protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException { @@ -100,12 +96,10 @@ public class ClientRMProxy<T> extends RMProxy<T> { YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_PORT); } else if (protocol == ApplicationMasterProtocol.class) { - InetSocketAddress serviceAddr = - conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); - setupTokens(serviceAddr); - return serviceAddr; + setAMRMTokenService(conf); + return conf.getSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); } else { String message = "Unsupported protocol found when creating the proxy " + "connection to ResourceManager: " + @@ -115,7 +109,7 @@ public class ClientRMProxy<T> extends RMProxy<T> { } } - @InterfaceAudience.Private + @Private @Override protected void checkAllowedProtocols(Class<?> protocol) { Preconditions.checkArgument( @@ -132,8 +126,23 @@ public class ClientRMProxy<T> extends RMProxy<T> { * RMDelegationToken for * @return - Service name for RMDelegationToken */ - @InterfaceStability.Unstable + @Unstable public static Text getRMDelegationTokenService(Configuration conf) { + return getTokenService(conf, YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); + } + + @Unstable + public static Text getAMRMTokenService(Configuration conf) { + return getTokenService(conf, YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); + } + + @Unstable + public static Text getTokenService(Configuration conf, String address, + String defaultAddr, int defaultPort) { if (HAUtil.isHAEnabled(conf)) { // Build a list of service addresses to form the service name ArrayList<String> services = new ArrayList<String>(); @@ -142,17 +151,14 @@ public class ClientRMProxy<T> extends RMProxy<T> { // Set RM_ID to get the corresponding RM_ADDRESS yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); services.add(SecurityUtil.buildTokenService( - yarnConf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT)).toString()); + yarnConf.getSocketAddr(address, defaultAddr, defaultPort)) + .toString()); } return new Text(Joiner.on(',').join(services)); } // Non-HA case - no need to set RM_ID - return SecurityUtil.buildTokenService( - conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT)); + return SecurityUtil.buildTokenService(conf.getSocketAddr(address, + defaultAddr, defaultPort)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java index 4693839..be3701d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/AMRMTokenSelector.java @@ -48,11 +48,18 @@ public class AMRMTokenSelector implements LOG.debug("Token kind is " + token.getKind().toString() + " and the token's service name is " + token.getService()); if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind()) - && service.equals(token.getService())) { + && checkService(service, token)) { return (Token<AMRMTokenIdentifier>) token; } } return null; } + private boolean checkService(Text service, + Token<? extends TokenIdentifier> token) { + if (service == null || token.getService() == null) { + return false; + } + return token.getService().toString().contains(service.toString()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3d9934f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java index 1a252ab..700a37f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/TestClientRMProxy.java @@ -56,4 +56,34 @@ public class TestClientRMProxy { service.contains(defaultRMAddress)); } } + + @Test + public void testGetAMRMTokenService() { + String defaultRMAddress = YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS; + YarnConfiguration conf = new YarnConfiguration(); + + // HA is not enabled + Text tokenService = ClientRMProxy.getAMRMTokenService(conf); + String[] services = tokenService.toString().split(","); + assertEquals(1, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + + // HA is enabled + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm1"), + "0.0.0.0"); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME, "rm2"), + "0.0.0.0"); + tokenService = ClientRMProxy.getAMRMTokenService(conf); + services = tokenService.toString().split(","); + assertEquals(2, services.length); + for (String service : services) { + assertTrue("Incorrect token service name", + service.contains(defaultRMAddress)); + } + } }
