AMBARI-17458 : Add support for distributed collector to Ambari REST API. (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0855174b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0855174b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0855174b

Branch: refs/heads/branch-dev-patch-upgrade
Commit: 0855174b815987850d0022ff3c1619c3b2373921
Parents: a33e2b6
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Tue Sep 13 15:21:49 2016 -0700
Committer: Aravindan Vijayan <avija...@hortonworks.com>
Committed: Tue Sep 13 15:22:38 2016 -0700

----------------------------------------------------------------------
 .../controller/AmbariManagementController.java  |   8 +
 .../AmbariManagementControllerImpl.java         |   8 +
 .../internal/AbstractProviderModule.java        | 120 ++---------
 .../controller/internal/HostStatusHelper.java   |  88 ++++++++
 .../metrics/MetricsCollectorHAClusterState.java | 211 +++++++++++++++++++
 .../metrics/MetricsCollectorHAManager.java      | 106 ++++++++++
 .../metrics/timeline/AMSPropertyProvider.java   |  62 ++++--
 .../timeline/AMSReportPropertyProvider.java     |  15 +-
 .../metrics/timeline/MetricsRequestHelper.java  |   5 +-
 .../timeline/cache/TimelineMetricCache.java     |  10 +
 .../ambari/server/events/AmbariEvent.java       |   7 +-
 .../events/MetricsCollectorHostDownEvent.java   |  47 +++++
 .../0.1.0/package/scripts/metrics_collector.py  |   2 +-
 .../0.1.0/package/scripts/metrics_grafana.py    |   2 +-
 .../0.1.0/package/scripts/metrics_monitor.py    |   2 +-
 .../0.1.0/package/scripts/status.py             |   4 +-
 16 files changed, 561 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
index 9da6fd4..746bca4 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java
@@ -32,6 +32,7 @@ import 
org.apache.ambari.server.controller.internal.DeleteStatusMetaData;
 import org.apache.ambari.server.controller.internal.RequestStageContainer;
 import 
org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider;
 import 
org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
+import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager;
 import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.events.AmbariEvent;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
@@ -889,5 +890,12 @@ public interface AmbariManagementController {
    */
   AmbariEventPublisher getAmbariEventPublisher();
 
+  /**
+   * Gets an {@link MetricsCollectorHAManager} which can be used to get/add 
collector host for a cluster
+   *
+   * @return {@link MetricsCollectorHAManager}
+   */
+  MetricsCollectorHAManager getMetricsCollectorHAManager();
+
 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 1d82928..90d2162 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -98,6 +98,7 @@ import 
org.apache.ambari.server.controller.internal.WidgetLayoutResourceProvider
 import org.apache.ambari.server.controller.internal.WidgetResourceProvider;
 import 
org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider;
 import 
org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
+import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager;
 import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.customactions.ActionDefinition;
@@ -287,6 +288,8 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
   private ClusterVersionDAO clusterVersionDAO;
   @Inject
   private AmbariEventPublisher ambariEventPublisher;
+  @Inject
+  private MetricsCollectorHAManager metricsCollectorHAManager;
 
   private MaintenanceStateHelper maintenanceStateHelper;
 
@@ -5066,6 +5069,11 @@ public class AmbariManagementControllerImpl implements 
AmbariManagementControlle
     return properties;
   }
 
+  @Override
+  public MetricsCollectorHAManager getMetricsCollectorHAManager() {
+    return injector.getInstance(MetricsCollectorHAManager.class);
+  }
+
   /**
    * Validates that the authenticated user can set a service's (run-as) user 
and group.
    * <p/>

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
index b6cbed5..5d462c5 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java
@@ -33,20 +33,15 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
-import org.apache.ambari.server.controller.HostRequest;
-import org.apache.ambari.server.controller.HostResponse;
-import org.apache.ambari.server.controller.ServiceComponentHostRequest;
-import org.apache.ambari.server.controller.ServiceComponentHostResponse;
 import org.apache.ambari.server.controller.jmx.JMXHostProvider;
-import 
org.apache.ambari.server.controller.logging.LoggingSearchPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
 import 
org.apache.ambari.server.controller.metrics.MetricPropertyProviderFactory;
+import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager;
 import org.apache.ambari.server.controller.metrics.MetricsPropertyProvider;
 import 
org.apache.ambari.server.controller.metrics.MetricsReportPropertyProvider;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
@@ -70,9 +65,7 @@ import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ConfigHelper;
 import org.apache.ambari.server.state.DesiredConfig;
-import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.State;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,7 +107,6 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
   private volatile Map<String, Map<String, Map<String, String>>> 
jmxDesiredRpcSuffixes = new HashMap<String, Map<String, Map<String,String>>>();
   private volatile Map<String, String> clusterHdfsSiteConfigVersionMap = new 
HashMap<String, String>();
   private volatile Map<String, String> clusterJmxProtocolMap = new 
ConcurrentHashMap<>();
-  private volatile Set<String> metricServerHosts = new HashSet<String>();
   private volatile String clusterMetricServerPort = null;
   private volatile String clusterMetricServerVipPort = null;
   private volatile String clusterMetricserverVipHost = null;
@@ -233,6 +225,9 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
   @Inject
   TimelineMetricCacheProvider metricCacheProvider;
 
+  @Inject
+  MetricsCollectorHAManager metricsCollectorHAManager;
+
   /**
    * A factory used to retrieve Guice-injected instances of a metric
    * {@link PropertyProvider}.
@@ -260,11 +255,6 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
   private Map<String, String> clusterGangliaCollectorMap;
 
   /**
-   * The host name of Metrics collector.
-   */
-  private Map<String, String> clusterMetricCollectorMap;
-
-  /**
    * JMX ports read from the configs
    */
   private final Map<String, ConcurrentMap<String, ConcurrentMap<String, 
String>> >jmxPortMap =
@@ -298,6 +288,10 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
       eventPublisher = managementController.getAmbariEventPublisher();
       eventPublisher.register(this);
     }
+
+    if (null == metricsCollectorHAManager && null != managementController) {
+      metricsCollectorHAManager = 
managementController.getMetricsCollectorHAManager();
+    }
   }
 
 
@@ -348,7 +342,7 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
       LOG.error("Exception during checkInit.", e);
     }
 
-    if (!clusterMetricCollectorMap.isEmpty()) {
+    if (!metricsCollectorHAManager.isEmpty()) {
       return TIMELINE_METRICS;
     } else if (!clusterGangliaCollectorMap.isEmpty()) {
       return GANGLIA;
@@ -411,27 +405,13 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
     //If vip config not present
     //  If current collector host is null or if the host or the host component 
not live
     //    Update clusterMetricCollectorMap with a live metric collector host.
+    String currentCollectorHost = null;
     if (!vipHostConfigPresent) {
-      String currentCollectorHost = clusterMetricCollectorMap.get(clusterName);
-      if(! (isHostLive(clusterName, currentCollectorHost) &&
-        isHostComponentLive(clusterName, currentCollectorHost, 
"AMBARI_METRICS", Role.METRICS_COLLECTOR.name())) ) {
-        for (String hostname : metricServerHosts) {
-          if (isHostLive(clusterName, hostname)
-            && isHostComponentLive(clusterName, hostname, "AMBARI_METRICS", 
Role.METRICS_COLLECTOR.name())) {
-            clusterMetricCollectorMap.put(clusterName, hostname);
-            LOG.info("New Metrics Collector Host : " + hostname);
-            break;
-          } else {
-            LOG.info("Metrics Collector Host or host component not live : " + 
hostname);
-          }
-        }
+      currentCollectorHost = 
metricsCollectorHAManager.getCollectorHost(clusterName);
       }
-    }
-
     LOG.debug("Cluster Metrics Vip Host : " + clusterMetricserverVipHost);
-    LOG.debug("Cluster Metrics Collector Host : " + 
clusterMetricCollectorMap.get(clusterName));
 
-    return (clusterMetricserverVipHost != null) ? clusterMetricserverVipHost : 
clusterMetricCollectorMap.get(clusterName);
+    return (clusterMetricserverVipHost != null) ? clusterMetricserverVipHost : 
currentCollectorHost;
   }
 
   @Override
@@ -469,12 +449,7 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
 
   @Override
   public boolean isCollectorHostLive(String clusterName, MetricsService 
service) throws SystemException {
-    for (String hostname: metricServerHosts) {
-      if (isHostLive(clusterName, hostname)) {
-        return true;
-      }
-    }
-    return false;
+    return metricsCollectorHAManager.isCollectorHostLive(clusterName);
   }
 
   @Override
@@ -502,67 +477,14 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
     final String collectorHostName = getCollectorHostName(clusterName, 
service);
 
     if (service.equals(GANGLIA)) {
-      return isHostComponentLive(clusterName, collectorHostName, "GANGLIA",
+      return HostStatusHelper.isHostComponentLive(managementController, 
clusterName, collectorHostName, "GANGLIA",
         Role.GANGLIA_SERVER.name());
     } else if (service.equals(TIMELINE_METRICS)) {
-      for (String hostname: metricServerHosts) {
-        if (isHostComponentLive(clusterName, hostname, "AMBARI_METRICS",
-          Role.METRICS_COLLECTOR.name())) {
-          return true;
-        }
-      }
+      return metricsCollectorHAManager.isCollectorComponentLive(clusterName);
     }
     return false;
   }
 
-  private boolean isHostComponentLive(String clusterName, String hostName,
-                                      String serviceName, String 
componentName) {
-    if (clusterName == null) {
-      return false;
-    }
-
-    ServiceComponentHostResponse componentHostResponse;
-
-    try {
-      ServiceComponentHostRequest componentRequest =
-        new ServiceComponentHostRequest(clusterName, serviceName,
-          componentName, hostName, null);
-
-      Set<ServiceComponentHostResponse> hostComponents =
-        
managementController.getHostComponents(Collections.singleton(componentRequest));
-
-      componentHostResponse = hostComponents.size() == 1 ? 
hostComponents.iterator().next() : null;
-    } catch (AmbariException e) {
-      LOG.debug("Error checking " + componentName + " server host component 
state: ", e);
-      return false;
-    }
-
-    //Cluster without SCH
-    return componentHostResponse != null &&
-      componentHostResponse.getLiveState().equals(State.STARTED.name());
-  }
-
-  protected boolean isHostLive(String clusterName, String hostName) {
-    if (clusterName == null) {
-      return false;
-    }
-    HostResponse hostResponse;
-
-    try {
-      HostRequest hostRequest = new HostRequest(hostName, clusterName,
-        Collections.<String, String>emptyMap());
-      Set<HostResponse> hosts = 
HostResourceProvider.getHosts(managementController, hostRequest);
-
-      hostResponse = hosts.size() == 1 ? hosts.iterator().next() : null;
-    } catch (AmbariException e) {
-      LOG.debug("Error checking of Ganglia server host live status: ", e);
-      return false;
-    }
-    //Cluster without host
-    return hostResponse != null &&
-      !hostResponse.getHostState().equals(HostState.HEARTBEAT_LOST.name());
-  }
-
   // ----- JMXHostProvider ---------------------------------------------------
 
   @Override
@@ -924,7 +846,6 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
 
       clusterHostComponentMap = new HashMap<String, Map<String, String>>();
       clusterGangliaCollectorMap = new HashMap<String, String>();
-      clusterMetricCollectorMap = new HashMap<String, String>();
 
       for (Resource cluster : clusters) {
 
@@ -960,16 +881,7 @@ public abstract class AbstractProviderModule implements 
ProviderModule,
           if (componentName.equals(METRIC_SERVER)) {
             //  If current collector host is null or if the host or the host 
component not live
             //    Update clusterMetricCollectorMap.
-            String currentCollectorHost = 
clusterMetricCollectorMap.get(clusterName);
-            LOG.info("Current Metrics collector Host : " + 
currentCollectorHost);
-            if ((currentCollectorHost == null) ||
-              !(isHostLive(clusterName, currentCollectorHost) &&
-                isHostComponentLive(clusterName, currentCollectorHost, 
"AMBARI_METRICS", Role.METRICS_COLLECTOR.name()))
-              ) {
-              LOG.info("New Metrics collector Host : " + hostName);
-              clusterMetricCollectorMap.put(clusterName, hostName);
-            }
-            metricServerHosts.add(hostName);
+            metricsCollectorHAManager.addCollectorHost(clusterName, hostName);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java
new file mode 100644
index 0000000..69f1c44
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostStatusHelper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.controller.internal;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.HostRequest;
+import org.apache.ambari.server.controller.HostResponse;
+import org.apache.ambari.server.controller.ServiceComponentHostRequest;
+import org.apache.ambari.server.controller.ServiceComponentHostResponse;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class HostStatusHelper {
+
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(HostStatusHelper.class);
+
+  public static boolean isHostComponentLive(AmbariManagementController 
managementController,
+                                             String clusterName, String 
hostName,
+                                      String serviceName, String 
componentName) {
+    if (clusterName == null) {
+      return false;
+    }
+
+    ServiceComponentHostResponse componentHostResponse;
+
+    try {
+      ServiceComponentHostRequest componentRequest =
+        new ServiceComponentHostRequest(clusterName, serviceName,
+          componentName, hostName, null);
+
+      Set<ServiceComponentHostResponse> hostComponents =
+        
managementController.getHostComponents(Collections.singleton(componentRequest));
+
+      componentHostResponse = hostComponents.size() == 1 ? 
hostComponents.iterator().next() : null;
+    } catch (AmbariException e) {
+      LOG.debug("Error checking " + componentName + " server host component 
state: ", e);
+      return false;
+    }
+
+    //Cluster without SCH
+    return componentHostResponse != null &&
+      componentHostResponse.getLiveState().equals(State.STARTED.name());
+  }
+
+  public static boolean isHostLive(AmbariManagementController 
managementController, String clusterName, String hostName) {
+    if (clusterName == null) {
+      return false;
+    }
+    HostResponse hostResponse;
+
+    try {
+      HostRequest hostRequest = new HostRequest(hostName, clusterName,
+        Collections.<String, String>emptyMap());
+      Set<HostResponse> hosts = 
HostResourceProvider.getHosts(managementController, hostRequest);
+
+      hostResponse = hosts.size() == 1 ? hosts.iterator().next() : null;
+    } catch (AmbariException e) {
+      LOG.debug("Error while checking host live status: ", e);
+      return false;
+    }
+    //Cluster without host
+    return hostResponse != null &&
+      !hostResponse.getHostState().equals(HostState.HEARTBEAT_LOST.name());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java
new file mode 100644
index 0000000..05b4e05
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAClusterState.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics;
+
+import com.google.inject.Inject;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.internal.HostStatusHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/*
+Class used to hold the status of metric collector hosts for a cluster.
+ */
+public class MetricsCollectorHAClusterState {
+
+  private String clusterName;
+  private Set<String> liveCollectorHosts;
+  private Set<String> deadCollectorHosts;
+  private AtomicInteger collectorDownRefreshCounter;
+  private static int collectorDownRefreshCounterLimit = 5;
+  private String currentCollectorHost = null;
+
+  @Inject
+  AmbariManagementController managementController;
+
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(MetricsCollectorHAClusterState.class);
+
+  public MetricsCollectorHAClusterState(String clusterName) {
+
+    if (managementController == null) {
+      managementController = AmbariServer.getController();
+    }
+
+    this.clusterName = clusterName;
+    this.liveCollectorHosts = new CopyOnWriteArraySet<>();
+    this.deadCollectorHosts = new CopyOnWriteArraySet<>();
+    collectorDownRefreshCounter = new AtomicInteger(0);
+  }
+
+  public void addMetricsCollectorHost(String collectorHost) {
+    if (HostStatusHelper.isHostComponentLive(managementController, 
clusterName, collectorHost, "AMBARI_METRICS",
+      Role.METRICS_COLLECTOR.name())) {
+      liveCollectorHosts.add(collectorHost);
+      deadCollectorHosts.remove(collectorHost);
+    } else {
+      deadCollectorHosts.add(collectorHost);
+      liveCollectorHosts.remove(collectorHost);
+    }
+
+    //If there is no current collector host or the current host is down, this 
will be a proactive switch.
+    if (currentCollectorHost == null || 
!HostStatusHelper.isHostComponentLive(managementController, clusterName,
+      currentCollectorHost, "AMBARI_METRICS",
+      Role.METRICS_COLLECTOR.name())) {
+      refreshCollectorHost(currentCollectorHost);
+    }
+  }
+
+  private void refreshCollectorHost(String currentHost) {
+    LOG.info("Refreshing collector host, current collector host : " + 
currentHost);
+
+    testAndAddDeadCollectorsToLiveList(); //A good time to check if there are 
some dead collectors that have now become alive.
+
+    if (currentHost != null) {
+      if (liveCollectorHosts.contains(currentHost)) {
+        liveCollectorHosts.remove(currentHost);
+      }
+      if (!deadCollectorHosts.contains(currentHost)) {
+        deadCollectorHosts.add(currentHost);
+      }
+    }
+
+    if (!liveCollectorHosts.isEmpty()) {
+      currentCollectorHost = getRandom(liveCollectorHosts);
+    }
+
+    if (currentCollectorHost == null && !deadCollectorHosts.isEmpty()) {
+      currentCollectorHost = getRandom(deadCollectorHosts);
+    }
+
+    LOG.info("After refresh, new collector host : " + currentCollectorHost);
+  }
+
+  public String getCurrentCollectorHost() {
+    return currentCollectorHost;
+  }
+
+  public void onCollectorHostDown(String deadCollectorHost) {
+
+    if (deadCollectorHost == null) {
+      // Case 1: Collector is null. Ideally this can never happen
+      refreshCollectorHost(null);
+
+    } else if (deadCollectorHost.equals(currentCollectorHost) && 
numCollectors() > 1) {
+      // Case 2: Event informing us that the current collector is dead. We 
have not refreshed it yet.
+      if (testRefreshCounter()) {
+        refreshCollectorHost(deadCollectorHost);
+      }
+    }
+    //Case 3 : Got a dead collector event. Already changed the collector to a 
new one.
+    //No-Op
+  }
+
+  private void testAndAddDeadCollectorsToLiveList() {
+    Set<String> liveHosts = new HashSet<>();
+
+    for (String deadHost : deadCollectorHosts) {
+      if (isValidAliveCollectorHost(clusterName, deadHost)) {
+        liveHosts.add(deadHost);
+      }
+    }
+
+    for (String liveHost : liveHosts) {
+      LOG.info("Removing collector " + liveHost +  " from dead list to live 
list");
+      deadCollectorHosts.remove(liveHost);
+      liveCollectorHosts.add(liveHost);
+    }
+  }
+
+  private boolean isValidAliveCollectorHost(String clusterName, String 
collectorHost) {
+
+    return ((collectorHost != null) &&
+      HostStatusHelper.isHostLive(managementController, clusterName, 
collectorHost) &&
+      HostStatusHelper.isHostComponentLive(managementController, clusterName, 
collectorHost, "AMBARI_METRICS",
+        Role.METRICS_COLLECTOR.name()));
+  }
+
+  /*
+    A refresh counter to track number of collector down events received. If it 
exceeds the limit,
+    then we go ahead and refresh the collector.
+   */
+  private boolean testRefreshCounter() {
+    collectorDownRefreshCounter.incrementAndGet();
+    if (collectorDownRefreshCounter.get() == collectorDownRefreshCounterLimit) 
{
+      collectorDownRefreshCounter = new AtomicInteger(0);
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isCollectorHostLive() {
+    for (String host : liveCollectorHosts) {
+      if (HostStatusHelper.isHostLive(managementController, clusterName, 
host)) {
+        return true;
+      }
+    }
+    return false;
+    }
+
+  public boolean isCollectorComponentAlive() {
+
+    //Check in live hosts
+    for (String host : liveCollectorHosts) {
+      if (HostStatusHelper.isHostComponentLive(managementController, 
clusterName, host, "AMBARI_METRICS",
+        Role.METRICS_COLLECTOR.name())) {
+        return true;
+      }
+    }
+
+    //Check in dead hosts. Don't update live and dead lists. Can be done on 
refresh call.
+    for (String host : deadCollectorHosts) {
+      if (HostStatusHelper.isHostComponentLive(managementController, 
clusterName, host, "AMBARI_METRICS",
+        Role.METRICS_COLLECTOR.name())) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private int numCollectors() {
+    return this.liveCollectorHosts.size() + deadCollectorHosts.size();
+  }
+
+  private String getRandom(Set<String> collectorSet) {
+    int randIndex = new Random().nextInt(collectorSet.size());
+    int i = 0;
+    for(String host : collectorSet)
+    {
+      if (i == randIndex) {
+        return host;
+      }
+      i = i + 1;
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java
new file mode 100644
index 0000000..3eb03ad
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsCollectorHAManager.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.events.MetricsCollectorHostDownEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/*
+  Class used as a gateway to retrieving/updating metric collector hosts for 
all managed clusters.
+ */
+public class MetricsCollectorHAManager {
+
+  @Inject
+  protected AmbariEventPublisher eventPublisher;
+
+  private Map<String, MetricsCollectorHAClusterState> clusterCollectorHAState;
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(MetricsCollectorHAManager.class);
+
+  public MetricsCollectorHAManager() {
+    clusterCollectorHAState = new HashMap<>();
+
+    if (null == eventPublisher && null != AmbariServer.getController()) {
+      eventPublisher = AmbariServer.getController().getAmbariEventPublisher();
+      if (eventPublisher != null) {
+        eventPublisher.register(this);
+      } else {
+        LOG.error("Unable to retrieve AmbariEventPublisher for Metric 
collector host event listening.");
+      }
+    }
+  }
+
+  public void addCollectorHost(String clusterName, String collectorHost) {
+
+    LOG.info("Adding collector host : " + collectorHost + " to cluster : " + 
clusterName);
+
+    if (! clusterCollectorHAState.containsKey(clusterName)) {
+      clusterCollectorHAState.put(clusterName, new 
MetricsCollectorHAClusterState(clusterName));
+    }
+    MetricsCollectorHAClusterState collectorHAClusterState = 
clusterCollectorHAState.get(clusterName);
+    collectorHAClusterState.addMetricsCollectorHost(collectorHost);
+  }
+
+  public String getCollectorHost(String clusterName) {
+
+    if (! clusterCollectorHAState.containsKey(clusterName)) {
+      clusterCollectorHAState.put(clusterName, new 
MetricsCollectorHAClusterState(clusterName));
+    }
+
+    MetricsCollectorHAClusterState collectorHAClusterState = 
clusterCollectorHAState.get(clusterName);
+    return collectorHAClusterState.getCurrentCollectorHost();
+  }
+
+  /**
+   * Handles {@link MetricsCollectorHostDownEvent}
+   *
+   * @param event
+   *          the change event.
+   */
+  @Subscribe
+  public void onMetricsCollectorHostDownEvent(MetricsCollectorHostDownEvent 
event) {
+
+    LOG.debug("MetricsCollectorHostDownEvent caught, Down collector : " + 
event.getCollectorHost());
+
+    String clusterName = event.getClusterName();
+    MetricsCollectorHAClusterState collectorHAClusterState = 
clusterCollectorHAState.get(clusterName);
+    collectorHAClusterState.onCollectorHostDown(event.getCollectorHost());
+  }
+
+  public boolean isEmpty() {
+    return this.clusterCollectorHAState.isEmpty();
+  }
+
+  public boolean isCollectorHostLive(String clusterName) {
+    MetricsCollectorHAClusterState metricsCollectorHAClusterState = 
this.clusterCollectorHAState.get(clusterName);
+    return metricsCollectorHAClusterState != null && 
metricsCollectorHAClusterState.isCollectorHostLive();
+  }
+
+  public boolean isCollectorComponentLive(String clusterName) {
+    MetricsCollectorHAClusterState metricsCollectorHAClusterState = 
this.clusterCollectorHAState.get(clusterName);
+    return metricsCollectorHAClusterState != null && 
metricsCollectorHAClusterState.isCollectorComponentAlive();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
index 9e81df4..455ee4d 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java
@@ -18,6 +18,7 @@
 package org.apache.ambari.server.controller.metrics.timeline;
 
 import com.google.common.collect.Sets;
+import com.google.inject.Inject;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
 import org.apache.ambari.server.controller.AmbariManagementController;
@@ -34,6 +35,8 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.events.MetricsCollectorHostDownEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.ambari.server.state.StackId;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -41,6 +44,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -74,6 +78,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
 
   private static final Map<String, String> JVM_PROCESS_NAMES = new 
HashMap<>(2);
 
+  private AmbariEventPublisher ambariEventPublisher;
+
   static {
     JVM_PROCESS_NAMES.put("HBASE_MASTER", "Master.");
     JVM_PROCESS_NAMES.put("HBASE_REGIONSERVER", "RegionServer.");
@@ -93,6 +99,10 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
       componentNamePropertyId);
 
     this.metricCache = cacheProvider.getTimelineMetricsCache();
+
+    if (AmbariServer.getController() != null) {
+      this.ambariEventPublisher = 
AmbariServer.getController().getAmbariEventPublisher();
+    }
   }
 
   protected String getOverridenComponentName(Resource resource) {
@@ -225,27 +235,36 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
         for (String hostNamesBatch : hostNamesBatches) {
           // Allow for multiple requests since host metrics for a
           // hostcomponent need the HOST appId
-          if (!hostComponentHostMetrics.isEmpty()) {
-            String hostComponentHostMetricParams = 
getSetString(processRegexps(hostComponentHostMetrics), -1);
-            setQueryParams(hostComponentHostMetricParams, hostNamesBatch, 
true, componentName);
-            TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
-                    getTimelineAppMetricCacheKey(hostComponentHostMetrics,
-                            componentName, hostNamesBatch, 
uriBuilder.toString()), componentName);
-
-            if (metricsResponse != null) {
-              
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
+          try {
+            if (!hostComponentHostMetrics.isEmpty()) {
+              String hostComponentHostMetricParams = 
getSetString(processRegexps(hostComponentHostMetrics), -1);
+              setQueryParams(hostComponentHostMetricParams, hostNamesBatch, 
true, componentName);
+              TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
+                getTimelineAppMetricCacheKey(hostComponentHostMetrics,
+                  componentName, hostNamesBatch, uriBuilder.toString()), 
componentName);
+
+              if (metricsResponse != null) {
+                
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
+              }
             }
-          }
 
-          if (!nonHostComponentMetrics.isEmpty()) {
-            String nonHostComponentHostMetricParams = 
getSetString(processRegexps(nonHostComponentMetrics), -1);
-            setQueryParams(nonHostComponentHostMetricParams, hostNamesBatch, 
false, componentName);
-            TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
-                    getTimelineAppMetricCacheKey(nonHostComponentMetrics,
-                            componentName, hostNamesBatch, 
uriBuilder.toString()), componentName);
+            if (!nonHostComponentMetrics.isEmpty()) {
+              String nonHostComponentHostMetricParams = 
getSetString(processRegexps(nonHostComponentMetrics), -1);
+              setQueryParams(nonHostComponentHostMetricParams, hostNamesBatch, 
false, componentName);
+              TimelineMetrics metricsResponse = getTimelineMetricsFromCache(
+                getTimelineAppMetricCacheKey(nonHostComponentMetrics,
+                  componentName, hostNamesBatch, uriBuilder.toString()), 
componentName);
 
-            if (metricsResponse != null) {
-              
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
+              if (metricsResponse != null) {
+                
timelineMetrics.getMetrics().addAll(metricsResponse.getMetrics());
+              }
+            }
+          } catch (IOException io) {
+            if (io instanceof SocketTimeoutException || io instanceof 
ConnectException) {
+              if (ambariEventPublisher != null) {
+                ambariEventPublisher.publish(new 
MetricsCollectorHostDownEvent(clusterName, uriBuilder.getHost()));
+              }
+              throw io;
             }
           }
 
@@ -562,7 +581,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
     Map<String, Map<TemporalInfo, MetricsRequest>> requestMap =
       new HashMap<String, Map<TemporalInfo, MetricsRequest>>();
 
-    String collectorHostName = null;
     String collectorPort = null;
     Map<String, Boolean> clusterCollectorComponentLiveMap = new HashMap<>();
     Map<String, Boolean> clusterCollectorHostLiveMap = new HashMap<>();
@@ -623,9 +641,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
         requestMap.put(clusterName, requests);
       }
 
-      if (collectorHostName == null) {
-        collectorHostName = hostProvider.getCollectorHostName(clusterName, 
TIMELINE_METRICS);
-      }
+      String collectorHost = hostProvider.getCollectorHostName(clusterName, 
TIMELINE_METRICS);
 
       if (collectorPort == null) {
         collectorPort = hostProvider.getCollectorPort(clusterName, 
TIMELINE_METRICS);
@@ -656,7 +672,7 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider {
             MetricsRequest metricsRequest = requests.get(temporalInfo);
             if (metricsRequest == null) {
               metricsRequest = new MetricsRequest(temporalInfo,
-                getAMSUriBuilder(collectorHostName,
+                getAMSUriBuilder(collectorHost,
                   collectorPort != null ? Integer.parseInt(collectorPort) : 
COLLECTOR_DEFAULT_PORT,
                   configuration.isHttpsEnabled()),
                   (String) resource.getPropertyValue(clusterNamePropertyId));

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
index 3688742..f9bfa59 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProvider.java
@@ -17,7 +17,10 @@
  */
 package org.apache.ambari.server.controller.metrics.timeline;
 
+import com.google.inject.Inject;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.internal.URLStreamProvider;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
@@ -33,11 +36,14 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.events.MetricsCollectorHostDownEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.http.client.utils.URIBuilder;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,6 +60,7 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
   MetricsRequestHelper requestHelper;
   private static AtomicInteger printSkipPopulateMsgHostCounter = new 
AtomicInteger(0);
   private static AtomicInteger printSkipPopulateMsgHostCompCounter = new 
AtomicInteger(0);
+  private AmbariEventPublisher ambariEventPublisher;
 
   public AMSReportPropertyProvider(Map<String, Map<String, PropertyInfo>> 
componentPropertyInfoMap,
                                    URLStreamProvider streamProvider,
@@ -67,6 +74,9 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
 
     this.metricCache = cacheProvider.getTimelineMetricsCache();
     this.requestHelper = new MetricsRequestHelper(streamProvider);
+    if (AmbariServer.getController() != null) {
+      this.ambariEventPublisher = 
AmbariServer.getController().getAmbariEventPublisher();
+    }
   }
 
   /**
@@ -215,10 +225,13 @@ public class AMSReportPropertyProvider extends MetricsReportPropertyProvider {
         }
       } catch (IOException io) {
         timelineMetrics = null;
-        if (io instanceof SocketTimeoutException) {
+        if (io instanceof SocketTimeoutException || io instanceof 
ConnectException) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Skip populating metrics on socket timeout exception.");
           }
+          if (ambariEventPublisher != null) {
+            ambariEventPublisher.publish(new 
MetricsCollectorHostDownEvent(clusterName, host));
+          }
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
index 1df0f6a..e7d6c0b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/MetricsRequestHelper.java
@@ -37,6 +37,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.SocketTimeoutException;
 import java.net.URISyntaxException;
@@ -116,8 +117,8 @@ public class MetricsRequestHelper {
         LOG.debug(errorMsg, io);
       }
 
-      if (io instanceof SocketTimeoutException) {
-        errorMsg += " Cannot connect to collector: SocketTimeoutException.";
+      if (io instanceof SocketTimeoutException || io instanceof 
ConnectException) {
+        errorMsg = "Cannot connect to collector: SocketTimeoutException for " 
+ uriBuilder.getHost();
         LOG.error(errorMsg);
         throw io;
       }

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
index b5fe05e..b852cc2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -75,6 +76,9 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
         if (t instanceof SocketTimeoutException) {
           throw new SocketTimeoutException(t.getMessage());
         }
+        if (t instanceof ConnectException) {
+          throw new ConnectException(t.getMessage());
+        }
       }
     }
 
@@ -132,6 +136,12 @@ public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
 
       LOG.debug("New temporal info: " + newKey.getTemporalInfo() +
         " for : " + existingKey.getMetricNames());
+
+      if (existingKey.getSpec() == null || 
!existingKey.getSpec().equals(newKey.getSpec())) {
+        existingKey.setSpec(newKey.getSpec());
+        LOG.debug("New spec: " + newKey.getSpec() +
+          " for : " + existingKey.getMetricNames());
+      }
     }
 
     return super.get(key);

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
index 912c441..de9e7b6 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariEvent.java
@@ -120,7 +120,12 @@ public abstract class AmbariEvent {
     /**
      * Cluster configuration changed.
      */
-    CLUSTER_CONFIG_CHANGED;
+    CLUSTER_CONFIG_CHANGED,
+
+    /**
+     * Metrics Collector force refresh needed.
+     */
+    METRICS_COLLECTOR_HOST_DOWN;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java
new file mode 100644
index 0000000..0719477
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/MetricsCollectorHostDownEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+/** Event reporting that a metrics collector host of a cluster is considered down. */
+public class MetricsCollectorHostDownEvent extends AmbariEvent {
+
+  /** Name of the cluster the collector host belongs to. */
+  private final String clusterName;
+
+  /** Host of the metrics collector reported as down. */
+  private final String collectorHost;
+
+  /**
+   * @param clusterName   name of the cluster the collector belongs to
+   * @param collectorHost host of the metrics collector reported as down
+   */
+  public MetricsCollectorHostDownEvent(String clusterName, String collectorHost) {
+    super(AmbariEventType.METRICS_COLLECTOR_HOST_DOWN);
+    this.clusterName = clusterName;
+    this.collectorHost = collectorHost;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public String getCollectorHost() {
+    return collectorHost;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py
index 2c7119b..c26eafc 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_collector.py
@@ -58,7 +58,7 @@ class AmsCollector(Script):
   def status(self, env):
     import status_params
     env.set_params(status_params)
-    check_service_status(name='collector')
+    check_service_status(env, name='collector')
     
   def get_log_folder(self):
     import params

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py
index 747a687..542663f 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana.py
@@ -61,7 +61,7 @@ class AmsGrafana(Script):
   def status(self, env):
     import status_params
     env.set_params(status_params)
-    check_service_status(name='grafana')
+    check_service_status(env, name='grafana')
 
   def get_pid_files(self):
     import status_params

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
index d53c1fc..a377f6d 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_monitor.py
@@ -53,7 +53,7 @@ class AmsMonitor(Script):
   def status(self, env):
     import status_params
     env.set_params(status_params)
-    check_service_status(name='monitor')
+    check_service_status(env, name='monitor')
     
   def get_log_folder(self):
     import params

http://git-wip-us.apache.org/repos/asf/ambari/blob/0855174b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
index 14af3ad..0b24ac0 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/status.py
@@ -31,13 +31,13 @@ def get_collector_pid_files():
   return pid_files
 
 @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
-def check_service_status(name):
+def check_service_status(env, name):
   import status_params
   env.set_params(status_params)
 
   from resource_management.libraries.functions.check_process_status import 
check_process_status
   if name=='collector':
-    for pid_files in get_collector_pid_files():
+    for pid_file in get_collector_pid_files():
       check_process_status(pid_file)
   elif name == 'monitor':
     check_process_status(status_params.monitor_pid_file)

Reply via email to