AMBARI-7410. Alerts: add aggregate type and processing (ncole)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/179290e0
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/179290e0
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/179290e0

Branch: refs/heads/trunk
Commit: 179290e0fc5370b9d46a18d5ba9d187d6c1112c8
Parents: ba325d0
Author: Nate Cole <nc...@hortonworks.com>
Authored: Fri Sep 19 10:55:54 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Fri Sep 19 10:56:10 2014 -0400

----------------------------------------------------------------------
 .../server/api/services/AmbariMetaInfo.java     |  22 ++-
 .../AlertDefinitionResourceProvider.java        |  11 +-
 .../listeners/AlertAggregateListener.java       | 144 +++++++++++++++
 .../events/listeners/AlertReceivedListener.java |  14 +-
 .../apache/ambari/server/orm/dao/AlertsDAO.java |  52 ++++++
 .../server/orm/entities/AlertCurrentEntity.java |   1 +
 .../orm/entities/AlertDefinitionEntity.java     |  14 +-
 .../state/alert/AlertDefinitionFactory.java     |   2 +-
 .../ambari/server/state/alert/MetricSource.java |  46 ++++-
 .../stacks/HDP/2.0.6/services/HBASE/alerts.json |   4 +-
 .../stacks/HDP/2.0.6/services/HDFS/alerts.json  |  30 +++-
 .../server/api/services/AmbariMetaInfoTest.java |   4 +-
 .../AlertDefinitionResourceProviderTest.java    |   4 +-
 .../AlertGroupResourceProviderTest.java         |   2 +-
 .../apache/ambari/server/orm/OrmTestHelper.java |   3 +-
 .../server/orm/dao/AlertDefinitionDAOTest.java  |   9 +-
 .../server/orm/dao/AlertDispatchDAOTest.java    |   3 +-
 .../ambari/server/orm/dao/AlertsDAOTest.java    |  83 ++++++++-
 .../state/cluster/AlertDataManagerTest.java     | 174 ++++++++++++++++++-
 .../stacks/HDP/2.0.5/services/HDFS/alerts.json  |  19 +-
 20 files changed, 592 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index e3e5779..ec53afc 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -31,7 +31,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -48,6 +47,7 @@ import org.apache.ambari.server.api.util.StackExtensionHelper;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.customactions.ActionDefinition;
 import org.apache.ambari.server.customactions.ActionDefinitionManager;
+import org.apache.ambari.server.events.listeners.AlertAggregateListener;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.MetainfoDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
@@ -66,6 +66,7 @@ import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionFactory;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.apache.ambari.server.state.stack.LatestRepoCallable;
 import org.apache.ambari.server.state.stack.MetricDefinition;
 import org.apache.ambari.server.state.stack.RepositoryXml;
@@ -1205,9 +1206,7 @@ public class AmbariMetaInfo {
     }
 
     // for every cluster
-    Set<Entry<String, Cluster>> clusterEntries = clusterMap.entrySet();
-    for (Entry<String, Cluster> clusterEntry : clusterEntries) {
-      Cluster cluster = clusterEntry.getValue();
+    for (Cluster cluster : clusterMap.values()) {
       long clusterId = cluster.getClusterId();
       StackId stackId = cluster.getDesiredStackVersion();
       StackInfo stackInfo = getStackInfo(stackId.getStackName(),
@@ -1246,7 +1245,7 @@ public class AmbariMetaInfo {
       List<AlertDefinitionEntity> persist = new 
ArrayList<AlertDefinitionEntity>();
       List<AlertDefinitionEntity> entities = 
alertDefinitionDao.findAll(clusterId);
 
-      // create a map of the enntities for fast extraction
+      // create a map of the entities for fast extraction
       Map<String, AlertDefinitionEntity> mappedEntities = new HashMap<String, 
AlertDefinitionEntity>(100);
       for (AlertDefinitionEntity entity : entities) {
         mappedEntities.put(entity.getDefinitionName(), entity);
@@ -1279,9 +1278,20 @@ public class AmbariMetaInfo {
           LOG.info("Merging Alert Definition {} into the database",
               entity.getDefinitionName());
         }
-
         alertDefinitionDao.createOrUpdate(entity);
       }
+      
+      // all definitions have been resolved.  pull and initialize the 
aggregates
+      for (AlertDefinitionEntity def : 
alertDefinitionDao.findAll(cluster.getClusterId())) {
+        if (def.getSourceType().equals(SourceType.AGGREGATE)) {
+          AlertDefinition realDef = alertDefinitionFactory.coerce(def);
+          
+          AlertAggregateListener listener = 
injector.getInstance(AlertAggregateListener.class);
+          
+          listener.addAggregateType(cluster.getClusterId(), realDef); 
+        }
+      }
+      
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
index c63f063..7cb9886 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
@@ -310,7 +310,7 @@ public class AlertDefinitionResourceProvider extends 
AbstractControllerResourceP
     String definitionName = (String) requestMap.get(ALERT_DEF_NAME);
     String serviceName = (String) requestMap.get(ALERT_DEF_SERVICE_NAME);
     String componentName = (String) requestMap.get(ALERT_DEF_COMPONENT_NAME);
-    String sourceType = (String) requestMap.get(ALERT_DEF_SOURCE_TYPE);
+    String type = (String) requestMap.get(ALERT_DEF_SOURCE_TYPE);
     String label = (String) requestMap.get(ALERT_DEF_LABEL);
     String desiredScope = (String) requestMap.get(ALERT_DEF_SCOPE);
 
@@ -330,6 +330,11 @@ public class AlertDefinitionResourceProvider extends 
AbstractControllerResourceP
     if (null != desiredScope && desiredScope.length() > 0) {
       scope = Scope.valueOf(desiredScope);
     }
+    
+    SourceType sourceType = null;
+    if (null != type && type.length() > 0) {
+      sourceType = SourceType.valueOf(type);
+    }
 
     // if not specified when creating an alert definition, the scope is
     // assumed to be ANY
@@ -354,7 +359,7 @@ public class AlertDefinitionResourceProvider extends 
AbstractControllerResourceP
       throw new IllegalArgumentException("Service name must be specified");
     }
 
-    if (bCreate && StringUtils.isEmpty(sourceType)) {
+    if (bCreate && null == sourceType) {
       throw new IllegalArgumentException(String.format(
           "Source type must be specified and one of %s",
           EnumSet.allOf(SourceType.class)));
@@ -439,7 +444,7 @@ public class AlertDefinitionResourceProvider extends 
AbstractControllerResourceP
     }
 
     if (null != enabled) {
-      entity.setEnabled(enabled);
+      entity.setEnabled(enabled.booleanValue());
     }
 
     if (null != interval) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java
new file mode 100644
index 0000000..8340c26
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertAggregateListener.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners;
+
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Reporting;
+import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+/**
+ * Used to aggregate alerts. 
+ */
+public class AlertAggregateListener {
+
+  @Inject
+  private AlertsDAO m_alertsDao = null;
+
+  private AlertEventPublisher m_publisher = null;
+  private Map<Long, Map<String, AlertDefinition>> m_aggregateMap = 
+      new HashMap<Long, Map<String, AlertDefinition>>();
+  
+  @Inject
+  public AlertAggregateListener(AlertEventPublisher publisher) {
+    m_publisher = publisher;
+
+    publisher.register(this);
+  }
+  
+  /**
+   * Consume an alert that was received.
+   */
+  @Subscribe
+  public void onAlertEvent(AlertReceivedEvent event) {
+    AlertDefinition def = getAggregateDefinition(event.getClusterId(), 
event.getAlert().getName());
+    
+    if (null == def || null == m_alertsDao) {
+      return;
+    }
+    
+    AggregateSource as = (AggregateSource) def.getSource();
+    
+    AlertSummaryDTO summary = m_alertsDao.findAggregateCounts(
+        event.getClusterId(), as.getAlertName());
+    
+    Alert alert = new Alert(def.getName(), null, def.getServiceName(),
+        null, null, AlertState.UNKNOWN);
+    alert.setLabel(def.getLabel());
+    alert.setTimestamp(System.currentTimeMillis());
+    
+    if (0 == summary.getOkCount()) {
+      alert.setText("Cannot determine, there are no records");
+    } else if (summary.getUnknownCount() > 0) {
+      alert.setText("There are alerts with status UNKNOWN.");
+    } else {
+      Reporting reporting = as.getReporting();
+      
+      int numerator = summary.getCriticalCount() + summary.getWarningCount();
+      int denominator = summary.getOkCount();
+      double value = (double)(numerator) / denominator;
+      
+      if (value > reporting.getCritical().getValue().doubleValue()) {
+        alert.setState(AlertState.CRITICAL);
+        alert.setText(MessageFormat.format(reporting.getCritical().getText(),
+            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+        
+      } else if (value > reporting.getWarning().getValue().doubleValue()) {
+        alert.setState(AlertState.WARNING);
+        alert.setText(MessageFormat.format(reporting.getWarning().getText(),
+            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+        
+      } else {
+        alert.setState(AlertState.OK);
+        alert.setText(MessageFormat.format(reporting.getOk().getText(),
+            Integer.valueOf(denominator), Integer.valueOf(numerator)));
+      }
+      
+    }
+    
+    // make a new event and allow others to consume it
+    AlertReceivedEvent aggEvent = new AlertReceivedEvent(event.getClusterId(), 
alert);
+    
+    m_publisher.publish(aggEvent);
+  }
+  
+  private AlertDefinition getAggregateDefinition(long clusterId, String name) {
+    Long id = Long.valueOf(clusterId);
+    if (!m_aggregateMap.containsKey(id))
+      return null;
+    
+    if (!m_aggregateMap.get(id).containsKey(name))
+      return null;
+    
+    return m_aggregateMap.get(id).get(name);
+  }
+
+  /**
+   * @param source the aggregate source
+   */
+  public void addAggregateType(long clusterId, AlertDefinition definition) {
+    Long id = Long.valueOf(clusterId);
+    
+    if (!m_aggregateMap.containsKey(id)) {
+      m_aggregateMap.put(id, new HashMap<String, AlertDefinition>());
+    }
+    
+    Map<String, AlertDefinition> map = m_aggregateMap.get(id);
+    
+    AggregateSource as = (AggregateSource) definition.getSource();
+    
+    map.put(as.getAlertName(), definition);
+  }
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
index 0f1b49a..fb7a608 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/AlertReceivedListener.java
@@ -85,9 +85,15 @@ public class AlertReceivedListener {
     long clusterId = event.getClusterId();
     Alert alert = event.getAlert();
 
-    AlertCurrentEntity current = 
m_alertsDao.findCurrentByHostAndName(clusterId,
-        alert.getHost(), alert.getName());
-
+    AlertCurrentEntity current = null;
+    
+    if (null == alert.getHost()) {
+      current = m_alertsDao.findCurrentByNameNoHost(clusterId, 
alert.getName());
+    } else {
+      current = m_alertsDao.findCurrentByHostAndName(clusterId, 
alert.getHost(),
+          alert.getName());
+    }
+    
     if (null == current) {
       AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
           alert.getName());
@@ -122,7 +128,7 @@ public class AlertReceivedListener {
       current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
 
       m_alertsDao.merge(current);
-
+      
       // broadcast the alert changed event for other subscribers
       AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent(
           event.getClusterId(), event.getAlert(), current.getAlertHistory(),

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index a274089..aba41a5 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -457,5 +457,57 @@ public class AlertsDAO {
   public void remove(AlertCurrentEntity alert) {
     entityManagerProvider.get().remove(merge(alert));
   }
+  
+  /**
+   * Finds the aggregate counts for an alert name, across all hosts. 
+   * @param clusterId the cluster id
+   * @param alertName the name of the alert to find the aggregate
+   * @return the summary data
+   */
+  @RequiresSession
+  public AlertSummaryDTO findAggregateCounts(long clusterId, String alertName) 
{
+    StringBuilder sb = new StringBuilder();
+    sb.append("SELECT NEW %s (");
+    sb.append("COUNT(history), ");
+    sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END), ");
+    sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END), ");
+    sb.append("SUM(CASE WHEN history.alertState = %s.%s THEN 1 ELSE 0 END)) ");
+    sb.append("FROM AlertCurrentEntity alert JOIN alert.alertHistory history 
WHERE history.clusterId = :clusterId");
+    sb.append(" AND history.alertDefinition.definitionName = :definitionName");
+    
+    String str = String.format(sb.toString(),
+        AlertSummaryDTO.class.getName(),
+        AlertState.class.getName(), AlertState.WARNING.name(),
+        AlertState.class.getName(), AlertState.CRITICAL.name(),
+        AlertState.class.getName(), AlertState.UNKNOWN.name());
+    
+    TypedQuery<AlertSummaryDTO> query = 
entityManagerProvider.get().createQuery(
+        str, AlertSummaryDTO.class);
+    
+    query.setParameter("clusterId", Long.valueOf(clusterId));
+    query.setParameter("definitionName", alertName);
+    
+    return daoUtils.selectSingle(query);    
+  }
+  
+  /**
+   * Locate the current alert for the provided service and alert name, but when
+   * host is not set ({@code IS NULL}).
+   * @param clusterId the cluster id
+   * @param serviceName the service name
+   * @param alertName the name of the alert
+   * @return the current record, or {@code null} if not found
+   */
+  @RequiresSession
+  public AlertCurrentEntity findCurrentByNameNoHost(long clusterId, String 
alertName) {
+    
+    TypedQuery<AlertCurrentEntity> query = 
entityManagerProvider.get().createNamedQuery(
+        "AlertCurrentEntity.findByNameAndNoHost", AlertCurrentEntity.class);
+
+    query.setParameter("clusterId", Long.valueOf(clusterId));
+    query.setParameter("definitionName", alertName);
+
+    return daoUtils.selectOne(query);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
index c24ac17..5b54d57 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
@@ -51,6 +51,7 @@ import org.apache.ambari.server.state.MaintenanceState;
     @NamedQuery(name = "AlertCurrentEntity.findByService", query = "SELECT 
alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE 
history.clusterId = :clusterId AND history.serviceName = :serviceName AND 
history.alertDefinition.scope IN :inlist"),
     @NamedQuery(name = "AlertCurrentEntity.findByHost", query = "SELECT alert 
FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE 
history.clusterId = :clusterId AND history.hostName = :hostName AND 
history.alertDefinition.scope IN :inlist"),
     @NamedQuery(name = "AlertCurrentEntity.findByHostAndName", query = "SELECT 
alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history WHERE 
history.clusterId = :clusterId AND history.alertDefinition.definitionName = 
:definitionName AND history.hostName = :hostName"),
+    @NamedQuery(name = "AlertCurrentEntity.findByNameAndNoHost", query = 
"SELECT alert FROM AlertCurrentEntity alert JOIN alert.alertHistory history 
WHERE history.clusterId = :clusterId AND history.alertDefinition.definitionName 
= :definitionName AND history.hostName IS NULL"),
     @NamedQuery(name = "AlertCurrentEntity.removeByHistoryId", query = "DELETE 
FROM AlertCurrentEntity alert WHERE alert.alertHistory.alertId = :historyId"),
     @NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = 
"DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId 
= :definitionId") })
 public class AlertCurrentEntity {

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
index b4b8a44..8548fda 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertDefinitionEntity.java
@@ -43,6 +43,7 @@ import javax.persistence.TableGenerator;
 import javax.persistence.UniqueConstraint;
 
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
 
 /**
  * The {@link AlertDefinitionEntity} class is used to model an alert that needs
@@ -106,7 +107,8 @@ public class AlertDefinitionEntity {
   private String serviceName;
 
   @Column(name = "source_type", nullable = false, length = 255)
-  private String sourceType;
+  @Enumerated(value = EnumType.STRING)
+  private SourceType sourceType;
 
   /**
    * Bi-directional many-to-many association to {@link AlertGroupEntity}
@@ -266,7 +268,7 @@ public class AlertDefinitionEntity {
    *         otherwise.
    */
   public boolean getEnabled() {
-    return enabled == 0 ? false : true;
+    return enabled == Integer.valueOf(0) ? false : true;
   }
 
   /**
@@ -277,7 +279,7 @@ public class AlertDefinitionEntity {
    *          otherwise.
    */
   public void setEnabled(boolean enabled) {
-    this.enabled = enabled ? 1 : 0;
+    this.enabled = enabled ? Integer.valueOf(1) : Integer.valueOf(0);
   }
 
   /**
@@ -344,16 +346,14 @@ public class AlertDefinitionEntity {
   /**
    * @return
    */
-  // !!! FIXME: Create enumeration for this
-  public String getSourceType() {
+  public SourceType getSourceType() {
     return sourceType;
   }
 
   /**
    * @param sourceType
    */
-  // !!! FIXME: Create enumeration for this
-  public void setSourceType(String sourceType) {
+  public void setSourceType(SourceType sourceType) {
     this.sourceType = sourceType;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
index f16f4a6..f8e44bd 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
@@ -203,7 +203,7 @@ public class AlertDefinitionFactory {
     entity.setScope(scope);
 
     Source source = definition.getSource();
-    entity.setSourceType(source.getType().name());
+    entity.setSourceType(source.getType());
 
     try {
       String sourceJson = m_gson.toJson(source);

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java
index 530c1bb..15351d9 100644
--- 
a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/MetricSource.java
@@ -17,6 +17,10 @@
  */
 package org.apache.ambari.server.state.alert;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import com.google.gson.annotations.SerializedName;
 
 /**
@@ -31,7 +35,7 @@ public class MetricSource extends Source {
   private String m_uri = null;
   
   @SerializedName("jmx")
-  private Object jmxInfo = null;
+  private JmxInfo jmxInfo = null;
 
   @SerializedName("ganglia")
   private String gangliaInfo = null;
@@ -39,7 +43,7 @@ public class MetricSource extends Source {
   /**
    * @return the jmx info, if this metric is jmx-based
    */
-  public Object getJmxInfo() {
+  public JmxInfo getJmxInfo() {
     return jmxInfo;
   }
 
@@ -116,4 +120,42 @@ public class MetricSource extends Source {
 
     return true;
   }
+  
+  /**
+   * Represents the {@code jmx} element in a Metric alert.
+   */
+  public static class JmxInfo {
+    @SerializedName("property_list")
+    private List<String> propertyList;
+    
+    private String value;
+    
+    public List<String> getPropertyList() {
+      return propertyList;
+    }
+    
+    public String getValue() {
+      return value;
+    }
+    
+    @Override
+    public boolean equals(Object object) {
+      if (!JmxInfo.class.isInstance(object)) {
+        return false;
+      }
+      
+      JmxInfo other = (JmxInfo)object;
+      
+      List<String> list1 = new ArrayList<String>(propertyList);
+      List<String> list2 = new ArrayList<String>(other.propertyList);
+      
+      if ((null == list1 && null != list2) || (null != list1 && null == 
list2)) {
+        return false;
+      }
+      
+      // !!! even if out of order, this is enough to fail
+      return list1.equals(list2);
+      
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/alerts.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/alerts.json 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/alerts.json
index 3958106..2e21aad 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/alerts.json
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HBASE/alerts.json
@@ -5,7 +5,7 @@
       "name": "hbase_master_process",
       "label": "HBase Master Process",
       "interval": 1,
-      "scope": "any",
+      "scope": "ANY",
       "source": {
         "type": "PORT",
         "uri": "{{hbase-site/hbase.master.port}}",
@@ -21,4 +21,4 @@
       }
     }
   ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json
index 00286b1..88503af 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/HDFS/alerts.json
@@ -1,11 +1,35 @@
 {
-  "service": [],
+  "service": [
+    {
+      "name": "percent_datanode",
+      "label": "Percent DataNodes live",
+      "interval": 1,
+      "scope": "SERVICE",
+      "source": {
+        "type": "AGGREGATE",
+        "alert_name": "datanode_process",
+        "reporting": {
+          "ok": {
+            "text": "OK: total: <{0}>, affected: <{1}>"
+          },
+          "warning": {
+            "text": "OK: total: <{0}>, affected: <{1}>",
+            "value": 0.1
+          },
+          "critical": {
+            "text": "CRITICAL: total: <{0}>, affected <{1}>",
+            "value": 0.3
+          }
+        }
+      }
+    }
+  ],
   "NAMENODE": [
     {
       "name": "namenode_process",
       "label": "NameNode Process",
       "interval": 1,
-      "scope": "any",
+      "scope": "ANY",
       "source": {
         "type": "PORT",
         "uri": "{{hdfs-site/dfs.namenode.http-address}}",
@@ -24,7 +48,7 @@
       "name": "check_cpu",
       "label": "NameNode host CPU utilization",
       "interval": 2,
-      "scope": "any",
+      "scope": "ANY",
       "source": {
         "type": "METRIC",
         "uri": "{{hdfs-site/dfs.namenode.http-address}}",

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
index 66c0b3d..23d0efa 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
@@ -1530,7 +1530,7 @@ public class AmbariMetaInfoTest {
     expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).anyTimes();
     expect(cluster.getDesiredStackVersion()).andReturn(stackId).anyTimes();
     expect(cluster.getServices()).andReturn(clusterServiceMap).anyTimes();
-    expect(dao.findAll(EasyMock.anyInt())).andReturn(entities);
+    expect(dao.findAll(EasyMock.anyInt())).andReturn(entities).atLeastOnce();
     dao.createOrUpdate(EasyMock.anyObject(AlertDefinitionEntity.class));
     EasyMock.expectLastCall().times(4);
 
@@ -1549,7 +1549,7 @@ public class AmbariMetaInfoTest {
     expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).anyTimes();
     expect(cluster.getDesiredStackVersion()).andReturn(stackId).anyTimes();
     expect(cluster.getServices()).andReturn(clusterServiceMap).anyTimes();
-    expect(dao.findAll(EasyMock.anyInt())).andReturn(entities);
+    expect(dao.findAll(EasyMock.anyInt())).andReturn(entities).atLeastOnce();
 
     EasyMock.replay(clusters, cluster, dao);
     metaInfo.reconcileAlertDefinitions(clusters);

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
index 7ea2ffe..78655dd 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
@@ -264,7 +264,7 @@ public class AlertDefinitionResourceProviderTest {
     Assert.assertEquals(Integer.valueOf(1), entity.getScheduleInterval());
     Assert.assertEquals(Scope.ANY, entity.getScope());
     Assert.assertEquals("HDFS", entity.getServiceName());
-    Assert.assertEquals("METRIC", entity.getSourceType());
+    Assert.assertEquals(SourceType.METRIC, entity.getSourceType());
     Assert.assertEquals("Mock Label (Create)", entity.getLabel());
 
     // verify Source
@@ -474,7 +474,7 @@ public class AlertDefinitionResourceProviderTest {
     entity.setHash(DEFINITION_UUID);
     entity.setScheduleInterval(Integer.valueOf(2));
     entity.setServiceName(null);
-    entity.setSourceType(SourceType.METRIC.name());
+    entity.setSourceType(SourceType.METRIC);
     entity.setSource(sourceJson);
     return Arrays.asList(entity);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
index ddfc75a..d2ce6fb 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertGroupResourceProviderTest.java
@@ -478,7 +478,7 @@ public class AlertGroupResourceProviderTest {
     entity.setHash(DEFINITION_UUID);
     entity.setScheduleInterval(Integer.valueOf(2));
     entity.setServiceName(null);
-    entity.setSourceType(SourceType.METRIC.name());
+    entity.setSourceType(SourceType.METRIC);
     entity.setSource(null);
 
     Set<AlertDefinitionEntity> definitions = new 
HashSet<AlertDefinitionEntity>();

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
index a9d126c..08cd7b8 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/OrmTestHelper.java
@@ -61,6 +61,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.orm.entities.UserEntity;
 import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.springframework.security.crypto.password.PasswordEncoder;
 import org.springframework.util.Assert;
 
@@ -330,7 +331,7 @@ public class OrmTestHelper {
     definition.setScheduleInterval(60);
     definition.setScope(Scope.SERVICE);
     definition.setSource("Source " + System.currentTimeMillis());
-    definition.setSourceType("SCRIPT");
+    definition.setSourceType(SourceType.SCRIPT);
 
     alertDefinitionDAO.create(definition);
     return alertDefinitionDAO.findById(definition.getDefinitionId());

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
index cdca3dd..3e20cd6 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDefinitionDAOTest.java
@@ -45,6 +45,7 @@ import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.NotificationState;
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -97,7 +98,7 @@ public class AlertDefinitionDAOTest {
       definition.setScheduleInterval(60);
       definition.setScope(Scope.SERVICE);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
 
@@ -118,7 +119,7 @@ public class AlertDefinitionDAOTest {
       definition.setScheduleInterval(60);
       definition.setScope(Scope.SERVICE);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
 
@@ -133,7 +134,7 @@ public class AlertDefinitionDAOTest {
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
 
@@ -148,7 +149,7 @@ public class AlertDefinitionDAOTest {
       definition.setScheduleInterval(60);
       definition.setScope(Scope.HOST);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       dao.create(definition);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
index 2c3a876..7f4deda 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
@@ -41,6 +41,7 @@ import 
org.apache.ambari.server.orm.entities.AlertTargetEntity;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.NotificationState;
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -431,7 +432,7 @@ public class AlertDispatchDAOTest {
       definition.setScheduleInterval(60);
       definition.setScope(Scope.SERVICE);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       definitionDao.create(definition);
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index a397da4..9e638e8 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -40,6 +40,7 @@ import 
org.apache.ambari.server.orm.entities.AlertHistoryEntity;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -84,7 +85,7 @@ public class AlertsDAOTest {
       definition.setScheduleInterval(Integer.valueOf(60));
       definition.setScope(Scope.SERVICE);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       definitionDao.create(definition);
     }
     
@@ -208,7 +209,7 @@ public class AlertsDAOTest {
     hostDef.setScheduleInterval(Integer.valueOf(60));
     hostDef.setScope(Scope.HOST);
     hostDef.setSource("HostService");
-    hostDef.setSourceType("SCRIPT");
+    hostDef.setSourceType(SourceType.SCRIPT);
     definitionDao.create(hostDef);
     
     // history for the definition
@@ -395,5 +396,83 @@ public class AlertsDAOTest {
     assertEquals(0, summary.getCriticalCount());
     assertEquals(0, summary.getCriticalCount());
     
+  }
+  
+  @Test
+  public void testFindAggregates() throws Exception {
+    // definition
+    AlertDefinitionEntity definition = new AlertDefinitionEntity();
+    definition.setDefinitionName("many_per_cluster");
+    definition.setServiceName("ServiceName");
+    definition.setComponentName(null);
+    definition.setClusterId(clusterId);
+    definition.setHash(UUID.randomUUID().toString());
+    definition.setScheduleInterval(Integer.valueOf(60));
+    definition.setScope(Scope.SERVICE);
+    definition.setSource("SourceScript");
+    definition.setSourceType(SourceType.SCRIPT);
+    definitionDao.create(definition);
+    
+    // history record #1 and current
+    AlertHistoryEntity history = new AlertHistoryEntity();
+    history.setAlertDefinition(definition);
+    history.setAlertInstance(null);
+    history.setAlertLabel("");
+    history.setAlertState(AlertState.OK);
+    history.setAlertText("");
+    history.setAlertTimestamp(Long.valueOf(1L));
+    history.setClusterId(clusterId);
+    history.setComponentName("");
+    history.setHostName("h1");
+    history.setServiceName("ServiceName");
+    
+    AlertCurrentEntity current = new AlertCurrentEntity();
+    current.setAlertHistory(history);
+    current.setLatestTimestamp(Long.valueOf(1L));
+    current.setOriginalTimestamp(Long.valueOf(1L));
+    dao.merge(current);
+    
+    // history record #2 and current
+    history = new AlertHistoryEntity();
+    history.setAlertDefinition(definition);
+    history.setAlertInstance(null);
+    history.setAlertLabel("");
+    history.setAlertState(AlertState.OK);
+    history.setAlertText("");
+    history.setAlertTimestamp(Long.valueOf(1L));
+    history.setClusterId(clusterId);
+    history.setComponentName("");
+    history.setHostName("h2");
+    history.setServiceName("ServiceName");
+    
+    current = new AlertCurrentEntity();
+    current.setAlertHistory(history);
+    current.setLatestTimestamp(Long.valueOf(1L));
+    current.setOriginalTimestamp(Long.valueOf(1L));
+    dao.merge(current);
+    
+    AlertSummaryDTO summary = dao.findAggregateCounts(clusterId.longValue(), 
"many_per_cluster");
+    assertEquals(2, summary.getOkCount());
+    assertEquals(0, summary.getWarningCount());
+    assertEquals(0, summary.getCriticalCount());
+    assertEquals(0, summary.getUnknownCount());
+    
+    AlertCurrentEntity c = dao.findCurrentByHostAndName(clusterId.longValue(),
+        "h2", "many_per_cluster");
+    AlertHistoryEntity h = c.getAlertHistory();
+    h.setAlertState(AlertState.CRITICAL);
+    dao.merge(h);
+    
+    summary = dao.findAggregateCounts(clusterId.longValue(), 
"many_per_cluster");
+    assertEquals(2, summary.getOkCount());
+    assertEquals(0, summary.getWarningCount());
+    assertEquals(1, summary.getCriticalCount());
+    assertEquals(0, summary.getUnknownCount());
+    
+    summary = dao.findAggregateCounts(clusterId.longValue(), "foo");
+    assertEquals(0, summary.getOkCount());
+    assertEquals(0, summary.getWarningCount());
+    assertEquals(0, summary.getCriticalCount());
+    assertEquals(0, summary.getUnknownCount());
   }  
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
index bbf7774..00ba942 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
@@ -20,17 +20,22 @@ package org.apache.ambari.server.state.cluster;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
+import java.lang.reflect.Field;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.ambari.server.events.AlertEvent;
 import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.listeners.AlertAggregateListener;
 import org.apache.ambari.server.events.listeners.AlertReceivedListener;
 import org.apache.ambari.server.events.listeners.AlertStateChangedListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.OrmTestHelper;
@@ -45,15 +50,25 @@ import 
org.apache.ambari.server.orm.entities.AlertNoticeEntity;
 import org.apache.ambari.server.orm.entities.AlertTargetEntity;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinitionFactory;
+import org.apache.ambari.server.state.alert.Reporting;
+import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
 import org.apache.ambari.server.state.alert.Scope;
+import org.apache.ambari.server.state.alert.Source;
+import org.apache.ambari.server.state.alert.SourceType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.google.gson.Gson;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.persist.PersistService;
 
+
 /**
  * Tests the management of {@link AlertEvent}s in the system.
  */
@@ -94,7 +109,7 @@ public class AlertDataManagerTest {
       definition.setScheduleInterval(Integer.valueOf(60));
       definition.setScope(Scope.SERVICE);
       definition.setSource("Source " + i);
-      definition.setSourceType("SCRIPT");
+      definition.setSourceType(SourceType.SCRIPT);
       definitionDao.create(definition);
     }
   }
@@ -253,4 +268,161 @@ public class AlertDataManagerTest {
     notices = dispatchDao.findAllNotices();
     assertEquals(1, notices.size());
   }
+  
+  @Test
+  public void testAggregateAlerts() throws Exception {
+    // create definition
+    AlertDefinitionEntity definition = new AlertDefinitionEntity();
+    definition.setDefinitionName("to_aggregate");
+    definition.setLabel("My Label");
+    definition.setServiceName("SERVICE");
+    definition.setComponentName(null);
+    definition.setClusterId(clusterId);
+    definition.setHash(UUID.randomUUID().toString());
+    definition.setScheduleInterval(Integer.valueOf(60));
+    definition.setScope(Scope.HOST);
+    definition.setSource("Source");
+    definition.setSourceType(SourceType.SCRIPT);
+    definitionDao.create(definition);
+    
+    // create aggregate of definition
+    AlertDefinitionEntity aggDef = new AlertDefinitionEntity();
+    aggDef.setDefinitionName("aggregate_test");
+    aggDef.setServiceName("SERVICE");
+    aggDef.setComponentName(null);
+    aggDef.setClusterId(clusterId);
+    aggDef.setHash(UUID.randomUUID().toString());
+    aggDef.setScheduleInterval(Integer.valueOf(60));
+    aggDef.setScope(Scope.SERVICE);
+    
+    AggregateSource source = new AggregateSource();
+
+    source.setAlertName("to_aggregate");
+    // !!! type is protected
+    Field field = Source.class.getDeclaredField("type");
+    field.setAccessible(true);
+    field.set(source, SourceType.AGGREGATE);
+
+    Reporting reporting = new Reporting();
+    
+    ReportTemplate template = new ReportTemplate();
+    template.setText("You are good {1}/{0}");
+    reporting.setOk(template);
+
+    template = new ReportTemplate();
+    template.setText("Going bad {1}/{0}");
+    template.setValue(Double.valueOf(0.33d));
+    reporting.setWarning(template);
+    
+    template = new ReportTemplate();
+    template.setText("On fire! {1}/{0}");
+    template.setValue(Double.valueOf(0.66d));
+    reporting.setCritical(template);
+    
+    source.setReporting(reporting);
+
+    Gson gson = new Gson();
+    
+    aggDef.setSource(gson.toJson(source));
+    aggDef.setSourceType(SourceType.AGGREGATE);
+    definitionDao.create(aggDef);
+    
+    // add current and history across four hosts
+    for (int i = 0; i < 4; i++) {
+      AlertHistoryEntity history = new AlertHistoryEntity();
+      history.setAlertDefinition(definition);
+      history.setAlertInstance(null);
+      history.setAlertLabel(definition.getLabel());
+      history.setAlertState(AlertState.OK);
+      history.setAlertText("OK");
+      history.setAlertTimestamp(Long.valueOf(1));
+      history.setClusterId(clusterId);
+      history.setComponentName(definition.getComponentName());
+      history.setHostName("h" + (i+1));
+      history.setServiceName(definition.getServiceName());
+      dao.create(history);
+     
+      AlertCurrentEntity current = new AlertCurrentEntity();
+      current.setAlertHistory(history);
+      current.setLatestText(history.getAlertText());
+      current.setLatestTimestamp(Long.valueOf(1L));
+      current.setOriginalTimestamp(Long.valueOf(1L));
+      dao.merge(current);
+    }
+    
+    AlertEventPublisher publisher = 
injector.getInstance(AlertEventPublisher.class);
+
+    // !!! need a synchronous op for testing
+    field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
+    field.setAccessible(true);
+    field.set(publisher, new EventBus());
+
+    final AtomicReference<Alert> ref = new AtomicReference<Alert>();    
+    publisher.register(new TestListener() {
+      @Subscribe
+      public void catchIt(AlertReceivedEvent event) {
+        ref.set(event.getAlert());
+      }
+    });
+    
+    AlertAggregateListener listener = 
injector.getInstance(AlertAggregateListener.class);
+    AlertDefinitionFactory factory = new AlertDefinitionFactory();
+    listener.addAggregateType(clusterId.longValue(), factory.coerce(aggDef));
+
+    // any alert and event will do that is for the definition since an 
aggregate
+    // checks them all regardless of state
+    Alert alert = new Alert(
+        definition.getDefinitionName(),
+        null,
+        definition.getServiceName(),
+        definition.getComponentName(),
+        "h1",
+        AlertState.OK);
+    AlertReceivedEvent event = new AlertReceivedEvent(clusterId.longValue(), 
alert);
+    
+    listener.onAlertEvent(event);
+    assertNotNull(ref.get());
+    assertEquals(AlertState.OK, ref.get().getState());
+    assertTrue(ref.get().getText().indexOf("0/4") > -1);
+    
+    // check if one is critical, still ok
+    AlertCurrentEntity current = dao.findCurrentByHostAndName(
+        clusterId.longValue(), "h1", definition.getDefinitionName());
+    current.getAlertHistory().setAlertState(AlertState.CRITICAL);
+    dao.merge(current.getAlertHistory());
+
+    listener.onAlertEvent(event);
+    assertEquals("aggregate_test", ref.get().getName());    
+    assertEquals(AlertState.OK, ref.get().getState());
+    assertTrue(ref.get().getText().indexOf("1/4") > -1);
+
+    // two are either warning or critical, warning
+    current = dao.findCurrentByHostAndName(
+        clusterId.longValue(), "h2", definition.getDefinitionName());
+    current.getAlertHistory().setAlertState(AlertState.WARNING);
+    dao.merge(current.getAlertHistory());
+    
+    listener.onAlertEvent(event);
+    assertEquals("aggregate_test", ref.get().getName());
+    assertEquals(AlertState.WARNING, ref.get().getState());
+    assertTrue(ref.get().getText().indexOf("2/4") > -1);
+
+    // three make it critical
+    current = dao.findCurrentByHostAndName(
+        clusterId.longValue(), "h3", definition.getDefinitionName());
+    current.getAlertHistory().setAlertState(AlertState.CRITICAL);
+    dao.merge(current.getAlertHistory());
+    
+    listener.onAlertEvent(event);
+    assertEquals("aggregate_test", ref.get().getName());    
+    assertEquals(AlertState.CRITICAL, ref.get().getState());
+    assertTrue(ref.get().getText().indexOf("3/4") > -1);
+  }
+  
+  /**
+   * Test interface collects aggregate alert invocations 
+   */
+  private static interface TestListener {
+    public void catchIt(AlertReceivedEvent event);
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/179290e0/ambari-server/src/test/resources/stacks/HDP/2.0.5/services/HDFS/alerts.json
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/resources/stacks/HDP/2.0.5/services/HDFS/alerts.json 
b/ambari-server/src/test/resources/stacks/HDP/2.0.5/services/HDFS/alerts.json
index f4a3001..0077bad 100644
--- 
a/ambari-server/src/test/resources/stacks/HDP/2.0.5/services/HDFS/alerts.json
+++ 
b/ambari-server/src/test/resources/stacks/HDP/2.0.5/services/HDFS/alerts.json
@@ -1,13 +1,18 @@
 {
-  "service": [],
+  "service": [
+  ],
   "NAMENODE": [
     {
       "name": "namenode_cpu",
       "label": "NameNode host CPU Utilization",
-      "scope": "host",
+      "scope": "ANY",
       "source": {
         "type": "METRIC",
-        "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
+        "jmx": {
+          "property_list": [
+            "java.lang:type=OperatingSystem/SystemCpuLoad"
+          ]
+        },
         "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}",
         "reporting": {
           "ok": {
@@ -28,7 +33,7 @@
       "name": "namenode_process",
       "label": "NameNode process",
       "interval": 1,
-      "scope": "any",
+      "scope": "ANY",
       "source": {
         "type": "PORT",
         "uri": "{{hdfs-site/dfs.namenode.http-address}}",
@@ -47,7 +52,7 @@
       "name": "hdfs_last_checkpoint",
       "label": "Last Checkpoint Time",
       "interval": 1,
-      "scope": "service",
+      "SCOPE": "service",
       "enabled": false,
       "source": {
         "type": "SCRIPT",
@@ -60,7 +65,7 @@
       "name": "secondary_namenode_process",
       "label": "Secondary NameNode process",
       "interval": 1,
-      "scope": "any",
+      "scope": "ANY",
       "source": {
         "type": "PORT",        
         "uri": "{{hdfs-site/dfs.namenode.secondary.http-address}}",
@@ -69,4 +74,4 @@
     }
   ],  
   "DATANODE": []
-}
\ No newline at end of file
+}

Reply via email to