Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 8c44cd93c -> 2f6d7fbc9


AMBARI-17053. Add explicit ambari-server log line indicating cluster creation 
complete. (Daniel Gergely via stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2f6d7fbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2f6d7fbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2f6d7fbc

Branch: refs/heads/branch-2.4
Commit: 2f6d7fbc9637e9714ed30ae63be3f0e5b92583b7
Parents: 8c44cd9
Author: Daniel Gergely <dgerg...@hortonworks.com>
Authored: Fri Jun 17 06:58:33 2016 +0200
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Fri Jun 17 06:59:12 2016 +0200

----------------------------------------------------------------------
 .../actionmanager/ActionDBAccessorImpl.java     |   5 +
 .../ambari/server/events/AmbariEvent.java       |   5 +
 .../server/events/RequestFinishedEvent.java     |  44 ++++++
 .../ambari/server/topology/PersistedState.java  |   7 +
 .../server/topology/PersistedStateImpl.java     |  22 +++
 .../ambari/server/topology/TopologyManager.java | 120 ++++++++++++++-
 .../server/topology/TopologyManagerTest.java    | 150 ++++++++++++++++++-
 7 files changed, 344 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 8e6fb3f..0fa5aa9 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -46,6 +46,7 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.events.HostRemovedEvent;
 import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.RequestFinishedEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
@@ -128,6 +129,9 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
   Configuration configuration;
 
   @Inject
+  AmbariEventPublisher ambariEventPublisher;
+
+  @Inject
   AuditLogger auditLogger;
 
   /**
@@ -434,6 +438,7 @@ public class ActionDBAccessorImpl implements 
ActionDBAccessor {
     if (requestEntity != null && requestEntity.getEndTime() == -1L) {
       requestEntity.setEndTime(System.currentTimeMillis());
       requestDAO.merge(requestEntity);
+      ambariEventPublisher.publish(new 
RequestFinishedEvent(requestEntity.getClusterId(), requestId));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
index 1079806..912c441 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
@@ -103,6 +103,11 @@ public abstract class AmbariEvent {
     ACTION_EXECUTION_FINISHED,
 
     /**
+     * Sent when request finishes
+     */
+    REQUEST_FINISHED,
+
+    /**
      * The cluster was renamed.
      */
     CLUSTER_RENAME,

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java
new file mode 100644
index 0000000..603711a
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestFinishedEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events;
+
+/**
+ * The {@link RequestFinishedEvent} is fired when a host is added to a cluster.
+ */
+public class RequestFinishedEvent extends ClusterEvent {
+
+  /**
+   * ID of the request
+   */
+  private long requestId;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterId
+   *          the ID of the cluster.
+   */
+  public RequestFinishedEvent(long clusterId, long requestId) {
+    super(AmbariEventType.REQUEST_FINISHED, clusterId);
+    this.requestId = requestId;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
index 77419d8..41fa032 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java
@@ -70,4 +70,11 @@ public interface PersistedState {
   Map<ClusterTopology, List<LogicalRequest>> getAllRequests();
 
   void registerInTopologyHostInfo(Host host);
+
+  /**
+   * Returns provision request for a cluster
+   * @param clusterId
+   * @return
+   */
+  LogicalRequest getProvisionRequest(long clusterId);
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
index 324a397..ca81418 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java
@@ -143,6 +143,28 @@ public class PersistedStateImpl implements PersistedState {
   }
 
   @Override
+  public LogicalRequest getProvisionRequest(long clusterId) {
+    Collection<TopologyRequestEntity> entities = 
topologyRequestDAO.findByClusterId(clusterId);
+    for (TopologyRequestEntity entity : entities) {
+      if(TopologyRequest.Type.PROVISION == 
TopologyRequest.Type.valueOf(entity.getAction())) {
+        TopologyLogicalRequestEntity logicalRequestEntity = 
entity.getTopologyLogicalRequestEntity();
+        TopologyRequest replayedRequest = new ReplayedTopologyRequest(entity, 
blueprintFactory);
+        try {
+          ClusterTopology clusterTopology = new 
ClusterTopologyImpl(ambariContext, replayedRequest);
+          Long logicalId = logicalRequestEntity.getId();
+          return logicalRequestFactory.createRequest(logicalId, 
replayedRequest, clusterTopology, logicalRequestEntity);
+        } catch (InvalidTopologyException e) {
+          throw new RuntimeException("Failed to construct cluster topology 
while replaying request: " + e, e);
+        } catch (AmbariException e) {
+          throw new RuntimeException("Failed to construct logical request 
during replay: " + e, e);
+        }
+      }
+    }
+
+    return null;
+  }
+
+  @Override
   public Map<ClusterTopology, List<LogicalRequest>> getAllRequests() {
     //todo: we only currently support a single request per ambari instance so 
there should only
     //todo: be a single cluster topology

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 4a1e29e..0190478 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -34,11 +34,14 @@ import java.util.concurrent.Executors;
 
 import javax.inject.Inject;
 
+import com.google.common.eventbus.Subscribe;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import 
org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.controller.internal.ArtifactResourceProvider;
 import org.apache.ambari.server.controller.internal.CalculatedStatus;
 import org.apache.ambari.server.controller.internal.CredentialResourceProvider;
@@ -53,6 +56,9 @@ import 
org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.events.AmbariEvent;
+import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.state.Host;
@@ -111,7 +117,22 @@ public class TopologyManager {
 
   private final static Logger LOG = 
LoggerFactory.getLogger(TopologyManager.class);
 
-  public TopologyManager() {
+  /**
+   * Stores request that belongs to blueprint creation
+   */
+  private Map<Long, LogicalRequest> 
clusterProvisionWithBlueprintCreateRequests = new HashMap<>();
+  /**
+   * Flag to show whether blueprint is already finished or not. It is used for 
shortcuts.
+   */
+  private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = 
new HashMap<>();
+
+  public TopologyManager(){
+
+  }
+
+  @Inject
+  public void setEventPublisher(AmbariEventPublisher ambariEventPublisher) {
+    ambariEventPublisher.register(this);
   }
 
   @Inject
@@ -133,6 +154,62 @@ public class TopologyManager {
     }
   }
 
+  /**
+   * Called when heartbeat processing finishes
+   * @param event
+   */
+  @Subscribe
+  public void onRequestFinished(RequestFinishedEvent event) {
+    if(event.getType() != AmbariEvent.AmbariEventType.REQUEST_FINISHED
+            || clusterProvisionWithBlueprintCreateRequests.isEmpty()
+            || 
Boolean.TRUE.equals(clusterProvisionWithBlueprintCreationFinished.get(event.getClusterId())))
 {
+      return;
+    }
+
+    if(isClusterProvisionWithBlueprintFinished(event.getClusterId())) {
+      clusterProvisionWithBlueprintCreationFinished.put(event.getClusterId(), 
Boolean.TRUE);
+      LogicalRequest provisionRequest = 
clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId());
+      if(isLogicalRequestSuccessful(provisionRequest)) {
+        LOG.info("Cluster creation request id={} using Blueprint {} 
successfully completed for cluster id={}",
+                
clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(),
+                
clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(),
+                event.getClusterId());
+      } else {
+        LOG.info("Cluster creation request id={} using Blueprint {} failed for 
cluster id={}",
+                
clusterProvisionWithBlueprintCreateRequests.get(event.getClusterId()).getRequestId(),
+                
clusterTopologyMap.get(event.getClusterId()).getBlueprint().getName(),
+                event.getClusterId());
+      }
+    }
+  }
+
+  /**
+   * Returns if provision request for a cluster is tracked
+   * @param clusterId
+   * @return
+   */
+  public boolean isClusterProvisionWithBlueprintTracked(long clusterId) {
+    return clusterProvisionWithBlueprintCreateRequests.containsKey(clusterId);
+  }
+
+  /**
+   * Returns if the provision request for a cluster is finished.
+   * Note that this method returns false if the request is not tracked.
+   * See {@link TopologyManager#isClusterProvisionWithBlueprintTracked(long)}
+   * @param clusterId
+   * @return
+   */
+  public boolean isClusterProvisionWithBlueprintFinished(long clusterId) {
+    if(!isClusterProvisionWithBlueprintTracked(clusterId)) {
+      return false; // no blueprint request is running
+    }
+    // shortcut
+    if(clusterProvisionWithBlueprintCreationFinished.containsKey(clusterId) && 
clusterProvisionWithBlueprintCreationFinished.get(clusterId)) {
+      return true;
+    }
+    return 
isLogicalRequestFinished(clusterProvisionWithBlueprintCreateRequests.get(clusterId));
+  }
+
   public RequestStatusResponse provisionCluster(final ProvisionClusterRequest 
request) throws InvalidTopologyException, AmbariException {
     ensureInitialized();
     ClusterTopology topology = new ClusterTopologyImpl(ambariContext, request);
@@ -194,6 +271,7 @@ public class TopologyManager {
     //todo: be tied to cluster state
 
     ambariContext.persistInstallStateForUI(clusterName, stack.getName(), 
stack.getVersion());
+    clusterProvisionWithBlueprintCreateRequests.put(clusterId, logicalRequest);
     return getRequestStatus(logicalRequest.getRequestId());
   }
 
@@ -680,6 +758,13 @@ public class TopologyManager {
     for (Map.Entry<ClusterTopology, List<LogicalRequest>> requestEntry : 
persistedRequests.entrySet()) {
       ClusterTopology topology = requestEntry.getKey();
       clusterTopologyMap.put(topology.getClusterId(), topology);
+      // update provision request cache
+      LogicalRequest provisionRequest = 
persistedState.getProvisionRequest(topology.getClusterId());
+      if(provisionRequest != null) {
+        
clusterProvisionWithBlueprintCreateRequests.put(topology.getClusterId(), 
provisionRequest);
+        
clusterProvisionWithBlueprintCreationFinished.put(topology.getClusterId(),
+                
isLogicalRequestFinished(clusterProvisionWithBlueprintCreateRequests.get(topology.getClusterId())));
+      }
 
       for (LogicalRequest logicalRequest : requestEntry.getValue()) {
         allRequests.put(logicalRequest.getRequestId(), logicalRequest);
@@ -716,6 +801,39 @@ public class TopologyManager {
     }
   }
 
+  /**
+   * @param logicalRequest
+   * @return true if all the tasks in the logical request are in completed 
state, false otherwise
+   */
+  private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) {
+    if(logicalRequest != null) {
+      boolean completed = true;
+      for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) {
+        if(!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) {
+          completed = false;
+        }
+      }
+      return completed;
+    }
+    return false;
+  }
+
+  /**
+   * Returns if all the tasks in the logical request have completed state.
+   * @param logicalRequest
+   * @return
+   */
+  private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) {
+    if(logicalRequest != null) {
+      for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) {
+        if(HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) 
{
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   //todo: this should invoke a callback on each 'service' in the topology
   private void finalizeTopology(TopologyRequest request, ClusterTopology 
topology) {
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/2f6d7fbc/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
index 017eab5..b45c585 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java
@@ -19,10 +19,12 @@
 package org.apache.ambari.server.topology;
 
 import junit.framework.Assert;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.ClusterRequest;
 import org.apache.ambari.server.controller.ConfigurationRequest;
 import org.apache.ambari.server.controller.RequestStatusResponse;
+import org.apache.ambari.server.controller.ShortTaskStatus;
 import org.apache.ambari.server.controller.internal.HostResourceProvider;
 import org.apache.ambari.server.controller.internal.ProvisionClusterRequest;
 import org.apache.ambari.server.controller.internal.ScaleClusterRequest;
@@ -30,6 +32,7 @@ import org.apache.ambari.server.controller.internal.Stack;
 import org.apache.ambari.server.controller.spi.ClusterController;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
+import org.apache.ambari.server.events.RequestFinishedEvent;
 import org.apache.ambari.server.security.encryption.CredentialStoreService;
 import org.apache.ambari.server.stack.NoSuchStackException;
 import org.apache.ambari.server.state.SecurityType;
@@ -87,6 +90,9 @@ public class TopologyManagerTest {
   @TestSubject
   private TopologyManager topologyManager = new TopologyManager();
 
+  @TestSubject
+  private TopologyManager topologyManagerReplay = new TopologyManager();
+
   @Mock(type = MockType.NICE)
   private Blueprint blueprint;
 
@@ -112,7 +118,7 @@ public class TopologyManagerTest {
   private RequestStatusResponse requestStatusResponse;
   @Mock(type = MockType.STRICT)
   private ExecutorService executor;
-  @Mock(type = MockType.STRICT)
+  @Mock(type = MockType.NICE)
   private PersistedState persistedState;
   @Mock(type = MockType.NICE)
   private HostGroup group1;
@@ -283,6 +289,7 @@ public class TopologyManagerTest {
     expect(logicalRequestFactory.createRequest(eq(1L), (TopologyRequest) 
anyObject(), capture(clusterTopologyCapture))).
         andReturn(logicalRequest).anyTimes();
     expect(logicalRequest.getRequestId()).andReturn(1L).anyTimes();
+    expect(logicalRequest.getClusterId()).andReturn(CLUSTER_ID).anyTimes();
     
expect(logicalRequest.getReservedHosts()).andReturn(Collections.singleton("host1")).anyTimes();
     
expect(logicalRequest.getRequestStatus()).andReturn(requestStatusResponse).anyTimes();
 
@@ -314,16 +321,10 @@ public class TopologyManagerTest {
 
     expectLastCall().anyTimes();
 
-    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
-        List<LogicalRequest>>emptyMap()).anyTimes();
     
expect(persistedState.persistTopologyRequest(request)).andReturn(persistedTopologyRequest).anyTimes();
     persistedState.persistLogicalRequest(logicalRequest, 1);
     expectLastCall().anyTimes();
 
-    replay(blueprint, stack, request, group1, group2, ambariContext, 
logicalRequestFactory, logicalRequest,
-        configurationRequest, configurationRequest2, configurationRequest3, 
requestStatusResponse, executor,
-        persistedState, securityConfigurationFactory, credentialStoreService, 
clusterController, resourceProvider,
-        mockFuture);
 
     Class clazz = TopologyManager.class;
 
@@ -333,6 +334,12 @@ public class TopologyManagerTest {
 
     EasyMockSupport.injectMocks(topologyManager);
 
+    Field f2 = clazz.getDeclaredField("executor");
+    f2.setAccessible(true);
+    f2.set(topologyManagerReplay, executor);
+
+    EasyMockSupport.injectMocks(topologyManagerReplay);
+
   }
 
   @After
@@ -348,10 +355,134 @@ public class TopologyManagerTest {
 
   @Test
   public void testProvisionCluster() throws Exception {
+    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
+            List<LogicalRequest>>emptyMap()).anyTimes();
+    replayAll();
+
     topologyManager.provisionCluster(request);
     //todo: assertions
   }
 
+  @Test
+  public void testBlueprintRequestCompletion() throws Exception {
+    List<ShortTaskStatus> tasks = new ArrayList<>();
+    ShortTaskStatus t1 = new ShortTaskStatus();
+    t1.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t1);
+    ShortTaskStatus t2 = new ShortTaskStatus();
+    t2.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t2);
+    ShortTaskStatus t3 = new ShortTaskStatus();
+    t3.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t3);
+
+    expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
+    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
+            List<LogicalRequest>>emptyMap()).anyTimes();
+    
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+    replayAll();
+    topologyManager.provisionCluster(request);
+    requestFinished();
+    
Assert.assertTrue(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID));
+  }
+
+  @Test
+  public void testBlueprintRequestCompletion__Failure() throws Exception {
+    List<ShortTaskStatus> tasks = new ArrayList<>();
+    ShortTaskStatus t1 = new ShortTaskStatus();
+    t1.setStatus(HostRoleStatus.FAILED.toString());
+    tasks.add(t1);
+    ShortTaskStatus t2 = new ShortTaskStatus();
+    t2.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t2);
+    ShortTaskStatus t3 = new ShortTaskStatus();
+    t3.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t3);
+
+    expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
+    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
+            List<LogicalRequest>>emptyMap()).anyTimes();
+    
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+    replayAll();
+    topologyManager.provisionCluster(request);
+    requestFinished();
+    
Assert.assertTrue(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID));
+  }
+
+  @Test
+  public void testBlueprintRequestCompletion__InProgress() throws Exception {
+    List<ShortTaskStatus> tasks = new ArrayList<>();
+    ShortTaskStatus t1 = new ShortTaskStatus();
+    t1.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    tasks.add(t1);
+    ShortTaskStatus t2 = new ShortTaskStatus();
+    t2.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t2);
+    ShortTaskStatus t3 = new ShortTaskStatus();
+    t3.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t3);
+
+    expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
+    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
+            List<LogicalRequest>>emptyMap()).anyTimes();
+    
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+    replayAll();
+    topologyManager.provisionCluster(request);
+    requestFinished();
+    
Assert.assertFalse(topologyManager.isClusterProvisionWithBlueprintFinished(CLUSTER_ID));
+  }
+
+  @Test
+  public void testBlueprintRequestCompletion__NoRequest() throws Exception {
+    TopologyManager tm = new TopologyManager();
+    tm.onRequestFinished(new RequestFinishedEvent(CLUSTER_ID, 1));
+    Assert.assertFalse(tm.isClusterProvisionWithBlueprintTracked(CLUSTER_ID));
+    replayAll();
+  }
+
+  @Test
+  public void testBlueprintRequestCompletion__Replay() throws Exception {
+    List<ShortTaskStatus> tasks = new ArrayList<>();
+    ShortTaskStatus t1 = new ShortTaskStatus();
+    t1.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t1);
+    ShortTaskStatus t2 = new ShortTaskStatus();
+    t2.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t2);
+    ShortTaskStatus t3 = new ShortTaskStatus();
+    t3.setStatus(HostRoleStatus.COMPLETED.toString());
+    tasks.add(t3);
+
+    Map<ClusterTopology,List<LogicalRequest>> allRequests = new HashMap<>();
+    List<LogicalRequest> logicalRequests = new ArrayList<>();
+    logicalRequests.add(logicalRequest);
+    ClusterTopology clusterTopologyMock = 
EasyMock.createNiceMock(ClusterTopology.class);
+    
expect(clusterTopologyMock.getClusterId()).andReturn(CLUSTER_ID).anyTimes();
+
+    
expect(ambariContext.isTopologyResolved(EasyMock.anyLong())).andReturn(true).anyTimes();
+
+    allRequests.put(clusterTopologyMock, logicalRequests);
+    expect(persistedState.getAllRequests()).andReturn(allRequests).anyTimes();
+    
expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes();
+    expect(logicalRequest.hasCompleted()).andReturn(true).anyTimes();
+    expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes();
+    replayAll();
+    EasyMock.replay(clusterTopologyMock);
+    topologyManagerReplay.getRequest(1L); // calling ensureInitialized 
indirectly
+    
Assert.assertTrue(topologyManagerReplay.isClusterProvisionWithBlueprintFinished(CLUSTER_ID));
+  }
+
+  private void requestFinished() {
+    topologyManager.onRequestFinished(new RequestFinishedEvent(CLUSTER_ID, 1));
+  }
+
+  private void replayAll() {
+    replay(blueprint, stack, request, group1, group2, ambariContext, 
logicalRequestFactory,
+            configurationRequest, configurationRequest2, 
configurationRequest3, executor,
+            persistedState, securityConfigurationFactory, 
credentialStoreService, clusterController, resourceProvider,
+            mockFuture, requestStatusResponse, logicalRequest);
+  }
+
   @Test(expected = InvalidTopologyException.class)
   public void testScaleHosts__alreadyExistingHost() throws 
InvalidTopologyTemplateException, InvalidTopologyException, AmbariException, 
NoSuchStackException {
     HashSet<Map<String, Object>> propertySet = new HashSet<>();
@@ -364,7 +495,10 @@ public class TopologyManagerTest {
     BlueprintFactory bpfMock = EasyMock.createNiceMock(BlueprintFactory.class);
     
EasyMock.expect(bpfMock.getBlueprint(BLUEPRINT_NAME)).andReturn(blueprint).anyTimes();
     ScaleClusterRequest.init(bpfMock);
-    EasyMock.replay(bpfMock);
+    replay(bpfMock);
+    
expect(persistedState.getAllRequests()).andReturn(Collections.<ClusterTopology,
+      List<LogicalRequest>>emptyMap()).anyTimes();
+    replayAll();
     topologyManager.provisionCluster(request);
     topologyManager.scaleHosts(new ScaleClusterRequest(propertySet));
     Assert.fail("InvalidTopologyException should have been thrown");

Reply via email to