Author: kasha
Date: Wed Dec 11 15:15:35 2013
New Revision: 1550168

URL: http://svn.apache.org/r1550168
Log:
YARN-1481. Move internal services logic from AdminService to ResourceManager. 
(vinodkv via kasha)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Dec 11 15:15:35 2013
@@ -136,6 +136,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1378. Implemented a cleaner of old finished applications from the RM
     state-store. (Jian He via vinodkv)
 
+    YARN-1481. Move internal services logic from AdminService to ResourceManager.
+    (vinodkv via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Dec 11 15:15:35 2013
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,7 +41,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 
+import com.google.protobuf.BlockingService;
+
 public class AdminService extends AbstractService implements
     HAServiceProtocol, ResourceManagerAdministrationProtocol {
 
@@ -73,10 +72,6 @@ public class AdminService extends Abstra
 
   private final RMContext rmContext;
   private final ResourceManager rm;
-  @VisibleForTesting
-  protected HAServiceProtocol.HAServiceState
-      haState = HAServiceProtocol.HAServiceState.INITIALIZING;
-  boolean haEnabled;
 
   private Server server;
   private InetSocketAddress masterServiceAddress;
@@ -93,13 +88,6 @@ public class AdminService extends Abstra
 
   @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
-    haEnabled = HAUtil.isHAEnabled(conf);
-    if (haEnabled) {
-      HAUtil.verifyAndSetConfiguration(conf);
-      rm.setConf(conf);
-    }
-    rm.createAndInitActiveServices();
-
     masterServiceAddress = conf.getSocketAddr(
         YarnConfiguration.RM_ADMIN_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -112,11 +100,6 @@ public class AdminService extends Abstra
 
   @Override
   protected synchronized void serviceStart() throws Exception {
-    if (haEnabled) {
-      transitionToStandby(true);
-    } else {
-      transitionToActive();
-    }
     startServer();
     super.serviceStart();
   }
@@ -124,8 +107,6 @@ public class AdminService extends Abstra
   @Override
   protected synchronized void serviceStop() throws Exception {
     stopServer();
-    transitionToStandby(false);
-    haState = HAServiceState.STOPPING;
     super.serviceStop();
   }
 
@@ -145,7 +126,7 @@ public class AdminService extends Abstra
       refreshServiceAcls(conf, new RMPolicyProvider());
     }
 
-    if (haEnabled) {
+    if (rmContext.isHAEnabled()) {
       RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
           ProtobufRpcEngine.class);
 
@@ -182,39 +163,27 @@ public class AdminService extends Abstra
   }
 
   private synchronized boolean isRMActive() {
-    return HAServiceState.ACTIVE == haState;
+    return HAServiceState.ACTIVE == rmContext.getHAServiceState();
   }
 
   @Override
   public synchronized void monitorHealth()
       throws IOException {
     checkAccess("monitorHealth");
-    if (haState == HAServiceProtocol.HAServiceState.ACTIVE && 
!rm.areActiveServicesRunning()) {
+    if (isRMActive() && !rm.areActiveServicesRunning()) {
       throw new HealthCheckFailedException(
           "Active ResourceManager services are not running!");
     }
   }
 
-  synchronized void transitionToActive() throws Exception {
-    if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
-      LOG.info("Already in active state");
-      return;
-    }
-
-    LOG.info("Transitioning to active");
-    rm.startActiveServices();
-    haState = HAServiceProtocol.HAServiceState.ACTIVE;
-    LOG.info("Transitioned to active");
-  }
-
   @Override
-  public synchronized void 
transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
-      throws IOException {
+  public synchronized void transitionToActive(
+      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
     UserGroupInformation user = checkAccess("transitionToActive");
     // TODO (YARN-1177): When automatic failover is enabled,
     // check if transition should be allowed for this request
     try {
-      transitionToActive();
+      rm.transitionToActive();
       RMAuditLogger.logSuccess(user.getShortUserName(),
           "transitionToActive", "RMHAProtocolService");
     } catch (Exception e) {
@@ -226,32 +195,14 @@ public class AdminService extends Abstra
     }
   }
 
-  synchronized void transitionToStandby(boolean initialize)
-      throws Exception {
-    if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
-      LOG.info("Already in standby state");
-      return;
-    }
-
-    LOG.info("Transitioning to standby");
-    if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
-      rm.stopActiveServices();
-      if (initialize) {
-        rm.createAndInitActiveServices();
-      }
-    }
-    haState = HAServiceProtocol.HAServiceState.STANDBY;
-    LOG.info("Transitioned to standby");
-  }
-
   @Override
-  public synchronized void 
transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
-      throws IOException {
+  public synchronized void transitionToStandby(
+      HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
     UserGroupInformation user = checkAccess("transitionToStandby");
     // TODO (YARN-1177): When automatic failover is enabled,
     // check if transition should be allowed for this request
     try {
-      transitionToStandby(true);
+      rm.transitionToStandby(true);
       RMAuditLogger.logSuccess(user.getShortUserName(),
           "transitionToStandby", "RMHAProtocolService");
     } catch (Exception e) {
@@ -266,15 +217,15 @@ public class AdminService extends Abstra
   @Override
   public synchronized HAServiceStatus getServiceStatus() throws IOException {
     checkAccess("getServiceState");
+    HAServiceState haState = rmContext.getHAServiceState();
     HAServiceStatus ret = new HAServiceStatus(haState);
-    if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
-        HAServiceProtocol.HAServiceState.STANDBY) {
+    if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
       ret.setReadyToBecomeActive();
     } else {
       ret.setNotReadyToBecomeActive("State is " + haState);
     }
     return ret;
-  }
+  } 
 
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Dec 11 15:15:35 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.res
 public interface RMContext {
 
   Dispatcher getDispatcher();
-  
+
+  boolean isHAEnabled();
+
+  HAServiceState getHAServiceState();
+
   RMStateStore getStateStore();
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Dec 11 15:15:35 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.re
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.res
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +56,10 @@ public class RMContextImpl implements RM
   private final ConcurrentMap<String, RMNode> inactiveNodes
     = new ConcurrentHashMap<String, RMNode>();
 
+  private boolean isHAEnabled;
+  private HAServiceState haServiceState =
+      HAServiceProtocol.HAServiceState.INITIALIZING;
+  
   private AMLivelinessMonitor amLivelinessMonitor;
   private AMLivelinessMonitor amFinishingMonitor;
   private RMStateStore stateStore = null;
@@ -211,6 +217,16 @@ public class RMContextImpl implements RM
     return resourceTrackerService;
   }
 
+  void setHAEnabled(boolean isHAEnabled) {
+    this.isHAEnabled = isHAEnabled;
+  }
+
+  void setHAServiceState(HAServiceState haServiceState) {
+    synchronized (haServiceState) {
+      this.haServiceState = haServiceState;
+    }
+  }
+
   void setDispatcher(Dispatcher dispatcher) {
     this.rmDispatcher = dispatcher;
   }
@@ -290,4 +306,16 @@ public class RMContextImpl implements RM
       ResourceTrackerService resourceTrackerService) {
     this.resourceTrackerService = resourceTrackerService;
   }
+
+  @Override
+  public boolean isHAEnabled() {
+    return isHAEnabled;
+  }
+
+  @Override
+  public HAServiceState getHAServiceState() {
+    synchronized (haServiceState) {
+      return haServiceState;
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Dec 11 15:15:35 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaug
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -188,6 +191,12 @@ public class ResourceManager extends Com
     addService(adminService);
     rmContext.setRMAdminService(adminService);
 
+    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
+    if (this.rmContext.isHAEnabled()) {
+      HAUtil.verifyAndSetConfiguration(conf);
+    }
+    createAndInitActiveServices();
+
     super.serviceInit(conf);
   }
   
@@ -217,9 +226,8 @@ public class ResourceManager extends Com
   }
 
   protected RMStateStoreOperationFailedEventDispatcher
-  createRMStateStoreOperationFailedEventDispatcher() {
-    return new RMStateStoreOperationFailedEventDispatcher(
-        rmContext.getRMAdminService());
+      createRMStateStoreOperationFailedEventDispatcher() {
+    return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
   }
 
   protected Dispatcher createDispatcher() {
@@ -655,11 +663,14 @@ public class ResourceManager extends Com
   @Private
   public static class RMStateStoreOperationFailedEventDispatcher implements
       EventHandler<RMStateStoreOperationFailedEvent> {
-    private final AdminService adminService;
 
-    public RMStateStoreOperationFailedEventDispatcher(
-        AdminService adminService) {
-      this.adminService = adminService;
+    private final RMContext rmContext;
+    private final ResourceManager rm;
+
+    public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
+        ResourceManager resourceManager) {
+      this.rmContext = rmContext;
+      this.rm = resourceManager;
     }
 
     @Override
@@ -671,16 +682,14 @@ public class ResourceManager extends Com
       }
       if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
         LOG.info("RMStateStore has been fenced");
-        synchronized(adminService) {
-          if (adminService.haEnabled) {
-            try {
-              // Transition to standby and reinit active services
-              LOG.info("Transitioning RM to Standby mode");
-              adminService.transitionToStandby(true);
-              return;
-            } catch (Exception e) {
-              LOG.error("Failed to transition RM to Standby mode.");
-            }
+        if (rmContext.isHAEnabled()) {
+          try {
+            // Transition to standby and reinit active services
+            LOG.info("Transitioning RM to Standby mode");
+            rm.transitionToStandby(true);
+            return;
+          } catch (Exception e) {
+            LOG.error("Failed to transition RM to Standby mode.");
           }
         }
       }
@@ -826,10 +835,6 @@ public class ResourceManager extends Com
     webApp = builder.start(new RMWebApp(this));
   }
 
-  void setConf(Configuration configuration) {
-    conf = configuration;
-  }
-
   /**
    * Helper method to create and init {@link #activeServices}. This creates an
    * instance of {@link RMActiveServices} and initializes it.
@@ -870,6 +875,39 @@ public class ResourceManager extends Com
     return activeServices != null && activeServices.isInState(STATE.STARTED);
   }
 
+  synchronized void transitionToActive() throws Exception {
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.ACTIVE) {
+      LOG.info("Already in active state");
+      return;
+    }
+
+    LOG.info("Transitioning to active state");
+    startActiveServices();
+    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
+    LOG.info("Transitioned to active state");
+  }
+
+  synchronized void transitionToStandby(boolean initialize)
+      throws Exception {
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.STANDBY) {
+      LOG.info("Already in standby state");
+      return;
+    }
+
+    LOG.info("Transitioning to standby state");
+    if (rmContext.getHAServiceState() ==
+        HAServiceProtocol.HAServiceState.ACTIVE) {
+      stopActiveServices();
+      if (initialize) {
+        createAndInitActiveServices();
+      }
+    }
+    rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
+    LOG.info("Transitioned to standby state");
+  }
+
   @Override
   protected void serviceStart() throws Exception {
     try {
@@ -877,6 +915,13 @@ public class ResourceManager extends Com
     } catch(IOException ie) {
       throw new YarnRuntimeException("Failed to login", ie);
     }
+
+    if (this.rmContext.isHAEnabled()) {
+      transitionToStandby(true);
+    } else {
+      transitionToActive();
+    }
+
     super.serviceStart();
   }
   
@@ -888,6 +933,8 @@ public class ResourceManager extends Com
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
+    transitionToStandby(false);
+    rmContext.setHAServiceState(HAServiceState.STOPPING);
   }
   
   protected ResourceTrackerService createResourceTrackerService() {


Reply via email to