Nir Soffer has uploaded a new change for review.

Change subject: monitoring: Use new monitoring infrastructure
......................................................................

monitoring: Use new monitoring infrastructure

DomainMonitor now keeps a checker object, providing path checking
services. The checker is started when the domain monitor is created, and
stopped when shutting down vdsm.

Monitor threads use the checker to start checking the monitoring path
during setupLoop, and stop checking when the thread is stopping.

Since path checker events are received on a different thread, creating
a new status and updating it is done under a lock to avoid races when
evaluating status changes.

Change-Id: I18c87f3be6c570ee914388fc2ef7d6b847516b7c
Signed-off-by: Nir Soffer <nsof...@redhat.com>
---
M vdsm/storage/monitor.py
1 file changed, 54 insertions(+), 27 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/32/57432/1

diff --git a/vdsm/storage/monitor.py b/vdsm/storage/monitor.py
index d86853c..169375d 100644
--- a/vdsm/storage/monitor.py
+++ b/vdsm/storage/monitor.py
@@ -26,6 +26,7 @@
 from vdsm import concurrent
 from vdsm import utils
 from vdsm.config import config
+from vdsm.storage import check
 
 from . import clusterlock
 from . import misc
@@ -129,8 +130,12 @@
     def __init__(self, interval):
         self._monitors = {}
         self._interval = interval
+        # NOTE: This must be used in asynchronous mode to prevent blocking of
+        # the checker event loop thread.
         self.onDomainStateChange = misc.Event(
-            "Storage.DomainMonitor.onDomainStateChange")
+            "Storage.DomainMonitor.onDomainStateChange", sync=False)
+        self._checker = check.CheckService()
+        self._checker.start()
 
     @property
     def domains(self):
@@ -165,7 +170,7 @@
 
         log.info("Start monitoring %s", sdUUID)
         monitor = MonitorThread(weakref.proxy(self), sdUUID, hostId,
-                                self._interval)
+                                self._interval, self._checker)
         monitor.poolDomain = poolDomain
         monitor.start()
         # The domain should be added only after it succesfully started
@@ -205,6 +210,7 @@
         """
         log.info("Shutting down domain monitors")
         self._stopMonitors(self._monitors.values(), shutdown=True)
+        self._checker.stop()
 
     def _stopMonitors(self, monitors, shutdown=False):
         # The domain monitor issues events that might become raceful if
@@ -234,7 +240,7 @@
 
 class MonitorThread(object):
 
-    def __init__(self, domainMonitor, sdUUID, hostId, interval):
+    def __init__(self, domainMonitor, sdUUID, hostId, interval, checker):
         self.thread = concurrent.thread(self._run, logger=log.name)
         self.domainMonitor = domainMonitor
         self.stopEvent = threading.Event()
@@ -242,6 +248,8 @@
         self.sdUUID = sdUUID
         self.hostId = hostId
         self.interval = interval
+        self.checker = checker
+        self.lock = threading.Lock()
         self.monitoringPath = None
         # For backward compatibility, we must present a fake status before
         # collecting the first sample. The fake status is marked as
@@ -287,6 +295,7 @@
         finally:
             log.debug("Domain monitor for %s stopped (shutdown=%s)",
                       self.sdUUID, self.wasShutdown)
+            self._stopCheckingPath()
             if self._shouldReleaseHostId():
                 self._releaseHostId()
 
@@ -322,6 +331,12 @@
         if self.domain is None:
             self._produceDomain()
 
+        # This may fail even if the domain was produced. We will try again in
+        # the next cycle.
+        if self.monitoringPath is None:
+            self.monitoringPath = self.domain.getMonitoringPath()
+            self.checker.start_checking(self.monitoringPath, self._pathChecked)
+
         # The isIsoDomain assignment is deferred because the isoPrefix
         # discovery might fail (if the domain suddenly disappears) and we
         # could risk to never try to set it again.
@@ -331,9 +346,7 @@
     @utils.cancelpoint
     def _produceDomain(self):
         log.debug("Producing domain %s", self.sdUUID)
-        domain = sdCache.produce(self.sdUUID)
-        self.monitoringPath = domain.getMonitoringPath()
-        self.domain = domain
+        self.domain = sdCache.produce(self.sdUUID)
 
     @utils.cancelpoint
     def _setIsoDomainInfo(self):
@@ -362,28 +375,10 @@
         if self._shouldRefreshDomain():
             self._refreshDomain()
 
-        if self._checkPathStatus():
-            self._checkDomainStatus()
+        self._checkDomainStatus()
 
         if self._shouldAcquireHostId():
             self._acquireHostId()
-
-    @utils.cancelpoint
-    def _checkPathStatus(self):
-        # This may block for long time if the storage server is not accessible.
-        # On overloaded machines we have seen this take up to 15 seconds.
-        try:
-            stats = misc.readspeed(self.monitoringPath, 4096)
-        except Exception as e:
-            log.exception("Error checking path %s", self.monitoringPath)
-            path_status = PathStatus(error=e)
-        else:
-            path_status = PathStatus(readDelay=stats['seconds'])
-
-        status = Status(path_status, self.status._domain_status)
-        self._updateStatus(status)
-
-        return path_status.error is None
 
     @utils.cancelpoint
     def _checkDomainStatus(self):
@@ -413,8 +408,9 @@
             log.exception("Error checking domain %s", self.sdUUID)
             domain_status.error = e
 
-        status = Status(self.status._path_status, domain_status)
-        self._updateStatus(status)
+        with self.lock:
+            status = Status(self.status._path_status, domain_status)
+            self._updateStatus(status)
 
         return domain_status.error is None
 
@@ -434,6 +430,8 @@
         log.info("Domain %s became %s", self.sdUUID,
                  "VALID" if status.valid else "INVALID")
         try:
+            # NOTE: We depend on this being asynchronous, so we don't block
+            # the checker event loop thread.
             self.domainMonitor.onDomainStateChange.emit(
                 self.sdUUID, status.valid)
         except:
@@ -451,6 +449,35 @@
         sdCache.manuallyRemoveDomain(self.sdUUID)
         self.lastRefresh = time.time()
 
+    # Checking monitoring path
+
+    def _pathChecked(self, result):
+        """
+        Called from the checker event loop thread. Must not block!
+        """
+        try:
+            delay = result.delay()
+        except Exception as e:
+            log.exception("Error checking path %s", self.monitoringPath)
+            path_status = PathStatus(error=e)
+        else:
+            path_status = PathStatus(readDelay=delay)
+
+        with self.lock:
+            # NOTE: Everything under this lock must not block for a long
+            # time, or we will block the checker event loop thread.
+            status = Status(path_status, self.status._domain_status)
+            self._updateStatus(status)
+
+    def _stopCheckingPath(self):
+        if self.monitoringPath is None:
+            return
+        try:
+            # May fail with KeyError if path is not being checked (unlikely).
+            self.checker.stop_checking(self.monitoringPath, timeout=1.0)
+        except Exception:
+            log.exception("Error stopping checker for %s", self.monitoringPath)
+
     # Managing host id
 
     def _shouldAcquireHostId(self):


-- 
To view, visit https://gerrit.ovirt.org/57432
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I18c87f3be6c570ee914388fc2ef7d6b847516b7c
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsof...@redhat.com>
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to