This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 8b3af69 AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (#1850) 8b3af69 is described below commit 8b3af69c1710fb27114fa31acff10c8fc96c0b52 Author: Myroslav Papirkovskyi <mpapirkovs...@apache.org> AuthorDate: Fri Oct 12 13:14:00 2018 +0300 AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (#1850) * AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (mpapirkovskyy) * AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (mpapirkovskyy) --- .../ambari/server/AmbariRuntimeException.java | 4 ++ .../apache/ambari/server/actionmanager/Stage.java | 5 ++- .../controller/AmbariManagementControllerImpl.java | 1 - .../HostComponentsUpdateListener.java | 1 - .../listeners/requests/STOMPUpdateListener.java | 3 +- .../listeners/services/ServiceUpdateListener.java | 2 +- .../listeners/upgrade/UpgradeUpdateListener.java | 2 +- .../events/publishers/STOMPUpdatePublisher.java | 52 +++++++++++++++++----- .../metrics/system/impl/MetricsServiceImpl.java | 3 +- .../svccomphost/ServiceComponentHostImpl.java | 20 ++++----- 10 files changed, 65 insertions(+), 28 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java index c6a20eb..b26f106 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java @@ -25,4 +25,8 @@ public class AmbariRuntimeException extends RuntimeException { public AmbariRuntimeException(String message, Throwable cause) { super(message, cause); } + + public AmbariRuntimeException(String message) { + super(message); + } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java 
b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java index 3740674..eab50d4f 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java @@ -224,11 +224,12 @@ public class Stage { void loadExecutionCommandWrappers() { for (Map.Entry<String, Map<String, HostRoleCommand>> hostRoleCommandEntry : hostRoleCommands.entrySet()) { String hostname = hostRoleCommandEntry.getKey(); - commandsToSend.put(hostname, new ArrayList<>()); + List<ExecutionCommandWrapper> wrappers = new ArrayList<>(); Map<String, HostRoleCommand> roleCommandMap = hostRoleCommandEntry.getValue(); for (Map.Entry<String, HostRoleCommand> roleCommandEntry : roleCommandMap.entrySet()) { - commandsToSend.get(hostname).add(roleCommandEntry.getValue().getExecutionCommandWrapper()); + wrappers.add(roleCommandEntry.getValue().getExecutionCommandWrapper()); } + commandsToSend.put(hostname, wrappers); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 681da12..cffed9e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -2590,7 +2590,6 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle } Map<String, String> hostParams = new TreeMap<>(); - hostParams.putAll(getRcaParameters()); if (roleCommand.equals(RoleCommand.INSTALL)) { List<ServiceOsSpecific.Package> packages = diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java index feda69d..4ecb00c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java @@ -49,7 +49,6 @@ public class HostComponentsUpdateListener { public HostComponentsUpdateListener(AmbariEventPublisher ambariEventPublisher, STOMPUpdatePublisher STOMPUpdatePublisher) { ambariEventPublisher.register(this); - STOMPUpdatePublisher.register(this); this.STOMPUpdatePublisher = STOMPUpdatePublisher; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java index fde7854..8492156 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java @@ -41,7 +41,8 @@ public class STOMPUpdateListener { public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type> typesToProcess) { STOMPUpdatePublisher STOMPUpdatePublisher = injector.getInstance(STOMPUpdatePublisher.class); - STOMPUpdatePublisher.register(this); + STOMPUpdatePublisher.registerAgent(this); + STOMPUpdatePublisher.registerAPI(this); this.typesToProcess = typesToProcess == null ? 
Collections.emptySet() : typesToProcess; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java index 0cdb34f..c35cc08 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java @@ -57,7 +57,7 @@ public class ServiceUpdateListener { @Inject public ServiceUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, AmbariEventPublisher ambariEventPublisher) { - STOMPUpdatePublisher.register(this); + STOMPUpdatePublisher.registerAPI(this); ambariEventPublisher.register(this); this.STOMPUpdatePublisher = STOMPUpdatePublisher; diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java index a39ed8a..d6dd89b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java @@ -48,7 +48,7 @@ public class UpgradeUpdateListener { @Inject public UpgradeUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, AmbariEventPublisher ambariEventPublisher) { - STOMPUpdatePublisher.register(this); + STOMPUpdatePublisher.registerAPI(this); this.STOMPUpdatePublisher = STOMPUpdatePublisher; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java index 99a03d6..073531b 100644 --- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java @@ -17,8 +17,11 @@ */ package org.apache.ambari.server.events.publishers; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.ambari.server.AmbariRuntimeException; +import org.apache.ambari.server.events.DefaultMessageEmitter; import org.apache.ambari.server.events.HostComponentsUpdateEvent; import org.apache.ambari.server.events.RequestUpdateEvent; import org.apache.ambari.server.events.STOMPEvent; @@ -26,13 +29,15 @@ import org.apache.ambari.server.events.ServiceUpdateEvent; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.google.inject.Singleton; @Singleton public class STOMPUpdatePublisher { - private final EventBus m_eventBus; + private final EventBus agentEventBus; + private final EventBus apiEventBus; @Inject private RequestUpdateEventPublisher requestUpdateEventPublisher; @@ -43,24 +48,51 @@ public class STOMPUpdatePublisher { @Inject private ServiceUpdateEventPublisher serviceUpdateEventPublisher; - public STOMPUpdatePublisher() { - m_eventBus = new AsyncEventBus("ambari-update-bus", - Executors.newSingleThreadExecutor()); + private final ExecutorService threadPoolExecutorAgent = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build()); + private final ExecutorService threadPoolExecutorAPI = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("stomp-api-bus-%d").build()); + + public STOMPUpdatePublisher() throws NoSuchFieldException, IllegalAccessException { + agentEventBus = new AsyncEventBus("agent-update-bus", + threadPoolExecutorAgent); + + apiEventBus = new 
AsyncEventBus("api-update-bus", + threadPoolExecutorAPI); } public void publish(STOMPEvent event) { + if (DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) { + publishAgent(event); + } else if (DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES.contains(event.getType())) { + publishAPI(event); + } else { + // TODO need better solution + throw new AmbariRuntimeException("Event with type {" + event.getType() + "} can not be published."); + } + } + + private void publishAPI(STOMPEvent event) { if (event.getType().equals(STOMPEvent.Type.REQUEST)) { - requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus); + requestUpdateEventPublisher.publish((RequestUpdateEvent) event, apiEventBus); } else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) { - hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus); + hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, apiEventBus); } else if (event.getType().equals(STOMPEvent.Type.SERVICE)) { - serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, m_eventBus); + serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, apiEventBus); } else { - m_eventBus.post(event); + apiEventBus.post(event); } } - public void register(Object object) { - m_eventBus.register(object); + private void publishAgent(STOMPEvent event) { + agentEventBus.post(event); + } + + public void registerAgent(Object object) { + agentEventBus.register(object); + } + + public void registerAPI(Object object) { + apiEventBus.register(object); } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java index 37a7082..03f67ad 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java +++ 
b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java @@ -116,7 +116,8 @@ public class MetricsServiceImpl implements MetricsService { src.init(MetricsConfiguration.getSubsetConfiguration(configuration, "source." + sourceName + "."), sink); sources.put(sourceName, src); if (src instanceof StompEventsMetricsSource) { - STOMPUpdatePublisher.register(src); + STOMPUpdatePublisher.registerAPI(src); + STOMPUpdatePublisher.registerAgent(src); } src.start(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java index 01e214d..441d557 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java @@ -1252,6 +1252,16 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { LOG.error("Could not determine stale config", e); } + try { + Cluster cluster = clusters.getCluster(clusterName); + ServiceComponent serviceComponent = cluster.getService(serviceName).getServiceComponent(serviceComponentName); + ServiceComponentHost sch = serviceComponent.getServiceComponentHost(hostName); + String refreshConfigsCommand = helper.getRefreshConfigsCommand(cluster,sch); + r.setReloadConfig(refreshConfigsCommand != null); + } catch (Exception e) { + LOG.error("Could not determine reload config flag", e); + } + return r; } @@ -1279,16 +1289,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost { r.setStaleConfig(false); } - try { - Cluster cluster = clusters.getCluster(clusterName); - ServiceComponent serviceComponent = cluster.getService(serviceName).getServiceComponent(serviceComponentName); - ServiceComponentHost sch = serviceComponent.getServiceComponentHost(hostName); - String 
refreshConfigsCommand = helper.getRefreshConfigsCommand(cluster,sch); - r.setReloadConfig(refreshConfigsCommand != null); - } catch (Exception e) { - LOG.error("Could not determine reload config flag", e); - } - return r; }