Francesco Romani has uploaded a new change for review.

Change subject: WIP DRAFT virt: scalable sampling
......................................................................

WIP DRAFT virt: scalable sampling

*** WORK IN PROGRESS *** DO NOT MERGE ***

Draft of new scalable sampling leveraging
the Schedule library and a threadpool.

Change-Id: I9f54990b2114cbe931f64d6207192d1f6c6a3799
Signed-off-by: Francesco Romani <[email protected]>
---
M vdsm/virt/sampling.py
M vdsm/virt/vm.py
2 files changed, 126 insertions(+), 10 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/77/29977/1

diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 3479eee..de0cc6a 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -26,6 +26,7 @@
 
     Contains a reverse dictionary pointing from error string to its error code.
 """
+import contextlib
 import threading
 import os
 import time
@@ -34,6 +35,9 @@
 import ethtool
 import re
 
+from storage import threadPool as threadpool
+from vdsm import netinfo
+from vdsm import schedule
 from vdsm import utils
 from vdsm import netinfo
 from vdsm.ipwrapper import getLinks
@@ -44,6 +48,10 @@
 _THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled'
 if not os.path.exists(_THP_STATE_PATH):
     _THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled'
+
+_Scheduler = schedule.Scheduler(name='SamplingScheduler')
+
+_Executor = threadpool.ThreadPool(numThreads=5)  # to match libvirt pool size
 
 
 class InterfaceSample:
@@ -315,12 +323,16 @@
         self._interval = interval
 
     @property
+    def name(self):
+        return self._function.__name__
+
+    @property
     def interval(self):
         return self._interval
 
     def __repr__(self):
         return "<AdvancedStatsFunction %s at 0x%x>" % (
-            self._function.__name__, id(self._function.__name__))
+            self.name, id(self.name))
 
     def __call__(self, *args, **kwargs):
         retValue = self._function(*args, **kwargs)
@@ -347,6 +359,80 @@
         return bgn_sample, end_sample, (end_time - bgn_time)
 
 
+def _handleStatsException(ex):
+    """
+    Handle exceptions raised by the registered functions and possibly stop
+    the sampling if a fatal error occurred.
+    """
+    return False
+
+
+class Sampling(object):
+    def __init__(self, statfunc, vm, log, onException=_handleStatsException):
+        self.pause = False
+        self._statfunc = statfunc
+        self._onException = onException
+        self._vm = vm
+        self._log = log
+        self._done = threading.Event()
+        self._run = False
+        self._name = '%s-%s' % (self._vm.id, self._statfunc.name)
+        self._sampleTime = 0
+
+    @property
+    def sampleTime(self):
+        return self._sampleTime
+
+    def start(self):
+        self._run = True
+        _Scheduler(self._statfunc.interval, self._onSchedule)
+
+    def stop(self):
+        self._run = False
+
+    @property
+    def done(self):
+        return self._done.is_set()
+
+    @contextlib.contextmanager
+    def _in_progress(self):
+        self._done.clear()
+        yield
+        self._done.set()
+
+    def _onSchedule(self):
+        # run in the executor to avoid blocking the scheduler
+        _Executor.queueTask(self._name, self._collect)
+
+    def _collect(self):
+        if not self._run:
+            return
+        if not self.pause:
+            if not self.done:
+                # previous round not completed, do not clog the executor
+                self._vm._timeoutExperienced(True)
+            elif not self._vm.monitorResponsive():
+                # something else has gone wrong, let's not add more chaos
+                # FIXME maybe log something?
+                pass
+            else:
+                self._do_sampling()
+                self._vm._timeoutExperienced(False)
+
+        self._schedule(self._statfunc.interval, self.onSchedule)
+
+    def _do_sampling(self):
+        with self._in_progress():
+            try:
+                self._statfunc()
+            except Exception as e:
+                if not self._onException(e):
+                    self._log.error("Stats function failed: %s",
+                                    self._statfunc, exc_info=True)
+            else:
+                self._sampleTime = time.time()
+
+
 class AdvancedStatsThread(threading.Thread):
     """
     A thread that runs the registered AdvancedStatsFunction objects
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 3fde1f7..fd60052 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -64,7 +64,7 @@
 from . import vmexitreason
 from . import vmstatus
 
-from .sampling import AdvancedStatsFunction, AdvancedStatsThread
+from .sampling import AdvancedStatsFunction, Sampling
 from .utils import isVdsmImage, XMLElement
 from vmpowerdown import VmShutdown, VmReboot
 
@@ -166,7 +166,7 @@
     pass
 
 
-class VmStatsThread(AdvancedStatsThread):
+class VmStatsCollector(object):
     MBPS_TO_BPS = 10 ** 6 / 8
 
     # CPU tune sampling window
@@ -183,8 +183,8 @@
     _libvirt_metadata_supported = True
 
     def __init__(self, vm):
-        AdvancedStatsThread.__init__(self, log=vm.log, daemon=True)
         self._vm = vm
+        self._samplings = []
 
         self.highWrite = (
             AdvancedStatsFunction(
@@ -236,11 +236,37 @@
                 config.getint('vars', 'vm_sample_cpu_tune_interval'),
                 self.CPU_TUNE_SAMPLING_WINDOW))
 
-        self.addStatsFunction(
+        self.addSamplings(
             self.highWrite, self.updateVolumes, self.sampleCpu,
             self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
             self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning,
             self.sampleCpuTune)
+
+    def addSamplings(self, *statsFuncs):
+        for statFunc in statsFuncs:
+            self._samplings.append(Sampling(statFunc,
+                                            self._vm, self._vm.log,
+                                            self._handleStatsException))
+
+    def start(self):
+        for sampling in self._samplings:
+            sampling.start()
+
+    def stop(self):
+        for sampling in self._samplings:
+            sampling.start()
+
+    def pause(self):
+        for sampling in self._samplings:
+            sampling.pause = True
+
+    def cont(self):
+        for sampling in self._samplings:
+            sampling.pause = False
+
+    def getLastSampleTime(self):
+        return max([sampling.sampleTime
+                    for sampling in self._samplings])
 
     def _highWrite(self):
         if not self._vm.isDisksStatsCollectionEnabled():
@@ -310,13 +336,13 @@
         metadataCpuLimit = None
 
         try:
-            if VmStatsThread._libvirt_metadata_supported:
+            if VmStatsCollector._libvirt_metadata_supported:
                 metadataCpuLimit = self._vm._dom.metadata(
                     libvirt.VIR_DOMAIN_METADATA_ELEMENT,
                     METADATA_VM_TUNE_URI, 0)
         except libvirt.libvirtError as e:
             if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED:
-                VmStatsThread._libvirt_metadata_supported = False
+                VmStatsCollector._libvirt_metadata_supported = False
                 self._log.error("libvirt does not support metadata")
 
             elif (e.get_error_code()
@@ -2777,7 +2803,7 @@
 
         WARNING: this method should only gather statistics by copying data.
         Especially avoid costly and dangerous ditrect calls to the _dom
-        attribute. Use the VmStatsThread instead!
+        attribute. Use the VmStatsCollector instead!
         """
 
         if self.lastStatus == vmstatus.DOWN:
@@ -3058,7 +3084,7 @@
         return domxml.toxml()
 
     def _initVmStats(self):
-        self._vmStats = VmStatsThread(self)
+        self._vmStats = VmStatsCollector(self)
         self._vmStats.start()
         self._guestEventTime = self._startTime
 
@@ -3173,7 +3199,7 @@
             if drive['device'] == 'disk' and isVdsmImage(drive):
                 self._syncVolumeChain(drive)
 
-        # VmStatsThread may use block devices info from libvirt.
+        # VmStatsCollector may use block devices info from libvirt.
         # So, run it after you have this info
         self._initVmStats()
         self.guestAgent = guestagent.GuestAgent(
@@ -3921,6 +3947,10 @@
         self.log.warning('_readPauseCode unsupported by libvirt vm')
         return 'NOERR'
 
+    @property
+    def monitorResponsive(self):
+        return self._monitorResponse == 0
+
     def _timeoutExperienced(self, timeout):
         if timeout:
             self._monitorResponse = -1


-- 
To view, visit http://gerrit.ovirt.org/29977
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9f54990b2114cbe931f64d6207192d1f6c6a3799
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to