Ema has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/404713 )
Change subject: Separate out coordinator.Server into its own module ...................................................................... Separate out coordinator.Server into its own module Change-Id: I925f0abe553cd82603014cd8a0760be8bc8ad880 --- M pybal/coordinator.py A pybal/server.py M pybal/test/test_coordinator.py M pybal/test/test_server.py 4 files changed, 277 insertions(+), 264 deletions(-) Approvals: Ema: Looks good to me, approved jenkins-bot: Verified diff --git a/pybal/coordinator.py b/pybal/coordinator.py index a4b4e16..b7b3591 100755 --- a/pybal/coordinator.py +++ b/pybal/coordinator.py @@ -6,270 +6,14 @@ LVS Squid balancer/monitor for managing the Wikimedia Squid servers using LVS """ -import importlib -import random -import socket -from twisted.internet import defer, reactor -from twisted.names import client, dns -from twisted.python import failure +from twisted.internet import defer from pybal import config, util from pybal.metrics import Counter, Gauge +import pybal.server log = util.log - - -class Server: - """ - Class that maintains configuration and state of a single (real)server - """ - - # Defaults - DEF_STATE = True - DEF_WEIGHT = 10 - - # Set of attributes allowed to be overridden in a server list - allowedConfigKeys = [ ('host', str), ('weight', int), ('enabled', bool) ] - - def __init__(self, host, lvsservice, addressFamily=None): - """Constructor""" - - self.host = host - self.lvsservice = lvsservice - if addressFamily: - self.addressFamily = addressFamily - else: - self.addressFamily = (':' in self.lvsservice.ip) and socket.AF_INET6 or socket.AF_INET - self.ip = None - self.port = 80 - self.ip4_addresses = set() - self.ip6_addresses = set() - self.monitors = set() - - # A few invariants that SHOULD be maintained (but currently may not be): - # P0: pooled => enabled /\ ready - # P1: up => pooled \/ !enabled \/ !ready - # P2: pooled => up \/ !canDepool - - self.weight = self.DEF_WEIGHT - self.up = False - self.pooled = False - self.enabled = True - self.ready = False - self.modified = None - - def __eq__(self, other): - return isinstance(other, Server) and self.host == other.host and self.lvsservice == other.lvsservice - - def __hash__(self): - return hash(self.host) - - def addMonitor(self, monitor): - """Adds a monitor instance to the set""" - - self.monitors.add(monitor) - - def removeMonitors(self): - """Removes all monitors""" - - for monitor in self.monitors: - monitor.stop() - - self.monitors.clear() - - def resolveHostname(self): - """Attempts to resolve the server's hostname to an IP address for better reliability.""" - - timeout = [1, 2, 5] - lookups = [] - - query = dns.Query(self.host, dns.A) - lookups.append(client.lookupAddress(self.host, timeout - ).addCallback(self._lookupFinished, socket.AF_INET, query)) - - query = dns.Query(self.host, dns.AAAA) - lookups.append(client.lookupIPV6Address(self.host, timeout - ).addCallback(self._lookupFinished, socket.AF_INET6, query)) - - return defer.DeferredList(lookups).addBoth(self._hostnameResolved) - - def _lookupFinished(self, (answers, authority, additional), addressFamily, query): - ips = set([socket.inet_ntop(addressFamily, r.payload.address) - for r in answers - if r.name == query.name and r.type == query.type]) - - if query.type == dns.A: - self.ip4_addresses = ips - elif query.type == dns.AAAA: - self.ip6_addresses = ips - - # TODO: expire TTL - #if self.ip: - # minTTL = min([r.ttl for r in answers - # if r.name == query.name and r.type == query.type]) - - return ips - - def _hostnameResolved(self, result): - # Pick *1* main ip address to use. Prefer any existing one - # if still available. - - addr = " ".join( - list(self.ip4_addresses) + list(self.ip6_addresses)) - msg = "Resolved {} to addresses {}".format(self.host, addr) - log.debug(msg) - - ip_addresses = { - socket.AF_INET: - self.ip4_addresses, - socket.AF_INET6: - self.ip6_addresses - }[self.addressFamily] - - try: - if not self.ip or self.ip not in ip_addresses: - self.ip = random.choice(list(ip_addresses)) - # TODO: (re)pool - except IndexError: - return failure.Failure() # TODO: be more specific? - else: - return True - - def destroy(self): - self.enabled = False - self.removeMonitors() - - def initialize(self, coordinator): - """ - Initializes this server instance and fires a Deferred - when ready for use (self.ready == True) - """ - - d = self.resolveHostname() - - return d.addCallbacks(self._ready, self._initFailed, callbackArgs=[coordinator]) - - def _ready(self, result, coordinator): - """ - Called when initialization has finished. - """ - - self.ready = True - self.up = self.DEF_STATE - self.pooled = self.DEF_STATE - self.maintainState() - - self.createMonitoringInstances(coordinator) - - return True - - def _initFailed(self, fail): - """ - Called when initialization failed - """ - log.error("Initialization failed for server {}".format(self.host)) - - assert self.ready == False - self.maintainState() - - return False # Continue on success callback chain - - def createMonitoringInstances(self, coordinator): - """Creates and runs monitoring instances for this Server""" - - lvsservice = self.lvsservice - - try: - monitorlist = eval(lvsservice.configuration['monitors']) - except KeyError: - log.critical( - "LVS service {} does not have a 'monitors' configuration option set.".format( - lvsservice.name) - ) - raise - - if type(monitorlist) != list: - msg = "option 'monitors' in LVS service section {} is not a python list" - log.err(msg.format(lvsservice.name)) - else: - for monitorname in monitorlist: - try: - monitormodule = importlib.import_module( - "pybal.monitors.{}".format(monitorname.lower())) - except ImportError: - log.err("Monitor {} does not exist".format(monitorname)) - except Exception: - log.critical("Cannot import pybal.monitors.{}".format(monitorname)) - # An exception was raised importing the given monitor - # module. Instead of just logging the problem, stop PyBal - # as the admin might think everything is fine and all - # checks are green, while in fact no check is being - # performed. - reactor.stop() - else: - monitorclass = getattr(monitormodule, monitorname + 'MonitoringProtocol') - monitor = monitorclass(coordinator, self, lvsservice.configuration) - self.addMonitor(monitor) - monitor.run() - - def calcStatus(self): - """AND quantification of monitor.up over all monitoring instances of a single Server""" - - # Global status is up iff all monitors report up - return reduce(lambda b,monitor: b and monitor.up, self.monitors, len(self.monitors) != 0) - - def calcPartialStatus(self): - """OR quantification of monitor.up over all monitoring instances of a single Server""" - - # Partial status is up iff one of the monitors reports up - return reduce(lambda b,monitor: b or monitor.up, self.monitors, len(self.monitors) == 0) - - def textStatus(self): - return "%s/%s/%s" % (self.enabled and "enabled" or "disabled", - self.up and "up" or (self.calcPartialStatus() and "partially up" or "down"), - self.pooled and "pooled" or "not pooled") - - def maintainState(self): - """Maintains a few invariants on configuration changes""" - - # P0 - if not self.enabled or not self.ready: - self.pooled = False - # P1 - if not self.pooled and self.enabled: - self.up = False - - def merge(self, configuration): - """Merges in configuration from a dictionary of (allowed) attributes""" - - for key, value in configuration.iteritems(): - if (key, type(value)) not in self.allowedConfigKeys: - del configuration[key] - - # Overwrite configuration - self.__dict__.update(configuration) - self.maintainState() - self.modified = True # Indicate that this instance previously existed - - def dumpState(self): - """Dump current state of the server""" - return {'pooled': self.pooled, 'weight': self.weight, - 'up': self.up, 'enabled': self.enabled} - - @classmethod - def buildServer(cls, hostName, configuration, lvsservice): - """ - Factory method which builds a Server instance from a - dictionary of (allowed) configuration attributes - """ - - server = cls(hostName, lvsservice) # create a new instance... - server.merge(configuration) # ...and override attributes - server.modified = False - - return server - class Coordinator: """ @@ -478,7 +222,7 @@ ) else: # New server - server = Server.buildServer(hostName, hostConfig, self.lvsservice) + server = pybal.server.Server.buildServer(hostName, hostConfig, self.lvsservice) data = {'status': (server.enabled and "enabled" or "disabled"), 'host': hostName, 'weight': server.weight} # Initialize with LVS service specific configuration diff --git a/pybal/server.py b/pybal/server.py new file mode 100644 index 0000000..48b35fc --- /dev/null +++ b/pybal/server.py @@ -0,0 +1,268 @@ +""" +PyBal +Copyright (C) 2006-2018 by Mark Bergsma <m...@nedworks.org> + +LVS balancer/monitor +""" + +import importlib +import random +import socket + +from twisted.internet import defer, reactor +from twisted.names import client, dns +from twisted.python import failure + +from pybal import util + +log = util.log + +class Server: + """ + Class that maintains configuration and state of a single (real)server + """ + + # Defaults + DEF_STATE = True + DEF_WEIGHT = 10 + + # Set of attributes allowed to be overridden in a server list + allowedConfigKeys = [ ('host', str), ('weight', int), ('enabled', bool) ] + + def __init__(self, host, lvsservice, addressFamily=None): + """Constructor""" + + self.host = host + self.lvsservice = lvsservice + if addressFamily: + self.addressFamily = addressFamily + else: + self.addressFamily = (':' in self.lvsservice.ip) and socket.AF_INET6 or socket.AF_INET + self.ip = None + self.port = 80 + self.ip4_addresses = set() + self.ip6_addresses = set() + self.monitors = set() + + # A few invariants that SHOULD be maintained (but currently may not be): + # P0: pooled => enabled /\ ready + # P1: up => pooled \/ !enabled \/ !ready + # P2: pooled => up \/ !canDepool + + self.weight = self.DEF_WEIGHT + self.up = False + self.pooled = False + self.enabled = True + self.ready = False + self.modified = None + + def __eq__(self, other): + return isinstance(other, Server) and self.host == other.host and self.lvsservice == other.lvsservice + + def __hash__(self): + return hash(self.host) + + def addMonitor(self, monitor): + """Adds a monitor instance to the set""" + + self.monitors.add(monitor) + + def removeMonitors(self): + """Removes all monitors""" + + for monitor in self.monitors: + monitor.stop() + + self.monitors.clear() + + def resolveHostname(self): + """Attempts to resolve the server's hostname to an IP address for better reliability.""" + + timeout = [1, 2, 5] + lookups = [] + + query = dns.Query(self.host, dns.A) + lookups.append(client.lookupAddress(self.host, timeout + ).addCallback(self._lookupFinished, socket.AF_INET, query)) + + query = dns.Query(self.host, dns.AAAA) + lookups.append(client.lookupIPV6Address(self.host, timeout + ).addCallback(self._lookupFinished, socket.AF_INET6, query)) + + return defer.DeferredList(lookups).addBoth(self._hostnameResolved) + + def _lookupFinished(self, (answers, authority, additional), addressFamily, query): + ips = set([socket.inet_ntop(addressFamily, r.payload.address) + for r in answers + if r.name == query.name and r.type == query.type]) + + if query.type == dns.A: + self.ip4_addresses = ips + elif query.type == dns.AAAA: + self.ip6_addresses = ips + + # TODO: expire TTL + #if self.ip: + # minTTL = min([r.ttl for r in answers + # if r.name == query.name and r.type == query.type]) + + return ips + + def _hostnameResolved(self, result): + # Pick *1* main ip address to use. Prefer any existing one + # if still available. + + addr = " ".join( + list(self.ip4_addresses) + list(self.ip6_addresses)) + msg = "Resolved {} to addresses {}".format(self.host, addr) + log.debug(msg) + + ip_addresses = { + socket.AF_INET: + self.ip4_addresses, + socket.AF_INET6: + self.ip6_addresses + }[self.addressFamily] + + try: + if not self.ip or self.ip not in ip_addresses: + self.ip = random.choice(list(ip_addresses)) + # TODO: (re)pool + except IndexError: + return failure.Failure() # TODO: be more specific? + else: + return True + + def destroy(self): + self.enabled = False + self.removeMonitors() + + def initialize(self, coordinator): + """ + Initializes this server instance and fires a Deferred + when ready for use (self.ready == True) + """ + + d = self.resolveHostname() + + return d.addCallbacks(self._ready, self._initFailed, callbackArgs=[coordinator]) + + def _ready(self, result, coordinator): + """ + Called when initialization has finished. + """ + + self.ready = True + self.up = self.DEF_STATE + self.pooled = self.DEF_STATE + self.maintainState() + + self.createMonitoringInstances(coordinator) + + return True + + def _initFailed(self, fail): + """ + Called when initialization failed + """ + log.error("Initialization failed for server {}".format(self.host)) + + assert self.ready == False + self.maintainState() + + return False # Continue on success callback chain + + def createMonitoringInstances(self, coordinator): + """Creates and runs monitoring instances for this Server""" + + lvsservice = self.lvsservice + + try: + monitorlist = eval(lvsservice.configuration['monitors']) + except KeyError: + log.critical( + "LVS service {} does not have a 'monitors' configuration option set.".format( + lvsservice.name) + ) + raise + + if type(monitorlist) != list: + msg = "option 'monitors' in LVS service section {} is not a python list" + log.err(msg.format(lvsservice.name)) + else: + for monitorname in monitorlist: + try: + monitormodule = importlib.import_module( + "pybal.monitors.{}".format(monitorname.lower())) + except ImportError: + log.err("Monitor {} does not exist".format(monitorname)) + except Exception: + log.critical("Cannot import pybal.monitors.{}".format(monitorname)) + # An exception was raised importing the given monitor + # module. Instead of just logging the problem, stop PyBal + # as the admin might think everything is fine and all + # checks are green, while in fact no check is being + # performed. + reactor.stop() + else: + monitorclass = getattr(monitormodule, monitorname + 'MonitoringProtocol') + monitor = monitorclass(coordinator, self, lvsservice.configuration) + self.addMonitor(monitor) + monitor.run() + + def calcStatus(self): + """AND quantification of monitor.up over all monitoring instances of a single Server""" + + # Global status is up iff all monitors report up + return reduce(lambda b,monitor: b and monitor.up, self.monitors, len(self.monitors) != 0) + + def calcPartialStatus(self): + """OR quantification of monitor.up over all monitoring instances of a single Server""" + + # Partial status is up iff one of the monitors reports up + return reduce(lambda b,monitor: b or monitor.up, self.monitors, len(self.monitors) == 0) + + def textStatus(self): + return "%s/%s/%s" % (self.enabled and "enabled" or "disabled", + self.up and "up" or (self.calcPartialStatus() and "partially up" or "down"), + self.pooled and "pooled" or "not pooled") + + def maintainState(self): + """Maintains a few invariants on configuration changes""" + + # P0 + if not self.enabled or not self.ready: + self.pooled = False + # P1 + if not self.pooled and self.enabled: + self.up = False + + def merge(self, configuration): + """Merges in configuration from a dictionary of (allowed) attributes""" + + for key, value in configuration.iteritems(): + if (key, type(value)) not in self.allowedConfigKeys: + del configuration[key] + + # Overwrite configuration + self.__dict__.update(configuration) + self.maintainState() + self.modified = True # Indicate that this instance previously existed + + def dumpState(self): + """Dump current state of the server""" + return {'pooled': self.pooled, 'weight': self.weight, + 'up': self.up, 'enabled': self.enabled} + + @classmethod + def buildServer(cls, hostName, configuration, lvsservice): + """ + Factory method which builds a Server instance from a + dictionary of (allowed) configuration attributes + """ + + server = cls(hostName, lvsservice) # create a new instance... + server.merge(configuration) # ...and override attributes + server.modified = False + + return server diff --git a/pybal/test/test_coordinator.py b/pybal/test/test_coordinator.py index 35cede6..b6ab1a9 100644 --- a/pybal/test/test_coordinator.py +++ b/pybal/test/test_coordinator.py @@ -9,6 +9,7 @@ import mock import pybal.coordinator +import pybal.server import pybal.util from twisted.internet.reactor import getDelayedCalls @@ -30,7 +31,7 @@ self.coordinator.lvsservice.getDepoolThreshold = mock.MagicMock( return_value=0.5) - pybal.coordinator.Server.initialize = mock.MagicMock() + pybal.server.Server.initialize = mock.MagicMock() def tearDown(self): self.coordinator.configObserver.reloadTask.stop() diff --git a/pybal/test/test_server.py b/pybal/test/test_server.py index 3eefca4..01da0d9 100644 --- a/pybal/test/test_server.py +++ b/pybal/test/test_server.py @@ -3,7 +3,7 @@ PyBal unit tests ~~~~~~~~~~~~~~~~ - This module contains tests for `pybal.coordinator.Server`. + This module contains tests for `pybal.server.Server`. """ @@ -17,12 +17,12 @@ from .fixtures import PyBalTestCase class ServerTestCase(PyBalTestCase): - """Test case for `pybal.coordinator.Server`.""" + """Test case for `pybal.server.Server`.""" def setUp(self): super(ServerTestCase, self).setUp() - self.server = pybal.coordinator.Server( + self.server = pybal.server.Server( 'example.com', mock.MagicMock()) self.mockMonitor = mock.MagicMock() @@ -124,5 +124,5 @@ hostName=self.exampleConfigDict['host'], configuration=self.exampleConfigDict, lvsservice=mock.MagicMock()) - self.assertTrue(isinstance(server, pybal.coordinator.Server)) + self.assertTrue(isinstance(server, pybal.server.Server)) self.assertFalse(server.modified) -- To view, visit https://gerrit.wikimedia.org/r/404713 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I925f0abe553cd82603014cd8a0760be8bc8ad880 Gerrit-PatchSet: 1 Gerrit-Project: operations/debs/pybal Gerrit-Branch: master Gerrit-Owner: Mark Bergsma <m...@wikimedia.org> Gerrit-Reviewer: Ema <e...@wikimedia.org> Gerrit-Reviewer: Giuseppe Lavagetto <glavage...@wikimedia.org> Gerrit-Reviewer: Volans <rcocci...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits