Repository: ambari Updated Branches: refs/heads/branch-alerts-dev 31f9ff836 -> f58892817
AMBARI-6942. Alerts: save definitions and send alert data to the server (ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f5889281 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f5889281 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f5889281 Branch: refs/heads/branch-alerts-dev Commit: f588928175e6be6bd636755885db94e358735431 Parents: 31f9ff8 Author: Nate Cole <nc...@hortonworks.com> Authored: Wed Aug 20 11:45:16 2014 -0400 Committer: Nate Cole <nc...@hortonworks.com> Committed: Wed Aug 20 11:45:43 2014 -0400 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 94 ++++++++++++++------ .../src/main/python/ambari_agent/Controller.py | 16 +++- .../src/main/python/ambari_agent/Heartbeat.py | 8 +- .../python/ambari_agent/alerts/base_alert.py | 14 ++- .../python/ambari_agent/alerts/collector.py | 47 ++++++++++ .../python/ambari_agent/alerts/port_alert.py | 6 +- .../src/test/python/ambari_agent/TestAlerts.py | 14 ++- .../dummy_files/alert_definitions.def | 49 ++++++++++ .../dummy_files/alert_definitions.json | 46 ---------- 9 files changed, 205 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index cd0605f..10fdef7 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -21,19 +21,30 @@ limitations under the License. ''' http://apscheduler.readthedocs.org/en/v2.1.2 ''' -from apscheduler.scheduler import Scheduler -from alerts.port_alert import PortAlert +import glob import json import logging +import os import sys import time +from apscheduler.scheduler import Scheduler +from alerts.collector import AlertCollector +from alerts.port_alert import PortAlert + logger = logging.getLogger() class AlertSchedulerHandler(): + make_cachedir = True - def __init__(self, filename, in_minutes=True): - self.filename = filename + def __init__(self, cachedir, in_minutes=True): + self.cachedir = cachedir + + if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir: + try: + os.makedirs(cachedir) + except: + pass config = { 'threadpool.core_threads': 3, @@ -41,41 +52,65 @@ class AlertSchedulerHandler(): 'standalone': False } - self.scheduler = Scheduler(config) - - alert_callables = self.__load_alerts() - - for _callable in alert_callables: - if in_minutes: - self.scheduler.add_interval_job(self.__make_function(_callable), - minutes=_callable.interval()) - else: - self.scheduler.add_interval_job(self.__make_function(_callable), - seconds=_callable.interval()) + self.__scheduler = Scheduler(config) + self.__in_minutes = in_minutes + self.__loaded = False + self.__collector = AlertCollector() + + def update_definitions(self, alert_commands, refresh_jobs=False): + for command in alert_commands: + with open(os.path.join(self.cachedir, command['clusterName'] + '.def'), 'w') as f: + json.dump(command, f, indent=2) + + if refresh_jobs: + self.__scheduler.shutdown(wait=False) + self.__loaded = False + self.start() def __make_function(self, alert_def): return lambda: alert_def.collect() - + def start(self): - if not self.scheduler is None: - self.scheduler.start() + if not self.__loaded: + alert_callables = self.__load_definitions() + + for _callable in alert_callables: + if self.__in_minutes: + self.__scheduler.add_interval_job(self.__make_function(_callable), + minutes=_callable.interval()) + else: + self.__scheduler.add_interval_job(self.__make_function(_callable), + seconds=_callable.interval()) + self.__loaded = True + + if not self.__scheduler is None: + self.__scheduler.start() def stop(self): - if not self.scheduler is None: - self.scheduler.shutdown(wait=False) - self.scheduler = None + if not self.__scheduler is None: + self.__scheduler.shutdown(wait=False) + self.__scheduler = None - def __load_alerts(self): + def collector(self): + return self.__collector + + def __load_definitions(self): definitions = [] try: - # FIXME make location configurable - with open(self.filename) as fp: - cluster_defs = json.load(fp) - for deflist in cluster_defs.values(): - for definition in deflist: + for deffile in glob.glob(os.path.join(self.cachedir, '*.def')): + with open(deffile, 'r') as f: + command_json = json.load(f) + + for definition in command_json['alertDefinitions']: obj = self.__json_to_callable(definition) + if obj is not None: + obj.set_cluster( + '' if not 'clusterName' in command_json else command_json['clusterName'], + '' if not 'hostName' in command_json else command_json['hostName']) + definitions.append(obj) + except: import traceback traceback.print_exc() @@ -91,7 +126,7 @@ class AlertSchedulerHandler(): if source_type == 'METRIC': pass elif source_type == 'PORT': - alert = PortAlert(json_definition, source) + alert = PortAlert(self.__collector, json_definition, source) elif type == 'SCRIPT': pass @@ -119,6 +154,9 @@ def main(): i += 1 except KeyboardInterrupt: pass + + print str(ash.collector().alerts()) + ash.stop() if __name__ == "__main__": http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 3be54c2..23c28ae 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -79,8 +79,8 @@ class Controller(threading.Thread): if cache_dir is None: cache_dir = '/var/lib/ambari-agent/cache' - self.alert_scheduler_handler = AlertSchedulerHandler( - os.path.join(cache_dir, 'alerts', 'alert_definitions.json')) + alerts_cache_dir = os.path.join(cache_dir, 'alerts') + self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir) def __del__(self): @@ -135,6 +135,12 @@ class Controller(threading.Thread): pass else: self.hasMappedComponents = False + + if 'alertDefinitionCommands' in ret.keys(): + logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands'])) + self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands']) + pass + pass except ssl.SSLError: self.repeatRegistration=False @@ -247,6 +253,10 @@ class Controller(threading.Thread): if 'statusCommands' in response.keys(): self.addToStatusQueue(response['statusCommands']) pass + + if 'alertDefinitionCommands' in response.keys(): + self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True) + pass if "true" == response['restartAgent']: logger.error("Received the restartAgent command") @@ -307,7 +317,7 @@ class Controller(threading.Thread): self.actionQueue = ActionQueue(self.config, controller=self) self.actionQueue.start() self.register = Register(self.config) - self.heartbeat = Heartbeat(self.actionQueue, self.config) + self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) opener = urllib2.build_opener() urllib2.install_opener(opener) http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/Heartbeat.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py index 37a97e8..fb41759 100644 --- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py +++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py @@ -35,10 +35,11 @@ logger = logging.getLogger() firstContact = True class Heartbeat: - def __init__(self, actionQueue, config=None): + def __init__(self, actionQueue, config=None, alert_collector=None): self.actionQueue = actionQueue self.config = config self.reports = [] + self.collector = alert_collector def build(self, id='-1', state_interval=-1, componentsMapped=False): global clusterId, clusterDefinitionRevision, firstContact @@ -51,7 +52,6 @@ class Heartbeat: nodeStatus["alerts"] = [] - heartbeat = { 'responseId' : int(id), 'timestamp' : timestamp, 'hostname' : hostname.hostname(self.config), @@ -95,6 +95,10 @@ class Heartbeat: logger.debug("mounts: %s", str(mounts)) nodeStatus["alerts"] = hostInfo.createAlerts(nodeStatus["alerts"]) + + if self.collector is not None: + nodeStatus['alerts'].extend(self.collector.alerts()) + return heartbeat def main(argv=None): http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index e102d56..6e99692 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -28,15 +28,22 @@ class BaseAlert(object): RESULT_CRITICAL = 'CRITICAL' RESULT_UNKNOWN = 'UNKNOWN' - def __init__(self, alert_meta, alert_source_meta): + def __init__(self, collector, alert_meta, alert_source_meta): + self.collector = collector self.alert_meta = alert_meta self.alert_source_meta = alert_source_meta + self.cluster = '' + self.hostname = '' def interval(self): if not self.alert_meta.has_key('interval'): return 1 else: return self.alert_meta['interval'] + + def set_cluster(self, cluster, host): + self.cluster = cluster + self.hostname = host def collect(self): res = (BaseAlert.RESULT_UNKNOWN, []) @@ -49,13 +56,14 @@ class BaseAlert(object): data = {} data['name'] = self._find_value('name') + data['label'] = self._find_value('label') data['state'] = res[0] data['text'] = res_base_text.format(*res[1]) - # data['cluster'] = self._find_value('cluster') # not sure how to get this yet + data['cluster'] = self.cluster data['service'] = self._find_value('service') data['component'] = self._find_value('component') - print str(data) + self.collector.put(self.cluster, data) def _find_value(self, meta_key): if self.alert_meta.has_key(meta_key): http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/collector.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py new file mode 100644 index 0000000..7249449 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging + +logger = logging.getLogger() + +class AlertCollector(): + ''' + cluster -> name -> alert dict + ''' + def __init__(self): + self.__buckets = {} + + def put(self, cluster, alert): + if not cluster in self.__buckets: + self.__buckets[cluster] = {} + + self.__buckets[cluster][alert['name']] = alert + + def alerts(self): + alerts = [] + for clustermap in self.__buckets.values()[:]: + alerts.extend(clustermap.values()) + + return alerts + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py index 165f890..2f051c8 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py @@ -30,8 +30,8 @@ logger = logging.getLogger() class PortAlert(BaseAlert): - def __init__(self, alert_meta, alert_source_meta): - super(PortAlert, self).__init__(alert_meta, alert_source_meta) + def __init__(self, collector, alert_meta, alert_source_meta): + super(PortAlert, self).__init__(collector, alert_meta, alert_source_meta) default_port = alert_source_meta['default_port'] uri = alert_source_meta['uri'] @@ -42,7 +42,7 @@ class PortAlert(BaseAlert): try: self.port = int(get_port_from_url(uri)) except: - traceback.print_exc() + # only when port parsing fails pass http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/TestAlerts.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py index 51c3af9..0d0563a 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py @@ -23,6 +23,7 @@ import sys from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler from ambari_agent.apscheduler.scheduler import Scheduler from ambari_agent.alerts.port_alert import PortAlert +from ambari_agent.alerts.collector import AlertCollector from mock.mock import patch from unittest import TestCase @@ -35,12 +36,15 @@ class TestAlerts(TestCase): sys.stdout == sys.__stdout__ @patch.object(Scheduler, "add_interval_job") - def test_build(self, aps_add_interval_job_mock): - test_file_path = os.path.join('ambari_agent', 'dummy_files', 'alert_definitions.json') + @patch.object(Scheduler, "start") + def test_start(self, aps_add_interval_job_mock, aps_start_mock): + test_file_path = os.path.join('ambari_agent', 'dummy_files') ash = AlertSchedulerHandler(test_file_path) + ash.start() self.assertTrue(aps_add_interval_job_mock.called) + self.assertTrue(aps_start_mock.called) def test_port_alert(self): json = { "name": "namenode_process", @@ -51,7 +55,7 @@ class TestAlerts(TestCase): "scope": "host", "source": { "type": "PORT", - "uri": "http://c6401.ambari.apache.org:50070", + "uri": "http://c6409.ambari.apache.org:50070", "default_port": 50070, "reporting": { "ok": { @@ -64,7 +68,9 @@ class TestAlerts(TestCase): } } - pa = PortAlert(json, json['source']) + collector = AlertCollector() + + pa = PortAlert(collector, json, json['source']) self.assertEquals(6, pa.interval()) res = pa.collect() http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def new file mode 100644 index 0000000..45fb8d0 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def @@ -0,0 +1,49 @@ +{ + "clusterName": "c1", + "hostName": "c6401.ambari.apache.org", + "hash": "12341234134412341243124", + "alertDefinitions": [ + { + "name": "namenode_cpu", + "label": "NameNode host CPU Utilization", + "scope": "host", + "source": { + "type": "METRIC", + "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad", + "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}" + } + }, + { + "name": "namenode_process", + "service": "HDFS", + "component": "NAMENODE", + "label": "NameNode process", + "interval": 6, + "scope": "host", + "source": { + "type": "PORT", + "uri": "http://c6401.ambari.apache.org:50070", + "default_port": 50070, + "reporting": { + "ok": { + "text": "TCP OK - {0:.4f} response time on port {1}" + }, + "critical": { + "text": "Could not load process info: {0}" + } + } + } + }, + { + "name": "hdfs_last_checkpoint", + "label": "Last Checkpoint Time", + "interval": 1, + "scope": "service", + "enabled": false, + "source": { + "type": "SCRIPT", + "path": "scripts/alerts/last_checkpoint.py" + } + } + ] +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json deleted file mode 100644 index 6c55966..0000000 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "c1": [ - { - "name": "namenode_cpu", - "label": "NameNode host CPU Utilization", - "scope": "host", - "source": { - "type": "METRIC", - "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad", - "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}" - } - }, - { - "name": "namenode_process", - "service": "HDFS", - "component": "NAMENODE", - "label": "NameNode process", - "interval": 6, - "scope": "host", - "source": { - "type": "PORT", - "uri": "http://c6401.ambari.apache.org:50070", - "default_port": 50070, - "reporting": { - "ok": { - "text": "TCP OK - {0:.4f} response time on port {1}" - }, - "critical": { - "text": "Could not load process info: {0}" - } - } - } - }, - { - "name": "hdfs_last_checkpoint", - "label": "Last Checkpoint Time", - "interval": 1, - "scope": "service", - "enabled": false, - "source": { - "type": "SCRIPT", - "path": "scripts/alerts/last_checkpoint.py" - } - } - ] -}