Francesco Romani has uploaded a new change for review. Change subject: WIP: sampling: per-worker sampling ......................................................................
WIP: sampling: per-worker sampling The new scalable sampling http://gerrit.ovirt.org/#/c/29980/ replicates the old VDSM behaviour, which maximizes the VM isolation. This, however, adds to the VDSM CPU usage, because more work is needed to schedule events per vm and per sampling. This patch builds on the same foundations of the scalable sampling, but mixes things differently in order to minimize the scheduled tasks. VMs are partitioned and each partition is assigned to one worker thread; each worker thread performs the same sampling on all the VMs assigned to it. The Scheduler just schedules sampling to all the worker threads. Considering just one sampling, for example sampleCpu, the scalable sampling approach schedules one task per VM, while this patch schedules just one task per worker thread. Since the worker pool size is expected to be *much* lower than the average VM pool size, this patch greatly reduces the traffic. The drawback of this approach is that the isolation among VMs is now weaker, and more logic is needed to address and properly react to a stuck libvirt call. This patch is marked Work In Progress and is not yet ready because while minimal testing was successfully done in the happy path, some important bits are still missing. 
Change-Id: If3d1edc8099517710a7b9c31758930f13131d0d1 Signed-off-by: Francesco Romani <[email protected]> --- M tests/numaUtilsTests.py M tests/samplingTests.py M tests/vmApiTests.py M vdsm/vdsm M vdsm/virt/sampling.py M vdsm/virt/vm.py 6 files changed, 619 insertions(+), 287 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/81/31181/1 diff --git a/tests/numaUtilsTests.py b/tests/numaUtilsTests.py index b3676fd..959372f 100644 --- a/tests/numaUtilsTests.py +++ b/tests/numaUtilsTests.py @@ -54,7 +54,7 @@ return {0: [0, 1], 1: [0, 1], 2: [0, 1], 3: [0, 1]} -class FakeAdvancedStatsFunction: +class FakeSampleWindow: def __init__(self): pass @@ -68,7 +68,7 @@ class FakeVmStatsThread: def __init__(self, vm): self._vm = vm - self.sampleVcpuPinning = FakeAdvancedStatsFunction() + self.sampleVcpuPinning = FakeSampleWindow() class TestNumaUtils(TestCaseBase): diff --git a/tests/samplingTests.py b/tests/samplingTests.py index 791054c..84c39b6 100644 --- a/tests/samplingTests.py +++ b/tests/samplingTests.py @@ -20,6 +20,7 @@ import os import tempfile +import threading import shutil from vdsm import ipwrapper @@ -111,3 +112,245 @@ s1 = sampling.InterfaceSample(lo) s1.operstate = 'x' self.assertEquals('operstate:x', s1.connlog_diff(s0)) + + +class Task(object): + + def __init__(self, interval, name, error=None): + self.interval = interval + self.name = name + self.error = error + self.executed = 0 + + def __call__(self, vm, stats): + self.executed += 1 + if self.error: + raise self.error + + +class SamplingIterationTests(TestCaseBase): + + task1 = Task(2, "task1") + task2 = Task(2, "task2") + task3 = Task(3, "task3") + samplings = [task1, task2, task3] + + virtual_times = [ + # cycle 1 + (2, task1), + (2, task2), + (3, task3), + (4, task1), + (4, task2), + (6, task1), + (6, task2), + (6, task3), + # cycle 2 + (8, task1), + (8, task2), + (9, task3), + (10, task1), + (10, task2), + (12, task1), + (12, task2), + (12, task3), + ] + + delays = [ + (2, task1), + (0, 
task2), + (1, task3), + (1, task1), + (0, task2), + (2, task1), + (0, task2), + (0, task3), + ] + + def test_virtual_time(self): + iterator = sampling.cycle(self.samplings) + for pair in self.virtual_times: + self.assertEquals(next(iterator), pair) + + def test_delays(self): + iterator = sampling.delays(self.virtual_times) + for i in range(2): + for pair in self.delays: + self.assertEquals(next(iterator), pair) + + def test_chain(self): + iterator = sampling.delays(sampling.cycle(self.samplings)) + for i in range(3): + for pair in self.delays: + self.assertEquals(next(iterator), pair) + + +class SamplerTests(TestCaseBase): + + def setUp(self): + self.scheduler = FakeScheduler() + self.executor = FakeExecutor() + self.handler = FakeHandler(True) + self.clientIF = FakeClientIF([FakeVM(0, self.handler)]) + self.task1 = Task(2, "task1") + self.task2 = Task(3, "task2") + self.task3 = Task(4, "task3") + self.samplings = [self.task1, self.task2, self.task3] + self.sampler = sampling.Sampler(self.samplings, self.clientIF, + scheduler=self.scheduler, + executor=self.executor) + + def test_start(self): + self.sampler.start() + self.assertEquals(self.scheduler.call.delay, 2) + self.assertEquals(self.scheduler.call.func, self.sampler._dispatch) + self.assertEquals(self.executor.tasks, []) + + def test_stop(self): + self.sampler.start() + self.sampler.stop() + self.assertEquals(self.scheduler.call.func, INVALID) + self.assertEquals(self.executor.tasks, []) + + def test_dispatch(self): + self.sampler.start() + self.scheduler.fire() + for (task, delay), sampling in zip( + self.executor.tasks, self.samplings): + self.assertEqual(delay, 10.0) + self.assertEqual(task._task, sampling) + + def test_run_samplings(self): + self.sampler.start() + + # time = 2 + self.scheduler.fire() + self.executor.run() + self.assertEquals(self.scheduler.call.delay, 1) + self.assertEquals(self.task1.executed, 1) + + # time = 3 + self.scheduler.fire() + self.executor.run() + 
self.assertEquals(self.task2.executed, 1) + self.assertEquals(self.scheduler.call.delay, 1) + + # time = 4 + self.scheduler.fire() + self.executor.run() + self.assertEquals(self.task1.executed, 2) + self.assertEquals(self.scheduler.call.delay, 0) + + # time = 4 + self.scheduler.fire() + self.executor.run() + self.assertEquals(self.task3.executed, 1) + self.assertEquals(self.scheduler.call.delay, 2) + + # time = 6 + self.scheduler.fire() + self.executor.run() + self.assertEquals(self.task1.executed, 3) + self.assertEquals(self.scheduler.call.delay, 0) + + # time = 6 + self.scheduler.fire() + self.executor.run() + self.assertEquals(self.task2.executed, 2) + self.assertEquals(self.scheduler.call.delay, 2) + + +class SamplerHandlerTests(TestCaseBase): + + def test_does_handle_error(self): + self.check(FakeHandler(True)) + + def test_does_not_handle_error(self): + self.check(FakeHandler(False)) + + def check(self, handler): + scheduler = FakeScheduler() + executor = FakeExecutor() + error = Exception("Sampling task failed") + task = Task(2, "task", error=error) + sampler = sampling.Sampler([task], FakeClientIF([FakeVM(0, handler)]), + scheduler=scheduler, executor=executor) + sampler.start() + scheduler.fire() + executor.run() + self.assertEquals(handler.error, error) + + +# Helpers + + +class FakeHandler(object): + + def __init__(self, result): + self.error = None + self.result = result + self.paused = False + + def handleStatsException(self, e): + self.error = e + return self.result + + +class FakeScheduler(object): + + def __init__(self): + # Schedule only single call to simplify the tests + self.call = None + + def schedule(self, delay, func): + assert self.call is None + self.call = Call(delay, func) + return self.call + + def fire(self): + call = self.call + self.call = None + call.func() + + +class Call(object): + + def __init__(self, delay, func): + self.delay = delay + self.func = func + + def cancel(self): + self.func = INVALID + + +def INVALID(): + pass + + 
+class FakeExecutor(object): + + def __init__(self): + self.tasks = [] + + def dispatch(self, task, timeout=None): + # Don't execute task on a worker thread to simplify the tests + self.tasks.append((task, timeout)) + + def run(self): + # Run next task manually to simplify the tests + task, timeout = self.tasks.pop() + task() + + +class FakeVM(object): + + def __init__(self, vm_id=0, handler=None): + self.id = vm_id + self._vmStats = handler + + +class FakeClientIF(object): + + def __init__(self, vms): + self.vmContainerLock = threading.Lock() + self.vmContainer = dict((vm.id, vm) for vm in vms) diff --git a/tests/vmApiTests.py b/tests/vmApiTests.py index 0342743..cdf5698 100644 --- a/tests/vmApiTests.py +++ b/tests/vmApiTests.py @@ -22,6 +22,8 @@ import os import os.path +from virt import sampling +from virt.vm import VM_SAMPLINGS from virt import vmexitreason from vdsm import define from testlib import VdsmTestCase as TestCaseBase @@ -29,6 +31,7 @@ from rpc import vdsmapi from vmTests import FakeVM +from samplingTests import FakeClientIF class TestSchemaCompliancyBase(TestCaseBase): @@ -56,8 +59,9 @@ @contextmanager def ensureVmStats(vm): - vm._initVmStats() + sampling.start(VM_SAMPLINGS, FakeClientIF([vm])) try: + vm._initVmStats() yield vm finally: vm._vmStats.stop() diff --git a/vdsm/vdsm b/vdsm/vdsm index cbc16ff..a498dbc 100755 --- a/vdsm/vdsm +++ b/vdsm/vdsm @@ -38,6 +38,9 @@ from storage.dispatcher import Dispatcher from storage.hsm import HSM +from virt import sampling +from virt.vm import VM_SAMPLINGS + import zombiereaper import dsaversion @@ -81,6 +84,9 @@ from clientIF import clientIF # must import after config is read cif = clientIF.getInstance(irs, log) cif.start() + + sampling.start(VM_SAMPLINGS, cif) + try: while running[0]: signal.pause() diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py index 99d2585..04e4b20 100644 --- a/vdsm/virt/sampling.py +++ b/vdsm/virt/sampling.py @@ -26,16 +26,21 @@ Contains a reverse dictionary pointing from 
error string to its error code. """ -import threading -import os -import time -import logging import errno import ethtool +import itertools +import logging +import operator +import os import re +import threading +import time +from vdsm.config import config +from vdsm import executor from vdsm import utils from vdsm import netinfo +from vdsm import schedule from vdsm.ipwrapper import getLinks from vdsm.constants import P_VDSM_RUN, P_VDSM_CLIENT_LOG @@ -44,6 +49,38 @@ _THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled' if not os.path.exists(_THP_STATE_PATH): _THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled' + + +_SAMPLING_THREADS = config.getint('vars', 'vm_sampling_threads') + + +_scheduler = schedule.Scheduler("sampling.scheduler") + +_executor = executor.Executor( + _SAMPLING_THREADS, + config.getint('vars', 'vm_sampling_tasks'), + _scheduler) + +_sampler = None + + +def start(samplings, cif): + """ Called during application startup """ + _scheduler.start() + _executor.start() + global _sampler + _sampler = Sampler(samplings, cif) + _sampler.start() + + +def stop(): + """ Called during application shutdown """ + if _executor: + _executor.stop(wait=True) + if _scheduler: + _scheduler.stop() + if _sampler: + _sampler.stop() class InterfaceSample: @@ -310,40 +347,25 @@ return text -class AdvancedStatsFunction(object): - """ - A wrapper for functions and methods that will be executed at regular - intervals storing the return values for statistic purpose. - It is possible to provide a custom time function 'timefn' that provides - cached values to reduce system calls. 
- """ - def __init__(self, function, interval=1, window=0, timefn=time.time): - self._function = function +class SampleWindow(object): + def __init__(self, window, timefn=time.time): self._window = window self._timefn = timefn self._sample = [] - - if not isinstance(interval, int) or interval < 1: - raise ValueError("interval must be int and greater than 0") - - self._interval = interval + self._last_time = 0 # No sample taken yet @property - def interval(self): - return self._interval + def lastTime(self): + return self._last_time - def __repr__(self): - return "<AdvancedStatsFunction %s at 0x%x>" % ( - self._function.__name__, id(self._function.__name__)) - - def __call__(self, *args, **kwargs): - retValue = self._function(*args, **kwargs) + def addStat(self, retValue): retTime = self._timefn() if self._window > 0: self._sample.append((retTime, retValue)) del self._sample[:-self._window] + self._last_time = retTime return retValue def getStats(self): @@ -361,115 +383,164 @@ return bgn_sample, end_sample, (end_time - bgn_time) -class AdvancedStatsThread(threading.Thread): +class Sampler(object): """ - A thread that runs the registered AdvancedStatsFunction objects - for statistic and monitoring purpose. + Execute sampling tasks on the executor according to sampling schedule. + + The sepcified samplings tasks are executed on executor one at a time, as + if there was a thread running them one after another in a loop. + + If one of the sampling task get stuck, the rest of the sampling tasks are + delayed. This is important to prevent flooding of the thread pool with + possibly blocking sampling task, when the underlying libvirt thread is + stuck. When a stuck sampling tasks finish, the sampling tasks continue + normally. + + When a sampling task is stuck, no other task can run on the blocked worker + thread. The executor is responsible for detecting and handling this. 
+ + Each vm should create one sampler, to ensure that there is only one + sampling task per vm excuting on the executor. """ - DEFAULT_LOG = logging.getLogger("AdvancedStatsThread") - def __init__(self, log=DEFAULT_LOG, daemon=False): - """ - Initialize an AdvancedStatsThread object - """ - threading.Thread.__init__(self) - self.daemon = daemon + # If a sampling function blocks for more then this interval, the executor + # thread will be discarded and replaced with a new thread. + TIMEOUT = config.getint('vars', 'vm_sampling_timeout') - self._log = log - self._stopEvent = threading.Event() - self._contEvent = threading.Event() + _log = logging.getLogger("Sampler") - self._statsTime = None - self._statsFunctions = [] - - def addStatsFunction(self, *args): - """ - Register the functions listed as arguments - """ - if self.isAlive(): - raise RuntimeError("AdvancedStatsThread is started") - - for statsFunction in args: - self._statsFunctions.append(statsFunction) + def __init__(self, samplings, cif, + scheduler=_scheduler, + executor=_executor): + self._samplings = samplings + self._cif = cif + self._scheduler = scheduler + self._executor = executor + self._lock = threading.Lock() + self._running = False + self._iterator = None + self._task = _INVALID + self._call = None def start(self): - """ - Start the execution of the thread and exit - """ - self._log.debug("Start statistics collection") - threading.Thread.start(self) + with self._lock: + if self._running: + raise AssertionError("Sampler is running") + self._log.debug("Starting sampler") + self._running = True + self._iterator = delays(cycle(self._samplings)) + self._schedule_next_task() def stop(self): - """ - Stop the execution of the thread and exit - """ - self._log.debug("Stop statistics collection") - self._stopEvent.set() - self._contEvent.set() + with self._lock: + if self._running: + self._log.debug("Stopping sampler") + self._running = False + self._call.cancel() + self._call = None + self._task = 
_INVALID + self._iterator = None - def pause(self): - """ - Pause the execution of the registered functions - """ - self._log.debug("Pause statistics collection") - self._contEvent.clear() + # Private - def cont(self): + def _dispatch(self): """ - Resume the execution of the registered functions + Called from the scheduler thread when its time to run the current + sampling task. """ - self._log.debug("Resume statistics collection") - self._contEvent.set() + with self._lock: + if self._running: + for part in _partitions(self._cif): + self._executor.dispatch( + _SamplingCall(self._task.task, part), + timeout=self.TIMEOUT) + self._schedule_next_task() # FIXME - def getLastSampleTime(self): - return self._statsTime + def _schedule_next_task(self): + delay, self._task = next(self._iterator) + self._call = self._scheduler.schedule(delay, self._dispatch) - def run(self): - self._log.debug("Stats thread started") - self._contEvent.set() - while not self._stopEvent.isSet(): +# Sentinel used to mark a task as invalid, allowing running sampling task +# without holding a lock. +def _INVALID(): + pass + + +def delays(virtual_times): + """ + Accept stream of tuples (virtual_time, task) and return stream of tuples + (delay, task). + """ + last_virtual_time = 0 + for virtual_time, task in virtual_times: + delay = virtual_time - last_virtual_time + last_virtual_time = virtual_time + yield delay, task + + +def cycle(samplings): + """ + Returns endless stream of tuples (virtual_time, task) + """ + samplings = group_by_interval(samplings) + virtual_time = samplings[0][0] + while True: + wait = samplings[0][0] + for interval, tasks in samplings: + reminder = virtual_time % interval + if reminder == 0: + for task in tasks: + yield virtual_time, task + else: + wait = min(wait, interval - reminder) + virtual_time += wait + + +def group_by_interval(samplings): + """ + Groups samplings by interval. 
+ """ + keyfn = operator.attrgetter('interval') + samplings = sorted(samplings, key=keyfn) + return [(interval, tuple(tasks)) + for interval, tasks in itertools.groupby(samplings, keyfn)] + + +def _grouper(iterable, num=_SAMPLING_THREADS, fillvalue=None): + """se itertools recipes: grouper""" + args = [iter(iterable)] * num + return itertools.izip_longest(fillvalue=fillvalue, *args) + + +def _partitions(cif): + with cif.vmContainerLock: + return tuple(_grouper(cif.vmContainer.itervalues())) + + +class _SamplingCall(object): + def __init__(self, task, vms): + self._task = task + self._vms = vms + + # Task interface + + def __call__(self): + for vm in self._vms: + if vm is None: + break + + stats = vm._vmStats + if stats.paused: + continue + try: - self.collect() - except: - self._log.debug("Stats thread failed", exc_info=True) - - self._log.debug("Stats thread finished") - - def handleStatsException(self, ex): - """ - Handle the registered function exceptions and eventually stop the - sampling if a fatal error occurred. 
- """ - return False - - def collect(self): - # TODO: improve this with lcm - _mInt = map(lambda x: x.interval, self._statsFunctions) - maxInterval = reduce(lambda x, y: x * y, set(_mInt), 1) - - intervalAccum = 0 - while not self._stopEvent.isSet(): - self._contEvent.wait() - - self._statsTime = time.time() - waitInterval = maxInterval - - for statsFunction in self._statsFunctions: - thisInterval = statsFunction.interval - ( - intervalAccum % statsFunction.interval) - waitInterval = min(waitInterval, thisInterval) - - if intervalAccum % statsFunction.interval == 0: - try: - statsFunction() - except Exception as e: - if not self.handleStatsException(e): - self._log.error("Stats function failed: %s", - statsFunction, exc_info=True) - - self._stopEvent.wait(waitInterval) - intervalAccum = (intervalAccum + waitInterval) % maxInterval + self._task(vm, stats) + except Exception as e: + handler = vm._vmStats + if not handler.handleStatsException(e): + logging.exception("Stats function failed: %s for vm %s", + self._task, vm.id) class HostStatsThread(threading.Thread): diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 9e93425..ed9202e 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -20,6 +20,7 @@ # stdlib imports +from collections import namedtuple from contextlib import contextmanager from copy import deepcopy from xml.dom.minidom import parseString as _domParseStr @@ -59,13 +60,13 @@ # local package imports from . import guestagent from . import migration +from . import sampling from . import vmexitreason from . import vmstatus from .vmtune import update_io_tune_dom, collect_inner_elements from .vmtune import io_tune_values_to_dom, io_tune_dom_to_values from . 
import vmxml -from .sampling import AdvancedStatsFunction, AdvancedStatsThread from .utils import isVdsmImage from vmpowerdown import VmShutdown, VmReboot @@ -167,175 +168,188 @@ pass -class VmStatsThread(AdvancedStatsThread): +# VM samplings + +def highWrite(vm, stats): + # Avoid queries from storage during recovery process + if vm.isDisksStatsCollectionEnabled(): + vm.extendDrivesIfNeeded() + + +def updateVolumes(vm, stats): + # Avoid queries from storage during recovery process + if vm.isDisksStatsCollectionEnabled(): + for vmDrive in vm.getDiskDevices(): + vm.updateDriveVolume(vmDrive) + + +def sampleCpu(vm, stats): + cpuStats = vm._dom.getCPUStats(True, 0) + stats.sampleCpu.addStat(cpuStats[0]) + + +def sampleDisk(vm, stats): + # Avoid queries from storage during recovery process + if vm.isDisksStatsCollectionEnabled(): + diskSamples = {} + for vmDrive in vm.getDiskDevices(): + diskSamples[vmDrive.name] = vm._dom.blockStats(vmDrive.name) + + stats.sampleDisk.addStat(diskSamples) + + +def sampleDiskLatency(vm, stats): + # Avoid queries from storage during recovery process + if vm.isDisksStatsCollectionEnabled(): + # {'wr_total_times': 0L, 'rd_operations': 9638L, + # 'flush_total_times': 0L,'rd_total_times': 7622718001L, + # 'rd_bytes': 85172430L, 'flush_operations': 0L, + # 'wr_operations': 0L, 'wr_bytes': 0L} + diskLatency = {} + for vmDrive in vm.getDiskDevices(): + diskLatency[vmDrive.name] = vm._dom.blockStatsFlags( + vmDrive.name, flags=libvirt.VIR_TYPED_PARAM_STRING_OKAY) + stats.sampleDiskLatency.addStat(diskLatency) + + +def sampleNet(vm, stats): + netSamples = {} + for nic in vm.getNicDevices(): + netSamples[nic.name] = vm._dom.interfaceStats(nic.name) + stats.sampleNet.addStat(netSamples) + + +def sampleVcpuPinning(vm, stats): + vCpuInfos = vm._dom.vcpus() + stats.sampleVcpuPinning.addStat(vCpuInfos[0]) + + +def sampleBalloon(vm, stats): + infos = vm._dom.info() + stats.sampleBalloon.addStat(infos[2]) + + +def sampleVmJobs(vm, stats): + 
stats.sampleVmJobs.addStat(vm.queryBlockJobs()) + + +# This flag will prevent excessive log flooding when running +# on libvirt with no support for metadata xml elements. +# +# The issue currently exists only on CentOS/RHEL 6.5 that +# ships libvirt-0.10.x. +# +# TODO: Remove as soon as there is a hard dependency we can use +_libvirt_metadata_supported = True + + +def sampleCpuTune(vm, stats): + infos = vm._dom.schedulerParameters() + infos['vcpuCount'] = vm._dom.vcpusFlags( + libvirt.VIR_DOMAIN_VCPU_CURRENT) + + metadataCpuLimit = None + global _libvirt_metadata_supported + + try: + if _libvirt_metadata_supported: + metadataCpuLimit = vm._dom.metadata( + libvirt.VIR_DOMAIN_METADATA_ELEMENT, + METADATA_VM_TUNE_URI, 0) + except libvirt.libvirtError as e: + if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED: + _libvirt_metadata_supported = False # FIXME + vm.log.error("libvirt does not support metadata") + + elif (e.get_error_code() + not in (libvirt.VIR_ERR_NO_DOMAIN, + libvirt.VIR_ERR_NO_DOMAIN_METADATA)): + # Non-existing VM and no metadata block are expected + # conditions and no reasons for concern here. + # All other errors should be reported. 
+ vm.log.warn("Failed to retrieve QoS metadata because of %s", e) + + if metadataCpuLimit: + metadataCpuLimitXML = _domParseStr(metadataCpuLimit) + nodeList = \ + metadataCpuLimitXML.getElementsByTagName('vcpulimit') + infos['vcpuLimit'] = nodeList[0].childNodes[0].data + + stats.sampleCpuTune.addStat(infos) + + +SAMPLING = namedtuple('sampling', ('task', 'interval')) + + +VM_SAMPLINGS = ( + # not real samplings, but need to be done in the sampling context + SAMPLING(highWrite, + config.getint('vars', 'vm_watermark_interval')), + SAMPLING(updateVolumes, + config.getint('irs', 'vol_size_sample_interval')), + # real samplings (reported to Engine/outside world) + SAMPLING(sampleCpu, + config.getint('vars', 'vm_sample_cpu_interval')), + SAMPLING(sampleDisk, + config.getint('vars', 'vm_sample_disk_interval')), + SAMPLING(sampleDiskLatency, + config.getint('vars', 'vm_sample_disk_latency_interval')), + SAMPLING(sampleNet, + config.getint('vars', 'vm_sample_net_interval')), + SAMPLING(sampleBalloon, + config.getint('vars', 'vm_sample_balloon_interval')), + SAMPLING(sampleVmJobs, + config.getint('vars', 'vm_sample_jobs_interval')), + SAMPLING(sampleVcpuPinning, + config.getint('vars', 'vm_sample_vcpu_pin_interval')), + SAMPLING(sampleCpuTune, + config.getint('vars', 'vm_sample_cpu_tune_interval'))) + + +class VmStats(object): MBPS_TO_BPS = 10 ** 6 / 8 # CPU tune sampling window # minimum supported value is 2 CPU_TUNE_SAMPLING_WINDOW = 2 - # This flag will prevent excessive log flooding when running - # on libvirt with no support for metadata xml elements. - # - # The issue currently exists only on CentOS/RHEL 6.5 that - # ships libvirt-0.10.x. 
- # - # TODO: Remove as soon as there is a hard dependency we can use - _libvirt_metadata_supported = True + _log = logging.getLogger("vm.VmStats") def __init__(self, vm): - AdvancedStatsThread.__init__(self, log=vm.log, daemon=True) self._vm = vm + self._paused = False - self.highWrite = ( - AdvancedStatsFunction( - self._highWrite, - config.getint('vars', 'vm_watermark_interval'))) - self.updateVolumes = ( - AdvancedStatsFunction( - self._updateVolumes, - config.getint('irs', 'vol_size_sample_interval'))) + self.sampleCpu = sampling.SampleWindow( + config.getint('vars', 'vm_sample_cpu_window')) + self.sampleDisk = sampling.SampleWindow( + config.getint('vars', 'vm_sample_disk_window')) + self.sampleDiskLatency = sampling.SampleWindow( + config.getint('vars', 'vm_sample_disk_latency_window')) + self.sampleNet = sampling.SampleWindow( + config.getint('vars', 'vm_sample_net_window')) + self.sampleBalloon = sampling.SampleWindow( + config.getint('vars', 'vm_sample_balloon_window')) + self.sampleVmJobs = sampling.SampleWindow( + config.getint('vars', 'vm_sample_jobs_window')) + self.sampleVcpuPinning = sampling.SampleWindow( + config.getint('vars', 'vm_sample_vcpu_pin_window')) + self.sampleCpuTune = sampling.SampleWindow( + self.CPU_TUNE_SAMPLING_WINDOW) - self.sampleCpu = ( - AdvancedStatsFunction( - self._sampleCpu, - config.getint('vars', 'vm_sample_cpu_interval'), - config.getint('vars', 'vm_sample_cpu_window'))) - self.sampleDisk = ( - AdvancedStatsFunction( - self._sampleDisk, - config.getint('vars', 'vm_sample_disk_interval'), - config.getint('vars', 'vm_sample_disk_window'))) - self.sampleDiskLatency = ( - AdvancedStatsFunction( - self._sampleDiskLatency, - config.getint('vars', 'vm_sample_disk_latency_interval'), - config.getint('vars', 'vm_sample_disk_latency_window'))) - self.sampleNet = ( - AdvancedStatsFunction( - self._sampleNet, - config.getint('vars', 'vm_sample_net_interval'), - config.getint('vars', 'vm_sample_net_window'))) - self.sampleBalloon = ( 
- AdvancedStatsFunction( - self._sampleBalloon, - config.getint('vars', 'vm_sample_balloon_interval'), - config.getint('vars', 'vm_sample_balloon_window'))) - self.sampleVmJobs = ( - AdvancedStatsFunction( - self._sampleVmJobs, - config.getint('vars', 'vm_sample_jobs_interval'), - config.getint('vars', 'vm_sample_jobs_window'))) - self.sampleVcpuPinning = ( - AdvancedStatsFunction( - self._sampleVcpuPinning, - config.getint('vars', 'vm_sample_vcpu_pin_interval'), - config.getint('vars', 'vm_sample_vcpu_pin_window'))) - self.sampleCpuTune = ( - AdvancedStatsFunction( - self._sampleCpuTune, - config.getint('vars', 'vm_sample_cpu_tune_interval'), - self.CPU_TUNE_SAMPLING_WINDOW)) + self._samplings = ( + self.sampleCpu, self.sampleDisk, + self.sampleDiskLatency, self.sampleNet, self.sampleBalloon, + self.sampleVmJobs, self.sampleVcpuPinning, self.sampleCpuTune) - self.addStatsFunction( - self.highWrite, self.updateVolumes, self.sampleCpu, - self.sampleDisk, self.sampleDiskLatency, self.sampleNet, - self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning, - self.sampleCpuTune) + def pause(self): + self._paused = True - def _highWrite(self): - if not self._vm.isDisksStatsCollectionEnabled(): - # Avoid queries from storage during recovery process - return - self._vm.extendDrivesIfNeeded() + def cont(self): + self._paused = False - def _updateVolumes(self): - if not self._vm.isDisksStatsCollectionEnabled(): - # Avoid queries from storage during recovery process - return - - for vmDrive in self._vm.getDiskDevices(): - self._vm.updateDriveVolume(vmDrive) - - def _sampleCpu(self): - cpuStats = self._vm._dom.getCPUStats(True, 0) - return cpuStats[0] - - def _sampleDisk(self): - if not self._vm.isDisksStatsCollectionEnabled(): - # Avoid queries from storage during recovery process - return - - diskSamples = {} - for vmDrive in self._vm.getDiskDevices(): - diskSamples[vmDrive.name] = self._vm._dom.blockStats(vmDrive.name) - - return diskSamples - - def 
_sampleDiskLatency(self): - if not self._vm.isDisksStatsCollectionEnabled(): - # Avoid queries from storage during recovery process - return - # {'wr_total_times': 0L, 'rd_operations': 9638L, - # 'flush_total_times': 0L,'rd_total_times': 7622718001L, - # 'rd_bytes': 85172430L, 'flush_operations': 0L, - # 'wr_operations': 0L, 'wr_bytes': 0L} - diskLatency = {} - for vmDrive in self._vm.getDiskDevices(): - diskLatency[vmDrive.name] = self._vm._dom.blockStatsFlags( - vmDrive.name, flags=libvirt.VIR_TYPED_PARAM_STRING_OKAY) - return diskLatency - - def _sampleNet(self): - netSamples = {} - for nic in self._vm.getNicDevices(): - netSamples[nic.name] = self._vm._dom.interfaceStats(nic.name) - return netSamples - - def _sampleVcpuPinning(self): - vCpuInfos = self._vm._dom.vcpus() - return vCpuInfos[0] - - def _sampleBalloon(self): - infos = self._vm._dom.info() - return infos[2] - - def _sampleVmJobs(self): - return self._vm.queryBlockJobs() - - def _sampleCpuTune(self): - infos = self._vm._dom.schedulerParameters() - infos['vcpuCount'] = self._vm._dom.vcpusFlags( - libvirt.VIR_DOMAIN_VCPU_CURRENT) - - metadataCpuLimit = None - - try: - if VmStatsThread._libvirt_metadata_supported: - metadataCpuLimit = self._vm._dom.metadata( - libvirt.VIR_DOMAIN_METADATA_ELEMENT, - METADATA_VM_TUNE_URI, 0) - except libvirt.libvirtError as e: - if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED: - VmStatsThread._libvirt_metadata_supported = False - self._log.error("libvirt does not support metadata") - - elif (e.get_error_code() - not in (libvirt.VIR_ERR_NO_DOMAIN, - libvirt.VIR_ERR_NO_DOMAIN_METADATA)): - # Non-existing VM and no metadata block are expected - # conditions and no reasons for concern here. - # All other errors should be reported. 
- self._log.warn("Failed to retrieve QoS metadata because of %s", - e) - - if metadataCpuLimit: - metadataCpuLimitXML = _domParseStr(metadataCpuLimit) - nodeList = \ - metadataCpuLimitXML.getElementsByTagName('vcpulimit') - infos['vcpuLimit'] = nodeList[0].childNodes[0].data - - return infos + @property + def paused(self): + return self._paused def _getIoTuneStats(self, stats): """ @@ -579,7 +593,8 @@ stats = {} try: - stats['statsAge'] = time.time() - self.getLastSampleTime() + stats['statsAge'] = time.time() - max( + sample.lastTime for sample in self._samplings) except TypeError: self._log.debug("Stats age not available") stats['statsAge'] = -1.0 @@ -2401,10 +2416,6 @@ self.guestAgent.stop() except Exception: pass - try: - self._vmStats.stop() - except Exception: - pass self.saveState() def status(self): @@ -2708,8 +2719,7 @@ return domxml.toxml() def _initVmStats(self): - self._vmStats = VmStatsThread(self) - self._vmStats.start() + self._vmStats = VmStats(self) self._guestEventTime = self._startTime @staticmethod @@ -2823,7 +2833,7 @@ supervdsm.getProxy().setPortMirroring(network, nic.name) - # VmStatsThread may use block devices info from libvirt. + # VmStats may use block devices info from libvirt. # So, run it after you have this info self._initVmStats() self.guestAgent = guestagent.GuestAgent( @@ -4525,8 +4535,6 @@ self.lastStatus = vmstatus.POWERING_DOWN # Terminate the VM's creation thread. 
self._incomingMigrationFinished.set() - if self._vmStats: - self._vmStats.stop() if self.guestAgent: self.guestAgent.stop() if self._dom: -- To view, visit http://gerrit.ovirt.org/31181 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: If3d1edc8099517710a7b9c31758930f13131d0d1 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Francesco Romani <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
