Nir Soffer has uploaded a new change for review. Change subject: monitoring: Use new monitoring infrastructure ......................................................................
monitoring: Use new monitoring infrastructure DomainMonitor now keeps a checker object, providing path checking services. The checker is started when the domain monitor is created, and stopped when shutting down vdsm. Monitor threads use the checker to start checking the monitoring path during setupLoop, and stop checking when the thread is stopping. Since path checker events are received on a different thread, creating new status and updating it is done under a lock to avoid races when evaluating status changes. Change-Id: I18c87f3be6c570ee914388fc2ef7d6b847516b7c Signed-off-by: Nir Soffer <nsof...@redhat.com> --- M vdsm/storage/monitor.py 1 file changed, 54 insertions(+), 27 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/32/57432/1 diff --git a/vdsm/storage/monitor.py b/vdsm/storage/monitor.py index d86853c..169375d 100644 --- a/vdsm/storage/monitor.py +++ b/vdsm/storage/monitor.py @@ -26,6 +26,7 @@ from vdsm import concurrent from vdsm import utils from vdsm.config import config +from vdsm.storage import check from . import clusterlock from . import misc @@ -129,8 +130,12 @@ def __init__(self, interval): self._monitors = {} self._interval = interval + # NOTE: This must be used in asynchronous mode to prevent blocking of + # the checker event loop thread. 
self.onDomainStateChange = misc.Event( - "Storage.DomainMonitor.onDomainStateChange") + "Storage.DomainMonitor.onDomainStateChange", sync=False) + self._checker = check.CheckService() + self._checker.start() @property def domains(self): @@ -165,7 +170,7 @@ log.info("Start monitoring %s", sdUUID) monitor = MonitorThread(weakref.proxy(self), sdUUID, hostId, - self._interval) + self._interval, self._checker) monitor.poolDomain = poolDomain monitor.start() # The domain should be added only after it succesfully started @@ -205,6 +210,7 @@ """ log.info("Shutting down domain monitors") self._stopMonitors(self._monitors.values(), shutdown=True) + self._checker.stop() def _stopMonitors(self, monitors, shutdown=False): # The domain monitor issues events that might become raceful if @@ -234,7 +240,7 @@ class MonitorThread(object): - def __init__(self, domainMonitor, sdUUID, hostId, interval): + def __init__(self, domainMonitor, sdUUID, hostId, interval, checker): self.thread = concurrent.thread(self._run, logger=log.name) self.domainMonitor = domainMonitor self.stopEvent = threading.Event() @@ -242,6 +248,8 @@ self.sdUUID = sdUUID self.hostId = hostId self.interval = interval + self.checker = checker + self.lock = threading.Lock() self.monitoringPath = None # For backward compatibility, we must present a fake status before # collecting the first sample. The fake status is marked as @@ -287,6 +295,7 @@ finally: log.debug("Domain monitor for %s stopped (shutdown=%s)", self.sdUUID, self.wasShutdown) + self._stopCheckingPath() if self._shouldReleaseHostId(): self._releaseHostId() @@ -322,6 +331,12 @@ if self.domain is None: self._produceDomain() + # This may fail even if the domain was produced. We will try again in + # the next cycle. 
+ if self.monitoringPath is None: + self.monitoringPath = self.domain.getMonitoringPath() + self.checker.start_checking(self.monitoringPath, self._pathChecked) + # The isIsoDomain assignment is deferred because the isoPrefix # discovery might fail (if the domain suddenly disappears) and we # could risk to never try to set it again. @@ -331,9 +346,7 @@ @utils.cancelpoint def _produceDomain(self): log.debug("Producing domain %s", self.sdUUID) - domain = sdCache.produce(self.sdUUID) - self.monitoringPath = domain.getMonitoringPath() - self.domain = domain + self.domain = sdCache.produce(self.sdUUID) @utils.cancelpoint def _setIsoDomainInfo(self): @@ -362,28 +375,10 @@ if self._shouldRefreshDomain(): self._refreshDomain() - if self._checkPathStatus(): - self._checkDomainStatus() + self._checkDomainStatus() if self._shouldAcquireHostId(): self._acquireHostId() - - @utils.cancelpoint - def _checkPathStatus(self): - # This may block for long time if the storage server is not accessible. - # On overloaded machines we have seen this take up to 15 seconds. 
- try: - stats = misc.readspeed(self.monitoringPath, 4096) - except Exception as e: - log.exception("Error checking path %s", self.monitoringPath) - path_status = PathStatus(error=e) - else: - path_status = PathStatus(readDelay=stats['seconds']) - - status = Status(path_status, self.status._domain_status) - self._updateStatus(status) - - return path_status.error is None @utils.cancelpoint def _checkDomainStatus(self): @@ -413,8 +408,9 @@ log.exception("Error checking domain %s", self.sdUUID) domain_status.error = e - status = Status(self.status._path_status, domain_status) - self._updateStatus(status) + with self.lock: + status = Status(self.status._path_status, domain_status) + self._updateStatus(status) return domain_status.error is None @@ -434,6 +430,8 @@ log.info("Domain %s became %s", self.sdUUID, "VALID" if status.valid else "INVALID") try: + # NOTE: We depend on this being asynchronous, so we don't block + # the checker event loop thread. self.domainMonitor.onDomainStateChange.emit( self.sdUUID, status.valid) except: @@ -451,6 +449,35 @@ sdCache.manuallyRemoveDomain(self.sdUUID) self.lastRefresh = time.time() + # Checking monitoring path + + def _pathChecked(self, result): + """ + Called from the checker event loop thread. Must not block! + """ + try: + delay = result.delay() + except Exception as e: + log.exception("Error checking path %s", self.monitoringPath) + path_status = PathStatus(error=e) + else: + path_status = PathStatus(readDelay=delay) + + with self.lock: + # NOTE: Everything under this lock must not block for a long time, + # or we will block the checker event loop thread. + status = Status(path_status, self.status._domain_status) + self._updateStatus(status) + + def _stopCheckingPath(self): + if self.monitoringPath is None: + return + try: + # May fail with KeyError if path is not being checked (unlikely). 
+ self.checker.stop_checking(self.monitoringPath, timeout=1.0) + except Exception: + log.exception("Error stopping checker for %s", self.monitoringPath) + # Managing host id def _shouldAcquireHostId(self): -- To view, visit https://gerrit.ovirt.org/57432 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I18c87f3be6c570ee914388fc2ef7d6b847516b7c Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <nsof...@redhat.com> _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches