Repository: ambari
Updated Branches:
  refs/heads/trunk 2ddd7a338 -> 5dea886ef


AMBARI-18569. Execute topology tasks in parallel by hosts. (Attila Doroszlai 
via stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5dea886e
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5dea886e
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5dea886e

Branch: refs/heads/trunk
Commit: 5dea886efde58fb93033f705b5dd43b5ccd72ad6
Parents: 2ddd7a3
Author: Attila Doroszlai <adorosz...@hortonworks.com>
Authored: Fri Nov 11 12:22:04 2016 +0100
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Fri Nov 11 12:22:04 2016 +0100

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     | 37 ++++++++++++++-
 .../AmbariManagementControllerImpl.java         | 20 ++++----
 .../internal/ComponentResourceProvider.java     |  4 +-
 .../internal/HostComponentResourceProvider.java |  2 +-
 .../internal/ServiceResourceProvider.java       |  4 +-
 .../ambari/server/orm/entities/StackEntity.java | 10 +++-
 .../server/topology/HostOfferResponse.java      | 46 ++++++++++++------
 .../ambari/server/topology/HostRequest.java     | 18 +++----
 .../ambari/server/topology/LogicalRequest.java  | 13 +++---
 .../ambari/server/topology/TopologyManager.java | 49 ++++++++++++++------
 .../server/configuration/ConfigurationTest.java |  6 +--
 11 files changed, 143 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index 9d2243b..15f186b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -100,7 +100,6 @@ import com.google.gson.JsonPrimitive;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
-
 /**
  * The {@link Configuration} class is used to read from the
  * {{ambari.properties}} file and manage/expose the configuration properties.
@@ -2481,6 +2480,22 @@ public class Configuration {
   public static final ConfigurationProperty<Long> LOG4JMONITOR_DELAY = new 
ConfigurationProperty<>(
           "log4j.monitor.delay", TimeUnit.MINUTES.toMillis(5));
 
+  /**
+   * Indicates whether parallel topology task creation is enabled for 
blueprint cluster provisioning.
+   * Defaults to <code>false</code>.
+   * @see #TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT
+   */
+  @Markdown(description = "Indicates whether parallel topology task creation 
is enabled")
+  public static final ConfigurationProperty<Boolean> 
TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED = new 
ConfigurationProperty<>("topology.task.creation.parallel", Boolean.FALSE);
+
+  /**
+   * The number of threads to use for parallel topology task creation in 
blueprint cluster provisioning if enabled.
+   * Defaults to 10.
+   * @see #TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED
+   */
+  @Markdown(description = "The number of threads to use for parallel topology 
task creation if enabled")
+  public static final ConfigurationProperty<Integer> 
TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT = new 
ConfigurationProperty<>("topology.task.creation.parallel.threads", 10);
+
   private static final Logger LOG = LoggerFactory.getLogger(
     Configuration.class);
 
@@ -5168,6 +5183,24 @@ public class Configuration {
   }
 
   /**
+   * @return the number of threads to use for parallel topology task creation 
if enabled
+   */
+  public int getParallelTopologyTaskCreationThreadPoolSize() {
+    try {
+      return 
Integer.parseInt(getProperty(TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT));
+    } catch (NumberFormatException e) {
+      return TOPOLOGY_TASK_PARALLEL_CREATION_THREAD_COUNT.getDefaultValue();
+    }
+  }
+
+  /**
+   * @return true if parallel execution of task creation is enabled explicitly
+   */
+  public boolean isParallelTopologyTaskCreationEnabled() {
+    return 
Boolean.parseBoolean(getProperty(TOPOLOGY_TASK_PARALLEL_CREATION_ENABLED));
+  }
+
+  /**
    * Generates a markdown table which includes:
    * <ul>
    * <li>Property key name</li>

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 09e49ef..b04fdd7 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -2164,16 +2164,16 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
       commandParams.putAll(commandParamsInp);
     }
 
-    //Propogate HCFS service type info
-    Iterator<Service> it = cluster.getServices().values().iterator();
-    while(it.hasNext()) {
-       ServiceInfo serviceInfoInstance = 
ambariMetaInfo.getService(stackId.getStackName(),stackId.getStackVersion(), 
it.next().getName());
-       LOG.info("Iterating service type Instance in createHostAction:: " + 
serviceInfoInstance.getName());
-       if(serviceInfoInstance.getServiceType() != null) {
-           LOG.info("Adding service type info in createHostAction:: " + 
serviceInfoInstance.getServiceType());
-            commandParams.put("dfs_type",serviceInfoInstance.getServiceType());
-           break;
-       }
+    // Propagate HCFS service type info
+    for (Service service : cluster.getServices().values()) {
+      ServiceInfo serviceInfoInstance = 
ambariMetaInfo.getService(stackId.getStackName(),stackId.getStackVersion(), 
service.getName());
+      LOG.debug("Iterating service type Instance in createHostAction: {}", 
serviceInfoInstance.getName());
+      String serviceType = serviceInfoInstance.getServiceType();
+      if (serviceType != null) {
+        LOG.info("Adding service type info in createHostAction: {}", 
serviceType);
+        commandParams.put("dfs_type", serviceType);
+        break;
+      }
     }
 
     boolean isInstallCommand = roleCommand.equals(RoleCommand.INSTALL);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
index 241a48f..453c688 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
@@ -266,7 +266,7 @@ public class ComponentResourceProvider extends 
AbstractControllerResourceProvide
   }
 
   // Create the components for the given requests.
-  public synchronized void createComponents(Set<ServiceComponentRequest> 
requests)
+  public void createComponents(Set<ServiceComponentRequest> requests)
       throws AmbariException, AuthorizationException {
 
     if (requests.isEmpty()) {
@@ -473,7 +473,7 @@ public class ComponentResourceProvider extends 
AbstractControllerResourceProvide
   }
 
   // Update the components for the given requests.
-  protected synchronized RequestStatusResponse 
updateComponents(Set<ServiceComponentRequest> requests,
+  protected RequestStatusResponse 
updateComponents(Set<ServiceComponentRequest> requests,
                                                                 Map<String, 
String> requestProperties,
                                                                 boolean 
runSmokeTest)
       throws AmbariException, AuthorizationException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
index 87eb266..c8ec08b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java
@@ -474,7 +474,7 @@ public class HostComponentResourceProvider extends 
AbstractControllerResourcePro
   //todo: This was moved from AmbariManagementController and needs a lot of 
refactoring.
   //todo: Look into using the predicate instead of 
Set<ServiceComponentHostRequest>
   //todo: change to private access when all AMC tests have been moved.
-  protected synchronized RequestStageContainer 
updateHostComponents(RequestStageContainer stages,
+  protected RequestStageContainer updateHostComponents(RequestStageContainer 
stages,
                                                                     
Set<ServiceComponentHostRequest> requests,
                                                                     
Map<String, String> requestProperties,
                                                                     boolean 
runSmokeTest) throws AmbariException, AuthorizationException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
index a08d153..0d5c174 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
@@ -343,7 +343,7 @@ public class ServiceResourceProvider extends 
AbstractControllerResourceProvider
   }
 
   // Create services from the given request.
-  public synchronized void createServices(Set<ServiceRequest> requests)
+  public void createServices(Set<ServiceRequest> requests)
       throws AmbariException, AuthorizationException {
 
     if (requests.isEmpty()) {
@@ -461,7 +461,7 @@ public class ServiceResourceProvider extends 
AbstractControllerResourceProvider
   }
 
   // Update services based on the given requests.
-  protected synchronized RequestStageContainer 
updateServices(RequestStageContainer requestStages, Set<ServiceRequest> 
requests,
+  protected RequestStageContainer updateServices(RequestStageContainer 
requestStages, Set<ServiceRequest> requests,
                                                       Map<String, String> 
requestProperties, boolean runSmokeTest,
                                                       boolean 
reconfigureClients, boolean startDependencies) throws AmbariException, 
AuthorizationException {
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java
index c425969..2cfe07c 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StackEntity.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@ import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
+import javax.persistence.QueryHint;
 import javax.persistence.Table;
 import javax.persistence.TableGenerator;
 import javax.persistence.UniqueConstraint;
@@ -39,7 +40,12 @@ import javax.persistence.UniqueConstraint;
 @TableGenerator(name = "stack_id_generator", table = "ambari_sequences", 
pkColumnName = "sequence_name", valueColumnName = "sequence_value", 
pkColumnValue = "stack_id_seq", initialValue = 0)
 @NamedQueries({
     @NamedQuery(name = "StackEntity.findAll", query = "SELECT stack FROM 
StackEntity stack"),
-    @NamedQuery(name = "StackEntity.findByNameAndVersion", query = "SELECT 
stack FROM StackEntity stack WHERE stack.stackName = :stackName AND 
stack.stackVersion = :stackVersion") })
+    @NamedQuery(name = "StackEntity.findByNameAndVersion", query = "SELECT 
stack FROM StackEntity stack WHERE stack.stackName = :stackName AND 
stack.stackVersion = :stackVersion",
+                hints = {
+                  @QueryHint(name = "eclipselink.query-results-cache", value = 
"true"),
+                  @QueryHint(name = "eclipselink.query-results-cache.size", 
value = "100")
+                })
+})
 public class StackEntity {
 
   @Id

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
index 2932581..495aea6 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,30 +19,37 @@
 
 package org.apache.ambari.server.topology;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
+import java.util.concurrent.Executor;
 
 /**
  * Response to a host offer.
  */
-public class HostOfferResponse {
+final class HostOfferResponse {
+
   public enum Answer {ACCEPTED, DECLINED_PREDICATE, DECLINED_DONE}
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HostOfferResponse.class);
+  static final HostOfferResponse DECLINED_DUE_TO_PREDICATE = new 
HostOfferResponse(Answer.DECLINED_PREDICATE);
+  static final HostOfferResponse DECLINED_DUE_TO_DONE = new 
HostOfferResponse(Answer.DECLINED_DONE);
+
   private final Answer answer;
   private final String hostGroupName;
   private final long hostRequestId;
   private final List<TopologyTask> tasks;
 
-  public HostOfferResponse(Answer answer) {
-    if (answer == Answer.ACCEPTED) {
-      throw new IllegalArgumentException("For accepted response, hostgroup 
name and tasks must be set");
-    }
-    this.answer = answer;
-    this.hostRequestId = -1;
-    this.hostGroupName = null;
-    this.tasks = null;
+  static HostOfferResponse createAcceptedResponse(long hostRequestId, String 
hostGroupName, List<TopologyTask> tasks) {
+    return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, 
hostGroupName, tasks);
+  }
+
+  private HostOfferResponse(Answer answer) {
+    this(answer, -1, null, null);
   }
 
-  public HostOfferResponse(Answer answer, long hostRequestId, String 
hostGroupName, List<TopologyTask> tasks) {
+  private HostOfferResponse(Answer answer, long hostRequestId, String 
hostGroupName, List<TopologyTask> tasks) {
     this.answer = answer;
     this.hostRequestId = hostRequestId;
     this.hostGroupName = hostGroupName;
@@ -63,7 +70,20 @@ public class HostOfferResponse {
     return hostGroupName;
   }
 
-  public List<TopologyTask> getTasks() {
-    return tasks;
+  void executeTasks(Executor executor, final String hostName, final 
ClusterTopology topology, final AmbariContext ambariContext) {
+    if (answer != Answer.ACCEPTED) {
+      LOG.warn("Attempted to execute tasks for declined host offer", answer);
+    } else {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          for (TopologyTask task : tasks) {
+            LOG.info("Running task for accepted host offer for hostname = {}, 
task = {}", hostName, task.getType());
+            task.init(topology, ambariContext);
+            task.run();
+          }
+        }
+      });
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
index 6a65b48..a18999b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -134,15 +134,15 @@ public class HostRequest implements 
Comparable<HostRequest> {
   //todo: synchronization
   public synchronized HostOfferResponse offer(HostImpl host) {
     if (!isOutstanding) {
-      return new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+      return HostOfferResponse.DECLINED_DUE_TO_DONE;
     }
     if (matchesHost(host)) {
       isOutstanding = false;
       hostname = host.getHostName();
       setHostOnTasks(host);
-      return new HostOfferResponse(HostOfferResponse.Answer.ACCEPTED, id, 
hostGroup.getName(), topologyTasks);
+      return HostOfferResponse.createAcceptedResponse(id, hostGroup.getName(), 
topologyTasks);
     } else {
-      return new 
HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE);
+      return HostOfferResponse.DECLINED_DUE_TO_PREDICATE;
     }
   }
 
@@ -466,6 +466,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
 
     @Override
     public void run() {
+      LOG.info("HostRequest: Executing RESOURCE_CREATION task for host: {}", 
hostname);
       HostGroup group = 
topology.getBlueprint().getHostGroup(getHostgroupName());
       Map<String, Collection<String>> serviceComponents = new HashMap<String, 
Collection<String>>();
       for (String service : group.getServices()) {
@@ -492,6 +493,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
 
     @Override
     public void run() {
+      LOG.info("HostRequest: Executing CONFIGURE task for host: {}", hostname);
       ambariContext.registerHostWithConfigGroup(getHostName(), 
clusterTopology, getHostgroupName());
     }
   }
@@ -517,7 +519,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
 
     @Override
     public void run() {
-      LOG.info("HostRequest.InstallHostTask: Executing INSTALL task for host: 
" + hostname);
+      LOG.info("HostRequest: Executing INSTALL task for host: {}", hostname);
       boolean skipInstallTaskCreate = 
topology.getProvisionAction().equals(ProvisionAction.START_ONLY);
       RequestStatusResponse response = clusterTopology.installHost(hostname, 
skipInstallTaskCreate, skipFailure);
       // map logical install tasks to physical install tasks
@@ -543,7 +545,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
         }
       }
 
-      LOG.info("HostRequest.InstallHostTask: Exiting INSTALL task for host: " 
+ hostname);
+      LOG.info("HostRequest: Exiting INSTALL task for host: {}", hostname);
     }
   }
 
@@ -568,7 +570,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
 
     @Override
     public void run() {
-      LOG.info("HostRequest.StartHostTask: Executing START task for host: " + 
hostname);
+      LOG.info("HostRequest: Executing START task for host: {}", hostname);
       RequestStatusResponse response = clusterTopology.startHost(hostname, 
skipFailure);
       // map logical install tasks to physical install tasks
       List<ShortTaskStatus> underlyingTasks = response.getTasks();
@@ -591,7 +593,7 @@ public class HostRequest implements Comparable<HostRequest> 
{
         }
       }
 
-      LOG.info("HostRequest.StartHostTask: Exiting START task for host: " + 
hostname);
+      LOG.info("HostRequest: Exiting START task for host: {}", hostname);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 3aaf589..0039e35 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -121,7 +121,7 @@ public class LogicalRequest extends Request {
       //todo: prioritization of master host requests
       Iterator<HostRequest> hostRequestIterator = 
outstandingHostRequests.iterator();
       while (hostRequestIterator.hasNext()) {
-        LOG.info("LogicalRequest.offer: attempting to match a request to a 
request for a reserved host to hostname = {}", host.getHostName());
+        LOG.info("LogicalRequest.offer: attempting to match a request to a 
request for a non-reserved host to hostname = {}", host.getHostName());
         HostOfferResponse response = hostRequestIterator.next().offer(host);
         switch (response.getAnswer()) {
           case ACCEPTED:
@@ -132,23 +132,22 @@ public class LogicalRequest extends Request {
             //todo: should have been done on ACCEPT
             hostRequestIterator.remove();
             LOG.info("LogicalRequest.offer: host request returned 
DECLINED_DONE for hostname = {}, host request has been removed from list", 
host.getHostName());
+            break;
           case DECLINED_PREDICATE:
             LOG.info("LogicalRequest.offer: host request returned 
DECLINED_PREDICATE for hostname = {}", host.getHostName());
             predicateRejected = true;
+            break;
         }
       }
 
       LOG.info("LogicalRequest.offer: outstandingHost request list size = " + 
outstandingHostRequests.size());
     }
 
-
-
-
     // if at least one outstanding host request rejected for predicate or we 
have an outstanding request
     // with a reserved host decline due to predicate, otherwise decline due to 
all hosts being resolved
     return predicateRejected || ! requestsWithReservedHosts.isEmpty() ?
-        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_PREDICATE) :
-        new HostOfferResponse(HostOfferResponse.Answer.DECLINED_DONE);
+            HostOfferResponse.DECLINED_DUE_TO_PREDICATE :
+            HostOfferResponse.DECLINED_DUE_TO_DONE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index bba0325..341633e 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -38,6 +39,7 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import 
org.apache.ambari.server.api.services.stackadvisor.StackAdvisorBlueprintProcessor;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.RequestStatusResponse;
 import org.apache.ambari.server.controller.ShortTaskStatus;
@@ -86,7 +88,9 @@ public class TopologyManager {
   private static final String 
CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = 
"cluster_configure_task_timeout";
 
   private PersistedState persistedState;
-  private ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final Executor taskExecutor; // executes TopologyTasks
+  private final boolean parallelTaskCreationEnabled;
   private Collection<String> hostsToIgnore = new HashSet<String>();
   private final List<HostImpl> availableHosts = new LinkedList<HostImpl>();
   private final Map<String, LogicalRequest> reservedHosts = new 
HashMap<String, LogicalRequest>();
@@ -127,8 +131,18 @@ public class TopologyManager {
    */
   private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = 
new HashMap<>();
 
-  public TopologyManager(){
+  public TopologyManager() {
+    parallelTaskCreationEnabled = false;
+    taskExecutor = executor;
+  }
 
+  @Inject
+  public TopologyManager(Configuration configuration) {
+    int threadPoolSize = 
configuration.getParallelTopologyTaskCreationThreadPoolSize();
+    parallelTaskCreationEnabled = 
configuration.isParallelTopologyTaskCreationEnabled() && threadPoolSize > 1;
+    taskExecutor = parallelTaskCreationEnabled
+      ? Executors.newFixedThreadPool(threadPoolSize)
+      : executor;
   }
 
   @Inject
@@ -691,7 +705,7 @@ public class TopologyManager {
     return logicalRequest;
   }
 
-  private void processAcceptedHostOffer(ClusterTopology topology, final 
HostOfferResponse response, final HostImpl host) {
+  private void processAcceptedHostOffer(final ClusterTopology topology, final 
HostOfferResponse response, final HostImpl host) {
     final String hostName = host.getHostName();
     try {
       topology.addHostToTopology(response.getHostGroupName(), hostName);
@@ -722,19 +736,24 @@ public class TopologyManager {
       throw new RuntimeException(e);
     }
 
-
-    LOG.info("TopologyManager.processAcceptedHostOffer: about to execute tasks 
for host = {}",
-        hostName);
-
-    for (TopologyTask task : response.getTasks()) {
-      LOG.info("Processing accepted host offer for {} which responded {} and 
task {}",
-          hostName, response.getAnswer(), task.getType());
-
-      task.init(topology, ambariContext);
-      executor.execute(task);
+    LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = 
{} which responded {}", hostName, response.getAnswer());
+    if (parallelTaskCreationEnabled) {
+      executor.execute(new Runnable() { // do not start until cluster config 
done
+        @Override
+        public void run() {
+          queueHostTasks(topology, response, hostName);
+        }
+      });
+    } else {
+      queueHostTasks(topology, response, hostName);
     }
   }
 
+  private void queueHostTasks(ClusterTopology topology, HostOfferResponse 
response, String hostName) {
+    LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for 
host = {}", hostName);
+    response.executeTasks(taskExecutor, hostName, topology, ambariContext);
+  }
+
   private void updateHostWithRackInfo(ClusterTopology topology, 
HostOfferResponse response, HostImpl host) {
     // the rack info from the cluster creation template
     String rackInfoFromTemplate = 
topology.getHostGroupInfo().get(response.getHostGroupName()).getHostRackInfo().get
@@ -878,7 +897,7 @@ public class TopologyManager {
     }
 
     ConfigureClusterTask configureClusterTask = new 
ConfigureClusterTask(topology, configurationRequest);
-    AsyncCallableService<Boolean> asyncCallableService = new 
AsyncCallableService(configureClusterTask, timeout, delay,
+    AsyncCallableService<Boolean> asyncCallableService = new 
AsyncCallableService<>(configureClusterTask, timeout, delay,
         Executors.newScheduledThreadPool(1));
 
     executor.submit(asyncCallableService);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5dea886e/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index f90cf76..b0bcc58 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -1039,8 +1039,6 @@ public class ConfigurationTest {
   /**
    * Tests that every {@link ConfigurationProperty} field in
    * {@link Configuration} has a property {@link Markdown} annotation.
-   *
-   * @throws Exception
    */
   @Test
   public void testAllPropertiesHaveMarkdownDescriptions() throws Exception {
@@ -1052,9 +1050,9 @@ public class ConfigurationTest {
 
       ConfigurationProperty<?> configurationProperty = 
(ConfigurationProperty<?>) field.get(null);
       Markdown markdown = field.getAnnotation(Markdown.class);
-      if( null == markdown ){
+      if (null == markdown) {
         ConfigurationMarkdown configMarkdown = 
field.getAnnotation(ConfigurationMarkdown.class);
-        markdown = configMarkdown.markdown();
+        markdown = configMarkdown != null ? configMarkdown.markdown() : null;
       }
 
       Assert.assertNotNull("The configuration property " + 
configurationProperty.getKey()

Reply via email to