Roman Mohr has uploaded a new change for review.

Change subject: monitor: Add udev monitor
......................................................................

monitor: Add udev monitor

Allow easy monitoring of device hotplug and device state changes which
are covered by udev.

Change-Id: I4b91753424d83896fa538eb6b57f8653b6332fbb
Signed-off-by: Roman Mohr <[email protected]>
---
A lib/vdsm/udev/monitor.py
M tests/Makefile.am
A tests/udevMonitorTests.py
M vdsm.spec.in
4 files changed, 272 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/29/47729/1

diff --git a/lib/vdsm/udev/monitor.py b/lib/vdsm/udev/monitor.py
new file mode 100644
index 0000000..8698bf3
--- /dev/null
+++ b/lib/vdsm/udev/monitor.py
@@ -0,0 +1,133 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+from functools import partial
+import logging
+import threading
+from time import sleep
+
+from pyudev import Context, Monitor, MonitorObserver
+
+
+class UdevMonitor(object):
+    """
+    ``udev`` event monitor.
+
+    The monitor is a thread-safe thin wrapper around pyudev.MonitorObserver.
+    This allows multiple callbacks for the same netlink socket. To avoid
+    listening for udev events the application is not interested in, the
+    monitoring thread only starts listening on the socket when the monitor is
+    started and at least one subscriber is added.
+
+    The simplest way to use the monitor is to subscribe a callback to a
+    specific subsystem event and let the callback do the work:
+
+        def listen_for_disabled_cpu_events(device):
+            if device.action == 'offline':
+                print('CPU {0.name} is now offline'.format(device))
+
+        monitor = UdevMonitor()
+        monitor.start()
+        monitor.subscribe(listen_for_disabled_cpu_events, subsystem='cpu')
+
+    Another approach would be to just enqueue the udev event in a queue and do
+    the actual work in another thread or a thread pool. This is one approach to
+    do this:
+
+        queue = Queue.Queue()
+        def new_device_listener(device):
+            if device.action == 'add':
+                queue.put(device)
+
+        monitor = UdevMonitor()
+        monitor.subscribe(new_device_listener, subsystem='usb',
+            device_type='usb_device')
+        monitor.start()
+    """
+
+    def __init__(self):
+        # {subsystem: {device_type: [callback, ...]}}, see subscribe()
+        self._subsystems = {}
+        self._context = Context()
+        self._monitor = Monitor.from_netlink(self._context)
+        self._observer = MonitorObserver(self._monitor, callback=partial(
+            UdevMonitor._event_loop, self), name='udev-monitor')
+        self._filter_lock = threading.Lock()
+        self._is_started = False
+        self._can_start = False
+
+    def _event_loop(self, device):
+        """Dispatch a udev event to its subscribers (observer thread)."""
+        # Snapshot under the lock so a concurrent subscribe() is never seen
+        # half-done; the lookups must not raise inside the observer thread.
+        with self._filter_lock:
+            device_types = self._subsystems.get(device.subsystem, {})
+            callbacks = list(device_types.get(device.device_type, []))
+            if device.device_type:
+                callbacks.extend(device_types.get(None, []))
+        for callback in callbacks:
+            _execute_callback(callback, device)
+
+    def subscribe(self, callback, subsystem, device_type=None):
+        """
+        Subscribe a callback to the udev events of a subsystem.
+
+        :param callback: function to invoke with the affected device
+        :param subsystem: byte or unicode string representing the subsystem to
+                          listen on (e.g. ``'cpu'``, ``'usb'``)
+        :param device_type: byte or unicode string representing the device type
+                            to listen for changes (e.g. ``'usb_device'``,
+                            ``'block'``)
+        :raises ValueError: if the callback is None
+        """
+        if callback is None:
+            raise ValueError('callback missing')
+        with self._filter_lock:
+            self._monitor.filter_by(subsystem, device_type)
+            device_types = self._subsystems.setdefault(subsystem, {})
+            device_types.setdefault(device_type, []).append(callback)
+            self._start_if_necessary()
+
+    def start(self):
+        """Enable monitoring; listening begins with the first subscriber."""
+        with self._filter_lock:
+            self._can_start = True
+            self._start_if_necessary()
+
+    def _start_if_necessary(self):
+        """Start the observer thread once; caller holds _filter_lock."""
+        if self._can_start and self._subsystems and not self._is_started:
+            self._observer.start()
+            self._is_started = True
+
+    def stop(self):
+        """
+        Stop the monitoring thread. It is guaranteed that callbacks are no
+        longer invoked after calling this method. The method can be called
+        multiple times. Note that this only stops the monitoring thread and
+        not the monitor itself, which stops listening when dereferenced.
+        """
+        self._observer.stop()
+
+
+def _execute_callback(callback, device):
+    """Invoke callback(device); log, but never propagate, its exceptions."""
+    try:
+        callback(device)
+    except Exception as e:
+        logging.exception('Callback execution threw an exception: %s', e)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 1168862..7483d0c 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -110,6 +110,7 @@
        testlibTests.py \
        toolTests.py \
        transportWrapperTests.py \
+       udevMonitorTests.py \
        utilsTests.py \
        vdscliTests.py \
        vdsClientTests.py \
diff --git a/tests/udevMonitorTests.py b/tests/udevMonitorTests.py
new file mode 100644
index 0000000..2526459
--- /dev/null
+++ b/tests/udevMonitorTests.py
@@ -0,0 +1,137 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+from functools import partial
+
+import Queue
+
+from nettestlib import Bridge
+from testValidation import ValidateRunningAsRoot
+from testlib import VdsmTestCase
+from vdsm.udev.monitor import UdevMonitor
+
+
+class UdevMonitorTest(VdsmTestCase):
+    """Functional tests: bridge hotplug events must reach subscribers."""
+
+    def setUp(self):
+        self._queue = Queue.Queue()
+        self._monitor = UdevMonitor()
+        self._monitor.start()
+
+    def tearDown(self):
+        self._monitor.stop()
+
+    @ValidateRunningAsRoot
+    def testAddDevice(self):
+        """An 'add' event is delivered to a device type subscriber."""
+        self._monitor.subscribe(
+            partial(UdevMonitorTest._device_listener, self),
+            subsystem='net',
+            device_type='bridge')
+        bridge = Bridge()
+        try:
+            bridge.addDevice()
+            device = self._next_event()
+            self.assertIsNotNone(device,
+                                 msg='Should have detected an event')
+            self.assertEqual(device.action, 'add')
+        finally:
+            bridge.delDevice()
+
+    @ValidateRunningAsRoot
+    def testAddDeviceForSubsystem(self):
+        """An 'add' event is delivered to a whole-subsystem subscriber."""
+        self._monitor.subscribe(
+            partial(UdevMonitorTest._device_listener, self),
+            subsystem='net',
+            device_type=None)
+        bridge = Bridge()
+        try:
+            bridge.addDevice()
+            device = self._next_event()
+            self.assertIsNotNone(device,
+                                 msg='Should have detected an event')
+            self.assertEqual(device.action, 'add')
+        finally:
+            bridge.delDevice()
+
+    @ValidateRunningAsRoot
+    def testRemoveDevice(self):
+        """A 'remove' event is delivered when a bridge is deleted."""
+        bridge = Bridge()
+        try:
+            bridge.addDevice()
+            self._monitor.subscribe(
+                partial(UdevMonitorTest._device_listener, self),
+                subsystem='net',
+                device_type='bridge')
+            bridge.delDevice()
+            device = self._next_event()
+            self.assertIsNotNone(device,
+                                 msg='Should have detected an event')
+            self.assertEqual(device.action, 'remove')
+        finally:
+            # Not ``except:`` - that would also swallow failed assertions.
+            # Best effort only, the bridge is normally already gone here.
+            _delBridge(bridge)
+
+    @ValidateRunningAsRoot
+    def testEventSeries(self):
+        """Adding and deleting two bridges yields four events in total."""
+        bridge1 = Bridge()
+        bridge2 = Bridge()
+        try:
+            self._monitor.subscribe(
+                partial(UdevMonitorTest._device_listener, self),
+                subsystem='net',
+                device_type='bridge')
+            bridge1.addDevice()
+            bridge1.delDevice()
+            bridge2.addDevice()
+            bridge2.delDevice()
+            events = []
+            for _ in range(4):
+                event = self._next_event()
+                if event is None:
+                    break
+                events.append(event)
+            self.assertEqual(len(events), 4)
+        finally:
+            # Not ``except:`` - that would also swallow failed assertions.
+            _delBridge(bridge1)
+            _delBridge(bridge2)
+
+    def _device_listener(self, device):
+        """Callback handed to the monitor; queues the received device."""
+        self._queue.put(device)
+
+    def _next_event(self):
+        """Return the next queued event, or None if none arrives in 1s."""
+        try:
+            return self._queue.get(timeout=1)
+        except Queue.Empty:
+            return None
+
+
+def _delBridge(bridge):  # best-effort test cleanup, failures are ignored
+    try:
+        bridge.delDevice()
+    except Exception:  # not bare: let KeyboardInterrupt/SystemExit through
+        pass
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 8f758ca..ae5e483 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -126,6 +126,7 @@
 Requires: python-pthreading >= 0.1.3-3
 Requires: python-six
 Requires: python-requests
+Requires: python-udev >= 0.15
 Requires: %{name}-infra = %{version}-%{release}
 Requires: rpm-python
 Requires: nfs-utils


-- 
To view, visit https://gerrit.ovirt.org/47729
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4b91753424d83896fa538eb6b57f8653b6332fbb
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Roman Mohr <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to