Repository: ambari Updated Branches: refs/heads/branch-2.4 8c44cd93c -> 2f6d7fbc9
AMBARI-17053. Add explicit ambari-server log line indicating cluster creation complete. (Daniel Gergely via stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2f6d7fbc Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2f6d7fbc Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2f6d7fbc Branch: refs/heads/branch-2.4 Commit: 2f6d7fbc9637e9714ed30ae63be3f0e5b92583b7 Parents: 8c44cd9 Author: Daniel Gergely <dgerg...@hortonworks.com> Authored: Fri Jun 17 06:58:33 2016 +0200 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Fri Jun 17 06:59:12 2016 +0200 ---------------------------------------------------------------------- .../actionmanager/ActionDBAccessorImpl.java | 5 + .../ambari/server/events/AmbariEvent.java | 5 + .../server/events/RequestFinishedEvent.java | 44 ++++++ .../ambari/server/topology/PersistedState.java | 7 + .../server/topology/PersistedStateImpl.java | 22 +++ .../ambari/server/topology/TopologyManager.java | 120 ++++++++++++++- .../server/topology/TopologyManagerTest.java | 150 ++++++++++++++++++- 7 files changed, 344 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java index 8e6fb3f..0fa5aa9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java @@ -46,6 +46,7 @@ import org.apache.ambari.server.configuration.Configuration; import 
org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.events.HostRemovedEvent; import org.apache.ambari.server.events.ServiceComponentUninstalledEvent; +import org.apache.ambari.server.events.RequestFinishedEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.ClusterDAO; import org.apache.ambari.server.orm.dao.ExecutionCommandDAO; @@ -128,6 +129,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { Configuration configuration; @Inject + AmbariEventPublisher ambariEventPublisher; + + @Inject AuditLogger auditLogger; /** @@ -434,6 +438,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor { if (requestEntity != null && requestEntity.getEndTime() == -1L) { requestEntity.setEndTime(System.currentTimeMillis()); requestDAO.merge(requestEntity); + ambariEventPublisher.publish(new RequestFinishedEvent(requestEntity.getClusterId(), requestId)); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java index 1079806..912c441 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java @@ -103,6 +103,11 @@ public abstract class AmbariEvent { ACTION_EXECUTION_FINISHED, /** + * Sent when request finishes + */ + REQUEST_FINISHED, + + /** * The cluster was renamed. 
*/ CLUSTER_RENAME, http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java new file mode 100644 index 0000000..603711a --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.events; + +/** + * The {@link RequestFinishedEvent} is fired when a request finishes. + */ +public class RequestFinishedEvent extends ClusterEvent { + + /** + * ID of the request + */ + private long requestId; + + /** + * Constructor. + * + * @param clusterId + * the ID of the cluster. 
+ */ + public RequestFinishedEvent(long clusterId, long requestId) { + super(AmbariEventType.REQUEST_FINISHED, clusterId); + this.requestId = requestId; + } + + public long getRequestId() { + return requestId; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java index 77419d8..41fa032 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java @@ -70,4 +70,11 @@ public interface PersistedState { Map<ClusterTopology, List<LogicalRequest>> getAllRequests(); void registerInTopologyHostInfo(Host host); + + /** + * Returns provision request for a cluster + * @param clusterId + * @return + */ + LogicalRequest getProvisionRequest(long clusterId); } http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java index 324a397..ca81418 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -143,6 +143,28 @@ public class PersistedStateImpl implements PersistedState { } @Override + public LogicalRequest getProvisionRequest(long clusterId) { + Collection<TopologyRequestEntity> entities = topologyRequestDAO.findByClusterId(clusterId); + for 
(TopologyRequestEntity entity : entities) { + if(TopologyRequest.Type.PROVISION == TopologyRequest.Type.valueOf(entity.getAction())) { + TopologyLogicalRequestEntity logicalRequestEntity = entity.getTopologyLogicalRequestEntity(); + TopologyRequest replayedRequest = new ReplayedTopologyRequest(entity, blueprintFactory); + try { + ClusterTopology clusterTopology = new ClusterTopologyImpl(ambariContext, replayedRequest); + Long logicalId = logicalRequestEntity.getId(); + return logicalRequestFactory.createRequest(logicalId, replayedRequest, clusterTopology, logicalRequestEntity); + } catch (InvalidTopologyException e) { + throw new RuntimeException("Failed to construct cluster topology while replaying request: " + e, e); + } catch (AmbariException e) { + throw new RuntimeException("Failed to construct logical request during replay: " + e, e); + } + } + } + + return null; + } + + @Override public Map<ClusterTopology, List<LogicalRequest>> getAllRequests() { //todo: we only currently support a single request per ambari instance so there should only //todo: be a single cluster topology http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 4a1e29e..0190478 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -34,11 +34,14 @@ import java.util.concurrent.Executors; import javax.inject.Inject; +import com.google.common.eventbus.Subscribe; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; +import 
org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.ArtifactResourceProvider; import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.controller.internal.CredentialResourceProvider; @@ -53,6 +56,9 @@ import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; import org.apache.ambari.server.controller.spi.ResourceProvider; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; +import org.apache.ambari.server.events.AmbariEvent; +import org.apache.ambari.server.events.RequestFinishedEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.state.Host; @@ -111,7 +117,22 @@ public class TopologyManager { private final static Logger LOG = LoggerFactory.getLogger(TopologyManager.class); - public TopologyManager() { + /** + * Stores request that belongs to blueprint creation + */ + private Map<Long, LogicalRequest> clusterProvisionWithBlueprintCreateRequests = new HashMap<>(); + /** + * Flag to show whether blueprint is already finished or not. It is used for shortcuts. 
+ */ + private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = new HashMap<>(); + + public TopologyManager(){ + + } + + @Inject + public void setEventPublisher(AmbariEventPublisher ambariEventPublisher) { + ambariEventPublisher.register(this); } @Inject @@ -133,6 +154,62 @@ public class TopologyManager { } } + /** + * Called when heartbeat processing finishes + * @param event + */ + @Subscribe + public void onRequestFinished(RequestFinishedEvent event) { + if(event.getType() != AmbariEvent.AmbariEventType.REQUEST_FINISHED + || clusterProvisionWithBlueprintCreateRequests.isEmpty() + || Boolean.TRUE.equals(clusterProvisionWithBlueprintCreationFinished.get(event.getClusterId()))) { + return; + } + + if(isClusterProvisionWithBlueprintFinished(event.getClusterId())) { + clusterProvisionWithBlueprintCreationFinished.put(event.getClusterId(), Boolean.TRUE); + LogicalRequest provisionRequest = clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()); + if(isLogicalRequestSuccessful(provisionRequest)) { + LOG.info("Cluster creation request id={} using Blueprint {} successfully completed for cluster id={}", + clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(), + clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(), + event.getClusterId()); + } else { + LOG.info("Cluster creation request id={} using Blueprint {} failed for cluster id={}", + clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(), + clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(), + event.getClusterId()); + } + } + } + + /** + * Returns if provision request for a cluster is tracked + * @param clusterId + * @return + */ + public boolean isClusterProvisionWithBlueprintTracked(long clusterId) { + return clusterProvisionWithBlueprintCreateRequests.containsKey(clusterId); + } + + /** + * Returns if the provision request for a cluster is finished. 
+ * Note that this method returns false if the request is not tracked. + * See {@link TopologyManager#isClusterProvisionWithBlueprintTracked(long)} + * @param clusterId + * @return + */ + public boolean isClusterProvisionWithBlueprintFinished(long clusterId) { + if(!isClusterProvisionWithBlueprintTracked(clusterId)) { + return false; // no blueprint request is running + } + // shortcut + if(clusterProvisionWithBlueprintCreationFinished.containsKey(clusterId) && clusterProvisionWithBlueprintCreationFinished.get(clusterId)) { + return true; + } + return isLogicalRequestFinished(clusterProvisionWithBlueprintCreateRequests.get(clusterId)); + } + public RequestStatusResponse provisionCluster(final ProvisionClusterRequest request) throws InvalidTopologyException, AmbariException { ensureInitialized(); ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request); @@ -194,6 +271,7 @@ public class TopologyManager { //todo: be tied to cluster state ambariContext.persistInstallStateForUI(clusterName, stack.getName(), stack.getVersion()); + clusterProvisionWithBlueprintCreateRequests.put(clusterId, logicalRequest); return getRequestStatus(logicalRequest.getRequestId()); } @@ -680,6 +758,13 @@ public class TopologyManager { for (Map.Entry<ClusterTopology, List<LogicalRequest>> requestEntry : persistedRequests.entrySet()) { ClusterTopology topology = requestEntry.getKey(); clusterTopologyMap.put(topology.getClusterId(), topology); + // update provision request cache + LogicalRequest provisionRequest = persistedState.getProvisionRequest(topology.getClusterId()); + if(provisionRequest != null) { + clusterProvisionWithBlueprintCreateRequests.put(topology.getClusterId(), provisionRequest); + clusterProvisionWithBlueprintCreationFinished.put(topology.getClusterId(), + isLogicalRequestFinished(clusterProvisionWithBlueprintCreateRequests.get(topology.getClusterId()))); + } for (LogicalRequest logicalRequest : requestEntry.getValue()) { 
allRequests.put(logicalRequest.getRequestId(), logicalRequest); @@ -716,6 +801,39 @@ public class TopologyManager { } } + /** + * @param logicalRequest + * @return true if all the tasks in the logical request are in completed state, false otherwise + */ + private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) { + if(logicalRequest != null) { + boolean completed = true; + for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { + if(!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) { + completed = false; + } + } + return completed; + } + return false; + } + + /** + * Returns if all the tasks in the logical request have completed state. + * @param logicalRequest + * @return + */ + private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) { + if(logicalRequest != null) { + for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { + if(HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) { + return false; + } + } + } + return true; + } + //todo: this should invoke a callback on each 'service' in the topology private void finalizeTopology(TopologyRequest request, ClusterTopology topology) { } http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 017eab5..b45c585 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -19,10 +19,12 @@ package org.apache.ambari.server.topology; import junit.framework.Assert; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import 
org.apache.ambari.server.AmbariException; import org.apache.ambari.server.controller.ClusterRequest; import org.apache.ambari.server.controller.ConfigurationRequest; import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.HostResourceProvider; import org.apache.ambari.server.controller.internal.ProvisionClusterRequest; import org.apache.ambari.server.controller.internal.ScaleClusterRequest; @@ -30,6 +32,7 @@ import org.apache.ambari.server.controller.internal.Stack; import org.apache.ambari.server.controller.spi.ClusterController; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.ResourceProvider; +import org.apache.ambari.server.events.RequestFinishedEvent; import org.apache.ambari.server.security.encryption.CredentialStoreService; import org.apache.ambari.server.stack.NoSuchStackException; import org.apache.ambari.server.state.SecurityType; @@ -87,6 +90,9 @@ public class TopologyManagerTest { @TestSubject private TopologyManager topologyManager = new TopologyManager(); + @TestSubject + private TopologyManager topologyManagerReplay = new TopologyManager(); + @Mock(type = MockType.NICE) private Blueprint blueprint; @@ -112,7 +118,7 @@ public class TopologyManagerTest { private RequestStatusResponse requestStatusResponse; @Mock(type = MockType.STRICT) private ExecutorService executor; - @Mock(type = MockType.STRICT) + @Mock(type = MockType.NICE) private PersistedState persistedState; @Mock(type = MockType.NICE) private HostGroup group1; @@ -283,6 +289,7 @@ public class TopologyManagerTest { expect(logicalRequestFactory.createRequest(eq(1L), (TopologyRequest) anyObject(), capture(clusterTopologyCapture))). 
andReturn(logicalRequest).anyTimes(); expect(logicalRequest.getRequestId()).andReturn(1L).anyTimes(); + expect(logicalRequest.getClusterId()).andReturn(CLUSTER_ID).anyTimes(); expect(logicalRequest.getReservedHosts()).andReturn(Collections.singleton("host1")).anyTimes(); expect(logicalRequest.getRequestStatus()).andReturn(requestStatusResponse).anyTimes(); @@ -314,16 +321,10 @@ public class TopologyManagerTest { expectLastCall().anyTimes(); - expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, - List<LogicalRequest>>emptyMap()).anyTimes(); expect(persistedState.persistTopologyRequest(request)).andReturn(persistedTopologyRequest).anyTimes(); persistedState.persistLogicalRequest(logicalRequest, 1); expectLastCall().anyTimes(); - replay(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory, logicalRequest, - configurationRequest, configurationRequest2, configurationRequest3, requestStatusResponse, executor, - persistedState, securityConfigurationFactory, credentialStoreService, clusterController, resourceProvider, - mockFuture); Class clazz = TopologyManager.class; @@ -333,6 +334,12 @@ public class TopologyManagerTest { EasyMockSupport.injectMocks(topologyManager); + Field f2 = clazz.getDeclaredField("executor"); + f2.setAccessible(true); + f2.set(topologyManagerReplay, executor); + + EasyMockSupport.injectMocks(topologyManagerReplay); + } @After @@ -348,10 +355,134 @@ public class TopologyManagerTest { @Test public void testProvisionCluster() throws Exception { + expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, + List<LogicalRequest>>emptyMap()).anyTimes(); + replayAll(); + topologyManager.provisionCluster(request); //todo: assertions } + @Test + public void testBlueprintRequestCompletion() throws Exception { + List<ShortTaskStatus> tasks = new ArrayList<>(); + ShortTaskStatus t1 = new ShortTaskStatus(); + t1.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t1); + 
ShortTaskStatus t2 = new ShortTaskStatus(); + t2.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t2); + ShortTaskStatus t3 = new ShortTaskStatus(); + t3.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t3); + + expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); + expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, + List<LogicalRequest>>emptyMap()).anyTimes(); + expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + replayAll(); + topologyManager.provisionCluster(request); + requestFinished(); + Assert.assertTrue(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID)); + } + + @Test + public void testBlueprintRequestCompletion__Failure() throws Exception { + List<ShortTaskStatus> tasks = new ArrayList<>(); + ShortTaskStatus t1 = new ShortTaskStatus(); + t1.setStatus(HostRoleStatus.FAILED.toString()); + tasks.add(t1); + ShortTaskStatus t2 = new ShortTaskStatus(); + t2.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t2); + ShortTaskStatus t3 = new ShortTaskStatus(); + t3.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t3); + + expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); + expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, + List<LogicalRequest>>emptyMap()).anyTimes(); + expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + replayAll(); + topologyManager.provisionCluster(request); + requestFinished(); + Assert.assertTrue(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID)); + } + + @Test + public void testBlueprintRequestCompletion__InProgress() throws Exception { + List<ShortTaskStatus> tasks = new ArrayList<>(); + ShortTaskStatus t1 = new ShortTaskStatus(); + t1.setStatus(HostRoleStatus.IN_PROGRESS.toString()); + tasks.add(t1); + ShortTaskStatus t2 = new ShortTaskStatus(); + 
t2.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t2); + ShortTaskStatus t3 = new ShortTaskStatus(); + t3.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t3); + + expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); + expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, + List<LogicalRequest>>emptyMap()).anyTimes(); + expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + replayAll(); + topologyManager.provisionCluster(request); + requestFinished(); + Assert.assertFalse(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID)); + } + + @Test + public void testBlueprintRequestCompletion__NoRequest() throws Exception { + TopologyManager tm = new TopologyManager(); + tm.onRequestFinished(new RequestFinishedEvent(CLUSTER_ID, 1)); + Assert.assertFalse(tm.isClusterProvisionWithBlueprintTracked(CLUSTER_ID)); + replayAll(); + } + + @Test + public void testBlueprintRequestCompletion__Replay() throws Exception { + List<ShortTaskStatus> tasks = new ArrayList<>(); + ShortTaskStatus t1 = new ShortTaskStatus(); + t1.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t1); + ShortTaskStatus t2 = new ShortTaskStatus(); + t2.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t2); + ShortTaskStatus t3 = new ShortTaskStatus(); + t3.setStatus(HostRoleStatus.COMPLETED.toString()); + tasks.add(t3); + + Map<ClusterTopology,List<LogicalRequest>> allRequests = new HashMap<>(); + List<LogicalRequest> logicalRequests = new ArrayList<>(); + logicalRequests.add(logicalRequest); + ClusterTopology clusterTopologyMock = EasyMock.createNiceMock(ClusterTopology.class); + expect(clusterTopologyMock.getClusterId()).andReturn(CLUSTER_ID).anyTimes(); + + expect(ambariContext.isTopologyResolved(EasyMock.anyLong())).andReturn(true).anyTimes(); + + allRequests.put(clusterTopologyMock, logicalRequests); + 
expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes(); + expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes(); + expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); + replayAll(); + EasyMock.replay(clusterTopologyMock); + topologyManagerReplay.getRequest(1L); // calling ensureInitialized indirectly + Assert.assertTrue(topologyManagerReplay.isClusterProvisionWithBlueprintFinished(CLUSTER_ID)); + } + + private void requestFinished() { + topologyManager.onRequestFinished(new RequestFinishedEvent(CLUSTER_ID, 1)); + } + + private void replayAll() { + replay(blueprint, stack, request, group1, group2, ambariContext, logicalRequestFactory, + configurationRequest, configurationRequest2, configurationRequest3, executor, + persistedState, securityConfigurationFactory, credentialStoreService, clusterController, resourceProvider, + mockFuture, requestStatusResponse, logicalRequest); + } + @Test(expected = InvalidTopologyException.class) public void testScaleHosts__alreadyExistingHost() throws InvalidTopologyTemplateException, InvalidTopologyException, AmbariException, NoSuchStackException { HashSet<Map<String, Object>> propertySet = new HashSet<>(); @@ -364,7 +495,10 @@ public class TopologyManagerTest { BlueprintFactory bpfMock = EasyMock.createNiceMock(BlueprintFactory.class); EasyMock.expect(bpfMock.getBlueprint(BLUEPRINT_NAME)).andReturn(blueprint).anyTimes(); ScaleClusterRequest.init(bpfMock); - EasyMock.replay(bpfMock); + replay(bpfMock); + expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology, + List<LogicalRequest>>emptyMap()).anyTimes(); + replayAll(); topologyManager.provisionCluster(request); topologyManager.scaleHosts(new ScaleClusterRequest(propertySet)); Assert.fail("InvalidTopologyException should have been thrown");