This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new dade709 Make the forms to analyse SBOMs more type safe
dade709 is described below
commit dade7092aa47d81b409f6889ef0952bc7e605e69
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Nov 10 20:24:27 2025 +0000
Make the forms to analyse SBOMs more type safe
---
atr/form.py | 8 +++++---
atr/get/sbom.py | 53 ++++++++++++++++----------------------------------
atr/get/tokens.py | 2 --
atr/post/sbom.py | 46 ++++++++++++++++++++++++++++++-------------
atr/shared/__init__.py | 2 ++
atr/shared/sbom.py | 37 +++++++++++++++++++++++++++++++++++
6 files changed, 94 insertions(+), 54 deletions(-)
diff --git a/atr/form.py b/atr/form.py
index aa7c153..50683a7 100644
--- a/atr/form.py
+++ b/atr/form.py
@@ -228,7 +228,7 @@ def _get_flash_error_data() -> dict[str, Any]:
return {}
-def render(
+def render( # noqa: C901
model_cls: type[Form],
action: str | None = None,
form_classes: str = ".atr-canary",
@@ -247,8 +247,10 @@ def render(
is_empty_form = isinstance(model_cls, type) and issubclass(model_cls,
Empty)
is_empty_form |= empty
- if is_empty_form and (form_classes == ".atr-canary"):
- form_classes = ""
+ if is_empty_form:
+ if form_classes == ".atr-canary":
+ form_classes = ""
+ use_error_data = False
flash_error_data: dict[str, Any] = _get_flash_error_data() if
use_error_data else {}
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index 8e432d2..ce75a5a 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -21,18 +21,16 @@ import json
from typing import TYPE_CHECKING, Any
import asfquart.base as base
-import markupsafe
import atr.blueprints.get as get
import atr.db as db
-import atr.forms as forms
+import atr.form as form
import atr.htm as htm
import atr.models.results as results
import atr.models.sql as sql
-import atr.post as post
import atr.sbom as sbom
+import atr.shared as shared
import atr.template as template
-import atr.util as util
import atr.web as web
if TYPE_CHECKING:
@@ -90,21 +88,14 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
]
block.p["This report is for revision ",
htm.code[task_result.revision_number], "."]
- empty_form = await forms.Empty.create_form()
# TODO: Show the status if the task to augment the SBOM is still running
# TODO: Add a field to the SBOM to show that it's been augmented
# And then don't allow it to be augmented again
- action = util.as_url(
- post.sbom.augment,
- project_name=project,
- version_name=version,
- file_path=file_path,
- )
- block.append(
- htm.form("", action=action, method="post")[
- markupsafe.Markup(str(empty_form.hidden_tag())),
- htm.button(".btn.btn-primary", type="submit")["Augment SBOM"],
- ]
+ form.render_block(
+ block,
+ model_cls=shared.sbom.AugmentSBOMForm,
+ submit_label="Augment SBOM",
+ empty=True,
)
if warnings:
@@ -120,7 +111,7 @@ async def report(session: web.Committer, project: str,
version: str, file_path:
block.p["No NTIA 2021 minimum data field conformance warnings or
errors found."]
block.h2["Vulnerability scan"]
- _vulnerability_scan_section(block, project, version, file_path,
task_result.revision_number, osv_tasks, empty_form)
+ _vulnerability_scan_section(block, project, version, file_path,
task_result.revision_number, osv_tasks)
block.h2["Outdated tool"]
outdated = None
@@ -224,22 +215,14 @@ def _vulnerability_component_details(block: htm.Block,
component: results.OSVCom
block.append(htm.details(".mb-3.rounded")[*details_content])
-def _vulnerability_scan_button(
- block: htm.Block, project: str, version: str, file_path: str, empty_form:
forms.Empty
-) -> None:
+def _vulnerability_scan_button(block: htm.Block, project: str, version: str,
file_path: str) -> None:
block.p["No vulnerability scan has been performed for this revision."]
- action = util.as_url(
- post.sbom.scan,
- project_name=project,
- version_name=version,
- file_path=file_path,
- )
- block.append(
- htm.form("", action=action, method="post")[
- markupsafe.Markup(str(empty_form.hidden_tag())),
- htm.button(".btn.btn-primary", type="submit")["Scan file"],
- ]
+ form.render_block(
+ block,
+ model_cls=shared.sbom.ScanSBOMForm,
+ submit_label="Scan file",
+ empty=True,
)
@@ -299,7 +282,6 @@ def _vulnerability_scan_section(
file_path: str,
revision_number: str,
osv_tasks: collections.abc.Sequence[sql.Task],
- empty_form: forms.Empty,
) -> None:
"""Display the vulnerability scan section based on task status."""
completed_task = _vulnerability_scan_find_completed_task(osv_tasks,
revision_number)
@@ -311,9 +293,9 @@ def _vulnerability_scan_section(
in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks,
revision_number)
if in_progress_task is not None:
- _vulnerability_scan_status(block, in_progress_task, project, version,
file_path, empty_form)
+ _vulnerability_scan_status(block, in_progress_task, project, version,
file_path)
else:
- _vulnerability_scan_button(block, project, version, file_path,
empty_form)
+ _vulnerability_scan_button(block, project, version, file_path)
def _vulnerability_scan_status(
@@ -322,7 +304,6 @@ def _vulnerability_scan_status(
project: str,
version: str,
file_path: str,
- empty_form: forms.Empty,
) -> None:
status_text = task.status.value.replace("_", " ").capitalize()
block.p[f"Vulnerability scan is currently {status_text.lower()}."]
@@ -333,4 +314,4 @@ def _vulnerability_scan_status(
htm.code[task.error],
". Additional details are unavailable from ATR.",
]
- _vulnerability_scan_button(block, project, version, file_path,
empty_form)
+ _vulnerability_scan_button(block, project, version, file_path)
diff --git a/atr/get/tokens.py b/atr/get/tokens.py
index 7779e7c..10f6c19 100644
--- a/atr/get/tokens.py
+++ b/atr/get/tokens.py
@@ -66,7 +66,6 @@ async def tokens(session: web.Committer) -> str:
action=util.as_url(post.tokens.jwt_post),
form_classes="#issue-jwt-form",
submit_label="Generate JWT",
- use_error_data=False,
)
jwt_section.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap
border rounded w-50")
if most_recent_pat and most_recent_pat.last_used:
@@ -105,7 +104,6 @@ def _build_tokens_table(page: htm.Block, tokens_list:
list[sql.PersonalAccessTok
submit_classes="btn-sm btn-danger",
submit_label="Delete",
defaults={"token_id": t.id},
- use_error_data=False,
empty=True,
)
tbody.tr(".align-middle")[
diff --git a/atr/post/sbom.py b/atr/post/sbom.py
index 4911009..4e1bd22 100644
--- a/atr/post/sbom.py
+++ b/atr/post/sbom.py
@@ -26,20 +26,33 @@ import atr.blueprints.post as post
import atr.db as db
import atr.get as get
import atr.log as log
+import atr.shared as shared
import atr.storage as storage
import atr.util as util
import atr.web as web
[email protected]("/sbom/augment/<project_name>/<version_name>/<path:file_path>")
-async def augment(session: web.Committer, project_name: str, version_name:
str, file_path: str) -> web.WerkzeugResponse:
- """Augment a CycloneDX SBOM file."""
- await session.check_access(project_name)
[email protected]("/sbom/report/<project>/<version>/<path:file_path>")
[email protected](shared.sbom.SBOMForm)
+async def report(
+ session: web.Committer, sbom_form: shared.sbom.SBOMForm, project: str,
version: str, file_path: str
+) -> web.WerkzeugResponse:
+ await session.check_access(project)
+
+ match sbom_form:
+ case shared.sbom.AugmentSBOMForm():
+ return await _augment(session, project, version, file_path)
+
+ case shared.sbom.ScanSBOMForm():
+ return await _scan(session, project, version, file_path)
- await util.validate_empty_form()
+
+async def _augment(
+ session: web.Committer, project_name: str, version_name: str, file_path:
str
+) -> web.WerkzeugResponse:
+ """Augment a CycloneDX SBOM file."""
rel_path = pathlib.Path(file_path)
- # Check that the file is a .cdx.json archive before creating a revision
if not (file_path.endswith(".cdx.json")):
raise base.ASFQuartException("SBOM augmentation is only supported for
.cdx.json files", errorcode=400)
@@ -53,7 +66,12 @@ async def augment(session: web.Committer, project_name: str,
version_name: str,
raise RuntimeError("No revision number found for new revision
creation")
log.info(f"Augmenting SBOM for {project_name} {version_name}
{revision_number} {rel_path}")
async with storage.write_as_project_committee_member(project_name) as
wacm:
- sbom_task = await wacm.sbom.augment_cyclonedx(project_name,
version_name, revision_number, rel_path)
+ sbom_task = await wacm.sbom.augment_cyclonedx(
+ project_name,
+ version_name,
+ revision_number,
+ rel_path,
+ )
except Exception as e:
log.exception("Error augmenting SBOM:")
@@ -74,14 +92,11 @@ async def augment(session: web.Committer, project_name:
str, version_name: str,
)
[email protected]("/sbom/scan/<project_name>/<version_name>/<path:file_path>")
-async def scan(session: web.Committer, project_name: str, version_name: str,
file_path: str) -> web.WerkzeugResponse:
+async def _scan(session: web.Committer, project_name: str, version_name: str,
file_path: str) -> web.WerkzeugResponse:
"""Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
- await session.check_access(project_name)
-
- await util.validate_empty_form()
rel_path = pathlib.Path(file_path)
+ # Check that the file is a .cdx.json archive before creating a revision
if not (file_path.endswith(".cdx.json")):
raise base.ASFQuartException("OSV scanning is only supported for
.cdx.json files", errorcode=400)
@@ -95,7 +110,12 @@ async def scan(session: web.Committer, project_name: str,
version_name: str, fil
raise RuntimeError("No revision number found for OSV scan")
log.info(f"Starting OSV scan for {project_name} {version_name}
{revision_number} {rel_path}")
async with storage.write_as_project_committee_member(project_name) as
wacm:
- sbom_task = await wacm.sbom.osv_scan_cyclonedx(project_name,
version_name, revision_number, rel_path)
+ sbom_task = await wacm.sbom.osv_scan_cyclonedx(
+ project_name,
+ version_name,
+ revision_number,
+ rel_path,
+ )
except Exception as e:
log.exception("Error starting OSV scan:")
diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py
index e5d7825..1403e43 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/__init__.py
@@ -33,6 +33,7 @@ import atr.shared.ignores as ignores
import atr.shared.keys as keys
import atr.shared.projects as projects
import atr.shared.resolve as resolve
+import atr.shared.sbom as sbom
import atr.shared.start as start
import atr.shared.test as test
import atr.shared.tokens as tokens
@@ -194,6 +195,7 @@ __all__ = [
"keys",
"projects",
"resolve",
+ "sbom",
"start",
"test",
"tokens",
diff --git a/atr/shared/sbom.py b/atr/shared/sbom.py
new file mode 100644
index 0000000..e1838dd
--- /dev/null
+++ b/atr/shared/sbom.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Annotated, Literal
+
+import atr.form as form
+
+type AUGMENT = Literal["augment"]
+type SCAN = Literal["scan"]
+
+
+class AugmentSBOMForm(form.Empty):
+ variant: AUGMENT = form.value(AUGMENT)
+
+
+class ScanSBOMForm(form.Empty):
+ variant: SCAN = form.value(SCAN)
+
+
+type SBOMForm = Annotated[
+ AugmentSBOMForm | ScanSBOMForm,
+ form.DISCRIMINATOR,
+]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]