Author: kasha
Date: Wed Dec 11 15:15:35 2013
New Revision: 1550168

URL: http://svn.apache.org/r1550168
Log:
YARN-1481. Move internal services logic from AdminService to ResourceManager. (vinodkv via kasha)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1550168&r1=1550167&r2=1550168&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Dec 11 15:15:35 2013 @@ -136,6 +136,9 @@ Release 2.4.0 - UNRELEASED YARN-1378. Implemented a cleaner of old finished applications from the RM state-store. (Jian He via vinodkv) + YARN-1481. Move internal services logic from AdminService to ResourceManager. 
+ (vinodkv via kasha) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1550168&r1=1550167&r2=1550168&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Dec 11 15:15:35 2013 @@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re import java.io.IOException; import java.net.InetSocketAddress; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,7 +41,6 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; 
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import com.google.protobuf.BlockingService; + public class AdminService extends AbstractService implements HAServiceProtocol, ResourceManagerAdministrationProtocol { @@ -73,10 +72,6 @@ public class AdminService extends Abstra private final RMContext rmContext; private final ResourceManager rm; - @VisibleForTesting - protected HAServiceProtocol.HAServiceState - haState = HAServiceProtocol.HAServiceState.INITIALIZING; - boolean haEnabled; private Server server; private InetSocketAddress masterServiceAddress; @@ -93,13 +88,6 @@ public class AdminService extends Abstra @Override public synchronized void serviceInit(Configuration conf) throws Exception { - haEnabled = HAUtil.isHAEnabled(conf); - if (haEnabled) { - HAUtil.verifyAndSetConfiguration(conf); - rm.setConf(conf); - } - rm.createAndInitActiveServices(); - masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, @@ -112,11 +100,6 @@ public class AdminService extends Abstra @Override protected synchronized void serviceStart() throws Exception { - if (haEnabled) { - transitionToStandby(true); - } else { - transitionToActive(); - } startServer(); super.serviceStart(); } @@ -124,8 +107,6 @@ public class AdminService extends Abstra @Override protected synchronized void serviceStop() throws Exception { stopServer(); - transitionToStandby(false); - haState = HAServiceState.STOPPING; super.serviceStop(); } @@ -145,7 +126,7 @@ public class AdminService extends Abstra refreshServiceAcls(conf, new RMPolicyProvider()); } - if (haEnabled) { + if (rmContext.isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -182,39 +163,27 @@ public class AdminService extends Abstra } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == haState; + return HAServiceState.ACTIVE == rmContext.getHAServiceState(); } 
@Override public synchronized void monitorHealth() throws IOException { checkAccess("monitorHealth"); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { + if (isRMActive() && !rm.areActiveServicesRunning()) { throw new HealthCheckFailedException( "Active ResourceManager services are not running!"); } } - synchronized void transitionToActive() throws Exception { - if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { - LOG.info("Already in active state"); - return; - } - - LOG.info("Transitioning to active"); - rm.startActiveServices(); - haState = HAServiceProtocol.HAServiceState.ACTIVE; - LOG.info("Transitioned to active"); - } - @Override - public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) - throws IOException { + public synchronized void transitionToActive( + HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { UserGroupInformation user = checkAccess("transitionToActive"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { - transitionToActive(); + rm.transitionToActive(); RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive", "RMHAProtocolService"); } catch (Exception e) { @@ -226,32 +195,14 @@ public class AdminService extends Abstra } } - synchronized void transitionToStandby(boolean initialize) - throws Exception { - if (haState == HAServiceProtocol.HAServiceState.STANDBY) { - LOG.info("Already in standby state"); - return; - } - - LOG.info("Transitioning to standby"); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { - rm.stopActiveServices(); - if (initialize) { - rm.createAndInitActiveServices(); - } - } - haState = HAServiceProtocol.HAServiceState.STANDBY; - LOG.info("Transitioned to standby"); - } - @Override - public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo) - throws IOException { + public synchronized 
void transitionToStandby( + HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException { UserGroupInformation user = checkAccess("transitionToStandby"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { - transitionToStandby(true); + rm.transitionToStandby(true); RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToStandby", "RMHAProtocolService"); } catch (Exception e) { @@ -266,15 +217,15 @@ public class AdminService extends Abstra @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); + HAServiceState haState = rmContext.getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); - if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState == - HAServiceProtocol.HAServiceState.STANDBY) { + if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); } else { ret.setNotReadyToBecomeActive("State is " + haState); } return ret; - } + } @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1550168&r1=1550167&r2=1550168&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Dec 11 15:15:35 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.Dispatcher; @@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.res public interface RMContext { Dispatcher getDispatcher(); - + + boolean isHAEnabled(); + + HAServiceState getHAServiceState(); + RMStateStore getStateStore(); ConcurrentMap<ApplicationId, RMApp> getRMApps(); Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1550168&r1=1550167&r2=1550168&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Dec 11 15:15:35 2013 @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.re import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import 
org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import com.google.common.annotations.VisibleForTesting; @@ -54,6 +56,10 @@ public class RMContextImpl implements RM private final ConcurrentMap<String, RMNode> inactiveNodes = new ConcurrentHashMap<String, RMNode>(); + private boolean isHAEnabled; + private HAServiceState haServiceState = + HAServiceProtocol.HAServiceState.INITIALIZING; + private AMLivelinessMonitor amLivelinessMonitor; private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; @@ -211,6 +217,16 @@ public class RMContextImpl implements RM return resourceTrackerService; } + void setHAEnabled(boolean isHAEnabled) { + this.isHAEnabled = isHAEnabled; + } + + void setHAServiceState(HAServiceState haServiceState) { + synchronized (haServiceState) { + this.haServiceState = haServiceState; + } + } + void setDispatcher(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; } @@ -290,4 +306,16 @@ public class RMContextImpl implements RM ResourceTrackerService resourceTrackerService) { this.resourceTrackerService = 
resourceTrackerService; } + + @Override + public boolean isHAEnabled() { + return isHAEnabled; + } + + @Override + public HAServiceState getHAServiceState() { + synchronized (haServiceState) { + return haServiceState; + } + } } \ No newline at end of file Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1550168&r1=1550167&r2=1550168&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Dec 11 15:15:35 2013 @@ -27,6 +27,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig.Policy; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaug import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import 
org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -188,6 +191,12 @@ public class ResourceManager extends Com addService(adminService); rmContext.setRMAdminService(adminService); + this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf)); + if (this.rmContext.isHAEnabled()) { + HAUtil.verifyAndSetConfiguration(conf); + } + createAndInitActiveServices(); + super.serviceInit(conf); } @@ -217,9 +226,8 @@ public class ResourceManager extends Com } protected RMStateStoreOperationFailedEventDispatcher - createRMStateStoreOperationFailedEventDispatcher() { - return new RMStateStoreOperationFailedEventDispatcher( - rmContext.getRMAdminService()); + createRMStateStoreOperationFailedEventDispatcher() { + return new RMStateStoreOperationFailedEventDispatcher(rmContext, this); } protected Dispatcher createDispatcher() { @@ -655,11 +663,14 @@ public class ResourceManager extends Com @Private public static class RMStateStoreOperationFailedEventDispatcher implements EventHandler<RMStateStoreOperationFailedEvent> { - private final AdminService adminService; - public RMStateStoreOperationFailedEventDispatcher( - AdminService adminService) { - this.adminService = adminService; + private final RMContext rmContext; + private final ResourceManager rm; + + public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext, + ResourceManager resourceManager) { + this.rmContext = rmContext; + this.rm = resourceManager; } @Override @@ -671,16 +682,14 @@ public class ResourceManager extends Com } if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) { LOG.info("RMStateStore has been fenced"); - synchronized(adminService) { - if (adminService.haEnabled) { - try { - // Transition to standby and reinit active services - LOG.info("Transitioning RM to Standby mode"); - adminService.transitionToStandby(true); - return; - } catch (Exception e) 
{ - LOG.error("Failed to transition RM to Standby mode."); - } + if (rmContext.isHAEnabled()) { + try { + // Transition to standby and reinit active services + LOG.info("Transitioning RM to Standby mode"); + rm.transitionToStandby(true); + return; + } catch (Exception e) { + LOG.error("Failed to transition RM to Standby mode."); } } } @@ -826,10 +835,6 @@ public class ResourceManager extends Com webApp = builder.start(new RMWebApp(this)); } - void setConf(Configuration configuration) { - conf = configuration; - } - /** * Helper method to create and init {@link #activeServices}. This creates an * instance of {@link RMActiveServices} and initializes it. @@ -870,6 +875,39 @@ public class ResourceManager extends Com return activeServices != null && activeServices.isInState(STATE.STARTED); } + synchronized void transitionToActive() throws Exception { + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.ACTIVE) { + LOG.info("Already in active state"); + return; + } + + LOG.info("Transitioning to active state"); + startActiveServices(); + rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE); + LOG.info("Transitioned to active state"); + } + + synchronized void transitionToStandby(boolean initialize) + throws Exception { + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.STANDBY) { + LOG.info("Already in standby state"); + return; + } + + LOG.info("Transitioning to standby state"); + if (rmContext.getHAServiceState() == + HAServiceProtocol.HAServiceState.ACTIVE) { + stopActiveServices(); + if (initialize) { + createAndInitActiveServices(); + } + } + rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY); + LOG.info("Transitioned to standby state"); + } + @Override protected void serviceStart() throws Exception { try { @@ -877,6 +915,13 @@ public class ResourceManager extends Com } catch(IOException ie) { throw new YarnRuntimeException("Failed to login", ie); } + + if (this.rmContext.isHAEnabled()) { 
+ transitionToStandby(true); + } else { + transitionToActive(); + } + super.serviceStart(); } @@ -888,6 +933,8 @@ public class ResourceManager extends Com @Override protected void serviceStop() throws Exception { super.serviceStop(); + transitionToStandby(false); + rmContext.setHAServiceState(HAServiceState.STOPPING); } protected ResourceTrackerService createResourceTrackerService() {