Add QinQ test suite, which verifies PMD behavior when sending QinQ (IEEE 802.1ad) packets.
Signed-off-by: Dean Marx <dm...@iol.unh.edu> --- dts/tests/TestSuite_qinq.py | 240 ++++++++++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 dts/tests/TestSuite_qinq.py diff --git a/dts/tests/TestSuite_qinq.py b/dts/tests/TestSuite_qinq.py new file mode 100644 index 0000000000..4bfa2c1920 --- /dev/null +++ b/dts/tests/TestSuite_qinq.py @@ -0,0 +1,240 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright(c) 2025 University of New Hampshire + +"""QinQ (802.1ad) Test Suite. + +This test suite verifies the correctness and capability of DPDK Poll Mode Drivers (PMDs) +in handling QinQ-tagged Ethernet frames, which contain a pair of stacked VLAN headers +(outer S-VLAN and inner C-VLAN). These tests ensure that both software and hardware offloads +related to QinQ behave as expected across different NIC vendors and PMD implementations. + +""" + +from scapy.layers.inet import IP, UDP +from scapy.layers.l2 import Dot1Q, Ether +from scapy.packet import Packet + +from framework.params.testpmd import SimpleForwardingModes +from framework.remote_session.testpmd_shell import PacketOffloadFlag, TestPmdShell +from framework.test_suite import TestSuite, func_test +from framework.testbed_model.capability import NicCapability, TopologyType, requires + + +@requires(topology_type=TopologyType.two_links) +@requires(NicCapability.RX_OFFLOAD_VLAN_FILTER) +@requires(NicCapability.RX_OFFLOAD_VLAN_EXTEND) +class TestQinq(TestSuite): + """QinQ test suite. + + This suite consists of 3 test cases: + 1. QinQ Rx parse: Ensures correct classification and detection of double VLAN packets (QinQ). + 2. QinQ strip: Validates hardware offload of VLAN header removal and correct TCI population. + 3. QinQ filter: + + """ + + def send_packet_and_verify_offload_flags( + self, packet: Packet, testpmd: TestPmdShell, offload_flags: list[PacketOffloadFlag] + ) -> None: + """Send packet and verify offload flags match the stripping action. + + Args: + packet: The packet to send to testpmd. + testpmd: The testpmd session to send commands to. + offload_flags: List of PacketOffloadFlags that should be in verbose output. + """ + testpmd.start() + self.send_packet_and_capture(packet=packet) + verbose_output = testpmd.extract_verbose_output(testpmd.stop()) + for flag in offload_flags: + self.verify(flag in verbose_output, f"Expected flag {flag} not found in output") + + def send_packet_and_verify( + self, packet: Packet, testpmd: TestPmdShell, should_receive: bool + ) -> None: + """Send packet and verify reception. + + Args: + packet: The packet to send to testpmd. + testpmd: The testpmd session to send commands to. + should_receive: If :data:`True`, verifies packet was received. + """ + testpmd.start() + received = self.send_packet_and_capture(packet=packet) + if should_receive: + self.verify(received != [], "Packet was dropped when it should have been received.") + else: + self.verify(received == [], "Packet was received when it should have been dropped.") + + @property + def test_packet(self): + """QinQ packet to be used in each test case.""" + return ( + Ether(dst="ff:ff:ff:ff:ff:ff") + / Dot1Q(vlan=100, type=0x8100) + / Dot1Q(vlan=200) + / IP(dst="1.2.3.4") + / UDP(dport=1234, sport=4321) + ) + + @func_test + def test_qinq_rx_parse(self) -> None: + """QinQ Rx parse test case. + + Steps: + Launch testpmd with Rxonly forwarding mode. + Set verbose level to 1. + Enable VLAN filter/extend modes on port 0. + Send test packet and capture verbose output. + + Verify: + Check that all expected offload flags are in verbose output. + """ + offload_flags = [PacketOffloadFlag.RTE_MBUF_F_RX_QINQ, PacketOffloadFlag.RTE_MBUF_F_RX_VLAN] + with TestPmdShell(forward_mode=SimpleForwardingModes.rxonly) as testpmd: + testpmd.set_verbose(level=1) + testpmd.stop_all_ports() + testpmd.set_vlan_filter(0, True) + testpmd.set_vlan_extend(0, True) + testpmd.start_all_ports() + self.send_packet_and_verify_offload_flags(self.test_packet, testpmd, offload_flags) + + @requires(NicCapability.RX_OFFLOAD_QINQ_STRIP) + @func_test + def test_qinq_strip(self) -> None: + """QinQ Rx strip test case. + + Steps: + Launch testpmd with Rxonly forwarding mode. + Set verbose level to 1. + Enable VLAN filter/extend modes on port 0. + Enable QinQ strip mode on port 0. + Send test packet and capture verbose output. + + Verify: + Check that all expected offload flags are in verbose output. + """ + offload_flags = [ + PacketOffloadFlag.RTE_MBUF_F_RX_QINQ_STRIPPED, + PacketOffloadFlag.RTE_MBUF_F_RX_VLAN_STRIPPED, + PacketOffloadFlag.RTE_MBUF_F_RX_VLAN, + PacketOffloadFlag.RTE_MBUF_F_RX_QINQ, + ] + with TestPmdShell(forward_mode=SimpleForwardingModes.rxonly) as testpmd: + testpmd.set_verbose(level=1) + testpmd.stop_all_ports() + testpmd.set_vlan_filter(0, True) + testpmd.set_vlan_extend(0, True) + testpmd.set_qinq_strip(0, True) + testpmd.start_all_ports() + self.send_packet_and_verify_offload_flags(self.test_packet, testpmd, offload_flags) + + @func_test + def test_qinq_filter(self) -> None: + """QinQ Rx filter test case. + + Steps: + Launch testpmd with Rxonly forwarding mode. + Enable VLAN filter/extend modes on port 0. + Add VLAN tag 100 to the filter on port 0. + Send test packet and capture verbose output. + + Verify: + Check that all expected offload flags are in verbose output. + """ + packets = [ + Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb") + / Dot1Q(vlan=100) + / Dot1Q(vlan=200) + / IP(dst="192.0.2.1", src="198.51.100.1") + / UDP(dport=1234, sport=5678), + Ether(dst="00:11:22:33:44:55", src="66:77:88:99:aa:bb") + / Dot1Q(vlan=101) + / Dot1Q(vlan=200) + / IP(dst="192.0.2.1", src="198.51.100.1") + / UDP(dport=1234, sport=5678), + ] + with TestPmdShell(forward_mode=SimpleForwardingModes.rxonly) as testpmd: + testpmd.stop_all_ports() + testpmd.set_vlan_filter(0, True) + testpmd.set_vlan_extend(0, True) + testpmd.start_all_ports() + testpmd.rx_vlan(100, 0) + self.send_packet_and_verify(packets[0], testpmd, should_receive=True) + self.send_packet_and_verify(packets[1], testpmd, should_receive=False) + + @func_test + def test_qinq_forwarding(self) -> None: + """QinQ Rx filter test case. + + Steps: + Launch testpmd with mac forwarding mode. + Disable VLAN filter mode on port 0. + Send test packet and capture verbose output. + + Verify: + Check that the received packet has two separate VLAN layers in proper QinQ fashion. + Check that the received packet outer and inner VLAN layer has the appropriate ID. + """ + with TestPmdShell(forward_mode=SimpleForwardingModes.mac) as testpmd: + testpmd.stop_all_ports() + testpmd.set_vlan_filter(0, False) + testpmd.start_all_ports() + received_packet = self.send_packet_and_capture(self.test_packet) + for packet in received_packet: + vlan_tags = [layer for layer in packet.layers() if layer == Dot1Q] + self.verify(len(vlan_tags) == 2, f"Expected 2 VLAN tags, found {len(vlan_tags)}") + + if packet.haslayer(Dot1Q): + outer_vlan_id = packet[Dot1Q].vlan + self.verify( + outer_vlan_id == 100, + f"Outer VLAN ID was {outer_vlan_id} when it should have been 100.", + ) + + if packet[Dot1Q].haslayer(Dot1Q): + inner_vlan_id = packet[Dot1Q].payload[Dot1Q].vlan + self.verify( + inner_vlan_id == 200, + f"Inner VLAN ID was {inner_vlan_id} when it should have been 200", + ) + + @func_test + def test_mismatched_tpid(self) -> None: + """Test behavior when outer VLAN tag has a non-standard TPID (not 0x8100 or 0x88a8). + + Steps: + Launch testpmd in rxonly forward mode. + Set verbose level to 1. + Disable VLAN filtering on port 0. + Send and capture test packet. + + Verify: + Only 1 VLAN tag is in received packet. + Inner VLAN ID matches the original packet. + """ + with TestPmdShell(forward_mode=SimpleForwardingModes.rxonly) as testpmd: + testpmd.set_verbose(level=1) + testpmd.stop_all_ports() + testpmd.set_vlan_filter(0, False) + testpmd.start_all_ports() + + mismatched_packet = ( + Ether(dst="ff:ff:ff:ff:ff:ff") + / Dot1Q(vlan=100, type=0x1234) + / Dot1Q(vlan=200) + / IP(dst="1.2.3.4") + / UDP(dport=1234, sport=4321) + ) + + received_packet = self.send_packet_and_capture(mismatched_packet) + + for packet in received_packet: + vlan_tags = [layer for layer in packet.layers() if layer == Dot1Q] + self.verify( + len(vlan_tags) == 1, + f"Expected 1 VLAN tag due to mismatched TPID, found {len(vlan_tags)}", + ) + + vlan_id = packet[Dot1Q].vlan + self.verify(vlan_id == 200, f"Expected inner VLAN ID 200, got {vlan_id}") -- 2.49.0