YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru Krishnan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0cd1680d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0cd1680d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0cd1680d Branch: refs/heads/YARN-2915 Commit: 0cd1680d36ef744f7b644baf1a12655a87f02938 Parents: a8c46ad Author: Jian He <jia...@apache.org> Authored: Mon Aug 22 14:43:07 2016 +0800 Committer: Subru Krishnan <su...@apache.org> Committed: Mon Oct 3 14:15:36 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/yarn/conf/HAUtil.java | 30 ++- .../hadoop/yarn/conf/YarnConfiguration.java | 10 + .../yarn/conf/TestYarnConfigurationFields.java | 4 + .../TestFederationRMFailoverProxyProvider.java | 154 ++++++++++++++ .../hadoop/yarn/client/ClientRMProxy.java | 4 +- .../org/apache/hadoop/yarn/client/RMProxy.java | 23 +- .../src/main/resources/yarn-default.xml | 7 + .../hadoop-yarn-server-common/pom.xml | 2 - .../hadoop/yarn/server/api/ServerRMProxy.java | 4 +- .../failover/FederationProxyProviderUtil.java | 163 ++++++++++++++ .../FederationRMFailoverProxyProvider.java | 211 +++++++++++++++++++ .../federation/failover/package-info.java | 17 ++ 12 files changed, 613 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 594832e..cf221a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.conf; -import com.google.common.annotations.VisibleForTesting; +import java.net.InetSocketAddress; +import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import java.net.InetSocketAddress; -import java.util.Collection; +import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class HAUtil { @@ -44,6 +45,29 @@ public class HAUtil { } /** + * Returns true if Federation is configured. + * + * @param conf Configuration + * @return true if federation is configured in the configuration; else false. + */ + public static boolean isFederationEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_ENABLED); + } + + /** + * Returns true if RM failover is enabled in a Federation setting. + * + * @param conf Configuration + * @return true if RM failover is enabled in conjunction with Federation in the + * configuration; else false. + */ + public static boolean isFederationFailoverEnabled(Configuration conf) { + return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, + YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + } + + /** * Returns true if Resource Manager HA is configured. 
* * @param conf Configuration http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a3f9ad0..63c8192b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2485,6 +2485,16 @@ public class YarnConfiguration extends Configuration { public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; + public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + "enabled"; + public static final boolean DEFAULT_FEDERATION_ENABLED = false; + + public static final String FEDERATION_FAILOVER_ENABLED = + FEDERATION_PREFIX + "failover.enabled"; + public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true; + + public static final String FEDERATION_SUBCLUSTER_ID = + FEDERATION_PREFIX + "sub-cluster.id"; + public static final String FEDERATION_STATESTORE_CLIENT_CLASS = FEDERATION_PREFIX + "state-store.class"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 
000f5de..63413eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -95,6 +95,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { // Federation default configs to be ignored configurationPropsToSkipCompare .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID); + configurationPropsToSkipCompare + .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java new file mode 100644 index 0000000..fa3523c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit 
tests for FederationRMFailoverProxyProvider. + */ +public class TestFederationRMFailoverProxyProvider { + + private Configuration conf; + private FederationStateStore stateStore; + private final String dummyCapability = "cap"; + + @Before + public void setUp() throws IOException, YarnException { + conf = new YarnConfiguration(); + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf); + } + + @After + public void tearDown() throws Exception { + stateStore.close(); + stateStore = null; + } + + @Test + public void testFederationRMFailoverProxyProvider() throws Exception { + final SubClusterId subClusterId = SubClusterId.newInstance("SC-1"); + final MiniYARNCluster cluster = new MiniYARNCluster( + "testFederationRMFailoverProxyProvider", 3, 0, 1, 1); + + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3"); + + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, + 2000); + + HATestUtil.setRpcAddressForRM("rm1", 10000, conf); + HATestUtil.setRpcAddressForRM("rm2", 20000, conf); + HATestUtil.setRpcAddressForRM("rm3", 30000, conf); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + + cluster.init(conf); + cluster.start(); + + // Transition rm3 to active; + makeRMActive(subClusterId, cluster, 2); + + ApplicationClientProtocol client = FederationProxyProviderUtil + .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId, + UserGroupInformation.getCurrentUser()); + + // client will retry until the rm becomes active. 
+ GetClusterMetricsResponse response = + client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + // validate response + checkResponse(response); + + // transition rm3 to standby + cluster.getResourceManager(2).getRMContext().getRMAdminService() + .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + + // Transition rm2 to active; + makeRMActive(subClusterId, cluster, 1); + response = client.getClusterMetrics(GetClusterMetricsRequest.newInstance()); + + // validate response + checkResponse(response); + + cluster.stop(); + } + + private void checkResponse(GetClusterMetricsResponse response) { + Assert.assertNotNull(response.getClusterMetrics()); + Assert.assertEquals(0, + response.getClusterMetrics().getNumActiveNodeManagers()); + } + + private void makeRMActive(final SubClusterId subClusterId, + final MiniYARNCluster cluster, final int index) { + try { + System.out.println("Transition rm" + (index + 1) + " to active"); + String dummyAddress = "host:" + index; + cluster.getResourceManager(index).getRMContext().getRMAdminService() + .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo( + HAServiceProtocol.RequestSource.REQUEST_BY_USER)); + ResourceManager rm = cluster.getResourceManager(index); + InetSocketAddress amRMAddress = + rm.getApplicationMasterService().getBindAddress(); + InetSocketAddress clientRMAddress = + rm.getClientRMService().getBindAddress(); + SubClusterRegisterRequest request = SubClusterRegisterRequest + .newInstance(SubClusterInfo.newInstance(subClusterId, + amRMAddress.getAddress().getHostAddress() + ":" + + amRMAddress.getPort(), + clientRMAddress.getAddress().getHostAddress() + ":" + + clientRMAddress.getPort(), + dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1, + dummyCapability)); + stateStore.registerSubCluster(request); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java index b29263e..6365662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java @@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T> { @Private @Override - protected InetSocketAddress getRMAddress(YarnConfiguration conf, + public InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException { if (protocol == ApplicationClientProtocol.class) { return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, @@ -111,7 +111,7 @@ public class ClientRMProxy<T> extends RMProxy<T> { @Private @Override - protected void checkAllowedProtocols(Class<?> protocol) { + public void checkAllowedProtocols(Class<?> protocol) { Preconditions.checkArgument( protocol.isAssignableFrom(ClientRMProtocols.class), "RM does not support this client protocol"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 3ab06bd..f93a182 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -64,14 +64,14 @@ public class RMProxy<T> { * Verify the passed protocol is supported. */ @Private - protected void checkAllowedProtocols(Class<?> protocol) {} + public void checkAllowedProtocols(Class<?> protocol) {} /** * Get the ResourceManager address from the provided Configuration for the * given protocol. */ @Private - protected InetSocketAddress getRMAddress( + public InetSocketAddress getRMAddress( YarnConfiguration conf, Class<?> protocol) throws IOException { throw new UnsupportedOperationException("This method should be invoked " + "from an instance of ClientRMProxy or ServerRMProxy"); @@ -90,7 +90,8 @@ public class RMProxy<T> { YarnConfiguration conf = (configuration instanceof YarnConfiguration) ? 
(YarnConfiguration) configuration : new YarnConfiguration(configuration); - RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); + RetryPolicy retryPolicy = createRetryPolicy(conf, + (HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf))); return newProxyInstance(conf, protocol, instance, retryPolicy); } @@ -116,7 +117,7 @@ public class RMProxy<T> { private static <T> T newProxyInstance(final YarnConfiguration conf, final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException{ - if (HAUtil.isHAEnabled(conf)) { + if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) { RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol); return (T) RetryProxy.create(protocol, provider, retryPolicy); @@ -146,7 +147,8 @@ public class RMProxy<T> { @Deprecated public static <T> T createRMProxy(final Configuration conf, final Class<T> protocol, InetSocketAddress rmAddress) throws IOException { - RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); + RetryPolicy retryPolicy = createRetryPolicy(conf, + (HAUtil.isHAEnabled(conf) || HAUtil.isFederationFailoverEnabled(conf))); T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress); return (T) RetryProxy.create(protocol, proxy, retryPolicy); @@ -155,9 +157,16 @@ public class RMProxy<T> { /** * Get a proxy to the RM at the specified address. To be used to create a * RetryProxy. 
+ * + * @param conf Configuration to generate retry policy + * @param protocol Protocol for the proxy + * @param rmAddress Address of the ResourceManager + * @param <T> Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure */ @Private - static <T> T getProxy(final Configuration conf, + public static <T> T getProxy(final Configuration conf, final Class<T> protocol, final InetSocketAddress rmAddress) throws IOException { return UserGroupInformation.getCurrentUser().doAs( http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 910c296..ebbbbf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2625,6 +2625,13 @@ <!-- Federation Configuration --> <property> <description> + Flag to indicate whether the RM is participating in Federation or not. 
+ </description> + <name>yarn.federation.enabled</name> + <value>false</value> + </property> + <property> + <description> Machine list file to be loaded by the FederationSubCluster Resolver </description> <name>yarn.federation.machine-list</name> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index b6fd0c5..1faf754 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -112,12 +112,10 @@ <dependency> <groupId>javax.cache</groupId> <artifactId>cache-api</artifactId> - <version>${jcache.version}</version> </dependency> <dependency> <groupId>org.ehcache</groupId> <artifactId>ehcache</artifactId> - <version>${ehcache.version}</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java index 8555fc3..b3038e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java @@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> { @InterfaceAudience.Private @Override - protected InetSocketAddress getRMAddress(YarnConfiguration conf, + public InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) { if (protocol == ResourceTracker.class) { return conf.getSocketAddr( @@ -93,7 +93,7 @@ public class ServerRMProxy<T> extends RMProxy<T> { @InterfaceAudience.Private @Override - protected void checkAllowedProtocols(Class<?> protocol) { + public void checkAllowedProtocols(Class<?> protocol) { Preconditions.checkArgument( protocol.isAssignableFrom(ResourceTracker.class), "ResourceManager does not support this protocol"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java new file mode 100644 index 0000000..a986008 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.failover; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that creates proxy for specified protocols when federation is + * enabled. The class creates a federation aware failover provider, i.e. 
the + * failover provider uses the {@code FederationStateStore} to determine the + * currently active ResourceManager. + */ +@Private +@Unstable +public final class FederationProxyProviderUtil { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationProxyProviderUtil.class); + + /** + * Create a proxy for the specified protocol. For non-HA, this is a direct + * connection to the ResourceManager address. When HA is enabled, the proxy + * handles the failover between the ResourceManagers as well. + * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param subClusterId the unique identifier of the sub-cluster + * @param user the user on whose behalf the proxy is being created + * @param <T> Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + public static <T> T createRMProxy(Configuration configuration, + final Class<T> protocol, SubClusterId subClusterId, + UserGroupInformation user) throws IOException { + return createRMProxy(configuration, protocol, subClusterId, user, null); + } + + /** + * Create a proxy for the specified protocol. For non-HA, this is a direct + * connection to the ResourceManager address. When HA is enabled, the proxy + * handles the failover between the ResourceManagers as well. 
+ * + * @param configuration Configuration to generate {@link ClientRMProxy} + * @param protocol Protocol for the proxy + * @param subClusterId the unique identifier of the sub-cluster + * @param user the user on whose behalf the proxy is being created + * @param token the auth token to use for connection + * @param <T> Type information of the proxy + * @return Proxy to the RM + * @throws IOException on failure + */ + @Public + @Unstable + @SuppressWarnings("unchecked") + public static <T> T createRMProxy(final Configuration configuration, + final Class<T> protocol, SubClusterId subClusterId, + UserGroupInformation user, final Token token) throws IOException { + try { + final YarnConfiguration conf = new YarnConfiguration(configuration); + updateConf(conf, subClusterId); + if (token != null) { + LOG.info( + "Creating RMProxy with a token: {} to subcluster: {}" + + " for protocol: {}", + token, subClusterId, protocol.getSimpleName()); + user.addToken(token); + setAuthModeInConf(conf); + } else { + LOG.info("Creating RMProxy without a token to subcluster: {}" + + " for protocol: {}", subClusterId, protocol.getSimpleName()); + } + final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() { + @Override + public T run() throws Exception { + return ClientRMProxy.createRMProxy(conf, protocol); + } + }); + + return proxyConnection; + } catch (IOException e) { + String message = + "Error while creating of RM application master service proxy for" + + " appAttemptId: " + user; + LOG.info(message); + throw new YarnRuntimeException(message, e); + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + } + + private static void setAuthModeInConf(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + SaslRpcServer.AuthMethod.TOKEN.toString()); + } + + // updating the conf with the refreshed RM addresses as proxy creations + // are based out of conf + private static void updateConf(Configuration conf, + 
SubClusterId subClusterId) { + conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId()); + // In a Federation setting, we will connect to not just the local cluster RM + // but also multiple external RMs. The membership information of all the RMs + // that are currently + // participating in Federation is available in the central + // FederationStateStore. + // So we will: + // 1. obtain the RM service addresses from FederationStateStore using the + // FederationRMFailoverProxyProvider. + // 2. disable traditional HA as that depends on local configuration lookup + // for RMs using indexes. + // 3. we will enable federation failover IF traditional HA is enabled so + // that the appropriate failover RetryPolicy is initialized. + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, + FederationRMFailoverProxyProvider.class, RMFailoverProxyProvider.class); + if (HAUtil.isHAEnabled(conf)) { + conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false); + } + } + + // disable instantiation + private FederationProxyProviderUtil() { + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java new file mode 100644 index 0000000..90a9239 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.failover; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.client.RMFailoverProxyProvider; +import org.apache.hadoop.yarn.client.RMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A FailoverProxyProvider implementation that uses the + * {@code FederationStateStore} to determine the ResourceManager to connect to. + * This supports both HA and regular mode which is controlled by configuration. 
+ */
@Private
@Unstable
public class FederationRMFailoverProxyProvider<T>
    implements RMFailoverProxyProvider<T> {
  private static final Logger LOG =
      LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);

  /** Proxy factory used to validate the protocol and resolve the RM address. */
  private RMProxy<T> rmProxy;
  /** RPC protocol interface this provider creates proxies for. */
  private Class<T> protocol;
  /** Currently active proxy; {@code null} until first use or after a failed creation. */
  private T current;
  /** Configuration that is rewritten with the RM address before each proxy creation. */
  private YarnConfiguration conf;
  private FederationStateStoreFacade facade;
  /** Sub-cluster whose ResourceManager this provider connects to. */
  private SubClusterId subClusterId;
  /** Tokens held by the user that called {@link #init}; may be {@code null}. */
  private Collection<Token<? extends TokenIdentifier>> originalTokens;
  private boolean federationFailoverEnabled = false;

  /**
   * Prepares the provider for use.
   *
   * <p>Validates {@code proto} through {@code proxy}, reads the mandatory
   * {@code YarnConfiguration.FEDERATION_SUBCLUSTER_ID}, copies the client
   * failover retry settings onto the IPC connect retry keys and snapshots the
   * tokens of the current user.
   *
   * @param configuration configuration to read settings from; if it is not a
   *          {@link YarnConfiguration} it is wrapped in one
   * @param proxy the RM proxy used to validate protocols and resolve addresses
   * @param proto the protocol interface to create proxies for
   * @throws NullPointerException if the sub-cluster id is not configured
   */
  @Override
  public void init(Configuration configuration, RMProxy<T> proxy,
      Class<T> proto) {
    this.rmProxy = proxy;
    this.protocol = proto;
    this.rmProxy.checkAllowedProtocols(this.protocol);
    String clusterId =
        configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
    Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
    this.subClusterId = SubClusterId.newInstance(clusterId);
    // getInstance() is static: call it on the class rather than through the
    // (still null) instance field.
    this.facade = FederationStateStoreFacade.getInstance();
    // conf must never be null: a plain Configuration used to leave it unset
    // and the getBoolean() call below failed with a NullPointerException.
    if (configuration instanceof YarnConfiguration) {
      this.conf = (YarnConfiguration) configuration;
    } else {
      this.conf = new YarnConfiguration(configuration);
    }
    federationFailoverEnabled =
        conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
            YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);

    conf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));

    conf.setInt(
        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
        conf.getInt(
            YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));

    try {
      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
      originalTokens = currentUser.getTokens();
      LOG.info("Initialized Federation proxy for user: {}",
          currentUser.getUserName());
    } catch (IOException e) {
      // Best effort: the provider still works without the token snapshot.
      LOG.warn("Could not get information of requester, ignoring for now.", e);
    }

  }

  /**
   * Adds every token captured in {@link #init} to {@code currentUser}.
   * Does nothing when no tokens were captured.
   */
  private void addOriginalTokens(UserGroupInformation currentUser) {
    if (originalTokens == null || originalTokens.isEmpty()) {
      return;
    }
    for (Token<? extends TokenIdentifier> token : originalTokens) {
      currentUser.addToken(token);
    }
  }

  /**
   * Looks up the sub-cluster in the facade, writes its RM address into
   * {@link #conf} and creates a proxy to that address.
   *
   * @param isFailover forwarded as the second argument of
   *          {@code facade.getSubCluster}; presumably requests a fresh lookup
   *          rather than a cached one - confirm against the facade
   * @return the new proxy, or {@code null} if the sub-cluster lookup or the
   *         proxy creation failed
   */
  private T getProxyInternal(boolean isFailover) {
    SubClusterInfo subClusterInfo;
    UserGroupInformation currentUser = null;
    try {
      LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
          subClusterId);
      subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
      // Proxy creation reads the RM address from conf, so conf has to be
      // refreshed with the address reported for the sub-cluster first.
      updateRMAddress(subClusterInfo);
      currentUser = UserGroupInformation.getCurrentUser();
      addOriginalTokens(currentUser);
    } catch (YarnException e) {
      LOG.error("Exception while trying to create proxy to the ResourceManager"
          + " for SubClusterId: {}", subClusterId, e);
      return null;
    } catch (IOException e) {
      // Best effort: continue without re-attaching the original tokens.
      LOG.warn("Could not get information of requester, ignoring for now.", e);
    }
    try {
      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
      LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
          protocol.getSimpleName(), currentUser);
      LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
          subClusterId);
      return RMProxy.getProxy(conf, protocol, rmAddress);
    } catch (IOException ioe) {
      LOG.error(
          "IOException while trying to create proxy to the ResourceManager"
              + " for SubClusterId: {}",
          subClusterId, ioe);
      return null;
    }
  }

  /**
   * Writes the address matching {@link #protocol} from {@code subClusterInfo}
   * into {@link #conf}. A {@code null} argument or an unrecognised protocol
   * leaves the configuration untouched.
   */
  private void updateRMAddress(SubClusterInfo subClusterInfo) {
    if (subClusterInfo == null) {
      return;
    }
    if (protocol == ApplicationClientProtocol.class) {
      conf.set(YarnConfiguration.RM_ADDRESS,
          subClusterInfo.getClientRMServiceAddress());
    } else if (protocol == ApplicationMasterProtocol.class) {
      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
          subClusterInfo.getAMRMServiceAddress());
    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
      conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
          subClusterInfo.getRMAdminServiceAddress());
    }
  }

  /**
   * Returns the current proxy, creating it on first use.
   *
   * @return a {@code ProxyInfo} pairing the proxy with the sub-cluster id; the
   *         wrapped proxy is {@code null} if creation failed
   */
  @Override
  public synchronized ProxyInfo<T> getProxy() {
    if (current == null) {
      current = getProxyInternal(false);
    }
    return new ProxyInfo<T>(current, subClusterId.getId());
  }

  /**
   * Closes {@code currentProxy} and replaces the active proxy with a newly
   * created one.
   *
   * @param currentProxy the proxy that failed; may be {@code null}
   */
  @Override
  public synchronized void performFailover(T currentProxy) {
    closeInternal(currentProxy);
    current = getProxyInternal(federationFailoverEnabled);
  }

  @Override
  public Class<T> getInterface() {
    return protocol;
  }

  /**
   * Releases {@code currentProxy}: via {@link Closeable#close()} when it is
   * {@code Closeable}, otherwise via {@code RPC.stopProxy}.
   */
  private void closeInternal(T currentProxy) {
    if (currentProxy == null) {
      // Nothing was opened (or creation failed). Previously null fell through
      // to RPC.stopProxy, which does not accept a null proxy.
      return;
    }
    if (currentProxy instanceof Closeable) {
      try {
        ((Closeable) currentProxy).close();
      } catch (IOException e) {
        LOG.warn("Exception while trying to close proxy", e);
      }
    } else {
      RPC.stopProxy(currentProxy);
    }

  }

  /**
   * Close all the proxy objects which have been opened over the lifetime of
   * this proxy provider.
   */
  @Override
  public synchronized void close() throws IOException {
    closeInternal(current);
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0cd1680d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java new file mode 100644 index 0000000..b1baa0c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java @@ -0,0 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.yarn.server.federation.failover; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org