Repository: ambari
Updated Branches:
  refs/heads/branch-alerts-dev 31f9ff836 -> f58892817


AMBARI-6942. Alerts: save definitions and send alert data to the server (ncole)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f5889281
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f5889281
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f5889281

Branch: refs/heads/branch-alerts-dev
Commit: f588928175e6be6bd636755885db94e358735431
Parents: 31f9ff8
Author: Nate Cole <nc...@hortonworks.com>
Authored: Wed Aug 20 11:45:16 2014 -0400
Committer: Nate Cole <nc...@hortonworks.com>
Committed: Wed Aug 20 11:45:43 2014 -0400

----------------------------------------------------------------------
 .../ambari_agent/AlertSchedulerHandler.py       | 94 ++++++++++++++------
 .../src/main/python/ambari_agent/Controller.py  | 16 +++-
 .../src/main/python/ambari_agent/Heartbeat.py   |  8 +-
 .../python/ambari_agent/alerts/base_alert.py    | 14 ++-
 .../python/ambari_agent/alerts/collector.py     | 47 ++++++++++
 .../python/ambari_agent/alerts/port_alert.py    |  6 +-
 .../src/test/python/ambari_agent/TestAlerts.py  | 14 ++-
 .../dummy_files/alert_definitions.def           | 49 ++++++++++
 .../dummy_files/alert_definitions.json          | 46 ----------
 9 files changed, 205 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py 
b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index cd0605f..10fdef7 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -21,19 +21,30 @@ limitations under the License.
 '''
 http://apscheduler.readthedocs.org/en/v2.1.2
 '''
-from apscheduler.scheduler import Scheduler
-from alerts.port_alert import PortAlert
+import glob
 import json
 import logging
+import os
 import sys
 import time
+from apscheduler.scheduler import Scheduler
+from alerts.collector import AlertCollector
+from alerts.port_alert import PortAlert
+
 
 logger = logging.getLogger()
 
 class AlertSchedulerHandler():
+  make_cachedir = True
 
-  def __init__(self, filename, in_minutes=True):
-    self.filename = filename
+  def __init__(self, cachedir, in_minutes=True):
+    self.cachedir = cachedir
+    
+    if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir:
+      try:
+        os.makedirs(cachedir)
+      except:
+        pass
     
     config = {
       'threadpool.core_threads': 3,
@@ -41,41 +52,65 @@ class AlertSchedulerHandler():
       'standalone': False
     }
 
-    self.scheduler = Scheduler(config)
-
-    alert_callables = self.__load_alerts()
-
-    for _callable in alert_callables:
-      if in_minutes:
-        self.scheduler.add_interval_job(self.__make_function(_callable),
-          minutes=_callable.interval())
-      else:
-        self.scheduler.add_interval_job(self.__make_function(_callable),
-          seconds=_callable.interval())
+    self.__scheduler = Scheduler(config)
+    self.__in_minutes = in_minutes
+    self.__loaded = False
+    self.__collector = AlertCollector()
+          
+  def update_definitions(self, alert_commands, refresh_jobs=False):
+    for command in alert_commands:
+      with open(os.path.join(self.cachedir, command['clusterName'] + '.def'), 'w') as f:
+        json.dump(command, f, indent=2)
+    
+    if refresh_jobs:
+      self.__scheduler.shutdown(wait=False)
+      self.__loaded = False
+      self.start()
       
   def __make_function(self, alert_def):
     return lambda: alert_def.collect()
-
+    
   def start(self):
-    if not self.scheduler is None:
-      self.scheduler.start()
+    if not self.__loaded:
+      alert_callables = self.__load_definitions()
+      
+      for _callable in alert_callables:
+        if self.__in_minutes:
+          self.__scheduler.add_interval_job(self.__make_function(_callable),
+            minutes=_callable.interval())
+        else:
+          self.__scheduler.add_interval_job(self.__make_function(_callable),
+            seconds=_callable.interval())
+      self.__loaded = True
+      
+    if not self.__scheduler is None:
+      self.__scheduler.start()
     
   def stop(self):
-    if not self.scheduler is None:
-      self.scheduler.shutdown(wait=False)
-      self.scheduler = None
+    if not self.__scheduler is None:
+      self.__scheduler.shutdown(wait=False)
+      self.__scheduler = None
       
-  def __load_alerts(self):
+  def collector(self):
+    return self.__collector
+      
+  def __load_definitions(self):
     definitions = []
     try:
-      # FIXME make location configurable
-      with open(self.filename) as fp: 
-        cluster_defs = json.load(fp)
-        for deflist in cluster_defs.values():
-          for definition in deflist:
+      for deffile in glob.glob(os.path.join(self.cachedir, '*.def')):
+        with open(deffile, 'r') as f:
+          command_json = json.load(f)
+
+          for definition in command_json['alertDefinitions']:
             obj = self.__json_to_callable(definition)
+              
             if obj is not None:
+              obj.set_cluster(
+                '' if not 'clusterName' in command_json else command_json['clusterName'],
+                '' if not 'hostName' in command_json else command_json['hostName'])
+
               definitions.append(obj)
+      
     except:
       import traceback
       traceback.print_exc()
@@ -91,7 +126,7 @@ class AlertSchedulerHandler():
     if source_type == 'METRIC':
       pass
     elif source_type == 'PORT':
-      alert = PortAlert(json_definition, source)
+      alert = PortAlert(self.__collector, json_definition, source)
     elif type == 'SCRIPT':
       pass
 
@@ -119,6 +154,9 @@ def main():
       i += 1
   except KeyboardInterrupt:
     pass
+    
+  print str(ash.collector().alerts())
+      
   ash.stop()
     
 if __name__ == "__main__":

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 3be54c2..23c28ae 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -79,8 +79,8 @@ class Controller(threading.Thread):
     if cache_dir is None:
       cache_dir = '/var/lib/ambari-agent/cache'
       
-    self.alert_scheduler_handler = AlertSchedulerHandler(
-     os.path.join(cache_dir, 'alerts', 'alert_definitions.json'))
+    alerts_cache_dir = os.path.join(cache_dir, 'alerts')
+    self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir)
 
 
   def __del__(self):
@@ -135,6 +135,12 @@ class Controller(threading.Thread):
           pass
         else:
           self.hasMappedComponents = False
+
+        if 'alertDefinitionCommands' in ret.keys():
+          logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands']))
+          self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands'])
+          pass
+
         pass
       except ssl.SSLError:
         self.repeatRegistration=False
@@ -247,6 +253,10 @@ class Controller(threading.Thread):
         if 'statusCommands' in response.keys():
           self.addToStatusQueue(response['statusCommands'])
           pass
+          
+        if 'alertDefinitionCommands' in response.keys():
+          self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
+          pass
 
         if "true" == response['restartAgent']:
           logger.error("Received the restartAgent command")
@@ -307,7 +317,7 @@ class Controller(threading.Thread):
     self.actionQueue = ActionQueue(self.config, controller=self)
     self.actionQueue.start()
     self.register = Register(self.config)
-    self.heartbeat = Heartbeat(self.actionQueue, self.config)
+    self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())
 
     opener = urllib2.build_opener()
     urllib2.install_opener(opener)

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py 
b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index 37a97e8..fb41759 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -35,10 +35,11 @@ logger = logging.getLogger()
 firstContact = True
 class Heartbeat:
 
-  def __init__(self, actionQueue, config=None):
+  def __init__(self, actionQueue, config=None, alert_collector=None):
     self.actionQueue = actionQueue
     self.config = config
     self.reports = []
+    self.collector = alert_collector
 
   def build(self, id='-1', state_interval=-1, componentsMapped=False):
     global clusterId, clusterDefinitionRevision, firstContact
@@ -51,7 +52,6 @@ class Heartbeat:
     nodeStatus["alerts"] = []
 
 
-
     heartbeat = { 'responseId'        : int(id),
                   'timestamp'         : timestamp,
                   'hostname'          : hostname.hostname(self.config),
@@ -95,6 +95,10 @@ class Heartbeat:
         logger.debug("mounts: %s", str(mounts))
 
     nodeStatus["alerts"] = hostInfo.createAlerts(nodeStatus["alerts"])
+    
+    if self.collector is not None:
+      nodeStatus['alerts'].extend(self.collector.alerts())
+    
     return heartbeat
 
 def main(argv=None):

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
index e102d56..6e99692 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py
@@ -28,15 +28,22 @@ class BaseAlert(object):
   RESULT_CRITICAL = 'CRITICAL'
   RESULT_UNKNOWN = 'UNKNOWN'
   
-  def __init__(self, alert_meta, alert_source_meta):
+  def __init__(self, collector, alert_meta, alert_source_meta):
+    self.collector = collector
     self.alert_meta = alert_meta
     self.alert_source_meta = alert_source_meta
+    self.cluster = ''
+    self.hostname = ''
     
   def interval(self):
     if not self.alert_meta.has_key('interval'):
       return 1
     else:
       return self.alert_meta['interval']
+      
+  def set_cluster(self, cluster, host):
+    self.cluster = cluster
+    self.hostname = host
   
   def collect(self):
     res = (BaseAlert.RESULT_UNKNOWN, [])
@@ -49,13 +56,14 @@ class BaseAlert(object):
       
     data = {}
     data['name'] = self._find_value('name')
+    data['label'] = self._find_value('label')
     data['state'] = res[0]
     data['text'] = res_base_text.format(*res[1])
-    # data['cluster'] = self._find_value('cluster') # not sure how to get this yet
+    data['cluster'] = self.cluster
     data['service'] = self._find_value('service')
     data['component'] = self._find_value('component')
     
-    print str(data)
+    self.collector.put(self.cluster, data)
   
   def _find_value(self, meta_key):
     if self.alert_meta.has_key(meta_key):

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
new file mode 100644
index 0000000..7249449
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+
+logger = logging.getLogger()
+
+class AlertCollector():
+  '''
+  cluster -> name -> alert dict
+  '''  
+  def __init__(self):
+    self.__buckets = {}
+    
+  def put(self, cluster, alert):
+    if not cluster in self.__buckets:
+      self.__buckets[cluster] = {}
+      
+    self.__buckets[cluster][alert['name']] = alert 
+    
+  def alerts(self):
+    alerts = []
+    for clustermap in self.__buckets.values()[:]:
+      alerts.extend(clustermap.values())
+      
+    return alerts
+      
+    
+
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py 
b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
index 165f890..2f051c8 100644
--- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
+++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py
@@ -30,8 +30,8 @@ logger = logging.getLogger()
 
 class PortAlert(BaseAlert):
 
-  def __init__(self, alert_meta, alert_source_meta):
-    super(PortAlert, self).__init__(alert_meta, alert_source_meta)
+  def __init__(self, collector, alert_meta, alert_source_meta):
+    super(PortAlert, self).__init__(collector, alert_meta, alert_source_meta)
     
     default_port = alert_source_meta['default_port']
     uri = alert_source_meta['uri']
@@ -42,7 +42,7 @@ class PortAlert(BaseAlert):
     try:
       self.port = int(get_port_from_url(uri))
     except:
-      traceback.print_exc()
+      # only when port parsing fails
       pass
 
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py 
b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 51c3af9..0d0563a 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -23,6 +23,7 @@ import sys
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.apscheduler.scheduler import Scheduler
 from ambari_agent.alerts.port_alert import PortAlert
+from ambari_agent.alerts.collector import AlertCollector
 from mock.mock import patch
 from unittest import TestCase
 
@@ -35,12 +36,15 @@ class TestAlerts(TestCase):
     sys.stdout == sys.__stdout__
 
   @patch.object(Scheduler, "add_interval_job")
-  def test_build(self, aps_add_interval_job_mock):
-    test_file_path = os.path.join('ambari_agent', 'dummy_files', 'alert_definitions.json')
+  @patch.object(Scheduler, "start")
+  def test_start(self, aps_add_interval_job_mock, aps_start_mock):
+    test_file_path = os.path.join('ambari_agent', 'dummy_files')
 
     ash = AlertSchedulerHandler(test_file_path)
+    ash.start()
 
     self.assertTrue(aps_add_interval_job_mock.called)
+    self.assertTrue(aps_start_mock.called)
 
   def test_port_alert(self):
     json = { "name": "namenode_process",
@@ -51,7 +55,7 @@ class TestAlerts(TestCase):
       "scope": "host",
       "source": {
         "type": "PORT",
-        "uri": "http://c6401.ambari.apache.org:50070",
+        "uri": "http://c6409.ambari.apache.org:50070",
         "default_port": 50070,
         "reporting": {
           "ok": {
@@ -64,7 +68,9 @@ class TestAlerts(TestCase):
       }
     }
 
-    pa = PortAlert(json, json['source'])
+    collector = AlertCollector()
+
+    pa = PortAlert(collector, json, json['source'])
     self.assertEquals(6, pa.interval())
 
     res = pa.collect()

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
new file mode 100644
index 0000000..45fb8d0
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.def
@@ -0,0 +1,49 @@
+{
+  "clusterName": "c1",
+  "hostName": "c6401.ambari.apache.org",
+  "hash": "12341234134412341243124",
+  "alertDefinitions": [
+    {
+      "name": "namenode_cpu",
+      "label": "NameNode host CPU Utilization",
+      "scope": "host",
+      "source": {
+        "type": "METRIC",
+        "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
+        "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
+      }
+    },
+    {
+      "name": "namenode_process",
+      "service": "HDFS",
+      "component": "NAMENODE",
+      "label": "NameNode process",
+      "interval": 6,
+      "scope": "host",
+      "source": {
+        "type": "PORT",
+        "uri": "http://c6401.ambari.apache.org:50070",
+        "default_port": 50070,
+        "reporting": {
+          "ok": {
+            "text": "TCP OK - {0:.4f} response time on port {1}"
+          },
+          "critical": {
+            "text": "Could not load process info: {0}"
+          }
+        }
+      }
+    },
+    {
+      "name": "hdfs_last_checkpoint",
+      "label": "Last Checkpoint Time",
+      "interval": 1,
+      "scope": "service",
+      "enabled": false,
+      "source": {
+        "type": "SCRIPT",
+        "path": "scripts/alerts/last_checkpoint.py"
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f5889281/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
deleted file mode 100644
index 6c55966..0000000
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/alert_definitions.json
+++ /dev/null
@@ -1,46 +0,0 @@
-{
-  "c1": [
-    {
-      "name": "namenode_cpu",
-      "label": "NameNode host CPU Utilization",
-      "scope": "host",
-      "source": {
-        "type": "METRIC",
-        "jmx": "java.lang:type=OperatingSystem/SystemCpuLoad",
-        "host": "{{hdfs-site/dfs.namenode.secondary.http-address}}"
-      }
-    },
-    {
-      "name": "namenode_process",
-      "service": "HDFS",
-      "component": "NAMENODE",
-      "label": "NameNode process",
-      "interval": 6,
-      "scope": "host",
-      "source": {
-        "type": "PORT",
-        "uri": "http://c6401.ambari.apache.org:50070",
-        "default_port": 50070,
-        "reporting": {
-          "ok": {
-            "text": "TCP OK - {0:.4f} response time on port {1}"
-          },
-          "critical": {
-            "text": "Could not load process info: {0}"
-          }
-        }
-      }
-    },
-    {
-      "name": "hdfs_last_checkpoint",
-      "label": "Last Checkpoint Time",
-      "interval": 1,
-      "scope": "service",
-      "enabled": false,
-      "source": {
-        "type": "SCRIPT",
-        "path": "scripts/alerts/last_checkpoint.py"
-      }
-    }
-  ]
-}

Reply via email to