Francesco Romani has uploaded a new change for review.

Change subject: WIP: sampling: per-worker sampling
......................................................................

WIP: sampling: per-worker sampling

The new scalable sampling http://gerrit.ovirt.org/#/c/29980/
replicates the old VDSM behaviour, which maximizes the VM isolation.
This, however, adds to the VDSM CPU usage, because more
work is needed to schedule events per vm and per sampling.

This patch builds on the same foundations of the scalable
sampling, but mixes things differently in order to minimize
the scheduled tasks.

VMs are partitioned and each partition is assigned to one
worker thread; each worker thread performs the same sampling
on all the VMs assigned to it.

The Scheduler just schedules sampling to all the worker threads.
Considering just one sampling, for example sampleCpu,
the scalable sampling approach schedules one task per VM,
while this patch schedules just one task per worker thread.

Since the worker pool size is expected to be *much* lower
than the average VM pool size, this patch greatly reduces the
traffic.

The drawback of this approach is the isolation among VMs
is now weaker, and more logic is needed to address and properly
react to a stuck libvirt call.

This patch is marked Work In Progress and is not yet ready
because while minimal testing was successfully done in the happy
path, some important bits are still missing.

Change-Id: If3d1edc8099517710a7b9c31758930f13131d0d1
Signed-off-by: Francesco Romani <[email protected]>
---
M tests/numaUtilsTests.py
M tests/samplingTests.py
M tests/vmApiTests.py
M vdsm/vdsm
M vdsm/virt/sampling.py
M vdsm/virt/vm.py
6 files changed, 619 insertions(+), 287 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/81/31181/1

diff --git a/tests/numaUtilsTests.py b/tests/numaUtilsTests.py
index b3676fd..959372f 100644
--- a/tests/numaUtilsTests.py
+++ b/tests/numaUtilsTests.py
@@ -54,7 +54,7 @@
         return {0: [0, 1], 1: [0, 1], 2: [0, 1], 3: [0, 1]}
 
 
-class FakeAdvancedStatsFunction:
+class FakeSampleWindow:
     def __init__(self):
         pass
 
@@ -68,7 +68,7 @@
 class FakeVmStatsThread:
     def __init__(self, vm):
         self._vm = vm
-        self.sampleVcpuPinning = FakeAdvancedStatsFunction()
+        self.sampleVcpuPinning = FakeSampleWindow()
 
 
 class TestNumaUtils(TestCaseBase):
diff --git a/tests/samplingTests.py b/tests/samplingTests.py
index 791054c..84c39b6 100644
--- a/tests/samplingTests.py
+++ b/tests/samplingTests.py
@@ -20,6 +20,7 @@
 
 import os
 import tempfile
+import threading
 import shutil
 
 from vdsm import ipwrapper
@@ -111,3 +112,245 @@
         s1 = sampling.InterfaceSample(lo)
         s1.operstate = 'x'
         self.assertEquals('operstate:x', s1.connlog_diff(s0))
+
+
+class Task(object):
+
+    def __init__(self, interval, name, error=None):
+        self.interval = interval
+        self.name = name
+        self.error = error
+        self.executed = 0
+
+    def __call__(self, vm, stats):
+        self.executed += 1
+        if self.error:
+            raise self.error
+
+
+class SamplingIterationTests(TestCaseBase):
+
+    task1 = Task(2, "task1")
+    task2 = Task(2, "task2")
+    task3 = Task(3, "task3")
+    samplings = [task1, task2, task3]
+
+    virtual_times = [
+        # cycle 1
+        (2, task1),
+        (2, task2),
+        (3, task3),
+        (4, task1),
+        (4, task2),
+        (6, task1),
+        (6, task2),
+        (6, task3),
+        # cycle 2
+        (8, task1),
+        (8, task2),
+        (9, task3),
+        (10, task1),
+        (10, task2),
+        (12, task1),
+        (12, task2),
+        (12, task3),
+    ]
+
+    delays = [
+        (2, task1),
+        (0, task2),
+        (1, task3),
+        (1, task1),
+        (0, task2),
+        (2, task1),
+        (0, task2),
+        (0, task3),
+    ]
+
+    def test_virtual_time(self):
+        iterator = sampling.cycle(self.samplings)
+        for pair in self.virtual_times:
+            self.assertEquals(next(iterator), pair)
+
+    def test_delays(self):
+        iterator = sampling.delays(self.virtual_times)
+        for i in range(2):
+            for pair in self.delays:
+                self.assertEquals(next(iterator), pair)
+
+    def test_chain(self):
+        iterator = sampling.delays(sampling.cycle(self.samplings))
+        for i in range(3):
+            for pair in self.delays:
+                self.assertEquals(next(iterator), pair)
+
+
+class SamplerTests(TestCaseBase):
+
+    def setUp(self):
+        self.scheduler = FakeScheduler()
+        self.executor = FakeExecutor()
+        self.handler = FakeHandler(True)
+        self.clientIF = FakeClientIF([FakeVM(0, self.handler)])
+        self.task1 = Task(2, "task1")
+        self.task2 = Task(3, "task2")
+        self.task3 = Task(4, "task3")
+        self.samplings = [self.task1, self.task2, self.task3]
+        self.sampler = sampling.Sampler(self.samplings, self.clientIF,
+                                        scheduler=self.scheduler,
+                                        executor=self.executor)
+
+    def test_start(self):
+        self.sampler.start()
+        self.assertEquals(self.scheduler.call.delay, 2)
+        self.assertEquals(self.scheduler.call.func, self.sampler._dispatch)
+        self.assertEquals(self.executor.tasks, [])
+
+    def test_stop(self):
+        self.sampler.start()
+        self.sampler.stop()
+        self.assertEquals(self.scheduler.call.func, INVALID)
+        self.assertEquals(self.executor.tasks, [])
+
+    def test_dispatch(self):
+        self.sampler.start()
+        self.scheduler.fire()
+        for (task, delay), sampling in zip(
+                self.executor.tasks, self.samplings):
+            self.assertEqual(delay, 10.0)
+            self.assertEqual(task._task, sampling)
+
+    def test_run_samplings(self):
+        self.sampler.start()
+
+        # time = 2
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.scheduler.call.delay, 1)
+        self.assertEquals(self.task1.executed, 1)
+
+        # time = 3
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.task2.executed, 1)
+        self.assertEquals(self.scheduler.call.delay, 1)
+
+        # time = 4
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.task1.executed, 2)
+        self.assertEquals(self.scheduler.call.delay, 0)
+
+        # time = 4
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.task3.executed, 1)
+        self.assertEquals(self.scheduler.call.delay, 2)
+
+        # time = 6
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.task1.executed, 3)
+        self.assertEquals(self.scheduler.call.delay, 0)
+
+        # time = 6
+        self.scheduler.fire()
+        self.executor.run()
+        self.assertEquals(self.task2.executed, 2)
+        self.assertEquals(self.scheduler.call.delay, 2)
+
+
+class SamplerHandlerTests(TestCaseBase):
+
+    def test_does_handle_error(self):
+        self.check(FakeHandler(True))
+
+    def test_does_not_handle_error(self):
+        self.check(FakeHandler(False))
+
+    def check(self, handler):
+        scheduler = FakeScheduler()
+        executor = FakeExecutor()
+        error = Exception("Sampling task failed")
+        task = Task(2, "task", error=error)
+        sampler = sampling.Sampler([task], FakeClientIF([FakeVM(0, handler)]),
+                                   scheduler=scheduler, executor=executor)
+        sampler.start()
+        scheduler.fire()
+        executor.run()
+        self.assertEquals(handler.error, error)
+
+
+# Helpers
+
+
+class FakeHandler(object):
+
+    def __init__(self, result):
+        self.error = None
+        self.result = result
+        self.paused = False
+
+    def handleStatsException(self, e):
+        self.error = e
+        return self.result
+
+
+class FakeScheduler(object):
+
+    def __init__(self):
+        # Schedule only single call to simplify the tests
+        self.call = None
+
+    def schedule(self, delay, func):
+        assert self.call is None
+        self.call = Call(delay, func)
+        return self.call
+
+    def fire(self):
+        call = self.call
+        self.call = None
+        call.func()
+
+
+class Call(object):
+
+    def __init__(self, delay, func):
+        self.delay = delay
+        self.func = func
+
+    def cancel(self):
+        self.func = INVALID
+
+
+def INVALID():
+    pass
+
+
+class FakeExecutor(object):
+
+    def __init__(self):
+        self.tasks = []
+
+    def dispatch(self, task, timeout=None):
+        # Don't execute task on a worker thread to simplify the tests
+        self.tasks.append((task, timeout))
+
+    def run(self):
+        # Run next task manually to simplify the tests
+        task, timeout = self.tasks.pop()
+        task()
+
+
+class FakeVM(object):
+
+    def __init__(self, vm_id=0, handler=None):
+        self.id = vm_id
+        self._vmStats = handler
+
+
+class FakeClientIF(object):
+
+    def __init__(self, vms):
+        self.vmContainerLock = threading.Lock()
+        self.vmContainer = dict((vm.id, vm) for vm in vms)
diff --git a/tests/vmApiTests.py b/tests/vmApiTests.py
index 0342743..cdf5698 100644
--- a/tests/vmApiTests.py
+++ b/tests/vmApiTests.py
@@ -22,6 +22,8 @@
 import os
 import os.path
 
+from virt import sampling
+from virt.vm import VM_SAMPLINGS
 from virt import vmexitreason
 from vdsm import define
 from testlib import VdsmTestCase as TestCaseBase
@@ -29,6 +31,7 @@
 from rpc import vdsmapi
 
 from vmTests import FakeVM
+from samplingTests import FakeClientIF
 
 
 class TestSchemaCompliancyBase(TestCaseBase):
@@ -56,8 +59,9 @@
 
 @contextmanager
 def ensureVmStats(vm):
-    vm._initVmStats()
+    sampling.start(VM_SAMPLINGS, FakeClientIF([vm]))
     try:
+        vm._initVmStats()
         yield vm
     finally:
         vm._vmStats.stop()
diff --git a/vdsm/vdsm b/vdsm/vdsm
index cbc16ff..a498dbc 100755
--- a/vdsm/vdsm
+++ b/vdsm/vdsm
@@ -38,6 +38,9 @@
 from storage.dispatcher import Dispatcher
 from storage.hsm import HSM
 
+from virt import sampling
+from virt.vm import VM_SAMPLINGS
+
 import zombiereaper
 import dsaversion
 
@@ -81,6 +84,9 @@
     from clientIF import clientIF  # must import after config is read
     cif = clientIF.getInstance(irs, log)
     cif.start()
+
+    sampling.start(VM_SAMPLINGS, cif)
+
     try:
         while running[0]:
             signal.pause()
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 99d2585..04e4b20 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -26,16 +26,21 @@
 
     Contains a reverse dictionary pointing from error string to its error code.
 """
-import threading
-import os
-import time
-import logging
 import errno
 import ethtool
+import itertools
+import logging
+import operator
+import os
 import re
+import threading
+import time
 
+from vdsm.config import config
+from vdsm import executor
 from vdsm import utils
 from vdsm import netinfo
+from vdsm import schedule
 from vdsm.ipwrapper import getLinks
 from vdsm.constants import P_VDSM_RUN, P_VDSM_CLIENT_LOG
 
@@ -44,6 +49,38 @@
 _THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled'
 if not os.path.exists(_THP_STATE_PATH):
     _THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled'
+
+
+_SAMPLING_THREADS = config.getint('vars', 'vm_sampling_threads')
+
+
+_scheduler = schedule.Scheduler("sampling.scheduler")
+
+_executor = executor.Executor(
+    _SAMPLING_THREADS,
+    config.getint('vars', 'vm_sampling_tasks'),
+    _scheduler)
+
+_sampler = None
+
+
+def start(samplings, cif):
+    """ Called during application startup """
+    _scheduler.start()
+    _executor.start()
+    global _sampler
+    _sampler = Sampler(samplings, cif)
+    _sampler.start()
+
+
+def stop():
+    """ Called during application shutdown """
+    if _executor:
+        _executor.stop(wait=True)
+    if _scheduler:
+        _scheduler.stop()
+    if _sampler:
+        _sampler.stop()
 
 
 class InterfaceSample:
@@ -310,40 +347,25 @@
         return text
 
 
-class AdvancedStatsFunction(object):
-    """
-    A wrapper for functions and methods that will be executed at regular
-    intervals storing the return values for statistic purpose.
-    It is possible to provide a custom time function 'timefn' that provides
-    cached values to reduce system calls.
-    """
-    def __init__(self, function, interval=1, window=0, timefn=time.time):
-        self._function = function
+class SampleWindow(object):
+    def __init__(self, window, timefn=time.time):
         self._window = window
         self._timefn = timefn
         self._sample = []
-
-        if not isinstance(interval, int) or interval < 1:
-            raise ValueError("interval must be int and greater than 0")
-
-        self._interval = interval
+        self._last_time = 0  # No sample taken yet
 
     @property
-    def interval(self):
-        return self._interval
+    def lastTime(self):
+        return self._last_time
 
-    def __repr__(self):
-        return "<AdvancedStatsFunction %s at 0x%x>" % (
-            self._function.__name__, id(self._function.__name__))
-
-    def __call__(self, *args, **kwargs):
-        retValue = self._function(*args, **kwargs)
+    def addStat(self, retValue):
         retTime = self._timefn()
 
         if self._window > 0:
             self._sample.append((retTime, retValue))
             del self._sample[:-self._window]
 
+        self._last_time = retTime
         return retValue
 
     def getStats(self):
@@ -361,115 +383,164 @@
         return bgn_sample, end_sample, (end_time - bgn_time)
 
 
-class AdvancedStatsThread(threading.Thread):
+class Sampler(object):
     """
-    A thread that runs the registered AdvancedStatsFunction objects
-    for statistic and monitoring purpose.
+    Execute sampling tasks on the executor according to sampling schedule.
+
+    The specified sampling tasks are executed on the executor one at a time, as
+    if there was a thread running them one after another in a loop.
+
+    If one of the sampling tasks gets stuck, the rest of the sampling tasks are
+    delayed. This is important to prevent flooding of the thread pool with
+    possibly blocking sampling task, when the underlying libvirt thread is
+    stuck. When a stuck sampling task finishes, the sampling tasks continue
+    normally.
+
+    When a sampling task is stuck, no other task can run on the blocked worker
+    thread. The executor is responsible for detecting and handling this.
+
+    Each vm should create one sampler, to ensure that there is only one
+    sampling task per vm executing on the executor.
     """
-    DEFAULT_LOG = logging.getLogger("AdvancedStatsThread")
 
-    def __init__(self, log=DEFAULT_LOG, daemon=False):
-        """
-        Initialize an AdvancedStatsThread object
-        """
-        threading.Thread.__init__(self)
-        self.daemon = daemon
+    # If a sampling function blocks for more then this interval, the executor
+    # thread will be discarded and replaced with a new thread.
+    TIMEOUT = config.getint('vars', 'vm_sampling_timeout')
 
-        self._log = log
-        self._stopEvent = threading.Event()
-        self._contEvent = threading.Event()
+    _log = logging.getLogger("Sampler")
 
-        self._statsTime = None
-        self._statsFunctions = []
-
-    def addStatsFunction(self, *args):
-        """
-        Register the functions listed as arguments
-        """
-        if self.isAlive():
-            raise RuntimeError("AdvancedStatsThread is started")
-
-        for statsFunction in args:
-            self._statsFunctions.append(statsFunction)
+    def __init__(self, samplings, cif,
+                 scheduler=_scheduler,
+                 executor=_executor):
+        self._samplings = samplings
+        self._cif = cif
+        self._scheduler = scheduler
+        self._executor = executor
+        self._lock = threading.Lock()
+        self._running = False
+        self._iterator = None
+        self._task = _INVALID
+        self._call = None
 
     def start(self):
-        """
-        Start the execution of the thread and exit
-        """
-        self._log.debug("Start statistics collection")
-        threading.Thread.start(self)
+        with self._lock:
+            if self._running:
+                raise AssertionError("Sampler is running")
+            self._log.debug("Starting sampler")
+            self._running = True
+            self._iterator = delays(cycle(self._samplings))
+            self._schedule_next_task()
 
     def stop(self):
-        """
-        Stop the execution of the thread and exit
-        """
-        self._log.debug("Stop statistics collection")
-        self._stopEvent.set()
-        self._contEvent.set()
+        with self._lock:
+            if self._running:
+                self._log.debug("Stopping sampler")
+                self._running = False
+                self._call.cancel()
+                self._call = None
+                self._task = _INVALID
+                self._iterator = None
 
-    def pause(self):
-        """
-        Pause the execution of the registered functions
-        """
-        self._log.debug("Pause statistics collection")
-        self._contEvent.clear()
+    # Private
 
-    def cont(self):
+    def _dispatch(self):
         """
-        Resume the execution of the registered functions
+        Called from the scheduler thread when it's time to run the current
+        sampling task.
         """
-        self._log.debug("Resume statistics collection")
-        self._contEvent.set()
+        with self._lock:
+            if self._running:
+                for part in _partitions(self._cif):
+                    self._executor.dispatch(
+                        _SamplingCall(self._task.task, part),
+                        timeout=self.TIMEOUT)
+                self._schedule_next_task()  # FIXME
 
-    def getLastSampleTime(self):
-        return self._statsTime
+    def _schedule_next_task(self):
+        delay, self._task = next(self._iterator)
+        self._call = self._scheduler.schedule(delay, self._dispatch)
 
-    def run(self):
-        self._log.debug("Stats thread started")
-        self._contEvent.set()
 
-        while not self._stopEvent.isSet():
+# Sentinel used to mark a task as invalid, allowing running sampling task
+# without holding a lock.
+def _INVALID():
+    pass
+
+
+def delays(virtual_times):
+    """
+    Accept stream of tuples (virtual_time, task) and return stream of tuples
+    (delay, task).
+    """
+    last_virtual_time = 0
+    for virtual_time, task in virtual_times:
+        delay = virtual_time - last_virtual_time
+        last_virtual_time = virtual_time
+        yield delay, task
+
+
+def cycle(samplings):
+    """
+    Returns endless stream of tuples (virtual_time, task)
+    """
+    samplings = group_by_interval(samplings)
+    virtual_time = samplings[0][0]
+    while True:
+        wait = samplings[0][0]
+        for interval, tasks in samplings:
+            reminder = virtual_time % interval
+            if reminder == 0:
+                for task in tasks:
+                    yield virtual_time, task
+            else:
+                wait = min(wait, interval - reminder)
+        virtual_time += wait
+
+
+def group_by_interval(samplings):
+    """
+    Groups samplings by interval.
+    """
+    keyfn = operator.attrgetter('interval')
+    samplings = sorted(samplings, key=keyfn)
+    return [(interval, tuple(tasks))
+            for interval, tasks in itertools.groupby(samplings, keyfn)]
+
+
+def _grouper(iterable, num=_SAMPLING_THREADS, fillvalue=None):
+    """se itertools recipes: grouper"""
+    args = [iter(iterable)] * num
+    return itertools.izip_longest(fillvalue=fillvalue, *args)
+
+
+def _partitions(cif):
+    with cif.vmContainerLock:
+        return tuple(_grouper(cif.vmContainer.itervalues()))
+
+
+class _SamplingCall(object):
+    def __init__(self, task, vms):
+        self._task = task
+        self._vms = vms
+
+    # Task interface
+
+    def __call__(self):
+        for vm in self._vms:
+            if vm is None:
+                break
+
+            stats = vm._vmStats
+            if stats.paused:
+                continue
+
             try:
-                self.collect()
-            except:
-                self._log.debug("Stats thread failed", exc_info=True)
-
-        self._log.debug("Stats thread finished")
-
-    def handleStatsException(self, ex):
-        """
-        Handle the registered function exceptions and eventually stop the
-        sampling if a fatal error occurred.
-        """
-        return False
-
-    def collect(self):
-        # TODO: improve this with lcm
-        _mInt = map(lambda x: x.interval, self._statsFunctions)
-        maxInterval = reduce(lambda x, y: x * y, set(_mInt), 1)
-
-        intervalAccum = 0
-        while not self._stopEvent.isSet():
-            self._contEvent.wait()
-
-            self._statsTime = time.time()
-            waitInterval = maxInterval
-
-            for statsFunction in self._statsFunctions:
-                thisInterval = statsFunction.interval - (
-                    intervalAccum % statsFunction.interval)
-                waitInterval = min(waitInterval, thisInterval)
-
-                if intervalAccum % statsFunction.interval == 0:
-                    try:
-                        statsFunction()
-                    except Exception as e:
-                        if not self.handleStatsException(e):
-                            self._log.error("Stats function failed: %s",
-                                            statsFunction, exc_info=True)
-
-            self._stopEvent.wait(waitInterval)
-            intervalAccum = (intervalAccum + waitInterval) % maxInterval
+                self._task(vm, stats)
+            except Exception as e:
+                handler = vm._vmStats
+                if not handler.handleStatsException(e):
+                    logging.exception("Stats function failed: %s for vm %s",
+                                      self._task, vm.id)
 
 
 class HostStatsThread(threading.Thread):
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 9e93425..ed9202e 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -20,6 +20,7 @@
 
 
 # stdlib imports
+from collections import namedtuple
 from contextlib import contextmanager
 from copy import deepcopy
 from xml.dom.minidom import parseString as _domParseStr
@@ -59,13 +60,13 @@
 # local package imports
 from . import guestagent
 from . import migration
+from . import sampling
 from . import vmexitreason
 from . import vmstatus
 from .vmtune import update_io_tune_dom, collect_inner_elements
 from .vmtune import io_tune_values_to_dom, io_tune_dom_to_values
 from . import vmxml
 
-from .sampling import AdvancedStatsFunction, AdvancedStatsThread
 from .utils import isVdsmImage
 from vmpowerdown import VmShutdown, VmReboot
 
@@ -167,175 +168,188 @@
     pass
 
 
-class VmStatsThread(AdvancedStatsThread):
+# VM samplings
+
+def highWrite(vm, stats):
+    # Avoid queries from storage during recovery process
+    if vm.isDisksStatsCollectionEnabled():
+        vm.extendDrivesIfNeeded()
+
+
+def updateVolumes(vm, stats):
+    # Avoid queries from storage during recovery process
+    if vm.isDisksStatsCollectionEnabled():
+        for vmDrive in vm.getDiskDevices():
+            vm.updateDriveVolume(vmDrive)
+
+
+def sampleCpu(vm, stats):
+    cpuStats = vm._dom.getCPUStats(True, 0)
+    stats.sampleCpu.addStat(cpuStats[0])
+
+
+def sampleDisk(vm, stats):
+    # Avoid queries from storage during recovery process
+    if vm.isDisksStatsCollectionEnabled():
+        diskSamples = {}
+        for vmDrive in vm.getDiskDevices():
+            diskSamples[vmDrive.name] = vm._dom.blockStats(vmDrive.name)
+
+        stats.sampleDisk.addStat(diskSamples)
+
+
+def sampleDiskLatency(vm, stats):
+    # Avoid queries from storage during recovery process
+    if vm.isDisksStatsCollectionEnabled():
+        # {'wr_total_times': 0L, 'rd_operations': 9638L,
+        # 'flush_total_times': 0L,'rd_total_times': 7622718001L,
+        # 'rd_bytes': 85172430L, 'flush_operations': 0L,
+        # 'wr_operations': 0L, 'wr_bytes': 0L}
+        diskLatency = {}
+        for vmDrive in vm.getDiskDevices():
+            diskLatency[vmDrive.name] = vm._dom.blockStatsFlags(
+                vmDrive.name, flags=libvirt.VIR_TYPED_PARAM_STRING_OKAY)
+        stats.sampleDiskLatency.addStat(diskLatency)
+
+
+def sampleNet(vm, stats):
+    netSamples = {}
+    for nic in vm.getNicDevices():
+        netSamples[nic.name] = vm._dom.interfaceStats(nic.name)
+    stats.sampleNet.addStat(netSamples)
+
+
+def sampleVcpuPinning(vm, stats):
+    vCpuInfos = vm._dom.vcpus()
+    stats.sampleVcpuPinning.addStat(vCpuInfos[0])
+
+
+def sampleBalloon(vm, stats):
+    infos = vm._dom.info()
+    stats.sampleBalloon.addStat(infos[2])
+
+
+def sampleVmJobs(vm, stats):
+    stats.sampleVmJobs.addStat(vm.queryBlockJobs())
+
+
+# This flag will prevent excessive log flooding when running
+# on libvirt with no support for metadata xml elements.
+#
+# The issue currently exists only on CentOS/RHEL 6.5 that
+# ships libvirt-0.10.x.
+#
+# TODO: Remove as soon as there is a hard dependency we can use
+_libvirt_metadata_supported = True
+
+
+def sampleCpuTune(vm, stats):
+    infos = vm._dom.schedulerParameters()
+    infos['vcpuCount'] = vm._dom.vcpusFlags(
+        libvirt.VIR_DOMAIN_VCPU_CURRENT)
+
+    metadataCpuLimit = None
+    global _libvirt_metadata_supported
+
+    try:
+        if _libvirt_metadata_supported:
+            metadataCpuLimit = vm._dom.metadata(
+                libvirt.VIR_DOMAIN_METADATA_ELEMENT,
+                METADATA_VM_TUNE_URI, 0)
+    except libvirt.libvirtError as e:
+        if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED:
+            _libvirt_metadata_supported = False  # FIXME
+            vm.log.error("libvirt does not support metadata")
+
+        elif (e.get_error_code()
+              not in (libvirt.VIR_ERR_NO_DOMAIN,
+                      libvirt.VIR_ERR_NO_DOMAIN_METADATA)):
+            # Non-existing VM and no metadata block are expected
+            # conditions and no reasons for concern here.
+            # All other errors should be reported.
+            vm.log.warn("Failed to retrieve QoS metadata because of %s", e)
+
+    if metadataCpuLimit:
+        metadataCpuLimitXML = _domParseStr(metadataCpuLimit)
+        nodeList = \
+            metadataCpuLimitXML.getElementsByTagName('vcpulimit')
+        infos['vcpuLimit'] = nodeList[0].childNodes[0].data
+
+    stats.sampleCpuTune.addStat(infos)
+
+
+SAMPLING = namedtuple('sampling', ('task', 'interval'))
+
+
+VM_SAMPLINGS = (
+    # not real samplings, but need to be done in the sampling context
+    SAMPLING(highWrite,
+             config.getint('vars', 'vm_watermark_interval')),
+    SAMPLING(updateVolumes,
+             config.getint('irs', 'vol_size_sample_interval')),
+    # real samplings (reported to Engine/outside world)
+    SAMPLING(sampleCpu,
+             config.getint('vars', 'vm_sample_cpu_interval')),
+    SAMPLING(sampleDisk,
+             config.getint('vars', 'vm_sample_disk_interval')),
+    SAMPLING(sampleDiskLatency,
+             config.getint('vars', 'vm_sample_disk_latency_interval')),
+    SAMPLING(sampleNet,
+             config.getint('vars', 'vm_sample_net_interval')),
+    SAMPLING(sampleBalloon,
+             config.getint('vars', 'vm_sample_balloon_interval')),
+    SAMPLING(sampleVmJobs,
+             config.getint('vars', 'vm_sample_jobs_interval')),
+    SAMPLING(sampleVcpuPinning,
+             config.getint('vars', 'vm_sample_vcpu_pin_interval')),
+    SAMPLING(sampleCpuTune,
+             config.getint('vars', 'vm_sample_cpu_tune_interval')))
+
+
+class VmStats(object):
     MBPS_TO_BPS = 10 ** 6 / 8
 
     # CPU tune sampling window
     # minimum supported value is 2
     CPU_TUNE_SAMPLING_WINDOW = 2
 
-    # This flag will prevent excessive log flooding when running
-    # on libvirt with no support for metadata xml elements.
-    #
-    # The issue currently exists only on CentOS/RHEL 6.5 that
-    # ships libvirt-0.10.x.
-    #
-    # TODO: Remove as soon as there is a hard dependency we can use
-    _libvirt_metadata_supported = True
+    _log = logging.getLogger("vm.VmStats")
 
     def __init__(self, vm):
-        AdvancedStatsThread.__init__(self, log=vm.log, daemon=True)
         self._vm = vm
+        self._paused = False
 
-        self.highWrite = (
-            AdvancedStatsFunction(
-                self._highWrite,
-                config.getint('vars', 'vm_watermark_interval')))
-        self.updateVolumes = (
-            AdvancedStatsFunction(
-                self._updateVolumes,
-                config.getint('irs', 'vol_size_sample_interval')))
+        self.sampleCpu = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_cpu_window'))
+        self.sampleDisk = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_disk_window'))
+        self.sampleDiskLatency = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_disk_latency_window'))
+        self.sampleNet = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_net_window'))
+        self.sampleBalloon = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_balloon_window'))
+        self.sampleVmJobs = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_jobs_window'))
+        self.sampleVcpuPinning = sampling.SampleWindow(
+            config.getint('vars', 'vm_sample_vcpu_pin_window'))
+        self.sampleCpuTune = sampling.SampleWindow(
+            self.CPU_TUNE_SAMPLING_WINDOW)
 
-        self.sampleCpu = (
-            AdvancedStatsFunction(
-                self._sampleCpu,
-                config.getint('vars', 'vm_sample_cpu_interval'),
-                config.getint('vars', 'vm_sample_cpu_window')))
-        self.sampleDisk = (
-            AdvancedStatsFunction(
-                self._sampleDisk,
-                config.getint('vars', 'vm_sample_disk_interval'),
-                config.getint('vars', 'vm_sample_disk_window')))
-        self.sampleDiskLatency = (
-            AdvancedStatsFunction(
-                self._sampleDiskLatency,
-                config.getint('vars', 'vm_sample_disk_latency_interval'),
-                config.getint('vars', 'vm_sample_disk_latency_window')))
-        self.sampleNet = (
-            AdvancedStatsFunction(
-                self._sampleNet,
-                config.getint('vars', 'vm_sample_net_interval'),
-                config.getint('vars', 'vm_sample_net_window')))
-        self.sampleBalloon = (
-            AdvancedStatsFunction(
-                self._sampleBalloon,
-                config.getint('vars', 'vm_sample_balloon_interval'),
-                config.getint('vars', 'vm_sample_balloon_window')))
-        self.sampleVmJobs = (
-            AdvancedStatsFunction(
-                self._sampleVmJobs,
-                config.getint('vars', 'vm_sample_jobs_interval'),
-                config.getint('vars', 'vm_sample_jobs_window')))
-        self.sampleVcpuPinning = (
-            AdvancedStatsFunction(
-                self._sampleVcpuPinning,
-                config.getint('vars', 'vm_sample_vcpu_pin_interval'),
-                config.getint('vars', 'vm_sample_vcpu_pin_window')))
-        self.sampleCpuTune = (
-            AdvancedStatsFunction(
-                self._sampleCpuTune,
-                config.getint('vars', 'vm_sample_cpu_tune_interval'),
-                self.CPU_TUNE_SAMPLING_WINDOW))
+        self._samplings = (
+            self.sampleCpu, self.sampleDisk,
+            self.sampleDiskLatency, self.sampleNet, self.sampleBalloon,
+            self.sampleVmJobs, self.sampleVcpuPinning, self.sampleCpuTune)
 
-        self.addStatsFunction(
-            self.highWrite, self.updateVolumes, self.sampleCpu,
-            self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
-            self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning,
-            self.sampleCpuTune)
+    def pause(self):
+        self._paused = True
 
-    def _highWrite(self):
-        if not self._vm.isDisksStatsCollectionEnabled():
-            # Avoid queries from storage during recovery process
-            return
-        self._vm.extendDrivesIfNeeded()
+    def cont(self):
+        self._paused = False
 
-    def _updateVolumes(self):
-        if not self._vm.isDisksStatsCollectionEnabled():
-            # Avoid queries from storage during recovery process
-            return
-
-        for vmDrive in self._vm.getDiskDevices():
-            self._vm.updateDriveVolume(vmDrive)
-
-    def _sampleCpu(self):
-        cpuStats = self._vm._dom.getCPUStats(True, 0)
-        return cpuStats[0]
-
-    def _sampleDisk(self):
-        if not self._vm.isDisksStatsCollectionEnabled():
-            # Avoid queries from storage during recovery process
-            return
-
-        diskSamples = {}
-        for vmDrive in self._vm.getDiskDevices():
-            diskSamples[vmDrive.name] = self._vm._dom.blockStats(vmDrive.name)
-
-        return diskSamples
-
-    def _sampleDiskLatency(self):
-        if not self._vm.isDisksStatsCollectionEnabled():
-            # Avoid queries from storage during recovery process
-            return
-        # {'wr_total_times': 0L, 'rd_operations': 9638L,
-        # 'flush_total_times': 0L,'rd_total_times': 7622718001L,
-        # 'rd_bytes': 85172430L, 'flush_operations': 0L,
-        # 'wr_operations': 0L, 'wr_bytes': 0L}
-        diskLatency = {}
-        for vmDrive in self._vm.getDiskDevices():
-            diskLatency[vmDrive.name] = self._vm._dom.blockStatsFlags(
-                vmDrive.name, flags=libvirt.VIR_TYPED_PARAM_STRING_OKAY)
-        return diskLatency
-
-    def _sampleNet(self):
-        netSamples = {}
-        for nic in self._vm.getNicDevices():
-            netSamples[nic.name] = self._vm._dom.interfaceStats(nic.name)
-        return netSamples
-
-    def _sampleVcpuPinning(self):
-        vCpuInfos = self._vm._dom.vcpus()
-        return vCpuInfos[0]
-
-    def _sampleBalloon(self):
-        infos = self._vm._dom.info()
-        return infos[2]
-
-    def _sampleVmJobs(self):
-        return self._vm.queryBlockJobs()
-
-    def _sampleCpuTune(self):
-        infos = self._vm._dom.schedulerParameters()
-        infos['vcpuCount'] = self._vm._dom.vcpusFlags(
-            libvirt.VIR_DOMAIN_VCPU_CURRENT)
-
-        metadataCpuLimit = None
-
-        try:
-            if VmStatsThread._libvirt_metadata_supported:
-                metadataCpuLimit = self._vm._dom.metadata(
-                    libvirt.VIR_DOMAIN_METADATA_ELEMENT,
-                    METADATA_VM_TUNE_URI, 0)
-        except libvirt.libvirtError as e:
-            if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED:
-                VmStatsThread._libvirt_metadata_supported = False
-                self._log.error("libvirt does not support metadata")
-
-            elif (e.get_error_code()
-                  not in (libvirt.VIR_ERR_NO_DOMAIN,
-                          libvirt.VIR_ERR_NO_DOMAIN_METADATA)):
-                # Non-existing VM and no metadata block are expected
-                # conditions and no reasons for concern here.
-                # All other errors should be reported.
-                self._log.warn("Failed to retrieve QoS metadata because of %s",
-                               e)
-
-        if metadataCpuLimit:
-            metadataCpuLimitXML = _domParseStr(metadataCpuLimit)
-            nodeList = \
-                metadataCpuLimitXML.getElementsByTagName('vcpulimit')
-            infos['vcpuLimit'] = nodeList[0].childNodes[0].data
-
-        return infos
+    @property
+    def paused(self):
+        return self._paused
 
     def _getIoTuneStats(self, stats):
         """
@@ -579,7 +593,8 @@
         stats = {}
 
         try:
-            stats['statsAge'] = time.time() - self.getLastSampleTime()
+            stats['statsAge'] = time.time() - max(
+                sample.lastTime for sample in self._samplings)
         except TypeError:
             self._log.debug("Stats age not available")
             stats['statsAge'] = -1.0
@@ -2401,10 +2416,6 @@
             self.guestAgent.stop()
         except Exception:
             pass
-        try:
-            self._vmStats.stop()
-        except Exception:
-            pass
         self.saveState()
 
     def status(self):
@@ -2708,8 +2719,7 @@
         return domxml.toxml()
 
     def _initVmStats(self):
-        self._vmStats = VmStatsThread(self)
-        self._vmStats.start()
+        self._vmStats = VmStats(self)
         self._guestEventTime = self._startTime
 
     @staticmethod
@@ -2823,7 +2833,7 @@
                         supervdsm.getProxy().setPortMirroring(network,
                                                               nic.name)
 
-        # VmStatsThread may use block devices info from libvirt.
+        # VmStats may use block devices info from libvirt.
         # So, run it after you have this info
         self._initVmStats()
         self.guestAgent = guestagent.GuestAgent(
@@ -4525,8 +4535,6 @@
             self.lastStatus = vmstatus.POWERING_DOWN
             # Terminate the VM's creation thread.
             self._incomingMigrationFinished.set()
-            if self._vmStats:
-                self._vmStats.stop()
             if self.guestAgent:
                 self.guestAgent.stop()
             if self._dom:


-- 
To view, visit http://gerrit.ovirt.org/31181
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If3d1edc8099517710a7b9c31758930f13131d0d1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to