Mei Liu has uploaded a new change for review.

Change subject: mom: add storage backend info collection and entity 
generation(Draft)
......................................................................

mom: add storage backend info collection and entity generation(Draft)

This patch adds storage backend info collection and entity generation.
It collects both memory and storage info and generates related entities
for policy evaluation separately.

Change-Id: I6add3893502270a65a2b1f8dfc2906e92a24761a
Signed-off-by: MeiLiu <[email protected]>
---
M doc/mom-balloon+ksm.conf
M doc/mom-balloon.conf
M mom/Controllers/Balloon.py
M mom/Controllers/KSM.py
M mom/GuestManager.py
M mom/GuestMonitor.py
M mom/HostMonitor.py
M mom/MOMFuncs.py
M mom/Monitor.py
M mom/Policy/Policy.py
M mom/PolicyEngine.py
M mom/__init__.py
12 files changed, 186 insertions(+), 95 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/mom refs/changes/62/17762/1

diff --git a/doc/mom-balloon+ksm.conf b/doc/mom-balloon+ksm.conf
index 2ca2488..ed79cec 100644
--- a/doc/mom-balloon+ksm.conf
+++ b/doc/mom-balloon+ksm.conf
@@ -76,7 +76,7 @@
 
 [host]
 # A comma-separated list of Collector plugins to use for Host data collection.
-collectors: HostMemory, HostKSM
+mem-collectors: HostMemory, HostKSM
 
 # Priority count for storage backend. VM disk resides in that storage backend
 # has a priority in range [0, priority-count). 0 is the highest priority.
@@ -84,7 +84,7 @@
 
 [guest]
 # A comma-separated list of Collector plugins to use for Guest data collection.
-collectors: GuestQemuProc, GuestMemory, GuestBalloon
+mem-collectors: GuestQemuProc, GuestMemory, GuestBalloon
 
 # Default priority value of each VM disk.
 priority: 2
diff --git a/doc/mom-balloon.conf b/doc/mom-balloon.conf
index 5229fb2..de6c0d2 100644
--- a/doc/mom-balloon.conf
+++ b/doc/mom-balloon.conf
@@ -76,7 +76,7 @@
 
 [host]
 # A comma-separated list of Collector plugins to use for Host data collection.
-collectors: HostMemory
+mem-collectors: HostMemory
 
 # Priority count for storage backend. VM disk resides in that storage backend
 # has a priority in range [0, priority-count). 0 is the highest priority.
@@ -84,7 +84,7 @@
 
 [guest]
 # A comma-separated list of Collector plugins to use for Guest data collection.
-collectors: GuestQemuProc, GuestMemory, GuestBalloon
+mem-collectors: GuestQemuProc, GuestMemory, GuestBalloon
 
 # Default priority value of each VM disk.
 priority: 2
diff --git a/mom/Controllers/Balloon.py b/mom/Controllers/Balloon.py
index 6cc7ba3..b3263d9 100644
--- a/mom/Controllers/Balloon.py
+++ b/mom/Controllers/Balloon.py
@@ -37,6 +37,6 @@
                     name, prev_target, target)
             self.hypervisor_iface.setVmBalloonTarget(uuid, target)
 
-    def process(self, host, guests):
-        for guest in guests:
+    def process(self, host_mem, guests_mem, storage):
+        for guest in guests_mem:
             self.process_guest(guest)
diff --git a/mom/Controllers/KSM.py b/mom/Controllers/KSM.py
index eec525e..6868ce1 100644
--- a/mom/Controllers/KSM.py
+++ b/mom/Controllers/KSM.py
@@ -36,10 +36,10 @@
         except IOError, (errno, strerror):
             self.logger.warn("KSM: Failed to write %s: %s", fname, strerror)
 
-    def process(self, host, guests):
+    def process(self, host_mem, guests_mem, storage):
         outputs = {}
         for key in self.cur.keys():
-            rule_var =  host.GetControl('ksm_' + key)
+            rule_var =  host_mem.GetControl('ksm_' + key)
             if rule_var is not None and rule_var != self.cur[key]:
                 outputs[key] = rule_var
                 self.cur[key] = rule_var
diff --git a/mom/GuestManager.py b/mom/GuestManager.py
index 0918f50..bdd2405 100644
--- a/mom/GuestManager.py
+++ b/mom/GuestManager.py
@@ -97,12 +97,12 @@
         Interrogate all active GuestMonitors
         Return: A dictionary of Entities, indexed by guest id
         """
-        ret = {}
+        ret = {'mem': {}}
         self.guests_sem.acquire()
         for (id, monitor) in self.guests.items():
             entity = monitor.interrogate()
-            if entity is not None:
-                ret[id] = entity
+            if entity.get('mem'):
+                ret['mem'][id] = entity['mem']
         self.guests_sem.release()
         return ret
 
@@ -122,7 +122,8 @@
         ret = []
         self.guests_sem.acquire()
         for (id, monitor) in self.guests.items():
-            if monitor.isReady():
+            ready = monitor.isReady()
+            if bool(ready.get('mem')) and bool(ready.get('storage')):
                 name = monitor.getGuestName()
                 if name is not None:
                     ret.append(name)
diff --git a/mom/GuestMonitor.py b/mom/GuestMonitor.py
index 87a7c17..ae40029 100644
--- a/mom/GuestMonitor.py
+++ b/mom/GuestMonitor.py
@@ -40,12 +40,13 @@
         self.properties['hypervisor_iface'] = hypervisor_iface
         self.data_sem.release()
 
-        collector_list = self.config.get('guest', 'collectors')
-        self.collectors = Collector.get_collectors(collector_list,
-                            self.properties, self.config)
-        if self.collectors is None:
-            self.logger.error("Guest Monitor initialization failed")
-            return
+        mem_collector_list = self.config.get('guest', 'mem-collectors')
+        self.collectors['mem'] = Collector.get_collectors(mem_collector_list,
+            self.properties, self.config)
+        for collectorType in self.collectors:
+            if self.collectors[collectorType] is None:
+                self.logger.error("Guest Monitor initialization failed")
+                return
         self.start()
 
     def run(self):
@@ -65,3 +66,7 @@
             return self.properties['name']
         except KeyError:
             return None
+
+    def getExtraFileds(self):
+        return {'mem': set(), 'storage': set()}
+
diff --git a/mom/HostMonitor.py b/mom/HostMonitor.py
index e5f54f1..9ab3aa0 100644
--- a/mom/HostMonitor.py
+++ b/mom/HostMonitor.py
@@ -26,7 +26,7 @@
     """
     The Host Monitor thread collects and reports statistics about the host.
     """
-    def __init__(self, config):
+    def __init__(self, config, hypervisor_iface):
         threading.Thread.__init__(self, name="HostMonitor")
         Monitor.__init__(self, config, self.getName())
         self.setDaemon(True)
@@ -36,17 +36,34 @@
         # Append monitor interval to properties because HostKSM needs it
         # to calculate ksmd cpu usage.
         self.properties['interval'] = self.interval
-        collector_list = self.config.get('host', 'collectors')
-        self.collectors = Collector.get_collectors(collector_list,
-                            self.properties, self.config)
-        if self.collectors is None:
+        self.hypervisor_iface = hypervisor_iface
+        self.backendPriorityCount = self.config.get('host', 'priority-count')
+        mem_collector_list = self.config.get('host', 'mem-collectors')
+        self.collectors['mem'] = Collector.get_collectors(mem_collector_list,
+            self.properties, self.config)
+        if self.collectors['mem'] is None:
             self.logger.error("Host Monitor initialization failed")
             return
         self.start()
 
+    def getStorageBackendSetting(self):
+#        backend_list = self.hypervisor_iface.getStorageBackendList()
+        backend_list = ['default1', 'default2', 'default3']
+        remove_list = set(self.storage_spec) - set(backend_list)
+        new_list = set(backend_list) - set(self.storage_spec)
+        for uuid in remove_list:
+            del self.storage_spec[uuid]
+        for uuid in new_list:
+            self.storage_spec[uuid] = {'uuid': uuid,
+                'priorityCount': self.backendPriorityCount}
+
+    def getExtraFileds(self):
+        return {'mem': set(), 'storage': set(['uuid', 'priorityCount'])}
+
     def run(self):
         self.logger.info("Host Monitor starting")
         while self._should_run():
+            self.getStorageBackendSetting()
             data = self.collect()
             time.sleep(self.interval)
         self.logger.info("Host Monitor ending")
diff --git a/mom/MOMFuncs.py b/mom/MOMFuncs.py
index c062e33..9e98ecc 100644
--- a/mom/MOMFuncs.py
+++ b/mom/MOMFuncs.py
@@ -71,12 +71,23 @@
 
     @exported
     def getStatistics(self):
-        host_stats = self.threads['host_monitor'].interrogate().statistics[-1]
-        guest_stats = {}
-        guest_entities = self.threads['guest_manager'].interrogate().values()
-        for entity in guest_entities:
-            guest_stats[entity.properties['name']] = entity.statistics[-1]
-        ret = {'host': host_stats, 'guests': guest_stats}
+        host_entities = self.threads['host_monitor'].interrogate()
+        host_mem_entity = host_entities.get('mem')
+        host_storage_entities = host_entities.get('storage')
+        host_storage_stats = {}
+        if host_mem_entity:
+            host_mem_stats = host_mem_entity.statistics[-1]
+        if host_storage_entities:
+            for index, entity in host_storage_entities.iteritems():
+                host_storage_stats[index] = entity.statistics[-1]
+        guest_mem_stats = {}
+        guest_mem_entities = self.threads['guest_manager'].interrogate().\
+            get('mem').values()
+        for entity in guest_mem_entities:
+            guest_mem_stats[entity.properties['name']] = entity.statistics[-1]
+
+        ret = {'host': host_mem_stats, 'guests': guest_mem_stats,
+               'storage': host_storage_stats}
         return ret
 
     @exported
diff --git a/mom/Monitor.py b/mom/Monitor.py
index 8daccec..d2f75df 100644
--- a/mom/Monitor.py
+++ b/mom/Monitor.py
@@ -33,28 +33,32 @@
         # Guard the data with a semaphore to ensure consistency.
         self.data_sem = threading.Semaphore()
         self.properties = {}
-        self.statistics = deque()
+        self.statistics = {'mem': deque(), 'storage': {}}
         self.variables = {}
         self.name = name
-        self.fields = None
-        self.collectors = []
+        self.fields = {}
+        self.collectors = {'mem': [], 'storage': []}
+        self.storage_spec = {}
+        self.plotter = {}
         self.logger = logging.getLogger('mom.Monitor')
 
         plot_dir = config.get('__int__', 'plot-subdir')
         if plot_dir != '':
-            self.plotter = Plotter(plot_dir, name)
+            self.plotter['mem']= Plotter(plot_dir, name + "_Memory")
+            self.plotter['storage'] = Plotter(plot_dir, name + "_Storage")
         else:
-            self.plotter = None
+            self.plotter['mem'] = None
+            self.plotter['storage'] = None
 
-        self.ready = None
+        self.ready = {}
         self._terminate = False
 
     def collect(self):
         """
-        Collect a set of statistics by invoking all defined collectors and
-        merging the data into one dictionary and pushing it onto the deque of
-        historical statistics.  Maintain a history length as specified in the
-        config file.
+        Collect a set of statistics by invoking all defined collectors.
+        Merge the data into multiple dictionaries for memory and each storage
+        item(e.g. storage backend) and push them onto the deques of historical
+        statistics. Maintain a history length as specified in the config file.
 
         Note: Priority is given to collectors based on the order that they are
         listed in the config file (ie. if two collectors produce the same
@@ -63,60 +67,107 @@
         """
 
         # The first time we are called, populate the list of expected fields
-        if self.fields is None:
-            self.fields = set()
-            for c in self.collectors:
-                self.fields |= c.getFields()
-            self.logger.debug("Using fields: %s", repr(self.fields))
-            if self.plotter is not None:
-                self.plotter.setFields(self.fields)
+        for key in self.statistics:
+            if self.fields.get(key) is None:
+                self.fields[key] = set()
+                for c in self.collectors[key]:
+                    self.fields[key] |= c.getFields()
+                extraFields = self.getExtraFileds().get(key)
+                if extraFields:
+                    self.fields[key] |= extraFields
+                self.logger.debug("Using %s fields: %s", key,
+                                  repr(self.fields[key]))
+                if self.plotter.get(key) is not None:
+                    self.plotter[key].setFields(self.fields[key])
 
-        data = {}
-        for c in self.collectors:
+        data = {'mem': {}}
+        for c in self.collectors['mem']:
             try:
                 for (key, val) in c.collect().items():
                     if key not in data:
-                        data[key] = val
+                        data['mem'][key] = val
             except Collector.CollectionError, e:
-                self._disp_collection_error("Collection error: %s" % e.msg)
+                self._disp_collection_error("mem", "Collection error: %s" %
+                                            e.msg)
             except Collector.FatalError, e:
-                self._set_not_ready("Fatal Collector error: %s" % e.msg)
+                self._set_not_ready("mem", "Fatal Collector error: %s" % e.msg)
                 self.terminate()
                 return None
-        if set(data) != self.fields:
-            self._set_not_ready("Incomplete data: missing %s" % \
-                                (self.fields - set(data)))
-            return None
+        if set(data['mem']) != self.fields['mem']:
+            self._set_not_ready("mem", "Incomplete data: missing %s" % \
+                                (self.fields['mem'] - set(data['mem'])))
 
-        self.data_sem.acquire()
-        self.statistics.append(data)
-        if len(self.statistics) > self.config.getint('main', 
'sample-history-length'):
-            self.statistics.popleft()
-        self.data_sem.release()
-        self._set_ready()
+        ret =  {}
+        for key in self.statistics:
+            if self.ready.get(key) is not False:
+                if key == 'mem':
+                    stat = data['mem']
+                elif key == 'storage':
+                    indexedStat = self.storage_spec
+                    stat = self.storage_spec.values()
+                historyLength = self.config.getint('main',
+                                                   'sample-history-length')
 
-        if self.plotter is not None:
-            self.plotter.plot(data)
+                self.data_sem.acquire()
+                if isinstance(stat, list):
+                    #storage statistics
+                    for index, stats in indexedStat.iteritems():
+                        if self.statistics[key].get(index) is None:
+                            self.statistics[key][index] = deque()
+                        self.statistics[key][index].append(stats)
+                        if len(self.statistics[key][index]) > historyLength:
+                            self.statistics[key][index].popleft()
+                else:
+                    #memory statistics
+                    self.statistics[key].append(stat)
+                    if len(self.statistics[key]) > historyLength:
+                        self.statistics[key].popleft()
+                self.data_sem.release()
 
-        return data
+                self._set_ready(key)
+                ret[key] = stat
 
-    def interrogate(self):
-        """
-        Take a snapshot of this Monitor object and return an Entity object 
which
-        is useful for rules processing.
-        Return: A new Entity object
-        """
-        if self.ready is not True:
-            return None
+                if self.plotter.get(key) is not None:
+                    if isinstance(stat, list):
+                        #storage plot statistics
+                        for item in stat:
+                            self.plotter[key].plot(item)
+                    else:
+                        #memory plot statistics
+                        self.plotter[key].plot(stat)
+        return ret
+
+    def produceEntity(self, stats):
         ret = Entity(monitor=self)
         self.data_sem.acquire()
         for prop in self.properties.keys():
             ret._set_property(prop, self.properties[prop])
         for var in self.variables.keys():
             ret._set_variable(var, self.variables[var])
-        ret._set_statistics(self.statistics)
+        ret._set_statistics(stats)
         self.data_sem.release()
         ret._finalize()
+        return ret
+
+    def interrogate(self):
+        """
+        Take snapshots of this Monitor's objects and return Entity objects 
which
+        are useful for rules processing.
+        Return: New Entity objects.
+        """
+        ret = {}
+        for key in self.ready:
+            if self.ready[key] is True:
+                if isinstance(self.statistics[key], deque):
+                    #memory statistics
+                    entity = self.produceEntity(self.statistics[key])
+                    ret[key] = entity
+                elif isinstance(self.statistics[key], dict):
+                    #storage statistics
+                    ret[key] = {}
+                    for index, stats in self.statistics[key].iteritems():
+                        entity = self.produceEntity(stats)
+                        ret[key][index]= entity
         return ret
 
     def update_variables(self, variables):
@@ -138,23 +189,23 @@
         """
         Check if all configured Collectors are working properly.
         """
-        return bool(self.ready)
+        return self.ready
 
-    def _set_ready(self):
-        if self.ready is not True:
-            self.logger.info('%s is ready', self.name)
-        self.ready = True
+    def _set_ready(self, key):
+        if self.ready.get(key) is not True:
+            self.logger.info('%s is ready in %s', key, self.name)
+        self.ready[key]= True
 
-    def _disp_collection_error(self, message=None):
+    def _disp_collection_error(self, key, message=None):
         if message is not None:
-            if self.ready is False:
-                self.logger.debug('%s: %s', self.name, message)
+            if self.ready.get(key) is False:
+                self.logger.debug('%s %s: %s', self.name, key, message)
             else: # True or None
-                self.logger.warn('%s: %s', self.name, message)
+                self.logger.warn('%s %s: %s', self.name, key, message)
 
-    def _set_not_ready(self, message=None):
-        self.ready = False
-        self._disp_collection_error(message)
+    def _set_not_ready(self, key, message=None):
+        self.ready[key] = False
+        self._disp_collection_error(key, message)
 
     def _should_run(self):
         """
diff --git a/mom/Policy/Policy.py b/mom/Policy/Policy.py
index 3bcdea1..e6e9893 100644
--- a/mom/Policy/Policy.py
+++ b/mom/Policy/Policy.py
@@ -79,10 +79,11 @@
             self.policy_strings = {}
             self.code = []
 
-    def evaluate(self, host, guest_list):
+    def evaluate(self, host_mem, guest_mem_list, storage):
         results = []
-        self.evaluator.stack.set('Host', host, alloc=True)
-        self.evaluator.stack.set('Guests', guest_list, alloc=True)
+        self.evaluator.stack.set('Host', host_mem, alloc=True)
+        self.evaluator.stack.set('Guests', guest_mem_list, alloc=True)
+        self.evaluator.stack.set('Storages', storage, alloc=True)
 
         with self.policy_sem:
             try:
diff --git a/mom/PolicyEngine.py b/mom/PolicyEngine.py
index 659ac3c..53c30e0 100644
--- a/mom/PolicyEngine.py
+++ b/mom/PolicyEngine.py
@@ -111,15 +111,20 @@
         into each configured Controller.
         """
         host = self.properties['host_monitor'].interrogate()
-        if host is None:
+        # Maybe here it is better to have two policy engine(One for storage 
and one for memory)
+        # This can make sure that incomplete collect of one will not affect 
the other.
+        if not host.get('mem') or not host.get('storage'):
             return
-        guest_list = self.properties['guest_manager'].interrogate().values()
 
-        ret = self.policy.evaluate(host, guest_list)
+        storage = host['storage'].values()
+        guests = self.properties['guest_manager'].interrogate()
+        guest_mem_list = guests.get('mem').values()
+
+        ret = self.policy.evaluate(host['mem'], guest_mem_list, storage)
         if ret is False:
             return
         for c in self.controllers:
-            c.process(host, guest_list)
+            c.process(host['mem'], guest_mem_list, storage)
 
     def run(self):
         self.logger.info("Policy Engine starting")
diff --git a/mom/__init__.py b/mom/__init__.py
index 928a881..300cbe4 100644
--- a/mom/__init__.py
+++ b/mom/__init__.py
@@ -24,10 +24,10 @@
         # Start threads
         self.logger.info("MOM starting")
         self.config.set('__int__', 'running', '1')
-        host_monitor = HostMonitor(self.config)
         hypervisor_iface = self.get_hypervisor_interface()
         if not hypervisor_iface:
             self.shutdown()
+        host_monitor = HostMonitor(self.config, hypervisor_iface)
         guest_manager = GuestManager(self.config, hypervisor_iface)
         policy_engine = PolicyEngine(self.config, hypervisor_iface, 
host_monitor, \
                                      guest_manager)
@@ -94,10 +94,10 @@
         self.config.set('logging', 'max-bytes', '2097152')
         self.config.set('logging', 'backup-count', '5')
         self.config.add_section('host')
-        self.config.set('host', 'collectors', 'HostMemory')
+        self.config.set('host', 'mem-collectors', 'HostMemory')
         self.config.set('host', 'priority-count', '3')
         self.config.add_section('guest')
-        self.config.set('guest', 'collectors', 'GuestQemuProc, GuestMemory')
+        self.config.set('guest', 'mem-collectors', 'GuestQemuProc, 
GuestMemory')
         self.config.set('guest', 'priority', '2')
 
         # Override defaults from the config file


-- 
To view, visit http://gerrit.ovirt.org/17762
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I6add3893502270a65a2b1f8dfc2906e92a24761a
Gerrit-PatchSet: 1
Gerrit-Project: mom
Gerrit-Branch: master
Gerrit-Owner: Mei Liu <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to