AMBARI-17458 : Add support for distributed collector to Ambari REST API. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0855174b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0855174b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0855174b Branch: refs/heads/branch-dev-patch-upgrade Commit: 0855174b815987850d0022ff3c1619c3b2373921 Parents: a33e2b6 Author: Aravindan Vijayan <avija...@hortonworks.com> Authored: Tue Sep 13 15:21:49 2016 -0700 Committer: Aravindan Vijayan <avija...@hortonworks.com> Committed: Tue Sep 13 15:22:38 2016 -0700 ---------------------------------------------------------------------- .../controller/AmbariManagementController.java | 8 + .../AmbariManagementControllerImpl.java | 8 + .../internal/AbstractProviderModule.java | 120 ++--------- .../controller/internal/HostStatusHelper.java | 88 ++++++++ .../metrics/MetricsCollectorHAClusterState.java | 211 +++++++++++++++++++ .../metrics/MetricsCollectorHAManager.java | 106 ++++++++++ .../metrics/timeline/AMSPropertyProvider.java | 62 ++++-- .../timeline/AMSReportPropertyProvider.java | 15 +- .../metrics/timeline/MetricsRequestHelper.java | 5 +- .../timeline/cache/TimelineMetricCache.java | 10 + .../ambari/server/events/AmbariEvent.java | 7 +- .../events/MetricsCollectorHostDownEvent.java | 47 +++++ .../0.1.0/package/scripts/metrics_collector.py | 2 +- .../0.1.0/package/scripts/metrics_grafana.py | 2 +- .../0.1.0/package/scripts/metrics_monitor.py | 2 +- .../0.1.0/package/scripts/status.py | 4 +- 16 files changed, 561 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java index 9da6fd4..746bca4 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java @@ -32,6 +32,7 @@ import org.apache.ambari.server.controller.internal.DeleteStatusMetaData; import org.apache.ambari.server.controller.internal.RequestStageContainer; import org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; +import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.events.AmbariEvent; import org.apache.ambari.server.events.publishers.AmbariEventPublisher; @@ -889,5 +890,12 @@ public interface AmbariManagementController { */ AmbariEventPublisher getAmbariEventPublisher(); + /** + * Gets an {@link MetricsCollectorHAManager} which can be used to get/add collector host for a cluster + * + * @return {@link MetricsCollectorHAManager} + */ + MetricsCollectorHAManager getMetricsCollectorHAManager(); + } http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java index 1d82928..90d2162 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java @@ -98,6 +98,7 @@ import org.apache.ambari.server.controller.internal.WidgetLayoutResourceProvider import org.apache.ambari.server.controller.internal.WidgetResourceProvider; import org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; +import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager; import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.customactions.ActionDefinition; @@ -287,6 +288,8 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle private ClusterVersionDAO clusterVersionDAO; @Inject private AmbariEventPublisher ambariEventPublisher; + @Inject + private MetricsCollectorHAManager metricsCollectorHAManager; private MaintenanceStateHelper maintenanceStateHelper; @@ -5066,6 +5069,11 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle return properties; } + @Override + public MetricsCollectorHAManager getMetricsCollectorHAManager() { + return injector.getInstance(MetricsCollectorHAManager.class); + } + /** * Validates that the authenticated user can set a service's (run-as) user and group. * <p/> http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java index b6cbed5..5d462c5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java @@ -33,20 +33,15 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import com.google.inject.Injector; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.Role; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.AmbariServer; -import org.apache.ambari.server.controller.HostRequest; -import org.apache.ambari.server.controller.HostResponse; -import org.apache.ambari.server.controller.ServiceComponentHostRequest; -import org.apache.ambari.server.controller.ServiceComponentHostResponse; import org.apache.ambari.server.controller.jmx.JMXHostProvider; -import org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricHostProvider; import org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory; +import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager; import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider; import org.apache.ambari.server.controller.metrics.MetricsServiceProvider; @@ -70,9 +65,7 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.DesiredConfig; -import org.apache.ambari.server.state.HostState; import org.apache.ambari.server.state.Service; -import org.apache.ambari.server.state.State; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +107,6 @@ public abstract class AbstractProviderModule implements ProviderModule, private volatile Map<String, Map<String, Map<String, String>>> jmxDesiredRpcSuffixes = new HashMap<String, Map<String, Map<String,String>>>(); private volatile Map<String, String> clusterHdfsSiteConfigVersionMap = new HashMap<String, String>(); private volatile Map<String, String> clusterJmxProtocolMap = new ConcurrentHashMap<>(); - private volatile Set<String> metricServerHosts = new HashSet<String>(); private volatile String clusterMetricServerPort = null; private volatile String clusterMetricServerVipPort = null; private volatile String clusterMetricserverVipHost = null; @@ -233,6 +225,9 @@ public abstract class AbstractProviderModule implements ProviderModule, @Inject TimelineMetricCacheProvider metricCacheProvider; + @Inject + MetricsCollectorHAManager metricsCollectorHAManager; + /** * A factory used to retrieve Guice-injected instances of a metric * {@link PropertyProvider}. @@ -260,11 +255,6 @@ public abstract class AbstractProviderModule implements ProviderModule, private Map<String, String> clusterGangliaCollectorMap; /** - * The host name of Metrics collector. - */ - private Map<String, String> clusterMetricCollectorMap; - - /** * JMX ports read from the configs */ private final Map<String, ConcurrentMap<String, ConcurrentMap<String, String>> >jmxPortMap = @@ -298,6 +288,10 @@ public abstract class AbstractProviderModule implements ProviderModule, eventPublisher = managementController.getAmbariEventPublisher(); eventPublisher.register(this); } + + if (null == metricsCollectorHAManager && null != managementController) { + metricsCollectorHAManager = managementController.getMetricsCollectorHAManager(); + } } @@ -348,7 +342,7 @@ public abstract class AbstractProviderModule implements ProviderModule, LOG.error("Exception during checkInit.", e); } - if (!clusterMetricCollectorMap.isEmpty()) { + if (!metricsCollectorHAManager.isEmpty()) { return TIMELINE_METRICS; } else if (!clusterGangliaCollectorMap.isEmpty()) { return GANGLIA; @@ -411,27 +405,13 @@ public abstract class AbstractProviderModule implements ProviderModule, //If vip config not present // If current collector host is null or if the host or the host component not live // Update clusterMetricCollectorMap with a live metric collector host. + String currentCollectorHost = null; if (!vipHostConfigPresent) { - String currentCollectorHost = clusterMetricCollectorMap.get(clusterName); - if(! (isHostLive(clusterName, currentCollectorHost) && - isHostComponentLive(clusterName, currentCollectorHost, "AMBARI_METRICS", Role.METRICS_COLLECTOR.name())) ) { - for (String hostname : metricServerHosts) { - if (isHostLive(clusterName, hostname) - && isHostComponentLive(clusterName, hostname, "AMBARI_METRICS", Role.METRICS_COLLECTOR.name())) { - clusterMetricCollectorMap.put(clusterName, hostname); - LOG.info("New Metrics Collector Host : " + hostname); - break; - } else { - LOG.info("Metrics Collector Host or host component not live : " + hostname); - } - } + currentCollectorHost = metricsCollectorHAManager.getCollectorHost(clusterName); } - } - LOG.debug("Cluster Metrics Vip Host : " + clusterMetricserverVipHost); - LOG.debug("Cluster Metrics Collector Host : " + clusterMetricCollectorMap.get(clusterName)); - return (clusterMetricserverVipHost != null) ? clusterMetricserverVipHost : clusterMetricCollectorMap.get(clusterName); + return (clusterMetricserverVipHost != null) ? clusterMetricserverVipHost : currentCollectorHost; } @Override @@ -469,12 +449,7 @@ public abstract class AbstractProviderModule implements ProviderModule, @Override public boolean isCollectorHostLive(String clusterName, MetricsService service) throws SystemException { - for (String hostname: metricServerHosts) { - if (isHostLive(clusterName, hostname)) { - return true; - } - } - return false; + return metricsCollectorHAManager.isCollectorHostLive(clusterName); } @Override @@ -502,67 +477,14 @@ public abstract class AbstractProviderModule implements ProviderModule, final String collectorHostName = getCollectorHostName(clusterName, service); if (service.equals(GANGLIA)) { - return isHostComponentLive(clusterName, collectorHostName, "GANGLIA", + return HostStatusHelper.isHostComponentLive(managementController, clusterName, collectorHostName, "GANGLIA", Role.GANGLIA_SERVER.name()); } else if (service.equals(TIMELINE_METRICS)) { - for (String hostname: metricServerHosts) { - if (isHostComponentLive(clusterName, hostname, "AMBARI_METRICS", - Role.METRICS_COLLECTOR.name())) { - return true; - } - } + return metricsCollectorHAManager.isCollectorComponentLive(clusterName); } return false; } - private boolean isHostComponentLive(String clusterName, String hostName, - String serviceName, String componentName) { - if (clusterName == null) { - return false; - } - - ServiceComponentHostResponse componentHostResponse; - - try { - ServiceComponentHostRequest componentRequest = - new ServiceComponentHostRequest(clusterName, serviceName, - componentName, hostName, null); - - Set<ServiceComponentHostResponse> hostComponents = - managementController.getHostComponents(Collections.singleton(componentRequest)); - - componentHostResponse = hostComponents.size() == 1 ? hostComponents.iterator().next() : null; - } catch (AmbariException e) { - LOG.debug("Error checking " + componentName + " server host component state: ", e); - return false; - } - - //Cluster without SCH - return componentHostResponse != null && - componentHostResponse.getLiveState().equals(State.STARTED.name()); - } - - protected boolean isHostLive(String clusterName, String hostName) { - if (clusterName == null) { - return false; - } - HostResponse hostResponse; - - try { - HostRequest hostRequest = new HostRequest(hostName, clusterName, - Collections.<String, String>emptyMap()); - Set<HostResponse> hosts = HostResourceProvider.getHosts(managementController, hostRequest); - - hostResponse = hosts.size() == 1 ? hosts.iterator().next() : null; - } catch (AmbariException e) { - LOG.debug("Error checking of Ganglia server host live status: ", e); - return false; - } - //Cluster without host - return hostResponse != null && - !hostResponse.getHostState().equals(HostState.HEARTBEAT_LOST.name()); - } - // ----- JMXHostProvider --------------------------------------------------- @Override @@ -924,7 +846,6 @@ public abstract class AbstractProviderModule implements ProviderModule, clusterHostComponentMap = new HashMap<String, Map<String, String>>(); clusterGangliaCollectorMap = new HashMap<String, String>(); - clusterMetricCollectorMap = new HashMap<String, String>(); for (Resource cluster : clusters) { @@ -960,16 +881,7 @@ public abstract class AbstractProviderModule implements ProviderModule, if (componentName.equals(METRIC_SERVER)) { // If current collector host is null or if the host or the host component not live // Update clusterMetricCollectorMap. - String currentCollectorHost = clusterMetricCollectorMap.get(clusterName); - LOG.info("Current Metrics collector Host : " + currentCollectorHost); - if ((currentCollectorHost == null) || - !(isHostLive(clusterName, currentCollectorHost) && - isHostComponentLive(clusterName, currentCollectorHost, "AMBARI_METRICS", Role.METRICS_COLLECTOR.name())) - ) { - LOG.info("New Metrics collector Host : " + hostName); - clusterMetricCollectorMap.put(clusterName, hostName); - } - metricServerHosts.add(hostName); + metricsCollectorHAManager.addCollectorHost(clusterName, hostName); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java new file mode 100644 index 0000000..69f1c44 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.controller.internal; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.HostRequest; +import org.apache.ambari.server.controller.HostResponse; +import org.apache.ambari.server.controller.ServiceComponentHostRequest; +import org.apache.ambari.server.controller.ServiceComponentHostResponse; +import org.apache.ambari.server.state.HostState; +import org.apache.ambari.server.state.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +public class HostStatusHelper { + + protected final static Logger LOG = + LoggerFactory.getLogger(HostStatusHelper.class); + + public static boolean isHostComponentLive(AmbariManagementController managementController, + String clusterName, String hostName, + String serviceName, String componentName) { + if (clusterName == null) { + return false; + } + + ServiceComponentHostResponse componentHostResponse; + + try { + ServiceComponentHostRequest componentRequest = + new ServiceComponentHostRequest(clusterName, serviceName, + componentName, hostName, null); + + Set<ServiceComponentHostResponse> hostComponents = + managementController.getHostComponents(Collections.singleton(componentRequest)); + + componentHostResponse = hostComponents.size() == 1 ? hostComponents.iterator().next() : null; + } catch (AmbariException e) { + LOG.debug("Error checking " + componentName + " server host component state: ", e); + return false; + } + + //Cluster without SCH + return componentHostResponse != null && + componentHostResponse.getLiveState().equals(State.STARTED.name()); + } + + public static boolean isHostLive(AmbariManagementController managementController, String clusterName, String hostName) { + if (clusterName == null) { + return false; + } + HostResponse hostResponse; + + try { + HostRequest hostRequest = new HostRequest(hostName, clusterName, + Collections.<String, String>emptyMap()); + Set<HostResponse> hosts = HostResourceProvider.getHosts(managementController, hostRequest); + + hostResponse = hosts.size() == 1 ? hosts.iterator().next() : null; + } catch (AmbariException e) { + LOG.debug("Error while checking host live status: ", e); + return false; + } + //Cluster without host + return hostResponse != null && + !hostResponse.getHostState().equals(HostState.HEARTBEAT_LOST.name()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java new file mode 100644 index 0000000..05b4e05 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.controller.metrics; + +import com.google.inject.Inject; +import org.apache.ambari.server.Role; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.AmbariServer; +import org.apache.ambari.server.controller.internal.HostStatusHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicInteger; + +/* +Class used to hold the status of metric collector hosts for a cluster. + */ +public class MetricsCollectorHAClusterState { + + private String clusterName; + private Set<String> liveCollectorHosts; + private Set<String> deadCollectorHosts; + private AtomicInteger collectorDownRefreshCounter; + private static int collectorDownRefreshCounterLimit = 5; + private String currentCollectorHost = null; + + @Inject + AmbariManagementController managementController; + + protected final static Logger LOG = + LoggerFactory.getLogger(MetricsCollectorHAClusterState.class); + + public MetricsCollectorHAClusterState(String clusterName) { + + if (managementController == null) { + managementController = AmbariServer.getController(); + } + + this.clusterName = clusterName; + this.liveCollectorHosts = new CopyOnWriteArraySet<>(); + this.deadCollectorHosts = new CopyOnWriteArraySet<>(); + collectorDownRefreshCounter = new AtomicInteger(0); + } + + public void addMetricsCollectorHost(String collectorHost) { + if (HostStatusHelper.isHostComponentLive(managementController, clusterName, collectorHost, "AMBARI_METRICS", + Role.METRICS_COLLECTOR.name())) { + liveCollectorHosts.add(collectorHost); + deadCollectorHosts.remove(collectorHost); + } else { + deadCollectorHosts.add(collectorHost); + liveCollectorHosts.remove(collectorHost); + } + + //If there is no current collector host or the current host is down, this will be a proactive switch. + if (currentCollectorHost == null || !HostStatusHelper.isHostComponentLive(managementController, clusterName, + currentCollectorHost, "AMBARI_METRICS", + Role.METRICS_COLLECTOR.name())) { + refreshCollectorHost(currentCollectorHost); + } + } + + private void refreshCollectorHost(String currentHost) { + LOG.info("Refreshing collector host, current collector host : " + currentHost); + + testAndAddDeadCollectorsToLiveList(); //A good time to check if there are some dead collectors that have now become alive. + + if (currentHost != null) { + if (liveCollectorHosts.contains(currentHost)) { + liveCollectorHosts.remove(currentHost); + } + if (!deadCollectorHosts.contains(currentHost)) { + deadCollectorHosts.add(currentHost); + } + } + + if (!liveCollectorHosts.isEmpty()) { + currentCollectorHost = getRandom(liveCollectorHosts); + } + + if (currentCollectorHost == null && !deadCollectorHosts.isEmpty()) { + currentCollectorHost = getRandom(deadCollectorHosts); + } + + LOG.info("After refresh, new collector host : " + currentCollectorHost); + } + + public String getCurrentCollectorHost() { + return currentCollectorHost; + } + + public void onCollectorHostDown(String deadCollectorHost) { + + if (deadCollectorHost == null) { + // Case 1: Collector is null. Ideally this can never happen + refreshCollectorHost(null); + + } else if (deadCollectorHost.equals(currentCollectorHost) && numCollectors() > 1) { + // Case 2: Event informing us that the current collector is dead. We have not refreshed it yet. + if (testRefreshCounter()) { + refreshCollectorHost(deadCollectorHost); + } + } + //Case 3 : Got a dead collector event. Already changed the collector to a new one. + //No-Op + } + + private void testAndAddDeadCollectorsToLiveList() { + Set<String> liveHosts = new HashSet<>(); + + for (String deadHost : deadCollectorHosts) { + if (isValidAliveCollectorHost(clusterName, deadHost)) { + liveHosts.add(deadHost); + } + } + + for (String liveHost : liveHosts) { + LOG.info("Removing collector " + liveHost + " from dead list to live list"); + deadCollectorHosts.remove(liveHost); + liveCollectorHosts.add(liveHost); + } + } + + private boolean isValidAliveCollectorHost(String clusterName, String collectorHost) { + + return ((collectorHost != null) && + HostStatusHelper.isHostLive(managementController, clusterName, collectorHost) && + HostStatusHelper.isHostComponentLive(managementController, clusterName, collectorHost, "AMBARI_METRICS", + Role.METRICS_COLLECTOR.name())); + } + + /* + A refresh counter to track number of collector down events received. If it exceeds the limit, + then we go ahead and refresh the collector. + */ + private boolean testRefreshCounter() { + collectorDownRefreshCounter.incrementAndGet(); + if (collectorDownRefreshCounter.get() == collectorDownRefreshCounterLimit) { + collectorDownRefreshCounter = new AtomicInteger(0); + return true; + } + return false; + } + + public boolean isCollectorHostLive() { + for (String host : liveCollectorHosts) { + if (HostStatusHelper.isHostLive(managementController, clusterName, host)) { + return true; + } + } + return false; + } + + public boolean isCollectorComponentAlive() { + + //Check in live hosts + for (String host : liveCollectorHosts) { + if (HostStatusHelper.isHostComponentLive(managementController, clusterName, host, "AMBARI_METRICS", + Role.METRICS_COLLECTOR.name())) { + return true; + } + } + + //Check in dead hosts. Don't update live and dead lists. Can be done on refresh call. + for (String host : deadCollectorHosts) { + if (HostStatusHelper.isHostComponentLive(managementController, clusterName, host, "AMBARI_METRICS", + Role.METRICS_COLLECTOR.name())) { + return true; + } + } + + return false; + } + + private int numCollectors() { + return this.liveCollectorHosts.size() + deadCollectorHosts.size(); + } + + private String getRandom(Set<String> collectorSet) { + int randIndex = new Random().nextInt(collectorSet.size()); + int i = 0; + for(String host : collectorSet) + { + if (i == randIndex) { + return host; + } + i = i + 1; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java new file mode 100644 index 0000000..3eb03ad --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.controller.metrics; + +import com.google.common.eventbus.Subscribe; +import com.google.inject.Inject; +import org.apache.ambari.server.controller.AmbariServer; +import org.apache.ambari.server.events.MetricsCollectorHostDownEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/* + Class used as a gateway to retrieving/updating metric collector hosts for all managed clusters. + */ +public class MetricsCollectorHAManager { + + @Inject + protected AmbariEventPublisher eventPublisher; + + private Map<String, MetricsCollectorHAClusterState> clusterCollectorHAState; + protected final static Logger LOG = + LoggerFactory.getLogger(MetricsCollectorHAManager.class); + + public MetricsCollectorHAManager() { + clusterCollectorHAState = new HashMap<>(); + + if (null == eventPublisher && null != AmbariServer.getController()) { + eventPublisher = AmbariServer.getController().getAmbariEventPublisher(); + if (eventPublisher != null) { + eventPublisher.register(this); + } else { + LOG.error("Unable to retrieve AmbariEventPublisher for Metric collector host event listening."); + } + } + } + + public void addCollectorHost(String clusterName, String collectorHost) { + + LOG.info("Adding collector host : " + collectorHost + " to cluster : " + clusterName); + + if (! clusterCollectorHAState.containsKey(clusterName)) { + clusterCollectorHAState.put(clusterName, new MetricsCollectorHAClusterState(clusterName)); + } + MetricsCollectorHAClusterState collectorHAClusterState = clusterCollectorHAState.get(clusterName); + collectorHAClusterState.addMetricsCollectorHost(collectorHost); + } + + public String getCollectorHost(String clusterName) { + + if (! clusterCollectorHAState.containsKey(clusterName)) { + clusterCollectorHAState.put(clusterName, new MetricsCollectorHAClusterState(clusterName)); + } + + MetricsCollectorHAClusterState collectorHAClusterState = clusterCollectorHAState.get(clusterName); + return collectorHAClusterState.getCurrentCollectorHost(); + } + + /** + * Handles {@link MetricsCollectorHostDownEvent} + * + * @param event + * the change event. + */ + @Subscribe + public void onMetricsCollectorHostDownEvent(MetricsCollectorHostDownEvent event) { + + LOG.debug("MetricsCollectorHostDownEvent caught, Down collector : " + event.getCollectorHost()); + + String clusterName = event.getClusterName(); + MetricsCollectorHAClusterState collectorHAClusterState = clusterCollectorHAState.get(clusterName); + collectorHAClusterState.onCollectorHostDown(event.getCollectorHost()); + } + + public boolean isEmpty() { + return this.clusterCollectorHAState.isEmpty(); + } + + public boolean isCollectorHostLive(String clusterName) { + MetricsCollectorHAClusterState metricsCollectorHAClusterState = this.clusterCollectorHAState.get(clusterName); + return metricsCollectorHAClusterState != null && metricsCollectorHAClusterState.isCollectorHostLive(); + } + + public boolean isCollectorComponentLive(String clusterName) { + MetricsCollectorHAClusterState metricsCollectorHAClusterState = this.clusterCollectorHAState.get(clusterName); + return metricsCollectorHAClusterState != null && metricsCollectorHAClusterState.isCollectorComponentAlive(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java index 9e81df4..455ee4d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.controller.metrics.timeline; import com.google.common.collect.Sets; +import com.google.inject.Inject; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -34,6 +35,8 @@ import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.TemporalInfo; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.events.MetricsCollectorHostDownEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.ambari.server.state.StackId; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -41,6 +44,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.http.client.utils.URIBuilder; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; @@ -74,6 +78,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { private static final Map<String, String> JVM_PROCESS_NAMES = new HashMap<>(2); + private AmbariEventPublisher ambariEventPublisher; + static { JVM_PROCESS_NAMES.put("HBASE_MASTER", "Master."); JVM_PROCESS_NAMES.put("HBASE_REGIONSERVER", "RegionServer."); @@ -93,6 +99,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { componentNamePropertyId); this.metricCache = cacheProvider.getTimelineMetricsCache(); + + if (AmbariServer.getController() != null) { + this.ambariEventPublisher = AmbariServer.getController().getAmbariEventPublisher(); + } } protected String getOverridenComponentName(Resource resource) { @@ -225,27 +235,36 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { for (String hostNamesBatch : hostNamesBatches) { // Allow for multiple requests since host metrics for a // hostcomponent need the HOST appId - if (!hostComponentHostMetrics.isEmpty()) { - String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1); - setQueryParams(hostComponentHostMetricParams, hostNamesBatch, true, componentName); - TimelineMetrics metricsResponse = getTimelineMetricsFromCache( - getTimelineAppMetricCacheKey(hostComponentHostMetrics, - componentName, hostNamesBatch, uriBuilder.toString()), componentName); - - if (metricsResponse != null) { - timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); + try { + if (!hostComponentHostMetrics.isEmpty()) { + String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1); + setQueryParams(hostComponentHostMetricParams, hostNamesBatch, true, componentName); + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( + getTimelineAppMetricCacheKey(hostComponentHostMetrics, + componentName, hostNamesBatch, uriBuilder.toString()), componentName); + + if (metricsResponse != null) { + timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); + } } - } - if (!nonHostComponentMetrics.isEmpty()) { - String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1); - setQueryParams(nonHostComponentHostMetricParams, hostNamesBatch, false, componentName); - TimelineMetrics metricsResponse = getTimelineMetricsFromCache( - getTimelineAppMetricCacheKey(nonHostComponentMetrics, - componentName, hostNamesBatch, uriBuilder.toString()), componentName); + if (!nonHostComponentMetrics.isEmpty()) { + String nonHostComponentHostMetricParams = getSetString(processRegexps(nonHostComponentMetrics), -1); + setQueryParams(nonHostComponentHostMetricParams, hostNamesBatch, false, componentName); + TimelineMetrics metricsResponse = getTimelineMetricsFromCache( + getTimelineAppMetricCacheKey(nonHostComponentMetrics, + componentName, hostNamesBatch, uriBuilder.toString()), componentName); - if (metricsResponse != null) { - timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); + if (metricsResponse != null) { + timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics()); + } + } + } catch (IOException io) { + if (io instanceof SocketTimeoutException || io instanceof ConnectException) { + if (ambariEventPublisher != null) { + ambariEventPublisher.publish(new MetricsCollectorHostDownEvent(clusterName, uriBuilder.getHost())); + } + throw io; } } @@ -562,7 +581,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { Map<String, Map<TemporalInfo, MetricsRequest>> requestMap = new HashMap<String, Map<TemporalInfo, MetricsRequest>>(); - String collectorHostName = null; String collectorPort = null; Map<String, Boolean> clusterCollectorComponentLiveMap = new HashMap<>(); Map<String, Boolean> clusterCollectorHostLiveMap = new HashMap<>(); @@ -623,9 +641,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { requestMap.put(clusterName, requests); } - if (collectorHostName == null) { - collectorHostName = hostProvider.getCollectorHostName(clusterName, TIMELINE_METRICS); - } + String collectorHost = hostProvider.getCollectorHostName(clusterName, TIMELINE_METRICS); if (collectorPort == null) { collectorPort = hostProvider.getCollectorPort(clusterName, TIMELINE_METRICS); @@ -656,7 +672,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { MetricsRequest metricsRequest = requests.get(temporalInfo); if (metricsRequest == null) { metricsRequest = new MetricsRequest(temporalInfo, - getAMSUriBuilder(collectorHostName, + getAMSUriBuilder(collectorHost, collectorPort != null ? Integer.parseInt(collectorPort) : COLLECTOR_DEFAULT_PORT, configuration.isHttpsEnabled()), (String) resource.getPropertyValue(clusterNamePropertyId)); http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java index 3688742..f9bfa59 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java @@ -17,7 +17,10 @@ */ package org.apache.ambari.server.controller.metrics.timeline; +import com.google.inject.Inject; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.internal.PropertyInfo; import org.apache.ambari.server.controller.internal.URLStreamProvider; import org.apache.ambari.server.controller.metrics.MetricHostProvider; @@ -33,11 +36,14 @@ import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.TemporalInfo; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.events.MetricsCollectorHostDownEvent; +import org.apache.ambari.server.events.publishers.AmbariEventPublisher; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.http.client.utils.URIBuilder; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.HashSet; @@ -54,6 +60,7 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider { MetricsRequestHelper requestHelper; private static AtomicInteger printSkipPopulateMsgHostCounter = new AtomicInteger(0); private static AtomicInteger printSkipPopulateMsgHostCompCounter = new AtomicInteger(0); + private AmbariEventPublisher ambariEventPublisher; public AMSReportPropertyProvider(Map<String, Map<String, PropertyInfo>> componentPropertyInfoMap, URLStreamProvider streamProvider, @@ -67,6 +74,9 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider { this.metricCache = cacheProvider.getTimelineMetricsCache(); this.requestHelper = new MetricsRequestHelper(streamProvider); + if (AmbariServer.getController() != null) { + this.ambariEventPublisher = AmbariServer.getController().getAmbariEventPublisher(); + } } /** @@ -215,10 +225,13 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider { } } catch (IOException io) { timelineMetrics = null; - if (io instanceof SocketTimeoutException) { + if (io instanceof SocketTimeoutException || io instanceof ConnectException) { if (LOG.isDebugEnabled()) { LOG.debug("Skip populating metrics on socket timeout exception."); } + if (ambariEventPublisher != null) { + ambariEventPublisher.publish(new MetricsCollectorHostDownEvent(clusterName, host)); + } break; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java index 1df0f6a..e7d6c0b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java @@ -37,6 +37,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.ConnectException; import java.net.HttpURLConnection; import java.net.SocketTimeoutException; import java.net.URISyntaxException; @@ -116,8 +117,8 @@ public class MetricsRequestHelper { LOG.debug(errorMsg, io); } - if (io instanceof SocketTimeoutException) { - errorMsg += " Cannot connect to collector: SocketTimeoutException."; + if (io instanceof SocketTimeoutException || io instanceof ConnectException) { + errorMsg = "Cannot connect to collector: SocketTimeoutException for " + uriBuilder.getHost(); LOG.error(errorMsg); throw io; } http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java index b5fe05e..b852cc2 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -75,6 +76,9 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { if (t instanceof SocketTimeoutException) { throw new SocketTimeoutException(t.getMessage()); } + if (t instanceof ConnectException) { + throw new ConnectException(t.getMessage()); + } } } @@ -132,6 +136,12 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache { LOG.debug("New temporal info: " + newKey.getTemporalInfo() + " for : " + existingKey.getMetricNames()); + + if (existingKey.getSpec() == null || !existingKey.getSpec().equals(newKey.getSpec())) { + existingKey.setSpec(newKey.getSpec()); + LOG.debug("New spec: " + newKey.getSpec() + + " for : " + existingKey.getMetricNames()); + } } return super.get(key); http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java index 912c441..de9e7b6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java @@ -120,7 +120,12 @@ public abstract class AmbariEvent { /** * Cluster configuration changed. */ - CLUSTER_CONFIG_CHANGED; + CLUSTER_CONFIG_CHANGED, + + /** + * Metrics Collector force refresh needed. + */ + METRICS_COLLECTOR_HOST_DOWN; } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java new file mode 100644 index 0000000..0719477 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.events; + +public class MetricsCollectorHostDownEvent extends AmbariEvent{ + + + private final String clusterName; + + private final String collectorHost; + + /** + * Constructor. + * + * */ + + public MetricsCollectorHostDownEvent(String clusterName, String collectorHost) { + super(AmbariEventType.METRICS_COLLECTOR_HOST_DOWN); + this.clusterName = clusterName; + this.collectorHost = collectorHost; + } + + + public String getClusterName() { + return clusterName; + } + + public String getCollectorHost() { + return collectorHost; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py index 2c7119b..c26eafc 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py @@ -58,7 +58,7 @@ class AmsCollector(Script): def status(self, env): import status_params env.set_params(status_params) - check_service_status(name='collector') + check_service_status(env, name='collector') def get_log_folder(self): import params http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py index 747a687..542663f 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py @@ -61,7 +61,7 @@ class AmsGrafana(Script): def status(self, env): import status_params env.set_params(status_params) - check_service_status(name='grafana') + check_service_status(env, name='grafana') def get_pid_files(self): import status_params http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py index d53c1fc..a377f6d 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py @@ -53,7 +53,7 @@ class AmsMonitor(Script): def status(self, env): import status_params env.set_params(status_params) - check_service_status(name='monitor') + check_service_status(env, name='monitor') def get_log_folder(self): import params http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py index 14af3ad..0b24ac0 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py @@ -31,13 +31,13 @@ def get_collector_pid_files(): return pid_files @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) -def check_service_status(name): +def check_service_status(env, name): import status_params env.set_params(status_params) from resource_management.libraries.functions.check_process_status import check_process_status if name=='collector': - for pid_files in get_collector_pid_files(): + for pid_file in get_collector_pid_files(): check_process_status(pid_file) elif name == 'monitor': check_process_status(status_params.monitor_pid_file)