Signed-off-by: HIYAMA Manabu <[email protected]> --- ryu/tests/unit/packet/test_arp.py | 169 ++++++++++++++++ ryu/tests/unit/packet/test_ipv4.py | 122 ++++++++++++ ryu/tests/unit/packet/test_packet.py | 354 ++++++++++++++++++++++++++++++++++ ryu/tests/unit/packet/test_tcp.py | 142 ++++++++++++++ ryu/tests/unit/packet/test_udp.py | 96 +++++++++ ryu/tests/unit/packet/test_vlan.py | 133 +++++++++++++ tools/test-requires | 1 + 7 files changed, 1017 insertions(+) create mode 100644 ryu/tests/unit/packet/__init__.py create mode 100644 ryu/tests/unit/packet/test_arp.py create mode 100644 ryu/tests/unit/packet/test_ipv4.py create mode 100644 ryu/tests/unit/packet/test_packet.py create mode 100644 ryu/tests/unit/packet/test_tcp.py create mode 100644 ryu/tests/unit/packet/test_udp.py create mode 100644 ryu/tests/unit/packet/test_vlan.py
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.arp import arp
from ryu.lib.packet.vlan import vlan


LOG = logging.getLogger('test_arp')


class Test_arp(unittest.TestCase):
    """Test case for arp.

    Covers construction, parsing and serialization of the ``arp`` class,
    plus embedding an ARP header in an Ethernet frame with and without a
    VLAN tag.
    """

    # Fixture values for a single ARP header.
    hwtype = 1
    proto = 0x0800
    hlen = 6
    plen = 4
    opcode = 1
    src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
    src_ip = int(netaddr.IPAddress('24.166.172.1'))
    dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
    dst_ip = int(netaddr.IPAddress('24.166.173.159'))

    fmt = arp._PACK_STR
    length = struct.calcsize(arp._PACK_STR)
    # Wire image of the fixture, packed independently of the class under test.
    buf = pack(fmt, hwtype, proto, hlen, plen, opcode, src_mac, src_ip,
               dst_mac, dst_ip)

    a = arp(hwtype, proto, hlen, plen, opcode, src_mac, src_ip, dst_mac,
            dst_ip)

    # Header fields in the order they are passed to pack() above;
    # 'length' is an attribute of the object only, so it comes last and
    # is skipped when comparing against an unpacked tuple.
    _WIRE_FIELDS = ('hwtype', 'proto', 'hlen', 'plen', 'opcode',
                    'src_mac', 'src_ip', 'dst_mac', 'dst_ip')
    _ALL_FIELDS = _WIRE_FIELDS + ('length',)

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _check_arp(self, pkt):
        """Assert that every attribute of *pkt* matches the fixture."""
        for name in self._ALL_FIELDS:
            eq_(getattr(pkt, name), getattr(self, name))

    def test_init(self):
        self._check_arp(self.a)

    def test_parser(self):
        _res = self.a.parser(self.buf)
        # parser() may return either the object or an (object, ...) tuple.
        res = _res[0] if isinstance(_res, tuple) else _res
        self._check_arp(res)

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.a.serialize(data, prev)

        res = struct.unpack(arp._PACK_STR, buf)
        for value, name in zip(res, self._WIRE_FIELDS):
            eq_(value, getattr(self, name))

    def _build_arp(self, vlan_enabled):
        """Return a serialized Packet: ethernet [+ vlan] + the ARP fixture.

        :param vlan_enabled: when true, an 802.1Q tag is inserted between
            the Ethernet header and the ARP header.
        """
        p = Packet()
        if vlan_enabled:
            p.add_protocol(ethernet(self.dst_mac, self.src_mac,
                                    ether.ETH_TYPE_8021Q))
            p.add_protocol(vlan(1, 1, 3, ether.ETH_TYPE_ARP))
        else:
            p.add_protocol(ethernet(self.dst_mac, self.src_mac,
                                    ether.ETH_TYPE_ARP))
        p.add_protocol(self.a)
        p.serialize()
        return p

    def test_build_arp_vlan(self):
        p = self._build_arp(True)

        e = p.find_protocol("ethernet")
        ok_(e is not None)
        eq_(e.ethertype, ether.ETH_TYPE_8021Q)

        v = p.find_protocol("vlan")
        ok_(v is not None)
        eq_(v.ethertype, ether.ETH_TYPE_ARP)

        a = p.find_protocol("arp")
        ok_(a is not None)
        # Same checks as the no-VLAN case, including 'length'.
        self._check_arp(a)

    def test_build_arp_novlan(self):
        p = self._build_arp(False)

        e = p.find_protocol("ethernet")
        ok_(e is not None)
        eq_(e.ethertype, ether.ETH_TYPE_ARP)

        a = p.find_protocol("arp")
        ok_(a is not None)
        self._check_arp(a)
import unittest
import logging
import struct
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet import packet_utils
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet.tcp import tcp
import netaddr


LOG = logging.getLogger('test_ipv4')


class Test_ipv4(unittest.TestCase):
    """Test case for ipv4: construction, parsing and serialization."""

    # Fixture: header of 5 + 1 words, i.e. 4 bytes of options.
    version = 4
    header_length = 5 + 1
    ver_hlen = (version << 4) | header_length
    tos = 0
    total_length = header_length + 64
    identification = 30774
    flags = 4
    offset = 1480
    flg_off = (flags << 13) | offset
    ttl = 64
    proto = inet.IPPROTO_TCP
    csum = 0xadc6
    src = int(netaddr.IPAddress('131.151.32.21'))
    dst = int(netaddr.IPAddress('131.151.32.129'))
    length = header_length * 4
    option = '\x86\x28\x00\x00'

    # Expected wire image: fixed header followed by the option bytes.
    buf = struct.pack(ipv4._PACK_STR, ver_hlen, tos, total_length,
                      identification, flg_off, ttl, proto, csum, src, dst)
    buf = buf + option

    ip = ipv4(version, header_length, tos, total_length, identification,
              flags, offset, ttl, proto, csum, src, dst, option)

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        # Every constructor argument must be readable back unchanged.
        names = ('version', 'header_length', 'tos', 'total_length',
                 'identification', 'flags', 'offset', 'ttl', 'proto',
                 'csum', 'src', 'dst', 'length', 'option')
        for attr in names:
            eq_(getattr(self, attr), getattr(self.ip, attr))

    def test_parser(self):
        parsed, next_type = self.ip.parser(self.buf)

        names = ('version', 'header_length', 'tos', 'total_length',
                 'identification', 'flags', 'offset', 'ttl', 'proto',
                 'csum', 'src', 'dst')
        for attr in names:
            eq_(getattr(parsed, attr), getattr(self, attr))
        # proto is IPPROTO_TCP, so the payload type reported is tcp.
        eq_(next_type, tcp)

    def test_serialize(self):
        raw = self.ip.serialize(bytearray(), None)
        fields = struct.unpack_from(ipv4._PACK_STR, str(raw))
        opt_end = ipv4._MIN_LEN + len(self.option)
        raw_option = raw[ipv4._MIN_LEN:opt_end]

        # Index 7 (the checksum) is skipped here and verified below.
        expected = (
            (0, self.ver_hlen),
            (1, self.tos),
            (2, self.total_length),
            (3, self.identification),
            (4, self.flg_off),
            (5, self.ttl),
            (6, self.proto),
            (8, self.src),
            (9, self.dst),
        )
        for index, value in expected:
            eq_(fields[index], value)
        eq_(raw_option, self.option)

        # checksum: summing the serialized header must yield zero
        eq_(packet_utils.checksum(raw), 0)
import unittest
import logging
import struct
import netaddr
import array
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet import *


LOG = logging.getLogger('test_packet')


class TestPacket(unittest.TestCase):
    """Test case for packet.

    Each test builds a packet from protocol objects, serializes it, parses
    the serialized bytes back and checks every decoded field.
    """

    dst_mac = mac.haddr_to_bin('AA:AA:AA:AA:AA:AA')
    src_mac = mac.haddr_to_bin('BB:BB:BB:BB:BB:BB')
    dst_ip = int(netaddr.IPAddress('192.168.128.10'))
    dst_ip_bin = struct.pack('!I', dst_ip)
    src_ip = int(netaddr.IPAddress('192.168.122.20'))
    src_ip_bin = struct.pack('!I', src_ip)
    payload = '\x06\x06\x47\x50\x00\x00\x00\x00' \
        + '\xcd\xc5\x00\x00\x00\x00\x00\x00' \
        + '\x10\x11\x12\x13\x14\x15\x16\x17' \
        + '\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'

    def setUp(self):
        pass

    def tearDown(self):
        pass

    # -- builders ---------------------------------------------------------

    def _new_arp(self):
        """Return the ARP object shared by the ARP tests."""
        return arp.arp(1, ether.ETH_TYPE_IP, 6, 4, 2,
                       self.src_mac, self.src_ip, self.dst_mac,
                       self.dst_ip)

    def _eth_bytes(self, ethertype_bin):
        """Expected Ethernet header bytes (!6s6sH)."""
        return self.dst_mac + self.src_mac + ethertype_bin

    def _arp_bytes(self):
        """Expected bytes of the header built by _new_arp (!HHBBH6sI6sI)."""
        return '\x00\x01' \
            + '\x08\x00' \
            + '\x06' \
            + '\x04' \
            + '\x00\x02' \
            + self.src_mac \
            + self.src_ip_bin \
            + self.dst_mac \
            + self.dst_ip_bin

    # -- shared assertions ------------------------------------------------

    def _check_ethernet(self, p_eth, ethertype):
        """Assert the parsed Ethernet header matches the fixture MACs."""
        ok_(p_eth)
        eq_(self.dst_mac, p_eth.dst)
        eq_(self.src_mac, p_eth.src)
        eq_(ethertype, p_eth.ethertype)

    def _check_arp(self, p_arp):
        """Assert the parsed ARP header matches _new_arp()."""
        ok_(p_arp)
        eq_(1, p_arp.hwtype)
        eq_(ether.ETH_TYPE_IP, p_arp.proto)
        eq_(6, p_arp.hlen)
        eq_(4, p_arp.plen)
        eq_(2, p_arp.opcode)
        eq_(self.src_mac, p_arp.src_mac)
        eq_(self.src_ip, p_arp.src_ip)
        eq_(self.dst_mac, p_arp.dst_mac)
        eq_(self.dst_ip, p_arp.dst_ip)

    def _check_ipv4(self, p_ipv4, ip_buf, l4_len, tos, identification,
                    flags, proto):
        """Assert the parsed IPv4 header and verify its checksum.

        :param ip_buf: expected header bytes with a zeroed checksum field
        :param l4_len: length of everything that follows the IPv4 header
        """
        ok_(p_ipv4)
        eq_(4, p_ipv4.version)
        eq_(5, p_ipv4.header_length)
        eq_(tos, p_ipv4.tos)
        eq_(len(ip_buf) + l4_len, p_ipv4.total_length)
        eq_(identification, p_ipv4.identification)
        eq_(flags, p_ipv4.flags)
        eq_(64, p_ipv4.ttl)
        eq_(proto, p_ipv4.proto)
        eq_(self.src_ip, p_ipv4.src)
        eq_(self.dst_ip, p_ipv4.dst)
        # Drop the parsed checksum into the template (byte offset 10);
        # the checksum over the completed header must then be zero.
        hdr = bytearray(ip_buf)
        struct.pack_into('!H', hdr, 10, p_ipv4.csum)
        eq_(packet_utils.checksum(hdr), 0)

    def _check_l4_checksum(self, l4_buf, csum_offset, csum, proto_num):
        """Verify a UDP/TCP checksum over pseudo-header + header + payload.

        :param l4_buf: expected L4 header bytes with a zeroed checksum
        :param csum_offset: byte offset of the checksum inside l4_buf
        :param csum: checksum value read from the parsed header
        :param proto_num: protocol number placed in the pseudo-header
        """
        seg = bytearray(l4_buf)
        struct.pack_into('!H', seg, csum_offset, csum)
        ph = struct.pack('!IIBBH', self.src_ip, self.dst_ip, 0,
                         proto_num, len(l4_buf) + len(self.payload))
        eq_(packet_utils.checksum(ph + seg + self.payload), 0)

    # -- tests ------------------------------------------------------------

    def test_arp(self):
        # build packet
        e = ethernet.ethernet(self.dst_mac, self.src_mac,
                              ether.ETH_TYPE_ARP)
        p = packet.Packet()
        p.add_protocol(e)
        p.add_protocol(self._new_arp())
        p.serialize()

        # ARP carries no checksum, so the whole frame can be compared.
        eq_(self._eth_bytes('\x08\x06') + self._arp_bytes(), p.data)

        # parse
        pkt = packet.Packet(array.array('B', p.data))
        self._check_ethernet(pkt.find_protocol('ethernet'),
                             ether.ETH_TYPE_ARP)
        self._check_arp(pkt.find_protocol('arp'))

    def test_vlan_arp(self):
        # build packet
        e = ethernet.ethernet(self.dst_mac, self.src_mac,
                              ether.ETH_TYPE_8021Q)
        v = vlan.vlan(0b111, 0b1, 3, ether.ETH_TYPE_ARP)
        p = packet.Packet()
        p.add_protocol(e)
        p.add_protocol(v)
        p.add_protocol(self._new_arp())
        p.serialize()

        # vlan !HH
        v_buf = '\xF0\x03' \
            + '\x08\x06'

        eq_(self._eth_bytes('\x81\x00') + v_buf + self._arp_bytes(),
            p.data)

        # parse
        pkt = packet.Packet(array.array('B', p.data))
        self._check_ethernet(pkt.find_protocol('ethernet'),
                             ether.ETH_TYPE_8021Q)

        # vlan
        p_vlan = pkt.find_protocol('vlan')
        ok_(p_vlan)
        eq_(0b111, p_vlan.pcp)
        eq_(0b1, p_vlan.cfi)
        eq_(3, p_vlan.vid)
        eq_(ether.ETH_TYPE_ARP, p_vlan.ethertype)

        self._check_arp(pkt.find_protocol('arp'))

    def test_ipv4_udp(self):
        # build packet
        e = ethernet.ethernet(self.dst_mac, self.src_mac,
                              ether.ETH_TYPE_IP)
        ip = ipv4.ipv4(4, 5, 1, 0, 3, 1, 4, 64, inet.IPPROTO_UDP, 0,
                       self.src_ip, self.dst_ip)
        u = udp.udp(0x190F, 0x1F90, 0, 0, self.payload)

        p = packet.Packet()
        p.add_protocol(e)
        p.add_protocol(ip)
        p.add_protocol(u)
        p.serialize()

        # The templates below keep the checksum fields zeroed, so the
        # serialized frame is not compared byte for byte. Instead the
        # parsed checksums are inserted and verified.

        # ipv4 !BBHHHBBHII
        ip_buf = '\x45' \
            + '\x01' \
            + '\x00\x3C' \
            + '\x00\x03' \
            + '\x20\x04' \
            + '\x40' \
            + '\x11' \
            + '\x00\x00' \
            + self.src_ip_bin \
            + self.dst_ip_bin

        # udp !HHHH
        u_buf = '\x19\x0F' \
            + '\x1F\x90' \
            + '\x00\x28' \
            + '\x00\x00'

        udp_len = len(u_buf) + len(self.payload)

        # parse
        pkt = packet.Packet(array.array('B', p.data))
        p_udp = pkt.find_protocol('udp')

        self._check_ethernet(pkt.find_protocol('ethernet'),
                             ether.ETH_TYPE_IP)
        self._check_ipv4(pkt.find_protocol('ipv4'), ip_buf, udp_len,
                         tos=1, identification=3, flags=1,
                         proto=inet.IPPROTO_UDP)

        # udp
        ok_(p_udp)
        eq_(0x190f, p_udp.src_port)
        eq_(0x1F90, p_udp.dst_port)
        eq_(udp_len, p_udp.length)
        eq_(0x77b2, p_udp.csum)
        # checksum field sits at byte offset 6; 17 is the UDP protocol number
        self._check_l4_checksum(u_buf, 6, p_udp.csum, 17)

        # payload
        eq_(self.payload, p_udp.payload.tostring())

    def test_ipv4_tcp(self):
        # build packet
        e = ethernet.ethernet(self.dst_mac, self.src_mac,
                              ether.ETH_TYPE_IP)
        ip = ipv4.ipv4(4, 5, 0, 0, 0, 0, 0, 64, inet.IPPROTO_TCP, 0,
                       self.src_ip, self.dst_ip)
        seg = tcp.tcp(0x190F, 0x1F90, 0x123, 1, 6, 0b101010, 2048, 0, 0x6f,
                      '\x01\x02', self.payload)

        p = packet.Packet()
        p.add_protocol(e)
        p.add_protocol(ip)
        p.add_protocol(seg)
        p.serialize()

        # As in the UDP test, checksum fields are zero in the templates.

        # ipv4 !BBHHHBBHII
        ip_buf = '\x45' \
            + '\x00' \
            + '\x00\x4C' \
            + '\x00\x00' \
            + '\x00\x00' \
            + '\x40' \
            + '\x06' \
            + '\x00\x00' \
            + self.src_ip_bin \
            + self.dst_ip_bin

        # tcp !HHIIBBHHH + option
        t_buf = '\x19\x0F' \
            + '\x1F\x90' \
            + '\x00\x00\x01\x23' \
            + '\x00\x00\x00\x01' \
            + '\x60' \
            + '\x2A' \
            + '\x08\x00' \
            + '\x00\x00' \
            + '\x00\x6F' \
            + '\x01\x02\x00\x00'

        # parse
        pkt = packet.Packet(array.array('B', p.data))
        p_tcp = pkt.find_protocol('tcp')

        self._check_ethernet(pkt.find_protocol('ethernet'),
                             ether.ETH_TYPE_IP)
        self._check_ipv4(pkt.find_protocol('ipv4'), ip_buf,
                         len(t_buf) + len(self.payload),
                         tos=0, identification=0, flags=0,
                         proto=inet.IPPROTO_TCP)

        # tcp
        ok_(p_tcp)
        eq_(0x190f, p_tcp.src_port)
        eq_(0x1F90, p_tcp.dst_port)
        eq_(0x123, p_tcp.seq)
        eq_(1, p_tcp.ack)
        eq_(6, p_tcp.offset)
        eq_(0b101010, p_tcp.bits)
        eq_(2048, p_tcp.window_size)
        eq_(0x6f, p_tcp.urgent)
        eq_(len(t_buf), p_tcp.length)
        # checksum field sits at byte offset 16; 6 is the TCP protocol number
        self._check_l4_checksum(t_buf, 16, p_tcp.csum, 6)

        # payload
        eq_(self.payload, p_tcp.payload.tostring())
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.tcp import tcp
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet import packet_utils


LOG = logging.getLogger('test_tcp')


class Test_tcp(unittest.TestCase):
    """Test case for tcp: construction, parsing and serialization."""

    src_port = 6431
    dst_port = 8080
    seq = 5
    ack = 1
    offset = 6
    bits = 0b101010
    window_size = 2048
    csum = 12345
    urgent = 128
    option = '\x01\x02\x03\x04'
    # Same two bytes as before; the second one was spelled as the octal
    # escape '\06', which was easy to misread next to the hex escapes.
    payload = '\x05\x06'

    t = tcp(src_port, dst_port, seq, ack, offset, bits,
            window_size, csum, urgent, option, payload)

    # Expected wire image: fixed header, then option, then payload.
    buf = pack(tcp._PACK_STR, src_port, dst_port, seq, ack,
               offset << 4, bits, window_size, csum, urgent)
    buf += option
    buf += payload

    # Addresses of the enclosing IPv4 header used by the serialize tests.
    _src_ip = int(netaddr.IPAddress('192.168.10.1'))
    _dst_ip = int(netaddr.IPAddress('192.168.100.1'))

    _FIELDS = ('src_port', 'dst_port', 'seq', 'ack', 'offset', 'bits',
               'window_size', 'csum', 'urgent', 'option', 'payload')

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def _prev_ipv4(self):
        """Return the IPv4 header handed to tcp.serialize() as ``prev``.

        The protocol field is IPPROTO_TCP so that it agrees with the
        protocol number 6 used in the pseudo-header in test_serialize
        (the original fixture carried IPPROTO_UDP).
        """
        return ipv4(4, 5, 0, 0, 0, 0, 0, 64,
                    inet.IPPROTO_TCP, 0, self._src_ip, self._dst_ip)

    def test_init(self):
        for name in self._FIELDS:
            eq_(getattr(self, name), getattr(self.t, name))

    def test_parser(self):
        parsed, next_type = self.t.parser(self.buf)

        for name in self._FIELDS:
            eq_(getattr(self, name), getattr(parsed, name))
        # Nothing is dispatched on top of TCP.
        eq_(None, next_type)

    def test_serialize(self):
        offset = 5
        csum = 0

        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
                offset, self.bits, self.window_size, csum, self.urgent)
        buf = t.serialize(bytearray(), self._prev_ipv4())
        res = struct.unpack(tcp._PACK_STR, str(buf))

        eq_(res[0], self.src_port)
        eq_(res[1], self.dst_port)
        eq_(res[2], self.seq)
        eq_(res[3], self.ack)
        eq_(res[4], offset << 4)
        eq_(res[5], self.bits)
        eq_(res[6], self.window_size)
        # res[7] is the computed checksum, verified below.
        eq_(res[8], self.urgent)

        # checksum: pseudo-header (protocol 6) + segment must sum to zero
        ph = struct.pack('!IIBBH', self._src_ip, self._dst_ip, 0, 6,
                         offset * 4)
        d = ph + buf + bytearray()
        s = packet_utils.checksum(d)
        eq_(0, s)

    def test_serialize_option(self):
        offset = 6
        csum = 0
        option = '\x01\x02'

        t = tcp(self.src_port, self.dst_port, self.seq, self.ack,
                offset, self.bits, self.window_size, csum, self.urgent,
                option, self.payload)
        buf = t.serialize(bytearray(), self._prev_ipv4())

        # The option bytes follow the fixed header directly ...
        r_option = buf[tcp._MIN_LEN:tcp._MIN_LEN + len(option)]
        eq_(option, r_option)

        # ... and the payload starts at the data offset (in 32-bit words).
        r_payload = buf[offset * 4:]
        eq_(self.payload, r_payload)
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.udp import udp
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet import packet_utils


LOG = logging.getLogger('test_udp')


class Test_udp(unittest.TestCase):
    """Test case for udp: construction, parsing and serialization."""

    src_port = 6431
    dst_port = 8080
    length = 65507
    csum = 12345
    payload = '\x01\x02'
    u = udp(src_port, dst_port, length, csum, payload)
    # Expected wire image: header fields followed by the payload.
    buf = struct.pack(udp._PACK_STR, src_port, dst_port, length, csum)
    buf = buf + payload

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        for attr in ('src_port', 'dst_port', 'length', 'csum', 'payload'):
            eq_(getattr(self, attr), getattr(self.u, attr))

    def test_parser(self):
        parsed, next_type = self.u.parser(self.buf)

        for attr in ('src_port', 'dst_port', 'length', 'csum', 'payload'):
            eq_(getattr(self, attr), getattr(parsed, attr))
        eq_(None, next_type)

    def test_serialize(self):
        sport, dport = 6431, 8080
        # Zero length and checksum: serialize() is expected to fill them in.
        zero_len = 0
        zero_csum = 0

        src_ip = int(netaddr.IPAddress('192.168.10.1'))
        dst_ip = int(netaddr.IPAddress('192.168.100.1'))
        prev = ipv4(4, 5, 0, 0, 0, 0, 0, 64,
                    inet.IPPROTO_UDP, 0, src_ip, dst_ip)

        datagram = udp(sport, dport, zero_len, zero_csum, self.payload)
        raw = datagram.serialize(bytearray(), prev)
        header = struct.unpack(udp._PACK_STR,
                               str(raw[:datagram._MIN_LEN]))

        eq_(header[0], sport)
        eq_(header[1], dport)
        eq_(header[2], datagram._MIN_LEN + len(self.payload))
        eq_(self.payload, raw[datagram._MIN_LEN:])

        # checksum: pseudo-header (protocol 17) + datagram must sum to zero
        pseudo = struct.pack('!IIBBH', src_ip, dst_ip, 0, 17, header[2])
        total = packet_utils.checksum(pseudo + raw + bytearray())
        eq_(0, total)
import unittest
import logging
import struct
import netaddr
from struct import *
from nose.tools import *
from nose.plugins.skip import Skip, SkipTest
from ryu.ofproto import ether, inet
from ryu.lib import mac
from ryu.lib.packet.ethernet import ethernet
from ryu.lib.packet.packet import Packet
from ryu.lib.packet.ipv4 import ipv4
from ryu.lib.packet.vlan import vlan


LOG = logging.getLogger('test_vlan')


class Test_vlan(unittest.TestCase):
    """Test case for vlan: construction, parsing, serialization and
    embedding between an Ethernet and an IPv4 header.
    """

    pcp = 0
    cfi = 0
    vid = 32
    # TCI layout: PCP in bits 15-13, CFI in bit 12, VID in bits 11-0.
    # The shift for PCP was 15, which only produced the right value
    # because pcp is 0 in this fixture.
    tci = pcp << 13 | cfi << 12 | vid
    ethertype = ether.ETH_TYPE_IP
    length = struct.calcsize(vlan._PACK_STR)

    buf = pack(vlan._PACK_STR, tci, ethertype)

    v = vlan(pcp, cfi, vid, ethertype)

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_init(self):
        eq_(self.pcp, self.v.pcp)
        eq_(self.cfi, self.v.cfi)
        eq_(self.vid, self.v.vid)
        eq_(self.ethertype, self.v.ethertype)
        eq_(self.length, self.v.length)

    def test_parser(self):
        res, ptype = self.v.parser(self.buf)

        eq_(res.pcp, self.pcp)
        eq_(res.cfi, self.cfi)
        eq_(res.vid, self.vid)
        eq_(res.ethertype, self.ethertype)
        eq_(res.length, self.length)
        # ethertype is ETH_TYPE_IP, so the payload type reported is ipv4.
        eq_(ptype, ipv4)

    def test_serialize(self):
        data = bytearray()
        prev = None
        buf = self.v.serialize(data, prev)

        fmt = vlan._PACK_STR
        res = struct.unpack(fmt, buf)

        eq_(res[0], self.tci)
        eq_(res[1], self.ethertype)

    def _build_vlan(self):
        """Return a serialized Packet: ethernet + the vlan fixture + ipv4."""
        src_mac = mac.haddr_to_bin('00:07:0d:af:f4:54')
        dst_mac = mac.haddr_to_bin('00:00:00:00:00:00')
        ethertype = ether.ETH_TYPE_8021Q
        e = ethernet(dst_mac, src_mac, ethertype)

        version = 4
        # Header length is counted in 32-bit words: 5 for the fixed header
        # plus 1 for the 4-byte option. The previous value 20 was a byte
        # count and does not fit the 4-bit field it shares with 'version'.
        header_length = 5 + 1
        tos = 0
        total_length = 24
        identification = 0x8a5d
        flags = 0
        offset = 1480
        ttl = 64
        proto = inet.IPPROTO_ICMP
        csum = 0xa7f2
        src = int(netaddr.IPAddress('131.151.32.21'))
        dst = int(netaddr.IPAddress('131.151.32.129'))
        option = 'TEST'
        ip = ipv4(version, header_length, tos, total_length, identification,
                  flags, offset, ttl, proto, csum, src, dst, option)

        p = Packet()

        p.add_protocol(e)
        p.add_protocol(self.v)
        p.add_protocol(ip)
        p.serialize()

        return p

    def test_build_vlan(self):
        p = self._build_vlan()

        e = p.find_protocol("ethernet")
        ok_(e is not None)
        eq_(e.ethertype, ether.ETH_TYPE_8021Q)

        v = p.find_protocol("vlan")
        ok_(v is not None)
        eq_(v.pcp, self.pcp)
        eq_(v.cfi, self.cfi)
        eq_(v.vid, self.vid)
        eq_(v.ethertype, self.ethertype)
        eq_(v.length, self.length)

        ip = p.find_protocol("ipv4")
        ok_(ip is not None)
