Repository: ambari Updated Branches: refs/heads/trunk 367c61c41 -> 32d0c90b3
AMBARI-4513 ZooKeeper services need to be started first and running prior to starting HDFS in a HA Environment. (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/32d0c90b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/32d0c90b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/32d0c90b Branch: refs/heads/trunk Commit: 32d0c90b31c9c4de1076cc2817640f1b094ccdb8 Parents: 367c61c Author: Dmytro Sen <d...@hortonworks.com> Authored: Thu Jul 24 14:59:18 2014 +0300 Committer: Dmytro Sen <d...@hortonworks.com> Committed: Thu Jul 24 14:59:18 2014 +0300 ---------------------------------------------------------------------- .../controller/AmbariManagementController.java | 6 + .../AmbariManagementControllerImpl.java | 22 +- .../internal/ServiceResourceProvider.java | 292 +++++++++++-------- .../server/metadata/RoleCommandOrder.java | 72 ++++- .../src/main/resources/properties.json | 1 + .../AmbariManagementControllerTest.java | 31 +- .../internal/ServiceResourceProviderTest.java | 25 +- .../server/metadata/RoleCommandOrderTest.java | 57 +++- 8 files changed, 343 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java index d1e85df..2776394 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java @@ -22,6 +22,7 @@ import org.apache.ambari.server.AmbariException; 
import org.apache.ambari.server.actionmanager.ActionManager; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.internal.RequestStageContainer; +import org.apache.ambari.server.metadata.RoleCommandOrder; import org.apache.ambari.server.scheduler.ExecutionScheduleManager; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; @@ -638,5 +639,10 @@ public interface AmbariManagementController { */ public MaintenanceState getEffectiveMaintenanceState(ServiceComponentHost sch) throws AmbariException; + + /** + * Get Role Command Order + */ + public RoleCommandOrder getRoleCommandOrder(Cluster cluster); } http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index dd99d87..8380118 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -30,6 +30,7 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -270,7 +271,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle return uriBuilder.toString(); } - private RoleCommandOrder getRoleCommandOrder(Cluster cluster) { + @Override + public RoleCommandOrder getRoleCommandOrder(Cluster cluster) { RoleCommandOrder rco; rco = injector.getInstance(RoleCommandOrder.class); 
rco.initialize(cluster); @@ -1281,10 +1283,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle // Flatten changed Schs that are going to be Started List<ServiceComponentHost> serviceComponentHosts = new ArrayList<ServiceComponentHost>(); if (changedScHosts != null && !changedScHosts.isEmpty()) { - for (String sc : changedScHosts.keySet()) { - for (State state : changedScHosts.get(sc).keySet()) { + for (Entry<String, Map<State, List<ServiceComponentHost>>> stringMapEntry : changedScHosts.entrySet()) { + for (State state : stringMapEntry.getValue().keySet()) { if (state == State.STARTED) { - serviceComponentHosts.addAll(changedScHosts.get(sc).get(state)); + serviceComponentHosts.addAll(stringMapEntry.getValue().get(state)); } } } @@ -1326,10 +1328,10 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle LOG.info("Client hosts for reinstall : " + clientSchs.size()); if (changedScHosts != null) { - for (String sc : clientSchs.keySet()) { - Map<State, List<ServiceComponentHost>> schMap = new HashMap<State, List<ServiceComponentHost>>(); - schMap.put(State.INSTALLED, clientSchs.get(sc)); - changedScHosts.put(sc, schMap); + for (Entry<String, List<ServiceComponentHost>> stringListEntry : clientSchs.entrySet()) { + Map<State, List<ServiceComponentHost>> schMap = new EnumMap<State, List<ServiceComponentHost>>(State.class); + schMap.put(State.INSTALLED, stringListEntry.getValue()); + changedScHosts.put(stringListEntry.getKey(), schMap); } } } @@ -1411,7 +1413,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle String commandTimeout = configs.getDefaultAgentTaskTimeout(); /* * This script is only used for - * default commads like INSTALL/STOP/START + * default commands like INSTALL/STOP/START */ CommandScriptDefinition script = componentInfo.getCommandScript(); if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) { @@ -2104,7 +2106,7 @@ public class 
AmbariManagementControllerImpl implements AmbariManagementControlle } else { if (!changedScHosts.containsKey(sc.getName())) { changedScHosts.put(sc.getName(), - new HashMap<State, List<ServiceComponentHost>>()); + new EnumMap<State, List<ServiceComponentHost>>(State.class)); } if (!changedScHosts.get(sc.getName()).containsKey(newState)) { changedScHosts.get(sc.getName()).put(newState, http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java index 21dcdc8..91eb458 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumMap; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -29,13 +30,12 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.inject.Inject; - import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ClusterNotFoundException; import org.apache.ambari.server.DuplicateResourceException; import org.apache.ambari.server.ObjectNotFoundException; import org.apache.ambari.server.ParentObjectNotFoundException; +import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.ServiceNotFoundException; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.AmbariManagementController; @@ -57,10 +57,10 @@ import 
org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.metadata.RoleCommandOrder; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; -import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.MaintenanceState; import org.apache.ambari.server.state.Service; import org.apache.ambari.server.state.ServiceComponent; @@ -95,6 +95,9 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider private static final String QUERY_PARAMETERS_RECONFIGURE_CLIENT = "params/reconfigure_client"; + private static final String QUERY_PARAMETERS_START_DEPENDENCIES = + "params/start_dependencies"; + private static Set<String> pkPropertyIds = new HashSet<String>(Arrays.asList(new String[]{ SERVICE_CLUSTER_NAME_PROPERTY_ID, @@ -317,11 +320,14 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider final boolean reconfigureClients = !"false".equals(getQueryParameterValue( QUERY_PARAMETERS_RECONFIGURE_CLIENT, predicate)); + final boolean startDependencies = "true".equals(getQueryParameterValue( + QUERY_PARAMETERS_START_DEPENDENCIES, predicate)); + requestStages = modifyResources(new Command<RequestStageContainer>() { @Override public RequestStageContainer invoke() throws AmbariException { return updateServices(stages, requests, request.getRequestInfoProperties(), - runSmokeTest, reconfigureClients); + runSmokeTest, reconfigureClients, startDependencies); } }); } @@ -547,7 +553,7 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider // Update services based on the given requests. 
protected synchronized RequestStageContainer updateServices(RequestStageContainer requestStages, Set<ServiceRequest> requests, Map<String, String> requestProperties, boolean runSmokeTest, - boolean reconfigureClients) throws AmbariException { + boolean reconfigureClients, boolean startDependencies) throws AmbariException { AmbariManagementController controller = getManagementController(); @@ -557,9 +563,9 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider } Map<State, List<Service>> changedServices - = new HashMap<State, List<Service>>(); + = new EnumMap<State, List<Service>>(State.class); Map<State, List<ServiceComponent>> changedComps = - new HashMap<State, List<ServiceComponent>>(); + new EnumMap<State, List<ServiceComponent>>(State.class); Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts = new HashMap<String, Map<State, List<ServiceComponentHost>>>(); Collection<ServiceComponentHost> ignoredScHosts = @@ -583,7 +589,6 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider } Clusters clusters = controller.getClusters(); - AmbariMetaInfo ambariMetaInfo = controller.getAmbariMetaInfo(); Set<String> maintenanceClusters = new HashSet<String>(); for (ServiceRequest request : requests) { @@ -684,128 +689,27 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider // TODO should we check whether all servicecomponents and // servicecomponenthosts are in the required desired state? 
- for (ServiceComponent sc : s.getServiceComponents().values()) { - State oldScState = sc.getDesiredState(); - if (newState != oldScState) { - if (sc.isClientComponent() && - !newState.isValidClientComponentState()) { - continue; - } - if (!State.isValidDesiredStateTransition(oldScState, newState)) { - throw new AmbariException("Invalid transition for" - + " servicecomponent" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + sc.getServiceName() - + ", componentName=" + sc.getName() - + ", currentDesiredState=" + oldScState - + ", newDesiredState=" + newState); - } - if (!changedComps.containsKey(newState)) { - changedComps.put(newState, new ArrayList<ServiceComponent>()); - } - changedComps.get(newState).add(sc); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Handling update to ServiceComponent" - + ", clusterName=" + request.getClusterName() - + ", serviceName=" + s.getName() - + ", componentName=" + sc.getName() - + ", currentDesiredState=" + oldScState - + ", newDesiredState=" + newState); - } - - for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) { - State oldSchState = sch.getState(); - if (oldSchState == State.DISABLED || oldSchState == State.UNKNOWN) { - //Ignore host components updates in this state - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring ServiceComponentHost" - + ", clusterName=" + request.getClusterName() - + ", serviceName=" + s.getName() - + ", componentName=" + sc.getName() - + ", hostname=" + sch.getHostName() - + ", currentState=" + oldSchState - + ", newDesiredState=" + newState); - } - continue; - } - - if (newState == oldSchState) { - ignoredScHosts.add(sch); - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring ServiceComponentHost" - + ", clusterName=" + request.getClusterName() - + ", serviceName=" + s.getName() - + ", componentName=" + sc.getName() - + ", hostname=" + sch.getHostName() - + ", currentState=" + oldSchState - + ", newDesiredState=" + 
newState); - } - continue; - } - - if (! maintenanceStateHelper.isOperationAllowed(reqOpLvl, sch)) { - ignoredScHosts.add(sch); - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring ServiceComponentHost" - + ", clusterName=" + request.getClusterName() - + ", serviceName=" + s.getName() - + ", componentName=" + sc.getName() - + ", hostname=" + sch.getHostName()); - } - continue; - } + updateServiceComponents(requestStages, changedComps, changedScHosts, + ignoredScHosts, reqOpLvl, s, newState); + } - if (sc.isClientComponent() && - !newState.isValidClientComponentState()) { - continue; - } - /** - * This is hack for now wherein we don't fail if the - * sch is in INSTALL_FAILED - */ - if (! isValidStateTransition(requestStages, oldSchState, newState, sch)) { - String error = "Invalid transition for" - + " servicecomponenthost" - + ", clusterName=" + cluster.getClusterName() - + ", clusterId=" + cluster.getClusterId() - + ", serviceName=" + sch.getServiceName() - + ", componentName=" + sch.getServiceComponentName() - + ", hostname=" + sch.getHostName() - + ", currentState=" + oldSchState - + ", newDesiredState=" + newState; - StackId sid = cluster.getDesiredStackVersion(); - - if ( ambariMetaInfo.getComponentCategory( - sid.getStackName(), sid.getStackVersion(), sc.getServiceName(), - sch.getServiceComponentName()).isMaster()) { - throw new AmbariException(error); - } else { - LOG.warn("Ignoring: " + error); - continue; - } - } - if (!changedScHosts.containsKey(sc.getName())) { - changedScHosts.put(sc.getName(), - new HashMap<State, List<ServiceComponentHost>>()); + if (startDependencies && changedServices.containsKey(State.STARTED)) { + HashSet<Service> depServices = new HashSet<Service>(); + for (Service service : changedServices.get(State.STARTED)) { + RoleCommandOrder rco = controller.getRoleCommandOrder(service.getCluster()); + Set<Service> dependencies = rco.getTransitiveServices(service, RoleCommand.START); + for (Service dependency: dependencies) { + if 
(!changedServices.get(State.STARTED).contains(dependency)){ + depServices.add(dependency); } - if (!changedScHosts.get(sc.getName()).containsKey(newState)) { - changedScHosts.get(sc.getName()).put(newState, - new ArrayList<ServiceComponentHost>()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Handling update to ServiceComponentHost" - + ", clusterName=" + request.getClusterName() - + ", serviceName=" + s.getName() - + ", componentName=" + sc.getName() - + ", hostname=" + sch.getHostName() - + ", currentState=" + oldSchState - + ", newDesiredState=" + newState); - } - changedScHosts.get(sc.getName()).get(newState).add(sch); } } + for (Service service : depServices) { + updateServiceComponents(requestStages, changedComps, changedScHosts, + ignoredScHosts, reqOpLvl, service, State.STARTED); + changedServices.get(State.STARTED).add(service); + } + } if (seenNewStates.size() > 1) { @@ -829,6 +733,142 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider ignoredScHosts, runSmokeTest, reconfigureClients); } + private void updateServiceComponents(RequestStageContainer requestStages, + Map<State, List<ServiceComponent>> changedComps, + Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts, + Collection<ServiceComponentHost> ignoredScHosts, + Resource.Type reqOpLvl, + Service service, State newState) + throws AmbariException { + + Cluster cluster = service.getCluster(); + AmbariManagementController controller = getManagementController(); + AmbariMetaInfo ambariMetaInfo = controller.getAmbariMetaInfo(); + + for (ServiceComponent sc : service.getServiceComponents().values()) { + State oldScState = sc.getDesiredState(); + if (newState != oldScState) { + if (sc.isClientComponent() && + !newState.isValidClientComponentState()) { + continue; + } + if (!State.isValidDesiredStateTransition(oldScState, newState)) { + throw new AmbariException("Invalid transition for" + + " servicecomponent" + + ", clusterName=" + cluster.getClusterName() + 
+ ", clusterId=" + cluster.getClusterId() + + ", serviceName=" + sc.getServiceName() + + ", componentName=" + sc.getName() + + ", currentDesiredState=" + oldScState + + ", newDesiredState=" + newState); + } + if (!changedComps.containsKey(newState)) { + changedComps.put(newState, new ArrayList<ServiceComponent>()); + } + changedComps.get(newState).add(sc); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Handling update to ServiceComponent" + + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + service.getName() + + ", componentName=" + sc.getName() + + ", currentDesiredState=" + oldScState + + ", newDesiredState=" + newState); + } + + for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) { + State oldSchState = sch.getState(); + if (oldSchState == State.DISABLED || oldSchState == State.UNKNOWN) { + //Ignore host components updates in this state + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring ServiceComponentHost" + + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + service.getName() + + ", componentName=" + sc.getName() + + ", hostname=" + sch.getHostName() + + ", currentState=" + oldSchState + + ", newDesiredState=" + newState); + } + continue; + } + // + if (newState == oldSchState) { + ignoredScHosts.add(sch); + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring ServiceComponentHost" + + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + service.getName() + + ", componentName=" + sc.getName() + + ", hostname=" + sch.getHostName() + + ", currentState=" + oldSchState + + ", newDesiredState=" + newState); + } + continue; + } + + if (! 
maintenanceStateHelper.isOperationAllowed(reqOpLvl, sch)) { + ignoredScHosts.add(sch); + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring ServiceComponentHost" + + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + service.getName() + + ", componentName=" + sc.getName() + + ", hostname=" + sch.getHostName()); + } + continue; + } + + if (sc.isClientComponent() && + !newState.isValidClientComponentState()) { + continue; + } + /** + * This is hack for now wherein we don't fail if the + * sch is in INSTALL_FAILED + */ + if (! isValidStateTransition(requestStages, oldSchState, newState, sch)) { + String error = "Invalid transition for" + + " servicecomponenthost" + + ", clusterName=" + cluster.getClusterName() + + ", clusterId=" + cluster.getClusterId() + + ", serviceName=" + sch.getServiceName() + + ", componentName=" + sch.getServiceComponentName() + + ", hostname=" + sch.getHostName() + + ", currentState=" + oldSchState + + ", newDesiredState=" + newState; + StackId sid = cluster.getDesiredStackVersion(); + + if ( ambariMetaInfo.getComponentCategory( + sid.getStackName(), sid.getStackVersion(), sc.getServiceName(), + sch.getServiceComponentName()).isMaster()) { + throw new AmbariException(error); + } else { + LOG.warn("Ignoring: " + error); + continue; + } + } + if (!changedScHosts.containsKey(sc.getName())) { + changedScHosts.put(sc.getName(), + new EnumMap<State, List<ServiceComponentHost>>(State.class)); + } + if (!changedScHosts.get(sc.getName()).containsKey(newState)) { + changedScHosts.get(sc.getName()).put(newState, + new ArrayList<ServiceComponentHost>()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Handling update to ServiceComponentHost" + + ", clusterName=" + cluster.getClusterName() + + ", serviceName=" + service.getName() + + ", componentName=" + sc.getName() + + ", hostname=" + sch.getHostName() + + ", currentState=" + oldSchState + + ", newDesiredState=" + newState); + } + changedScHosts.get(sc.getName()).get(newState).add(sch); + } 
+ } + } + // Delete services based on the given set of requests protected RequestStatusResponse deleteServices(Set<ServiceRequest> request) throws AmbariException { http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java b/ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java index ea0d7ee..3427f9a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metadata/RoleCommandOrder.java @@ -27,6 +27,8 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.stageplanner.RoleGraphNode; import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.StackInfo; import org.codehaus.jackson.map.ObjectMapper; @@ -207,7 +209,7 @@ public class RoleCommandOrder { String stackName = currentStackVersion.getStackName(); String stackVersion = currentStackVersion.getStackVersion(); File rcoFile = getRCOFile(stackName, stackVersion); - Map<String,Object> userData = null; + Map<String,Object> userData; try { @@ -228,21 +230,26 @@ public class RoleCommandOrder { return; } - Map<String,Object> generalSection = (Map<String, Object>) userData.get(GENERAL_DEPS_KEY); + Map<String,Object> generalSection = + (Map<String, Object>) userData.get(GENERAL_DEPS_KEY); addDependencies(generalSection); if (hasGLUSTERFS) { - Map<String,Object> glusterfsSection = (Map<String, Object>) userData.get(GLUSTERFS_DEPS_KEY); + Map<String,Object> glusterfsSection 
= + (Map<String, Object>) userData.get(GLUSTERFS_DEPS_KEY); addDependencies(glusterfsSection); } else { - Map<String,Object> noGlusterFSSection = (Map<String, Object>) userData.get(NO_GLUSTERFS_DEPS_KEY); + Map<String,Object> noGlusterFSSection = + (Map<String, Object>) userData.get(NO_GLUSTERFS_DEPS_KEY); addDependencies(noGlusterFSSection); } if (isNameNodeHAEnabled) { - Map<String,Object> NAMENODEHASection = (Map<String, Object>) userData.get(NAMENODE_HA_DEPS_KEY); + Map<String,Object> NAMENODEHASection = + (Map<String, Object>) userData.get(NAMENODE_HA_DEPS_KEY); addDependencies(NAMENODEHASection); } if (isResourceManagerHAEnabled) { - Map<String,Object> ResourceManagerHASection = (Map<String, Object>) userData.get(RESOURCEMANAGER_HA_DEPS_KEY); + Map<String,Object> ResourceManagerHASection = + (Map<String, Object>) userData.get(RESOURCEMANAGER_HA_DEPS_KEY); addDependencies(ResourceManagerHASection); } extendTransitiveDependency(); @@ -273,19 +280,52 @@ public class RoleCommandOrder { } /** + * Returns transitive dependencies as a services list + * @param service to check if it depends on other services + * @return transitive services + */ + public Set<Service> getTransitiveServices(Service service, RoleCommand cmd) + throws AmbariException { + + Set<Service> transitiveServices = new HashSet<Service>(); + Cluster cluster = service.getCluster(); + + Set<RoleCommandPair> allDeps = new HashSet<RoleCommandPair>(); + for (ServiceComponent sc : service.getServiceComponents().values()) { + RoleCommandPair rcp = new RoleCommandPair(Role.valueOf(sc.getName()), cmd); + Set<RoleCommandPair> deps = this.dependencies.get(rcp); + if (deps != null) { + allDeps.addAll(deps); + } + } + + for (Service s : cluster.getServices().values()) { + for (RoleCommandPair rcp : allDeps) { + ServiceComponent sc = s.getServiceComponents().get(rcp.getRole().toString()); + if (sc != null) { + transitiveServices.add(s); + break; + } + } + } + + return transitiveServices; + } + + /** * Adds
transitive dependencies to each node. * A => B and B => C implies A => B,C and B => C */ private void extendTransitiveDependency() { - for (RoleCommandPair rcp : this.dependencies.keySet()) { + for (Map.Entry<RoleCommandPair, Set<RoleCommandPair>> roleCommandPairSetEntry : this.dependencies.entrySet()) { HashSet<RoleCommandPair> visited = new HashSet<RoleCommandPair>(); HashSet<RoleCommandPair> transitiveDependencies = new HashSet<RoleCommandPair>(); - for (RoleCommandPair directlyBlockedOn : this.dependencies.get(rcp)) { + for (RoleCommandPair directlyBlockedOn : this.dependencies.get(roleCommandPairSetEntry.getKey())) { visited.add(directlyBlockedOn); identifyTransitiveDependencies(directlyBlockedOn, visited, transitiveDependencies); } if (transitiveDependencies.size() > 0) { - this.dependencies.get(rcp).addAll(transitiveDependencies); + this.dependencies.get(roleCommandPairSetEntry.getKey()).addAll(transitiveDependencies); } } } @@ -336,7 +376,8 @@ public class RoleCommandOrder { } public int compareDeps(RoleCommandOrder rco) { - Set<RoleCommandPair> v1 = null, v2 = null; + Set<RoleCommandPair> v1; + Set<RoleCommandPair> v2; if (this == rco) { return 0; } @@ -350,12 +391,13 @@ public class RoleCommandOrder { // So far so good. 
Since the keysets match, let's check the // actual entries against each other - for (RoleCommandPair key: this.dependencies.keySet()) { - v1 = this.dependencies.get(key); - v2 = rco.dependencies.get(key); + for (Map.Entry<RoleCommandPair, Set<RoleCommandPair>> roleCommandPairSetEntry : this.dependencies.entrySet()) { + v1 = this.dependencies.get(roleCommandPairSetEntry.getKey()); + v2 = rco.dependencies.get(roleCommandPairSetEntry.getKey()); if (!v1.equals(v2)) { - LOG.debug("different entry found for key (" + key.role.toString() + ", " - + key.cmd.toString() + ")" ); + LOG.debug("different entry found for key (" + + roleCommandPairSetEntry.getKey().role.toString() + ", " + + roleCommandPairSetEntry.getKey().cmd.toString() + ")" ); return 1; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/main/resources/properties.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/properties.json b/ambari-server/src/main/resources/properties.json index 825be7d..aeb03f2 100644 --- a/ambari-server/src/main/resources/properties.json +++ b/ambari-server/src/main/resources/properties.json @@ -20,6 +20,7 @@ "Services/attributes", "params/run_smoke_test", "params/reconfigure_client", + "params/start_dependencies", "_" ], "Host":[ http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java index e51cea3..39b43f7 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java +++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java @@ -416,9 +416,18 @@ public class AmbariManagementControllerTest { // manually change live state to stopped as no running action manager List<HostRoleCommand> commands = actionDB.getRequestTasks(resp.getRequestId()); for (HostRoleCommand cmd : commands) { - if (!cmd.getRole().toString().endsWith("CHECK")) { - clusters.getCluster(clusterName).getService(serviceName).getServiceComponent(cmd.getRole().name()) - .getServiceComponentHost(cmd.getHostName()).setState(State.STARTED); + String scName = cmd.getRole().toString(); + if (!scName.endsWith("CHECK")) { + Cluster cluster = clusters.getCluster(clusterName); + String hostname = cmd.getHostName(); + for (Service s : cluster.getServices().values()) { + if (s.getServiceComponents().containsKey(scName) && + !s.getServiceComponent(scName).isClientComponent()) { + s.getServiceComponent(scName).getServiceComponentHost(hostname). + setState(State.STARTED); + break; + } + } } } return resp.getRequestId(); @@ -5309,11 +5318,9 @@ public class AmbariManagementControllerTest { clusters.getHost(host2).setState(HostState.HEARTBEAT_LOST); - // Start - requestId2 = startService(clusterName, serviceName1, true, true); + // Start MAPREDUCE, HDFS is started as a dependency requestId3 = startService(clusterName, serviceName2, true, true); - stages = actionDB.getAllStages(requestId2); - stages.addAll(actionDB.getAllStages(requestId3)); + stages = actionDB.getAllStages(requestId3); HostRoleCommand clientWithHostDown = null; for (Stage stage : stages) { for (HostRoleCommand hrc : stage.getOrderedHostRoleCommands()) { @@ -5323,6 +5330,16 @@ public class AmbariManagementControllerTest { } } Assert.assertNull(clientWithHostDown); + + Assert.assertEquals(State.STARTED, clusters.getCluster(clusterName). + getService("MAPREDUCE").getServiceComponent("TASKTRACKER"). 
+ getServiceComponentHost(host1).getState()); + Assert.assertEquals(State.STARTED, clusters.getCluster(clusterName). + getService("HDFS").getServiceComponent("NAMENODE"). + getServiceComponentHost(host1).getState()); + Assert.assertEquals(State.STARTED, clusters.getCluster(clusterName). + getService("HDFS").getServiceComponent("DATANODE"). + getServiceComponentHost(host1).getState()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java index b46736d..f37dea8 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java @@ -23,6 +23,7 @@ import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.isNull; import static org.easymock.EasyMock.replay; @@ -40,6 +41,7 @@ import java.util.Map; import java.util.Set; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.RoleCommand; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.MaintenanceStateHelper; @@ -55,6 +57,7 @@ import org.apache.ambari.server.controller.spi.ResourceProvider; import org.apache.ambari.server.controller.spi.SystemException; 
import org.apache.ambari.server.controller.utilities.PredicateBuilder; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.metadata.RoleCommandOrder; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; @@ -262,6 +265,7 @@ public class ServiceResourceProviderTest { AmbariMetaInfo ambariMetaInfo = createNiceMock(AmbariMetaInfo.class); RequestStageContainer requestStages = createNiceMock(RequestStageContainer.class); RequestStatusResponse requestStatusResponse = createNiceMock(RequestStatusResponse.class); + RoleCommandOrder rco = createNiceMock(RoleCommandOrder.class); Map<String, String> mapRequestProps = new HashMap<String, String>(); mapRequestProps.put("context", "Called from a test"); @@ -294,8 +298,14 @@ public class ServiceResourceProviderTest { expect(requestStages.getRequestStatusResponse()).andReturn(requestStatusResponse); expect(maintenanceStateHelper.isOperationAllowed(anyObject(Resource.Type.class), anyObject(Service.class))).andReturn(true).anyTimes(); + expect(service0.getCluster()).andReturn(cluster).anyTimes(); + expect(managementController.getRoleCommandOrder(cluster)).andReturn(rco). + anyTimes(); + expect(rco.getTransitiveServices(eq(service0), eq(RoleCommand.START))). 
+ andReturn(Collections.<Service>emptySet()).anyTimes(); + // replay - replay(managementController, clusters, cluster, maintenanceStateHelper, + replay(managementController, clusters, cluster, rco, maintenanceStateHelper, service0, serviceFactory, ambariMetaInfo, requestStages, requestStatusResponse); ServiceResourceProvider provider = getServiceProvider(managementController, maintenanceStateHelper); @@ -335,6 +345,7 @@ public class ServiceResourceProviderTest { RequestStatusResponse response1 = createNiceMock(RequestStatusResponse.class); RequestStatusResponse response2 = createNiceMock(RequestStatusResponse .class); + RoleCommandOrder rco = createNiceMock(RoleCommandOrder.class); Map<String, String> mapRequestProps = new HashMap<String, String>(); mapRequestProps.put("context", "Called from a test"); @@ -388,9 +399,17 @@ public class ServiceResourceProviderTest { expect(maintenanceStateHelper.isOperationAllowed(anyObject(Resource.Type.class), anyObject(Service.class))).andReturn(true).anyTimes(); + expect(service0.getCluster()).andReturn(cluster).anyTimes(); + expect(managementController1.getRoleCommandOrder(cluster)).andReturn(rco). + anyTimes(); + expect(managementController2.getRoleCommandOrder(cluster)).andReturn(rco). + anyTimes(); + expect(rco.getTransitiveServices(eq(service0), eq(RoleCommand.START))). 
+ andReturn(Collections.<Service>emptySet()).anyTimes(); + // replay replay(managementController1, response1, managementController2, requestStages1, requestStages2, response2, - clusters, cluster, service0, serviceResponse0, ambariMetaInfo, maintenanceStateHelper); + clusters, cluster, service0, serviceResponse0, ambariMetaInfo, rco, maintenanceStateHelper); ServiceResourceProvider provider1 = getServiceProvider(managementController1, maintenanceStateHelper); @@ -1310,7 +1329,7 @@ public class ServiceResourceProviderTest { provider = getServiceProvider(controller); } - RequestStageContainer request = provider.updateServices(null, requests, requestProperties, runSmokeTest, reconfigureClients); + RequestStageContainer request = provider.updateServices(null, requests, requestProperties, runSmokeTest, reconfigureClients, true); request.persist(); return request.getRequestStatusResponse(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/32d0c90b/ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java index db3fb91..8095048 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/metadata/RoleCommandOrderTest.java @@ -29,11 +29,13 @@ import static org.easymock.EasyMock.verify; import java.io.IOException; import java.io.InputStream; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import junit.framework.Assert; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.RoleCommand; @@ -50,6 +52,7 @@ import 
org.codehaus.jackson.annotate.JsonAutoDetect; import org.codehaus.jackson.annotate.JsonMethod; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; +import org.easymock.IExpectationSetters; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -135,7 +138,7 @@ public class RoleCommandOrderTest { Service hdfsService = createMock(Service.class); expect(cluster.getService("HDFS")).andReturn(hdfsService).atLeastOnce(); - expect(cluster.getService("YARN")).andReturn(null); + expect(cluster.getService("YARN")).andReturn(null).atLeastOnce(); expect(hdfsService.getServiceComponent("JOURNALNODE")).andReturn(null); expect(cluster.getCurrentStackVersion()).andReturn(new StackId("HDP", "2.0.6")); @@ -316,7 +319,57 @@ public class RoleCommandOrderTest { verify(cluster); verify(hdfsService); } - + + @Test + public void testTransitiveServices() throws AmbariException { + RoleCommandOrder rco = injector.getInstance(RoleCommandOrder.class); + ClusterImpl cluster = createMock(ClusterImpl.class); + + Service hdfsService = createMock(Service.class); + + ServiceComponent namenode = createMock(ServiceComponent.class); + expect(namenode.getName()).andReturn("NAMENODE").anyTimes(); + + Map<String,ServiceComponent> hdfsComponents = Collections.singletonMap("NAMENODE", namenode); + expect(hdfsService.getServiceComponents()).andReturn(hdfsComponents).anyTimes(); + + Service nagiosService = createMock(Service.class); + expect(cluster.getService("NAGIOS")).andReturn(nagiosService).atLeastOnce(); + expect(nagiosService.getCluster()).andReturn(cluster).anyTimes(); + + ServiceComponent nagiosServer = createMock(ServiceComponent.class); + expect(nagiosServer.getName()).andReturn("NAGIOS_SERVER").anyTimes(); + + Map<String,ServiceComponent> nagiosComponents = Collections.singletonMap("NAGIOS_SERVER", nagiosServer); + expect(nagiosService.getServiceComponents()).andReturn(nagiosComponents).anyTimes(); + + Map<String, Service> 
installedServices = new HashMap<String, Service>(); + installedServices.put("HDFS", hdfsService); + installedServices.put("NAGIOS", nagiosService); + expect(cluster.getServices()).andReturn(installedServices).atLeastOnce(); + + + expect(cluster.getService("HDFS")).andReturn(hdfsService).atLeastOnce(); + expect(cluster.getService("GLUSTERFS")).andReturn(null); + expect(cluster.getService("YARN")).andReturn(null); + expect(hdfsService.getServiceComponent("JOURNALNODE")).andReturn(null); + expect(cluster.getCurrentStackVersion()).andReturn(new StackId("HDP", "2.0.5")); + + //replay + replay(cluster, hdfsService, nagiosService, nagiosServer, namenode); + + rco.initialize(cluster); + + Set<Service> transitiveServices = + rco.getTransitiveServices(cluster.getService("NAGIOS"), RoleCommand.START); + + //HDFS should be started before NAGIOS start + Assert.assertNotNull(transitiveServices); + Assert.assertFalse(transitiveServices.isEmpty()); + Assert.assertEquals(1, transitiveServices.size()); + Assert.assertTrue(transitiveServices.contains(hdfsService)); + } + private boolean dependenciesContainBlockedRole(Map<RoleCommandPair, Set<RoleCommandPair>> deps, Role blocked) { for (RoleCommandPair blockedPair : deps.keySet()) {