Repository: ambari Updated Branches: refs/heads/trunk f9cefd231 -> 9c570b859
AMBARI-11134. Set service states when provisioning cluster via blueprint Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9c570b85 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9c570b85 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9c570b85 Branch: refs/heads/trunk Commit: 9c570b859470e68427c399a870af426a3062ada9 Parents: f9cefd2 Author: John Speidel <jspei...@hortonworks.com> Authored: Thu May 14 10:22:10 2015 -0400 Committer: John Speidel <jspei...@hortonworks.com> Committed: Thu May 14 11:00:12 2015 -0400 ---------------------------------------------------------------------- .../ambari/server/topology/AmbariContext.java | 26 ++- .../ambari/server/topology/TopologyManager.java | 21 +- .../server/topology/AmbariContextTest.java | 229 +++++++++++++++++++ .../server/topology/TopologyManagerTest.java | 5 - 4 files changed, 264 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9c570b85/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java index e6c43ef..0de4b00 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AmbariContext.java @@ -51,6 +51,8 @@ import org.apache.ambari.server.controller.internal.HostResourceProvider; import org.apache.ambari.server.controller.internal.RequestImpl; import org.apache.ambari.server.controller.internal.ServiceResourceProvider; import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.controller.predicate.EqualsPredicate; +import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; import org.apache.ambari.server.state.Cluster; @@ -81,7 +83,7 @@ public class AmbariContext { private static HostResourceProvider hostResourceProvider; private static ServiceResourceProvider serviceResourceProvider; private static ComponentResourceProvider componentResourceProvider; - private HostComponentResourceProvider hostComponentResourceProvider; + private static HostComponentResourceProvider hostComponentResourceProvider; private final static Logger LOG = LoggerFactory.getLogger(TopologyManager.class); @@ -158,7 +160,6 @@ public class AmbariContext { Cluster cluster = getController().getClusters().getCluster(clusterName); services.removeAll(cluster.getServices().keySet()); } catch (AmbariException e) { - e.printStackTrace(); throw new RuntimeException("Failed to persist service and component resources: " + e, e); } Set<ServiceRequest> serviceRequests = new HashSet<ServiceRequest>(); @@ -173,9 +174,28 @@ public class AmbariContext { getServiceResourceProvider().createServices(serviceRequests); getComponentResourceProvider().createComponents(componentRequests); } catch (AmbariException e) { - e.printStackTrace(); throw new RuntimeException("Failed to persist service and component resources: " + e, e); } + // set all services state to INSTALLED->STARTED + // this is required so the user can start failed services at the service level + Map<String, Object> installProps = new HashMap<String, Object>(); + installProps.put(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, "INSTALLED"); + installProps.put(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); + Map<String, Object> startProps = new HashMap<String, Object>(); + startProps.put(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, "STARTED"); + startProps.put(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); + Predicate predicate = new EqualsPredicate<String>( + ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, clusterName); + try { + getServiceResourceProvider().updateResources( + new RequestImpl(null, Collections.singleton(installProps), null, null), predicate); + + getServiceResourceProvider().updateResources( + new RequestImpl(null, Collections.singleton(startProps), null, null), predicate); + } catch (Exception e) { + // just log as this won't prevent cluster from being provisioned correctly + LOG.error("Unable to update state of services during cluster provision: " + e, e); + } } public void createAmbariHostResources(String clusterName, String hostName, Map<String, Collection<String>> components) { http://git-wip-us.apache.org/repos/asf/ambari/blob/9c570b85/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index e6c2f1e..a75cccb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -62,7 +62,6 @@ public class TopologyManager { private final Collection<LogicalRequest> outstandingRequests = new ArrayList<LogicalRequest>(); //todo: currently only support a single cluster private Map<String, ClusterTopology> clusterTopologyMap = new HashMap<String, ClusterTopology>(); - //private final Map<TopologyTask.Type, Set<TopologyTask>> pendingTasks = new HashMap<TopologyTask.Type, Set<TopologyTask>>(); //todo: inject private static LogicalRequestFactory logicalRequestFactory = new LogicalRequestFactory(); @@ -93,13 +92,16 @@ public class TopologyManager { ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request); // persist request after it has successfully validated PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request); + + // get the id prior to creating ambari resources which increments the counter + Long provisionId = ambariContext.getNextRequestId(); ambariContext.createAmbariResources(topology); String clusterName = topology.getClusterName(); clusterTopologyMap.put(clusterName, topology); addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true)); - LogicalRequest logicalRequest = processRequest(persistedRequest, topology); + LogicalRequest logicalRequest = processRequest(persistedRequest, topology, provisionId); //todo: this should be invoked as part of a generic lifecycle event which could possibly //todo: be tied to cluster state @@ -121,7 +123,8 @@ public class TopologyManager { PersistedTopologyRequest persistedRequest = persistedState.persistTopologyRequest(request); // this registers/updates all request host groups topology.update(request); - return getRequestStatus(processRequest(persistedRequest, topology).getRequestId()); + return getRequestStatus(processRequest(persistedRequest, topology, + ambariContext.getNextRequestId()).getRequestId()); } public void onHostRegistered(HostImpl host, boolean associatedWithCluster) { @@ -272,11 +275,11 @@ public class TopologyManager { return hostComponentMap; } - private LogicalRequest processRequest(PersistedTopologyRequest persistedRequest, ClusterTopology topology) + private LogicalRequest processRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId) throws AmbariException { - finalizeTopology(persistedRequest.getRequest(), topology); - LogicalRequest logicalRequest = createLogicalRequest(persistedRequest, topology); + finalizeTopology(request.getRequest(), topology); + LogicalRequest logicalRequest = createLogicalRequest(request, topology, requestId); boolean requestHostComplete = false; //todo: overall synchronization. Currently we have nested synchronization here @@ -323,13 +326,13 @@ public class TopologyManager { return logicalRequest; } - private LogicalRequest createLogicalRequest(PersistedTopologyRequest persistedRequest, ClusterTopology topology) + private LogicalRequest createLogicalRequest(PersistedTopologyRequest request, ClusterTopology topology, Long requestId) throws AmbariException { LogicalRequest logicalRequest = logicalRequestFactory.createRequest( - ambariContext.getNextRequestId(), persistedRequest.getRequest(), topology); + requestId, request.getRequest(), topology); - persistedState.persistLogicalRequest(logicalRequest, persistedRequest.getId()); + persistedState.persistLogicalRequest(logicalRequest, request.getId()); allRequests.put(logicalRequest.getRequestId(), logicalRequest); synchronized (reservedHosts) { http://git-wip-us.apache.org/repos/asf/ambari/blob/9c570b85/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java new file mode 100644 index 0000000..7dcb7be --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AmbariContextTest.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.topology; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.ClusterRequest; +import org.apache.ambari.server.controller.ServiceComponentRequest; +import org.apache.ambari.server.controller.ServiceRequest; +import org.apache.ambari.server.controller.internal.ComponentResourceProvider; +import org.apache.ambari.server.controller.internal.HostComponentResourceProvider; +import org.apache.ambari.server.controller.internal.HostResourceProvider; +import org.apache.ambari.server.controller.internal.ServiceResourceProvider; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.controller.predicate.EqualsPredicate; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.Request; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Service; +import org.easymock.Capture; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.createStrictMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * AmbariContext unit tests + */ +//todo: switch over to EasyMockSupport +public class AmbariContextTest { + + private static final String CLUSTER_NAME = "testCluster"; + private static final String STACK_NAME = "testStack"; + private static final String STACK_VERSION = "testVersion"; + + private static final AmbariContext context = new AmbariContext(); + private static final AmbariManagementController controller = createStrictMock(AmbariManagementController.class); + private static final HostResourceProvider hostResourceProvider = createStrictMock(HostResourceProvider.class); + private static final ServiceResourceProvider serviceResourceProvider = createStrictMock(ServiceResourceProvider.class); + private static final ComponentResourceProvider componentResourceProvider = createStrictMock(ComponentResourceProvider.class); + private static final HostComponentResourceProvider hostComponentResourceProvider = createStrictMock(HostComponentResourceProvider.class); + private static final ClusterTopology topology = createNiceMock(ClusterTopology.class); + private static final Blueprint blueprint = createNiceMock(Blueprint.class); + private static final Stack stack = createNiceMock(Stack.class); + private static final Clusters clusters = createStrictMock(Clusters.class); + private static final Cluster cluster = createStrictMock(Cluster.class); + + private static final Collection<String> blueprintServices = new HashSet<String>(); + private static final Map<String, Service> clusterServices = new HashMap<String, Service>(); + + @Before + public void setUp() throws Exception { + // "inject" context state + Class clazz = AmbariContext.class; + Field f = clazz.getDeclaredField("controller"); + f.setAccessible(true); + f.set(null, controller); + + f = clazz.getDeclaredField("hostResourceProvider"); + f.setAccessible(true); + f.set(null, hostResourceProvider); + + f = clazz.getDeclaredField("serviceResourceProvider"); + f.setAccessible(true); + f.set(null, serviceResourceProvider); + + f = clazz.getDeclaredField("componentResourceProvider"); + f.setAccessible(true); + f.set(null, componentResourceProvider); + + f = clazz.getDeclaredField("hostComponentResourceProvider"); + f.setAccessible(true); + f.set(null, hostComponentResourceProvider); + + blueprintServices.add("service1"); + blueprintServices.add("service2"); + + expect(topology.getClusterName()).andReturn(CLUSTER_NAME).anyTimes(); + expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); + + expect(blueprint.getStack()).andReturn(stack).anyTimes(); + expect(blueprint.getServices()).andReturn(blueprintServices).anyTimes(); + expect(blueprint.getComponents("service1")).andReturn(Arrays.asList("s1Component1", "s1Component2")); + expect(blueprint.getComponents("service2")).andReturn(Collections.singleton("s2Component1")); + + expect(stack.getName()).andReturn(STACK_NAME).anyTimes(); + expect(stack.getVersion()).andReturn(STACK_VERSION).anyTimes(); + + } + + @After + public void tearDown() throws Exception { + verify(controller, hostResourceProvider, serviceResourceProvider, componentResourceProvider, + hostComponentResourceProvider, topology, blueprint, stack, clusters, cluster); + + reset(controller, hostResourceProvider, serviceResourceProvider, componentResourceProvider, + hostComponentResourceProvider, topology, blueprint, stack, clusters, cluster); + } + + private void replayAll() { + replay(controller, hostResourceProvider, serviceResourceProvider, componentResourceProvider, + hostComponentResourceProvider, topology, blueprint, stack, clusters, cluster); + } + + @Test + public void testCreateAmbariResources() throws Exception { + // expectations + Capture<ClusterRequest> clusterRequestCapture = new Capture<ClusterRequest>(); + controller.createCluster(capture(clusterRequestCapture)); + expectLastCall().once(); + expect(controller.getClusters()).andReturn(clusters).anyTimes(); + expect(clusters.getCluster(CLUSTER_NAME)).andReturn(cluster).anyTimes(); + expect(cluster.getServices()).andReturn(clusterServices).anyTimes(); + + Capture<Set<ServiceRequest>> serviceRequestCapture = new Capture<Set<ServiceRequest>>(); + Capture<Set<ServiceComponentRequest>> serviceComponentRequestCapture = new Capture<Set<ServiceComponentRequest>>(); + + serviceResourceProvider.createServices(capture(serviceRequestCapture)); + expectLastCall().once(); + componentResourceProvider.createComponents(capture(serviceComponentRequestCapture)); + expectLastCall().once(); + + Capture<Request> serviceInstallRequestCapture = new Capture<Request>(); + Capture<Request> serviceStartRequestCapture = new Capture<Request>(); + Capture<Predicate> installPredicateCapture = new Capture<Predicate>(); + Capture<Predicate> startPredicateCapture = new Capture<Predicate>(); + + expect(serviceResourceProvider.updateResources(capture(serviceInstallRequestCapture), + capture(installPredicateCapture))).andReturn(null).once(); + expect(serviceResourceProvider.updateResources(capture(serviceStartRequestCapture), + capture(startPredicateCapture))).andReturn(null).once(); + + replayAll(); + + // test + context.createAmbariResources(topology); + + // assertions + ClusterRequest clusterRequest = clusterRequestCapture.getValue(); + assertEquals(CLUSTER_NAME, clusterRequest.getClusterName()); + assertEquals(String.format("%s-%s", STACK_NAME, STACK_VERSION), clusterRequest.getStackVersion()); + + Collection<ServiceRequest> serviceRequests = serviceRequestCapture.getValue(); + assertEquals(2, serviceRequests.size()); + Collection<String> servicesFound = new HashSet<String>(); + for (ServiceRequest serviceRequest : serviceRequests) { + servicesFound.add(serviceRequest.getServiceName()); + assertEquals(CLUSTER_NAME, serviceRequest.getClusterName()); + } + assertTrue(servicesFound.size() == 2 && + servicesFound.containsAll(Arrays.asList("service1", "service2"))); + + Collection<ServiceComponentRequest> serviceComponentRequests = serviceComponentRequestCapture.getValue(); + assertEquals(3, serviceComponentRequests.size()); + Map<String, Collection<String>> foundServiceComponents = new HashMap<String, Collection<String>>(); + for (ServiceComponentRequest componentRequest : serviceComponentRequests) { + assertEquals(CLUSTER_NAME, componentRequest.getClusterName()); + String serviceName = componentRequest.getServiceName(); + Collection<String> serviceComponents = foundServiceComponents.get(serviceName); + if (serviceComponents == null) { + serviceComponents = new HashSet<String>(); + foundServiceComponents.put(serviceName, serviceComponents); + } + serviceComponents.add(componentRequest.getComponentName()); + } + assertEquals(2, foundServiceComponents.size()); + + Collection<String> service1Components = foundServiceComponents.get("service1"); + assertEquals(2, service1Components.size()); + assertTrue(service1Components.containsAll(Arrays.asList("s1Component1", "s1Component2"))); + + Collection<String> service2Components = foundServiceComponents.get("service2"); + assertEquals(1, service2Components.size()); + assertTrue(service2Components.contains("s2Component1")); + + Request installRequest = serviceInstallRequestCapture.getValue(); + Set<Map<String, Object>> installPropertiesSet = installRequest.getProperties(); + assertEquals(1, installPropertiesSet.size()); + Map<String, Object> installProperties = installPropertiesSet.iterator().next(); + assertEquals(CLUSTER_NAME, installProperties.get(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID)); + assertEquals("INSTALLED", installProperties.get(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID)); + assertEquals(new EqualsPredicate<String>(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, CLUSTER_NAME), + installPredicateCapture.getValue()); + + Request startRequest = serviceStartRequestCapture.getValue(); + Set<Map<String, Object>> startPropertiesSet = startRequest.getProperties(); + assertEquals(1, startPropertiesSet.size()); + Map<String, Object> startProperties = startPropertiesSet.iterator().next(); + assertEquals(CLUSTER_NAME, startProperties.get(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID)); + assertEquals("STARTED", startProperties.get(ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID)); + assertEquals(new EqualsPredicate<String>(ServiceResourceProvider.SERVICE_CLUSTER_NAME_PROPERTY_ID, CLUSTER_NAME), + installPredicateCapture.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/9c570b85/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index df87aec..8d72d79 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -234,12 +234,7 @@ public class TopologyManagerTest { ambariContext.createAmbariResources(isA(ClusterTopology.class)); expectLastCall().once(); expect(ambariContext.getNextRequestId()).andReturn(1L).once(); - //todo: these are from cluster topology context expect(ambariContext.isClusterKerberosEnabled(CLUSTER_NAME)).andReturn(false).anyTimes(); - //ambariContext.createAmbariHostResources(CLUSTER_NAME, "host1", group1ServiceComponents); - //expectLastCall().once(); - //ambariContext.registerHostWithConfigGroup(eq("host1"), isA(ClusterTopologyImpl.class), eq("group1")); - //expectLastCall().once(); // cluster configuration task run() isn't executed by mock executor // so only INITIAL config