Extend the RTM_GETMULTICAST dump test to verify IFA_MC_USERS for both
IPv4 and IPv6 multicast groups.

Run each protocol test in a fresh network namespace to avoid changing
host-network state or racing with unrelated multicast users. Join a
fixed multicast group twice using separate sockets and check that the
reported user count increases by two.

Signed-off-by: Yuyang Huang <[email protected]>
---
 tools/testing/selftests/net/rtnetlink.py | 101 ++++++++++++++++++++---
 1 file changed, 90 insertions(+), 11 deletions(-)

diff --git a/tools/testing/selftests/net/rtnetlink.py b/tools/testing/selftests/net/rtnetlink.py
index 3622413d793d..0c67c7c00d84 100755
--- a/tools/testing/selftests/net/rtnetlink.py
+++ b/tools/testing/selftests/net/rtnetlink.py
@@ -2,27 +2,106 @@
 # SPDX-License-Identifier: GPL-2.0
 
 import socket
+import struct
 import time
-from lib.py import bkg, ip, ksft_exit, ksft_run, ksft_ge, ksft_true, KsftSkipEx
+from lib.py import bkg, ip, ksft_exit, ksft_run, ksft_eq, ksft_ge, ksft_true, KsftSkipEx
 from lib.py import CmdExitFailure, NetNS, NetNSEnter, RtnlAddrFamily
 
 IPV4_ALL_HOSTS_MULTICAST = b'\xe0\x00\x00\x01'
+IPV4_TEST_MULTICAST = b'\xef\x01\x01\x01'
+IPV6_TEST_MULTICAST = bytes.fromhex('ff020000000000000000000000000123')
+
+
+def _users_for(rtnl: RtnlAddrFamily, family: int, grp: bytes, ifindex: int):
+    """Return mc-users for grp on ifindex, or 0 if absent."""
+
+    addrs = rtnl.getmulticast({"ifa-family": family}, dump=True)
+    matches = [addr for addr in addrs
+               if addr['multicast'] == grp and addr['ifa-index'] == ifindex]
+    if not matches:
+        return 0
+    if 'mc-users' not in matches[0]:
+        return None
+
+    return matches[0]['mc-users']
+
 
 def dump_mcaddr_check() -> None:
     """
-    Verify that at least one interface has the IPv4 all-hosts multicast address.
-    At least the loopback interface should have this address.
+    Verify IPv4 multicast addresses and their user counts in RTM_GETMULTICAST.
+    """
+
+    with NetNS() as ns:
+        with NetNSEnter(str(ns)):
+            ip("link set lo up")
+            rtnl = RtnlAddrFamily()
+            lo_idx = socket.if_nametoindex('lo')
+            addresses = rtnl.getmulticast({"ifa-family": socket.AF_INET}, dump=True)
+
+            all_host_multicasts = [
+                addr for addr in addresses
+                if addr['multicast'] == IPV4_ALL_HOSTS_MULTICAST
+            ]
+
+            ksft_ge(len(all_host_multicasts), 1,
+                    "No interface found with the IPv4 all-hosts multicast address")
+
+            mreq = IPV4_TEST_MULTICAST + socket.inet_aton('127.0.0.1')
+            before = _users_for(rtnl, socket.AF_INET, IPV4_TEST_MULTICAST, lo_idx)
+            if before is None:
+                raise KsftSkipEx("kernel does not expose IFA_MC_USERS")
+
+            s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+            try:
+                s1.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+                s2.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+                after_join = _users_for(rtnl, socket.AF_INET,
+                                        IPV4_TEST_MULTICAST, lo_idx)
+                if after_join is None:
+                    raise KsftSkipEx("kernel does not expose IFA_MC_USERS")
+                ksft_eq(after_join - before, 2,
+                        f"users delta != 2 after two joins "
+                        f"(before={before}, after={after_join})")
+            finally:
+                s1.close()
+                s2.close()
+
+
+def dump_mcaddr6_check() -> None:
+    """
+    Verify IPv6 multicast addresses and their user counts in RTM_GETMULTICAST.
     """
 
-    rtnl = RtnlAddrFamily()
-    addresses = rtnl.getmulticast({"ifa-family": socket.AF_INET}, dump=True)
+    with NetNS() as ns:
+        with NetNSEnter(str(ns)):
+            ip("link set lo up")
+            rtnl = RtnlAddrFamily()
+            lo_idx = socket.if_nametoindex('lo')
+            before = _users_for(rtnl, socket.AF_INET6,
+                                IPV6_TEST_MULTICAST, lo_idx)
+            if before is None:
+                raise KsftSkipEx("kernel does not expose IFA_MC_USERS for IPv6")
+
+            mreq = IPV6_TEST_MULTICAST + struct.pack('=I', lo_idx)
+            s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
+            s2 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
+            try:
+                s1.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
+                s2.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
 
-    all_host_multicasts = [
-        addr for addr in addresses if addr['multicast'] == IPV4_ALL_HOSTS_MULTICAST
-    ]
+                after_join = _users_for(rtnl, socket.AF_INET6,
+                                        IPV6_TEST_MULTICAST, lo_idx)
+                if after_join is None:
+                    raise KsftSkipEx("kernel does not expose IFA_MC_USERS for IPv6")
+                ksft_eq(after_join - before, 2,
+                        f"IPv6 users delta != 2 after two joins "
+                        f"(before={before}, after={after_join})")
+            finally:
+                s1.close()
+                s2.close()
 
-    ksft_ge(len(all_host_multicasts), 1,
-            "No interface found with the IPv4 all-hosts multicast address")
 
 def ipv4_devconf_notify() -> None:
     """
@@ -56,7 +135,7 @@ def ipv4_devconf_notify() -> None:
               f"No 'forwarding on' notificiation found for interface {ifname}")
 
 def main() -> None:
-    ksft_run([dump_mcaddr_check, ipv4_devconf_notify])
+    ksft_run([dump_mcaddr_check, dump_mcaddr6_check, ipv4_devconf_notify])
     ksft_exit()
 
 if __name__ == "__main__":
-- 
2.43.0


Reply via email to