Author: vinodkv
Date: Tue Dec 17 22:32:38 2013
New Revision: 1551739

URL: http://svn.apache.org/r1551739
Log:
YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help 
with RM failover. Contributed by Karthik Kambatla.

Added:
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
    
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Dec 17 22:32:38 2013
@@ -52,6 +52,9 @@ Release 2.4.0 - UNRELEASED
     YARN-312. Introduced ResourceManagerAdministrationProtocol changes to 
support
     changing resources on node. (Junping Du via vinodkv)
 
+    YARN-1028. Added FailoverProxyProvider capability to ResourceManager to 
help
+    with RM failover. (Karthik Kambatla via vinodkv)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
 Tue Dec 17 22:32:38 2013
@@ -310,4 +310,12 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <!-- Ignore INSTANCE not being final as it is created in sub-classes -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.client.RMProxy" />
+    <Field name="INSTANCE" />
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+
 </FindBugsFilter>

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 Tue Dec 17 22:32:38 2013
@@ -296,6 +296,31 @@ public class YarnConfiguration extends C
           HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS
               : RM_WEBAPP_ADDRESS));
 
+  public static final String CLIENT_FAILOVER_PREFIX =
+      YARN_PREFIX + "client.failover-";
+  public static final String CLIENT_FAILOVER_PROXY_PROVIDER =
+      CLIENT_FAILOVER_PREFIX + "proxy-provider";
+  public static final String DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER =
+      "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider";
+
+  public static final String CLIENT_FAILOVER_MAX_ATTEMPTS =
+      CLIENT_FAILOVER_PREFIX + "max-attempts";
+
+  public static final String CLIENT_FAILOVER_SLEEPTIME_BASE_MS =
+      CLIENT_FAILOVER_PREFIX + "sleep-base-ms";
+
+  public static final String CLIENT_FAILOVER_SLEEPTIME_MAX_MS =
+      CLIENT_FAILOVER_PREFIX + "sleep-max-ms";
+
+  public static final String CLIENT_FAILOVER_RETRIES =
+      CLIENT_FAILOVER_PREFIX + "retries";
+  public static final int DEFAULT_CLIENT_FAILOVER_RETRIES = 0;
+
+  public static final String CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS =
+      CLIENT_FAILOVER_PREFIX + "retries-on-socket-timeouts";
+  public static final int
+      DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS = 0;
+
   ////////////////////////////////
   // RM state store configs
   ////////////////////////////////
@@ -850,22 +875,31 @@ public class YarnConfiguration extends C
   public static final String IS_MINI_YARN_CLUSTER = YARN_PREFIX
       + "is.minicluster";
 
+  public static final String YARN_MC_PREFIX = YARN_PREFIX + "minicluster.";
+
   /** Whether to use fixed ports with the minicluster. */
-  public static final String YARN_MINICLUSTER_FIXED_PORTS = YARN_PREFIX
-      + "minicluster.fixed.ports";
+  public static final String YARN_MINICLUSTER_FIXED_PORTS =
+      YARN_MC_PREFIX + "fixed.ports";
 
   /**
    * Default is false to be able to run tests concurrently without port
    * conflicts.
    */
-  public static boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+  public static final boolean DEFAULT_YARN_MINICLUSTER_FIXED_PORTS = false;
+
+  /**
+   * Whether the NM should use RPC to connect to the RM. Default is false.
+   * Can be set to true only when using fixed ports.
+   */
+  public static final String YARN_MINICLUSTER_USE_RPC = YARN_MC_PREFIX + 
"use-rpc";
+  public static final boolean DEFAULT_YARN_MINICLUSTER_USE_RPC = false;
 
   /**
    * Whether users are explicitly trying to control resource monitoring
    * configuration for the MiniYARNCluster. Disabled by default.
    */
   public static final String YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING =
-      YARN_PREFIX + "minicluster.control-resource-monitoring";
+      YARN_MC_PREFIX + "control-resource-monitoring";
   public static final boolean
       DEFAULT_YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING = false;
 

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
 Tue Dec 17 22:32:38 2013
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -34,17 +35,37 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 
-public class ClientRMProxy<T> extends RMProxy<T>  {
+import com.google.common.base.Preconditions;
 
+public class ClientRMProxy<T> extends RMProxy<T>  {
   private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
 
+  private interface ClientRMProtocols extends ApplicationClientProtocol,
+      ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+    // Add nothing
+  }
+
+  static {
+    INSTANCE = new ClientRMProxy();
+  }
+
+  private ClientRMProxy(){
+    super();
+  }
+
+  /**
+   * Create a proxy to the ResourceManager for the specified protocol.
+   * @param configuration Configuration with all the required information.
+   * @param protocol Client protocol for which proxy is being requested.
+   * @param <T> Type of proxy.
+   * @return Proxy to the ResourceManager for the specified client protocol.
+   * @throws IOException
+   */
   public static <T> T createRMProxy(final Configuration configuration,
       final Class<T> protocol) throws IOException {
-    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
-        ? (YarnConfiguration) configuration
-        : new YarnConfiguration(configuration);
-    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
-    return createRMProxy(conf, protocol, rmAddress);
+    // This method exists only to initialize this class' static INSTANCE.
+    // TODO: FIX if possible
+    return RMProxy.createRMProxy(configuration, protocol);
   }
 
   private static void setupTokens(InetSocketAddress resourceManagerAddress)
@@ -63,7 +84,9 @@ public class ClientRMProxy<T> extends RM
     }
   }
 
-  private static InetSocketAddress getRMAddress(YarnConfiguration conf,
+  @InterfaceAudience.Private
+  @Override
+  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
       Class<?> protocol) throws IOException {
     if (protocol == ApplicationClientProtocol.class) {
       return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@@ -89,4 +112,12 @@ public class ClientRMProxy<T> extends RM
       throw new IllegalStateException(message);
     }
   }
+
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    Preconditions.checkArgument(
+        protocol.isAssignableFrom(ClientRMProtocols.class),
+        "RM does not support this client protocol");
+  }
 }

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
 Tue Dec 17 22:32:38 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -79,7 +78,6 @@ public class YarnClientImpl extends Yarn
   private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
 
   protected ApplicationClientProtocol rmClient;
-  protected InetSocketAddress rmAddress;
   protected long submitPollIntervalMillis;
   private long asyncApiPollIntervalMillis;
 
@@ -89,15 +87,9 @@ public class YarnClientImpl extends Yarn
     super(YarnClientImpl.class.getName());
   }
 
-  private static InetSocketAddress getRmAddress(Configuration conf) {
-    return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-      YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
-  }
-
   @SuppressWarnings("deprecation")
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    this.rmAddress = getRmAddress(conf);
     asyncApiPollIntervalMillis =
         
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
           
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
@@ -180,9 +172,7 @@ public class YarnClientImpl extends Yarn
       }
     }
 
-
-    LOG.info("Submitted application " + applicationId + " to ResourceManager"
-        + " at " + rmAddress);
+    LOG.info("Submitted application " + applicationId);
     return applicationId;
   }
 

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java?rev=1551739&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
 Tue Dec 17 22:32:38 2013
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRMFailover {
+  private static final Log LOG =
+      LogFactory.getLog(TestRMFailover.class.getName());
+
+  private static final String RM1_NODE_ID = "rm1";
+  private static final int RM1_PORT_BASE = 10000;
+  private static final String RM2_NODE_ID = "rm2";
+  private static final int RM2_PORT_BASE = 20000;
+  private static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
+
+  private static Configuration conf;
+  private static MiniYARNCluster cluster;
+
+  private static void setConfForRM(String rmId, String prefix, String value) {
+    conf.set(HAUtil.addSuffix(prefix, rmId), value);
+  }
+
+  private static void setRpcAddressForRM(String rmId, int base) {
+    setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, 
"0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT));
+    setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" +
+        (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
+  }
+
+  private static AdminService getRMAdminService(int index) {
+    return
+        cluster.getResourceManager(index).getRMContext().getRMAdminService();
+  }
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+    setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE);
+    setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
+
+    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 100);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 1000L);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
+
+    cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1);
+    cluster.init(conf);
+    cluster.start();
+
+    cluster.getResourceManager(0).getRMContext().getRMAdminService()
+        .transitionToActive(req);
+    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+  }
+
+  @AfterClass
+  public static void teardown() {
+    cluster.stop();
+  }
+
+  private void verifyClientConnection() {
+    int numRetries = 3;
+    while(numRetries-- > 0) {
+      Configuration conf = new YarnConfiguration(TestRMFailover.conf);
+      YarnClient client = YarnClient.createYarnClient();
+      client.init(conf);
+      client.start();
+      try {
+        client.getApplications();
+        return;
+      } catch (Exception e) {
+        LOG.error(e);
+      } finally {
+        client.stop();
+      }
+    }
+    fail("Client couldn't connect to the Active RM");
+  }
+
+  @Test
+  public void testExplicitFailover()
+      throws YarnException, InterruptedException, IOException {
+    assertTrue("NMs failed to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
+    verifyClientConnection();
+
+    // Failover to the second RM
+    getRMAdminService(0).transitionToStandby(req);
+    getRMAdminService(1).transitionToActive(req);
+    assertEquals("Wrong ResourceManager is active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        getRMAdminService(1).getServiceStatus().getState());
+    assertTrue("NMs failed to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
+    verifyClientConnection();
+
+    // Failover back to the first RM
+    getRMAdminService(1).transitionToStandby(req);
+    getRMAdminService(0).transitionToActive(req);
+    assertEquals("Wrong ResourceManager is active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        getRMAdminService(0).getServiceStatus().getState());
+    assertTrue("NMs failed to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
+    verifyClientConnection();
+  }
+}

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java?rev=1551739&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ConfiguredRMFailoverProxyProvider.java
 Tue Dec 17 22:32:38 2013
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ConfiguredRMFailoverProxyProvider<T>
+    implements RMFailoverProxyProvider<T> {
+  private static final Log LOG =
+      LogFactory.getLog(ConfiguredRMFailoverProxyProvider.class);
+
+  private int currentProxyIndex = 0;
+  Map<String, T> proxies = new HashMap<String, T>();
+
+  private RMProxy<T> rmProxy;
+  private Class<T> protocol;
+  protected YarnConfiguration conf;
+  protected String[] rmServiceIds;
+
+  @Override
+  public void init(Configuration configuration, RMProxy<T> rmProxy,
+                    Class<T> protocol) {
+    this.rmProxy = rmProxy;
+    this.protocol = protocol;
+    this.rmProxy.checkAllowedProtocols(this.protocol);
+    this.conf = new YarnConfiguration(configuration);
+    Collection<String> rmIds = HAUtil.getRMHAIds(conf);
+    this.rmServiceIds = rmIds.toArray(new String[rmIds.size()]);
+    conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);
+
+    
conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
+            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
+
+    conf.setInt(CommonConfigurationKeysPublic.
+        IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
+            
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
+  }
+
+  private T getProxyInternal() {
+    try {
+      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+      return RMProxy.getProxy(conf, protocol, rmAddress);
+    } catch (IOException ioe) {
+      LOG.error("Unable to create proxy to the ResourceManager " +
+          rmServiceIds[currentProxyIndex], ioe);
+      return null;
+    }
+  }
+
+  @Override
+  public synchronized T getProxy() {
+    String rmId = rmServiceIds[currentProxyIndex];
+    T current = proxies.get(rmId);
+    if (current == null) {
+      current = getProxyInternal();
+      proxies.put(rmId, current);
+    }
+    return current;
+  }
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    currentProxyIndex = (currentProxyIndex + 1) % rmServiceIds.length;
+    conf.set(YarnConfiguration.RM_HA_ID, rmServiceIds[currentProxyIndex]);
+    LOG.info("Failing over to " + rmServiceIds[currentProxyIndex]);
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return protocol;
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * this proxy provider. Entries whose creation failed (null) are skipped.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (T proxy : proxies.values()) {
+      if (proxy instanceof Closeable) {
+        ((Closeable)proxy).close();
+      } else if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+  }
+}

Added: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java?rev=1551739&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
 (added)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMFailoverProxyProvider.java
 Tue Dec 17 22:32:38 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+
+@InterfaceAudience.Private
+public interface RMFailoverProxyProvider<T> extends FailoverProxyProvider <T> {
+  /**
+   * Initialize internal data structures, invoked right after instantiation.
+   *
+   * @param conf Configuration to use
+   * @param proxy The {@link RMProxy} instance to use
+   * @param protocol The communication protocol to use
+   */
+  public void init(Configuration conf, RMProxy<T> proxy, Class<T> protocol);
+}

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
 Tue Dec 17 22:32:38 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.io.retry.RetryP
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -48,7 +50,68 @@ import com.google.common.annotations.Vis
 public class RMProxy<T> {
 
   private static final Log LOG = LogFactory.getLog(RMProxy.class);
+  protected static RMProxy INSTANCE;
 
+  protected RMProxy() {}
+
+  /**
+   * Verify the passed protocol is supported.
+   */
+  @Private
+  protected void checkAllowedProtocols(Class<?> protocol) {}
+
+  /**
+   * Get the ResourceManager address from the provided Configuration for the
+   * given protocol.
+   */
+  @Private
+  protected InetSocketAddress getRMAddress(
+      YarnConfiguration conf, Class<?> protocol) throws IOException {
+    throw new UnsupportedOperationException("This method should be invoked " +
+        "from an instance of ClientRMProxy or ServerRMProxy");
+  }
+
+  /**
+   * Create a proxy for the specified protocol. For non-HA,
+   * this is a direct connection to the ResourceManager address. When HA is
+   * enabled, the proxy handles the failover between the ResourceManagers as
+   * well.
+   */
+  @Private
+  protected static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol) throws IOException {
+    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+        ? (YarnConfiguration) configuration
+        : new YarnConfiguration(configuration);
+    RetryPolicy retryPolicy = createRetryPolicy(conf);
+    if (HAUtil.isHAEnabled(conf)) {
+      RMFailoverProxyProvider<T> provider =
+          INSTANCE.createRMFailoverProxyProvider(conf, protocol);
+      return (T) RetryProxy.create(protocol, provider, retryPolicy);
+    } else {
+      InetSocketAddress rmAddress = INSTANCE.getRMAddress(conf, protocol);
+      LOG.info("Connecting to ResourceManager at " + rmAddress);
+      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
+      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
+    }
+  }
+
+  /**
+   * Create a proxy to the ResourceManager at the specified address.
+   *
+   * @deprecated
+   * This method is deprecated and is not used by YARN internally any more.
+   * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
+   * ServerRMProxy#createRMProxy.
+   *
+   * @param conf Configuration to generate retry policy
+   * @param protocol Protocol for the proxy
+   * @param rmAddress Address of the ResourceManager
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException
+   */
+  @Deprecated
   public static <T> T createRMProxy(final Configuration conf,
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException 
{
     RetryPolicy retryPolicy = createRetryPolicy(conf);
@@ -57,12 +120,16 @@ public class RMProxy<T> {
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
   }
 
-  private static <T> T getProxy(final Configuration conf,
+  /**
+   * Get a proxy to the RM at the specified address. To be used to create a
+   * RetryProxy.
+   */
+  @Private
+  static <T> T getProxy(final Configuration conf,
       final Class<T> protocol, final InetSocketAddress rmAddress)
       throws IOException {
     return UserGroupInformation.getCurrentUser().doAs(
       new PrivilegedAction<T>() {
-
         @Override
         public T run() {
           return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
@@ -70,6 +137,50 @@ public class RMProxy<T> {
       });
   }
 
+  /**
+   * Helper method to create FailoverProxyProvider.
+   */
+  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
+      Configuration conf, Class<T> protocol) {
+    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
+    try {
+      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
+          Class.forName(
+              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
+    } catch (Exception e) {
+      throw new YarnRuntimeException("Invalid default failover provider class" 
+
+          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
+    }
+
+    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
+        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+            defaultProviderClass, RMFailoverProxyProvider.class), conf);
+    provider.init(conf, (RMProxy<T>) this, protocol);
+    return provider;
+  }
+
+  /**
+   * A RetryPolicy to allow failing over up to the specified maximum time.
+   */
+  private static class FailoverUptoMaximumTimePolicy implements RetryPolicy {
+    private long maxTime;
+
+    FailoverUptoMaximumTimePolicy(long maxTime) {
+      this.maxTime = maxTime;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isIdempotentOrAtMostOnce) throws Exception {
+      return System.currentTimeMillis() < maxTime
+          ? RetryAction.FAILOVER_AND_RETRY
+          : RetryAction.FAIL;
+    }
+  }
+
+  /**
+   * Fetch retry policy from Configuration
+   */
   @Private
   @VisibleForTesting
   public static RetryPolicy createRetryPolicy(Configuration conf) {
@@ -81,19 +192,10 @@ public class RMProxy<T> {
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
             YarnConfiguration
-            .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
-
-    if (rmConnectionRetryIntervalMS < 0) {
-      throw new YarnRuntimeException("Invalid Configuration. " +
-          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
-          " should not be negative.");
-    }
+                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
 
     boolean waitForEver = (rmConnectWaitMS == -1);
-
-    if (waitForEver) {
-      return  RetryPolicies.RETRY_FOREVER;
-    } else {
+    if (!waitForEver) {
       if (rmConnectWaitMS < 0) {
         throw new YarnRuntimeException("Invalid Configuration. "
             + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
@@ -110,18 +212,54 @@ public class RMProxy<T> {
       }
     }
 
+    // Handle HA case first
+    if (HAUtil.isHAEnabled(conf)) {
+      final long failoverSleepBaseMs = conf.getLong(
+          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
+          rmConnectionRetryIntervalMS);
+
+      final long failoverSleepMaxMs = conf.getLong(
+          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
+          rmConnectionRetryIntervalMS);
+
+      int maxFailoverAttempts = conf.getInt(
+          YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
+
+      RetryPolicy basePolicy = RetryPolicies.TRY_ONCE_THEN_FAIL;
+      if (maxFailoverAttempts == -1) {
+        if (waitForEver) {
+          basePolicy = RetryPolicies.FAILOVER_FOREVER;
+        } else {
+          basePolicy = new FailoverUptoMaximumTimePolicy(
+              System.currentTimeMillis() + rmConnectWaitMS);
+        }
+        maxFailoverAttempts = 0;
+      }
+
+      return RetryPolicies.failoverOnNetworkException(basePolicy,
+          maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs);
+    }
+
+    if (waitForEver) {
+      return RetryPolicies.RETRY_FOREVER;
+    }
+
+    if (rmConnectionRetryIntervalMS < 0) {
+      throw new YarnRuntimeException("Invalid Configuration. " +
+          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
+          " should not be negative.");
+    }
+
     RetryPolicy retryPolicy =
         RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
-            rmConnectionRetryIntervalMS,
-            TimeUnit.MILLISECONDS);
+            rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
 
     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
         new HashMap<Class<? extends Exception>, RetryPolicy>();
     exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
     //TO DO: after HADOOP-9576,  IOException can be changed to EOFException
     exceptionToPolicyMap.put(IOException.class, retryPolicy);
-
-    return RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-      exceptionToPolicyMap);
+    return RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
   }
 }

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 Tue Dec 17 22:32:38 2013
@@ -425,6 +425,61 @@
   </property>
 
   <property>
+    <description>When HA is enabled, the class to be used by Clients, AMs and
+      NMs to fail over to the Active RM. It should extend
+      org.apache.hadoop.yarn.client.RMFailoverProxyProvider</description>
+    <name>yarn.client.failover-proxy-provider</name>
+    
<value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the max number of times
+      FailoverProxyProvider should attempt failover. When set,
+      this overrides the yarn.resourcemanager.connect.max-wait.ms. When
+      not set, this is inferred from
+      yarn.resourcemanager.connect.max-wait.ms.</description>
+    <name>yarn.client.failover-max-attempts</name>
+    <!--value>15</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the sleep base (in milliseconds) to be
+      used for calculating the exponential delay between failovers. When set,
+      this overrides the yarn.resourcemanager.connect.* settings. When
+      not set, yarn.resourcemanager.connect.retry-interval.ms is used instead.
+    </description>
+    <name>yarn.client.failover-sleep-base-ms</name>
+    <!--value>500</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the maximum sleep time (in milliseconds)
+      between failovers. When set, this overrides the
+      yarn.resourcemanager.connect.* settings. When not set,
+      yarn.resourcemanager.connect.retry-interval.ms is used 
instead.</description>
+    <name>yarn.client.failover-sleep-max-ms</name>
+    <!--value>15000</value-->
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager. In other words,
+      it is the ipc.client.connect.max.retries to be used during
+      failover attempts</description>
+    <name>yarn.client.failover-retries</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>When HA is enabled, the number of retries per
+      attempt to connect to a ResourceManager on socket timeouts. In other
+      words, it is the ipc.client.connect.max.retries.on.timeouts to be used
+      during failover attempts</description>
+    <name>yarn.client.failover-retries-on-socket-timeouts</name>
+    <value>0</value>
+  </property>
+
+  <property>
     <description>The maximum number of completed applications RM keeps. 
</description>
     <name>yarn.resourcemanager.max-completed-applications</name>
     <value>10000</value>

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
 Tue Dec 17 22:32:38 2013
@@ -23,25 +23,43 @@ import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.client.RMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-public class ServerRMProxy<T> extends RMProxy<T> {
+import com.google.common.base.Preconditions;
 
+public class ServerRMProxy<T> extends RMProxy<T> {
   private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
 
+  static {
+    INSTANCE = new ServerRMProxy();
+  }
+
+  private ServerRMProxy() {
+    super();
+  }
+
+  /**
+   * Create a proxy to the ResourceManager for the specified protocol.
+   * @param configuration Configuration with all the required information.
+   * @param protocol Server protocol for which proxy is being requested.
+   * @param <T> Type of proxy.
+   * @return Proxy to the ResourceManager for the specified server protocol.
+   * @throws IOException
+   */
   public static <T> T createRMProxy(final Configuration configuration,
       final Class<T> protocol) throws IOException {
-    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
-        ? (YarnConfiguration) configuration
-        : new YarnConfiguration(configuration);
-    InetSocketAddress rmAddress = getRMAddress(conf, protocol);
-    return createRMProxy(conf, protocol, rmAddress);
+    // This method exists only to initialize this class' static INSTANCE. TODO:
+    // FIX if possible
+    return RMProxy.createRMProxy(configuration, protocol);
   }
 
-  private static InetSocketAddress getRMAddress(YarnConfiguration conf,
-                                                Class<?> protocol) {
+  @InterfaceAudience.Private
+  @Override
+  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+                                           Class<?> protocol) {
     if (protocol == ResourceTracker.class) {
       return conf.getSocketAddr(
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
@@ -55,4 +73,12 @@ public class ServerRMProxy<T> extends RM
       throw new IllegalStateException(message);
     }
   }
+
+  @InterfaceAudience.Private
+  @Override
+  protected void checkAllowedProtocols(Class<?> protocol) {
+    Preconditions.checkArgument(
+        protocol.isAssignableFrom(ResourceTracker.class),
+        "ResourceManager does not support this protocol");
+  }
 }
\ No newline at end of file

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
 Tue Dec 17 22:32:38 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.service.Abstrac
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -65,6 +67,8 @@ import org.apache.hadoop.yarn.server.res
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
+import static org.junit.Assert.fail;
+
 /**
  * Embedded Yarn minicluster for testcases that need to interact with a 
cluster.
  * <p/>
@@ -91,9 +95,11 @@ public class MiniYARNCluster extends Com
 
   private NodeManager[] nodeManagers;
   private ResourceManager[] resourceManagers;
+  private String[] rmIds;
+
+  private boolean useFixedPorts;
+  private boolean useRpc = false;
 
-  private ResourceManagerWrapper resourceManagerWrapper;
-  
   private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
       new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
   
@@ -163,15 +169,7 @@ public class MiniYARNCluster extends Com
     }
 
     resourceManagers = new ResourceManager[numResourceManagers];
-    for (int i = 0; i < numResourceManagers; i++) {
-      resourceManagers[i] = new ResourceManager();
-      addService(new ResourceManagerWrapper(i));
-    }
-    nodeManagers = new CustomNodeManager[numNodeManagers];
-    for(int index = 0; index < numNodeManagers; index++) {
-      addService(new NodeManagerWrapper(index));
-      nodeManagers[index] = new CustomNodeManager();
-    }
+    nodeManagers = new NodeManager[numNodeManagers];
   }
 
   /**
@@ -185,20 +183,45 @@ public class MiniYARNCluster extends Com
     this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
   }
 
-    @Override
+  @Override
   public void serviceInit(Configuration conf) throws Exception {
+    useFixedPorts = conf.getBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS);
+    useRpc = conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC,
+        YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
+
+    if (useRpc && !useFixedPorts) {
+      throw new YarnRuntimeException("Invalid configuration!" +
+          " Minicluster can use rpc only when configured to use fixed ports");
+    }
+
     if (resourceManagers.length > 1) {
       conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
-
-      StringBuilder rmIds = new StringBuilder();
-      for (int i = 0; i < resourceManagers.length; i++) {
-        if (i != 0) {
-          rmIds.append(",");
+      if (conf.get(YarnConfiguration.RM_HA_IDS) == null) {
+        StringBuilder rmIds = new StringBuilder();
+        for (int i = 0; i < resourceManagers.length; i++) {
+          if (i != 0) {
+            rmIds.append(",");
+          }
+          rmIds.append("rm" + i);
         }
-        rmIds.append("rm" + i);
+        conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
       }
-      conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+      Collection<String> rmIdsCollection = HAUtil.getRMHAIds(conf);
+      rmIds = rmIdsCollection.toArray(new String[rmIdsCollection.size()]);
     }
+
+    for (int i = 0; i < resourceManagers.length; i++) {
+      resourceManagers[i] = new ResourceManager();
+      addService(new ResourceManagerWrapper(i));
+    }
+    for(int index = 0; index < nodeManagers.length; index++) {
+      nodeManagers[index] =
+          useRpc ? new CustomNodeManager() : new ShortCircuitedNodeManager();
+      addService(new NodeManagerWrapper(index));
+    }
+
     super.serviceInit(
         conf instanceof YarnConfiguration ? conf : new 
YarnConfiguration(conf));
   }
@@ -213,11 +236,12 @@ public class MiniYARNCluster extends Com
    *
    * In an non-HA cluster, return the index of the only RM.
    *
-   * @return index of the active RM
+   * @return index of the active RM or -1 if none of them transition to
+   * active even after 5 seconds of waiting
    */
   @InterfaceAudience.Private
   @VisibleForTesting
-  int getActiveRMIndex() {
+  public int getActiveRMIndex() {
     if (resourceManagers.length == 1) {
       return 0;
     }
@@ -292,9 +316,7 @@ public class MiniYARNCluster extends Com
     }
 
     private void setHARMConfiguration(Configuration conf) {
-      String rmId = "rm" + index;
       String hostname = MiniYARNCluster.getHostname();
-      conf.set(YarnConfiguration.RM_HA_ID, rmId);
       for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
         for (String id : HAUtil.getRMHAIds(conf)) {
           conf.set(HAUtil.addSuffix(confKey, id), hostname + ":0");
@@ -306,15 +328,17 @@ public class MiniYARNCluster extends Com
     protected synchronized void serviceInit(Configuration conf)
         throws Exception {
       conf.setBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, true);
-      if (!conf.getBoolean(
-          YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
-          YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+
+      if (!useFixedPorts) {
         if (HAUtil.isHAEnabled(conf)) {
           setHARMConfiguration(conf);
         } else {
           setNonHARMConfiguration(conf);
         }
       }
+      if (HAUtil.isHAEnabled(conf)) {
+        conf.set(YarnConfiguration.RM_HA_ID, rmIds[index]);
+      }
       resourceManagers[index].init(conf);
       resourceManagers[index].getRMContext().getDispatcher().register
           (RMAppAttemptEventType.class,
@@ -500,7 +524,9 @@ public class MiniYARNCluster extends Com
     protected void doSecureLogin() throws IOException {
       // Don't try to login using keytab in the testcase.
     }
+  }
 
+  private class ShortCircuitedNodeManager extends CustomNodeManager {
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@@ -553,4 +579,28 @@ public class MiniYARNCluster extends Com
       };
     }
   }
+
+  /**
+   * Wait for all the NodeManagers to connect to the ResourceManager.
+   *
+   * @param timeout Maximum time to wait, in milliseconds (polled every 100 ms).
+   * @return true if all NodeManagers connect to the (Active)
+   * ResourceManager, false otherwise.
+   * @throws YarnException
+   * @throws InterruptedException
+   */
+  public boolean waitForNodeManagersToConnect(long timeout)
+      throws YarnException, InterruptedException {
+    ResourceManager rm = getResourceManager();
+    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
+
+    for (int i = 0; i < timeout / 100; i++) {
+      if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
+          .getClusterMetrics().getNumNodeManagers()) {
+        return true;
+      }
+      Thread.sleep(100);
+    }
+    return false;
+  }
 }

Modified: 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java?rev=1551739&r1=1551738&r2=1551739&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
 (original)
+++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
 Tue Dec 17 22:32:38 2013
@@ -33,6 +33,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestMiniYARNClusterForHA {
@@ -56,16 +57,7 @@ public class TestMiniYARNClusterForHA {
 
   @Test
   public void testClusterWorks() throws YarnException, InterruptedException {
-    ResourceManager rm = cluster.getResourceManager(0);
-    GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-
-    for (int i = 0; i < 600; i++) {
-      if (1 == rm.getClientRMService().getClusterMetrics(req)
-          .getClusterMetrics().getNumNodeManagers()) {
-        return;
-      }
-      Thread.sleep(100);
-    }
-    fail("NodeManager never registered with the RM");
+    assertTrue("NMs fail to connect to the RM",
+        cluster.waitForNodeManagersToConnect(5000));
   }
 }


Reply via email to