Roman Mohr has uploaded a new change for review. Change subject: monitor: Add udev monitor ......................................................................
monitor: Add udev monitor Allow easy monitoring of device hotplug and device state changes which are covered by udev. Change-Id: I4b91753424d83896fa538eb6b57f8653b6332fbb Signed-off-by: Roman Mohr <[email protected]> --- A lib/vdsm/udev/monitor.py M tests/Makefile.am A tests/udevMonitorTests.py M vdsm.spec.in 4 files changed, 272 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/29/47729/1 diff --git a/lib/vdsm/udev/monitor.py b/lib/vdsm/udev/monitor.py new file mode 100644 index 0000000..8698bf3 --- /dev/null +++ b/lib/vdsm/udev/monitor.py @@ -0,0 +1,133 @@ +# Copyright 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# +from functools import partial +import logging +import threading +from time import sleep + +from pyudev import Context, Monitor, MonitorObserver + + +class UdevMonitor(object): + + """ + ``udev`` event monitor. Usage: + + The monitor is a thread-save thin wrapper arount pyudev.MonitorObserver. + This allows multiple callbacks for the same netlink socket. To avoid + listening for udev events the application is not interested in, the + monitoring thread only starts listening on the socket when the monitor is + started and at least one subscriber is added. + + The simplest way to use the monitor is to subscribe a callback to a + specific subsystem event and let the callback do the work: + + dev listen_for_disabled_cpu_events(device): + if device.action == 'offline': + print('CPU {0.name} is now offline'.format(device)) + + monitor = UdevMonitor() + monitor.start() + monitor.subscribe(listen_for_disabled_cpu_events, subsystem='cpu') + + Another approach would be to just enqueue the udev event in a queue and do + the actual work in another thread or a thread pool. This is one approach to + do this: + + queue = Queue.Queue() + def new_device_listener(device): + if device.action == 'add': + queue.put(device) + + monitor = UdevMonitor() + monitor.subscribe(new_device_listener, subsystem='usb', + device_type='usb_device') + monitor.start() + """ + + def __init__(self): + self._subsystems = {} + self._context = Context() + self._monitor = Monitor.from_netlink(self._context) + self._observer = MonitorObserver(self._monitor, callback=partial( + UdevMonitor._event_loop, self), name='udev-monitor') + self._filter_lock = threading.Lock() + self._is_started = False + self._can_start = False + + def _event_loop(self, device): + subsystem = self._subsystems[device.subsystem] + for callback in subsystem.get(device.device_type, []): + _execute_callback(callback, device) + if device.device_type: + for callback in subsystem.get(None, []): + _execute_callback(callback, device) + + def subscribe(self, callback, subsystem, device_type=None): + """ + Raise :exc:`~exceptions.ValueError` if the callback is None + + :param callback: function to invoke + :param subsystem: byte or unicode string representing the subsystem to + listen on (e.g. ``'cpu'``, ``'usb'``) + :param device_type: byte or unicode string representing the device type + to listen for changes (e.g. ``'usb_device'``, + ``'block'``) + :return: None + """ + if callback is None: + raise ValueError('callback missing') + with self._filter_lock: + self._monitor.filter_by(subsystem, device_type) + device_types = self._subsystems.get(subsystem, {}) + callbacks = device_types.get(device_type, []) + callbacks.append(callback) + device_types[device_type] = callbacks + self._subsystems[subsystem] = device_types + self._start_if_necessary() + + def start(self): + self._can_start = True + with self._filter_lock: + if self._subsystems: + self._start_if_necessary() + + def _start_if_necessary(self): + if self._can_start and not self._is_started: + self._observer.start() + self._is_started = True + + def stop(self): + """ + Stops the monitoring thread. It is guaranteed that callbacks are no + longer invoked after calling this method. The method can be called + multiple times. + + Note that this only stops the monitoring thread and not the monitor + itself. The monitor stops listening when it is dereferenced. + """ + self._observer.stop() + + +def _execute_callback(callback, device): + try: + callback(device) + except Exception as callbackException: + logging.error( + 'Callback execution threw an exception: %s', callbackException) diff --git a/tests/Makefile.am b/tests/Makefile.am index 1168862..7483d0c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -110,6 +110,7 @@ testlibTests.py \ toolTests.py \ transportWrapperTests.py \ + udevMonitorTests.py \ utilsTests.py \ vdscliTests.py \ vdsClientTests.py \ diff --git a/tests/udevMonitorTests.py b/tests/udevMonitorTests.py new file mode 100644 index 0000000..2526459 --- /dev/null +++ b/tests/udevMonitorTests.py @@ -0,0 +1,137 @@ +# Copyright 2015 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +from functools import partial + +import Queue + +from nettestlib import Bridge +from testValidation import ValidateRunningAsRoot +from testlib import VdsmTestCase +from vdsm.udev.monitor import UdevMonitor + + +class UdevMonitorTest(VdsmTestCase): + + def setUp(self): + self._queue = Queue.Queue() + self._monitor = UdevMonitor() + self._monitor.start() + + def tearDown(self): + self._monitor.stop() + + @ValidateRunningAsRoot + def testAddDevice(self): + self._monitor.subscribe( + partial(UdevMonitorTest._device_listener, self), + subsystem='net', + device_type='bridge') + bridge = Bridge() + try: + bridge.addDevice() + device = None + try: + device = self._queue.get(timeout=1) + except Queue.Empty: + pass + self.assertIsNotNone(device, + msg='Should have detected an event') + self.assertEqual(device.action, 'add') + finally: + bridge.delDevice() + + @ValidateRunningAsRoot + def testAddDeviceForSubsystem(self): + self._monitor.subscribe( + partial(UdevMonitorTest._device_listener, self), + subsystem='net', + device_type=None) + bridge = Bridge() + try: + bridge.addDevice() + device = None + try: + device = self._queue.get(timeout=1) + except Queue.Empty: + pass + self.assertIsNotNone(device, + msg='Should have detected an event') + self.assertEqual(device.action, 'add') + finally: + bridge.delDevice() + + @ValidateRunningAsRoot + def testRemoveDevice(self): + + bridge = Bridge() + try: + bridge.addDevice() + self._monitor.subscribe( + partial(UdevMonitorTest._device_listener, self), + subsystem='net', + device_type='bridge') + bridge.delDevice() + device = None + try: + device = self._queue.get(timeout=1) + except Queue.Empty: + pass + self.assertIsNotNone(device, + msg='Should have detected an event') + self.assertEqual(device.action, 'remove') + except: + bridge.delDevice() + + @ValidateRunningAsRoot + def testEventSeries(self): + + bridge1 = Bridge() + bridge2 = Bridge() + try: + self._monitor.subscribe( + partial(UdevMonitorTest._device_listener, self), + subsystem='net', + device_type='bridge') + bridge1.addDevice() + bridge1.delDevice() + bridge2.addDevice() + bridge2.delDevice() + events = [] + try: + events.append(self._queue.get(timeout=1)) + events.append(self._queue.get(timeout=1)) + events.append(self._queue.get(timeout=1)) + events.append(self._queue.get(timeout=1)) + except Queue.Empty: + pass + self.assertEqual(len(events), 4) + except: + _delBridge(bridge1) + _delBridge(bridge2) + + def _device_listener(self, device): + self._queue.put(device) + + +def _delBridge(bridge): + try: + bridge.delDevice() + except: + pass diff --git a/vdsm.spec.in b/vdsm.spec.in index 8f758ca..ae5e483 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -126,6 +126,7 @@ Requires: python-pthreading >= 0.1.3-3 Requires: python-six Requires: python-requests +Requires: python-udev >= 0.15 Requires: %{name}-infra = %{version}-%{release} Requires: rpm-python Requires: nfs-utils -- To view, visit https://gerrit.ovirt.org/47729 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4b91753424d83896fa538eb6b57f8653b6332fbb Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Roman Mohr <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
