Nir Soffer has uploaded a new change for review.

Change subject: monitoring: Introduce the check module
......................................................................

monitoring: Introduce the check module

The check module provides DirectioChecker, which runs in an event loop
thread. Using it, we can run all the storage domain checkers in one
thread, so checks keep running even when a domain monitor thread is
blocked on unrelated tasks such as refreshing the LVM cache or
rescanning storage connections.
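
Here is a minimal sketch of this scheduling pattern. It uses the standard
library asyncio loop as a stand-in for vdsm.storage.eventloop.
PeriodicChecker and run_checkers() are hypothetical demo names, and the
fake check completes immediately instead of running dd. As in
DirectioChecker, start() goes through call_soon_threadsafe() and every
completed check reschedules the next one with call_later(), so a single
loop thread serves all checkers.

```python
import asyncio
import threading
import time


class PeriodicChecker:
    """Hypothetical stand-in for DirectioChecker with a fake check.

    start() and stop() may be called from any thread; they hand the work
    to the loop with call_soon_threadsafe(). Each completed check
    schedules the next one with call_later().
    """

    def __init__(self, loop, path, complete, interval):
        self._loop = loop
        self._path = path
        self._complete = complete
        self._interval = interval
        self._running = False  # only read or written in the loop thread

    def start(self):
        self._loop.call_soon_threadsafe(self._start)

    def stop(self):
        self._loop.call_soon_threadsafe(self._stop)

    def _start(self):
        self._running = True
        self._check()

    def _stop(self):
        self._running = False

    def _check(self):
        if not self._running:
            return
        # A real checker would start dd here and report from a callback;
        # this fake check "completes" immediately.
        self._complete(self._path)
        self._loop.call_later(self._interval, self._check)


def run_checkers(paths, interval=0.01, blocked_for=0.3):
    """Run one checker per path in a single loop thread while the calling
    thread is blocked, and return the list of checked paths."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    results = []  # appended to only from the loop thread
    checkers = [PeriodicChecker(loop, path, results.append, interval)
                for path in paths]
    for checker in checkers:
        checker.start()
    # Stand-in for a monitor thread stuck refreshing the LVM cache.
    time.sleep(blocked_for)
    for checker in checkers:
        checker.stop()
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()
    return results
```

Calling run_checkers(["/dev/mapper/a", "/dev/mapper/b"]) returns a list
in which both paths appear many times, even though the calling thread
spent the whole run in time.sleep().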

This will be used to separate storage domain health checking from other
tasks performed by the storage domain monitor threads, avoiding false
negative check results.
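
One way a monitor thread could consume results without doing any I/O
itself is to read the latest result stored by the checker's complete
callback. LastResult below is a hypothetical helper and is not part of
this patch. It records when each result arrived, so a monitor that was
blocked for a while can tell a fresh result from a stale one.

```python
import threading
import time


class LastResult:
    """Hypothetical holder for the most recent check result.

    update() is meant to be called from the checker's complete callback
    (event loop thread); get() from a domain monitor thread whenever it
    is free, so a slow LVM refresh delays reading the result, not the
    check itself.
    """

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._lock = threading.Lock()
        self._result = None
        self._time = None

    def update(self, result):
        with self._lock:
            self._result = result
            self._time = self._clock()

    def get(self):
        """Return (result, age in seconds), or (None, None) if no check
        has completed yet."""
        with self._lock:
            if self._time is None:
                return None, None
            return self._result, self._clock() - self._time
```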

When run as a script, this module watches all multipath devices on the
host. This demo code will be removed when the tests are complete.

(Partial tests)

Change-Id: If22fe38b8b29116270f9012b75895506adc48852
Bug-Url: https://bugzilla.redhat.com/1081962
Signed-off-by: Nir Soffer <[email protected]>
---
M debian/vdsm-python.install
M lib/vdsm/storage/Makefile.am
A lib/vdsm/storage/check.py
M tests/Makefile.am
A tests/storage_check_test.py
M vdsm.spec.in
6 files changed, 345 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/57/54757/15

diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index c1c5e78..1d8e37a 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -62,6 +62,7 @@
 ./usr/lib/python2.7/dist-packages/vdsm/sslcompat.py
 ./usr/lib/python2.7/dist-packages/vdsm/sslutils.py
 ./usr/lib/python2.7/dist-packages/vdsm/storage/__init__.py
+./usr/lib/python2.7/dist-packages/vdsm/storage/check.py
 ./usr/lib/python2.7/dist-packages/vdsm/storage/constants.py
 ./usr/lib/python2.7/dist-packages/vdsm/storage/eventloop.py
 ./usr/lib/python2.7/dist-packages/vdsm/storage/exception.py
diff --git a/lib/vdsm/storage/Makefile.am b/lib/vdsm/storage/Makefile.am
index 44d4507..255abd1 100644
--- a/lib/vdsm/storage/Makefile.am
+++ b/lib/vdsm/storage/Makefile.am
@@ -23,6 +23,7 @@
 
 dist_vdsmstorage_PYTHON = \
        __init__.py \
+       check.py \
        constants.py \
        eventloop.py \
        exception.py \
diff --git a/lib/vdsm/storage/check.py b/lib/vdsm/storage/check.py
new file mode 100644
index 0000000..e6274b8
--- /dev/null
+++ b/lib/vdsm/storage/check.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2016 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+from __future__ import absolute_import
+
+import asyncore
+import logging
+import re
+
+import cpopen
+
+from vdsm.infra import filecontrol
+
+
+class DirectioChecker(object):
+    """
+    Check path availability using direct I/O.
+
+    On each check cycle, the complete callback is invoked with a CheckResult
+    instance.
+    """
+
+    log = logging.getLogger("storage.check")
+
+    def __init__(self, loop, path, complete=None, interval=10.0):
+        self._loop = loop
+        self._path = path
+        self._complete = complete
+        self._interval = interval
+        self._check_time = None
+        self._timer = None
+        self._proc = None
+        self._reader = None
+        self._reaper = None
+        self._err = None
+
+    def start(self):
+        """
+        Start checker from any thread.
+        """
+        self.log.info("Start watching %r", self._path)
+        self._timer = self._loop.call_soon_threadsafe(self.check)
+
+    def stop(self):
+        """
+        Stop checker from any thread.
+        """
+        self.log.info("Stop watching %r", self._path)
+        self._timer.cancel()
+        self._timer = None
+
+    @property
+    def running(self):
+        return self._timer is not None
+
+    def check(self):
+        if not self.running:
+            return
+        cmd = ["dd", "if=%s" % self._path, "of=/dev/null", "bs=4096",
+               "count=1", "iflag=direct"]
+        self.log.debug("START check %r cmd=%s", self._path, cmd)
+        self._check_time = self._loop.time()
+        self._proc = cpopen.CPopen(cmd, stdin=None, stdout=None,
+                                   stderr=cpopen.PIPE)
+        self._reader = self._loop.create_dispatcher(Reader, self._proc.stderr,
+                                                    self._read_completed)
+
+    def _read_completed(self, data):
+        """
+        Called when the dd process has closed stderr. At this point the
+        process may still be running.
+        """
+        self._reader = None
+        self._err = data
+        rc = self._proc.poll()
+        if rc is None:
+            self._reaper = Reaper(self._loop, self._proc,
+                                  self._check_completed)
+            return
+        self._check_completed(rc)
+
+    def _check_completed(self, rc):
+        """
+        Called when the dd process has exited with exit code rc.
+        """
+        elapsed = self._loop.time() - self._check_time
+        self._reaper = None
+        self._proc = None
+        self.log.debug("FINISH check %r rc=%s err=%r elapsed=%.02f",
+                       self._path, rc, self._err, elapsed)
+        if self.running:
+            self._timer = self._loop.call_later(self._interval, self.check)
+        if self._complete:
+            result = CheckResult(rc, self._err)
+            self._complete(result)
+
+
+class CheckResult(object):
+
+    _PATTERN = re.compile(
+        r"\d+ bytes? \([\de\-.]+ [kMGT]*B\) copied, "
+        r"([\de\-.]+) s, "
+        r"(?:[\de\-.]+|Infinity) [kMGT]*B/s")
+
+    def __init__(self, rc, err):
+        self._rc = rc
+        self._err = err
+
+    def delay(self):
+        # TODO: use public storage errors
+        if self._rc != 0:
+            raise RuntimeError("Check failed rc=%d err=%r" % (self._rc, self._err))
+        try:
+            stats = self._err.splitlines()[-1]
+        except IndexError:
+            raise RuntimeError("No read stats found err=%r" % self._err)
+        match = self._PATTERN.match(stats)
+        if not match:
+            raise RuntimeError("Unexpected stats err=%r" % self._err)
+        try:
+            return float(match.group(1))
+        except ValueError:
+            raise RuntimeError("Invalid seconds value err=%r" % self._err)
+
+
+class Reader(asyncore.file_dispatcher):
+    """
+    Read from file and notify when read was completed.
+    """
+
+    def __init__(self, fd, complete, map=None):
+        asyncore.file_dispatcher.__init__(self, fd, map=map)
+        filecontrol.set_close_on_exec(self._fileno)
+        self._complete = complete
+        self._data = ""
+
+    def handle_read(self):
+        chunk = self.socket.read(1024)
+        if not chunk:
+            self.handle_close()
+            return
+        self._data += chunk
+
+    def handle_close(self):
+        self._complete(self._data)
+        self.close()
+
+    def close(self):
+        # asyncore.dispatcher defines a closing attribute but never uses it.
+        if self.closing:
+            return
+        self.closing = True
+        self._complete = None
+        asyncore.file_dispatcher.close(self)
+
+    def writable(self):
+        return False
+
+
+class Reaper(object):
+    """
+    Wait for process and notify when it has terminated.
+    """
+
+    log = logging.getLogger("storage.reaper")
+
+    # TODO: Maybe replace with SIGCHLD
+    MIN_INTERVAL = 0.01
+    MAX_INTERVAL = 1.0
+
+    def __init__(self, loop, proc, complete):
+        self._loop = loop
+        self._proc = proc
+        self._complete = complete
+        self._interval = self.MIN_INTERVAL
+        self._count = 0
+        self._loop.call_later(self._interval, self.reap)
+
+    def reap(self):
+        self._count += 1
+        rc = self._proc.poll()
+        if rc is None:
+            if self._interval < self.MAX_INTERVAL:
+                self._interval = min(self._interval * 2, self.MAX_INTERVAL)
+            self._loop.call_later(self._interval, self.reap)
+            return
+        self.log.debug("Process %s terminated (count=%d)",
+                       self._proc, self._count)
+        self._complete(rc)
+        self._complete = None
+
+
+# TODO: replace this with tests
+if __name__ == '__main__':
+    import glob
+    import signal
+    from vdsm.storage import eventloop
+
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format="%(asctime)s %(levelname)s [%(name)s] %(message)s")
+    checkers = []
+    loop = eventloop.EventLoop()
+
+    def terminate(signo, frame):
+        logging.info("Received signal %d", signo)
+        for c in checkers:
+            c.stop()
+        loop.stop()
+
+    signal.signal(signal.SIGINT, terminate)
+    signal.signal(signal.SIGTERM, terminate)
+
+    for path in glob.iglob("/dev/disk/by-id/dm-uuid-mpath-*"):
+        c = DirectioChecker(loop, path)
+        c.start()
+        checkers.append(c)
+
+    loop.run_forever()
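
Reaper waits for the dd process by polling it with a doubling interval
instead of blocking the loop in wait(). The sketch below mirrors that
loop on the standard library asyncio event loop and subprocess.Popen,
which stand in for vdsm.storage.eventloop and cpopen.CPopen. The names
python_child() and wait_for_exit() are hypothetical demo helpers. The
interval is capped with min(), so the delay between polls never exceeds
MAX_INTERVAL.

```python
import asyncio
import subprocess
import sys


class Reaper:
    """Sketch of check.Reaper on top of asyncio.

    Polls the child with Popen.poll() from the event loop, doubling the
    delay between polls and capping it at MAX_INTERVAL.
    """

    MIN_INTERVAL = 0.01
    MAX_INTERVAL = 1.0

    def __init__(self, loop, proc, complete):
        self._loop = loop
        self._proc = proc
        self._complete = complete
        self._interval = self.MIN_INTERVAL
        self.intervals = [self._interval]  # recorded for inspection only
        self._loop.call_later(self._interval, self.reap)

    def reap(self):
        rc = self._proc.poll()
        if rc is None:
            # Back off, but never beyond MAX_INTERVAL.
            self._interval = min(self._interval * 2, self.MAX_INTERVAL)
            self.intervals.append(self._interval)
            self._loop.call_later(self._interval, self.reap)
            return
        self._complete(rc)


def python_child(code):
    """Return argv that runs a short Python snippet in a child process."""
    return [sys.executable, "-c", code]


def wait_for_exit(argv):
    """Start argv and return (exit code, poll intervals used)."""
    loop = asyncio.new_event_loop()
    proc = subprocess.Popen(argv)
    result = {}

    def complete(rc):
        result["rc"] = rc
        loop.stop()

    try:
        reaper = Reaper(loop, proc, complete)
        loop.run_forever()
    finally:
        loop.close()
    return result["rc"], reaper.intervals
```

For example, wait_for_exit(python_child("raise SystemExit(3)")) returns
exit code 3 together with the list of poll intervals it needed.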
diff --git a/tests/Makefile.am b/tests/Makefile.am
index f49eb86..107d674 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -40,6 +40,7 @@
        pthreadTests.py \
        responseTests.py \
        scheduleTests.py \
+       storage_check_test.py \
        storage_eventloop_test.py \
        vmStatsTests.py \
        $(NULL)
diff --git a/tests/storage_check_test.py b/tests/storage_check_test.py
new file mode 100644
index 0000000..55f6fbb
--- /dev/null
+++ b/tests/storage_check_test.py
@@ -0,0 +1,105 @@
+#
+# Copyright 2016 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+from __future__ import print_function
+
+import os
+
+from testlib import VdsmTestCase
+from testlib import expandPermutations, permutations
+from testlib import temporaryPath
+
+from vdsm.storage import check
+from vdsm.storage import eventloop
+
+
+class TestDirectioChecker(VdsmTestCase):
+
+    def setUp(self):
+        self.loop = eventloop.EventLoop()
+        self.result = None
+
+    def tearDown(self):
+        self.loop.close()
+
+    def complete(self, result):
+        self.result = result
+        self.loop.stop()
+
+    def test_path_missing(self):
+        checker = check.DirectioChecker(self.loop, "/no/such/path", self.complete)
+        checker.start()
+        self.loop.run_forever()
+        self.assertRaises(RuntimeError, self.result.delay)
+
+    def test_path_inaccessible(self):
+        with temporaryPath() as path:
+            os.chmod(path, 0)
+            checker = check.DirectioChecker(self.loop, path, self.complete)
+            checker.start()
+            self.loop.run_forever()
+            self.assertRaises(RuntimeError, self.result.delay)
+
+    def test_path_ok(self):
+        with temporaryPath(data="blah") as path:
+            checker = check.DirectioChecker(self.loop, path, self.complete)
+            checker.start()
+            self.loop.run_forever()
+            delay = self.result.delay()
+            print("delay:", delay)
+            self.assertEqual(type(delay), float)
+
+
+@expandPermutations
+class TestCheckResult(VdsmTestCase):
+
+    @permutations([
+        ("1\n2\n1 byte (1 B) copied, 1 s, 1 B/s\n", 1.0),
+        ("1\n2\n1024 bytes (1 kB) copied, 1 s, 1 kB/s\n", 1.),
+        ("1\n2\n1572864 bytes (1.5 MB) copied, 1.5 s, 1 MB/s\n", 1.5),
+        ("1\n2\n1610612736 bytes (1.5 GB) copied, 1000.5 s, 1.53 MB/s\n", 1000.5),
+        ("1\n2\n479 bytes (479 B) copied, 5.6832e-05 s, 8.4 MB/s\n", 5.6832e-05),
+        ("1\n2\n512 bytes (512e-3 MB) copied, 1 s, 512e-3 MB/s\n", 1.0),
+        ("1\n2\n524288 bytes (512e3 B) copied, 1 s, 512e3 B/s\n", 1.0),
+        ("1\n2\n517 bytes (517 B) copied, 0 s, Infinity B/s\n", 0.0)
+    ])
+    def test_success(self, err, seconds):
+        result = check.CheckResult(0, err)
+        self.assertEqual(result.delay(), seconds)
+
+    def test_non_zero_exit_code(self):
+        result = check.CheckResult(1, "REASON")
+        with self.assertRaises(RuntimeError) as ctx:
+            result.delay()
+        self.assertIn("REASON", str(ctx.exception))
+
+    @permutations([
+        ("",),
+        ("1\n2\n\n",),
+        ("1\n2\nBAD, 1 s, 1 kB/s\n",),
+        ("1\n2\n1024 bytes (1 kB) copied, 1 s, 1 BAD\n",),
+        ("1\n2\n1024 bytes (1 kB) copied, BAD, 1 kB/s\n",),
+        ("1\n2\n1024 bytes (1 kB) copied, BAD s, 1 kB/s\n",),
+        ("1\n2\n1024 bytes (1 kB) copied, -1- s, 1 kB/s\n",),
+        ("1\n2\n1024 bytes (1 kB) copied, e3- s, 1 kB/s\n",),
+    ])
+    def test_unexpected_output(self, err):
+        result = check.CheckResult(0, err)
+        self.assertRaises(RuntimeError, result.delay)
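
TestCheckResult exercises CheckResult.delay() against samples of dd
stderr. The sketch below ports the same parsing to a plain Python 3
function and reuses the _PATTERN regex from check.py, which makes it
easy to try new samples outside a vdsm tree. The name parse_delay() is
hypothetical and is not part of the patch.

```python
import re

# Same pattern as CheckResult._PATTERN in check.py.
_PATTERN = re.compile(
    r"\d+ bytes? \([\de\-.]+ [kMGT]*B\) copied, "
    r"([\de\-.]+) s, "
    r"(?:[\de\-.]+|Infinity) [kMGT]*B/s")


def parse_delay(rc, err):
    """Return the read time in seconds from dd's stderr, like delay()."""
    if rc != 0:
        raise RuntimeError("Check failed rc=%d err=%r" % (rc, err))
    lines = err.splitlines()
    if not lines:
        raise RuntimeError("No read stats found err=%r" % err)
    # dd prints the stats on its last stderr line.
    match = _PATTERN.match(lines[-1])
    if not match:
        raise RuntimeError("Unexpected stats err=%r" % err)
    try:
        return float(match.group(1))
    except ValueError:
        raise RuntimeError("Invalid seconds value err=%r" % err)
```

Only the last line of dd's stderr carries the stats; the preceding
"records in" and "records out" lines are ignored.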
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 85f29ae..a688512 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1141,6 +1141,7 @@
 %{python_sitelib}/%{vdsm_name}/profiling/memory.py*
 %{python_sitelib}/%{vdsm_name}/profiling/profile.py*
 %{python_sitelib}/%{vdsm_name}/storage/__init__.py*
+%{python_sitelib}/%{vdsm_name}/storage/check.py*
 %{python_sitelib}/%{vdsm_name}/storage/constants.py*
 %{python_sitelib}/%{vdsm_name}/storage/eventloop.py*
 %{python_sitelib}/%{vdsm_name}/storage/exception.py*


-- 
To view, visit https://gerrit.ovirt.org/54757
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If22fe38b8b29116270f9012b75895506adc48852
Gerrit-PatchSet: 15
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <[email protected]>
Gerrit-Reviewer: Adam Litke <[email protected]>
Gerrit-Reviewer: Allon Mureinik <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: gerrit-hooks <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
