Repository: ambari Updated Branches: refs/heads/trunk 322552cdf -> 167618c15
AMBARI-6685. Flume: alerts hang forever after deleting all agents (ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/167618c1 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/167618c1 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/167618c1 Branch: refs/heads/trunk Commit: 167618c15dc3e33e1cd5a7ef46f0e198136411b6 Parents: 322552c Author: Nate Cole <nc...@hortonworks.com> Authored: Wed Jul 30 15:33:26 2014 -0400 Committer: Nate Cole <nc...@hortonworks.com> Committed: Thu Jul 31 13:32:38 2014 -0400 ---------------------------------------------------------------------- .../org/apache/ambari/server/state/Alert.java | 38 +++++++++--- .../server/state/cluster/ClusterImpl.java | 19 +++++- .../FLUME/package/scripts/flume_handler.py | 27 ++++++--- .../server/state/cluster/ClusterTest.java | 62 +++++++++++++++++++- .../python/stacks/2.0.6/FLUME/test_flume.py | 4 +- 5 files changed, 125 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java index 14c09b8..e39b900 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java @@ -164,17 +164,12 @@ public class Alert { } - @Override public int hashCode() { - int result = 0; - - result += (null != name) ? name.hashCode() : 0; + int result = alertHashCode(); + result += 31 * result + (null != instance ? instance.hashCode() : 0); - result += 31 * result + (null != service ? service.hashCode() : 0); - result += 31 * result + (null != component ? component.hashCode() : 0); - result += 31 * result + (null != host ? host.hashCode() : 0); - + return result; } @@ -186,9 +181,34 @@ public class Alert { public boolean equals(Object o) { if (null == o || !Alert.class.isInstance(o)) return false; - + return hashCode() == o.hashCode(); } + + /** + * @return the hashcode of the alert without instance info + */ + private int alertHashCode() { + int result = (null != name) ? name.hashCode() : 0; + result += 31 * result + (null != service ? service.hashCode() : 0); + result += 31 * result + (null != component ? component.hashCode() : 0); + result += 31 * result + (null != host ? host.hashCode() : 0); + + return result; + } + + /** + * Checks equality with another alert, not taking into account instance info + * + * @param that + * the other alert to compare against + * @return <code>true</code> when the alert is equal in every way except the + * instance info + */ + public boolean almostEquals(Alert that) { + return alertHashCode() == that.alertHashCode(); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java index 99141a3..3d2365c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java @@ -72,6 +72,8 @@ import org.apache.ambari.server.state.scheduler.RequestExecution; import org.apache.ambari.server.state.scheduler.RequestExecutionFactory; import org.apache.ambari.server.controller.ServiceConfigVersionResponse; import org.apache.commons.lang.StringUtils; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1838,13 +1840,24 @@ public class ClusterImpl implements Cluster { public void addAlerts(Collection<Alert> alerts) { try { writeLock.lock(); - if (LOG.isDebugEnabled()) { - for (Alert alert : alerts) { + + for (final Alert alert : alerts) { + if (clusterAlerts.size() > 0) { + CollectionUtils.filter(clusterAlerts, new Predicate() { + @Override + public boolean evaluate(Object obj) { + Alert collectedAlert = (Alert) obj; + return !collectedAlert.almostEquals(alert); + } + }); + } + + if (LOG.isDebugEnabled()) { LOG.debug("Adding alert for name={} service={}, on host={}", alert.getName(), alert.getService(), alert.getHost()); } } - clusterAlerts.removeAll(alerts); + clusterAlerts.addAll(alerts); } finally { http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py index 1e965e2..c29dd93 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py @@ -20,6 +20,7 @@ limitations under the License. from resource_management import * from flume import flume from flume import flume_status +from flume import find_expected_agent_names class FlumeHandler(Script): def install(self, env): @@ -61,20 +62,28 @@ class FlumeHandler(Script): json['processes'] = processes json['alerts'] = [] - for proc in processes: + if len(processes) == 0 and len(find_expected_agent_names()) == 0: alert = {} alert['name'] = 'flume_agent' - alert['instance'] = proc['name'] alert['label'] = 'Flume Agent process' + alert['state'] = 'WARNING' + alert['text'] = 'No agents defined' + json['alerts'].append(alert) + else: + for proc in processes: + alert = {} + alert['name'] = 'flume_agent' + alert['instance'] = proc['name'] + alert['label'] = 'Flume Agent process' - if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING': - alert['state'] = 'CRITICAL' - alert['text'] = 'Flume agent {0} not running'.format(proc['name']) - else: - alert['state'] = 'OK' - alert['text'] = 'Flume agent {0} is running'.format(proc['name']) + if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING': + alert['state'] = 'CRITICAL' + alert['text'] = 'Flume agent {0} not running'.format(proc['name']) + else: + alert['state'] = 'OK' + alert['text'] = 'Flume agent {0} is running'.format(proc['name']) - json['alerts'].append(alert) + json['alerts'].append(alert) self.put_structured_out(json) http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java index 85a6993..453c26b 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java @@ -26,12 +26,15 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import javax.persistence.EntityManager; @@ -46,8 +49,30 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.controller.ClusterResponse; import org.apache.ambari.server.orm.GuiceJpaInitializer; import org.apache.ambari.server.orm.InMemoryDefaultTestModule; -import org.apache.ambari.server.orm.entities.*; -import org.apache.ambari.server.state.*; +import org.apache.ambari.server.orm.entities.ClusterEntity; +import org.apache.ambari.server.orm.entities.ClusterServiceEntity; +import org.apache.ambari.server.orm.entities.HostEntity; +import org.apache.ambari.server.orm.entities.HostStateEntity; +import org.apache.ambari.server.orm.entities.ServiceDesiredStateEntity; +import org.apache.ambari.server.state.AgentVersion; +import org.apache.ambari.server.state.Alert; +import org.apache.ambari.server.state.AlertState; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Config; +import org.apache.ambari.server.state.ConfigFactory; +import org.apache.ambari.server.state.DesiredConfig; +import org.apache.ambari.server.state.Host; +import org.apache.ambari.server.state.HostHealthStatus; +import org.apache.ambari.server.state.HostState; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentFactory; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.ambari.server.state.ServiceComponentHostFactory; +import org.apache.ambari.server.state.ServiceFactory; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.State; import org.apache.ambari.server.state.fsm.InvalidStateTransitionException; import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent; import org.apache.ambari.server.state.host.HostRegistrationRequestEvent; @@ -574,5 +599,38 @@ public class ClusterTest { Assert.assertEquals(State.INSTALLED, c1.getProvisioningState()); } + + @Test + public void testAlertUpdates() { + + Set<Alert> alerts = new HashSet<Alert>(); + alerts.add(new Alert("alert_1", "instance1", "service", "component", "host1", AlertState.OK)); + alerts.add(new Alert("alert_1", "instance2", "service", "component", "host1", AlertState.OK)); + + c1.addAlerts(alerts); + + Collection<Alert> result = c1.getAlerts(); + assertEquals(2, result.size()); + for (Alert a : result) { + assertEquals("alert_1", a.getName()); + } + + // add an alert that removes the previous 2 with the same name + alerts = new HashSet<Alert>(); + alerts.add(new Alert("alert_1", null, "service", "component", "host1", AlertState.WARNING)); + c1.addAlerts(alerts); + + result = c1.getAlerts(); + assertEquals(1, result.size()); + + // add alerts that remove the old type, regardless of instance name + alerts = new HashSet<Alert>(); + alerts.add(new Alert("alert_1", "instance1", "service", "component", "host1", AlertState.OK)); + alerts.add(new Alert("alert_1", "instance2", "service", "component", "host1", AlertState.OK)); + c1.addAlerts(alerts); + + result = c1.getAlerts(); + assertEquals(2, result.size()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/167618c1/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py index 49462e8..f421bf5 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py +++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py @@ -97,8 +97,8 @@ class TestFlumeHandler(RMFTestCase): # test that the method was called with empty processes self.assertTrue(structured_out_mock.called) - structured_out_mock.assert_called_with({'processes': [], 'alerts': []}) - + structured_out_mock.assert_called_with({'processes': [], + 'alerts': [{'text': 'No agents defined', 'state': 'WARNING', 'name': 'flume_agent', 'label': 'Flume Agent process'}]}) self.assertNoMoreResources() @patch("resource_management.libraries.script.Script.put_structured_out")