Extend the RTM_GETMULTICAST dump test to verify IFA_MC_USERS for both IPv4 and IPv6 multicast groups.
Run each protocol test in a fresh network namespace to avoid changing host-network state or racing with unrelated multicast users. Join a fixed multicast group twice using separate sockets and check that the reported user count increases by two. Signed-off-by: Yuyang Huang <[email protected]> --- tools/testing/selftests/net/rtnetlink.py | 101 ++++++++++++++++++++--- 1 file changed, 90 insertions(+), 11 deletions(-) diff --git a/tools/testing/selftests/net/rtnetlink.py b/tools/testing/selftests/net/rtnetlink.py index 3622413d793d..0c67c7c00d84 100755 --- a/tools/testing/selftests/net/rtnetlink.py +++ b/tools/testing/selftests/net/rtnetlink.py @@ -2,27 +2,106 @@ # SPDX-License-Identifier: GPL-2.0 import socket +import struct import time -from lib.py import bkg, ip, ksft_exit, ksft_run, ksft_ge, ksft_true, KsftSkipEx +from lib.py import bkg, ip, ksft_exit, ksft_run, ksft_eq, ksft_ge, ksft_true, KsftSkipEx from lib.py import CmdExitFailure, NetNS, NetNSEnter, RtnlAddrFamily IPV4_ALL_HOSTS_MULTICAST = b'\xe0\x00\x00\x01' +IPV4_TEST_MULTICAST = b'\xef\x01\x01\x01' +IPV6_TEST_MULTICAST = bytes.fromhex('ff020000000000000000000000000123') + + +def _users_for(rtnl: RtnlAddrFamily, family: int, grp: bytes, ifindex: int): + """Return mc-users for grp on ifindex, or 0 if absent.""" + + addrs = rtnl.getmulticast({"ifa-family": family}, dump=True) + matches = [addr for addr in addrs + if addr['multicast'] == grp and addr['ifa-index'] == ifindex] + if not matches: + return 0 + if 'mc-users' not in matches[0]: + return None + + return matches[0]['mc-users'] + def dump_mcaddr_check() -> None: """ - Verify that at least one interface has the IPv4 all-hosts multicast address. - At least the loopback interface should have this address. + Verify IPv4 multicast addresses and their user counts in RTM_GETMULTICAST. + """ + + with NetNS() as ns: + with NetNSEnter(str(ns)): + ip("link set lo up") + rtnl = RtnlAddrFamily() + lo_idx = socket.if_nametoindex('lo') + addresses = rtnl.getmulticast({"ifa-family": socket.AF_INET}, dump=True) + + all_host_multicasts = [ + addr for addr in addresses + if addr['multicast'] == IPV4_ALL_HOSTS_MULTICAST + ] + + ksft_ge(len(all_host_multicasts), 1, + "No interface found with the IPv4 all-hosts multicast address") + + mreq = IPV4_TEST_MULTICAST + socket.inet_aton('127.0.0.1') + before = _users_for(rtnl, socket.AF_INET, IPV4_TEST_MULTICAST, lo_idx) + if before is None: + raise KsftSkipEx("kernel does not expose IFA_MC_USERS") + + s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + s1.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + s2.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + + after_join = _users_for(rtnl, socket.AF_INET, + IPV4_TEST_MULTICAST, lo_idx) + if after_join is None: + raise KsftSkipEx("kernel does not expose IFA_MC_USERS") + ksft_eq(after_join - before, 2, + f"users delta != 2 after two joins " + f"(before={before}, after={after_join})") + finally: + s1.close() + s2.close() + + +def dump_mcaddr6_check() -> None: + """ + Verify IPv6 multicast addresses and their user counts in RTM_GETMULTICAST. """ - rtnl = RtnlAddrFamily() - addresses = rtnl.getmulticast({"ifa-family": socket.AF_INET}, dump=True) + with NetNS() as ns: + with NetNSEnter(str(ns)): + ip("link set lo up") + rtnl = RtnlAddrFamily() + lo_idx = socket.if_nametoindex('lo') + before = _users_for(rtnl, socket.AF_INET6, + IPV6_TEST_MULTICAST, lo_idx) + if before is None: + raise KsftSkipEx("kernel does not expose IFA_MC_USERS for IPv6") + + mreq = IPV6_TEST_MULTICAST + struct.pack('=I', lo_idx) + s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + s2 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) + try: + s1.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) + s2.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) - all_host_multicasts = [ - addr for addr in addresses if addr['multicast'] == IPV4_ALL_HOSTS_MULTICAST - ] + after_join = _users_for(rtnl, socket.AF_INET6, + IPV6_TEST_MULTICAST, lo_idx) + if after_join is None: + raise KsftSkipEx("kernel does not expose IFA_MC_USERS for IPv6") + ksft_eq(after_join - before, 2, + f"IPv6 users delta != 2 after two joins " + f"(before={before}, after={after_join})") + finally: + s1.close() + s2.close() - ksft_ge(len(all_host_multicasts), 1, - "No interface found with the IPv4 all-hosts multicast address") def ipv4_devconf_notify() -> None: """ @@ -56,7 +135,7 @@ def ipv4_devconf_notify() -> None: f"No 'forwarding on' notificiation found for interface {ifname}") def main() -> None: - ksft_run([dump_mcaddr_check, ipv4_devconf_notify]) + ksft_run([dump_mcaddr_check, dump_mcaddr6_check, ipv4_devconf_notify]) ksft_exit() if __name__ == "__main__": -- 2.43.0

