Petr Horáček has uploaded a new change for review. Change subject: netlink: event monitor ......................................................................
netlink: event monitor NOTE: just a draft Change-Id: I23ea72986564c5a115e36be0e7cf679c28c8ea96 Signed-off-by: Petr Horáček <phora...@redhat.com> --- M debian/vdsm-python.install M lib/vdsm/netlink/Makefile.am M lib/vdsm/netlink/__init__.py A lib/vdsm/netlink/monitor.py M vdsm.spec.in 5 files changed, 183 insertions(+), 3 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/26/32626/1 diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install index 8d11bfc..60f71f4 100644 --- a/debian/vdsm-python.install +++ b/debian/vdsm-python.install @@ -13,6 +13,7 @@ ./usr/lib/python2.7/dist-packages/vdsm/netlink/__init__.py ./usr/lib/python2.7/dist-packages/vdsm/netlink/addr.py ./usr/lib/python2.7/dist-packages/vdsm/netlink/link.py +./usr/lib/python2.7/dist-packages/vdsm/netlink/monitor.py ./usr/lib/python2.7/dist-packages/vdsm/netlink/route.py ./usr/lib/python2.7/dist-packages/vdsm/profile.py ./usr/lib/python2.7/dist-packages/vdsm/qemuimg.py diff --git a/lib/vdsm/netlink/Makefile.am b/lib/vdsm/netlink/Makefile.am index 4c9f214..02e5ea6 100644 --- a/lib/vdsm/netlink/Makefile.am +++ b/lib/vdsm/netlink/Makefile.am @@ -23,5 +23,6 @@ __init__.py \ addr.py \ link.py \ + monitor.py \ route.py \ $(NULL) diff --git a/lib/vdsm/netlink/__init__.py b/lib/vdsm/netlink/__init__.py index a292e23..2e2884e 100644 --- a/lib/vdsm/netlink/__init__.py +++ b/lib/vdsm/netlink/__init__.py @@ -34,11 +34,13 @@ class NLSocketPool(object): """Pool of netlink sockets.""" - def __init__(self, size): + def __init__(self, size, groups=None, disable_seq_check=False): if size <= 0: raise ValueError('Invalid socket pool size %r. Must be positive') + self._disable_seq_check = disable_seq_check self._semaphore = BoundedSemaphore(size) self._sockets = Queue(maxsize=size) + self._groups = groups @contextmanager def socket(self): @@ -47,21 +49,29 @@ try: sock = self._sockets.get_nowait() except Empty: - sock = _open_socket() + sock = _open_socket(self._disable_seq_check) try: + _add_socket_memberships(sock, self._groups) yield sock finally: + _drop_socket_memberships(sock, self._groups) self._sockets.put_nowait(sock) _pool = NLSocketPool(_POOL_SIZE) -def _open_socket(): +def _open_socket(disable_seq_check): """Returns an open netlink socket.""" sock = _nl_socket_alloc() if sock is None: raise IOError(get_errno(), 'Failed to allocate netlink handle') + + if disable_seq_check: + err = _nl_socket_disable_seq_check(sock) + if err: + _nl_socket_free(sock) + raise IOError(-err, _nl_geterror()) err = _nl_connect(sock, _NETLINK_ROUTE) if err: @@ -107,6 +117,55 @@ # libnl1 reports 'universe' instead of 'global' return 'global' if scope == 'universe' else scope + +# TODO: define all groups listed in file?: +# /usr/linux/include/rtnetlink.h, enum rtnetlink_groups {... +_KNOWN_GROUPS = {'link': 1, # RTNLGRP_LINK + 'notify': 2, # RTNPGRP_NOTIFY + 'neigh': 3, # RTNLGRP_NEIGH + 'tc': 4, # RTNLGRP_TC + 'ipv4-ifaddr': 5, # RTNLGRP_IPV4_IFADDR + 'ipv4-mroute': 6, # RTNLGRP_IPV4_MROUTE + 'ipv4-route': 7, # RTNLGRP_IPV4_ROUTE + 'ipv6-ifaddr': 9, # RTNLGRP_IPV6_IFADDR + 'ipv6-mroute': 10, # RTNLGRP_IPV6_MROUTE + 'ipv6-route': 11, # RTNLGRP_IPV6_ROUTE + 'ipv6-ifinfo': 12, # RTNLGRP_IPV6_IFINFO + 'decnet-ifaddr': 13, # RTNLGRP_DECnet_IFADDR + 'decnet-route': 14, # RTNLGRP_DECnet_ROUTE + 'ipv6-prefix': 16} # RTNLGRP_IPV6_PREFIX + + +class UnknownGroup(Exception): + pass + + +def _get_groups_codes(groups): + groups_codes = [] + for g in groups: + code = _KNOWN_GROUPS.get(g, None) + if not code: + raise UnknownGroup + groups_codes.append(code) + return groups_codes + + +def _add_socket_memberships(socket, groups): + groups_codes = _get_groups_codes(groups) + err = _nl_socket_add_memberships(socket, groups_codes) + if err: + _nl_socket_free(socket) + raise IOError(-err, _nl_geterror()) + + +def _drop_socket_memberships(socket, groups): + groups_codes = _get_groups_codes(groups) + err = _nl_socket_drop_memberships(socket, groups_codes) + if err: + _nl_socket_free(socket) + raise IOError(-err, _nl_geterror()) + + # C function prototypes # http://docs.python.org/2/library/ctypes.html#function-prototypes # This helps ctypes know the calling conventions it should use to communicate @@ -128,16 +187,27 @@ LIBNL = CDLL('libnl-3.so.200', use_errno=True) LIBNL_ROUTE = CDLL('libnl-route-3.so.200', use_errno=True) + _nl_socket_add_memberships = _int_proto(('nl_socket_add_memberships', + LIBNL)) _nl_socket_alloc = CFUNCTYPE(c_void_p)(('nl_socket_alloc', LIBNL)) + _nl_socket_disable_seq_check = _int_proto(('nl_socket_disable_seq_check', + LIBNL)) + _nl_socket_drop_memberships = _int_proto(('nl_socket_drop_memberships', + LIBNL)) _nl_socket_free = _none_proto(('nl_socket_free', LIBNL)) + _nl_socket_get_fd = _int_proto(('nl_socket_get_fd'), LIBNL) else: # libnl-1 # Change from handle to socket as it is now more accurately called in # libnl-3 LIBNL_ROUTE = LIBNL = CDLL('libnl.so.1', use_errno=True) + _nl_socket_add_memberships = None # TODO _nl_socket_alloc = CFUNCTYPE(c_void_p)(('nl_handle_alloc', LIBNL)) + _nl_socket_disable_seq_check = None # TODO + _nl_socket_drop_memberships = None # TODO _nl_socket_free = _none_proto(('nl_handle_destroy', LIBNL)) + _nl_socket_get_fd = None def _alloc_cache(allocator, sock): cache = allocator(sock) diff --git a/lib/vdsm/netlink/monitor.py b/lib/vdsm/netlink/monitor.py new file mode 100644 index 0000000..d20d4c2 --- /dev/null +++ b/lib/vdsm/netlink/monitor.py @@ -0,0 +1,107 @@ +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# +from Queue import Queue +import ctypes +import threading +import select + +from . import (_KNOWN_GROUPS, NLSocketPool, _POOL_SIZE, _nl_link_cache, + _nl_socket_get_fd) + + +class MonitorError(Exception): + pass + + +class Monitor(object): + """Netlink monitor. Usage: + Get events collected while the monitor was running: + + mon = Monitor() + mon.start() + .... + mon.stop() + for event in mon: + handle event + + Monitoring events forever: + mon = Monitor('link', 'ipv4-route') + mon.start() + for event in mon: + handle event + + Possible groups: all (default value), link, notify, neigh, tc, ipv4-ifaddr, + ipv4-mroute, ipv4-route ipv6-ifaddr, ipv6-mroute, ipv6-route, ipv6-ifinfo, + decnet-ifaddr, decnet-route, ipv6-prefix + """ + def __init__(self, *groups): + if not groups or 'all' in groups: + groups = _KNOWN_GROUPS.keys() + self.queue = Queue() + # NOTE: mutable object -> pointer? not very elegant solution + self.monitor = MonitorThread(self.queue, groups) + + def __iter__(self, timeout=None): + if not self.monitor.is_alive(): + raise MonitorError('The monitor has not run yet') + yield self.queue.get() + + def start(self): + if self.monitor.is_alive(): + raise MonitorError('The monitor has already started') + # TODO: start() or run() ? + self.monitor.start() + + def stop(self): + if not self.monitor.is_alive(): + raise MonitorError('The monitor is already stopped') + self.monitor.stop() + + +class StopMonitor(Exception): + pass + + +class MonitorThread(threading.Thread): + def __init__(self, queue, groups): + threading.Thread.__init__(self) + self.queue = queue + self.groups = groups + + def run(self): + # TODO: try except StopException + _pool = NLSocketPool(_POOL_SIZE, self.groups, disable_seq_check=True) + with _pool.socket() as sock: + with _nl_link_cache(sock): + while True: + fd = _nl_socket_get_fd(sock) + select.select([fd], [], []) + # TODO: get _nl output, parse it + output = {} + self.queue.put(output) + + # NOTE: https://docs.python.org/2/c-api/init.html + def stop(self): + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.ident), + ctypes.py_object(StopMonitor)) + + # TODO: check type and if detected, parse with another module + def _parse(input): + pass diff --git a/vdsm.spec.in b/vdsm.spec.in index e5ea531..0ca5b69 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1184,6 +1184,7 @@ %{python_sitearch}/%{vdsm_name}/netlink/__init__.py* %{python_sitearch}/%{vdsm_name}/netlink/addr.py* %{python_sitearch}/%{vdsm_name}/netlink/link.py* +%{python_sitearch}/%{vdsm_name}/netlink/monitor.py* %{python_sitearch}/%{vdsm_name}/netlink/route.py* %{python_sitearch}/%{vdsm_name}/profile.py* %{python_sitearch}/%{vdsm_name}/qemuimg.py* -- To view, visit http://gerrit.ovirt.org/32626 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I23ea72986564c5a115e36be0e7cf679c28c8ea96 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Petr Horáček <phora...@redhat.com> _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches