Petr Horáček has uploaded a new change for review.

Change subject: netlink: replace ipwrapper monitor
......................................................................

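netlink: replace ipwrapper monitor

Drop ipwrapper.Monitor, a wrapper over `ip monitor link`, and use the
netlink-based vdsm.netlink.monitor.Monitor in its place. The netlink
monitor gains a `timeout` argument (plus `silent_timeout`, which ends
the iteration instead of raising MonitorError(E_TIMEOUT)) and an
is_stopped() method, so the tests no longer need SIGALRM or
threading.Timer to bound their waits.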

Change-Id: Ife2ee7d16c79eea7c8ed1f0780aafbaa00356978
Signed-off-by: Petr Horáček <phora...@redhat.com>
---
M lib/vdsm/ipwrapper.py
M lib/vdsm/netlink/monitor.py
M tests/functional/networkTests.py
M tests/ipwrapperTests.py
M tests/netlinkTests.py
5 files changed, 108 insertions(+), 255 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/39/37239/1

diff --git a/lib/vdsm/ipwrapper.py b/lib/vdsm/ipwrapper.py
index e0a789e..e7aac55 100644
--- a/lib/vdsm/ipwrapper.py
+++ b/lib/vdsm/ipwrapper.py
@@ -17,7 +17,6 @@
 # Refer to the README and COPYING files for full details of the license
 #
 
-from collections import namedtuple
 from contextlib import closing
 from glob import iglob
 import array
@@ -25,7 +24,6 @@
 import errno
 import fcntl
 import os
-import signal
 import socket
 import struct
 
@@ -520,10 +518,6 @@
     pass
 
 
-class MonitorError(Exception):
-    pass
-
-
 def _execCmd(command):
     returnCode, output, error = execCmd(command)
 
@@ -644,69 +638,3 @@
 def linkDel(dev):
     command = [_IP_BINARY.cmd, 'link', 'del', 'dev', dev]
     _execCmd(command)
-
-
-MonitorEvent = namedtuple('MonitorEvent', ['index', 'device', 'flags',
-                                           'state'])
-
-
-class Monitor(object):
-    """Wrapper over `ip monitor link`. Usage:
-    Get events collected while the monitor was running:
-
-        mon = Monitor()
-        mon.start()
-        ....
-        mon.stop()
-        for event in mon:
-            handle event
-
-    Monitoring events forever:
-        mon = Monitor()
-        mon.start()
-        for event in mon:
-            handle event
-
-    Note that the underlying `ip monitor` process is killed when its controlled
-    thread dies, so as not to leave stray processes when Vdsm crahsed.
-    """
-    _DELETED_TEXT = 'Deleted'
-    LINK_STATE_DELETED = 'DELETED'
-
-    def __init__(self):
-        self.proc = None
-
-    def __iter__(self):
-        if self.proc is None:
-            raise MonitorError('The monitor has not run yet')
-        for line in self.proc.stdout:
-            yield self._parseLine(line)
-
-    def start(self):
-        self.proc = execCmd([_IP_BINARY.cmd, '-d', '-o', 'monitor', 'link'],
-                            deathSignal=signal.SIGKILL,
-                            sync=False)
-        self.proc.blocking = True
-
-    def stop(self):
-        self.proc.kill()
-        self.proc.wait()
-
-    @classmethod
-    def _parseLine(cls, line):
-        state = None
-        if line.startswith(cls._DELETED_TEXT):
-            state = cls.LINK_STATE_DELETED
-            line = line[len(cls._DELETED_TEXT):]
-
-        data = _parseLinkLine(line)
-        # Consider everything with an '@' symbol a vlan/macvlan/macvtap
-        # since that's how iproute2 reports it and there is currently no
-        # disambiguation (iproute bug https://bugzilla.redhat.com/1042799
-        data['name'] = data['name'].split('@', 1)[0]
-        state = state if state or 'state' not in data else data['state']
-        return MonitorEvent(data['index'], data['name'], data['flags'], state)
-
-    @classmethod
-    def _parse(cls, text):
-        return [cls._parseLine(line) for line in text.splitlines()]
diff --git a/lib/vdsm/netlink/monitor.py b/lib/vdsm/netlink/monitor.py
index ca80752..21e0899 100644
--- a/lib/vdsm/netlink/monitor.py
+++ b/lib/vdsm/netlink/monitor.py
@@ -24,7 +24,7 @@
 import select
 import threading
 
-from vdsm.utils import NoIntrCall, NoIntrPoll
+from vdsm.utils import NoIntrCall, NoIntrPoll, monotonic_time
 
 from . import (LIBNL, _GROUPS, _NL_ROUTE_ADDR_NAME, _NL_ROUTE_LINK_NAME,
                _NL_ROUTE_NAME, _NL_STOP, _add_socket_memberships,
@@ -38,8 +38,10 @@
 # If monitoring thread is running, queue waiting for new value and we call
 # stop(), we have to stop queue by passing special code.
 _STOP_FLAG = 31
+_TIMEOUT_FLAG = 32
 
 E_NOT_RUNNING = 1
+E_TIMEOUT = 2
 
 
 class MonitorError(Exception):
@@ -65,6 +67,14 @@
             mon.stop()
         handle event
 
+    Monitoring events with a timeout (in seconds). If it expires, iteration
+    raises MonitorError(E_TIMEOUT), or simply stops when silent_timeout=True
+    (timeout=None, the default, waits forever):
+    mon = Monitor(timeout=2)
+    mon.start()
+    for event in mon:
+        handle event
+
     Monitor defined groups (monitor everything if not set):
     mon = Monitor(groups=('link', 'ipv4-route'))
     mon.start()
@@ -77,8 +87,10 @@
     ipv4-route ipv6-ifaddr, ipv6-mroute, ipv6-route, ipv6-ifinfo,
     decnet-ifaddr, decnet-route, ipv6-prefix
     """
-    def __init__(self, groups=frozenset()):
-        self._stopped = False
+    def __init__(self, groups=frozenset(), timeout=None, silent_timeout=False):
+        self._end_time = None
+        self._timeout = timeout
+        self._silent_timeout = silent_timeout
         if groups:
             self._groups = groups
         else:
@@ -87,14 +99,21 @@
         self._scan_thread = threading.Thread(target=self._scan)
         self._scan_thread.daemon = True
         self._scanning_started = threading.Event()
+        self._scanning_stopped = threading.Event()
 
     def __iter__(self):
         for event in iter(self._queue.get, None):
-            if event == _STOP_FLAG:
+            if event == _TIMEOUT_FLAG:
+                if self._silent_timeout:
+                    break
+                raise MonitorError(E_TIMEOUT)
+            elif event == _STOP_FLAG:
                 break
             yield event
 
     def start(self):
+        if self._timeout is not None:
+            self._end_time = monotonic_time() + self._timeout
         self._scan_thread.start()
         self._scanning_started.wait()
 
@@ -104,21 +123,41 @@
                 with _pipetrick(epoll) as self._pipetrick:
                     self._scanning_started.set()
                     while True:
-                        events = NoIntrPoll(epoll.poll)
-                        if (self._pipetrick[0], select.POLLIN) in events:
+                        if self._timeout is not None:
+                            timeout = self._end_time - monotonic_time()
+                            # timeout expired
+                            if timeout <= 0:
+                                self._scanning_stopped.set()
+                                self._queue.put(_TIMEOUT_FLAG)
+                                break
+                        else:
+                            timeout = -1
+
+                        events = NoIntrPoll(epoll.poll, timeout=timeout)
+                        # poll timed out
+                        if len(events) == 0:
+                            self._scanning_stopped.set()
+                            self._queue.put(_TIMEOUT_FLAG)
+                            break
+                        # stopped by pipetrick
+                        elif (self._pipetrick[0], select.POLLIN) in events:
                             NoIntrCall(os.read, self._pipetrick[0], 1)
                             self._queue.put(_STOP_FLAG)
                             break
+
                         _nl_recvmsgs_default(sock)
 
     def stop(self):
-        if not self._stopped:
-            self._stopped = True
+        if self.is_stopped():
+            raise MonitorError(E_NOT_RUNNING)
+        else:
+            self._scanning_stopped.set()
             self._scanning_started.wait()
             os.write(self._pipetrick[1], 'c')
             self._scan_thread.join()
-        else:
-            raise MonitorError(E_NOT_RUNNING)
+
+    def is_stopped(self):
+        return self._scanning_stopped.is_set()
 
 
 # libnl/include/linux/rtnetlink.h
diff --git a/tests/functional/networkTests.py b/tests/functional/networkTests.py
index 12d33e6..c09e9ae 100644
--- a/tests/functional/networkTests.py
+++ b/tests/functional/networkTests.py
@@ -20,7 +20,6 @@
 from functools import wraps
 import os.path
 import json
-import signal
 import netaddr
 
 from hookValidation import ValidatesHook
@@ -45,6 +44,7 @@
 from vdsm.utils import RollbackContext, execCmd, running
 from vdsm.netinfo import (bridges, operstate, prefix2netmask, getRouteDeviceTo,
                           _get_dhclient_ifaces)
+from vdsm.netlink import monitor
 from vdsm import ipwrapper
 from vdsm.utils import pgrep
 
@@ -174,28 +174,18 @@
         veth.remove(left)
 
 
-class Alarm(Exception):
-    pass
-
-
 def _waitForKnownOperstate(device, timeout=1):
-
-    def _alarmHandler(signum, frame):
-        raise Alarm
-
-    monitor = ipwrapper.Monitor()
-    monitor.start()
+    mon = monitor.Monitor(groups=('link',), timeout=timeout)
+    mon.start()
     try:
-        state = operstate(device).upper()
-        if state == 'UNKNOWN':
-            signal.signal(signal.SIGALRM, _alarmHandler)
-            signal.alarm(timeout)
-            for event in monitor:
-                if event.device == device and event.state != 'UNKNOWN':
+        state = operstate(device).lower()
+        if state == 'unknown':
+            for event in mon:
+                if event['name'] == device and event['state'] != 'unknown':
                     break
-            signal.alarm(0)
     finally:
-        monitor.stop()
+        if not mon.is_stopped():
+            mon.stop()
 
 
 class OperStateChangedError(ValueError):
@@ -205,15 +195,15 @@
 @contextmanager
 def nonChangingOperstate(device):
     """Raises an exception if it detects that the device link state changes."""
-    originalState = operstate(device).upper()
-    monitor = ipwrapper.Monitor()
-    monitor.start()
+    originalState = operstate(device).lower()
+    mon = monitor.Monitor(groups=('link',))
+    mon.start()
     try:
         yield
     finally:
-        monitor.stop()
-        changes = [(event.device, event.state) for event in monitor
-                   if event.device == device]
+        mon.stop()
+        changes = [(event['name'], event['state']) for event in mon
+                   if event['name'] == device]
         for _, state in changes:
             if state != originalState:
                 raise OperStateChangedError('%s operstate changed: %s -> %r' %
diff --git a/tests/ipwrapperTests.py b/tests/ipwrapperTests.py
index 20789ed..9a6e0ee 100644
--- a/tests/ipwrapperTests.py
+++ b/tests/ipwrapperTests.py
@@ -18,16 +18,8 @@
 #
 # Refer to the README and COPYING files for full details of the license
 #
-from threading import Timer
-from time import sleep
-import logging
-import sys
-
 from testValidation import ValidateRunningAsRoot
 from vdsm import ipwrapper
-from vdsm.ipwrapper import Monitor
-from vdsm.ipwrapper import MonitorEvent
-from vdsm.ipwrapper import MonitorError
 from vdsm.ipwrapper import Route
 from vdsm.ipwrapper import Rule
 import tcTests
@@ -97,128 +89,24 @@
                      '32:    from 10.0.0.0/8 to 264.0.0.0/8 lookup table_100']
         for text in bad_rules:
             self.assertRaises(ValueError, Rule.fromText, text)
-
-
-class TestMonitor(TestCaseBase):
-    def testWrongMonitorUsage(self):
-        mon = Monitor()
-        with self.assertRaises(MonitorError):
-            for event in mon:
-                pass
-
-    def testMonitorEvents(self):
-        devs = ({'index': '273',
-                 'reportedName': 'bond0', 'name': 'bond0',
-                 'flags': frozenset(['BROADCAST', 'MULTICAST', 'MASTER']),
-                 'attrs': 'mtu 1500 qdisc noqueue',
-                 'state': 'DOWN',
-                 'address': '33:44:55:66:77:88', 'brd': 'ff:ff:ff:ff:ff:ff'},
-                {'index': '4',
-                 'reportedName': 'wlp3s0', 'name': 'wlp3s0',
-                 'flags': frozenset(['BROADCAST', 'MULTICAST', 'UP',
-                                     'LOWER_UP']),
-                 'address': ''},
-                {'index': '417',
-                 'reportedName': 'p1p3.13@p1p3', 'name': 'p1p3.13',
-                 'flags': frozenset(['NO-CARRIER', 'BROADCAST', 'MULTICAST',
-                                     'UP']),
-                 'attrs': 'mtu 1500 qdisc noqueue',
-                 'state': 'LOWERLAYERDOWN',
-                 'address': '00:10:18:e1:6c:f4',
-                 'brd': 'ff:ff:ff:ff:ff:ff'},
-                {'index': '418',
-                 'reportedName': 'foo', 'name': 'foo',
-                 'flags': frozenset(['BROADCAST', 'MULTICAST']),
-                 'attrs': 'mtu 1500 qdisc noop',
-                 'state': 'DOWN',
-                 'extraAttrs': 'group default',
-                 'address': 'ba:2c:7b:68:b8:77',
-                 'brd': 'ff:ff:ff:ff:ff:ff',
-                 'deleted': True})
-
-        def entry(index, reportedName, flags, address, attrs=None,
-                  state=None, extraAttrs=None, brd=None, deleted=False,
-                  **kwargs):
-            elements = []
-            if deleted:
-                elements.append(Monitor._DELETED_TEXT)
-            elements += [index + ':', reportedName + ':',
-                         '<' + ','.join(flags) + '>']
-            if attrs is not None:
-                elements.append(attrs)
-            if state is not None:
-                elements.append('state ' + state)
-            if extraAttrs is not None:
-                elements.append(extraAttrs)
-            elements.append('\\   ')
-            elements.append('link/ether ' + address)
-            if brd is not None:
-                elements.append('brd ' + brd)
-            return ' '.join(elements)
-
-        data = [entry(**dev) for dev in devs]
-        events = [MonitorEvent(
-            dev['index'], dev['name'], dev['flags'],
-            Monitor.LINK_STATE_DELETED if dev.get('deleted') else
-            dev.get('state', None)) for dev in devs]
-        self.assertEqual(Monitor._parse('\n'.join(data)), events)
-
-    @ValidateRunningAsRoot
-    def testMonitorIteration(self):
-        bridge = tcTests._Bridge()
-        tcTests._checkDependencies()
-        mon = Monitor()
-        mon.start()
-        try:
-            def _timeout():
-                mon.stop()
-                err_msg = 'test omitted: waiting too long for a monitor event'
-                logging.error(err_msg)
-                sys.stderr.write(err_msg + '\n')
-            timer = Timer(3, _timeout)
-            timer.start()
-            try:
-                # FIXME: sometimes mon.start() is returned before properly
-                # started, in this case, iterator doesn't catch the first
-                # created bridge and stuck forever. Remove this sleep() when
-                # new netlink-based event monitor will be available.
-                sleep(0.5)
-                iterator = iter(mon)
-
-                # Generate events to avoid blocking
-                bridge.addDevice()
-                iterator.next()
-
-                bridge.delDevice()
-                iterator.next()
-            finally:
-                timer.cancel()
-        finally:
-            # Stop the monitor and check that eventually StopIteration is
-            # raised.  There might be other system link events so we loop to
-            # exhaust them.
-            mon.stop()
-        with self.assertRaises(StopIteration):
-            while True:
-                iterator.next()
 
 
 class TestLinks(TestCaseBase):
     _bridge = tcTests._Bridge()
 
     @ValidateRunningAsRoot
     def setUp(self):
         tcTests._checkDependencies()
         self._bridge.addDevice()
 
     def tearDown(self):
         self._bridge.delDevice()
 
     def testGetLink(self):
         link = ipwrapper.getLink(self._bridge.devName)
         self.assertTrue(link.isBRIDGE)
         self.assertEqual(link.master, None)
         self.assertEqual(link.name, self._bridge.devName)
 
 
 class TestDrvinfo(TestCaseBase):
diff --git a/tests/netlinkTests.py b/tests/netlinkTests.py
index f639075..be13270 100644
--- a/tests/netlinkTests.py
+++ b/tests/netlinkTests.py
@@ -1,12 +1,12 @@
 from collections import deque
 from contextlib import contextmanager
-from threading import Timer
 import threading
 import time
 
 from functional import dummy
 from functional.networkTests import IP_ADDRESS, IP_CIDR
 from vdsm.netlink import monitor
+from vdsm.utils import monotonic_time
 
 from testValidation import ValidateRunningAsRoot
 from testlib import VdsmTestCase as TestCaseBase
@@ -109,7 +109,7 @@
                 {'event': 'del_link', 'name': nic}])
 
         with _timed_monitor(timeout=self.TIMEOUT,
-                            raise_exception=False) as mon:
+                            silent_timeout=True) as mon:
             dummy_name = dummy.create()
             dummy.setIP(dummy_name, IP_ADDRESS, IP_CIDR)
             dummy.setLinkUp(dummy_name)
@@ -128,27 +128,53 @@
                          ' been caught (in the right order)'
                          % (1 + len(expected_events)))
 
+    def test_timeout(self):
+        with self.assertRaises(monitor.MonitorError):
+            try:
+                with _timed_monitor(timeout=.01, custom_err_msg=False) as mon:
+                    for event in mon:
+                        pass
+            except monitor.MonitorError as e:
+                self.assertEqual(e[0], monitor.E_TIMEOUT)
+                raise
+
+        self.assertTrue(mon.is_stopped())
+
+    def test_timeout_silent(self):
+        with _timed_monitor(timeout=.01, silent_timeout=True) as mon:
+            for event in mon:
+                pass
+
+        self.assertTrue(mon.is_stopped())
+
+    @ValidateRunningAsRoot
+    def test_timeout_not_triggered(self):
+        time_start = monotonic_time()
+        with _timed_monitor(timeout=self.TIMEOUT) as mon:
+            dummy_name = dummy.create()
+            dummy.remove(dummy_name)
+
+            for event in mon:
+                break
+
+        self.assertLess(monotonic_time() - time_start, self.TIMEOUT)
+        self.assertTrue(mon.is_stopped())
+
 
 @contextmanager
-def _timed_monitor(timeout=0, groups=frozenset(), raise_exception=True):
-    mon = monitor.Monitor(groups=groups)
+def _timed_monitor(timeout=None, groups=frozenset(), silent_timeout=False,
+                   custom_err_msg=True):
+    mon = monitor.Monitor(groups=groups, timeout=timeout,
+                          silent_timeout=silent_timeout)
     mon.start()
     try:
-        timer = Timer(timeout, mon.stop)
-        timer.start()
-        try:
-            yield mon
-        finally:
-            timer.cancel()
-            timer.join()
+        yield mon
+    except monitor.MonitorError as e:
+        if e[0] == monitor.E_TIMEOUT and custom_err_msg:
+            raise monitor.MonitorError('Waiting too long for a monitor event.')
+        raise
     finally:
-        # In this point of time, the timer is no longer running. So we can test
-        # if mon was stopped, if so, timeout was triggered.
-        if mon._stopped:
-            if raise_exception:
-                raise monitor.MonitorError('Waiting too long for a monitor '
-                                           'event.')
-        else:
+        if not mon.is_stopped():
             mon.stop()
 
 


-- 
To view, visit http://gerrit.ovirt.org/37239
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ife2ee7d16c79eea7c8ed1f0780aafbaa00356978
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Petr Horáček <phora...@redhat.com>
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to