Repository: ambari
Updated Branches:
  refs/heads/trunk 322552cdf -> 167618c15


AMBARI-6685. Flume: alerts hang forever after deleting all agents (ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/167618c1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/167618c1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/167618c1

Branch: refs/heads/trunk
Commit: 167618c15dc3e33e1cd5a7ef46f0e198136411b6
Parents: 322552c
Author: Nate Cole <nc...@hortonworks.com>
Authored: Wed Jul 30 15:33:26 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Thu Jul 31 13:32:38 2014 -0400

----------------------------------------------------------------------
 .../org/apache/ambari/server/state/Alert.java   | 38 +++++++++---
 .../server/state/cluster/ClusterImpl.java       | 19 +++++-
 .../FLUME/package/scripts/flume_handler.py      | 27 ++++++---
 .../server/state/cluster/ClusterTest.java       | 62 +++++++++++++++++++-
 .../python/stacks/2.0.6/FLUME/test_flume.py     |  4 +-
 5 files changed, 125 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
index 14c09b8..e39b900 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
@@ -164,17 +164,12 @@ public class Alert {
   }
 
   
-  
   @Override
   public int hashCode() {
-    int result = 0;
-    
-    result += (null != name) ? name.hashCode() : 0;
+    int result = alertHashCode();
+
     result += 31 * result + (null != instance ? instance.hashCode() : 0);
-    result += 31 * result + (null != service ? service.hashCode() : 0);
-    result += 31 * result + (null != component ? component.hashCode() : 0);
-    result += 31 * result + (null != host ? host.hashCode() : 0);
-    
+
     return result;
   }
 
@@ -186,9 +181,34 @@ public class Alert {
   public boolean equals(Object o) {
     if (null == o || !Alert.class.isInstance(o))
       return false;
-    
+
     return hashCode() == o.hashCode();
   }
+
+  /**
+   * @return the hashcode of the alert without instance info
+   */
+  private int alertHashCode() {
+    int result = (null != name) ? name.hashCode() : 0;
+    result += 31 * result + (null != service ? service.hashCode() : 0);
+    result += 31 * result + (null != component ? component.hashCode() : 0);
+    result += 31 * result + (null != host ? host.hashCode() : 0);
+
+    return result;
+  }
+
+  /**
+   * Checks equality with another alert, not taking into account instance info
+   * 
+   * @param that
+   *          the other alert to compare against
+   * @return <code>true</code> when the alert is equal in every way except the
+   *         instance info
+   */
+  public boolean almostEquals(Alert that) {
+    return alertHashCode() == that.alertHashCode();
+  }
+  
   
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
index 99141a3..3d2365c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java
@@ -72,6 +72,8 @@ import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Predicate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1838,13 +1840,24 @@ public class ClusterImpl implements Cluster {
   public void addAlerts(Collection<Alert> alerts) {
     try {
       writeLock.lock();
-      if (LOG.isDebugEnabled()) {
-        for (Alert alert : alerts) {
+      
+      for (final Alert alert : alerts) {
+        if (clusterAlerts.size() > 0) {
+          CollectionUtils.filter(clusterAlerts, new Predicate() {
+            @Override
+            public boolean evaluate(Object obj) {
+              Alert collectedAlert = (Alert) obj;
+              return !collectedAlert.almostEquals(alert);
+            }
+          });
+        }
+        
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Adding alert for name={} service={}, on host={}",
               alert.getName(), alert.getService(), alert.getHost());
         }
       }
-      clusterAlerts.removeAll(alerts);
+
       clusterAlerts.addAll(alerts);
 
     } finally {

http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
index 1e965e2..c29dd93 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py
@@ -20,6 +20,7 @@ limitations under the License.
 from resource_management import *
 from flume import flume
 from flume import flume_status
+from flume import find_expected_agent_names
 
 class FlumeHandler(Script):
   def install(self, env):
@@ -61,20 +62,28 @@ class FlumeHandler(Script):
     json['processes'] = processes
     json['alerts'] = []
 
-    for proc in processes:
+    if len(processes) == 0 and len(find_expected_agent_names()) == 0:
       alert = {}
       alert['name'] = 'flume_agent'
-      alert['instance'] = proc['name']
       alert['label'] = 'Flume Agent process'
+      alert['state'] = 'WARNING'
+      alert['text'] = 'No agents defined'
+      json['alerts'].append(alert)
+    else:
+      for proc in processes:
+        alert = {}
+        alert['name'] = 'flume_agent'
+        alert['instance'] = proc['name']
+        alert['label'] = 'Flume Agent process'
 
-      if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
-        alert['state'] = 'CRITICAL'
-        alert['text'] = 'Flume agent {0} not running'.format(proc['name'])
-      else:
-        alert['state'] = 'OK'
-        alert['text'] = 'Flume agent {0} is running'.format(proc['name'])
+        if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
+          alert['state'] = 'CRITICAL'
+          alert['text'] = 'Flume agent {0} not running'.format(proc['name'])
+        else:
+          alert['state'] = 'OK'
+          alert['text'] = 'Flume agent {0} is running'.format(proc['name'])
 
-      json['alerts'].append(alert)
+        json['alerts'].append(alert)
 
     self.put_structured_out(json)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
index 85a6993..453c26b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java
@@ -26,12 +26,15 @@ import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.persistence.EntityManager;
 
@@ -46,8 +49,30 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.controller.ClusterResponse;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.orm.entities.*;
-import org.apache.ambari.server.state.*;
+import org.apache.ambari.server.orm.entities.ClusterEntity;
+import org.apache.ambari.server.orm.entities.ClusterServiceEntity;
+import org.apache.ambari.server.orm.entities.HostEntity;
+import org.apache.ambari.server.orm.entities.HostStateEntity;
+import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity;
+import org.apache.ambari.server.state.AgentVersion;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.ConfigFactory;
+import org.apache.ambari.server.state.DesiredConfig;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.HostHealthStatus;
+import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
+import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
 import org.apache.ambari.server.state.host.HostRegistrationRequestEvent;
@@ -574,5 +599,38 @@ public class ClusterTest {
     Assert.assertEquals(State.INSTALLED,
         c1.getProvisioningState());    
   }
+  
+  @Test
+  public void testAlertUpdates() {
+    
+    Set<Alert> alerts = new HashSet<Alert>();
+    alerts.add(new Alert("alert_1", "instance1", "service", "component", 
"host1", AlertState.OK));
+    alerts.add(new Alert("alert_1", "instance2", "service", "component", 
"host1", AlertState.OK));
+    
+    c1.addAlerts(alerts);
+    
+    Collection<Alert> result = c1.getAlerts();
+    assertEquals(2, result.size());
+    for (Alert a : result) {
+      assertEquals("alert_1", a.getName());
+    }
+    
+    // add an alert that removes the previous 2 with the same name
+    alerts = new HashSet<Alert>();
+    alerts.add(new Alert("alert_1", null, "service", "component", "host1", 
AlertState.WARNING));
+    c1.addAlerts(alerts);
+    
+    result = c1.getAlerts();
+    assertEquals(1, result.size());
+    
+    // add alerts that remove the old type, regardless of instance name
+    alerts = new HashSet<Alert>();    
+    alerts.add(new Alert("alert_1", "instance1", "service", "component", 
"host1", AlertState.OK));
+    alerts.add(new Alert("alert_1", "instance2", "service", "component", 
"host1", AlertState.OK));
+    c1.addAlerts(alerts);
+    
+    result = c1.getAlerts();
+    assertEquals(2, result.size());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
index 49462e8..f421bf5 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py
@@ -97,8 +97,8 @@ class TestFlumeHandler(RMFTestCase):
     
     # test that the method was called with empty processes
     self.assertTrue(structured_out_mock.called)
-    structured_out_mock.assert_called_with({'processes': [], 'alerts': []})
-
+    structured_out_mock.assert_called_with({'processes': [],
+      'alerts': [{'text': 'No agents defined', 'state': 'WARNING', 'name': 
'flume_agent', 'label': 'Flume Agent process'}]})
     self.assertNoMoreResources()
 
   @patch("resource_management.libraries.script.Script.put_structured_out")

Reply via email to