jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/354746 )
Change subject: Move BGP classes to bgp.bgp, IP classes to bgp.ip ...................................................................... Move BGP classes to bgp.bgp, IP classes to bgp.ip Split BGP code into bgp.bgp and bgp.ip. Add unit tests for bgp.ip. Change-Id: I4e474adb9177fc934977339266eb7c3b145dae08 --- A pybal/bgp/__init__.py R pybal/bgp/bgp.py A pybal/bgp/ip.py A pybal/bgp/test_ip.py M pybal/pybal.py 5 files changed, 194 insertions(+), 141 deletions(-) Approvals: Mark Bergsma: Looks good to me, approved jenkins-bot: Verified diff --git a/pybal/bgp/__init__.py b/pybal/bgp/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/pybal/bgp/__init__.py diff --git a/pybal/bgp.py b/pybal/bgp/bgp.py similarity index 94% rename from pybal/bgp.py rename to pybal/bgp/bgp.py index 09583d0..cec4e55 100644 --- a/pybal/bgp.py +++ b/pybal/bgp/bgp.py @@ -33,6 +33,8 @@ from twisted import copyright from twisted.internet import reactor, protocol, base, interfaces, error, defer +from ip import IPv4IP, IPv6IP, IPPrefix + # Constants VERSION = 4 PORT = 179 @@ -187,146 +189,6 @@ be initiated. """ -# TODO: Replace by some better third party classes or rewrite -class IPPrefix(object): - """Class that represents an IP prefix""" - - def __init__(self, ipprefix, addressfamily=None): - self.prefix = None # packed ip string - - if isinstance(ipprefix, IPPrefix): - self.prefix, self.prefixlen, self.addressfamily = ipprefix.prefix, ipprefix.prefixlen, ipprefix.addressfamily - elif type(ipprefix) is tuple: - # address family must be specified - if not addressfamily: - raise ValueError() - - self.addressfamily = addressfamily - - prefix, self.prefixlen = ipprefix - if type(prefix) is str: - # tuple (ipstr, prefixlen) - self.prefix = prefix - elif type(prefix) is int: - if self.addressfamily == AFI_INET: - # tuple (ipint, prefixlen) - self.prefix = struct.pack('!I', prefix) - else: - raise ValueError() - else: - # Assume prefix is a sequence of octets - self.prefix = b"".join(map(chr, prefix)) - elif type(ipprefix) is str: - # textual form - prefix, prefixlen = ipprefix.split('/') - self.addressfamily = addressfamily or (':' in prefix and AFI_INET6 or AFI_INET) - - if self.addressfamily == AFI_INET: - self.prefix = b"".join([chr(int(o)) for o in prefix.split('.')]) - elif self.addressfamily == AFI_INET6: - self.prefix = bytearray() - hexlist = prefix.split(":") - if len(hexlist) > 8: - raise ValueError() - - for hexstr in hexlist: - if hexstr is not "": - self.prefix += struct.pack('!H', int(hexstr, 16)) - else: - zeroCount = 8 - len(hexlist) + 1 - self.prefix += struct.pack('!%dH' % zeroCount, *((0,) * zeroCount)) - self.prefix = bytes(self.prefix) - - self.prefixlen = int(prefixlen) - else: - raise ValueError() - - def __repr__(self): - return repr(str(self)) - - def __str__(self): - if self.addressfamily == AFI_INET: - return '.'.join([str(ord(o)) for o in self.packed(pad=True)]) + '/%d' % self.prefixlen - elif self.addressfamily == AFI_INET6: - return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', self.packed(pad=True))]) + '/%d' % self.prefixlen - - def __eq__(self, other): - # FIXME: masked ips - return isinstance(other, IPPrefix) and self.prefixlen == other.prefixlen and self.prefix == other.prefix - - def __ne__(self, other): - return not self.__eq__(other) - - def __lt__(self, other): - return self.prefix < other.prefix or \ - (self.prefix == other.prefix and self.prefixlen < other.prefixlen) - - def __le__(self, other): - return self.__lt__(other) or self.__eq__(other) - - def __gt__(self, other): - return self.prefix > other.prefix or \ - (self.prefix == other.prefix and self.prefixlen > other.prefixlen) - - def __ge__(self, other): - return self.__gt__(other) or self.__eq__(other) - - def __hash__(self): - return hash(self.prefix) ^ hash(self.prefixlen) - - def __len__(self): - return self.prefixlen - - def _packedMaxLen(self): - return (self.addressfamily == AFI_INET6 and 16 or 4) - - def ipToInt(self): - return reduce(lambda x, y: x * 256 + y, map(ord, self.prefix)) - - def netmask(self): - return ~( (1 << (len(self.prefix)*8 - self.prefixlen)) - 1) - - def mask(self, prefixlen, shorten=False): - # DEBUG - assert len(self.prefix) == self._packedMaxLen() - - masklen = len(self.prefix) * 8 - prefixlen - self.prefix = struct.pack('!I', self.ipToInt() >> masklen << masklen) - if shorten: self.prefixlen = prefixlen - return self - - def packed(self, pad=False): - if pad: - return self.prefix + '\0' * (self._packedMaxLen() - len(self.prefix)) - else: - return self.prefix - -class IPv4IP(IPPrefix): - """Class that represents a single non-prefix IPv4 IP.""" - - def __init__(self, ip): - if type(ip) is str and len(ip) > 4: - super(IPv4IP, self).__init__(ip + '/32', AFI_INET) - else: - super(IPv4IP, self).__init__((ip, 32), AFI_INET) - - def __str__(self): - return ".".join([str(ord(o)) for o in self.prefix]) - -class IPv6IP(IPPrefix): - """Class that represents a single non-prefix IPv6 IP.""" - - def __init__(self, ip=None, packed=None): - if not ip and not packed: - raise ValueError() - - if packed: - super(IPv6IP, self).__init__((packed, 128), AFI_INET6) - else: - super(IPv6IP, self).__init__(ip + '/128', AFI_INET6) - - def __str__(self): - return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', self.packed(pad=True))]) class Attribute(object): """ diff --git a/pybal/bgp/ip.py b/pybal/bgp/ip.py new file mode 100644 index 0000000..e63c813 --- /dev/null +++ b/pybal/bgp/ip.py @@ -0,0 +1,149 @@ +# ip.py +# Copyright (c) 2007 by Mark Bergsma <m...@nedworks.org> + +import struct + +# Constants +AFI_INET = 1 +AFI_INET6 = 2 + +# TODO: Replace by some better third party classes or rewrite +class IPPrefix(object): + """Class that represents an IP prefix""" + + def __init__(self, ipprefix, addressfamily=None): + self.prefix = None # packed ip string + + if isinstance(ipprefix, IPPrefix): + self.prefix, self.prefixlen, self.addressfamily = ipprefix.prefix, ipprefix.prefixlen, ipprefix.addressfamily + elif type(ipprefix) is tuple: + # address family must be specified + if not addressfamily: + raise ValueError() + + self.addressfamily = addressfamily + + prefix, self.prefixlen = ipprefix + if type(prefix) is str: + # tuple (ipstr, prefixlen) + self.prefix = prefix + elif type(prefix) is int: + if self.addressfamily == AFI_INET: + # tuple (ipint, prefixlen) + self.prefix = struct.pack('!I', prefix) + else: + raise ValueError() + else: + # Assume prefix is a sequence of octets + self.prefix = b"".join(map(chr, prefix)) + elif type(ipprefix) is str: + # textual form + prefix, prefixlen = ipprefix.split('/') + self.addressfamily = addressfamily or (':' in prefix and AFI_INET6 or AFI_INET) + + if self.addressfamily == AFI_INET: + self.prefix = b"".join([chr(int(o)) for o in prefix.split('.')]) + elif self.addressfamily == AFI_INET6: + self.prefix = bytearray() + hexlist = prefix.split(":") + if len(hexlist) > 8: + raise ValueError() + + for hexstr in hexlist: + if hexstr is not "": + self.prefix += struct.pack('!H', int(hexstr, 16)) + else: + zeroCount = 8 - len(hexlist) + 1 + self.prefix += struct.pack('!%dH' % zeroCount, *((0,) * zeroCount)) + self.prefix = bytes(self.prefix) + + self.prefixlen = int(prefixlen) + else: + raise ValueError() + + def __repr__(self): + return repr(str(self)) + + def __str__(self): + if self.addressfamily == AFI_INET: + return '.'.join([str(ord(o)) for o in self.packed(pad=True)]) + '/%d' % self.prefixlen + elif self.addressfamily == AFI_INET6: + return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', self.packed(pad=True))]) + '/%d' % self.prefixlen + + def __eq__(self, other): + # FIXME: masked ips + return isinstance(other, IPPrefix) and self.prefixlen == other.prefixlen and self.prefix == other.prefix + + def __ne__(self, other): + return not self.__eq__(other) + + def __lt__(self, other): + return self.prefix < other.prefix or \ + (self.prefix == other.prefix and self.prefixlen < other.prefixlen) + + def __le__(self, other): + return self.__lt__(other) or self.__eq__(other) + + def __gt__(self, other): + return self.prefix > other.prefix or \ + (self.prefix == other.prefix and self.prefixlen > other.prefixlen) + + def __ge__(self, other): + return self.__gt__(other) or self.__eq__(other) + + def __hash__(self): + return hash(self.prefix) ^ hash(self.prefixlen) + + def __len__(self): + return self.prefixlen + + def _packedMaxLen(self): + return (self.addressfamily == AFI_INET6 and 16 or 4) + + def ipToInt(self): + return reduce(lambda x, y: x * 256 + y, map(ord, self.prefix)) + + def netmask(self): + return ~( (1 << (len(self.prefix)*8 - self.prefixlen)) - 1) + + def mask(self, prefixlen, shorten=False): + # DEBUG + assert len(self.prefix) == self._packedMaxLen() + + masklen = len(self.prefix) * 8 - prefixlen + self.prefix = struct.pack('!I', self.ipToInt() >> masklen << masklen) + if shorten: self.prefixlen = prefixlen + return self + + def packed(self, pad=False): + if pad: + return self.prefix + '\0' * (self._packedMaxLen() - len(self.prefix)) + else: + return self.prefix + +class IPv4IP(IPPrefix): + """Class that represents a single non-prefix IPv4 IP.""" + + def __init__(self, ip): + if type(ip) is str and len(ip) > 4: + super(IPv4IP, self).__init__(ip + '/32', AFI_INET) + else: + super(IPv4IP, self).__init__((ip, 32), AFI_INET) + + def __str__(self): + return ".".join([str(ord(o)) for o in self.prefix]) + +class IPv6IP(IPPrefix): + """Class that represents a single non-prefix IPv6 IP.""" + + def __init__(self, ip=None, packed=None): + if not ip and not packed: + raise ValueError() + + if packed: + super(IPv6IP, self).__init__((packed, 128), AFI_INET6) + else: + super(IPv6IP, self).__init__(ip + '/128', AFI_INET6) + + def __str__(self): + return ':'.join([hex(o)[2:] for o in struct.unpack('!8H', self.packed(pad=True))]) diff --git a/pybal/bgp/test_ip.py b/pybal/bgp/test_ip.py new file mode 100644 index 0000000..ed70930 --- /dev/null +++ b/pybal/bgp/test_ip.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +""" + bgp.ip unit tests + ~~~~~~~~~~~~~~~~~ + + This module contains tests for `bgp.ip`. + +""" +import ip + +from unittest import TestCase + + +class IPv4IPTestCase(TestCase): + + def testPrefixStr(self): + prefix = ip.IPv4IP('91.198.174.192') + + self.assertEquals(prefix.addressfamily, ip.AFI_INET) + self.assertEquals(prefix.prefixlen, 32) + + def testPrefixInt(self): + prefix = ip.IPv4IP(2130706433) + self.assertEquals(prefix.prefixlen, 32) + self.assertEquals(str(prefix), '127.0.0.1') + + def testPrefixOctets(self): + prefix = ip.IPv4IP((0x7f, 0x0, 0x0, 0x1)) + self.assertEquals(prefix.prefixlen, 32) + self.assertEquals(str(prefix), '127.0.0.1') + +class IPv6IPTestCase(TestCase): + + def testOK(self): + prefix = ip.IPv6IP('2620:0:862:ed1a::1') + + self.assertEquals(prefix.addressfamily, ip.AFI_INET6) + self.assertEquals(prefix.prefixlen, 128) + + def testValueError(self): + with self.assertRaises(ValueError): + ip.IPv6IP() diff --git a/pybal/pybal.py b/pybal/pybal.py index 1521222..968ffd0 100755 --- a/pybal/pybal.py +++ b/pybal/pybal.py @@ -27,7 +27,7 @@ inotify = None try: - from pybal import bgp + from pybal.bgp import bgp except ImportError: pass -- To view, visit https://gerrit.wikimedia.org/r/354746 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4e474adb9177fc934977339266eb7c3b145dae08 Gerrit-PatchSet: 5 Gerrit-Project: operations/debs/pybal Gerrit-Branch: master Gerrit-Owner: Ema <e...@wikimedia.org> Gerrit-Reviewer: Mark Bergsma <m...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits