Giuseppe Lavagetto has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/355082 )

Change subject: Add netlink-based Ipvsmanager implementation
......................................................................

Add netlink-based Ipvsmanager implementation

A new ipvs manager is implemented using the new FSM we added to pybal
and communicating directly with the kernel via the netlink interface.

Change-Id: I4ae6b61571f37a7a47fa83eb549d381976318ca8
---
M pybal/fsm.py
M pybal/ipvs/__init__.py
A pybal/ipvs/manager.py
A pybal/ipvs/server.py
A pybal/ipvs/service.py
M pybal/pybal.py
M pybal/test/__init__.py
M pybal/test/fixtures.py
A pybal/test/test_ipvs_manager.py
A pybal/test/test_ipvs_server.py
A pybal/test/test_ipvs_service.py
M setup.py
M tox.ini
13 files changed, 872 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal 
refs/changes/82/355082/1

diff --git a/pybal/fsm.py b/pybal/fsm.py
index ea2e02e..0166ad1 100644
--- a/pybal/fsm.py
+++ b/pybal/fsm.py
@@ -7,12 +7,12 @@
 # a deferred callback (which progresses the current state of the machine to S2)
 # or an errback (which does not progress the current states and gets us a 
Failure)
 #
+from __future__ import absolute_import
+
 from twisted.internet import defer
 from twisted.python import failure
 
-from pybal import util
-
-log = util.log
+from pybal.util import log
 
 
 class State(object):
diff --git a/pybal/ipvs/__init__.py b/pybal/ipvs/__init__.py
index 442a9d0..53c5015 100644
--- a/pybal/ipvs/__init__.py
+++ b/pybal/ipvs/__init__.py
@@ -6,3 +6,11 @@
 LVS service interface for PyBal
 """
 from pybal.ipvs.interface import LVSService
+from pybal.ipvs.manager import NetlinkServiceManager
+
+
+def get_service(section, cfgtuple, config):
+        if config.getBoolean('netlink', False):
+            return NetlinkServiceManager(section, cfgtuple, 
configuration=config)
+        else:
+            return LVSService(section, cfgtuple, configuration=config)
diff --git a/pybal/ipvs/manager.py b/pybal/ipvs/manager.py
new file mode 100644
index 0000000..854c650
--- /dev/null
+++ b/pybal/ipvs/manager.py
@@ -0,0 +1,84 @@
+from gnlpy.ipvs import IpvsClient
+import pybal.ipvs.service
+from pybal.ipvs import LVSService
+
+
+class NetlinkServiceManager(LVSService):
+    """Service manager that uses the netlink-based state machines"""
+
+    Debug = False
+
+    DryRun = False
+
+    def __init__(self, name, (protocol, ip, port, scheduler), configuration):
+        self.name = name
+        # Logical server state within pybal itself
+        self.servers = set()
+        # FSM to manage the IPVS state of every server
+        self.destinations = {}
+
+        if (protocol not in self.SVC_PROTOS or
+                scheduler not in self.SVC_SCHEDULERS):
+            raise ValueError('Invalid protocol or scheduler')
+
+        self.protocol = protocol
+        self.ip = ip
+        self.port = port
+        self.scheduler = scheduler
+
+        self.configuration = configuration
+        self.Debug = configuration.getboolean('debug', False)
+        self.DryRun = configuration.getboolean('dryrun', False)
+
+        self.nlClient = IpvsClient(verbose=self.Debug)
+
+        if self.configuration.getboolean('bgp', True):
+            from pybal import BGPFailover
+            # Add service ip to the BGP announcements
+            BGPFailover.addPrefix(self.ip)
+
+        self.fsm = pybal.ipvs.service.Service(
+            self.nlClient,
+            (self.protocol, self.ip, self.port, self.scheduler)
+        )
+        self.createService()
+
+    def createService(self):
+        """Initializes this LVS instance in LVS."""
+        d = self.fsm.toState('present')
+
+    def assignServers(self, newServers):
+        """Takes a (new) set of servers (as a host->Server dictionary)
+        and updates the LVS state accordingly."""
+        to_remove = self.servers - newServers
+        to_add = newServers
+        for server in to_remove:
+            self.removeServer(server)
+        for server in to_add:
+            # Warning: does this work when we change the weight?
+            self.addServer(server)
+
+    def _to_ipvs(self, server):
+        # TODO: check that server.host is always defined
+        if server.host not in self.destinations:
+            self.destinations[server.host] = pybal.ipvs.server.Server(
+                self.nlClient, server, self.fsm)
+        return self.destinations[server.host]
+
+    def removeServer(self, server):
+        def depool():
+            server.pooled = False
+            self.servers.remove(server) # May raise KeyError
+        ipvsserver = self._to_ipvs(server)
+        # For now this is blocking,
+        # might not be such in the future
+        d = ipvsserver.toState('unknown')
+        d.addCallback(depool)
+
+    def addServer(self, server):
+        def pool():
+            server.pooled = True
+            self.servers.add(server)
+        ipvsserver = self._to_ipvs(server)
+        d = ipvsserver.toState('present')
+        d.addCallback(pool)
diff --git a/pybal/ipvs/server.py b/pybal/ipvs/server.py
new file mode 100644
index 0000000..095908e
--- /dev/null
+++ b/pybal/ipvs/server.py
@@ -0,0 +1,199 @@
+from __future__ import absolute_import
+
+import socket
+
+from gnlpy import ipvs as netlink
+from pybal import fsm
+from pybal.ipvs.exception import IpvsError
+
+
+class Server(netlink.Dest, fsm.FiniteStateMachine):
+    """
+    Handler for ipvs destinations using netlink
+    """
+
+    check_state = True
+
+    def __init__(self, client, server, service):
+        self.client = client
+        # This _will_ block, which is acceptable since we're
+        # in a blocking part of the code
+        self.ip_ = server.ip or socket.gethostbyname(server.host)
+        # Server desired weight
+        self.weight_ = server.weight or 1
+        self.port_ = server.port
+        self.fwd_method_ = netlink.IPVS_ROUTING
+        self.validate()
+        self.service = service
+        fsm.FiniteStateMachine.__init__(self)
+        self.setupFSM()
+        self._check_state()
+
+    def validate(self):
+        # Fix incomplete validation in gnlpy
+        super(Server, self).validate()
+        assert isinstance(self.port_, int)
+
+    def setupFSM(self):
+        # S0: The IPVS state is unknown / has been removed
+        self.addState(fsm.State('unknown'))
+        # S1: The server is registered, but with weight 0
+        self.addState(fsm.State('drained'))
+        # S2: The server is registered with its desired weight
+        self.addState(fsm.State('up'))
+        # S3: The server is registered with a wrong weight
+        self.addState(fsm.State('refresh'))
+        # S0 => S2
+        # Add a server with its desired final weight
+        self.addTransition(self.states['unknown'],
+                           self.states['up'],
+                           self._add,
+                           self.logError)
+        # S0 => S1
+        # Add a server, but with weight zero
+        self.addTransition(self.states['unknown'],
+                           self.states['drained'],
+                           self._add_weight_zero,
+                           self.logError)
+        #S1 => S0
+        # Remove a previously drained server
+        self.addTransition(self.states['drained'],
+                           self.states['unknown'],
+                           self._del,
+                           self.logError)
+
+        # S1 => S2
+        # Modify the weight of a server from zero to the current
+        # value
+        self.addTransition(self.states['drained'],
+                           self.states['up'],
+                           self._set,
+                           self.logError)
+        # S2 => S1
+        # Set the weight of a pooled server to zero
+        self.addTransition(self.states['up'],
+                           self.states['drained'],
+                           self._drain,
+                           self.logError)
+        # S2 => S0
+        # Remove a server from the pool
+        self.addTransition(self.states['up'],
+                           self.states['unknown'],
+                           self._del,
+                           self.logError)
+        # S3 => S2
+        # Refresh a server to its desired state
+        self.addTransition(self.states['refresh'],
+                           self.states['up'],
+                           self._set,
+                           self.logError)
+        # S3 => S0
+        # Remove a server that needed refreshing.
+        self.addTransition(self.states['refresh'],
+                           self.states['unknown'],
+                           self._del,
+                           self.logError)
+
+        # S3 => S1
+        # Drain a server that needed refreshing
+        self.addTransition(self.states['refresh'],
+                           self.states['drained'],
+                           self._drain,
+                           self.logError)
+
+    def checkActualState(self):
+        """
+        Check the actual state of the fsm, and assign it to
+        self.currentState
+
+        This is needed if we want to make sure what pybal knows about the IPVS
+        state still corresponds to what is actually registered in the kernel.
+
+        Multiple things can change the state, most notably someone using 
"ipvsadm"
+        by hand.
+
+        If you set the config variable 'check_ipvs_state' to false, the
+        (quite expensive) checks here will not be performed, and pybal will
+        assume what it knows about the ipvs state is accurate.
+        """
+        if self.check_state:
+            self._check_state()
+
+    def _check_state(self):
+        # These are all blocking calls
+        if not self.service_ready():
+            # TODO: add a "waiting" state?
+            self.currentState = self.states['unknown']
+            return
+
+        dests = self.client.get_dests(self.service.to_attr_list())
+        curState = self.states['unknown']
+        for dest in dests:
+            dest.service = self.service
+            if not self.equals(dest):
+                continue
+            w = dest.weight()
+            if w == self.weight():
+                # Server is present with its predefined weight
+                curState = self.states['up']
+            elif w == 0:
+                # Server is present but drained
+                curState = self.states['drained']
+            else:
+                # Server is present but needs a change in weight
+                curState = self.states['refresh']
+        self.currentState = curState
+
+    def service_ready(self):
+        self.service.checkActualState()
+        return (self.service.currentState.name == 'present')
+
+    def _del(self):
+        if not self.service_ready():
+            raise IpvsError("Tried to delete a server from service %s but "
+                            "the service is not present",
+                            self.service)
+        self.client.del_dest(self.service.vip(), self.port_, self.ip_,
+                             protocol=self.service.proto())
+
+    def _add(self, weight=None):
+        if not self.service_ready():
+            raise IpvsError("Tried to add a server to service %s but "
+                            "the service is not present",
+                            self.service)
+        if weight is None:
+            weight = self.weight_
+        self.client.add_dest(self.service.vip(), self.port_, self.ip_,
+                             protocol=self.service.proto(),
+                             weight=weight, method=self.fwd_method_)
+
+    def _add_weight_zero(self):
+        return self._add(weight=0)
+
+    def _drain(self):
+        self._set(weight=0)
+
+    def _set(self, weight=None):
+        if not self.service_ready():
+            raise IpvsError("Tried to modify server of service %s but "
+                            "the service is not present",
+                            self.service)
+        if weight is None:
+            weight = self.weight_
+        if self.currentState.name == 'unknown':
+            return self._add(weight=weight)
+        self.client.update_dest(self.service.vip(), self.port_, self.ip_,
+                                protocol=self.service.proto(),
+                                weight=weight, method=self.fwd_method_)
+
+    def equals(self, server):
+        return (self.service.equals(server.service) and
+                self.ip() == server.ip() and self.port() == server.port())
+
+    def __eq__(self, server):
+        return (self.equals(server) and
+                self.currentState == server.currentState and
+                self.weight() == server.weight())
+
+    def __str__(self):
+        return "%s:%d with weight %d" % (self.ip_, self.port_, self.weight_)
diff --git a/pybal/ipvs/service.py b/pybal/ipvs/service.py
new file mode 100644
index 0000000..77c0791
--- /dev/null
+++ b/pybal/ipvs/service.py
@@ -0,0 +1,115 @@
+from __future__ import absolute_import
+
+import socket
+
+from gnlpy import ipvs as netlink
+import pybal.fsm
+from pybal.util import log
+
+fsm = pybal.fsm
+
+
+class Service(netlink.Service, fsm.FiniteStateMachine):
+    """
+    Handler for ipvs service using netlink
+    """
+    _protocols = {
+        'tcp': socket.IPPROTO_TCP,
+        'udp': socket.IPPROTO_UDP
+    }
+
+    check_state = True
+
+    @staticmethod
+    def from_tuple(service):
+        return {
+            'proto': service[0],
+            'vip': service[1],
+            'port': service[2],
+            'sched': service[3] if len(service) > 3 else 'rr'
+        }
+
+    def __init__(self, client, service):
+        s = Service.from_tuple(service)
+        self.client = client
+        netlink.Service.__init__(self, s, validate=True)
+        fsm.FiniteStateMachine.__init__(self)
+        self.setUpFSM()
+        # Read the state of this
+        self._check_state()
+
+    def setUpFSM(self):
+        # S0: the service is unknown/deleted
+        self.addState(fsm.State('unknown'))
+        # S1: the service is present
+        self.addState(fsm.State('present'))
+        # S0 => S1
+        self.addTransition(self.states['unknown'],
+                           self.states['present'],
+                           self._add,
+                           self.logError)
+        self.addTransition(self.states['present'],
+                           self.states['unknown'],
+                           self._del,
+                           self.logError)
+        self.currentState = self.states['unknown']
+
+    def checkActualState(self):
+        """
+        Check the actual state of the fsm, and assign it to
+        self.currentState
+
+        This is needed if we want to make sure what pybal knows about the IPVS
+        state still corresponds to what is actually registered in the kernel.
+
+        Multiple things can change the state, most notably someone using 
"ipvsadm"
+        by hand.
+
+        If you set the config variable 'check_ipvs_state' to false, the
+        (quite expensive) checks here will not be performed, and pybal will
+        assume what it knows about the ipvs state is accurate.
+        """
+        if self.check_state:
+            self._check_state()
+
+    def _check_state(self):
+        """
+        Check the actual state of the fsm, and assign it to
+        self.currentState
+
+        This is used during initialization to fetch the initial state
+        """
+        # Blocking call
+        s = self.client.get_service(self.to_attr_list())
+        if s is not None and self.equals(s):
+            actualState = self.states['present']
+        else:
+            actualState = self.states['unknown']
+        self.currentState = actualState
+
+    def _add(self):
+        log.debug("Adding service %s" % self)
+        self.client.add_service(self.vip_, self.port_,
+                                protocol=self.proto_num(),
+                                sched_name=self.sched_)
+
+    def _del(self):
+        if self.currentState == self.states['unknown']:
+            log.info("Not removing already absent service %s" % self)
+            return
+
+        log.debug("Removing service %s" % self)
+        self.client.del_service(self.vip_, self.port_,
+                                protocol=self.proto_num())
+
+    def equals(self, srv):
+        return (self.proto_ == srv.proto_ and self.vip() == srv.vip()
+                and self.port() == srv.port() and self.sched() == srv.sched())
+
+    def __eq__(self, srv):
+        return (self.equals(srv) and self.currentState == srv.currentState)
+
+    def __str__(self):
+        return "'%s://%s:%d' with scheduler %s" % (self.proto_,
+                                                   self.vip_, self.port_,
+                                                   self.sched_)
diff --git a/pybal/pybal.py b/pybal/pybal.py
index 1521222..59a869f 100755
--- a/pybal/pybal.py
+++ b/pybal/pybal.py
@@ -14,6 +14,8 @@
 import os, sys, signal, socket, random
 import logging
 from pybal import ipvs, util, config, etcd, instrumentation
+import pybal.ipvs.service
+import pybal.ipvs.server
 
 from twisted.python import failure
 from twisted.internet import reactor, defer
@@ -601,9 +603,9 @@
             configdict.update(cliconfig)
 
             if section != 'global':
-                services[section] = ipvs.LVSService(section, cfgtuple, 
configuration=configdict)
+                services[section] = ipvs.get_service(section, cfgtuple, 
configdict)
                 crd = Coordinator(services[section],
-                    configUrl=config.get(section, 'config'))
+                                  configUrl=config.get(section, 'config'))
                 log.info("Created LVS service '{}'".format(section))
                 instrumentation.PoolsRoot.addPool(crd.lvsservice.name, crd)
 
@@ -614,6 +616,11 @@
             configdict = util.ConfigDict()
         configdict.update(cliconfig)
 
+        # ipvs state should be checked by the FSMs or not?
+        check_state = configdict.getboolean('check_ipvs_state', True)
+        pybal.ipvs.server.Server.check_state = check_state
+        pybal.ipvs.server.Servie.check_state = check_state
+
         # Set the logging level
         if configdict.get('debug', False):
             util.PyBalLogObserver.level = logging.DEBUG
diff --git a/pybal/test/__init__.py b/pybal/test/__init__.py
index 5806cf3..013a9bc 100644
--- a/pybal/test/__init__.py
+++ b/pybal/test/__init__.py
@@ -9,7 +9,7 @@
 # flake8: noqa
 
 from .test_config import *
-from .test_ipvs import *
 from .test_monitor import *
 from .test_util import *
 from .test_instrumentation import *
+from .test_ipvs import *
diff --git a/pybal/test/fixtures.py b/pybal/test/fixtures.py
index 8f584de..12a0d4b 100644
--- a/pybal/test/fixtures.py
+++ b/pybal/test/fixtures.py
@@ -8,11 +8,16 @@
 """
 import unittest
 
+import mock
+
 import pybal.util
 import twisted.test.proto_helpers
 import twisted.trial.unittest
 from twisted.internet import defer
 
+from gnlpy import ipvs as netlink
+
+from pybal.ipvs import service, server
 
 class ServerStub(object):
     """Test stub for `pybal.Server`."""
@@ -115,3 +120,42 @@
         self.server = ServerStub(self.host, self.ip, self.port,
                                  lvsservice=self.lvsservice)
         self.reactor = twisted.test.proto_helpers.MemoryReactor()
+
+
+class IpvsTestCase(twisted.trial.unittest.TestCase):
+
+    def getMockClient(self):
+        c = mock.MagicMock(spec=netlink.IpvsClient)
+        return c
+
+    def getService(self):
+        # currentState will be overridden by the actual state
+        t = ('tcp', '192.168.1.1', 80)
+        s = service.Service(self.client, t)
+        s.currentState = s.states['present']
+        return s
+
+    def getServer(self, srv=None):
+        if srv is None:
+            srv = self.getLogicalServers()[0]
+        s = self.getService()
+        return server.Server(self.client, srv, s)
+
+    def getPool(self, dest_range=range(3)):
+        serv = service.Service(self.client, ('tcp', '192.168.1.1', 80))
+        dests = []
+        for i in dest_range:
+            d = netlink.Dest({'ip': '10.0.0.%d' % (i + 1), 'port': 80, 
'weight': 10})
+            dests.append(d)
+        return {'service': serv, 'dests': dests}
+
+    def getLogicalServers(self):
+        s = [
+            ServerStub('www1.local', ip='10.0.0.1', port=80, weight=10),
+            ServerStub('www2.local', ip='10.0.0.2', port=80, weight=10),
+            ServerStub('www3.local', ip='10.0.0.3', port=80, weight=10)
+        ]
+        for srv in s:
+            srv.up = True
+            srv.pooled = True
+        return s
diff --git a/pybal/test/test_ipvs_manager.py b/pybal/test/test_ipvs_manager.py
new file mode 100644
index 0000000..1034753
--- /dev/null
+++ b/pybal/test/test_ipvs_manager.py
@@ -0,0 +1,25 @@
+import mock
+
+from twisted.trial import unittest
+
+from pybal.ipvs import manager
+
+
+class TestIpvsServiceManager(unittest.TestCase):
+
+    def setUp(self):
+        self.service = ('tcp', '192.168.1.1', 6379, 'wrr')
+        self.conf = mock.MagicMock()
+
+    @mock.patch('pybal.ipvs.manager.IpvsClient')
+    def testInit(self, nl_mock):
+        self.conf.getboolean.return_value = False
+        with mock.patch('pybal.ipvs.service.Service'):
+            mgr = manager.NetlinkServiceManager('foo', self.service, self.conf)
+        self.assertEqual(mgr.name, 'foo')
+        self.assertEqual(mgr.servers, set())
+        self.assertEqual(mgr.destinations, {})
+        self.assertEqual(mgr.protocol, 'tcp')
+        self.assertEqual(mgr.ip, '192.168.1.1')
+        self.assertEqual(mgr.port, 6379)
+        self.assertEqual(mgr.scheduler, 'wrr')
diff --git a/pybal/test/test_ipvs_server.py b/pybal/test/test_ipvs_server.py
new file mode 100644
index 0000000..5b7e85f
--- /dev/null
+++ b/pybal/test/test_ipvs_server.py
@@ -0,0 +1,262 @@
+from pybal.ipvs import server
+from pybal.test.fixtures import ServerStub, IpvsTestCase
+
+
+def trapValueError(f):
+    f.trap(ValueError)
+    return None
+
+
+def callbackFailure(_):
+    raise Exception("Not raised the exceptio")
+
+
+class IpvsServerTestCase(IpvsTestCase):
+
+    def setUp(self):
+        self.client = self.getMockClient()
+        self.server = self.getServer()
+
+    def testInit(self):
+        # An invalid IP address causes an exception
+        s = ServerStub('www1.local', '10.0.0,1', 80, 10)
+        self.assertRaises(AssertionError, server.Server, self.client, s,
+                          self.getService())
+        # A non-numeric port causes an exception
+        s = ServerStub('www1.local', '10.0.0.1', '80', 10)
+        self.assertRaises(AssertionError, server.Server, self.client, s,
+                          self.getService())
+
+        # A non-present weight defaults to 1
+        s = ServerStub('www1.local', '10.0.0.1', 80)
+        srv = server.Server(self.client, s, self.getService())
+        self.assertEquals(srv.weight_, 1)
+
+        # The FSM has been set up
+        states = ['unknown', 'drained', 'up', 'refresh']
+        for state in states:
+            self.assertIn(state, srv.states)
+
+        # Status should be 'unkown' since we have no service defined
+        srv.checkActualState()
+        self.assertEquals(srv.currentState.name, 'unknown')
+
+    def testCheckActualState(self):
+        # All states behave like expected
+        # S0
+        self.client.get_service.return_value = None
+        self.server.checkActualState()
+        self.assertEquals(self.server.currentState.name, 'unknown')
+        p = self.getPool()
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = []
+        self.server.checkActualState()
+        self.assertEquals(self.server.currentState.name, 'unknown')
+        # S1
+        dests = p['dests']
+        dests[0].weight_ = 0
+        self.client.get_dests.return_value = dests
+        # if we set check_service to false, checkActualState() won't do 
anything.
+        self.server.check_state = False
+        self.server.checkActualState()
+        self.assertEqual(self.server.currentState.name, 'unknown')
+        self.server.check_state = True
+        self.server.checkActualState()
+        self.assertEquals(self.server.currentState.name, 'drained')
+        # S2
+        dests[0].weight_ = self.server.weight()
+        self.server.checkActualState()
+        self.assertEquals(self.server.currentState.name, 'up')
+        # S3
+        dests[0].weight_ = 54
+        self.server.checkActualState()
+        self.assertEquals(self.server.currentState.name, 'refresh')
+        # If the dests request raises  an exception, it doesn't get caught
+        self.client.get_dests.side_effect = Exception('Boom!')
+        with self.assertRaises(Exception):
+            self.server.checkActualState()
+
+    def testTransitionFromUnknown(self):
+        # Initial state: unknown
+        p = self.getPool()
+        dests = p['dests']
+        myDest = dests[0]
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = []
+        # To "up"
+        d = self.server.toState('up')
+        d.addCallback(
+            lambda _: self.client.add_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                weight=self.server.weight(),
+                method=self.server.fwd_method(),
+                protocol=self.server.service.proto(),
+            )
+        )
+        d.addCallback(
+            lambda _:
+            self.assertEquals(self.server.currentState.name, 'up')
+        )
+        # To "drained"
+        d = self.server.toState('drained')
+        d.addCallback(
+            lambda _: self.client.add_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                weight=0,
+                method=self.server.fwd_method(),
+                protocol=self.server.service.proto(),
+            )
+        )
+        d.addCallback(
+            lambda _:
+            self.assertEquals(self.server.currentState.name, 'drained')
+        )
+        # To "referesh" doesn't exist
+        r = self.server.toState('refresh')
+        r.addCallbacks(callbackFailure, trapValueError)
+
+    def testTransitionFromDrained(self):
+        p = self.getPool()
+        dests = p['dests']
+        myDest = dests[0]
+        myDest.weight_ = 0
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = dests
+        # To unknown
+        d = self.server.toState('unknown')
+        d.addCallback(self.assertEquals, self.server.states['unknown'])
+        d.addCallback(
+            lambda _: self.client.del_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto()
+            )
+        )
+        # To up
+        d = self.server.toState('up')
+        d.addCallback(self.assertEquals, self.server.states['up'])
+        d.addCallback(
+            lambda _: self.client.update_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto(),
+                weight=self.server.weight(),
+                method=self.server.fwd_method(),
+            )
+        )
+        # To "referesh" doesn't exist
+        r = self.server.toState('refresh')
+        r.addCallbacks(callbackFailure, trapValueError)
+
+    def testTransitionFromUp(self):
+        p = self.getPool()
+        dests = p['dests']
+        myDest = dests[0]
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = dests
+        # To unknown
+        d = self.server.toState('unknown')
+        d.addCallback(self.assertEquals, self.server.states['unknown'])
+        d.addCallback(
+            lambda _: self.client.del_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto()
+            )
+        )
+        # To drained
+        d = self.server.toState('drained')
+        d.addCallback(self.assertEquals, self.server.states['drained'])
+        d.addCallback(
+            lambda _: self.client.update_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto(),
+                weight=0,
+                method=self.server.fwd_method(),
+            )
+        )
+        # To "referesh" doesn't exist
+        r = self.server.toState('refresh')
+        r.addCallbacks(callbackFailure, trapValueError)
+
+
+    def testTransitionFromRefresh(self):
+        p = self.getPool()
+        dests = p['dests']
+        myDest = dests[0]
+        myDest.weight_ = 147
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = dests
+        # to unknown
+        d = self.server.toState('unknown')
+        d.addCallback(self.assertEquals, self.server.states['unknown'])
+        d.addCallback(
+            lambda _: self.client.del_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto()
+            )
+        )
+        # to up
+        d= self.server.toState('up')
+        d.addCallback(self.assertEquals, self.server.states['up'])
+        d.addCallback(
+            lambda _: self.client.update_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto(),
+                weight=self.server.weight(),
+                method=self.server.fwd_method(),
+            )
+        )
+        # to drained
+        d= self.server.toState('drained')
+        d.addCallback(self.assertEquals, self.server.states['drained'])
+        d.addCallback(
+            lambda _: self.client.update_dest.assert_called_with(
+                self.server.service.vip(),
+                self.server.service.port(),
+                self.server.ip(),
+                protocol=self.server.service.proto(),
+                weight=0,
+                method=self.server.fwd_method(),
+            )
+        )
+
+
+    def testTransitionCornerCases(self):
+        # From unknown
+        p = self.getPool()
+        dests = p['dests']
+        myDest = dests[0]
+        self.client.get_service.return_value = p['service']
+        self.client.get_dests.return_value = []
+        ## If the service is absent, the transaction fails
+        self.client.get_service.return_value = None
+        d = self.server.toState('up')
+        d.addCallback(self.assertEquals, None)
+        ## If the add_dest method returns an exception, the transaction fails
+        self.client.add_dest.side_effect = Exception('Poww!')
+        self.client.get_service.return_value = p['service']
+        d = self.server.toState('up')
+        d.addCallback(self.assertEquals, None)
+        d.addCallback(lambda _: self.assertEquals(
+            self.server.currentState.name, 'unknown')
+        )
+        # Call from the already 'present' state should not be doing anything.
+        self.client.get_dests.return_value = dests
+        self.client.reset_mock()
+        d = self.server.toState('up')
+        d.addCallback(
+            lambda _: self.client.update_dest.assert_not_called())
diff --git a/pybal/test/test_ipvs_service.py b/pybal/test/test_ipvs_service.py
new file mode 100644
index 0000000..9882275
--- /dev/null
+++ b/pybal/test/test_ipvs_service.py
@@ -0,0 +1,118 @@
+import mock
+import socket
+
+from gnlpy import ipvs as netlink
+
+from pybal.ipvs import service
+from pybal.test import fixtures
+
+
+class IpvsServiceTestCase(fixtures.IpvsTestCase):
+
+    def getPool(self):
+        serv = service.Service.from_tuple(('tcp', '192.168.1.1', 6379))
+        return netlink.Pool({'service': serv, 'dests': []})
+
+    def testFromTuple(self):
+        tup = ('proto', 'vip', 'port', 'sched')
+        s = service.Service.from_tuple(tup)
+        for k, v in s.items():
+            self.assertEquals(k, v)
+        # Default to round-robin scheduling
+        tup = ('proto', 'vip', 'port')
+        s = service.Service.from_tuple(tup)
+        self.assertEquals(s['sched'], 'rr')
+
+    def testInit(self):
+        c = self.getMockClient()
+        # An invalid tuple raises an exception
+        t = ('tcp', '192.168,0,0', 6379)
+        self.assertRaises(AssertionError, service.Service, c, t)
+        # A correct initialization will set up all the service variables
+        t = ('tcp', '192.168.1.1', 6379)
+        s = service.Service(c, t)
+        self.assertEquals(s.proto_, 'tcp')
+        self.assertEquals(s.vip_, '192.168.1.1')
+        self.assertEquals(s.port_, 6379)
+        self.assertEquals(s.sched_, 'rr')
+        # The FSM has been set up
+        self.assertIn('unknown', s.states)
+        self.assertIn('present', s.states)
+        self.assertIn('present', s.states['unknown'].transitions)
+        self.assertIn('unknown', s.states['present'].transitions)
+        # State should be 'unknown' given we gave no answer
+        self.assertEquals(s.currentState, s.states['unknown'])
+
+    def testCheckActualState(self):
+        c = self.getMockClient()
+        # currentState will be overridden by the actual state
+        t = ('tcp', '192.168.1.1', 6379)
+        s = service.Service(c, t)
+        s.currentState = s.states['present']
+        s.checkActualState()
+        self.assertEquals(s.currentState.name, 'unknown')
+        # if the pool is present, the actual state will become present
+        c.get_service.return_value = s
+        # If not required, checkActualState will do nothing
+        s.check_state = False
+        s.checkActualState()
+        self.assertEqual(s.currentState.name, 'unknown')
+        s.check_state = True
+        s.checkActualState()
+        self.assertEquals(s.currentState.name, 'present')
+        # If the request for pools raises an exception,
+        # do not try to catch it.
+        with self.assertRaises(Exception):
+            c.get_service.side_effect = Exception('something!')
+            s.checkActualState()
+
+    def testTransitionToPresent(self):
+        # First situation: service is not present, we transition to s1 
(present)
+        c = self.getMockClient()
+        t = ('tcp', '192.168.1.1', 6379)
+        s = service.Service(c, t)
+        c.get_service.return_value = None
+        d = s.toState('present')
+        d.addCallback(self.assertEqual, s.states['present'])
+        d.addCallback(lambda _: s.client.add_service.assert_called_with(
+            '192.168.1.1', 6379, protocol=socket.IPPROTO_TCP,
+            sched_name='rr'))
+        # Check the current state has been set
+        print s.currentState.name
+        assert s.currentState == s.states['present']
+
+    def testTransitionToUnknown(self):
+        c = self.getMockClient()
+        t = ('tcp', '192.168.1.1', 6379)
+        s = service.Service(c, t)
+        c.get_service.return_value = s
+        s.checkActualState()
+        assert s.currentState == s.states['present']
+        d = s.toState('unknown')
+        d.addCallback(self.assertEqual, s.states['unknown'])
+        d.addCallback(lambda _: c.del_service.assert_called_with(
+            '192.168.1.1', 6379, protocol=socket.IPPROTO_TCP))
+        # Now let's try to move to "unknown" again, nothing will happen
+        c.get_service.return_value = None
+        c.del_service.reset_mock()
+        d1 = s.toState('unknown')
+        d1.addCallback(lambda _: c.del_service.assert_not_called())
+
+    def testFaultyTransaction(self):
+        c = self.getMockClient()
+        t = ('tcp', '192.168.1.1', 6379)
+        s = service.Service(c, t)
+        c.get_service.return_value = s
+        # Simulate the netlink layer failing
+        c.del_service.side_effect = RuntimeError('welcome to the terrordome!')
+        # Failure doesn't propagate outside of the fsm
+        try:
+            d = s.toState('unknown')
+        except Exception as e:
+            self.fail("Unhandled failure in state transition, %s" % e)
+
+        # Verify that the deferred returns None in this case
+        d.addCallback(self.assertEquals, None)
+        # State is still present
+        d.addCallback(lambda _: self.assertEquals(s.currentState.name,
+                                                  'present'))
diff --git a/setup.py b/setup.py
index acbcfc1..b8e53f9 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@
 
 setup(
     name='PyBal',
-    version='1.12',
+    version='1.14-dev',
     license='GPLv2+',
     author='Mark Bergsma',
     author_email='[email protected]',
@@ -42,7 +42,8 @@
     ),
     requires=(
         'twisted',
-        'PyOpenSSL'
+        'PyOpenSSL',
+        'gnlpy(>=1.2.0)',
     ),
     tests_require=(
         'mock',
diff --git a/tox.ini b/tox.ini
index 3de1bc3..02234ba 100644
--- a/tox.ini
+++ b/tox.ini
@@ -10,6 +10,7 @@
   twisted
   mock
   PyOpenSSL
+  gnlpy
 
 [testenv:cover]
 commands =

-- 
To view, visit https://gerrit.wikimedia.org/r/355082
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4ae6b61571f37a7a47fa83eb549d381976318ca8
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: 2.0-dev
Gerrit-Owner: Giuseppe Lavagetto <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to