Giuseppe Lavagetto has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/355082 )
Change subject: Add netlink-based Ipvsmanager implementation
......................................................................
Add netlink-based Ipvsmanager implementation
A new ipvs manager is implemented using the new FSM we added to pybal
and communicating directly with the kernel via the netlink interface.
Change-Id: I4ae6b61571f37a7a47fa83eb549d381976318ca8
---
M pybal/fsm.py
M pybal/ipvs/__init__.py
A pybal/ipvs/manager.py
A pybal/ipvs/server.py
A pybal/ipvs/service.py
M pybal/pybal.py
M pybal/test/__init__.py
M pybal/test/fixtures.py
A pybal/test/test_ipvs_manager.py
A pybal/test/test_ipvs_server.py
A pybal/test/test_ipvs_service.py
M setup.py
M tox.ini
13 files changed, 872 insertions(+), 8 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/debs/pybal
refs/changes/82/355082/1
diff --git a/pybal/fsm.py b/pybal/fsm.py
index ea2e02e..0166ad1 100644
--- a/pybal/fsm.py
+++ b/pybal/fsm.py
@@ -7,12 +7,12 @@
# a deferred callback (which progresses the current state of the machine to S2)
# or an errback (which does not progress the current states and gets us a
Failure)
#
+from __future__ import absolute_import
+
from twisted.internet import defer
from twisted.python import failure
-from pybal import util
-
-log = util.log
+from pybal.util import log
class State(object):
diff --git a/pybal/ipvs/__init__.py b/pybal/ipvs/__init__.py
index 442a9d0..53c5015 100644
--- a/pybal/ipvs/__init__.py
+++ b/pybal/ipvs/__init__.py
@@ -6,3 +6,11 @@
LVS service interface for PyBal
"""
from pybal.ipvs.interface import LVSService
+from pybal.ipvs.manager import NetlinkServiceManager
+
+
+def get_service(section, cfgtuple, config):
+ if config.getBoolean('netlink', False):
+ return NetlinkServiceManager(section, cfgtuple,
configuration=config)
+ else:
+ return LVSService(section, cfgtuple, configuration=config)
diff --git a/pybal/ipvs/manager.py b/pybal/ipvs/manager.py
new file mode 100644
index 0000000..854c650
--- /dev/null
+++ b/pybal/ipvs/manager.py
@@ -0,0 +1,84 @@
+from gnlpy.ipvs import IpvsClient
+import pybal.ipvs.service
+from pybal.ipvs import LVSService
+
+
+class NetlinkServiceManager(LVSService):
+ """Service manager that uses the netlink-based state machines"""
+
+ Debug = False
+
+ DryRun = False
+
+ def __init__(self, name, (protocol, ip, port, scheduler), configuration):
+ self.name = name
+ # Logical server state within pybal itself
+ self.servers = set()
+ # FSM to manage the IPVS state of every server
+ self.destinations = {}
+
+ if (protocol not in self.SVC_PROTOS or
+ scheduler not in self.SVC_SCHEDULERS):
+ raise ValueError('Invalid protocol or scheduler')
+
+ self.protocol = protocol
+ self.ip = ip
+ self.port = port
+ self.scheduler = scheduler
+
+ self.configuration = configuration
+ self.Debug = configuration.getboolean('debug', False)
+ self.DryRun = configuration.getboolean('dryrun', False)
+
+ self.nlClient = IpvsClient(verbose=self.Debug)
+
+ if self.configuration.getboolean('bgp', True):
+ from pybal import BGPFailover
+ # Add service ip to the BGP announcements
+ BGPFailover.addPrefix(self.ip)
+
+ self.fsm = pybal.ipvs.service.Service(
+ self.nlClient,
+ (self.protocol, self.ip, self.port, self.scheduler)
+ )
+ self.createService()
+
+ def createService(self):
+ """Initializes this LVS instance in LVS."""
+ d = self.fsm.toState('present')
+
+ def assignServers(self, newServers):
+ """Takes a (new) set of servers (as a host->Server dictionary)
+ and updates the LVS state accordingly."""
+ to_remove = self.servers - newServers
+ to_add = newServers
+ for server in to_remove:
+ self.removeServer(server)
+ for server in to_add:
+ # Warning: does this work when we change the weight?
+ self.addServer(server)
+
+ def _to_ipvs(self, server):
+ # TODO: check that server.host is always defined
+ if server.host not in self.destinations:
+ self.destinations[server.host] = pybal.ipvs.server.Server(
+ self.nlClient, server, self.fsm)
+ return self.destinations[server.host]
+
+ def removeServer(self, server):
+ def depool():
+ server.pooled = False
+ self.servers.remove(server) # May raise KeyError
+ ipvsserver = self._to_ipvs(server)
+ # For now this call is blocking;
+ # that might change in the future
+ d = ipvsserver.toState('unknown')
+ d.addCallback(depool)
+
+ def addServer(self, server):
+ def pool():
+ server.pooled = True
+ self.servers.add(server)
+ ipvsserver = self._to_ipvs(server)
+ d = ipvsserver.toState('present')
+ d.addCallback(pool)
diff --git a/pybal/ipvs/server.py b/pybal/ipvs/server.py
new file mode 100644
index 0000000..095908e
--- /dev/null
+++ b/pybal/ipvs/server.py
@@ -0,0 +1,199 @@
+from __future__ import absolute_import
+
+import socket
+
+from gnlpy import ipvs as netlink
+from pybal import fsm
+from pybal.ipvs.exception import IpvsError
+
+
+class Server(netlink.Dest, fsm.FiniteStateMachine):
+ """
+ Handler for ipvs destinations using netlink
+ """
+
+ check_state = True
+
+ def __init__(self, client, server, service):
+ self.client = client
+ # This _will_ block, which is acceptable since we're
+ # in a blocking part of the code
+ self.ip_ = server.ip or socket.gethostbyname(server.host)
+ # Server desired weight
+ self.weight_ = server.weight or 1
+ self.port_ = server.port
+ self.fwd_method_ = netlink.IPVS_ROUTING
+ self.validate()
+ self.service = service
+ fsm.FiniteStateMachine.__init__(self)
+ self.setupFSM()
+ self._check_state()
+
+ def validate(self):
+ # Fix incomplete validation in gnlpy
+ super(Server, self).validate()
+ assert isinstance(self.port_, int)
+
+ def setupFSM(self):
+ # S0: The IPVS state is unknown / has been removed
+ self.addState(fsm.State('unknown'))
+ # S1: The server is registered, but with weight 0
+ self.addState(fsm.State('drained'))
+ # S2: The server is registered with its desired weight
+ self.addState(fsm.State('up'))
+ # S3: The server is registered with a wrong weight
+ self.addState(fsm.State('refresh'))
+ # S0 => S2
+ # Add a server with its desired final weight
+ self.addTransition(self.states['unknown'],
+ self.states['up'],
+ self._add,
+ self.logError)
+ # S0 => S1
+ # Add a server, but with weight zero
+ self.addTransition(self.states['unknown'],
+ self.states['drained'],
+ self._add_weight_zero,
+ self.logError)
+ # S1 => S0
+ # Remove a previously drained server
+ self.addTransition(self.states['drained'],
+ self.states['unknown'],
+ self._del,
+ self.logError)
+
+ # S1 => S2
+ # Modify the weight of a server from zero to the current
+ # value
+ self.addTransition(self.states['drained'],
+ self.states['up'],
+ self._set,
+ self.logError)
+ # S2 => S1
+ # Set the weight of a pooled server to zero
+ self.addTransition(self.states['up'],
+ self.states['drained'],
+ self._drain,
+ self.logError)
+ # S2 => S0
+ # Remove a server from the pool
+ self.addTransition(self.states['up'],
+ self.states['unknown'],
+ self._del,
+ self.logError)
+ # S3 => S2
+ # Refresh a server to its desired state
+ self.addTransition(self.states['refresh'],
+ self.states['up'],
+ self._set,
+ self.logError)
+ # S3 => S0
+ # Remove a server that needed refreshing.
+ self.addTransition(self.states['refresh'],
+ self.states['unknown'],
+ self._del,
+ self.logError)
+
+ # S3 => S1
+ # Drain a server that needed refreshing
+ self.addTransition(self.states['refresh'],
+ self.states['drained'],
+ self._drain,
+ self.logError)
+
+ def checkActualState(self):
+ """
+ Check the actual state of the fsm, and assign it to
+ self.currentState
+
+ This is needed if we want to make sure what pybal knows about the IPVS
+ state still corresponds to what is actually registered in the kernel.
+
+ Multiple things can change the state, most notably someone using
"ipvsadm"
+ by hand.
+
+ If you set the config variable 'check_ipvs_state' to false, the
+ (quite expensive) checks here will not be performed, and pybal will
+ assume what it knows about the ipvs state is accurate.
+ """
+ if self.check_state:
+ self._check_state()
+
+ def _check_state(self):
+ # These are all blocking calls
+ if not self.service_ready():
+ # TODO: add a "waiting" state?
+ self.currentState = self.states['unknown']
+ return
+
+ dests = self.client.get_dests(self.service.to_attr_list())
+ curState = self.states['unknown']
+ for dest in dests:
+ dest.service = self.service
+ if not self.equals(dest):
+ continue
+ w = dest.weight()
+ if w == self.weight():
+ # Server is present with its predefined weight
+ curState = self.states['up']
+ elif w == 0:
+ # Server is present but drained
+ curState = self.states['drained']
+ else:
+ # Server is present but needs a change in weight
+ curState = self.states['refresh']
+ self.currentState = curState
+
+ def service_ready(self):
+ self.service.checkActualState()
+ return (self.service.currentState.name == 'present')
+
+ def _del(self):
+ if not self.service_ready():
+ raise IpvsError("Tried to delete a server from service %s but "
+ "the service is not present",
+ self.service)
+ self.client.del_dest(self.service.vip(), self.port_, self.ip_,
+ protocol=self.service.proto())
+
+ def _add(self, weight=None):
+ if not self.service_ready():
+ raise IpvsError("Tried to add a server to service %s but "
+ "the service is not present",
+ self.service)
+ if weight is None:
+ weight = self.weight_
+ self.client.add_dest(self.service.vip(), self.port_, self.ip_,
+ protocol=self.service.proto(),
+ weight=weight, method=self.fwd_method_)
+
+ def _add_weight_zero(self):
+ return self._add(weight=0)
+
+ def _drain(self):
+ self._set(weight=0)
+
+ def _set(self, weight=None):
+ if not self.service_ready():
+ raise IpvsError("Tried to modify server of service %s but "
+ "the service is not present",
+ self.service)
+ if weight is None:
+ weight = self.weight_
+ if self.currentState.name == 'unknown':
+ return self._add(weight=weight)
+ self.client.update_dest(self.service.vip(), self.port_, self.ip_,
+ protocol=self.service.proto(),
+ weight=weight, method=self.fwd_method_)
+
+ def equals(self, server):
+ return (self.service.equals(server.service) and
+ self.ip() == server.ip() and self.port() == server.port())
+
+ def __eq__(self, server):
+ return (self.equals(server) and
+ self.currentState == server.currentState and
+ self.weight() == server.weight())
+
+ def __str__(self):
+ return "%s:%d with weight %d" % (self.ip_, self.port_, self.weight_)
diff --git a/pybal/ipvs/service.py b/pybal/ipvs/service.py
new file mode 100644
index 0000000..77c0791
--- /dev/null
+++ b/pybal/ipvs/service.py
@@ -0,0 +1,115 @@
+from __future__ import absolute_import
+
+import socket
+
+from gnlpy import ipvs as netlink
+import pybal.fsm
+from pybal.util import log
+
+fsm = pybal.fsm
+
+
+class Service(netlink.Service, fsm.FiniteStateMachine):
+ """
+ Handler for ipvs service using netlink
+ """
+ _protocols = {
+ 'tcp': socket.IPPROTO_TCP,
+ 'udp': socket.IPPROTO_UDP
+ }
+
+ check_state = True
+
+ @staticmethod
+ def from_tuple(service):
+ return {
+ 'proto': service[0],
+ 'vip': service[1],
+ 'port': service[2],
+ 'sched': service[3] if len(service) > 3 else 'rr'
+ }
+
+ def __init__(self, client, service):
+ s = Service.from_tuple(service)
+ self.client = client
+ netlink.Service.__init__(self, s, validate=True)
+ fsm.FiniteStateMachine.__init__(self)
+ self.setUpFSM()
+ # Read the initial state of this service
+ self._check_state()
+
+ def setUpFSM(self):
+ # S0: the service is unknown/deleted
+ self.addState(fsm.State('unknown'))
+ # S1: the service is present
+ self.addState(fsm.State('present'))
+ # S0 => S1
+ self.addTransition(self.states['unknown'],
+ self.states['present'],
+ self._add,
+ self.logError)
+ self.addTransition(self.states['present'],
+ self.states['unknown'],
+ self._del,
+ self.logError)
+ self.currentState = self.states['unknown']
+
+ def checkActualState(self):
+ """
+ Check the actual state of the fsm, and assign it to
+ self.currentState
+
+ This is needed if we want to make sure what pybal knows about the IPVS
+ state still corresponds to what is actually registered in the kernel.
+
+ Multiple things can change the state, most notably someone using
"ipvsadm"
+ by hand.
+
+ If you set the config variable 'check_ipvs_state' to false, the
+ (quite expensive) checks here will not be performed, and pybal will
+ assume what it knows about the ipvs state is accurate.
+ """
+ if self.check_state:
+ self._check_state()
+
+ def _check_state(self):
+ """
+ Check the actual state of the fsm, and assign it to
+ self.currentState
+
+ This is used during initialization to fetch the initial state
+ """
+ # Blocking call
+ s = self.client.get_service(self.to_attr_list())
+ if s is not None and self.equals(s):
+ actualState = self.states['present']
+ else:
+ actualState = self.states['unknown']
+ self.currentState = actualState
+
+ def _add(self):
+ log.debug("Adding service %s" % self)
+ self.client.add_service(self.vip_, self.port_,
+ protocol=self.proto_num(),
+ sched_name=self.sched_)
+
+ def _del(self):
+ if self.currentState == self.states['unknown']:
+ log.info("Not removing already absent service %s" % self)
+ return
+
+ log.debug("Removing service %s" % self)
+ self.client.del_service(self.vip_, self.port_,
+ protocol=self.proto_num())
+
+ def equals(self, srv):
+ return (self.proto_ == srv.proto_ and self.vip() == srv.vip()
+ and self.port() == srv.port() and self.sched() == srv.sched())
+
+ def __eq__(self, srv):
+ return (self.equals(srv) and self.currentState == srv.currentState)
+
+ def __str__(self):
+ return "'%s://%s:%d' with scheduler %s" % (self.proto_,
+ self.vip_, self.port_,
+ self.sched_)
diff --git a/pybal/pybal.py b/pybal/pybal.py
index 1521222..59a869f 100755
--- a/pybal/pybal.py
+++ b/pybal/pybal.py
@@ -14,6 +14,8 @@
import os, sys, signal, socket, random
import logging
from pybal import ipvs, util, config, etcd, instrumentation
+import pybal.ipvs.service
+import pybal.ipvs.server
from twisted.python import failure
from twisted.internet import reactor, defer
@@ -601,9 +603,9 @@
configdict.update(cliconfig)
if section != 'global':
- services[section] = ipvs.LVSService(section, cfgtuple,
configuration=configdict)
+ services[section] = ipvs.get_service(section, cfgtuple,
configdict)
crd = Coordinator(services[section],
- configUrl=config.get(section, 'config'))
+ configUrl=config.get(section, 'config'))
log.info("Created LVS service '{}'".format(section))
instrumentation.PoolsRoot.addPool(crd.lvsservice.name, crd)
@@ -614,6 +616,11 @@
configdict = util.ConfigDict()
configdict.update(cliconfig)
+ # Should the ipvs state be checked by the FSMs or not?
+ check_state = configdict.getboolean('check_ipvs_state', True)
+ pybal.ipvs.server.Server.check_state = check_state
+ pybal.ipvs.server.Servie.check_state = check_state
+
# Set the logging level
if configdict.get('debug', False):
util.PyBalLogObserver.level = logging.DEBUG
diff --git a/pybal/test/__init__.py b/pybal/test/__init__.py
index 5806cf3..013a9bc 100644
--- a/pybal/test/__init__.py
+++ b/pybal/test/__init__.py
@@ -9,7 +9,7 @@
# flake8: noqa
from .test_config import *
-from .test_ipvs import *
from .test_monitor import *
from .test_util import *
from .test_instrumentation import *
+from .test_ipvs import *
diff --git a/pybal/test/fixtures.py b/pybal/test/fixtures.py
index 8f584de..12a0d4b 100644
--- a/pybal/test/fixtures.py
+++ b/pybal/test/fixtures.py
@@ -8,11 +8,16 @@
"""
import unittest
+import mock
+
import pybal.util
import twisted.test.proto_helpers
import twisted.trial.unittest
from twisted.internet import defer
+from gnlpy import ipvs as netlink
+
+from pybal.ipvs import service, server
class ServerStub(object):
"""Test stub for `pybal.Server`."""
@@ -115,3 +120,42 @@
self.server = ServerStub(self.host, self.ip, self.port,
lvsservice=self.lvsservice)
self.reactor = twisted.test.proto_helpers.MemoryReactor()
+
+
+class IpvsTestCase(twisted.trial.unittest.TestCase):
+
+ def getMockClient(self):
+ c = mock.MagicMock(spec=netlink.IpvsClient)
+ return c
+
+ def getService(self):
+ # currentState will be overridden by the actual state
+ t = ('tcp', '192.168.1.1', 80)
+ s = service.Service(self.client, t)
+ s.currentState = s.states['present']
+ return s
+
+ def getServer(self, srv=None):
+ if srv is None:
+ srv = self.getLogicalServers()[0]
+ s = self.getService()
+ return server.Server(self.client, srv, s)
+
+ def getPool(self, dest_range=range(3)):
+ serv = service.Service(self.client, ('tcp', '192.168.1.1', 80))
+ dests = []
+ for i in dest_range:
+ d = netlink.Dest({'ip': '10.0.0.%d' % (i + 1), 'port': 80,
'weight': 10})
+ dests.append(d)
+ return {'service': serv, 'dests': dests}
+
+ def getLogicalServers(self):
+ s = [
+ ServerStub('www1.local', ip='10.0.0.1', port=80, weight=10),
+ ServerStub('www2.local', ip='10.0.0.2', port=80, weight=10),
+ ServerStub('www3.local', ip='10.0.0.3', port=80, weight=10)
+ ]
+ for srv in s:
+ srv.up = True
+ srv.pooled = True
+ return s
diff --git a/pybal/test/test_ipvs_manager.py b/pybal/test/test_ipvs_manager.py
new file mode 100644
index 0000000..1034753
--- /dev/null
+++ b/pybal/test/test_ipvs_manager.py
@@ -0,0 +1,25 @@
+import mock
+
+from twisted.trial import unittest
+
+from pybal.ipvs import manager
+
+
+class TestIpvsServiceManager(unittest.TestCase):
+
+ def setUp(self):
+ self.service = ('tcp', '192.168.1.1', 6379, 'wrr')
+ self.conf = mock.MagicMock()
+
+ @mock.patch('pybal.ipvs.manager.IpvsClient')
+ def testInit(self, nl_mock):
+ self.conf.getboolean.return_value = False
+ with mock.patch('pybal.ipvs.service.Service'):
+ mgr = manager.NetlinkServiceManager('foo', self.service, self.conf)
+ self.assertEqual(mgr.name, 'foo')
+ self.assertEqual(mgr.servers, set())
+ self.assertEqual(mgr.destinations, {})
+ self.assertEqual(mgr.protocol, 'tcp')
+ self.assertEqual(mgr.ip, '192.168.1.1')
+ self.assertEqual(mgr.port, 6379)
+ self.assertEqual(mgr.scheduler, 'wrr')
diff --git a/pybal/test/test_ipvs_server.py b/pybal/test/test_ipvs_server.py
new file mode 100644
index 0000000..5b7e85f
--- /dev/null
+++ b/pybal/test/test_ipvs_server.py
@@ -0,0 +1,262 @@
+from pybal.ipvs import server
+from pybal.test.fixtures import ServerStub, IpvsTestCase
+
+
+def trapValueError(f):
+ f.trap(ValueError)
+ return None
+
+
+def callbackFailure(_):
+ raise Exception("Not raised the exceptio")
+
+
+class IpvsServerTestCase(IpvsTestCase):
+
+ def setUp(self):
+ self.client = self.getMockClient()
+ self.server = self.getServer()
+
+ def testInit(self):
+ # An invalid IP address causes an exception
+ s = ServerStub('www1.local', '10.0.0,1', 80, 10)
+ self.assertRaises(AssertionError, server.Server, self.client, s,
+ self.getService())
+ # A non-numeric port causes an exception
+ s = ServerStub('www1.local', '10.0.0.1', '80', 10)
+ self.assertRaises(AssertionError, server.Server, self.client, s,
+ self.getService())
+
+ # A non-present weight defaults to 1
+ s = ServerStub('www1.local', '10.0.0.1', 80)
+ srv = server.Server(self.client, s, self.getService())
+ self.assertEquals(srv.weight_, 1)
+
+ # The FSM has been set up
+ states = ['unknown', 'drained', 'up', 'refresh']
+ for state in states:
+ self.assertIn(state, srv.states)
+
+ # Status should be 'unknown' since we have no service defined
+ srv.checkActualState()
+ self.assertEquals(srv.currentState.name, 'unknown')
+
+ def testCheckActualState(self):
+ # All states behave like expected
+ # S0
+ self.client.get_service.return_value = None
+ self.server.checkActualState()
+ self.assertEquals(self.server.currentState.name, 'unknown')
+ p = self.getPool()
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = []
+ self.server.checkActualState()
+ self.assertEquals(self.server.currentState.name, 'unknown')
+ # S1
+ dests = p['dests']
+ dests[0].weight_ = 0
+ self.client.get_dests.return_value = dests
+ # if we set check_state to false, checkActualState() won't do
anything.
+ self.server.check_state = False
+ self.server.checkActualState()
+ self.assertEqual(self.server.currentState.name, 'unknown')
+ self.server.check_state = True
+ self.server.checkActualState()
+ self.assertEquals(self.server.currentState.name, 'drained')
+ # S2
+ dests[0].weight_ = self.server.weight()
+ self.server.checkActualState()
+ self.assertEquals(self.server.currentState.name, 'up')
+ # S3
+ dests[0].weight_ = 54
+ self.server.checkActualState()
+ self.assertEquals(self.server.currentState.name, 'refresh')
+ # If the dests request raises an exception, it doesn't get caught
+ self.client.get_dests.side_effect = Exception('Boom!')
+ with self.assertRaises(Exception):
+ self.server.checkActualState()
+
+ def testTransitionFromUnknown(self):
+ # Initial state: unknown
+ p = self.getPool()
+ dests = p['dests']
+ myDest = dests[0]
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = []
+ # To "up"
+ d = self.server.toState('up')
+ d.addCallback(
+ lambda _: self.client.add_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ weight=self.server.weight(),
+ method=self.server.fwd_method(),
+ protocol=self.server.service.proto(),
+ )
+ )
+ d.addCallback(
+ lambda _:
+ self.assertEquals(self.server.currentState.name, 'up')
+ )
+ # To "drained"
+ d = self.server.toState('drained')
+ d.addCallback(
+ lambda _: self.client.add_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ weight=0,
+ method=self.server.fwd_method(),
+ protocol=self.server.service.proto(),
+ )
+ )
+ d.addCallback(
+ lambda _:
+ self.assertEquals(self.server.currentState.name, 'drained')
+ )
+ # A transition to "refresh" doesn't exist
+ r = self.server.toState('refresh')
+ r.addCallbacks(callbackFailure, trapValueError)
+
+ def testTransitionFromDrained(self):
+ p = self.getPool()
+ dests = p['dests']
+ myDest = dests[0]
+ myDest.weight_ = 0
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = dests
+ # To unknown
+ d = self.server.toState('unknown')
+ d.addCallback(self.assertEquals, self.server.states['unknown'])
+ d.addCallback(
+ lambda _: self.client.del_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto()
+ )
+ )
+ # To up
+ d = self.server.toState('up')
+ d.addCallback(self.assertEquals, self.server.states['up'])
+ d.addCallback(
+ lambda _: self.client.update_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto(),
+ weight=self.server.weight(),
+ method=self.server.fwd_method(),
+ )
+ )
+ # A transition to "refresh" doesn't exist
+ r = self.server.toState('refresh')
+ r.addCallbacks(callbackFailure, trapValueError)
+
+ def testTransitionFromUp(self):
+ p = self.getPool()
+ dests = p['dests']
+ myDest = dests[0]
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = dests
+ # To unknown
+ d = self.server.toState('unknown')
+ d.addCallback(self.assertEquals, self.server.states['unknown'])
+ d.addCallback(
+ lambda _: self.client.del_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto()
+ )
+ )
+ # To drained
+ d = self.server.toState('drained')
+ d.addCallback(self.assertEquals, self.server.states['drained'])
+ d.addCallback(
+ lambda _: self.client.update_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto(),
+ weight=0,
+ method=self.server.fwd_method(),
+ )
+ )
+ # A transition to "refresh" doesn't exist
+ r = self.server.toState('refresh')
+ r.addCallbacks(callbackFailure, trapValueError)
+
+
+ def testTransitionFromRefresh(self):
+ p = self.getPool()
+ dests = p['dests']
+ myDest = dests[0]
+ myDest.weight_ = 147
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = dests
+ # to unknown
+ d = self.server.toState('unknown')
+ d.addCallback(self.assertEquals, self.server.states['unknown'])
+ d.addCallback(
+ lambda _: self.client.del_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto()
+ )
+ )
+ # to up
+ d= self.server.toState('up')
+ d.addCallback(self.assertEquals, self.server.states['up'])
+ d.addCallback(
+ lambda _: self.client.update_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto(),
+ weight=self.server.weight(),
+ method=self.server.fwd_method(),
+ )
+ )
+ # to drained
+ d= self.server.toState('drained')
+ d.addCallback(self.assertEquals, self.server.states['drained'])
+ d.addCallback(
+ lambda _: self.client.update_dest.assert_called_with(
+ self.server.service.vip(),
+ self.server.service.port(),
+ self.server.ip(),
+ protocol=self.server.service.proto(),
+ weight=0,
+ method=self.server.fwd_method(),
+ )
+ )
+
+
+ def testTransitionCornerCases(self):
+ # From unknown
+ p = self.getPool()
+ dests = p['dests']
+ myDest = dests[0]
+ self.client.get_service.return_value = p['service']
+ self.client.get_dests.return_value = []
+ ## If the service is absent, the transition fails
+ self.client.get_service.return_value = None
+ d = self.server.toState('up')
+ d.addCallback(self.assertEquals, None)
+ ## If the add_dest method raises an exception, the transition fails
+ self.client.add_dest.side_effect = Exception('Poww!')
+ self.client.get_service.return_value = p['service']
+ d = self.server.toState('up')
+ d.addCallback(self.assertEquals, None)
+ d.addCallback(lambda _: self.assertEquals(
+ self.server.currentState.name, 'unknown')
+ )
+ # Call from the already 'present' state should not be doing anything.
+ self.client.get_dests.return_value = dests
+ self.client.reset_mock()
+ d = self.server.toState('up')
+ d.addCallback(
+ lambda _: self.client.update_dest.assert_not_called())
diff --git a/pybal/test/test_ipvs_service.py b/pybal/test/test_ipvs_service.py
new file mode 100644
index 0000000..9882275
--- /dev/null
+++ b/pybal/test/test_ipvs_service.py
@@ -0,0 +1,118 @@
+import mock
+import socket
+
+from gnlpy import ipvs as netlink
+
+from pybal.ipvs import service
+from pybal.test import fixtures
+
+
+class IpvsServiceTestCase(fixtures.IpvsTestCase):
+
+ def getPool(self):
+ serv = service.Service.from_tuple(('tcp', '192.168.1.1', 6379))
+ return netlink.Pool({'service': serv, 'dests': []})
+
+ def testFromTuple(self):
+ tup = ('proto', 'vip', 'port', 'sched')
+ s = service.Service.from_tuple(tup)
+ for k, v in s.items():
+ self.assertEquals(k, v)
+ # Default to round-robin scheduling
+ tup = ('proto', 'vip', 'port')
+ s = service.Service.from_tuple(tup)
+ self.assertEquals(s['sched'], 'rr')
+
+ def testInit(self):
+ c = self.getMockClient()
+ # An invalid tuple raises an exception
+ t = ('tcp', '192.168,0,0', 6379)
+ self.assertRaises(AssertionError, service.Service, c, t)
+ # A correct initialization will set up all the service variables
+ t = ('tcp', '192.168.1.1', 6379)
+ s = service.Service(c, t)
+ self.assertEquals(s.proto_, 'tcp')
+ self.assertEquals(s.vip_, '192.168.1.1')
+ self.assertEquals(s.port_, 6379)
+ self.assertEquals(s.sched_, 'rr')
+ # The FSM has been set up
+ self.assertIn('unknown', s.states)
+ self.assertIn('present', s.states)
+ self.assertIn('present', s.states['unknown'].transitions)
+ self.assertIn('unknown', s.states['present'].transitions)
+ # State should be 'unknown' given we gave no answer
+ self.assertEquals(s.currentState, s.states['unknown'])
+
+ def testCheckActualState(self):
+ c = self.getMockClient()
+ # currentState will be overridden by the actual state
+ t = ('tcp', '192.168.1.1', 6379)
+ s = service.Service(c, t)
+ s.currentState = s.states['present']
+ s.checkActualState()
+ self.assertEquals(s.currentState.name, 'unknown')
+ # if the pool is present, the actual state will become present
+ c.get_service.return_value = s
+ # If not required, checkActualState will do nothing
+ s.check_state = False
+ s.checkActualState()
+ self.assertEqual(s.currentState.name, 'unknown')
+ s.check_state = True
+ s.checkActualState()
+ self.assertEquals(s.currentState.name, 'present')
+ # If the request for pools raises an exception,
+ # do not try to catch it.
+ with self.assertRaises(Exception):
+ c.get_service.side_effect = Exception('something!')
+ s.checkActualState()
+
+ def testTransitionToPresent(self):
+ # First situation: service is not present, we transition to s1
(present)
+ c = self.getMockClient()
+ t = ('tcp', '192.168.1.1', 6379)
+ s = service.Service(c, t)
+ c.get_service.return_value = None
+ d = s.toState('present')
+ d.addCallback(self.assertEqual, s.states['present'])
+ d.addCallback(lambda _: s.client.add_service.assert_called_with(
+ '192.168.1.1', 6379, protocol=socket.IPPROTO_TCP,
+ sched_name='rr'))
+ # Check the current state has been set
+ print s.currentState.name
+ assert s.currentState == s.states['present']
+
+ def testTransitionToUnknown(self):
+ c = self.getMockClient()
+ t = ('tcp', '192.168.1.1', 6379)
+ s = service.Service(c, t)
+ c.get_service.return_value = s
+ s.checkActualState()
+ assert s.currentState == s.states['present']
+ d = s.toState('unknown')
+ d.addCallback(self.assertEqual, s.states['unknown'])
+ d.addCallback(lambda _: c.del_service.assert_called_with(
+ '192.168.1.1', 6379, protocol=socket.IPPROTO_TCP))
+ # Now let's try to move to "unknown" again, nothing will happen
+ c.get_service.return_value = None
+ c.del_service.reset_mock()
+ d1 = s.toState('unknown')
+ d1.addCallback(lambda _: c.del_service.assert_not_called())
+
+ def testFaultyTransaction(self):
+ c = self.getMockClient()
+ t = ('tcp', '192.168.1.1', 6379)
+ s = service.Service(c, t)
+ c.get_service.return_value = s
+ # Simulate the netlink layer failing
+ c.del_service.side_effect = RuntimeError('welcome to the terrordome!')
+ # Failure doesn't propagate outside of the fsm
+ try:
+ d = s.toState('unknown')
+ except Exception as e:
+ self.fail("Unhandled failure in state transition, %s" % e)
+
+ # Verify that the deferred returns None in this case
+ d.addCallback(self.assertEquals, None)
+ # State is still present
+ d.addCallback(lambda _: self.assertEquals(s.currentState.name,
+ 'present'))
diff --git a/setup.py b/setup.py
index acbcfc1..b8e53f9 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@
setup(
name='PyBal',
- version='1.12',
+ version='1.14-dev',
license='GPLv2+',
author='Mark Bergsma',
author_email='[email protected]',
@@ -42,7 +42,8 @@
),
requires=(
'twisted',
- 'PyOpenSSL'
+ 'PyOpenSSL',
+ 'gnlpy(>=1.2.0)',
),
tests_require=(
'mock',
diff --git a/tox.ini b/tox.ini
index 3de1bc3..02234ba 100644
--- a/tox.ini
+++ b/tox.ini
@@ -10,6 +10,7 @@
twisted
mock
PyOpenSSL
+ gnlpy
[testenv:cover]
commands =
--
To view, visit https://gerrit.wikimedia.org/r/355082
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4ae6b61571f37a7a47fa83eb549d381976318ca8
Gerrit-PatchSet: 1
Gerrit-Project: operations/debs/pybal
Gerrit-Branch: 2.0-dev
Gerrit-Owner: Giuseppe Lavagetto <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits