This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new f0b1c66  Add a route to show an SBOM validation report
f0b1c66 is described below

commit f0b1c66c90b67a1057efae8ac2b4d8989e505aac
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Aug 25 19:59:24 2025 +0100

    Add a route to show an SBOM validation report
---
 atr/models/results.py |  47 ++++++++++++++++++++-
 atr/models/sql.py     |   1 +
 atr/routes/modules.py |   1 +
 atr/routes/sbom.py    | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++
 atr/tasks/__init__.py |  20 +++++++++
 atr/tasks/sbom.py     |  50 ++++++++++++++++++++++
 6 files changed, 230 insertions(+), 1 deletion(-)

diff --git a/atr/models/results.py b/atr/models/results.py
index 0f9bee0..83e19fe 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -46,6 +46,51 @@ class SBOMGenerateCycloneDX(schema.Strict):
     msg: str = schema.description("The message from the SBOM generation")
 
 
+class SbomQsScore(schema.Strict):
+    category: str
+    feature: str
+    score: float | int
+    max_score: float | int
+    description: str
+    ignored: bool
+
+
+class SbomQsFile(schema.Strict):
+    file_name: str
+    spec: str
+    spec_version: str
+    file_format: str
+    avg_score: float | int
+    num_components: int
+    creation_time: str
+    gen_tool_name: str
+    gen_tool_version: str
+    scores: list[SbomQsScore]
+
+
+class SbomQsCreationInfo(schema.Strict):
+    name: str
+    version: str
+    scoring_engine_version: str
+    vendor: str
+
+
+class SbomQsReport(schema.Strict):
+    run_id: str
+    timestamp: str
+    creation_info: SbomQsCreationInfo
+    files: list[SbomQsFile]
+
+
+class SBOMQsScoreResult(schema.Strict):
+    kind: Literal["sbom_qs_score"] = schema.Field(alias="kind")
+    project_name: str = schema.description("Project name")
+    version_name: str = schema.description("Version name")
+    revision_number: str = schema.description("Revision number")
+    file_path: str = schema.description("Relative path to the scored SBOM 
file")
+    report: SbomQsReport
+
+
 class SvnImportFiles(schema.Strict):
     """Result of the task to import files from SVN."""
 
@@ -66,7 +111,7 @@ class VoteInitiate(schema.Strict):
 
 
 Results = Annotated[
-    HashingCheck | MessageSend | SBOMGenerateCycloneDX | SvnImportFiles | 
VoteInitiate,
+    HashingCheck | MessageSend | SBOMGenerateCycloneDX | SBOMQsScoreResult | 
SvnImportFiles | VoteInitiate,
     schema.Field(discriminator="kind"),
 ]
 
diff --git a/atr/models/sql.py b/atr/models/sql.py
index e38774b..96f31cc 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -182,6 +182,7 @@ class TaskType(str, enum.Enum):
     PATHS_CHECK = "paths_check"
     RAT_CHECK = "rat_check"
     SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+    SBOM_QS_SCORE = "sbom_qs_score"
     SIGNATURE_CHECK = "signature_check"
     SVN_IMPORT_FILES = "svn_import_files"
     TARGZ_INTEGRITY = "targz_integrity"
diff --git a/atr/routes/modules.py b/atr/routes/modules.py
index c0fc7bf..a63a4f6 100644
--- a/atr/routes/modules.py
+++ b/atr/routes/modules.py
@@ -35,6 +35,7 @@ import atr.routes.report as report
 import atr.routes.resolve as resolve
 import atr.routes.revisions as revisions
 import atr.routes.root as root
+import atr.routes.sbom as sbom
 import atr.routes.start as start
 import atr.routes.tokens as tokens
 import atr.routes.upload as upload
diff --git a/atr/routes/sbom.py b/atr/routes/sbom.py
new file mode 100644
index 0000000..53bb499
--- /dev/null
+++ b/atr/routes/sbom.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asfquart.base as base
+import htpy
+
+import atr.db as db
+import atr.htm as htm
+import atr.models.results as results
+import atr.models.sql as sql
+import atr.routes as routes
+import atr.template as template
+
+
[email protected]("/sbom/report/<project>/<version>/<path:file_path>")
+async def report(session: routes.CommitterSession, project: str, version: str, 
file_path: str) -> str:
+    await session.check_access(project)
+    await session.release(project, version)
+    async with db.session() as data:
+        via = sql.validate_instrumented_attribute
+        tasks = (
+            await data.task(
+                project_name=project,
+                version_name=version,
+                task_type=sql.TaskType.SBOM_QS_SCORE,
+                status=sql.TaskStatus.COMPLETED,
+                primary_rel_path=file_path,
+            )
+            .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+            .all()
+        )
+    if not tasks:
+        raise base.ASFQuartException("SBOM score not found", errorcode=404)
+    task_result = tasks[0].result
+    if not isinstance(task_result, results.SBOMQsScoreResult):
+        raise base.ASFQuartException("Invalid SBOM score result", 
errorcode=500)
+    report_obj = task_result.report
+
+    block = htm.Block()
+    block.h1["SBOM report"]
+
+    summary_tbody = htpy.tbody[
+        _tr("Run ID", report_obj.run_id),
+        _tr("Timestamp", report_obj.timestamp),
+        _tr("Tool", report_obj.creation_info.name),
+        _tr("Tool version", report_obj.creation_info.version),
+        _tr("Engine version", report_obj.creation_info.scoring_engine_version),
+        _tr("Vendor", report_obj.creation_info.vendor),
+    ]
+    block.h2["Summary"]
+    block.table(".table.table-striped.table-bordered")[summary_tbody]
+
+    block.h2["Files"]
+    for fr in report_obj.files:
+        block.h3[fr.file_name]
+        file_tbody = htpy.tbody[
+            _tr("Spec", fr.spec),
+            _tr("Spec version", fr.spec_version),
+            _tr("Format", fr.file_format),
+            _tr("Avg score", str(fr.avg_score)),
+            _tr("Components", str(fr.num_components)),
+            _tr("Creation time", fr.creation_time),
+            _tr("Generator", fr.gen_tool_name),
+            _tr("Generator version", fr.gen_tool_version),
+        ]
+        block.table(".table.table-striped.table-bordered")[file_tbody]
+
+        header = htpy.thead[
+            htpy.tr[
+                htpy.th["Category"],
+                htpy.th["Feature"],
+                htpy.th["Score"],
+                htpy.th["Max"],
+                htpy.th["Ignored"],
+            ]
+        ]
+        rows = [
+            htpy.tr[
+                htpy.td[s.category],
+                htpy.td[s.feature],
+                htpy.td[str(s.score)],
+                htpy.td[str(s.max_score)],
+                htpy.td["Yes" if s.ignored else "No"],
+            ]
+            for s in fr.scores
+        ]
+        table_block = 
htm.Block(htpy.table(".table.table-striped.table-bordered"))
+        table_block.append(header)
+        table_block.append(htpy.tbody[*rows])
+        block.append(table_block.collect())
+
+    return await template.blank("SBOM report", content=block.collect())
+
+
+def _tr(label: str, value: str) -> htpy.Element:
+    return htpy.tr[htpy.th[label], htpy.td[value]]
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index c1b53c3..2c0f35e 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -79,6 +79,24 @@ async def draft_checks(
                 for task in await task_function(asf_uid, release, 
revision_number, path_str):
                     task.revision_number = revision_number
                     data.add(task)
+            # TODO: Should we check .json files for their content?
+            # Ideally we would not have to do that
+            if path.name.endswith(".cdx.json"):
+                data.add(
+                    queued(
+                        asf_uid,
+                        sql.TaskType.SBOM_QS_SCORE,
+                        release,
+                        revision_number,
+                        path_str,
+                        extra_args={
+                            "project_name": project_name,
+                            "version_name": release_version,
+                            "revision_number": revision_number,
+                            "file_path": path_str,
+                        },
+                    )
+                )
 
         is_podling = False
         if release.project.committee is not None:
@@ -154,6 +172,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., 
Awaitable[results.Results
             return rat.check
         case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
             return sbom.generate_cyclonedx
+        case sql.TaskType.SBOM_QS_SCORE:
+            return sbom.score_qs
         case sql.TaskType.SIGNATURE_CHECK:
             return signature.check
         case sql.TaskType.SVN_IMPORT_FILES:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index b5e0c1e..a114b93 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -49,6 +49,21 @@ class SBOMGenerationError(Exception):
         self.details = details or {}
 
 
+class SBOMScoringError(Exception):
+    """Raised on a failure to score an SBOM."""
+
+    def __init__(self, msg: str, context: dict[str, Any] | None = None) -> 
None:
+        super().__init__(msg)
+        self.context = context if context is not None else {}
+
+
+class ScoreArgs(schema.Strict):
+    project_name: str = schema.description("Project name")
+    version_name: str = schema.description("Version name")
+    revision_number: str = schema.description("Revision number")
+    file_path: str = schema.description("Relative path to the SBOM file to 
score")
+
+
 @checks.with_model(GenerateCycloneDX)
 async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results | 
None:
     """Generate a CycloneDX SBOM for the given artifact and write it to the 
output path."""
@@ -67,6 +82,41 @@ async def generate_cyclonedx(args: GenerateCycloneDX) -> 
results.Results | None:
         raise
 
 
[email protected]_model(ScoreArgs)
+async def score_qs(args: ScoreArgs) -> results.Results | None:
+    base_dir = util.get_unfinished_dir() / args.project_name / 
args.version_name / args.revision_number
+    if not os.path.isdir(base_dir):
+        raise SBOMScoringError("Revision directory does not exist", 
{"base_dir": str(base_dir)})
+    full_path = os.path.join(base_dir, args.file_path)
+    if not (full_path.endswith(".cdx.json") and os.path.isfile(full_path)):
+        raise SBOMScoringError("SBOM file does not exist", {"file_path": 
args.file_path})
+    proc = await asyncio.create_subprocess_exec(
+        "sbomqs",
+        "score",
+        os.path.basename(full_path),
+        "--json",
+        cwd=os.path.dirname(full_path),
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.PIPE,
+    )
+    # TODO: Timeout should probably be a lot shorter
+    stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
+    if proc.returncode != 0:
+        raise SBOMScoringError(
+            "sbomqs command failed",
+            {"returncode": proc.returncode, "stderr": stderr.decode("utf-8", 
"ignore")},
+        )
+    report_obj = 
results.SbomQsReport.model_validate(json.loads(stdout.decode("utf-8")))
+    return results.SBOMQsScoreResult(
+        kind="sbom_qs_score",
+        project_name=args.project_name,
+        version_name=args.version_name,
+        revision_number=args.revision_number,
+        file_path=args.file_path,
+        report=report_obj,
+    )
+
+
 async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> 
dict[str, Any]:
     """Core logic to generate CycloneDX SBOM on failure."""
     log.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to