Petr Horáček has uploaded a new change for review. Change subject: netlink: replace ipwrapper monitor ......................................................................
netlink: replace ipwrapper monitor Change-Id: Ife2ee7d16c79eea7c8ed1f0780aafbaa00356978 Signed-off-by: Petr Horáček <phora...@redhat.com> --- M lib/vdsm/ipwrapper.py M lib/vdsm/netlink/monitor.py M tests/functional/networkTests.py M tests/ipwrapperTests.py M tests/netlinkTests.py 5 files changed, 108 insertions(+), 255 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/39/37239/1 diff --git a/lib/vdsm/ipwrapper.py b/lib/vdsm/ipwrapper.py index e0a789e..e7aac55 100644 --- a/lib/vdsm/ipwrapper.py +++ b/lib/vdsm/ipwrapper.py @@ -17,7 +17,6 @@ # Refer to the README and COPYING files for full details of the license # -from collections import namedtuple from contextlib import closing from glob import iglob import array @@ -25,7 +24,6 @@ import errno import fcntl import os -import signal import socket import struct @@ -520,10 +518,6 @@ pass -class MonitorError(Exception): - pass - - def _execCmd(command): returnCode, output, error = execCmd(command) @@ -644,69 +638,3 @@ def linkDel(dev): command = [_IP_BINARY.cmd, 'link', 'del', 'dev', dev] _execCmd(command) - - -MonitorEvent = namedtuple('MonitorEvent', ['index', 'device', 'flags', - 'state']) - - -class Monitor(object): - """Wrapper over `ip monitor link`. Usage: - Get events collected while the monitor was running: - - mon = Monitor() - mon.start() - .... - mon.stop() - for event in mon: - handle event - - Monitoring events forever: - mon = Monitor() - mon.start() - for event in mon: - handle event - - Note that the underlying `ip monitor` process is killed when its controlled - thread dies, so as not to leave stray processes when Vdsm crahsed. 
- """ - _DELETED_TEXT = 'Deleted' - LINK_STATE_DELETED = 'DELETED' - - def __init__(self): - self.proc = None - - def __iter__(self): - if self.proc is None: - raise MonitorError('The monitor has not run yet') - for line in self.proc.stdout: - yield self._parseLine(line) - - def start(self): - self.proc = execCmd([_IP_BINARY.cmd, '-d', '-o', 'monitor', 'link'], - deathSignal=signal.SIGKILL, - sync=False) - self.proc.blocking = True - - def stop(self): - self.proc.kill() - self.proc.wait() - - @classmethod - def _parseLine(cls, line): - state = None - if line.startswith(cls._DELETED_TEXT): - state = cls.LINK_STATE_DELETED - line = line[len(cls._DELETED_TEXT):] - - data = _parseLinkLine(line) - # Consider everything with an '@' symbol a vlan/macvlan/macvtap - # since that's how iproute2 reports it and there is currently no - # disambiguation (iproute bug https://bugzilla.redhat.com/1042799 - data['name'] = data['name'].split('@', 1)[0] - state = state if state or 'state' not in data else data['state'] - return MonitorEvent(data['index'], data['name'], data['flags'], state) - - @classmethod - def _parse(cls, text): - return [cls._parseLine(line) for line in text.splitlines()] diff --git a/lib/vdsm/netlink/monitor.py b/lib/vdsm/netlink/monitor.py index ca80752..21e0899 100644 --- a/lib/vdsm/netlink/monitor.py +++ b/lib/vdsm/netlink/monitor.py @@ -24,7 +24,7 @@ import select import threading -from vdsm.utils import NoIntrCall, NoIntrPoll +from vdsm.utils import NoIntrCall, NoIntrPoll, monotonic_time from . import (LIBNL, _GROUPS, _NL_ROUTE_ADDR_NAME, _NL_ROUTE_LINK_NAME, _NL_ROUTE_NAME, _NL_STOP, _add_socket_memberships, @@ -38,8 +38,10 @@ # If monitoring thread is running, queue waiting for new value and we call # stop(), we have to stop queue by passing special code. _STOP_FLAG = 31 +_TIMEOUT_FLAG = 32 E_NOT_RUNNING = 1 +E_TIMEOUT = 2 class MonitorError(Exception): @@ -65,6 +67,14 @@ mon.stop() handle event + Monitoring events with defined timeout. 
If timeout expires during + iteration and silent_timeout is set to False, MonitorError(E_TIMEOUT) is + raised by iteration: + mon = Monitor(timeout=2) + mon.start() + for event in mon: + handle event + Monitor defined groups (monitor everything if not set): mon = Monitor(groups=('link', 'ipv4-route')) mon.start() @@ -77,8 +87,10 @@ ipv4-route ipv6-ifaddr, ipv6-mroute, ipv6-route, ipv6-ifinfo, decnet-ifaddr, decnet-route, ipv6-prefix """ - def __init__(self, groups=frozenset()): - self._stopped = False + def __init__(self, groups=frozenset(), timeout=None, silent_timeout=False): + self._time_start = None + self._timeout = timeout + self._silent_timeout = silent_timeout if groups: self._groups = groups else: @@ -87,14 +99,21 @@ self._scan_thread = threading.Thread(target=self._scan) self._scan_thread.daemon = True self._scanning_started = threading.Event() + self._scanning_stopped = threading.Event() def __iter__(self): for event in iter(self._queue.get, None): - if event == _STOP_FLAG: + if event == _TIMEOUT_FLAG: + if self._silent_timeout: + break + raise MonitorError(E_TIMEOUT) + elif event == _STOP_FLAG: break yield event def start(self): + if self._timeout: + self._end_time = monotonic_time() + self._timeout self._scan_thread.start() self._scanning_started.wait() @@ -104,21 +123,41 @@ with _pipetrick(epoll) as self._pipetrick: self._scanning_started.set() while True: - events = NoIntrPoll(epoll.poll) - if (self._pipetrick[0], select.POLLIN) in events: + if self._timeout: + timeout = self._end_time - monotonic_time() + # timeout expired + if timeout <= 0: + self._scanning_stopped.set() + self._queue.put(_TIMEOUT_FLAG) + break + else: + timeout = -1 + + events = NoIntrPoll(epoll.poll, timeout=timeout) + # poll timed out + if len(events) == 0: + self._scanning_stopped.set() + self._queue.put(_TIMEOUT_FLAG) + break + # stopped by pipetrick + elif (self._pipetrick[0], select.POLLIN) in events: NoIntrCall(os.read, self._pipetrick[0], 1) self._queue.put(_STOP_FLAG)
break + _nl_recvmsgs_default(sock) def stop(self): - if not self._stopped: - self._stopped = True + if self.is_stopped(): + raise MonitorError(E_NOT_RUNNING) + else: + self._scanning_stopped.set() self._scanning_started.wait() os.write(self._pipetrick[1], 'c') self._scan_thread.join() - else: - raise MonitorError(E_NOT_RUNNING) + + def is_stopped(self): + return self._scanning_stopped.is_set() # libnl/include/linux/rtnetlink.h diff --git a/tests/functional/networkTests.py b/tests/functional/networkTests.py index 12d33e6..c09e9ae 100644 --- a/tests/functional/networkTests.py +++ b/tests/functional/networkTests.py @@ -20,7 +20,6 @@ from functools import wraps import os.path import json -import signal import netaddr from hookValidation import ValidatesHook @@ -45,6 +44,7 @@ from vdsm.utils import RollbackContext, execCmd, running from vdsm.netinfo import (bridges, operstate, prefix2netmask, getRouteDeviceTo, _get_dhclient_ifaces) +from vdsm.netlink import monitor from vdsm import ipwrapper from vdsm.utils import pgrep @@ -174,28 +174,18 @@ veth.remove(left) -class Alarm(Exception): - pass - - def _waitForKnownOperstate(device, timeout=1): - - def _alarmHandler(signum, frame): - raise Alarm - - monitor = ipwrapper.Monitor() - monitor.start() + mon = monitor.Monitor(groups=('link',), timeout=timeout) + mon.start() try: - state = operstate(device).upper() - if state == 'UNKNOWN': - signal.signal(signal.SIGALRM, _alarmHandler) - signal.alarm(timeout) - for event in monitor: - if event.device == device and event.state != 'UNKNOWN': + state = operstate(device).lower() + if state == 'unknown': + for event in mon: + if event['name'] == device and event['state'] != 'unknown': break - signal.alarm(0) finally: - monitor.stop() + if not mon.is_stopped(): + mon.stop() class OperStateChangedError(ValueError): @@ -205,15 +195,15 @@ @contextmanager def nonChangingOperstate(device): """Raises an exception if it detects that the device link state changes.""" - originalState = 
operstate(device).upper() - monitor = ipwrapper.Monitor() - monitor.start() + originalState = operstate(device).lower() + mon = monitor.Monitor(groups=('link',)) + mon.start() try: yield finally: - monitor.stop() - changes = [(event.device, event.state) for event in monitor - if event.device == device] + mon.stop() + changes = [(event['name'], event['state']) for event in mon + if event['name'] == device] for _, state in changes: if state != originalState: raise OperStateChangedError('%s operstate changed: %s -> %r' % diff --git a/tests/ipwrapperTests.py b/tests/ipwrapperTests.py index 20789ed..9a6e0ee 100644 --- a/tests/ipwrapperTests.py +++ b/tests/ipwrapperTests.py @@ -18,16 +18,8 @@ # # Refer to the README and COPYING files for full details of the license # -from threading import Timer -from time import sleep -import logging -import sys - from testValidation import ValidateRunningAsRoot from vdsm import ipwrapper -from vdsm.ipwrapper import Monitor -from vdsm.ipwrapper import MonitorEvent -from vdsm.ipwrapper import MonitorError from vdsm.ipwrapper import Route from vdsm.ipwrapper import Rule import tcTests @@ -97,128 +89,6 @@ '32: from 10.0.0.0/8 to 264.0.0.0/8 lookup table_100'] for text in bad_rules: self.assertRaises(ValueError, Rule.fromText, text) - - -class TestMonitor(TestCaseBase): - def testWrongMonitorUsage(self): - mon = Monitor() - with self.assertRaises(MonitorError): - for event in mon: - pass - - def testMonitorEvents(self): - devs = ({'index': '273', - 'reportedName': 'bond0', 'name': 'bond0', - 'flags': frozenset(['BROADCAST', 'MULTICAST', 'MASTER']), - 'attrs': 'mtu 1500 qdisc noqueue', - 'state': 'DOWN', - 'address': '33:44:55:66:77:88', 'brd': 'ff:ff:ff:ff:ff:ff'}, - {'index': '4', - 'reportedName': 'wlp3s0', 'name': 'wlp3s0', - 'flags': frozenset(['BROADCAST', 'MULTICAST', 'UP', - 'LOWER_UP']), - 'address': ''}, - {'index': '417', - 'reportedName': 'p1p3.13@p1p3', 'name': 'p1p3.13', - 'flags': frozenset(['NO-CARRIER', 'BROADCAST', 
'MULTICAST', - 'UP']), - 'attrs': 'mtu 1500 qdisc noqueue', - 'state': 'LOWERLAYERDOWN', - 'address': '00:10:18:e1:6c:f4', - 'brd': 'ff:ff:ff:ff:ff:ff'}, - {'index': '418', - 'reportedName': 'foo', 'name': 'foo', - 'flags': frozenset(['BROADCAST', 'MULTICAST']), - 'attrs': 'mtu 1500 qdisc noop', - 'state': 'DOWN', - 'extraAttrs': 'group default', - 'address': 'ba:2c:7b:68:b8:77', - 'brd': 'ff:ff:ff:ff:ff:ff', - 'deleted': True}) - - def entry(index, reportedName, flags, address, attrs=None, - state=None, extraAttrs=None, brd=None, deleted=False, - **kwargs): - elements = [] - if deleted: - elements.append(Monitor._DELETED_TEXT) - elements += [index + ':', reportedName + ':', - '<' + ','.join(flags) + '>'] - if attrs is not None: - elements.append(attrs) - if state is not None: - elements.append('state ' + state) - if extraAttrs is not None: - elements.append(extraAttrs) - elements.append('\\ ') - elements.append('link/ether ' + address) - if brd is not None: - elements.append('brd ' + brd) - return ' '.join(elements) - - data = [entry(**dev) for dev in devs] - events = [MonitorEvent( - dev['index'], dev['name'], dev['flags'], - Monitor.LINK_STATE_DELETED if dev.get('deleted') else - dev.get('state', None)) for dev in devs] - self.assertEqual(Monitor._parse('\n'.join(data)), events) - - @ValidateRunningAsRoot - def testMonitorIteration(self): - bridge = tcTests._Bridge() - tcTests._checkDependencies() - mon = Monitor() - mon.start() - try: - def _timeout(): - mon.stop() - err_msg = 'test omitted: waiting too long for a monitor event' - logging.error(err_msg) - sys.stderr.write(err_msg + '\n') - timer = Timer(3, _timeout) - timer.start() - try: - # FIXME: sometimes mon.start() is returned before properly - # started, in this case, iterator doesn't catch the first - # created bridge and stuck forever. Remove this sleep() when - # new netlink-based event monitor will be available. 
- sleep(0.5) - iterator = iter(mon) - - # Generate events to avoid blocking - bridge.addDevice() - iterator.next() - - bridge.delDevice() - iterator.next() - finally: - timer.cancel() - finally: - # Stop the monitor and check that eventually StopIteration is - # raised. There might be other system link events so we loop to - # exhaust them. - mon.stop() - with self.assertRaises(StopIteration): - while True: - iterator.next() - - -class TestLinks(TestCaseBase): - _bridge = tcTests._Bridge() - - @ValidateRunningAsRoot - def setUp(self): - tcTests._checkDependencies() - self._bridge.addDevice() - - def tearDown(self): - self._bridge.delDevice() - - def testGetLink(self): - link = ipwrapper.getLink(self._bridge.devName) - self.assertTrue(link.isBRIDGE) - self.assertEqual(link.master, None) - self.assertEqual(link.name, self._bridge.devName) class TestDrvinfo(TestCaseBase): diff --git a/tests/netlinkTests.py b/tests/netlinkTests.py index f639075..be13270 100644 --- a/tests/netlinkTests.py +++ b/tests/netlinkTests.py @@ -1,12 +1,12 @@ from collections import deque from contextlib import contextmanager -from threading import Timer import threading import time from functional import dummy from functional.networkTests import IP_ADDRESS, IP_CIDR from vdsm.netlink import monitor +from vdsm.utils import monotonic_time from testValidation import ValidateRunningAsRoot from testlib import VdsmTestCase as TestCaseBase @@ -109,7 +109,7 @@ {'event': 'del_link', 'name': nic}]) with _timed_monitor(timeout=self.TIMEOUT, - raise_exception=False) as mon: + silent_timeout=True) as mon: dummy_name = dummy.create() dummy.setIP(dummy_name, IP_ADDRESS, IP_CIDR) dummy.setLinkUp(dummy_name) @@ -128,27 +128,53 @@ ' been caught (in the right order)' % (1 + len(expected_events))) + def test_timeout(self): + with self.assertRaises(monitor.MonitorError): + try: + with _timed_monitor(timeout=.01, custom_err_msg=False) as mon: + for event in mon: + pass + except monitor.MonitorError as e: + 
self.assertEquals(e[0], monitor.E_TIMEOUT) + raise + + self.assertTrue(mon.is_stopped()) + + def test_timeout_silent(self): + with _timed_monitor(timeout=.01, silent_timeout=True) as mon: + for event in mon: + pass + + self.assertTrue(mon.is_stopped()) + + @ValidateRunningAsRoot + def test_timeout_not_triggered(self): + time_start = monotonic_time() + with _timed_monitor(timeout=self.TIMEOUT) as mon: + dummy_name = dummy.create() + dummy.remove(dummy_name) + + for event in mon: + break + + self.assertLess(monotonic_time() - time_start, self.TIMEOUT) + self.assertTrue(mon.is_stopped()) + @contextmanager -def _timed_monitor(timeout=0, groups=frozenset(), raise_exception=True): - mon = monitor.Monitor(groups=groups) +def _timed_monitor(timeout=None, groups=frozenset(), silent_timeout=False, + custom_err_msg=True): + mon = monitor.Monitor(groups=groups, timeout=timeout, + silent_timeout=silent_timeout) mon.start() try: - timer = Timer(timeout, mon.stop) - timer.start() - try: - yield mon - finally: - timer.cancel() - timer.join() + yield mon + except monitor.MonitorError as e: + if e[0] == monitor.E_TIMEOUT and custom_err_msg: + raise monitor.MonitorError('Waiting too long for a monitor event.') + raise finally: - # In this point of time, the timer is no longer running. So we can test - # if mon was stopped, if so, timeout was triggered. - if mon._stopped: - if raise_exception: - raise monitor.MonitorError('Waiting too long for a monitor ' - 'event.') - else: + if not mon.is_stopped(): mon.stop() -- To view, visit http://gerrit.ovirt.org/37239 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ife2ee7d16c79eea7c8ed1f0780aafbaa00356978 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Petr Horáček <phora...@redhat.com> _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches