commit:     e5f5f3d588c93317bf86953eb249fd04a71f6003
Author:     Brian Harring <ferringb <AT> gmail <DOT> com>
AuthorDate: Mon Nov 24 14:31:38 2025 +0000
Commit:     Brian Harring <ferringb <AT> gmail <DOT> com>
CommitDate: Mon Nov 24 15:49:35 2025 +0000
URL:        
https://gitweb.gentoo.org/proj/pkgcore/pkgcheck.git/commit/?id=e5f5f3d5

WIP

Signed-off-by: Brian Harring <ferringb <AT> gmail.com>

 tests/scripts/test_pkgcheck_scan.py | 63 +++++++++++++++++++++++++++----------
 1 file changed, 46 insertions(+), 17 deletions(-)

diff --git a/tests/scripts/test_pkgcheck_scan.py 
b/tests/scripts/test_pkgcheck_scan.py
index ca8f37e5..152e1e6a 100644
--- a/tests/scripts/test_pkgcheck_scan.py
+++ b/tests/scripts/test_pkgcheck_scan.py
@@ -1,9 +1,12 @@
+import importlib
+import io
 import os
 import shlex
 import shutil
 import subprocess
 import tempfile
 import textwrap
+import typing
 from collections import defaultdict
 from functools import partial
 from io import StringIO
@@ -11,11 +14,6 @@ from operator import attrgetter
 from unittest.mock import patch
 
 import pytest
-from pkgcheck import __title__ as project
-from pkgcheck import base
-from pkgcheck import checks as checks_mod
-from pkgcheck import const, objects, reporters, scan
-from pkgcheck.scripts import run
 from pkgcore import const as pkgcore_const
 from pkgcore.ebuild import atom, restricts
 from pkgcore.restrictions import packages
@@ -24,6 +22,12 @@ from snakeoil.fileutils import touch
 from snakeoil.formatters import PlainTextFormatter
 from snakeoil.osutils import pjoin
 
+from pkgcheck import __title__ as project
+from pkgcheck import base, const, objects, reporters, scan
+from pkgcheck import checks as checks_mod
+from pkgcheck.results import Result
+from pkgcheck.scripts import run
+
 from ..misc import Profile
 
 
@@ -587,23 +591,45 @@ class TestPkgcheckScan:
         assert len(results) == len(results_set)
         return results_set
 
-    def _get_results(self, path: str):
+    def _load_expected_results(
+        self, path: str
+    ) -> tuple[typing.Callable[[typing.Set[Result]], bool] | None, 
set[Result]]:
         """Return the set of result objects from a given json stream file."""
+        explicit = []
+        custom = None
+        expected_path = self.repos_data / path / "expected.json"
+        custom_path = self.repos_data / path / "custom.py"
         try:
-            with (self.repos_data / path).open() as f:
-                return set(reporters.JsonStream.from_iter(f))
+            with expected_path.open() as f:
+                explicit = list(reporters.JsonStream.from_iter(f))
         except FileNotFoundError:
-            return set()
+            pass
+        except Exception as e:
+            raise Exception(f"failed loading {expected_path}") from e
+        try:
+            with custom_path.open() as f:
+                # we have to use eval since the import pathway isn't valid 
import lookup
+                module = eval(f.read())
+                custom = getattr(module, "handle")
+        except FileNotFoundError:
+            pass
+        except Exception as e:
+            raise Exception(f"failed loading {custom_path}") from e
+        s_explicit = set(explicit)
+        assert len(s_explicit) == len(explicit), f"results of {expected_path} 
are not unique"
+        assert custom is not None or len(explicit), (
+            f"{(self.repos_data / path)!r} contains no expected.json nor 
custom.py"
+        )
+        return custom, set(explicit)
 
-    def _render_results(self, results, **kwargs):
+    def _render_results(self, results, **kwargs) -> str:
         """Render a given set of result objects into their related string 
form."""
-        with tempfile.TemporaryFile() as f:
+        # with tempfile.TemporaryFile() as f:
+        with io.BytesIO() as f:
             with reporters.FancyReporter(out=PlainTextFormatter(f), **kwargs) 
as reporter:
                 for result in sorted(results):
                     reporter.report(result)
-            f.seek(0)
-            output = f.read().decode()
-            return output
+            return f.getvalue().decode()
 
     @pytest.mark.parametrize("repo", repos)
     def test_scan_repo(self, repo, tmp_path_factory):
@@ -615,13 +641,16 @@ class TestPkgcheckScan:
         for check, keywords in self._checks[repo].items():
             for keyword in keywords:
                 # verify the expected results were seen during the repo scans
-                expected_results = 
self._get_results(f"{repo}/{check}/{keyword}/expected.json")
-                assert expected_results, "regular results must always exist"
+                custom_check, expected_results = self._load_expected_results(
+                    f"{repo}/{check}/{keyword}"
+                )
+                if custom_check:
+                    custom_check(scan_results)
                 assert self._render_results(expected_results), "failed 
rendering results"
                 results.update(expected_results)
 
                 # when expected verbose results exist use them, otherwise 
fallback to using the regular ones
-                expected_verbose_results = self._get_results(
+                expected_verbose_results = self._load_expected_results(
                     f"{repo}/{check}/{keyword}/expected-verbose.json"
                 )
                 if expected_verbose_results:

Reply via email to