This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new f0b1c66 Add a route to show an SBOM validation report
f0b1c66 is described below
commit f0b1c66c90b67a1057efae8ac2b4d8989e505aac
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Aug 25 19:59:24 2025 +0100
Add a route to show an SBOM validation report
---
atr/models/results.py | 47 ++++++++++++++++++++-
atr/models/sql.py | 1 +
atr/routes/modules.py | 1 +
atr/routes/sbom.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++++++
atr/tasks/__init__.py | 20 +++++++++
atr/tasks/sbom.py | 50 ++++++++++++++++++++++
6 files changed, 230 insertions(+), 1 deletion(-)
diff --git a/atr/models/results.py b/atr/models/results.py
index 0f9bee0..83e19fe 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -46,6 +46,51 @@ class SBOMGenerateCycloneDX(schema.Strict):
msg: str = schema.description("The message from the SBOM generation")
+class SbomQsScore(schema.Strict):
+ category: str
+ feature: str
+ score: float | int
+ max_score: float | int
+ description: str
+ ignored: bool
+
+
+class SbomQsFile(schema.Strict):
+ file_name: str
+ spec: str
+ spec_version: str
+ file_format: str
+ avg_score: float | int
+ num_components: int
+ creation_time: str
+ gen_tool_name: str
+ gen_tool_version: str
+ scores: list[SbomQsScore]
+
+
+class SbomQsCreationInfo(schema.Strict):
+ name: str
+ version: str
+ scoring_engine_version: str
+ vendor: str
+
+
+class SbomQsReport(schema.Strict):
+ run_id: str
+ timestamp: str
+ creation_info: SbomQsCreationInfo
+ files: list[SbomQsFile]
+
+
+class SBOMQsScoreResult(schema.Strict):
+ kind: Literal["sbom_qs_score"] = schema.Field(alias="kind")
+ project_name: str = schema.description("Project name")
+ version_name: str = schema.description("Version name")
+ revision_number: str = schema.description("Revision number")
+ file_path: str = schema.description("Relative path to the scored SBOM
file")
+ report: SbomQsReport
+
+
class SvnImportFiles(schema.Strict):
"""Result of the task to import files from SVN."""
@@ -66,7 +111,7 @@ class VoteInitiate(schema.Strict):
Results = Annotated[
- HashingCheck | MessageSend | SBOMGenerateCycloneDX | SvnImportFiles |
VoteInitiate,
+ HashingCheck | MessageSend | SBOMGenerateCycloneDX | SBOMQsScoreResult |
SvnImportFiles | VoteInitiate,
schema.Field(discriminator="kind"),
]
diff --git a/atr/models/sql.py b/atr/models/sql.py
index e38774b..96f31cc 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -182,6 +182,7 @@ class TaskType(str, enum.Enum):
PATHS_CHECK = "paths_check"
RAT_CHECK = "rat_check"
SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
+ SBOM_QS_SCORE = "sbom_qs_score"
SIGNATURE_CHECK = "signature_check"
SVN_IMPORT_FILES = "svn_import_files"
TARGZ_INTEGRITY = "targz_integrity"
diff --git a/atr/routes/modules.py b/atr/routes/modules.py
index c0fc7bf..a63a4f6 100644
--- a/atr/routes/modules.py
+++ b/atr/routes/modules.py
@@ -35,6 +35,7 @@ import atr.routes.report as report
import atr.routes.resolve as resolve
import atr.routes.revisions as revisions
import atr.routes.root as root
+import atr.routes.sbom as sbom
import atr.routes.start as start
import atr.routes.tokens as tokens
import atr.routes.upload as upload
diff --git a/atr/routes/sbom.py b/atr/routes/sbom.py
new file mode 100644
index 0000000..53bb499
--- /dev/null
+++ b/atr/routes/sbom.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import asfquart.base as base
+import htpy
+
+import atr.db as db
+import atr.htm as htm
+import atr.models.results as results
+import atr.models.sql as sql
+import atr.routes as routes
+import atr.template as template
+
+
[email protected]("/sbom/report/<project>/<version>/<path:file_path>")
+async def report(session: routes.CommitterSession, project: str, version: str,
file_path: str) -> str:
+ await session.check_access(project)
+ await session.release(project, version)
+ async with db.session() as data:
+ via = sql.validate_instrumented_attribute
+ tasks = (
+ await data.task(
+ project_name=project,
+ version_name=version,
+ task_type=sql.TaskType.SBOM_QS_SCORE,
+ status=sql.TaskStatus.COMPLETED,
+ primary_rel_path=file_path,
+ )
+ .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+ .all()
+ )
+ if not tasks:
+ raise base.ASFQuartException("SBOM score not found", errorcode=404)
+ task_result = tasks[0].result
+ if not isinstance(task_result, results.SBOMQsScoreResult):
+ raise base.ASFQuartException("Invalid SBOM score result",
errorcode=500)
+ report_obj = task_result.report
+
+ block = htm.Block()
+ block.h1["SBOM report"]
+
+ summary_tbody = htpy.tbody[
+ _tr("Run ID", report_obj.run_id),
+ _tr("Timestamp", report_obj.timestamp),
+ _tr("Tool", report_obj.creation_info.name),
+ _tr("Tool version", report_obj.creation_info.version),
+ _tr("Engine version", report_obj.creation_info.scoring_engine_version),
+ _tr("Vendor", report_obj.creation_info.vendor),
+ ]
+ block.h2["Summary"]
+ block.table(".table.table-striped.table-bordered")[summary_tbody]
+
+ block.h2["Files"]
+ for fr in report_obj.files:
+ block.h3[fr.file_name]
+ file_tbody = htpy.tbody[
+ _tr("Spec", fr.spec),
+ _tr("Spec version", fr.spec_version),
+ _tr("Format", fr.file_format),
+ _tr("Avg score", str(fr.avg_score)),
+ _tr("Components", str(fr.num_components)),
+ _tr("Creation time", fr.creation_time),
+ _tr("Generator", fr.gen_tool_name),
+ _tr("Generator version", fr.gen_tool_version),
+ ]
+ block.table(".table.table-striped.table-bordered")[file_tbody]
+
+ header = htpy.thead[
+ htpy.tr[
+ htpy.th["Category"],
+ htpy.th["Feature"],
+ htpy.th["Score"],
+ htpy.th["Max"],
+ htpy.th["Ignored"],
+ ]
+ ]
+ rows = [
+ htpy.tr[
+ htpy.td[s.category],
+ htpy.td[s.feature],
+ htpy.td[str(s.score)],
+ htpy.td[str(s.max_score)],
+ htpy.td["Yes" if s.ignored else "No"],
+ ]
+ for s in fr.scores
+ ]
+ table_block =
htm.Block(htpy.table(".table.table-striped.table-bordered"))
+ table_block.append(header)
+ table_block.append(htpy.tbody[*rows])
+ block.append(table_block.collect())
+
+ return await template.blank("SBOM report", content=block.collect())
+
+
+def _tr(label: str, value: str) -> htpy.Element:
+ return htpy.tr[htpy.th[label], htpy.td[value]]
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index c1b53c3..2c0f35e 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -79,6 +79,24 @@ async def draft_checks(
for task in await task_function(asf_uid, release,
revision_number, path_str):
task.revision_number = revision_number
data.add(task)
+ # TODO: Should we check .json files for their content?
+ # Ideally we would not have to do that
+ if path.name.endswith(".cdx.json"):
+ data.add(
+ queued(
+ asf_uid,
+ sql.TaskType.SBOM_QS_SCORE,
+ release,
+ revision_number,
+ path_str,
+ extra_args={
+ "project_name": project_name,
+ "version_name": release_version,
+ "revision_number": revision_number,
+ "file_path": path_str,
+ },
+ )
+ )
is_podling = False
if release.project.committee is not None:
@@ -154,6 +172,8 @@ def resolve(task_type: sql.TaskType) -> Callable[...,
Awaitable[results.Results
return rat.check
case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
return sbom.generate_cyclonedx
+ case sql.TaskType.SBOM_QS_SCORE:
+ return sbom.score_qs
case sql.TaskType.SIGNATURE_CHECK:
return signature.check
case sql.TaskType.SVN_IMPORT_FILES:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index b5e0c1e..a114b93 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -49,6 +49,21 @@ class SBOMGenerationError(Exception):
self.details = details or {}
+class SBOMScoringError(Exception):
+ """Raised on a failure to score an SBOM."""
+
+ def __init__(self, msg: str, context: dict[str, Any] | None = None) ->
None:
+ super().__init__(msg)
+ self.context = context if context is not None else {}
+
+
+class ScoreArgs(schema.Strict):
+ project_name: str = schema.description("Project name")
+ version_name: str = schema.description("Version name")
+ revision_number: str = schema.description("Revision number")
+ file_path: str = schema.description("Relative path to the SBOM file to
score")
+
+
@checks.with_model(GenerateCycloneDX)
async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results |
None:
"""Generate a CycloneDX SBOM for the given artifact and write it to the
output path."""
@@ -67,6 +82,41 @@ async def generate_cyclonedx(args: GenerateCycloneDX) ->
results.Results | None:
raise
[email protected]_model(ScoreArgs)
+async def score_qs(args: ScoreArgs) -> results.Results | None:
+ base_dir = util.get_unfinished_dir() / args.project_name /
args.version_name / args.revision_number
+ if not os.path.isdir(base_dir):
+ raise SBOMScoringError("Revision directory does not exist",
{"base_dir": str(base_dir)})
+ full_path = os.path.join(base_dir, args.file_path)
+ if not (full_path.endswith(".cdx.json") and os.path.isfile(full_path)):
+ raise SBOMScoringError("SBOM file does not exist", {"file_path":
args.file_path})
+ proc = await asyncio.create_subprocess_exec(
+ "sbomqs",
+ "score",
+ os.path.basename(full_path),
+ "--json",
+ cwd=os.path.dirname(full_path),
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ # TODO: Timeout should probably be a lot shorter
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=300)
+ if proc.returncode != 0:
+ raise SBOMScoringError(
+ "sbomqs command failed",
+ {"returncode": proc.returncode, "stderr": stderr.decode("utf-8",
"ignore")},
+ )
+ report_obj =
results.SbomQsReport.model_validate(json.loads(stdout.decode("utf-8")))
+ return results.SBOMQsScoreResult(
+ kind="sbom_qs_score",
+ project_name=args.project_name,
+ version_name=args.version_name,
+ revision_number=args.revision_number,
+ file_path=args.file_path,
+ report=report_obj,
+ )
+
+
async def _generate_cyclonedx_core(artifact_path: str, output_path: str) ->
dict[str, Any]:
"""Core logic to generate CycloneDX SBOM on failure."""
log.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]