Add a Python script that loads a test module, triggers its debugfs
entry with kcov_dataflow recording active, then pretty-prints captured
records as a nested call tree with kallsyms symbol resolution.
Features:
- 8MB ring buffer (1M u64 words) for INSTRUMENT_ALL kernels
- Enable recording after module load, before trigger (avoids VFS noise)
- Variable-length record parsing using header-encoded field count
- Module-only filtering via kallsyms symbol lookup
- --context/-C N: show N records before/after each module function call
- --raw: print raw records instead of call tree
- Architecture-aware syscall numbers (x86_64 and arm64)
Usage:
python3 trigger-view.py eight_args_c \
--ko eight_args_c/eight_args_c.ko
python3 trigger-view.py eight_args_rust \
--ko eight_args_rust/eight_args_rust.ko
python3 trigger-view.py rust_ffi_contract \
--ko rust_ffi_contract/rust_ffi_contract.ko
Cc: Alexander Potapenko <[email protected]>
Assisted-by: Claude:claude-opus-4-6 [kiro-chat]
Link: https://github.com/yskzalloc/kcov-dataflow/actions
Signed-off-by: Yunseong Kim <[email protected]>
---
.../selftests/kcov_dataflow/trigger-view.py | 377 +++++++++++++++++++++
1 file changed, 377 insertions(+)
diff --git a/tools/testing/selftests/kcov_dataflow/trigger-view.py
b/tools/testing/selftests/kcov_dataflow/trigger-view.py
new file mode 100755
index 000000000000..a3274e472dc1
--- /dev/null
+++ b/tools/testing/selftests/kcov_dataflow/trigger-view.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+"""
+trigger-view.py - Load a module with kcov_dataflow
+recording active, then pretty-print captured records.
+
+Usage:
+ python3 trigger-view.py eight_args_c
+ python3 trigger-view.py rust_ffi_contract
+ python3 trigger-view.py eight_args_c --raw
+
+The script:
+ 1. Opens /sys/kernel/debug/kcov_dataflow
+ 2. Inits and mmaps the buffer
+ 3. Enables recording for this process
+ 4. Loads the module via finit_module() -- init runs in our context
+ 5. Disables recording
+ 6. Unloads the module
+ 7. Parses and prints captured records with kallsyms resolution
+"""
+import os
+import sys
+import struct
+import ctypes
+import ctypes.util
+import argparse
+import fcntl
+
+# Constants
+DF_TYPE_ENTRY = 0xE
+DF_TYPE_RET = 0xF
+MAGIC_BAD = 0xBADADD85
+BUF_SIZE = 1048576 # 1M words = 8MB
+
+# Ioctl numbers
+def _IOR(t, nr, size):
+ return (2 << 30) | (ord(t) << 8) | nr | (size << 16)
+
+def _IO(t, nr):
+ return (ord(t) << 8) | nr
+
+KCOV_DF_INIT_TRACK = _IOR('d', 1, 8)
+KCOV_DF_ENABLE = _IO('d', 100)
+KCOV_DF_DISABLE = _IO('d', 101)
+
+# syscall numbers (x86_64)
+import platform
+_machine = platform.machine()
+if _machine == "aarch64":
+ SYS_FINIT_MODULE = 273
+ SYS_DELETE_MODULE = 106
+else: # x86_64
+ SYS_FINIT_MODULE = 313
+ SYS_DELETE_MODULE = 176
+
+SELFTEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def load_kallsyms():
+ """Load kernel symbols for PC resolution."""
+ syms = []
+ try:
+ with open("/proc/kallsyms") as f:
+ for line in f:
+ parts = line.split()
+ if len(parts) >= 3:
+ addr = int(parts[0], 16)
+ name = parts[2]
+ mod = parts[3].strip("[]") if len(parts) > 3 else ""
+ syms.append((addr, name, mod))
+ except (PermissionError, FileNotFoundError):
+ pass
+ syms.sort()
+ return syms
+
+
+def symbolize(pc, syms):
+ """Find nearest symbol <= pc."""
+ if not syms:
+ return f"0x{pc:x}"
+ lo, hi = 0, len(syms) - 1
+ while lo < hi:
+ mid = (lo + hi + 1) // 2
+ if syms[mid][0] <= pc:
+ lo = mid
+ else:
+ hi = mid - 1
+ addr, name, mod = syms[lo]
+ if addr > pc:
+ return f"0x{pc:x}"
+ offset = pc - addr
+ if mod:
+ return f"{name}+0x{offset:x} [{mod}]" if offset else f"{name} [{mod}]"
+ return f"{name}+0x{offset:x}" if offset else name
+
+
+def format_val(v):
+ """Format a captured value."""
+ if v == MAGIC_BAD:
+ return "FAULT"
+ if v == 0:
+ return "0x0"
+ return f"0x{v:x}"
+
+
+def find_module(name):
+ """Find the .ko file for the given test name."""
+ ko_path = os.path.join(SELFTEST_DIR, name, f"{name}_mod.ko")
+ if os.path.exists(ko_path):
+ return ko_path
+ # Try without _mod suffix
+ ko_path = os.path.join(SELFTEST_DIR, name, f"{name}.ko")
+ if os.path.exists(ko_path):
+ return ko_path
+ # Search for any .ko in the directory
+ mod_dir = os.path.join(SELFTEST_DIR, name)
+ if os.path.isdir(mod_dir):
+ for f in os.listdir(mod_dir):
+ if f.endswith(".ko"):
+ return os.path.join(mod_dir, f)
+ return None
+
+
+def finit_module(ko_path):
+ """Load a kernel module via finit_module syscall."""
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ fd = os.open(ko_path, os.O_RDONLY)
+ ret = libc.syscall(SYS_FINIT_MODULE, fd, b"", 0)
+ os.close(fd)
+ if ret != 0:
+ errno = ctypes.get_errno()
+ raise OSError(errno, f"finit_module({ko_path}): {os.strerror(errno)}")
+
+
+def delete_module(name):
+ """Unload a kernel module."""
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ ret = libc.syscall(SYS_DELETE_MODULE, name.encode(), 0)
+ if ret != 0:
+ errno = ctypes.get_errno()
+ raise OSError(errno, f"delete_module({name}): {os.strerror(errno)}")
+
+
+def parse_records(buf, total_words):
+ """Parse the ring buffer into a list of records."""
+ records = []
+ pos = 1
+ while pos + 3 <= total_words and pos < BUF_SIZE:
+ hdr = buf[pos]
+
+ # Valid headers fit in 32 bits (upper 32 must be zero)
+ if hdr >> 32:
+ pos += 1
+ continue
+
+ rtype = (hdr >> 28) & 0xF
+
+ if rtype not in (DF_TYPE_ENTRY, DF_TYPE_RET):
+ pos += 1
+ continue
+
+ pc = buf[pos + 1]
+ meta = buf[pos + 2]
+ seq = hdr & 0x00FFFFFF
+ num_vals = (hdr >> 24) & 0xF
+ if num_vals == 0:
+ num_vals = 1
+
+ # Valid records always have a non-zero PC (kernel text address)
+ if pc == 0:
+ pos += 1
+ continue
+
+ val = buf[pos + 3] if pos + 3 < BUF_SIZE else 0
+ records.append({
+ "type": rtype,
+ "seq": seq,
+ "pc": pc,
+ "meta": meta,
+ "val": val,
+ })
+ pos += 3 + num_vals
+ return records
+
+
+def print_raw(records, syms):
+ """Print records in raw format."""
+ for r in records:
+ sym = symbolize(r["pc"], syms)
+ t = "ENTRY" if r["type"] == DF_TYPE_ENTRY else "RET "
+ arg_idx = (r["meta"] >> 56) & 0xFF
+ size = (r["meta"] >> 48) & 0xFF
+ print(f"[{t}] seq={r['seq']:3d} {sym} "
+ f"arg[{arg_idx}]({size}) = {format_val(r['val'])}")
+
+
+def print_tree(records, syms):
+ """Print records as indented call tree matching converted.txt format."""
+ depth = 0
+ # Group consecutive ENTRY records by PC to collect all args
+ i = 0
+ while i < len(records):
+ r = records[i]
+ sym = symbolize(r["pc"], syms)
+
+ if r["type"] == DF_TYPE_ENTRY:
+ # Collect all args for this call (same PC, consecutive entries)
+ args = []
+ pc = r["pc"]
+ while i < len(records) and records[i]["type"] == DF_TYPE_ENTRY \
+ and records[i]["pc"] == pc:
+ args.append(format_val(records[i]["val"]))
+ i += 1
+ indent = " " * depth
+ print(f"{indent}{sym}({', '.join(args)})")
+ depth += 1
+ else:
+ depth = max(0, depth - 1)
+ indent = " " * depth
+ print(f"{indent}{format_val(r['val'])} = {sym}()")
+ i += 1
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Load a test module with kcov_dataflow and view records")
+ parser.add_argument("module", help="Test module name (e.g. eight_args_c)")
+ parser.add_argument("--raw", action="store_true",
+ help="Print raw records instead of tree")
+ parser.add_argument("--ko", help="Explicit path to .ko file")
+ parser.add_argument("--context", "-C", type=int, default=0,
+ help="Show N lines before/after each module record")
+ args = parser.parse_args()
+
+ # Find module
+ if args.ko:
+ ko_path = args.ko
+ else:
+ ko_path = find_module(args.module)
+ if not ko_path or not os.path.exists(ko_path):
+ print(f"Cannot find module for '{args.module}'", file=sys.stderr)
+ print(f"Build it first: make LLVM=1 CC=clang "
+ f"M=tools/testing/selftests/kcov_dataflow/{args.module} modules",
+ file=sys.stderr)
+ sys.exit(1)
+
+ # Open kcov_dataflow
+ # Ensure kallsyms shows real addresses
+ try:
+ with open("/proc/sys/kernel/kptr_restrict", "w") as f:
+ f.write("0")
+ except (PermissionError, FileNotFoundError):
+ pass
+
+ try:
+ df_fd = os.open("/sys/kernel/debug/kcov_dataflow", os.O_RDWR)
+ except OSError as e:
+ print(f"Cannot open kcov_dataflow: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Init + mmap
+ fcntl.ioctl(df_fd, KCOV_DF_INIT_TRACK, BUF_SIZE)
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ libc.mmap.restype = ctypes.c_void_p
+ libc.mmap.argtypes = [
+ ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int,
+ ctypes.c_int, ctypes.c_int, ctypes.c_long
+ ]
+ buf_ptr = libc.mmap(None, BUF_SIZE * 8, 0x3, 0x01, df_fd, 0)
+ if buf_ptr == ctypes.c_void_p(-1).value:
+ print("mmap failed", file=sys.stderr)
+ sys.exit(1)
+ buf = (ctypes.c_uint64 * BUF_SIZE).from_address(buf_ptr)
+
+ # Load module first (generates noise with INSTRUMENT_ALL)
+ mod_name = os.path.basename(ko_path).replace(".ko", "")
+ try:
+ finit_module(ko_path)
+ print(f"# Loaded {mod_name}")
+ except OSError as e:
+ print(f"Failed to load module: {e}", file=sys.stderr)
+ sys.exit(1)
+
+ # Get module .text address for PC filtering
+ mod_text_start = 0
+ try:
+ with open(f"/sys/module/{mod_name}/sections/.text") as f:
+ mod_text_start = int(f.read().strip(), 16)
+ except (FileNotFoundError, ValueError, PermissionError):
+ pass
+
+ # Enable recording AFTER load, BEFORE trigger (avoids VFS/loader noise)
+ fcntl.ioctl(df_fd, KCOV_DF_ENABLE, 0)
+ buf[0] = 0
+
+ # Trigger the module's debugfs file to invoke test functions
+ trigger_paths = [
+ f"/sys/kernel/debug/kcov_dataflow_test/trigger",
+ f"/sys/kernel/debug/kcov_dataflow_test/rust_ffi_trigger",
+ f"/sys/kernel/debug/trigger_rust",
+ f"/sys/kernel/debug/{mod_name}/trigger",
+ ]
+ for tp in trigger_paths:
+ try:
+ with open(tp, "w") as f:
+ f.write("1")
+ break
+ except (FileNotFoundError, PermissionError):
+ continue
+
+ fcntl.ioctl(df_fd, KCOV_DF_DISABLE, 0)
+
+ # Read kallsyms while module is still loaded (symbols available)
+ syms = load_kallsyms()
+
+ # Unload
+ try:
+ delete_module(mod_name)
+ except OSError:
+ pass
+
+ # Parse and display
+ total = int(buf[0])
+ print(f"# Captured {total} words")
+ records = parse_records(buf, total)
+ print(f"# {len(records)} records")
+
+ # Filter to module records using kallsyms
+ # Build set of module symbol addresses for fast lookup
+ mod_syms = set()
+ for addr, name, mod in syms:
+ if mod == mod_name and addr != 0:
+ mod_syms.add(addr)
+
+ def is_module_pc(pc):
+ """Check if PC belongs to mod_name via kallsyms."""
+ if mod_syms:
+ # Binary search: find nearest symbol <= pc, check module
+ lo, hi = 0, len(syms) - 1
+ while lo < hi:
+ mid = (lo + hi + 1) // 2
+ if syms[mid][0] <= pc:
+ lo = mid
+ else:
+ hi = mid - 1
+ return syms[lo][2] == mod_name
+ # Fallback: if no module symbols (kptr_restrict), use .text start
+ return mod_text_start and pc >= mod_text_start
+
+ if syms or mod_text_start:
+ if args.context > 0:
+ module_indices = set()
+ for i, r in enumerate(records):
+ if is_module_pc(r["pc"]):
+ for j in range(max(0, i - args.context),
+ min(len(records), i + args.context + 1)):
+ module_indices.add(j)
+ records = [records[i] for i in sorted(module_indices)]
+ print(f"# showing {len(records)} records with
context={args.context} "
+ f"around {mod_name}\n")
+ else:
+ module_records = [r for r in records if is_module_pc(r["pc"])]
+ print(f"# {len(module_records)} from {mod_name}\n")
+ records = module_records
+ else:
+ print("")
+
+ if args.raw:
+ print_raw(records, syms)
+ else:
+ print_tree(records, syms)
+
+ os.close(df_fd)
+
+
+if __name__ == "__main__":
+ main()
--
2.43.0