Author: vinodkv Date: Tue Dec 17 22:32:38 2013 New Revision: 1551739 URL: http://svn.apache.org/r1551739 Log: YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help with RM failover. Contributed by Karthik Kambatla.
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== 
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Dec 17 22:32:38 2013 @@ -52,6 +52,9 @@ Release 2.4.0 - UNRELEASED YARN-312. Introduced ResourceManagerAdministrationProtocol changes to support changing resources on node. (Junping Du via vinodkv) + YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help + with RM failover. (Karthik Kambatla via vinodkv) + IMPROVEMENTS YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Tue Dec 17 22:32:38 2013 @@ -310,4 +310,12 @@ <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + <!-- Ignore INSTANCE not being final as it is created in sub-classes --> + <Match> + <Class name="org.apache.hadoop.yarn.client.RMProxy" /> + <Field name="INSTANCE" /> + <Bug pattern="MS_SHOULD_BE_FINAL"/> + </Match> + + </FindBugsFilter> Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Dec 17 22:32:38 2013 @@ -296,6 +296,31 @@ public class YarnConfiguration extends C HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS : RM_WEBAPP_ADDRESS)); + public static final String CLIENT_FAILOVER_PREFIX = + YARN_PREFIX + "client.failover-"; + public static final String CLIENT_FAILOVER_PROXY_PROVIDER = + CLIENT_FAILOVER_PREFIX + "proxy-provider"; + public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER = + "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider"; + + public static final String CLIENT_FAILOVER_MAX_ATTEMPTS = + CLIENT_FAILOVER_PREFIX + "max-attempts"; + + public static final String CLIENT_FAILOVER_SLEEPTIME_BASE_MS = + CLIENT_FAILOVER_PREFIX + "sleep-base-ms"; + + public static final String CLIENT_FAILOVER_SLEEPTIME_MAX_MS = + CLIENT_FAILOVER_PREFIX + "sleep-max-ms"; + + public static final String CLIENT_FAILOVER_RETRIES = + CLIENT_FAILOVER_PREFIX + "retries"; + public static final int DEFAULT_CLIENT_FAILOVER_RETRIES = 0; + + public static final String CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS = + CLIENT_FAILOVER_PREFIX + "retries-on-socket-timeouts"; + public static final int + DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS = 0; + //////////////////////////////// // RM state store configs //////////////////////////////// @@ -850,22 +875,31 @@ public class YarnConfiguration extends C public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX + "is.minicluster"; + public static final String YARN_MC_PREFIX = YARN_PREFIX + "minicluster."; + /** Whether to use fixed ports with the minicluster. */ - public static final String YARN_MINICLUSTER_FIXED_PORTS = YARN_PREFIX - + "minicluster.fixed.ports"; + public static final String YARN_MINICLUSTER_FIXED_PORTS = + YARN_MC_PREFIX + "fixed.ports"; /** * Default is false to be able to run tests concurrently without port * conflicts. 
*/ - public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; + public static final boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false; + + /** + * Whether the NM should use RPC to connect to the RM. Default is false. + * Can be set to true only when using fixed ports. + */ + public static final String YARN_MINICLUSTER_USE_RPC = YARN_MC_PREFIX + "use-rpc"; + public static final boolean DEFAULT_YARN_MINICLUSTER_USE_RPC = false; /** * Whether users are explicitly trying to control resource monitoring * configuration for the MiniYARNCluster. Disabled by default. */ public static final String YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = - YARN_PREFIX + "minicluster.control-resource-monitoring"; + YARN_MC_PREFIX + "control-resource-monitoring"; public static final boolean DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java Tue Dec 17 22:32:38 2013 @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -34,17 +35,37 @@ import 
org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; -public class ClientRMProxy<T> extends RMProxy<T> { +import com.google.common.base.Preconditions; +public class ClientRMProxy<T> extends RMProxy<T> { private static final Log LOG = LogFactory.getLog(ClientRMProxy.class); + private interface ClientRMProtocols extends ApplicationClientProtocol, + ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { + // Add nothing + } + + static { + INSTANCE = new ClientRMProxy(); + } + + private ClientRMProxy(){ + super(); + } + + /** + * Create a proxy to the ResourceManager for the specified protocol. + * @param configuration Configuration with all the required information. + * @param protocol Client protocol for which proxy is being requested. + * @param <T> Type of proxy. + * @return Proxy to the ResourceManager for the specified client protocol. + * @throws IOException + */ public static <T> T createRMProxy(final Configuration configuration, final Class<T> protocol) throws IOException { - YarnConfiguration conf = (configuration instanceof YarnConfiguration) - ? (YarnConfiguration) configuration - : new YarnConfiguration(configuration); - InetSocketAddress rmAddress = getRMAddress(conf, protocol); - return createRMProxy(conf, protocol, rmAddress); + // This method exists only to initiate this class' static INSTANCE. 
TODO: + // FIX if possible + return RMProxy.createRMProxy(configuration, protocol); } private static void setupTokens(InetSocketAddress resourceManagerAddress) @@ -63,7 +84,9 @@ public class ClientRMProxy<T> extends RM } } - private static InetSocketAddress getRMAddress(YarnConfiguration conf, + @InterfaceAudience.Private + @Override + protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException { if (protocol == ApplicationClientProtocol.class) { return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, @@ -89,4 +112,12 @@ public class ClientRMProxy<T> extends RM throw new IllegalStateException(message); } } + + @InterfaceAudience.Private + @Override + protected void checkAllowedProtocols(Class<?> protocol) { + Preconditions.checkArgument( + protocol.isAssignableFrom(ClientRMProtocols.class), + "RM does not support this client protocol"); + } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java Tue Dec 17 22:32:38 2013 @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -79,7 +78,6 @@ public class YarnClientImpl extends Yarn private static final Log LOG = 
LogFactory.getLog(YarnClientImpl.class); protected ApplicationClientProtocol rmClient; - protected InetSocketAddress rmAddress; protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; @@ -89,15 +87,9 @@ public class YarnClientImpl extends Yarn super(YarnClientImpl.class.getName()); } - private static InetSocketAddress getRmAddress(Configuration conf) { - return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); - } - @SuppressWarnings("deprecation") @Override protected void serviceInit(Configuration conf) throws Exception { - this.rmAddress = getRmAddress(conf); asyncApiPollIntervalMillis = conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); @@ -180,9 +172,7 @@ public class YarnClientImpl extends Yarn } } - - LOG.info("Submitted application " + applicationId + " to ResourceManager" - + " at " + rmAddress); + LOG.info("Submitted application " + applicationId); return applicationId; } Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1551739&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java Tue Dec 17 22:32:38 2013 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.AdminService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestRMFailover { + private static final Log LOG = + LogFactory.getLog(TestRMFailover.class.getName()); + + private static final String RM1_NODE_ID = "rm1"; + private static final int RM1_PORT_BASE = 10000; + private static final String RM2_NODE_ID = "rm2"; + private static final int RM2_PORT_BASE = 20000; + private static final HAServiceProtocol.StateChangeRequestInfo req = + new HAServiceProtocol.StateChangeRequestInfo( + 
HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED); + + private static Configuration conf; + private static MiniYARNCluster cluster; + + private static void setConfForRM(String rmId, String prefix, String value) { + conf.set(HAUtil.addSuffix(prefix, rmId), value); + } + + private static void setRpcAddressForRM(String rmId, int base) { + setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); + } + + private static AdminService getRMAdminService(int index) { + return + cluster.getResourceManager(index).getRMContext().getRMAdminService(); + } + + @BeforeClass + public static void setup() throws IOException { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); + setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + + conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 100); + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 1000L); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); + + cluster = new 
MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1); + cluster.init(conf); + cluster.start(); + + cluster.getResourceManager(0).getRMContext().getRMAdminService() + .transitionToActive(req); + assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex()); + } + + @AfterClass + public static void teardown() { + cluster.stop(); + } + + private void verifyClientConnection() { + int numRetries = 3; + while(numRetries-- > 0) { + Configuration conf = new YarnConfiguration(TestRMFailover.conf); + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + try { + client.getApplications(); + return; + } catch (Exception e) { + LOG.error(e); + } finally { + client.stop(); + } + } + fail("Client couldn't connect to the Active RM"); + } + + @Test + public void testExplicitFailover() + throws YarnException, InterruptedException, IOException { + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); + verifyClientConnection(); + + // Failover to the second RM + getRMAdminService(0).transitionToStandby(req); + getRMAdminService(1).transitionToActive(req); + assertEquals("Wrong ResourceManager is active", + HAServiceProtocol.HAServiceState.ACTIVE, + getRMAdminService(1).getServiceStatus().getState()); + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); + verifyClientConnection(); + + // Failover back to the first RM + getRMAdminService(1).transitionToStandby(req); + getRMAdminService(0).transitionToActive(req); + assertEquals("Wrong ResourceManager is active", + HAServiceProtocol.HAServiceState.ACTIVE, + getRMAdminService(0).getServiceStatus().getState()); + assertTrue("NMs failed to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); + verifyClientConnection(); + } +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java?rev=1551739&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java Tue Dec 17 22:32:38 2013 @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ConfiguredRMFailoverProxyProvider<T> + implements RMFailoverProxyProvider<T> { + private static final Log LOG = + LogFactory.getLog(ConfiguredRMFailoverProxyProvider.class); + + private int currentProxyIndex = 0; + Map<String, T> proxies = new HashMap<String, T>(); + + private RMProxy<T> rmProxy; + private Class<T> protocol; + protected YarnConfiguration conf; + protected String[] rmServiceIds; + + @Override + public void init(Configuration configuration, RMProxy<T> rmProxy, + Class<T> protocol) { + this.rmProxy = rmProxy; + this.protocol = protocol; + this.rmProxy.checkAllowedProtocols(this.protocol); + this.conf = new YarnConfiguration(configuration); + Collection<String> rmIds = HAUtil.getRMHAIds(conf); + this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]); + conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, + conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES)); + + conf.setInt(CommonConfigurationKeysPublic. 
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, + conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS, + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS)); + } + + private T getProxyInternal() { + try { + final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol); + return RMProxy.getProxy(conf, protocol, rmAddress); + } catch (IOException ioe) { + LOG.error("Unable to create proxy to the ResourceManager " + + rmServiceIds[currentProxyIndex], ioe); + return null; + } + } + + @Override + public synchronized T getProxy() { + String rmId = rmServiceIds[currentProxyIndex]; + T current = proxies.get(rmId); + if (current == null) { + current = getProxyInternal(); + proxies.put(rmId, current); + } + return current; + } + + @Override + public synchronized void performFailover(T currentProxy) { + currentProxyIndex = (currentProxyIndex + 1) % rmServiceIds.length; + conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]); + LOG.info("Failing over to " + rmServiceIds[currentProxyIndex]); + } + + @Override + public Class<T> getInterface() { + return protocol; + } + + /** + * Close all the proxy objects which have been opened over the lifetime of + * this proxy provider. 
+ */ + @Override + public synchronized void close() throws IOException { + for (T proxy : proxies.values()) { + if (proxy instanceof Closeable) { + ((Closeable)proxy).close(); + } else { + RPC.stopProxy(proxy); + } + } + } +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java?rev=1551739&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java Tue Dec 17 22:32:38 2013 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.retry.FailoverProxyProvider; + +@InterfaceAudience.Private +public interface RMFailoverProxyProvider<T> extends FailoverProxyProvider <T> { + /** + * Initialize internal data structures, invoked right after instantiation. + * + * @param conf Configuration to use + * @param proxy The {@link RMProxy} instance to use + * @param protocol The communication protocol to use + */ + public void init(Configuration conf, RMProxy<T> proxy, Class<T> protocol); +} Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java Tue Dec 17 22:32:38 2013 @@ -36,6 +36,8 @@ import org.apache.hadoop.io.retry.RetryP import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -48,7 +50,68 @@ import com.google.common.annotations.Vis public class RMProxy<T> { private static final Log LOG = LogFactory.getLog(RMProxy.class); + protected static RMProxy 
INSTANCE; + protected RMProxy() {} + + /** + * Verify the passed protocol is supported. + */ + @Private + protected void checkAllowedProtocols(Class<?> protocol) {} + + /** + * Get the ResourceManager address from the provided Configuration for the + * given protocol. + */ + @Private + protected InetSocketAddress getRMAddress( + YarnConfiguration conf, Class<?> protocol) throws IOException { + throw new UnsupportedOperationException("This method should be invoked " + + "from an instance of ClientRMProxy or ServerRMProxy"); + } + + /** + * Create a proxy for the specified protocol. For non-HA, + * this is a direct connection to the ResourceManager address. When HA is + * enabled, the proxy handles the failover between the ResourceManagers as + * well. + */ + @Private + protected static <T> T createRMProxy(final Configuration configuration, + final Class<T> protocol) throws IOException { + YarnConfiguration conf = (configuration instanceof YarnConfiguration) + ? (YarnConfiguration) configuration + : new YarnConfiguration(configuration); + RetryPolicy retryPolicy = createRetryPolicy(conf); + if (HAUtil.isHAEnabled(conf)) { + RMFailoverProxyProvider<T> provider = + INSTANCE.createRMFailoverProxyProvider(conf, protocol); + return (T) RetryProxy.create(protocol, provider, retryPolicy); + } else { + InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol); + LOG.info("Connecting to ResourceManager at " + rmAddress); + T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); + return (T) RetryProxy.create(protocol, proxy, retryPolicy); + } + } + + /** + * @deprecated + * This method is deprecated and is not used by YARN internally any more. + * To create a proxy to the RM, use ClientRMProxy#createRMProxy or + * ServerRMProxy#createRMProxy. + * + * Create a proxy to the ResourceManager at the specified address. 
+ * + * @param conf Configuration to generate retry policy + * @param protocol Protocol for the proxy + * @param rmAddress Address of the ResourceManager + * @param <T> Type information of the proxy + * @return Proxy to the RM + * @throws IOException + */ + @Deprecated public static <T> T createRMProxy(final Configuration conf, final Class<T> protocol, InetSocketAddress rmAddress) throws IOException { RetryPolicy retryPolicy = createRetryPolicy(conf); @@ -57,12 +120,16 @@ public class RMProxy<T> { return (T) RetryProxy.create(protocol, proxy, retryPolicy); } - private static <T> T getProxy(final Configuration conf, + /** + * Get a proxy to the RM at the specified address. To be used to create a + * RetryProxy. + */ + @Private + static <T> T getProxy(final Configuration conf, final Class<T> protocol, final InetSocketAddress rmAddress) throws IOException { return UserGroupInformation.getCurrentUser().doAs( new PrivilegedAction<T>() { - @Override public T run() { return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf); @@ -70,6 +137,50 @@ public class RMProxy<T> { }); } + /** + * Helper method to create FailoverProxyProvider. + */ + private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider( + Configuration conf, Class<T> protocol) { + Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass; + try { + defaultProviderClass = (Class<? 
extends RMFailoverProxyProvider<T>>) + Class.forName( + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER); + } catch (Exception e) { + throw new YarnRuntimeException("Invalid default failover provider class" + + YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e); + } + + RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, + defaultProviderClass, RMFailoverProxyProvider.class), conf); + provider.init(conf, (RMProxy<T>) this, protocol); + return provider; + } + + /** + * A RetryPolicy to allow failing over upto the specified maximum time. + */ + private static class FailoverUptoMaximumTimePolicy implements RetryPolicy { + private long maxTime; + + FailoverUptoMaximumTimePolicy(long maxTime) { + this.maxTime = maxTime; + } + + @Override + public RetryAction shouldRetry(Exception e, int retries, int failovers, + boolean isIdempotentOrAtMostOnce) throws Exception { + return System.currentTimeMillis() < maxTime + ? RetryAction.FAILOVER_AND_RETRY + : RetryAction.FAIL; + } + } + + /** + * Fetch retry policy from Configuration + */ @Private @VisibleForTesting public static RetryPolicy createRetryPolicy(Configuration conf) { @@ -81,19 +192,10 @@ public class RMProxy<T> { conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, YarnConfiguration - .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); - - if (rmConnectionRetryIntervalMS < 0) { - throw new YarnRuntimeException("Invalid Configuration. " + - YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + - " should not be negative."); - } + .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); boolean waitForEver = (rmConnectWaitMS == -1); - - if (waitForEver) { - return RetryPolicies.RETRY_FOREVER; - } else { + if (!waitForEver) { if (rmConnectWaitMS < 0) { throw new YarnRuntimeException("Invalid Configuration. 
" + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS @@ -110,18 +212,54 @@ public class RMProxy<T> { } } + // Handle HA case first + if (HAUtil.isHAEnabled(conf)) { + final long failoverSleepBaseMs = conf.getLong( + YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, + rmConnectionRetryIntervalMS); + + final long failoverSleepMaxMs = conf.getLong( + YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, + rmConnectionRetryIntervalMS); + + int maxFailoverAttempts = conf.getInt( + YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1); + + RetryPolicy basePolicy = RetryPolicies.TRY_ONCE_THEN_FAIL; + if (maxFailoverAttempts == -1) { + if (waitForEver) { + basePolicy = RetryPolicies.FAILOVER_FOREVER; + } else { + basePolicy = new FailoverUptoMaximumTimePolicy( + System.currentTimeMillis() + rmConnectWaitMS); + } + maxFailoverAttempts = 0; + } + + return RetryPolicies.failoverOnNetworkException(basePolicy, + maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs); + } + + if (waitForEver) { + return RetryPolicies.RETRY_FOREVER; + } + + if (rmConnectionRetryIntervalMS < 0) { + throw new YarnRuntimeException("Invalid Configuration. " + + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + + " should not be negative."); + } + RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, - rmConnectionRetryIntervalMS, - TimeUnit.MILLISECONDS); + rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<Class<? 
extends Exception>, RetryPolicy>(); exceptionToPolicyMap.put(ConnectException.class, retryPolicy); //TO DO: after HADOOP-9576, IOException can be changed to EOFException exceptionToPolicyMap.put(IOException.class, retryPolicy); - - return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL, - exceptionToPolicyMap); + return RetryPolicies.retryByException( + RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Tue Dec 17 22:32:38 2013 @@ -425,6 +425,61 @@ </property> <property> + <description>When HA is enabled, the class to be used by Clients, AMs and + NMs to failover to the Active RM. It should extend + org.apache.hadoop.yarn.client.RMFailoverProxyProvider</description> + <name>yarn.client.failover-proxy-provider</name> + <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value> + </property> + + <property> + <description>When HA is enabled, the max number of times + FailoverProxyProvider should attempt failover. When set, + this overrides the yarn.resourcemanager.connect.max-wait.ms. When + not set, this is inferred from + yarn.resourcemanager.connect.max-wait.ms.</description> + <name>yarn.client.failover-max-attempts</name> + <!--value>15</value--> + </property> + + <property> + <description>When HA is enabled, the sleep base (in milliseconds) to be + used for calculating the exponential delay between failovers. 
When set, + this overrides the yarn.resourcemanager.connect.* settings. When + not set, yarn.resourcemanager.connect.retry-interval.ms is used instead. + </description> + <name>yarn.client.failover-sleep-base-ms</name> + <!--value>500</value--> + </property> + + <property> + <description>When HA is enabled, the maximum sleep time (in milliseconds) + between failovers. When set, this overrides the + yarn.resourcemanager.connect.* settings. When not set, + yarn.resourcemanager.connect.retry-interval.ms is used instead.</description> + <name>yarn.client.failover-sleep-max-ms</name> + <!--value>15000</value--> + </property> + + <property> + <description>When HA is enabled, the number of retries per + attempt to connect to a ResourceManager. In other words, + it is the ipc.client.connect.max.retries to be used during + failover attempts</description> + <name>yarn.client.failover-retries</name> + <value>0</value> + </property> + + <property> + <description>When HA is enabled, the number of retries per + attempt to connect to a ResourceManager on socket timeouts. In other + words, it is the ipc.client.connect.max.retries.on.timeouts to be used + during failover attempts</description> + <name>yarn.client.failover-retries-on-socket-timeouts</name> + <value>0</value> + </property> + + <property> <description>The maximum number of completed applications RM keeps. 
</description> <name>yarn.resourcemanager.max-completed-applications</name> <value>10000</value> Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java Tue Dec 17 22:32:38 2013 @@ -23,25 +23,43 @@ import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; -public class ServerRMProxy<T> extends RMProxy<T> { +import com.google.common.base.Preconditions; +public class ServerRMProxy<T> extends RMProxy<T> { private static final Log LOG = LogFactory.getLog(ServerRMProxy.class); + static { + INSTANCE = new ServerRMProxy(); + } + + private ServerRMProxy() { + super(); + } + + /** + * Create a proxy to the ResourceManager for the specified protocol. + * @param configuration Configuration with all the required information. + * @param protocol Server protocol for which proxy is being requested. + * @param <T> Type of proxy. + * @return Proxy to the ResourceManager for the specified server protocol. 
+ * @throws IOException + */ public static <T> T createRMProxy(final Configuration configuration, final Class<T> protocol) throws IOException { - YarnConfiguration conf = (configuration instanceof YarnConfiguration) - ? (YarnConfiguration) configuration - : new YarnConfiguration(configuration); - InetSocketAddress rmAddress = getRMAddress(conf, protocol); - return createRMProxy(conf, protocol, rmAddress); + // This method exists only to initiate this class' static INSTANCE. TODO: + // FIX if possible + return RMProxy.createRMProxy(configuration, protocol); } - private static InetSocketAddress getRMAddress(YarnConfiguration conf, - Class<?> protocol) { + @InterfaceAudience.Private + @Override + protected InetSocketAddress getRMAddress(YarnConfiguration conf, + Class<?> protocol) { if (protocol == ResourceTracker.class) { return conf.getSocketAddr( YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, @@ -55,4 +73,12 @@ public class ServerRMProxy<T> extends RM throw new IllegalStateException(message); } } + + @InterfaceAudience.Private + @Override + protected void checkAllowedProtocols(Class<?> protocol) { + Preconditions.checkArgument( + protocol.isAssignableFrom(ResourceTracker.class), + "ResourceManager does not support this protocol"); + } } \ No newline at end of file Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Tue Dec 17 22:32:38 2013 @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -38,6 +39,7 @@ import org.apache.hadoop.service.Abstrac import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import static org.junit.Assert.fail; + /** * Embedded Yarn minicluster for testcases that need to interact with a cluster. 
* <p/> @@ -91,9 +95,11 @@ public class MiniYARNCluster extends Com private NodeManager[] nodeManagers; private ResourceManager[] resourceManagers; + private String[] rmIds; + + private boolean useFixedPorts; + private boolean useRpc = false; - private ResourceManagerWrapper resourceManagerWrapper; - private ConcurrentMap<ApplicationAttemptId, Long> appMasters = new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2); @@ -163,15 +169,7 @@ public class MiniYARNCluster extends Com } resourceManagers = new ResourceManager[numResourceManagers]; - for (int i = 0; i < numResourceManagers; i++) { - resourceManagers[i] = new ResourceManager(); - addService(new ResourceManagerWrapper(i)); - } - nodeManagers = new CustomNodeManager[numNodeManagers]; - for(int index = 0; index < numNodeManagers; index++) { - addService(new NodeManagerWrapper(index)); - nodeManagers[index] = new CustomNodeManager(); - } + nodeManagers = new NodeManager[numNodeManagers]; } /** @@ -185,20 +183,45 @@ public class MiniYARNCluster extends Com this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs); } - @Override + @Override public void serviceInit(Configuration conf) throws Exception { + useFixedPorts = conf.getBoolean( + YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS); + useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, + YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC); + + if (useRpc && !useFixedPorts) { + throw new YarnRuntimeException("Invalid configuration!" 
+ " Minicluster can use rpc only when configured to use fixed ports"); + } + if (resourceManagers.length > 1) { conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); - - StringBuilder rmIds = new StringBuilder(); - for (int i = 0; i < resourceManagers.length; i++) { - if (i != 0) { - rmIds.append(","); + if (conf.get(YarnConfiguration.RM_HA_IDS) == null) { + StringBuilder rmIds = new StringBuilder(); + for (int i = 0; i < resourceManagers.length; i++) { + if (i != 0) { + rmIds.append(","); + } + rmIds.append("rm" + i); } - rmIds.append("rm" + i); + conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); } - conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString()); + Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf); + rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]); } + + for (int i = 0; i < resourceManagers.length; i++) { + resourceManagers[i] = new ResourceManager(); + addService(new ResourceManagerWrapper(i)); + } + for(int index = 0; index < nodeManagers.length; index++) { + nodeManagers[index] = + useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager(); + addService(new NodeManagerWrapper(index)); + } + super.serviceInit( conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); } @@ -213,11 +236,12 @@ public class MiniYARNCluster extends Com * * In a non-HA cluster, return the index of the only RM. 
* - * @return index of the active RM + * @return index of the active RM or -1 if none of them transition to + * active even after 5 seconds of waiting */ @InterfaceAudience.Private @VisibleForTesting - int getActiveRMIndex() { + public int getActiveRMIndex() { if (resourceManagers.length == 1) { return 0; } @@ -292,9 +316,7 @@ public class MiniYARNCluster extends Com } private void setHARMConfiguration(Configuration conf) { - String rmId = "rm" + index; String hostname = MiniYARNCluster.getHostname(); - conf.set(YarnConfiguration.RM_HA_ID, rmId); for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String id : HAUtil.getRMHAIds(conf)) { conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0"); @@ -306,15 +328,17 @@ public class MiniYARNCluster extends Com protected synchronized void serviceInit(Configuration conf) throws Exception { conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true); - if (!conf.getBoolean( - YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, - YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) { + + if (!useFixedPorts) { if (HAUtil.isHAEnabled(conf)) { setHARMConfiguration(conf); } else { setNonHARMConfiguration(conf); } } + if (HAUtil.isHAEnabled(conf)) { + conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]); + } resourceManagers[index].init(conf); resourceManagers[index].getRMContext().getDispatcher().register (RMAppAttemptEventType.class, @@ -500,7 +524,9 @@ public class MiniYARNCluster extends Com protected void doSecureLogin() throws IOException { // Don't try to login using keytab in the testcase. } + } + private class ShortCircuitedNodeManager extends CustomNodeManager { @Override protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { @@ -553,4 +579,28 @@ public class MiniYARNCluster extends Com }; } } + + /** + * Wait for all the NodeManagers to connect to the ResourceManager. 
+ * + * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds. + * @return true if all NodeManagers connect to the (Active) + * ResourceManager, false otherwise. + * @throws YarnException + * @throws InterruptedException + */ + public boolean waitForNodeManagersToConnect(long timeout) + throws YarnException, InterruptedException { + ResourceManager rm = getResourceManager(); + GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); + + for (int i = 0; i < timeout / 100; i++) { + if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req) + .getClusterMetrics().getNumNodeManagers()) { + return true; + } + Thread.sleep(100); + } + return false; + } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java?rev=1551739&r1=1551738&r2=1551739&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java Tue Dec 17 22:32:38 2013 @@ -33,6 +33,7 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class TestMiniYARNClusterForHA { @@ -56,16 +57,7 @@ public class TestMiniYARNClusterForHA { @Test public void testClusterWorks() 
throws YarnException, InterruptedException { - ResourceManager rm = cluster.getResourceManager(0); - GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance(); - - for (int i = 0; i < 600; i++) { - if (1 == rm.getClientRMService().getClusterMetrics(req) - .getClusterMetrics().getNumNodeManagers()) { - return; - } - Thread.sleep(100); - } - fail("NodeManager never registered with the RM"); + assertTrue("NMs fail to connect to the RM", + cluster.waitForNodeManagersToConnect(5000)); } }