This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 9719e54  AMBARI-24291. Start All Services on 100-nodes cluster timed 
out after 1 hour. (#2449)
9719e54 is described below

commit 9719e54fa944f48f04fa401c9c8a9669bf94b620
Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org>
AuthorDate: Fri Oct 19 15:04:10 2018 +0300

    AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 
hour. (#2449)
    
    * AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 
hour. (mpapirkovskyy)
    
    * AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 
hour. (mpapirkovskyy)
---
 .../ambari/server/AmbariRuntimeException.java      |  4 ++
 .../apache/ambari/server/actionmanager/Stage.java  |  5 ++-
 .../controller/AmbariManagementControllerImpl.java |  1 -
 .../HostComponentsUpdateListener.java              |  1 -
 .../listeners/requests/STOMPUpdateListener.java    |  3 +-
 .../listeners/services/ServiceUpdateListener.java  |  2 +-
 .../listeners/upgrade/UpgradeUpdateListener.java   |  2 +-
 .../events/publishers/STOMPUpdatePublisher.java    | 52 +++++++++++++++++-----
 .../metrics/system/impl/MetricsServiceImpl.java    |  3 +-
 .../svccomphost/ServiceComponentHostImpl.java      | 20 ++++-----
 10 files changed, 65 insertions(+), 28 deletions(-)

diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
index c6a20eb..b26f106 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java
@@ -25,4 +25,8 @@ public class AmbariRuntimeException extends RuntimeException {
   public AmbariRuntimeException(String message, Throwable cause) {
     super(message, cause);
   }
+
+  public AmbariRuntimeException(String message) {
+    super(message);
+  }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
index b88275a..420c04b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
@@ -224,11 +224,12 @@ public class Stage {
   void loadExecutionCommandWrappers() {
     for (Map.Entry<String, Map<String, HostRoleCommand>> hostRoleCommandEntry 
: hostRoleCommands.entrySet()) {
       String hostname = hostRoleCommandEntry.getKey();
-      commandsToSend.put(hostname, new ArrayList<>());
+      List<ExecutionCommandWrapper> wrappers = new ArrayList<>();
       Map<String, HostRoleCommand> roleCommandMap = 
hostRoleCommandEntry.getValue();
       for (Map.Entry<String, HostRoleCommand> roleCommandEntry : 
roleCommandMap.entrySet()) {
-        
commandsToSend.get(hostname).add(roleCommandEntry.getValue().getExecutionCommandWrapper());
+        wrappers.add(roleCommandEntry.getValue().getExecutionCommandWrapper());
       }
+      commandsToSend.put(hostname, wrappers);
     }
   }
 
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 4927c34..a98891f 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -2554,7 +2554,6 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
     }
 
     Map<String, String> hostParams = new TreeMap<>();
-    hostParams.putAll(getRcaParameters());
 
     if (roleCommand.equals(RoleCommand.INSTALL)) {
       List<ServiceOsSpecific.Package> packages =
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
index feda69d..4ecb00c 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java
@@ -49,7 +49,6 @@ public class HostComponentsUpdateListener {
   public HostComponentsUpdateListener(AmbariEventPublisher 
ambariEventPublisher,
                                       STOMPUpdatePublisher 
STOMPUpdatePublisher) {
     ambariEventPublisher.register(this);
-    STOMPUpdatePublisher.register(this);
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
   }
 
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
index fde7854..8492156 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java
@@ -41,7 +41,8 @@ public class STOMPUpdateListener {
   public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type> 
typesToProcess) {
     STOMPUpdatePublisher STOMPUpdatePublisher =
       injector.getInstance(STOMPUpdatePublisher.class);
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAgent(this);
+    STOMPUpdatePublisher.registerAPI(this);
     this.typesToProcess = typesToProcess == null ? Collections.emptySet() : 
typesToProcess;
   }
 
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
index 0cdb34f..c35cc08 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java
@@ -57,7 +57,7 @@ public class ServiceUpdateListener {
 
   @Inject
   public ServiceUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, 
AmbariEventPublisher ambariEventPublisher) {
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAPI(this);
     ambariEventPublisher.register(this);
 
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
index a39ed8a..d6dd89b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
@@ -48,7 +48,7 @@ public class UpgradeUpdateListener {
 
   @Inject
   public UpgradeUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, 
AmbariEventPublisher ambariEventPublisher) {
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAPI(this);
 
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
   }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
index 99a03d6..073531b 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java
@@ -17,8 +17,11 @@
  */
 package org.apache.ambari.server.events.publishers;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.ambari.server.AmbariRuntimeException;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
 import org.apache.ambari.server.events.RequestUpdateEvent;
 import org.apache.ambari.server.events.STOMPEvent;
@@ -26,13 +29,15 @@ import org.apache.ambari.server.events.ServiceUpdateEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
 public class STOMPUpdatePublisher {
 
-  private final EventBus m_eventBus;
+  private final EventBus agentEventBus;
+  private final EventBus apiEventBus;
 
   @Inject
   private RequestUpdateEventPublisher requestUpdateEventPublisher;
@@ -43,24 +48,51 @@ public class STOMPUpdatePublisher {
   @Inject
   private ServiceUpdateEventPublisher serviceUpdateEventPublisher;
 
-  public STOMPUpdatePublisher() {
-    m_eventBus = new AsyncEventBus("ambari-update-bus",
-      Executors.newSingleThreadExecutor());
+  private final ExecutorService threadPoolExecutorAgent = 
Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
+  private final ExecutorService threadPoolExecutorAPI = 
Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("stomp-api-bus-%d").build());
+
+  public STOMPUpdatePublisher() throws NoSuchFieldException, 
IllegalAccessException {
+    agentEventBus = new AsyncEventBus("agent-update-bus",
+        threadPoolExecutorAgent);
+
+    apiEventBus = new AsyncEventBus("api-update-bus",
+        threadPoolExecutorAPI);
   }
 
   public void publish(STOMPEvent event) {
+    if 
(DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
+      publishAgent(event);
+    } else if 
(DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES.contains(event.getType())) {
+      publishAPI(event);
+    } else {
+      // TODO need better solution
+      throw new AmbariRuntimeException("Event with type {" + event.getType() + 
"} can not be published.");
+    }
+  }
+
+  private void publishAPI(STOMPEvent event) {
     if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
-      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, 
m_eventBus);
+      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, 
apiEventBus);
     } else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
-      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) 
event, m_eventBus);
+      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) 
event, apiEventBus);
     } else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
-      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, 
m_eventBus);
+      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, 
apiEventBus);
     } else {
-      m_eventBus.post(event);
+      apiEventBus.post(event);
     }
   }
 
-  public void register(Object object) {
-    m_eventBus.register(object);
+  private void publishAgent(STOMPEvent event) {
+    agentEventBus.post(event);
+  }
+
+  public void registerAgent(Object object) {
+    agentEventBus.register(object);
+  }
+
+  public void registerAPI(Object object) {
+    apiEventBus.register(object);
   }
 }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
index 37a7082..03f67ad 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java
@@ -116,7 +116,8 @@ public class MetricsServiceImpl implements MetricsService {
         src.init(MetricsConfiguration.getSubsetConfiguration(configuration, 
"source." + sourceName + "."), sink);
         sources.put(sourceName, src);
         if (src instanceof StompEventsMetricsSource) {
-          STOMPUpdatePublisher.register(src);
+          STOMPUpdatePublisher.registerAPI(src);
+          STOMPUpdatePublisher.registerAgent(src);
         }
         src.start();
       }
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index 8006e53..7801f0d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -1280,6 +1280,16 @@ public class ServiceComponentHostImpl implements 
ServiceComponentHost {
       LOG.error("Could not determine stale config", e);
     }
 
+    try {
+      Cluster cluster = clusters.getCluster(clusterName);
+      ServiceComponent serviceComponent = 
cluster.getService(serviceName).getServiceComponent(serviceComponentName);
+      ServiceComponentHost sch = 
serviceComponent.getServiceComponentHost(hostName);
+      String refreshConfigsCommand = 
helper.getRefreshConfigsCommand(cluster,sch);
+      r.setReloadConfig(refreshConfigsCommand != null);
+    } catch (Exception e) {
+      LOG.error("Could not determine reload config flag", e);
+    }
+
     return r;
   }
 
@@ -1307,16 +1317,6 @@ public class ServiceComponentHostImpl implements 
ServiceComponentHost {
       r.setStaleConfig(false);
     }
 
-    try {
-      Cluster cluster = clusters.getCluster(clusterName);
-      ServiceComponent serviceComponent = 
cluster.getService(serviceName).getServiceComponent(serviceComponentName);
-      ServiceComponentHost sch = 
serviceComponent.getServiceComponentHost(hostName);
-      String refreshConfigsCommand = 
helper.getRefreshConfigsCommand(cluster,sch);
-      r.setReloadConfig(refreshConfigsCommand != null);
-    } catch (Exception e) {
-      LOG.error("Could not determine reload config flag", e);
-    }
-
     return r;
   }
 

Reply via email to