Francesco Romani has uploaded a new change for review.

Change subject: WIP DRAFT virt: scalable sampling
......................................................................
WIP DRAFT virt: scalable sampling *** WORK IN PROGRESS *** DO NOT MERGE *** Draft of new scalable sampling leveraging the Schedule library and a threadpool. Change-Id: I9f54990b2114cbe931f64d6207192d1f6c6a3799 Signed-off-by: Francesco Romani <[email protected]> --- M vdsm/virt/sampling.py M vdsm/virt/vm.py 2 files changed, 126 insertions(+), 10 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/77/29977/1 diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py index 3479eee..de0cc6a 100644 --- a/vdsm/virt/sampling.py +++ b/vdsm/virt/sampling.py @@ -26,6 +26,7 @@ Contains a reverse dictionary pointing from error string to its error code. """ +import contextlib import threading import os import time @@ -34,6 +35,9 @@ import ethtool import re +from storage import threadPool as threadpool +from vdsm import netinfo +from vdsm import schedule from vdsm import utils from vdsm import netinfo from vdsm.ipwrapper import getLinks @@ -44,6 +48,10 @@ _THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled' if not os.path.exists(_THP_STATE_PATH): _THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled' + +_Scheduler = schedule.Scheduler(name='SamplingScheduler') + +_Executor = threadpool.ThreadPool(numThreads=5) # to match libvirt pool size class InterfaceSample: @@ -315,12 +323,16 @@ self._interval = interval @property + def name(self): + return self._function.__name__ + + @property def interval(self): return self._interval def __repr__(self): return "<AdvancedStatsFunction %s at 0x%x>" % ( - self._function.__name__, id(self._function.__name__)) + self.name, id(self.name)) def __call__(self, *args, **kwargs): retValue = self._function(*args, **kwargs) @@ -347,6 +359,80 @@ return bgn_sample, end_sample, (end_time - bgn_time) +def _handleStatsException(ex): + """ + Handle the registered function exceptions and eventually stop the + sampling if a fatal error occurred. 
+ """ + return False + + +class Sampling(object): + def __init__(self, statfunc, vm, log, onException=_handleStatsException): + self.pause = False + self._statfunc = statfunc + self._onException = onException + self._vm = vm + self._log = log + self._done = threading.Event() + self._run = False + self._name = '%s-%s' % (self._vm.id, self._statfunc.name) + self._sampleTime = 0 + + @property + def sampleTime(self): + return self._sampleTime + + def start(self): + self._run = True + _Scheduler(self._statfunc.interval, self._onSchedule) + + def stop(self): + self._run = False + + @property + def done(self): + return self._done.is_set() + + @contextlib.contextmanager + def _in_progress(self): + self._done.clear() + yield + self._done.set() + + def _onSchedule(self): + # to avoid to block the scheduler + _Executor.queueTask(self._name, self._collect) + + def _collect(self): + if not self._run: + return + if not self.pause: + if not self.done: + # previous round not completed, do not clog the executor + self._vm._timeoutExperienced(True) + elif not self._vm.monitorResponsive(): + # something else gone wrong, let's not add more chaos + # FIXME maybe log something? + pass + else: + self._do_sampling() + self._vm._timeoutExperienced(False) + + self._schedule(self._statfunc.interval, self.onSchedule) + + def _do_sampling(self): + with self._in_progress(): + try: + self._statfunc() + except Exception as e: + if not self._onException(e): + self._log.error("Stats function failed: %s", + self._statfunc, exc_info=True) + else: + self._sampleTime = time.time() + + class AdvancedStatsThread(threading.Thread): """ A thread that runs the registered AdvancedStatsFunction objects diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 3fde1f7..fd60052 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -64,7 +64,7 @@ from . import vmexitreason from . 
import vmstatus -from .sampling import AdvancedStatsFunction, AdvancedStatsThread +from .sampling import AdvancedStatsFunction, Sampling from .utils import isVdsmImage, XMLElement from vmpowerdown import VmShutdown, VmReboot @@ -166,7 +166,7 @@ pass -class VmStatsThread(AdvancedStatsThread): +class VmStatsCollector(object): MBPS_TO_BPS = 10 ** 6 / 8 # CPU tune sampling window @@ -183,8 +183,8 @@ _libvirt_metadata_supported = True def __init__(self, vm): - AdvancedStatsThread.__init__(self, log=vm.log, daemon=True) self._vm = vm + self._samplings = [] self.highWrite = ( AdvancedStatsFunction( @@ -236,11 +236,37 @@ config.getint('vars', 'vm_sample_cpu_tune_interval'), self.CPU_TUNE_SAMPLING_WINDOW)) - self.addStatsFunction( + self.addSamplings( self.highWrite, self.updateVolumes, self.sampleCpu, self.sampleDisk, self.sampleDiskLatency, self.sampleNet, self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning, self.sampleCpuTune) + + def addSamplings(self, *statsFuncs): + for statFunc in statsFuncs: + self._samplings.append(Sampling(statFunc, + self._vm, self._vm.log, + self._handleStatsException)) + + def start(self): + for sampling in self._samplings: + sampling.start() + + def stop(self): + for sampling in self._samplings: + sampling.start() + + def pause(self): + for sampling in self._samplings: + sampling.pause = True + + def cont(self): + for sampling in self._samplings: + sampling.pause = False + + def getLastSampleTime(self): + return max([sampling.sampleTime + for sampling in self._samplings]) def _highWrite(self): if not self._vm.isDisksStatsCollectionEnabled(): @@ -310,13 +336,13 @@ metadataCpuLimit = None try: - if VmStatsThread._libvirt_metadata_supported: + if VmStatsCollector._libvirt_metadata_supported: metadataCpuLimit = self._vm._dom.metadata( libvirt.VIR_DOMAIN_METADATA_ELEMENT, METADATA_VM_TUNE_URI, 0) except libvirt.libvirtError as e: if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED: - VmStatsThread._libvirt_metadata_supported = 
False + VmStatsCollector._libvirt_metadata_supported = False self._log.error("libvirt does not support metadata") elif (e.get_error_code() @@ -2777,7 +2803,7 @@ WARNING: this method should only gather statistics by copying data. Especially avoid costly and dangerous ditrect calls to the _dom - attribute. Use the VmStatsThread instead! + attribute. Use the VmStatsCollector instead! """ if self.lastStatus == vmstatus.DOWN: @@ -3058,7 +3084,7 @@ return domxml.toxml() def _initVmStats(self): - self._vmStats = VmStatsThread(self) + self._vmStats = VmStatsCollector(self) self._vmStats.start() self._guestEventTime = self._startTime @@ -3173,7 +3199,7 @@ if drive['device'] == 'disk' and isVdsmImage(drive): self._syncVolumeChain(drive) - # VmStatsThread may use block devices info from libvirt. + # VmStatsCollector may use block devices info from libvirt. # So, run it after you have this info self._initVmStats() self.guestAgent = guestagent.GuestAgent( @@ -3921,6 +3947,10 @@ self.log.warning('_readPauseCode unsupported by libvirt vm') return 'NOERR' + @property + def monitorResponsive(self): + return self._monitorResponse == 0 + def _timeoutExperienced(self, timeout): if timeout: self._monitorResponse = -1 -- To view, visit http://gerrit.ovirt.org/29977 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I9f54990b2114cbe931f64d6207192d1f6c6a3799 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Francesco Romani <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
