This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new dade709  Make the forms to analyse SBOMs more type safe
dade709 is described below

commit dade7092aa47d81b409f6889ef0952bc7e605e69
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Nov 10 20:24:27 2025 +0000

    Make the forms to analyse SBOMs more type safe
---
 atr/form.py            |  8 +++++---
 atr/get/sbom.py        | 53 ++++++++++++++++----------------------------------
 atr/get/tokens.py      |  2 --
 atr/post/sbom.py       | 46 ++++++++++++++++++++++++++++++-------------
 atr/shared/__init__.py |  2 ++
 atr/shared/sbom.py     | 37 +++++++++++++++++++++++++++++++++++
 6 files changed, 94 insertions(+), 54 deletions(-)

diff --git a/atr/form.py b/atr/form.py
index aa7c153..50683a7 100644
--- a/atr/form.py
+++ b/atr/form.py
@@ -228,7 +228,7 @@ def _get_flash_error_data() -> dict[str, Any]:
     return {}
 
 
-def render(
+def render(  # noqa: C901
     model_cls: type[Form],
     action: str | None = None,
     form_classes: str = ".atr-canary",
@@ -247,8 +247,10 @@ def render(
 
     is_empty_form = isinstance(model_cls, type) and issubclass(model_cls, Empty)
     is_empty_form |= empty
-    if is_empty_form and (form_classes == ".atr-canary"):
-        form_classes = ""
+    if is_empty_form:
+        if form_classes == ".atr-canary":
+            form_classes = ""
+        use_error_data = False
 
     flash_error_data: dict[str, Any] = _get_flash_error_data() if use_error_data else {}
 
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index 8e432d2..ce75a5a 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -21,18 +21,16 @@ import json
 from typing import TYPE_CHECKING, Any
 
 import asfquart.base as base
-import markupsafe
 
 import atr.blueprints.get as get
 import atr.db as db
-import atr.forms as forms
+import atr.form as form
 import atr.htm as htm
 import atr.models.results as results
 import atr.models.sql as sql
-import atr.post as post
 import atr.sbom as sbom
+import atr.shared as shared
 import atr.template as template
-import atr.util as util
 import atr.web as web
 
 if TYPE_CHECKING:
@@ -90,21 +88,14 @@ async def report(session: web.Committer, project: str, version: str, file_path:
     ]
     block.p["This report is for revision ", htm.code[task_result.revision_number], "."]
 
-    empty_form = await forms.Empty.create_form()
     # TODO: Show the status if the task to augment the SBOM is still running
     # TODO: Add a field to the SBOM to show that it's been augmented
     # And then don't allow it to be augmented again
-    action = util.as_url(
-        post.sbom.augment,
-        project_name=project,
-        version_name=version,
-        file_path=file_path,
-    )
-    block.append(
-        htm.form("", action=action, method="post")[
-            markupsafe.Markup(str(empty_form.hidden_tag())),
-            htm.button(".btn.btn-primary", type="submit")["Augment SBOM"],
-        ]
+    form.render_block(
+        block,
+        model_cls=shared.sbom.AugmentSBOMForm,
+        submit_label="Augment SBOM",
+        empty=True,
     )
 
     if warnings:
@@ -120,7 +111,7 @@ async def report(session: web.Committer, project: str, version: str, file_path:
         block.p["No NTIA 2021 minimum data field conformance warnings or errors found."]
 
     block.h2["Vulnerability scan"]
-    _vulnerability_scan_section(block, project, version, file_path, task_result.revision_number, osv_tasks, empty_form)
+    _vulnerability_scan_section(block, project, version, file_path, task_result.revision_number, osv_tasks)
 
     block.h2["Outdated tool"]
     outdated = None
@@ -224,22 +215,14 @@ def _vulnerability_component_details(block: htm.Block, component: results.OSVCom
     block.append(htm.details(".mb-3.rounded")[*details_content])
 
 
-def _vulnerability_scan_button(
-    block: htm.Block, project: str, version: str, file_path: str, empty_form: forms.Empty
-) -> None:
+def _vulnerability_scan_button(block: htm.Block, project: str, version: str, file_path: str) -> None:
     block.p["No vulnerability scan has been performed for this revision."]
 
-    action = util.as_url(
-        post.sbom.scan,
-        project_name=project,
-        version_name=version,
-        file_path=file_path,
-    )
-    block.append(
-        htm.form("", action=action, method="post")[
-            markupsafe.Markup(str(empty_form.hidden_tag())),
-            htm.button(".btn.btn-primary", type="submit")["Scan file"],
-        ]
+    form.render_block(
+        block,
+        model_cls=shared.sbom.ScanSBOMForm,
+        submit_label="Scan file",
+        empty=True,
     )
 
 
@@ -299,7 +282,6 @@ def _vulnerability_scan_section(
     file_path: str,
     revision_number: str,
     osv_tasks: collections.abc.Sequence[sql.Task],
-    empty_form: forms.Empty,
 ) -> None:
     """Display the vulnerability scan section based on task status."""
     completed_task = _vulnerability_scan_find_completed_task(osv_tasks, revision_number)
@@ -311,9 +293,9 @@ def _vulnerability_scan_section(
     in_progress_task = _vulnerability_scan_find_in_progress_task(osv_tasks, revision_number)
 
     if in_progress_task is not None:
-        _vulnerability_scan_status(block, in_progress_task, project, version, file_path, empty_form)
+        _vulnerability_scan_status(block, in_progress_task, project, version, file_path)
     else:
-        _vulnerability_scan_button(block, project, version, file_path, empty_form)
+        _vulnerability_scan_button(block, project, version, file_path)
 
 
 def _vulnerability_scan_status(
@@ -322,7 +304,6 @@ def _vulnerability_scan_status(
     project: str,
     version: str,
     file_path: str,
-    empty_form: forms.Empty,
 ) -> None:
     status_text = task.status.value.replace("_", " ").capitalize()
     block.p[f"Vulnerability scan is currently {status_text.lower()}."]
@@ -333,4 +314,4 @@ def _vulnerability_scan_status(
             htm.code[task.error],
             ". Additional details are unavailable from ATR.",
         ]
-        _vulnerability_scan_button(block, project, version, file_path, empty_form)
+        _vulnerability_scan_button(block, project, version, file_path)
diff --git a/atr/get/tokens.py b/atr/get/tokens.py
index 7779e7c..10f6c19 100644
--- a/atr/get/tokens.py
+++ b/atr/get/tokens.py
@@ -66,7 +66,6 @@ async def tokens(session: web.Committer) -> str:
         action=util.as_url(post.tokens.jwt_post),
         form_classes="#issue-jwt-form",
         submit_label="Generate JWT",
-        use_error_data=False,
     )
     jwt_section.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50")
     if most_recent_pat and most_recent_pat.last_used:
@@ -105,7 +104,6 @@ def _build_tokens_table(page: htm.Block, tokens_list: list[sql.PersonalAccessTok
             submit_classes="btn-sm btn-danger",
             submit_label="Delete",
             defaults={"token_id": t.id},
-            use_error_data=False,
             empty=True,
         )
         tbody.tr(".align-middle")[
diff --git a/atr/post/sbom.py b/atr/post/sbom.py
index 4911009..4e1bd22 100644
--- a/atr/post/sbom.py
+++ b/atr/post/sbom.py
@@ -26,20 +26,33 @@ import atr.blueprints.post as post
 import atr.db as db
 import atr.get as get
 import atr.log as log
+import atr.shared as shared
 import atr.storage as storage
 import atr.util as util
 import atr.web as web
 
 
-@post.committer("/sbom/augment/<project_name>/<version_name>/<path:file_path>")
-async def augment(session: web.Committer, project_name: str, version_name: str, file_path: str) -> web.WerkzeugResponse:
-    """Augment a CycloneDX SBOM file."""
-    await session.check_access(project_name)
+@post.committer("/sbom/report/<project>/<version>/<path:file_path>")
+@post.form(shared.sbom.SBOMForm)
+async def report(
+    session: web.Committer, sbom_form: shared.sbom.SBOMForm, project: str, version: str, file_path: str
+) -> web.WerkzeugResponse:
+    await session.check_access(project)
+
+    match sbom_form:
+        case shared.sbom.AugmentSBOMForm():
+            return await _augment(session, project, version, file_path)
+
+        case shared.sbom.ScanSBOMForm():
+            return await _scan(session, project, version, file_path)
 
-    await util.validate_empty_form()
+
+async def _augment(
+    session: web.Committer, project_name: str, version_name: str, file_path: str
+) -> web.WerkzeugResponse:
+    """Augment a CycloneDX SBOM file."""
     rel_path = pathlib.Path(file_path)
 
-    # Check that the file is a .cdx.json archive before creating a revision
     if not (file_path.endswith(".cdx.json")):
         raise base.ASFQuartException("SBOM augmentation is only supported for .cdx.json files", errorcode=400)
 
@@ -53,7 +66,12 @@ async def augment(session: web.Committer, project_name: str, version_name: str,
                 raise RuntimeError("No revision number found for new revision creation")
             log.info(f"Augmenting SBOM for {project_name} {version_name} {revision_number} {rel_path}")
         async with storage.write_as_project_committee_member(project_name) as wacm:
-            sbom_task = await wacm.sbom.augment_cyclonedx(project_name, version_name, revision_number, rel_path)
+            sbom_task = await wacm.sbom.augment_cyclonedx(
+                project_name,
+                version_name,
+                revision_number,
+                rel_path,
+            )
 
     except Exception as e:
         log.exception("Error augmenting SBOM:")
@@ -74,14 +92,11 @@ async def augment(session: web.Committer, project_name: str, version_name: str,
     )
 
 
-@post.committer("/sbom/scan/<project_name>/<version_name>/<path:file_path>")
-async def scan(session: web.Committer, project_name: str, version_name: str, file_path: str) -> web.WerkzeugResponse:
+async def _scan(session: web.Committer, project_name: str, version_name: str, file_path: str) -> web.WerkzeugResponse:
     """Scan a CycloneDX SBOM file for vulnerabilities using OSV."""
-    await session.check_access(project_name)
-
-    await util.validate_empty_form()
     rel_path = pathlib.Path(file_path)
 
+    # Check that the file is a .cdx.json archive before creating a revision
     if not (file_path.endswith(".cdx.json")):
         raise base.ASFQuartException("OSV scanning is only supported for .cdx.json files", errorcode=400)
 
@@ -95,7 +110,12 @@ async def scan(session: web.Committer, project_name: str, version_name: str, fil
                 raise RuntimeError("No revision number found for OSV scan")
             log.info(f"Starting OSV scan for {project_name} {version_name} {revision_number} {rel_path}")
         async with storage.write_as_project_committee_member(project_name) as wacm:
-            sbom_task = await wacm.sbom.osv_scan_cyclonedx(project_name, version_name, revision_number, rel_path)
+            sbom_task = await wacm.sbom.osv_scan_cyclonedx(
+                project_name,
+                version_name,
+                revision_number,
+                rel_path,
+            )
 
     except Exception as e:
         log.exception("Error starting OSV scan:")
diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py
index e5d7825..1403e43 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/__init__.py
@@ -33,6 +33,7 @@ import atr.shared.ignores as ignores
 import atr.shared.keys as keys
 import atr.shared.projects as projects
 import atr.shared.resolve as resolve
+import atr.shared.sbom as sbom
 import atr.shared.start as start
 import atr.shared.test as test
 import atr.shared.tokens as tokens
@@ -194,6 +195,7 @@ __all__ = [
     "keys",
     "projects",
     "resolve",
+    "sbom",
     "start",
     "test",
     "tokens",
diff --git a/atr/shared/sbom.py b/atr/shared/sbom.py
new file mode 100644
index 0000000..e1838dd
--- /dev/null
+++ b/atr/shared/sbom.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Annotated, Literal
+
+import atr.form as form
+
+type AUGMENT = Literal["augment"]
+type SCAN = Literal["scan"]
+
+
+class AugmentSBOMForm(form.Empty):
+    variant: AUGMENT = form.value(AUGMENT)
+
+
+class ScanSBOMForm(form.Empty):
+    variant: SCAN = form.value(SCAN)
+
+
+type SBOMForm = Annotated[
+    AugmentSBOMForm | ScanSBOMForm,
+    form.DISCRIMINATOR,
+]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to