Hi Piyush.
> This patch adds the bpf-vmtest-tool subdirectory under contrib which tests
> BPF programs under a live kernel using a QEMU VM. It can build the
> specified kernel version with eBPF support enabled
> and stores it under $VMTEST_DIR
>
> It can also compile BPF C source files or BPF bytecode objects and
> test them against the kernel verifier for errors. When a BPF program
> is rejected by the kernel verifier, the verifier logs are displayed.

The BPF bytecode objects don't get compiled, I suppose.

>
> $ python3 main.py -k 6.15 --bpf-src assets/ebpf-programs/fail.c
> BPF program failed to load
> Verifier logs:
> btf_vmlinux is malformed
> 0: R1=ctx() R10=fp0
> 0: (81) r0 = *(s32 *)(r10 +4)
> invalid read from stack R10 off=4 size=4
> processed 1 insns (limit 1000000) max_states_per_insn 0 total_states
> 0 peak_states 0 mark_read 0
>
> See the README for more examples.
>
> The script uses vmtest (https://github.com/danobi/vmtest) to boot
> the VM and run the program. By default, it uses the host's root
> ("/") as the VM rootfs via the 9p filesystem, so only the kernel is
> replaced during testing.
>
> Tested with Python 3.9 and above.
>
> contrib/ChangeLog:
>
> * bpf-vmtest-tool/README: New file.
> * bpf-vmtest-tool/bpf.py: New file.
> * bpf-vmtest-tool/config.py: New file.
> * bpf-vmtest-tool/kernel.py: New file.
> * bpf-vmtest-tool/main.py: New file.
> * bpf-vmtest-tool/pyproject.toml: New file.
> * bpf-vmtest-tool/tests/test_cli.py: New file.
> * bpf-vmtest-tool/utils.py: New file.
> * bpf-vmtest-tool/vm.py: New file.
> > Signed-off-by: Piyush Raj <[email protected]> > --- > contrib/bpf-vmtest-tool/README | 157 ++++++++++++ > contrib/bpf-vmtest-tool/bpf.py | 221 +++++++++++++++++ > contrib/bpf-vmtest-tool/config.py | 50 ++++ > contrib/bpf-vmtest-tool/kernel.py | 290 ++++++++++++++++++++++ > contrib/bpf-vmtest-tool/main.py | 285 +++++++++++++++++++++ > contrib/bpf-vmtest-tool/pyproject.toml | 36 +++ > contrib/bpf-vmtest-tool/tests/test_cli.py | 219 ++++++++++++++++ > contrib/bpf-vmtest-tool/utils.py | 31 +++ > contrib/bpf-vmtest-tool/vm.py | 169 +++++++++++++ > 9 files changed, 1458 insertions(+) > create mode 100644 contrib/bpf-vmtest-tool/README > create mode 100644 contrib/bpf-vmtest-tool/bpf.py > create mode 100644 contrib/bpf-vmtest-tool/config.py > create mode 100644 contrib/bpf-vmtest-tool/kernel.py > create mode 100644 contrib/bpf-vmtest-tool/main.py > create mode 100644 contrib/bpf-vmtest-tool/pyproject.toml > create mode 100644 contrib/bpf-vmtest-tool/tests/test_cli.py > create mode 100644 contrib/bpf-vmtest-tool/utils.py > create mode 100644 contrib/bpf-vmtest-tool/vm.py > > diff --git a/contrib/bpf-vmtest-tool/README b/contrib/bpf-vmtest-tool/README > new file mode 100644 > index 00000000000..552b2a3e1c8 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/README > @@ -0,0 +1,157 @@ > +BPF vmtest Tool > +=============== > +https://gcc.gnu.org/wiki/BPFRunTimeTests > + > +This directory contains a Python script to run BPF programs or shell commands > +under a live Linux kernel using QEMU virtual machines. > + > +USAGE > +===== > + > +Initial Setup > +------------- > + > +Before using the tool, you must set the directory where vmtest will look for > +kernels and store kernel artifacts. You can do this in two ways: > + > +1. Set the VMTEST_DIR environment variable > +2. Use the --vmtest-dir flag with each command > + > +Note: This is required to use the tool. 
> + > +Available Options > +----------------- > + > +View all supported commands using the --help option: > + > + usage: main.py [-h] [-v DEBUG|INFO|WARNING|ERROR] [--vmtest-dir DIR] > {bpf,vmtest,kernel} ... > + > + BPF vmtest tool > + > + positional arguments: > + {bpf,vmtest,kernel} Available commands > + bpf BPF program management > + vmtest Run VM tests > + kernel Kernel management > + > + options: > + -h, --help show this help message and exit > + -v DEBUG|INFO|WARNING|ERROR, --log-level DEBUG|INFO|WARNING|ERROR > + Log level > + --vmtest-dir DIR Directory for vmtest artifacts (or set > VMTEST_DIR env variable) > + > + Examples: > + # Compile BPF source to bytecode > + main.py bpf compile my_prog.bpf.c -o my_prog.bpf.o > + > + # Run BPF program in VM > + main.py vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c > + > + # List available kernels > + main.py kernel list > + > + > +COMMANDS > +======== > + > +kernel subcommand > +----------------- > + > +You must build a kernel before using it. > + > +Build a kernel: > + > + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel build > 6.16 > + > +The tool will download and build the specified kernel version from > +https://www.kernel.org/pub/linux/kernel and store the build artifacts in > +$VMTEST_DIR/kernels/linux-6.15-x86_64. Specifically, it stores bpftool, > +bzImage-6.15-x86_64, and vmlinux.h, which are used when compiling BPF > programs > +instead of relying on the host system. > + > +List available kernels: > + > + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel list > + > +Remove kernels: > + > + python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" kernel remove > 6.15-x86_64 > + > +Note: If the architecture is omitted, the host architecture is assumed. For > +example, "6.15" will be treated as "6.15-x86_64" on an x86_64 system. 
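
For reference, the rule in this note can be sketched as below. This is a minimal illustration of the documented behaviour, not the code from the patch; `split_kernel_name` and its `default_arch` parameter are hypothetical names.

```python
import platform
from typing import Optional, Tuple


def split_kernel_name(name: str, default_arch: Optional[str] = None) -> Tuple[str, str]:
    """Split "VERSION[-ARCH]" into (version, arch).

    When no architecture is given, fall back to default_arch, or to the
    host architecture if default_arch is not set either.
    """
    # Everything before the first "-" is the version, the rest is the arch.
    version, _, arch = name.partition("-")
    return version, arch or default_arch or platform.machine()


if __name__ == "__main__":
    print(split_kernel_name("6.15"))
    print(split_kernel_name("6.15-x86_64"))
```

Note that with this rule a name such as "6.15-rc1" is read as version "6.15" with architecture "rc1", so release candidates may need separate handling.
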
> +
> +vmtest subcommand
> +-----------------
> +
> +Run a shell command inside a live kernel VM:
> +
> +    python main.py vmtest -k 6.15 -r / -c "uname -a"

I see that -r/--rootfs is not documented, nor are the other options that are
specific to subcommands. Can we have a short description of each?

> +
> +Run a BPF source file in the VM:
> +
> +    python main.py vmtest -k 6.15 --bpf-src fail.c
> +
> +Run a precompiled BPF object file:
> +
> +    python main.py vmtest -k 6.15 --bpf-obj fail.bpf.o
> +
> +bpf subcommand
> +--------------
> +
> +You can compile BPF source to bytecode using the kernel-specific bpftool and
> +vmlinux.h stored in $VMTEST_DIR:
> +
> +    python main.py --vmtest-dir="/home/user/.bpf-vmtest-tool" bpf compile invalid-memory-access.c -k 6.15 -o /tmp/invalid-memory-access.bpf.o
> +
> +
> +LIMITATIONS
> +===========
> +
> +- Only x86_64 architecture is currently supported
> +- Only "/" (the root filesystem) is currently supported as the VM rootfs when
> +  running or testing BPF programs using --bpf-src or --bpf-obj
> +
> +
> +DEPENDENCIES
> +============
> +
> +- Python >= 3.9
> +- vmtest >= v0.18.0 (https://github.com/danobi/vmtest)
> +  - QEMU
> +  - qemu-guest-agent
> +
> +For compiling kernels:
> +- pahole
> +- https://docs.kernel.org/process/changes.html#current-minimal-requirements
> +
> +For compiling and loading BPF programs:
> +- libbpf
> +- gcc-bpf-unknown-none (https://gcc.gnu.org/wiki/BPFBackEnd#Where_to_find_GCC_BPF)

So libbpf is still a host dependency. Would it be possible to use the libbpf
of the built kernel instead, much like bpftool?
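
To illustrate the idea, here is a rough sketch of how the loader compile command could optionally point at a libbpf taken from the built kernel. It is only a sketch under assumptions: `loader_compile_cmd` and `libbpf_dir` are hypothetical names, and it assumes the kernel build step leaves a static `libbpf.a` and an `include/bpf/` header directory under `libbpf_dir`, which the patch does not currently do.

```python
import shlex
from pathlib import Path
from typing import List, Optional


def loader_compile_cmd(
    cc: str,
    cflags: str,
    ldflags: str,
    loader_src: Path,
    build_dir: Path,
    output: Path,
    libbpf_dir: Optional[Path] = None,
) -> List[str]:
    """Return the argv used to compile the user-space loader.

    libbpf_dir is a hypothetical directory holding include/bpf/*.h and
    libbpf.a copied from the built kernel tree. When it is None the host
    libbpf is used, as in the current patch.
    """
    cmd = [cc, *shlex.split(cflags), "-I", str(build_dir)]
    if libbpf_dir is not None:
        # Prefer the headers that match the kernel under test.
        cmd += ["-I", str(libbpf_dir / "include")]
    cmd.append(str(loader_src))
    if libbpf_dir is not None:
        # Link the static archive instead of the host -lbpf. It has to come
        # before -lelf and -lz, which it depends on.
        rest = [flag for flag in shlex.split(ldflags) if flag != "-lbpf"]
        cmd += [str(libbpf_dir / "libbpf.a"), *rest]
    else:
        cmd += shlex.split(ldflags)
    cmd += ["-o", str(output)]
    return cmd


if __name__ == "__main__":
    print(loader_compile_cmd(
        "gcc", "-g -Wall ", "-lelf -lz -lbpf",
        Path("prog-loader.c"), Path("build"), Path("build/prog.o"),
        libbpf_dir=Path("kernels/linux-6.15-x86_64/libbpf"),
    ))
```

Using `shlex.split` here also avoids the empty argument that `str.split(" ")` produces when a flags string ends with a space.
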
> +
> +
> +BUILD FLAGS
> +===========
> +
> +You can customize compiler settings using environment variables:
> +
> +- BPF_CC: Compiler for the BPF program (default: bpf-unknown-none-gcc)
> +- BPF_CFLAGS: Extra flags for BPF program compilation (default: "-O2")
> +- BPF_INCLUDES: Include paths for BPF (default: "-I/usr/local/include -I/usr/include")
> +- VMTEST_CC: Compiler for the user-space loader (default: gcc)
> +- VMTEST_CFLAGS: Flags for compiling the loader (default: "-g -Wall")
> +- VMTEST_LDFLAGS: Linker flags for the loader (default: "-lelf -lz -lbpf")
> +
> +Example usage:
> +
> +    BPF_CFLAGS="-O3 -g" BPF_CC="/bpf-gcc-build/gcc/xgcc" python main.py vmtest -k 6.15 --bpf-src fail.c
> +
> +
> +DEVELOPMENT
> +===========
> +
> +Development dependencies are specified in pyproject.toml, which can be used
> +with any suitable Python virtual environment manager.
> +
> +To run the test suite:
> +
> +    python3 -m pytest
> \ No newline at end of file

Please add a newline character here.

> diff --git a/contrib/bpf-vmtest-tool/bpf.py b/contrib/bpf-vmtest-tool/bpf.py > new file mode 100644 > index 00000000000..714cf49abb8 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/bpf.py > @@ -0,0 +1,221 @@ > +import re > +import subprocess > +import logging > +from pathlib import Path > +import sys > +import tempfile > +from typing import Optional > +import utils > +import config > +import os > + > +# Based on the compilation process described in: > +# https://git.sr.ht/~brianwitte/gcc-bpf-example/tree/master/item/Makefile > + > +logger = logging.getLogger(__name__) > + > + > +def generate_sanitized_name(path: Path): > + """generate sanitized c variable name""" > + name = re.sub(r"\W", "_", path.stem) > + if name and name[0].isdigit(): > + name = "_" + name > + return name > + > + > +class BPFProgram: > + tmp_base_dir = tempfile.TemporaryDirectory(prefix="vmtest") > + tmp_base_dir_path = Path(tmp_base_dir.name) > + > + def __init__( > + self, > + source_path: Optional[Path] = None, > + bpf_bytecode_path: Optional[Path] = None, > + use_temp_dir: bool = False, > + ): > + path = source_path or bpf_bytecode_path > + self.name = generate_sanitized_name(path) > + self.build_dir = self.__class__.tmp_base_dir_path / "ebpf_programs" > / self.name > + > + if source_path: > + self.bpf_src = source_path > + self.bpf_obj = self.build_dir / f"{self.name}.bpf.o" > + else: > + self.bpf_obj = bpf_bytecode_path > + self.build_dir.mkdir(parents=True, exist_ok=True) > + self.bpf_skel = self.build_dir / f"{self.name}.skel.h" > + self.loader_src = self.build_dir / f"{self.name}-loader.c" > + self.output = self.build_dir / f"{self.name}.o" > + > + @classmethod > + def from_source(cls, source_path: Path, kernel_spec): > + self = cls(source_path=source_path) > + self._compile_bpf(kernel_spec) > + self._compile_from_bpf_bytecode(kernel_spec) > + return self.output > + > + @classmethod > + def from_bpf_obj(cls, obj_path: Path, kernel_spec): > + self = cls(bpf_bytecode_path=obj_path) > + 
self._compile_from_bpf_bytecode(kernel_spec) > + return self.output > + > + def compile_bpf(self, kernel_spec) -> Path: > + if self.bpf_src is None: > + raise ValueError( > + "Cannot compile BPF source: instance was created with " > + "bpf_bytecode_path instead of source_path" > + ) > + self._compile_bpf(kernel_spec) > + return self.bpf_obj > + > + def _compile_from_bpf_bytecode(self, kernel_spec): > + self._generate_skeleton(kernel_spec) > + self._compile_loader() > + > + def _compile_bpf(self, kernel_spec): > + """Compile the eBPF program using gcc""" > + logger.info(f"Compiling eBPF source: {self.bpf_src}") > + cmd = [ > + config.config.bpf_cc, > + f"-D__TARGET_ARCH_{config.config.arch}", > + "-gbtf", > + "-std=gnu17", > + ] > + cmd.append(f"-I{kernel_spec.vmlinux_path.parent}") > + cmd.extend(config.config.bpf_cflags.split(" ")) > + cmd.extend(config.config.bpf_includes.split(" ")) > + cmd.extend( > + [ > + "-c", > + str(self.bpf_src), > + "-o", > + str(self.bpf_obj), > + ] > + ) > + # remove variables that conflict with host compiler > + clean_env = os.environ.copy() > + clean_env.pop("GCC_EXEC_PREFIX", None) > + logger.debug("".join(cmd)) > + try: > + utils.run_command(cmd, env=clean_env) > + except subprocess.CalledProcessError as e: > + logger.error(f"bpf compilation failed: {e}") > + sys.exit(1) > + logger.info(f"eBPF compiled: {self.bpf_obj}") > + > + def _generate_skeleton(self, kernel_spec): > + """Generate the BPF skeleton header using bpftool""" > + logger.info(f"Generating skeleton: {self.bpf_skel}") > + cmd = [ > + kernel_spec.bpftool_path, > + "gen", > + "skeleton", > + str(self.bpf_obj), > + "name", > + self.name, > + ] > + try: > + result = utils.run_command(cmd) > + with open(self.bpf_skel, "w") as f: > + f.write(result.stdout) > + logger.info("Skeleton generated.") > + except subprocess.CalledProcessError: > + logger.error("Failed to generate skeleton.") > + sys.exit(1) > + > + def _compile_loader(self): > + """Compile the C loader program""" > 
+ self.generate_loader() > + logger.info(f"Compiling loader: {self.loader_src}") > + cmd = [ > + config.config.vmtest_cc, > + *config.config.vmtest_cflags.split(" "), > + "-I", > + str(self.build_dir), > + str(self.loader_src), > + *config.config.vmtest_ldflags.split(" "), > + "-o", > + str(self.output), > + ] > + # remove variables that conflict with host compiler > + clean_env = os.environ.copy() > + clean_env.pop("GCC_EXEC_PREFIX", None) > + try: > + utils.run_command(cmd, env=clean_env) > + except subprocess.CalledProcessError as e: > + logger.error(f"bpf loader compilation failed: {e}") > + sys.exit(1) > + > + logger.info("Compilation complete") > + > + def generate_loader(self): > + """ > + Generate a loader C file for the given BPF skeleton. > + > + Args: > + bpf_name (str): Name of the BPF program (e.g. "prog"). > + output_path (str): Path to write loader.c. > + """ > + skeleton_header = f"{self.name}.skel.h" > + loader_code = f"""\ > + #include <stdio.h> > + #include <stdlib.h> > + #include <signal.h> > + #include <unistd.h> > + #include <bpf/libbpf.h> > + #include "{skeleton_header}" > + > + #define LOG_BUF_SIZE 1024 * 1024 > + > + static volatile sig_atomic_t stop; > + static char log_buf[LOG_BUF_SIZE]; > + > + void handle_sigint(int sig) {{ > + stop = 1; > + }} > + > + int main() {{ > + struct {self.name} *skel; > + struct bpf_program *prog; > + int err; > + > + signal(SIGINT, handle_sigint); > + > + skel = {self.name}__open(); // STEP 1: open only > + if (!skel) {{ > + fprintf(stderr, "Failed to open BPF skeleton\\n"); > + return 1; > + }} > + > + // STEP 2: Get the bpf_program object for the main program > + bpf_object__for_each_program(prog, skel->obj) {{ > + bpf_program__set_log_buf(prog, log_buf, sizeof(log_buf)); > + bpf_program__set_log_level(prog, 1); // optional: verbose > logs > + }} > + > + // STEP 3: Load the program (this will trigger verifier log > output) > + err = {self.name}__load(skel); > + fprintf( > + stderr, > + "--- Verifier log 
start ---\\n" > + "%s\\n" > + "--- Verifier log end ---\\n", > + log_buf > + ); > + if (err) {{ > + fprintf(stderr, "Failed to load BPF skeleton: %d\\n", err); > + {self.name}__destroy(skel); > + return 1; > + }} > + > + printf("BPF program loaded successfully.\\n"); > + > + {self.name}__destroy(skel); > + return 0; > + }} > + > + """ > + with open(self.loader_src, "w") as f: > + f.write(loader_code) > + logger.info(f"Generated loader at {self.loader_src}") > diff --git a/contrib/bpf-vmtest-tool/config.py > b/contrib/bpf-vmtest-tool/config.py > new file mode 100644 > index 00000000000..5f94f8f69bf > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/config.py > @@ -0,0 +1,50 @@ > +import platform > +from pathlib import Path > +import os > +from dataclasses import dataclass > + > + > +@dataclass > +class VMTestConfig: > + """Configuration for BPF vmtest tool""" > + > + vmtest_dir: Path > + kernel_tarball_url: str = "https://cdn.kernel.org/pub/linux/kernel/" > + arch: str = platform.machine() > + vmtest_cc: str = os.getenv("VMTEST_CC", "gcc") > + vmtest_cflags: str = os.getenv("VMTEST_CFLAGS", "-g -Wall -Werror ") > + vmtest_ldflags: str = os.getenv("VMTEST_LDFLAGS", "-lelf -lz -lbpf") > + bpf_cc: str = os.getenv("BPF_CC", "bpf-unknown-none-gcc") > + bpf_cflags: str = os.getenv("BPF_CFLAGS", "-O2 -Wall -Werror") > + bpf_includes: str = os.getenv("BPF_INCLUDES", "-I/usr/local/include > -I/usr/include") > + > + @property > + def kconfig_rel_paths(self) -> list: > + """Kernel config paths relative to kernel directory""" > + return [ > + "tools/testing/selftests/bpf/config", > + "tools/testing/selftests/bpf/config.vm", > + f"tools/testing/selftests/bpf/config.{self.arch}", > + ] > + > + @property > + def kernels_dir(self) -> Path: > + """Get kernels directory""" > + return self.vmtest_dir / "kernels" > + > + def __post_init__(self): > + """Validate vmtest_dir exists""" > + if not self.vmtest_dir.exists(): > + raise ValueError(f"VMTEST_DIR does not exist: {self.vmtest_dir}") > 
+ if not self.vmtest_dir.is_dir(): > + raise ValueError(f"VMTEST_DIR is not a directory: > {self.vmtest_dir}") > + > + > +# Global config instance > +config = None > + > + > +def init_config(vmtest_dir: str): > + """Initialize global config""" > + global config > + config = VMTestConfig(vmtest_dir=Path(vmtest_dir)) > diff --git a/contrib/bpf-vmtest-tool/kernel.py > b/contrib/bpf-vmtest-tool/kernel.py > new file mode 100644 > index 00000000000..f60bc859206 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/kernel.py > @@ -0,0 +1,290 @@ > +import logging > +import os > +import shutil > +import subprocess > +from pathlib import Path > +import re > +import sys > +from urllib.parse import urljoin > +from urllib.request import urlretrieve > +from typing import Optional, List > +from dataclasses import dataclass > + > +import config > +import utils > + > +logger = logging.getLogger(__name__) > + > + > +@dataclass > +class KernelSpec: > + """Immutable kernel specification""" > + > + version: str > + arch: str | None = None > + > + def __post_init__(self): > + if self.arch is None or self.arch == "": > + self.arch = config.config.arch > + self.major = self.version.split(".")[0] > + > + def __str__(self): > + return f"{self.version}-{self.arch}" > + > + @property > + def kernel_build_dir(self) -> Path: > + return config.config.kernels_dir / f"linux-{self}-build" > + > + @property > + def kernel_dir(self) -> Path: > + return config.config.kernels_dir / f"linux-{self}" > + > + @property > + def bzimage_path(self) -> Path: > + return self.kernel_dir / f"bzImage-{self}" > + > + @property > + def bpftool_path(self) -> Path: > + return self.kernel_dir / "bpftool" > + > + @property > + def vmlinux_path(self) -> Path: > + return self.kernel_dir / "vmlinux.h" > + > + @property > + def tarball_path(self) -> Path: > + return config.config.kernels_dir / f"linux-{self}.tar.xz" > + > + > +class KernelImage: > + """Represents a compiled kernel image""" > + > + def __init__(self, path: Path): 
> + if not isinstance(path, Path): > + path = Path(path) > + > + if not path.exists(): > + raise FileNotFoundError(f"Kernel image not found: {path}") > + > + self.path = path > + > + def __str__(self): > + return str(self.path) > + > + > +class KernelCompiler: > + """Handles complete kernel compilation process including download and > build""" > + > + @staticmethod > + def _progress_hook(block_num: int, block_size: int, total_size: int) -> > None: > + """Progress hook for urlretrieve to display download progress""" > + if total_size <= 0: > + return > + > + downloaded = block_num * block_size > + percent = min(downloaded * 100 // total_size, 100) > + bar_length = 10 > + filled = int(bar_length * downloaded // total_size) > + bar = "#" * filled + "-" * (bar_length - filled) > + > + if logger.getEffectiveLevel() <= logging.INFO: > + downloaded_mb = downloaded / (1024 * 1024) > + total_mb = total_size / (1024 * 1024) > + > + if sys.stdout.isatty(): > + sys.stdout.write( > + f"\rDownloading: |{bar}| {percent}% > ({downloaded_mb:.2f}MB / {total_mb:.2f}MB)" > + ) > + sys.stdout.flush() > + if downloaded >= total_size: > + sys.stdout.write("\n") > + sys.stdout.flush() > + else: > + if downloaded >= total_size or downloaded % (block_size * > 100) == 0: > + logger.info( > + f"Downloading: {percent}% ({downloaded_mb:.2f}MB / > {total_mb:.2f}MB)" > + ) > + > + def compile_from_version(self, spec: KernelSpec) -> KernelImage: > + """Complete compilation process from kernel version""" > + if spec.bzimage_path.exists(): > + logger.info(f"Kernel {spec} already exists, skipping > compilation") > + return KernelImage(spec.bzimage_path) > + > + try: > + self._download_source(spec) > + self._extract_source(spec) > + self._configure_kernel(spec) > + self._compile_kernel(spec) > + self._copy_bzimage(spec) > + # generate vmlinux.h from the kernel by starign the vm from the > build kernel > + > + logger.info(f"Successfully compiled kernel {spec}") > + return KernelImage(spec.bzimage_path) 
> + > + except Exception as e: > + logger.error(f"Failed to compile kernel {spec}: {e}") > + sys.exit(1) > + finally: > + # Always cleanup temporary files > + self._cleanup(spec) > + > + def _download_source(self, spec: KernelSpec) -> None: > + """Download kernel source tarball""" > + if spec.tarball_path.exists(): > + logger.info(f"Tarball already exists: {spec.tarball_path}") > + return > + > + url_suffix = f"v{spec.major}.x/linux-{spec.version}.tar.xz" > + url = urljoin(config.config.kernel_tarball_url, url_suffix) > + > + logger.info(f"Downloading kernel from {url}") > + spec.tarball_path.parent.mkdir(parents=True, exist_ok=True) > + urlretrieve(url, spec.tarball_path, reporthook=self._progress_hook) > + logger.info("Kernel source downloaded") > + > + def _extract_source(self, spec: KernelSpec) -> None: > + """Extract kernel source tarball""" > + logger.info(f"Extracting kernel source to {spec.kernel_build_dir}") > + spec.kernel_build_dir.mkdir(parents=True, exist_ok=True) > + > + utils.run_command( > + [ > + "tar", > + "-xf", > + str(spec.tarball_path), > + "-C", > + str(spec.kernel_build_dir), > + "--strip-components=1", > + ] > + ) > + > + def _configure_kernel(self, spec: KernelSpec) -> None: > + """Configure kernel with provided config files""" > + config_path = spec.kernel_build_dir / ".config" > + > + with open(config_path, "wb") as kconfig: > + for config_rel_path in config.config.kconfig_rel_paths: > + config_abs_path = spec.kernel_build_dir / config_rel_path > + if config_abs_path.exists(): > + with open(config_abs_path, "rb") as conf: > + kconfig.write(conf.read()) > + > + logger.info("Updated kernel configuration") > + > + def _compile_kernel(self, spec: KernelSpec) -> None: > + """Compile the kernel""" > + logger.info(f"Compiling kernel in {spec.kernel_build_dir}") > + old_cwd = os.getcwd() > + > + try: > + os.chdir(spec.kernel_build_dir) > + # pahole is required for the DEBUG_INFO_BTF kernel configuration > option. 
> + pahole_path = shutil.which("pahole") > + if pahole_path is None: > + logger.error( > + "pahole not found in PATH. BTF generation requires > pahole v1.16 or later." > + ) > + sys.exit(1) > + friendly_cores = os.cpu_count() - 2 > + utils.run_command(["make", "olddefconfig"], stream_output=True) > + logger.info("Starting kernel compilation") > + utils.run_command( > + ["make", f"-j{friendly_cores}", "bzImage"], > stream_output=True > + ) > + logger.info("Compiling bpftool") > + utils.run_command( > + ["make", "-C", "tools/bpf", f"-j{friendly_cores}", > "bpftool"], > + stream_output=True, > + ) > + except subprocess.CalledProcessError as e: > + logger.error(f"Kernel compilation failed: {e}") > + sys.exit(1) > + finally: > + os.chdir(old_cwd) > + > + def _copy_bzimage(self, spec: KernelSpec) -> None: > + """Copy compiled bzImage to final location""" > + # compile the bpftool as well > + src = spec.kernel_build_dir / "arch/x86/boot/bzImage" > + dest = spec.bzimage_path > + dest.parent.mkdir(parents=True, exist_ok=True) > + > + shutil.copy2(src, dest) > + logger.info(f"Stored bzImage at {dest}") > + > + shutil.copy2( > + spec.kernel_build_dir / "tools/bpf/bpftool/vmlinux.h", > spec.vmlinux_path > + ) > + logger.info(f"Stored vmlinux at {spec.vmlinux_path}") > + > + shutil.copy2( > + spec.kernel_build_dir / "tools/bpf/bpftool/bpftool", > spec.bpftool_path > + ) > + logger.info(f"Stored bpftool at {spec.bpftool_path}") > + > + def _cleanup(self, spec: KernelSpec) -> None: > + """Clean up temporary files""" > + if spec.tarball_path.exists(): > + spec.tarball_path.unlink() > + logger.info("Removed tarball") > + > + if spec.kernel_build_dir.exists(): > + shutil.rmtree(spec.kernel_build_dir) > + logger.info("Removed kernel source directory") > + > + > +class KernelManager: > + """Main interface for kernel management""" > + > + def __init__(self): > + self.compiler = KernelCompiler() > + > + def remove_kernel(self, name: str) -> None: > + """Remove compiled kernel by 
version""" > + version, _, arch = name.partition("-") > + spec = KernelSpec(version=version, arch=arch) > + if spec.kernel_dir.exists(): > + shutil.rmtree(spec.kernel_dir) > + logger.info(f"Removed kernel {spec}") > + else: > + logger.error( > + f"Kernel {spec} does not exist, path {spec.kernel_dir} not > found" > + ) > + raise SystemExit(1) > + > + def build_kernel(self, version: str, arch=None) -> None: > + """Build kernel from version""" > + > + spec = KernelSpec(version=version, arch=arch) > + self.compiler.compile_from_version(spec) > + > + @staticmethod > + def get_kernel_from_version( > + version: str, > + ): > + """Get kernel image from version""" > + version, _, arch = version.partition("-") > + spec = KernelSpec(version=version, arch=arch) > + if spec.bzimage_path.exists(): > + return spec, KernelImage(spec.bzimage_path) > + else: > + raise FileNotFoundError( > + f"Kernel {spec} not found. Use 'main.py kernel build' to > create it." > + ) > + > + def list_kernels(self) -> List[str]: > + """List all available compiled kernels""" > + if not config.config.kernels_dir.exists(): > + raise FileNotFoundError( > + f"Kernels directory not found: {config.config.kernels_dir}" > + ) > + > + kernels = [] > + for file in config.config.kernels_dir.glob("linux-*"): > + if file.is_dir(): > + match = re.match(r"linux-(.*)", file.name) > + if match: > + kernels.append(match.group(1)) > + > + return sorted(kernels) > diff --git a/contrib/bpf-vmtest-tool/main.py b/contrib/bpf-vmtest-tool/main.py > new file mode 100644 > index 00000000000..43b6036c615 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/main.py > @@ -0,0 +1,285 @@ > +import argparse > +import logging > +from pathlib import Path > +import sys > +import textwrap > +import os > + > +import bpf > +import kernel > +import vm > +import config > + > +logger = logging.getLogger(__name__) > + > + > +def cmd_kernel_list(args): > + """List all available kernels""" > + kmanager = kernel.KernelManager() > + kernels = 
kmanager.list_kernels() > + if kernels: > + for k in kernels: > + print(k) > + else: > + logger.info("No kernels available") > + > + > +def cmd_kernel_remove(args): > + """Remove a kernel""" > + kmanager = kernel.KernelManager() > + if not args.kernel: > + logger.error("kernel version required for remove action") > + sys.exit(1) > + kmanager.remove_kernel(args.kernel) > + logger.info(f"Kernel {args.kernel} removed") > + print(f"Kernel {args.kernel} removed") > + > + > +def cmd_kernel_build(args): > + """Build a kernel""" > + kmanager = kernel.KernelManager() > + if not args.kernel: > + logger.error("kernel version required for build action") > + sys.exit(1) > + kmanager.build_kernel(version=args.kernel) > + > + > +def cmd_bpf_compile(args): > + """Compile BPF source to bytecode only""" > + kmanager = kernel.KernelManager() > + > + try: > + kernel_spec, _ = > kmanager.get_kernel_from_version(version=args.kernel) > + except Exception as e: > + logger.error(f"Failed to get kernel: {e}") > + sys.exit(1) > + > + try: > + bpf_program = bpf.BPFProgram(source_path=Path(args.bpf_src)) > + output_path = bpf_program.compile_bpf(kernel_spec) > + > + if args.output: > + import shutil > + > + output_dest = Path(args.output) > + shutil.copy2(output_path, output_dest) > + logger.info(f"Copied to: {output_dest}") > + > + except Exception as e: > + logger.error(f"Failed to compile BPF source: {e}") > + sys.exit(1) > + > + > +def cmd_vmtest(args): > + """Handle vmtest subcommand""" > + kmanager = kernel.KernelManager() > + > + try: > + kernel_spec, kernel_image = kmanager.get_kernel_from_version( > + version=args.kernel > + ) > + except Exception as e: > + logger.error(f"Failed to get kernel: {e}") > + sys.exit(1) > + > + try: > + if args.bpf_src: > + command = bpf.BPFProgram.from_source(Path(args.bpf_src), > kernel_spec) > + elif args.bpf_obj: > + command = bpf.BPFProgram.from_bpf_obj(Path(args.bpf_obj), > kernel_spec) > + elif args.command: > + command = args.command > + except 
Exception as e: > + logger.error(f"Failed to prepare command for vmtest: {e}") > + sys.exit(1) > + > + virtual_machine = vm.VirtualMachine(kernel_image, args.rootfs, > str(command)) > + try: > + result = virtual_machine.execute() > + except vm.BootFailedError as e: > + logger.error(f"VM boot failure: {e}") > + sys.exit(e.returncode) > + > + if args.bpf_src or args.bpf_obj: > + if result.returncode == 0: > + print("BPF programs successfully loaded") > + else: > + if "Failed to load BPF skeleton" in result.stdout: > + print("BPF program failed to load") > + print("Verifier logs:") > + print(textwrap.indent(vm.bpf_verifier_logs(result.stdout), > "\t")) > + elif args.command: > + print(result.stdout) > + > + sys.exit(result.returncode) > + > + > +def main(): > + parser = argparse.ArgumentParser( > + description="BPF vmtest tool", > + formatter_class=argparse.RawDescriptionHelpFormatter, > + epilog=textwrap.dedent(""" > + Examples: > + # Compile BPF source to bytecode > + %(prog)s bpf compile my_prog.bpf.c -o my_prog.bpf.o > + > + # Run BPF program in VM > + %(prog)s vmtest --kernel 6.15-x86_64 --bpf-src my_prog.bpf.c > + > + # List available kernels > + %(prog)s kernel list > + """), > + ) > + > + parser.add_argument( > + "-v", > + "--log-level", > + help="Log level", > + metavar="DEBUG|INFO|WARNING|ERROR", > + choices=["DEBUG", "INFO", "WARNING", "ERROR"], > + default="INFO", > + ) > + > + parser.add_argument( > + "--vmtest-dir", > + help="Directory for vmtest artifacts (or set VMTEST_DIR env > variable)", > + metavar="DIR", > + type=str, > + default=os.getenv("VMTEST_DIR"), > + ) > + > + subparsers = parser.add_subparsers(dest="subcommand", help="Available > commands") > + > + # BPF subcommand > + bpf_subparser = subparsers.add_parser("bpf", help="BPF program > management") > + bpf_subparsers = bpf_subparser.add_subparsers(dest="bpf_action", > help="BPF actions") > + > + # bpf compile subcommand > + compile_parser = bpf_subparsers.add_parser( > + "compile", 
help="Compile BPF source to bytecode (.bpf.o)" > + ) > + compile_parser.add_argument( > + "bpf_src", > + help="Path to BPF C source file", > + type=str, > + ) > + compile_parser.add_argument( > + "-o", > + "--output", > + help="Output path for compiled bytecode (optional, defaults to temp > dir)", > + metavar="PATH", > + type=str, > + required=True, > + ) > + compile_parser.add_argument( > + "-k", > + "--kernel", > + help="Kernel version to use for compilation", > + metavar="VERSION", > + type=str, > + required=True, > + ) > + > + compile_parser.set_defaults(func=cmd_bpf_compile) > + > + # VMtest subcommand > + vmtest_parser = subparsers.add_parser("vmtest", help="Run VM tests") > + vmtest_parser.set_defaults(func=cmd_vmtest) > + > + vmtest_parser.add_argument( > + "-k", > + "--kernel", > + help="Kernel version to boot in the vm", > + metavar="VERSION", > + type=str, > + required=True, > + ) > + vmtest_parser.add_argument( > + "-r", "--rootfs", help="rootfs to mount in the vm", default="/", > metavar="PATH" > + ) > + command_group = vmtest_parser.add_mutually_exclusive_group(required=True) > + command_group.add_argument( > + "--bpf-src", > + help="Path to BPF C source file", > + metavar="PATH", > + type=str, > + ) > + command_group.add_argument( > + "--bpf-obj", > + help="Path to bpf bytecode object", > + metavar="PATH", > + type=str, > + ) > + command_group.add_argument( > + "-c", "--command", help="command to run in the vm", metavar="COMMAND" > + ) > + > + # Kernel subcommand with nested subcommands > + kernel_subparser = subparsers.add_parser("kernel", help="Kernel > management") > + kernel_subparsers = kernel_subparser.add_subparsers( > + dest="kernel_action", help="Kernel actions" > + ) > + > + # kernel list subcommand > + list_parser = kernel_subparsers.add_parser( > + "list", help="List all available kernels" > + ) > + list_parser.set_defaults(func=cmd_kernel_list) > + > + # kernel remove subcommand > + remove_parser = kernel_subparsers.add_parser("remove", 
help="Remove a > kernel") > + remove_parser.add_argument( > + "kernel", help="Kernel version to remove (e.g., 6.15-x86_64)" > + ) > + remove_parser.set_defaults(func=cmd_kernel_remove) > + > + # kernel build subcommand > + build_parser = kernel_subparsers.add_parser("build", help="Build a > kernel") > + build_parser.add_argument( > + "kernel", help="Kernel version to build (e.g. 6.15-x86_64)" > + ) > + build_parser.set_defaults(func=cmd_kernel_build) > + > + args = parser.parse_args() > + logging.basicConfig(level=args.log_level, format="%(levelname)s: > %(message)s") > + > + if not args.vmtest_dir: > + logger.error( > + "VMTEST_DIR not specified. Use --vmtest-dir=DIR or set > VMTEST_DIR environment variable" > + ) > + sys.exit(1) > + > + vmtest_path = Path(args.vmtest_dir) > + if not vmtest_path.exists(): > + logger.error(f"VMTEST_DIR does not exist: {vmtest_path}") > + sys.exit(1) > + > + if not vmtest_path.is_dir(): > + logger.error(f"VMTEST_DIR is not a directory: {vmtest_path}") > + sys.exit(1) > + > + try: > + config.init_config(vmtest_dir=args.vmtest_dir) > + except ValueError as e: > + logger.error(str(e)) > + sys.exit(1) > + > + logger.debug(f"VMTEST_DIR set to: {args.vmtest_dir}") > + > + if hasattr(args, "func"): > + args.func(args) > + sys.exit(0) > + else: > + parser.print_help() > + sys.exit(1) > + > + > +if __name__ == "__main__": > + try: > + main() > + except KeyboardInterrupt: > + logger.error("Operation cancelled by user") > + sys.exit(1) > + except Exception as e: > + logger.error(f"Unknown error: {e}") > + sys.exit(1) > diff --git a/contrib/bpf-vmtest-tool/pyproject.toml > b/contrib/bpf-vmtest-tool/pyproject.toml > new file mode 100644 > index 00000000000..1977612cfd6 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/pyproject.toml > @@ -0,0 +1,36 @@ > +[project] > +name = "bpf-vmtest-tool" > +version = "0.1.0" > +description = "Test BPF code against live kernels" > +readme = "README.md" > +requires-python = ">=3.9" > +dependencies = [] > + > 
+[dependency-groups] > +dev = [ > + "pre-commit>=4.2.0", > + "pytest>=8.4.0", > + "pytest-sugar>=1.0.0", > + "ruff>=0.11.13", > + "tox>=4.26.0", > +] > + > +[tool.pytest.ini_options] > +addopts = [ > + "--import-mode=importlib", > +] > +testpaths = ["tests"] > +pythonpath = ["."] > + > +[tool.ruff.lint] > +select = [ > + # pycodestyle > + "E", > + # Pyflakes > + "F", > +] > +# Allow fix for all enabled rules (when `--fix` is provided). > +fixable = ["ALL"] > + > +# Allow unused variables when underscore-prefixed. > +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" > \ No newline at end of file > diff --git a/contrib/bpf-vmtest-tool/tests/test_cli.py > b/contrib/bpf-vmtest-tool/tests/test_cli.py > new file mode 100644 > index 00000000000..d9a01328bf2 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/tests/test_cli.py > @@ -0,0 +1,219 @@ > +import sys > +from unittest import mock > +import pytest > +from bpf import BPFProgram > +import kernel > +import main > +import logging > +import config > +import os > + > +logger = logging.getLogger(__name__) > + > + > +@pytest.fixture > +def tmp_config(): > + VMTEST_DIR = "/home/d3bug/.bpf-vmtest-tool" > + assert VMTEST_DIR is not None, "Specify VMTEST_DIR environment variable" > + config.init_config(vmtest_dir=VMTEST_DIR) > + > + > +# reset config for every test > +@pytest.fixture(autouse=True) > +def reset_config(): > + config.config = None > + > + > +@pytest.fixture > +def openat_bpf_source(tmp_path): > + openat_bpf = tmp_path / "openat_bpf.c" > + openat_bpf.write_text(r""" > + #include "vmlinux.h" > + #include <bpf/bpf_helpers.h> > + #include <bpf/bpf_tracing.h> > + #include <bpf/bpf_core_read.h> > + > + char LICENSE[] SEC("license") = "GPL"; > + > + int example_pid = 0; > + > + SEC("tracepoint/syscalls/sys_enter_openat") > + int handle_openat(struct trace_event_raw_sys_enter *ctx) > + { > + int pid = bpf_get_current_pid_tgid() >> 32; > + char filename[256]; // filename buffer > + bpf_probe_read_user(&filename, 
sizeof(filename), (void > *)ctx->args[1]); > + bpf_printk("sys_enter_openat() called from PID %d for file: %s\n", > pid, > + filename); > + > + return 0; > + } > + > + """) > + return openat_bpf > + > + > +@pytest.fixture > +def openat_bpf_obj(openat_bpf_source, tmp_config): > + def _create_openat_bpf_obj(kernel_spec): > + bpf_program = BPFProgram(source_path=openat_bpf_source) > + bpf_program._compile_bpf(kernel_spec) > + return bpf_program.bpf_obj > + > + return _create_openat_bpf_obj > + > + > +@pytest.fixture > +def invalid_memory_access_bpf_source(tmp_path): > + invalid_memory_access_bpf = tmp_path / "invalid_memory_access_bpf.c" > + invalid_memory_access_bpf.write_text(r""" > + #include "vmlinux.h" > + #include <bpf/bpf_helpers.h> > + #include <bpf/bpf_tracing.h> > + > + char LICENSE[] SEC("license") = "GPL"; > + > + SEC("tracepoint/syscalls/sys_enter_openat") > + int bpf_prog(struct trace_event_raw_sys_enter *ctx) { > + int arr[4] = {1, 2, 3, 4}; > + > + // Invalid memory access: out-of-bounds > + int val = arr[5]; // This causes the verifier to fail > + > + return val; > + } > + """) > + return invalid_memory_access_bpf > + > + > +@pytest.fixture > +def invalid_memory_access_bpf_obj(invalid_memory_access_bpf_source, > tmp_config): > + def _create_invalid_memory_access_bpf_obj(kernel_spec): > + bpf_program = > BPFProgram(source_path=invalid_memory_access_bpf_source) > + bpf_program._compile_bpf(kernel_spec) > + return bpf_program.bpf_obj > + > + return _create_invalid_memory_access_bpf_obj > + > + > +def run_main_with_args_and_capture_output(args, capsys): > + with mock.patch.object(sys, "argv", args): > + try: > + main.main() > + except SystemExit as e: > + result = capsys.readouterr() > + output = result.out.rstrip() > + error = result.err.rstrip() > + logger.debug("STDOUT:\n%s", output) > + logger.debug("STDERR:\n%s", error) > + return (e.code, output, error) > + except Exception as e: > + pytest.fail(f"Unknown error happened: {e}") > + else: > + 
pytest.fail("Expected main to raise SystemExit") > + > + > +KERNEL_VERSION = "6.16" > +kernel_cli_flags = [["--kernel", KERNEL_VERSION]] > + > + > +@pytest.mark.parametrize("kernel_args", kernel_cli_flags) > +class TestCLI: > + def test_main_with_valid_bpf(self, kernel_args, openat_bpf_source, > capsys): > + args = [ > + "main.py", > + "vmtest", > + *kernel_args, > + "--rootfs", > + "/", > + "--bpf-src", > + str(openat_bpf_source), > + ] > + code, output, _ = run_main_with_args_and_capture_output(args, capsys) > + assert code == 0 > + assert "BPF programs successfully loaded" == output > + > + def test_main_with_valid_bpf_obj(self, kernel_args, openat_bpf_obj, > capsys): > + args = [ > + "main.py", > + "vmtest", > + *kernel_args, > + "--rootfs", > + "/", > + "--bpf-obj", > + str(openat_bpf_obj(kernel.KernelSpec(kernel_args[1]))), > + ] > + code, output, _ = run_main_with_args_and_capture_output(args, capsys) > + assert code == 0 > + assert "BPF programs successfully loaded" == output > + > + def test_main_with_invalid_bpf( > + self, kernel_args, invalid_memory_access_bpf_source, capsys > + ): > + args = [ > + "main.py", > + "vmtest", > + *kernel_args, > + "--rootfs", > + "/", > + "--bpf-src", > + str(invalid_memory_access_bpf_source), > + ] > + code, output, _ = run_main_with_args_and_capture_output(args, capsys) > + output_lines = output.splitlines() > + assert code == 1 > + assert "BPF program failed to load" == output_lines[0] > + assert "Verifier logs:" == output_lines[1] > + > + def test_main_with_invalid_bpf_obj( > + self, kernel_args, invalid_memory_access_bpf_obj, capsys > + ): > + args = [ > + "main.py", > + "vmtest", > + *kernel_args, > + "--rootfs", > + "/", > + "--bpf-obj", > + > str(invalid_memory_access_bpf_obj(kernel.KernelSpec(kernel_args[1]))), > + ] > + code, output, _ = run_main_with_args_and_capture_output(args, capsys) > + output_lines = output.splitlines() > + assert code == 1 > + assert "BPF program failed to load" == output_lines[0] > + assert 
"Verifier logs:" == output_lines[1] > + > + def test_main_with_valid_command(self, kernel_args, capsys): > + args = ["main.py", "vmtest", *kernel_args, "--rootfs", "/", "-c", > "uname -r"] > + code, output, _ = run_main_with_args_and_capture_output(args, capsys) > + assert code == 0 > + assert f"{kernel_args[1]}.0" in output > + > + def test_main_with_invalid_command(self, kernel_args, capsys): > + args = [ > + "main.py", > + "vmtest", > + *kernel_args, > + "--rootfs", > + "/", > + "-c", > + "NotImplemented", > + ] > + code, output, error = run_main_with_args_and_capture_output(args, > capsys) > + assert code != 0 > + assert f"Command failed with exit code: {code}" in output > + > + def test_bpf_compile_subcommand( > + self, kernel_args, openat_bpf_source, tmp_path, capsys > + ): > + args = [ > + "main.py", > + "bpf", > + "compile", > + *kernel_args, > + "-o", > + "", > + str(openat_bpf_source), > + ] > + code, _, _ = run_main_with_args_and_capture_output(args, capsys) > + assert code == 0 > diff --git a/contrib/bpf-vmtest-tool/utils.py > b/contrib/bpf-vmtest-tool/utils.py > new file mode 100644 > index 00000000000..682840556f1 > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/utils.py > @@ -0,0 +1,31 @@ > +import subprocess > +import logging > +from typing import Any > + > +logger = logging.getLogger(__name__) > + > + > +def run_command(cmd: list[str], stream_output: bool = False, **kwargs: Any): > + cleaned_cmd = [str(item) for item in cmd if str(item).strip()] > + capture_cmd_output = not stream_output > + try: > + logger.debug(f"running command: {cleaned_cmd}") > + result = subprocess.run( > + cleaned_cmd, > + text=True, > + check=True, > + capture_output=capture_cmd_output, > + shell=False, > + **kwargs, > + ) > + if capture_cmd_output: > + logger.debug("Command stdout:\n" + result.stdout.strip()) > + if result.stderr: > + logger.debug("Command stderr:\n" + result.stderr.strip()) > + return result > + except subprocess.CalledProcessError as e: > + 
logger.error(e) > + if capture_cmd_output: > + logger.error("Command failed with stdout:\n" + e.stdout.strip()) > + logger.error("Command failed with stderr:\n" + e.stderr.strip()) > + raise > diff --git a/contrib/bpf-vmtest-tool/vm.py b/contrib/bpf-vmtest-tool/vm.py > new file mode 100644 > index 00000000000..ba3bdecf94b > --- /dev/null > +++ b/contrib/bpf-vmtest-tool/vm.py > @@ -0,0 +1,169 @@ > +import logging > +import subprocess > +from typing import List > + > +from kernel import KernelImage > + > +logger = logging.getLogger(__name__) > + > + > +class VMConfig: > + """Configuration container for VM settings""" > + > + def __init__( > + self, kernel_image: KernelImage, rootfs_path: str, command: str, > **kwargs > + ): > + self.kernel = kernel_image > + self.kernel_path = str(kernel_image.path) > + self.rootfs_path = rootfs_path > + self.command = command > + self.memory_mb = kwargs.get("memory_mb", 512) > + self.cpu_count = kwargs.get("cpu_count", 1) > + self.extra_args = kwargs.get("extra_args", {}) > + > + > +def bpf_verifier_logs(output: str) -> str: > + start_tag = "--- Verifier log start ---" > + end_tag = "--- Verifier log end ---" > + > + start_idx = output.find(start_tag) > + end_idx = output.find(end_tag) > + > + if start_idx != -1 and end_idx != -1: > + # Extract between the tags (excluding the markers themselves) > + log_body = output[start_idx + len(start_tag) : end_idx].strip() > + return log_body > + else: > + return "No verifier log found in the output." 
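
For reference, here is a small standalone sketch of how the tag-based extraction in bpf_verifier_logs behaves. The function body mirrors the quoted helper; the sample VM output is made up for illustration and is not real vmtest or verifier output.

```python
def bpf_verifier_logs(output: str) -> str:
    """Return the text between the verifier-log tags, as in the quoted helper."""
    start_tag = "--- Verifier log start ---"
    end_tag = "--- Verifier log end ---"

    start_idx = output.find(start_tag)
    end_idx = output.find(end_tag)

    if start_idx != -1 and end_idx != -1:
        # Slice between the tags, excluding the tags themselves.
        return output[start_idx + len(start_tag):end_idx].strip()
    return "No verifier log found in the output."


# Hypothetical VM output, invented for this example only.
sample = "\n".join(
    [
        "BPF program failed to load",
        "--- Verifier log start ---",
        "0: R1=ctx() R10=fp0",
        "invalid read from stack R10 off=4 size=4",
        "--- Verifier log end ---",
    ]
)

extracted = bpf_verifier_logs(sample)
missing = bpf_verifier_logs("output without any tags")

print(extracted)
print(missing)
```

Note that the helper only checks that both tags are present. It does not check that the end tag comes after the start tag, so swapped or repeated tags would need separate handling if they can occur.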
> + > + > +class Vmtest: > + """vmtest backend implementation""" > + > + def __init__(self): > + pass > + > + def _boot_command(self, vm_config: VMConfig): > + vmtest_command = ["vmtest"] > + vmtest_command.extend(["-k", vm_config.kernel_path]) > + vmtest_command.extend(["-r", vm_config.rootfs_path]) > + vmtest_command.append(vm_config.command) > + return vmtest_command > + > + def _remove_boot_log(self, full_output: str) -> str: > + """ > + Filters QEMU and kernel boot logs, returning only the output after > the > + `===> Running command` marker. > + """ > + marker = "===> Running command" > + lines = full_output.splitlines() > + > + try: > + start_index = next(i for i, line in enumerate(lines) if marker > in line) > + # Return everything after that marker (excluding the marker > itself) > + return "\n".join(lines[start_index + 1 :]).strip() > + except StopIteration: > + return full_output.strip() > + > + def run_command(self, vm_config): > + vm = None > + try: > + logger.info(f"Booting VM with kernel: {vm_config.kernel_path}") > + logger.info(f"Using rootfs: {vm_config.rootfs_path}") > + vm = subprocess.run( > + self._boot_command(vm_config), > + check=True, > + text=True, > + capture_output=True, > + shell=False, > + ) > + vm_stdout = vm.stdout > + logger.debug(vm_stdout) > + return VMCommandResult( > + vm.returncode, self._remove_boot_log(vm_stdout), None > + ) > + except FileNotFoundError: > + raise BootFailedError( > + "vmtest command not found in PATH. Please ensure vmtest is > installed and available in your system PATH." 
> + ) > + except subprocess.CalledProcessError as e: > + out = e.stdout > + err = e.stderr > + # when the command in the vm fails we consider it as a > successful boot > + if "===> Running command" not in out: > + raise BootFailedError("Boot failed", out, err, e.returncode) > + logger.debug("STDOUT: \n%s", out) > + logger.debug("STDERR: \n%s", err) > + return VMCommandResult(e.returncode, self._remove_boot_log(out), > err) > + > + > +class VMCommandResult: > + def __init__(self, returncode, stdout, stderr) -> None: > + self.returncode = returncode > + self.stdout = stdout > + self.stderr = stderr > + > + > +class VirtualMachine: > + """Main VM class - simple interface for end users""" > + > + # Registry of available hypervisors > + _hypervisors = { > + "vmtest": Vmtest, > + } > + > + def __init__( > + self, > + kernel_image: KernelImage, > + rootfs_path: str, > + command: str, > + hypervisor_type: str = "vmtest", > + **kwargs, > + ): > + self.config = VMConfig(kernel_image, rootfs_path, command, **kwargs) > + > + if hypervisor_type not in self._hypervisors: > + raise ValueError(f"Unsupported hypervisor: {hypervisor_type}") > + > + self.hypervisor = self._hypervisors[hypervisor_type]() > + > + @classmethod > + def list_hypervisors(cls) -> List[str]: > + """List available hypervisors""" > + return list(cls._hypervisors.keys()) > + > + def execute(self): > + """Execute command in VM""" > + return self.hypervisor.run_command(self.config) > + > + > +class BootFailedError(Exception): > + """Raised when VM fails to boot properly (before command execution).""" > + > + def __init__( > + self, message: str, stdout: str = "", stderr: str = "", returncode: > int = -1 > + ): > + super().__init__(message) > + self.stdout = stdout > + self.stderr = stderr > + self.returncode = returncode > + > + def __str__(self): > + base = super().__str__() > + > + output_parts = [ > + base, > + f"Return code: {self.returncode}", > + ] > + > + optional_sections = [ > + ("STDOUT", self.stdout), 
> + ("STDERR", self.stderr), > + ] > + > + for header, content in optional_sections: > + if content: > + output_parts.append(f"--- {header} ---") > + output_parts.append(content) > + > + return "\n".join(output_parts)
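
To make the boot/command distinction in vm.py easier to follow, here is a minimal sketch of the marker logic, written as free functions rather than Vmtest methods. It assumes, as the patch does, that the output contains a `===> Running command` line once boot has finished; the helper name `booted` and the sample strings are hypothetical and only serve the example.

```python
MARKER = "===> Running command"


def remove_boot_log(full_output: str) -> str:
    """Drop everything up to and including the first line containing MARKER."""
    lines = full_output.splitlines()
    try:
        start_index = next(i for i, line in enumerate(lines) if MARKER in line)
        return "\n".join(lines[start_index + 1:]).strip()
    except StopIteration:
        # No marker found: return the whole output, trimmed.
        return full_output.strip()


def booted(output: str) -> bool:
    """Hypothetical helper: a boot counts as successful once the marker appears."""
    return MARKER in output


# Invented sample output for illustration, not captured from a real run.
good_run = "\n".join(
    [
        "[    0.000000] Linux version 6.15.0",
        "===> Running command",
        "6.15.0",
    ]
)
failed_boot = "qemu: could not load kernel"

print(remove_boot_log(good_run))
print(booted(good_run), booted(failed_boot))
```

With this split, a non-zero exit status whose output still contains the marker is reported as a failed command, while one without the marker is reported as a failed boot, which matches the CalledProcessError branch in the patch.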
