This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch cyclonedx_xml
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/cyclonedx_xml by this push:
     new 4ade5457 #901 - add support for SBOM conversion
4ade5457 is described below

commit 4ade54573324266f38800864407f946bc0ec9469
Author: Alastair McFarlane <[email protected]>
AuthorDate: Thu Mar 26 13:59:19 2026 +0000

    #901 - add support for SBOM conversion
---
 atr/analysis.py                |  1 +
 atr/get/draft.py               | 13 ++++++++
 atr/models/results.py          | 11 ++++++
 atr/models/sql.py              |  1 +
 atr/post/draft.py              | 76 +++++++++++++++++++++++++++++++++++++++++-
 atr/sbom/constants/licenses.py |  1 +
 atr/sbom/utilities.py          |  6 ++++
 atr/storage/writers/sbom.py    | 27 +++++++++++++++
 atr/tasks/__init__.py          |  2 ++
 atr/tasks/sbom.py              | 59 ++++++++++++++++++++++++++++++++
 atr/templates/draft-tools.html |  8 +++++
 11 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/atr/analysis.py b/atr/analysis.py
index 8ee7d7d2..2ab5ad13 100755
--- a/atr/analysis.py
+++ b/atr/analysis.py
@@ -134,6 +134,7 @@ SKIPPABLE_SUFFIXES: Final[list[str]] = [
 STANDALONE_METADATA_SUFFIXES: Final[frozenset[str]] = frozenset(
     {
         ".cdx.json",
+        ".cdx.xml",
     }
 )
 
diff --git a/atr/get/draft.py b/atr/get/draft.py
index 538915db..12d6c70c 100644
--- a/atr/get/draft.py
+++ b/atr/get/draft.py
@@ -90,6 +90,18 @@ async def tools(
         submit_classes="btn-outline-secondary",
         empty=True,
     )
+    sbom_convert_form = form.render(
+        model_cls=form.Empty,
+        action=util.as_url(
+            post.draft.sbomconvert,
+            project_key=str(project_key),
+            version_key=str(version_key),
+            file_path=str(file_path),
+        ),
+        submit_label="Convert XML SBOM (.cdx.xml)",
+        submit_classes="btn-outline-secondary",
+        empty=True,
+    )
 
     return await template.render(
         "draft-tools.html",
@@ -102,4 +114,5 @@ async def tools(
         format_file_size=util.format_file_size,
         sha512_form=sha512_form,
         sbom_form=sbom_form,
+        sbom_convert_form=sbom_convert_form,
     )
diff --git a/atr/models/results.py b/atr/models/results.py
index 99f145f0..cd04e226 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -163,6 +163,16 @@ class SBOMAugment(schema.Strict):
     )
 
 
+class SBOMConvert(schema.Strict):
+    kind: Literal["sbom_convert"] = schema.Field(alias="kind")
+    path: str = schema.description("The path to the converted SBOM file")
+    bom_version: int | None = schema.Field(
+        default=None,
+        strict=False,
+        description="BOM Version produced by the convert task",
+    )
+
+
 class SBOMQsScore(schema.Strict):
     kind: Literal["sbom_qs_score"] = schema.Field(alias="kind")
     project_key: safe.ProjectKey = schema.description("Project name")
@@ -241,6 +251,7 @@ Results = Annotated[
     | MessageSend
     | MetadataUpdate
     | SBOMAugment
+    | SBOMConvert
     | SBOMGenerateCycloneDX
     | SBOMOSVScan
     | SBOMQsScore
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 64e20091..ddebe510 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -214,6 +214,7 @@ class TaskType(enum.StrEnum):
     QUARANTINE_VALIDATE = "quarantine_validate"
     RAT_CHECK = "rat_check"
     SBOM_AUGMENT = "sbom_augment"
+    SBOM_CONVERT = "sbom_convert"
     SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
     SBOM_OSV_SCAN = "sbom_osv_scan"
     SBOM_QS_SCORE = "sbom_qs_score"
diff --git a/atr/post/draft.py b/atr/post/draft.py
index 3666679f..ebfd9ec3 100644
--- a/atr/post/draft.py
+++ b/atr/post/draft.py
@@ -253,7 +253,7 @@ async def sbomgen(
         or rel_path.name.endswith(".jar")
     ):
         raise base.ASFQuartException(
-            f"SBOM generation requires .tar.gz, .tgz, .zip or .jar files. Received: {file_path!s}", errorcode=400
+            f"SBOM generation requires .tar.gz, .tgz, .zip or .jar files. Received: {rel_path.name}", errorcode=400
         )
 
     try:
@@ -309,3 +309,77 @@ async def sbomgen(
         project_key=str(project_key),
         version_key=str(version_key),
     )
+
+
+@post.typed
+async def sbomconvert(
+    session: web.Committer,
+    _draft_sbomconvert: Literal["draft/sbomconvert"],
+    project_key: safe.ProjectKey,
+    version_key: safe.VersionKey,
+    file_path: safe.RelPath,
+    empty_form: form.Empty,
+) -> web.WerkzeugResponse:
+    """
+    URL: /draft/sbomconvert/<project_key>/<version_key>/<file_path>
+    Convert an XML CycloneDX SBOM (.cdx.xml) into a sibling .cdx.json file, creating a new revision.
+    """
+    rel_path = file_path.as_path()
+
+    # Check that the file is a .cdx.xml file before continuing
+    if not rel_path.name.endswith(".cdx.xml"):
+        raise base.ASFQuartException(f"SBOM converter requires .cdx.xml file. Received: {rel_path.name}", errorcode=400)
+
+    try:
+        description = "SBOM conversion through web interface"
+        async with storage.write(session) as write:
+            wacp = await write.as_project_committee_participant(project_key)
+
+            async def modify(path: pathlib.Path, old_rev: sql.Revision | None) -> None:
+                path_in_new_revision = path / rel_path
+                sbom_path_rel = rel_path.with_suffix(".json").name  # only ".xml" is replaced: x.cdx.xml -> x.cdx.json
+                sbom_path_in_new_revision = path / rel_path.parent / sbom_path_rel
+
+                # Check that the source file exists in the new revision
+                if not await aiofiles.os.path.exists(path_in_new_revision):
+                    log.error(f"Source file {rel_path} not found in new revision for SBOM conversion.")
+                    raise web.FlashError("Source artifact file not found in the new revision.")
+
+                # Check that the SBOM file does not already exist in the new revision
+                if await aiofiles.os.path.exists(sbom_path_in_new_revision):
+                    raise base.ASFQuartException("SBOM file already exists", errorcode=400)
+
+                # This shouldn't happen as we need a revision to kick the task off from
+                if old_rev is None:
+                    raise web.FlashError("Internal error: Revision not found")
+
+                # Create and queue the task, using paths within the new revision
+                sbom_task = await wacp.sbom.convert_cyclonedx(
+                    project_key,
+                    version_key,
+                    old_rev.safe_number,
+                    str(path_in_new_revision),
+                    str(sbom_path_in_new_revision),
+                )
+                success = await interaction.wait_for_task(sbom_task)
+                if not success:
+                    raise web.FlashError("Internal error: SBOM conversion timed out")
+
+            result = await wacp.revision.create_revision_with_quarantine(
+                project_key, version_key, session.uid, description=description, modify=modify
+            )
+
+    except Exception as e:
+        log.exception("Error converting SBOM:")
+        await quart.flash(f"Error converting SBOM: {e!s}", "error")
+        return await session.redirect(get.compose.selected, project_key=str(project_key), version_key=str(version_key))
+
+    success = f"SBOM converted for {rel_path.name}"
+    if isinstance(result, sql.Quarantined):
+        success += ". Archive validation in progress."
+    return await session.redirect(
+        get.compose.selected,
+        success=success,
+        project_key=str(project_key),
+        version_key=str(version_key),
+    )
diff --git a/atr/sbom/constants/licenses.py b/atr/sbom/constants/licenses.py
index 299c2521..0e0fcee6 100644
--- a/atr/sbom/constants/licenses.py
+++ b/atr/sbom/constants/licenses.py
@@ -62,6 +62,7 @@ LICENSES: Final[dict[str, list[str]]] = {
         "PHP-3.01",
         "PostgreSQL",
         "Python-2.0",
+        "PSF-2.0",
         "SMLNJ",
         "TCL",
         "UPL-1.0",
diff --git a/atr/sbom/utilities.py b/atr/sbom/utilities.py
index 7bfad13a..0227a2b5 100644
--- a/atr/sbom/utilities.py
+++ b/atr/sbom/utilities.py
@@ -96,6 +96,12 @@ async def bundle_to_vuln_patch(
     return patch_ops
 
 
+def bundle_outputter(bundle_value: models.bundle.Bundle) -> BaseOutput:
+    return make_outputter(
+        bom=bundle_value.bom, output_format=OutputFormat.JSON, 
schema_version=bundle_value.spec_version
+    )
+
+
 def cdx_severity_to_osv(severity: list[dict[str, str | float]]) -> tuple[str | 
None, list[dict[str, str]]]:
     severities = [
         {
diff --git a/atr/storage/writers/sbom.py b/atr/storage/writers/sbom.py
index 326a0d0b..f204d824 100644
--- a/atr/storage/writers/sbom.py
+++ b/atr/storage/writers/sbom.py
@@ -105,6 +105,33 @@ class CommitteeParticipant(FoundationCommitter):
         await self.__data.refresh(sbom_task)
         return sbom_task
 
+    async def convert_cyclonedx(
+        self,
+        project_key: safe.ProjectKey,
+        version_key: safe.VersionKey,
+        revision_number: safe.RevisionNumber,
+        file_path: str,
+        sbom_path: str,
+    ) -> sql.Task:
+        sbom_task = sql.Task(
+            task_type=sql.TaskType.SBOM_CONVERT,
+            task_args=sbom.ConvertCycloneDX(
+                artifact_path=file_path,
+                revision=revision_number,
+                output_path=sbom_path,
+            ).model_dump(),
+            asf_uid=util.unwrap(self.__asf_uid),
+            added=datetime.datetime.now(datetime.UTC),
+            status=sql.TaskStatus.QUEUED,
+            project_key=str(project_key),
+            version_key=str(version_key),
+            revision_number=str(revision_number),
+        )
+        self.__data.add(sbom_task)
+        await self.__data.commit()
+        await self.__data.refresh(sbom_task)
+        return sbom_task
+
     async def generate_cyclonedx(
         self,
         project_key: safe.ProjectKey,
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 7f3879b4..a2cc27a2 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -323,6 +323,8 @@ def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results
             return rat.check
         case sql.TaskType.SBOM_AUGMENT:
             return sbom.augment
+        case sql.TaskType.SBOM_CONVERT:
+            return sbom.convert_cyclonedx
         case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
             return sbom.generate_cyclonedx
         case sql.TaskType.SBOM_OSV_SCAN:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index ada5607c..14d5b8ee 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -40,6 +40,14 @@ import atr.util as util
 _CONFIG: Final = config.get()
 
 
+class ConvertCycloneDX(schema.Strict):
+    """Arguments for the task to convert an XML CycloneDX SBOM to JSON."""
+
+    artifact_path: str = schema.description("Absolute path to the artifact")
+    output_path: str = schema.description("Absolute path where the generated 
SBOM JSON should be written")
+    revision: safe.RevisionNumber = schema.description("Revision number")
+
+
 class GenerateCycloneDX(schema.Strict):
     """Arguments for the task to generate a CycloneDX SBOM."""
 
@@ -47,6 +55,14 @@ class GenerateCycloneDX(schema.Strict):
     output_path: str = schema.description("Absolute path where the generated 
SBOM JSON should be written")
 
 
+class SBOMConversionError(Exception):
+    """Custom exception for SBOM conversion failures."""
+
+    def __init__(self, message: str, details: dict[str, Any] | None = None) -> 
None:
+        super().__init__(message)
+        self.details = details or {}
+
+
 class SBOMGenerationError(Exception):
     """Custom exception for SBOM generation failures."""
 
@@ -128,6 +144,21 @@ async def augment(args: FileArgs) -> results.Results | 
None:
     )
 
 
+@checks.with_model(ConvertCycloneDX)
+async def convert_cyclonedx(args: ConvertCycloneDX) -> results.Results | None:
+    """Convert the XML CycloneDX SBOM at args.artifact_path to JSON at args.output_path; log and re-raise failures."""
+    try:
+        result_data = await _convert_cyclonedx_core(args.artifact_path, args.output_path, str(args.revision))
+        log.info(f"Successfully converted CycloneDX SBOM for {args.artifact_path}")
+        msg = result_data["message"]
+        if not isinstance(msg, str):
+            raise SBOMConversionError(f"Invalid message type: {type(msg)}")
+        return results.SBOMConvert(kind="sbom_convert", path=args.output_path, bom_version=result_data.get("version"))
+    except (SBOMConversionError, SBOMScoringError) as e:  # the error types the conversion core raises
+        log.error(f"SBOM conversion failed for {args.artifact_path}: {e}")
+        raise
+
+
 @checks.with_model(GenerateCycloneDX)
 async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results | 
None:
     """Generate a CycloneDX SBOM for the given artifact and write it to the 
output path."""
@@ -329,6 +360,34 @@ def _extracted_dir(temp_dir: str) -> str | None:
     return extract_dir
 
 
+async def _convert_cyclonedx_core(artifact_path: str, output_path: str, revision_str: str) -> dict[str, Any]:
+    """Load the XML SBOM at artifact_path, write it as JSON to output_path, and return a summary dict."""
+    log.info(f"Generating CycloneDX JSON SBOM for {artifact_path} -> {output_path}")
+
+    # TODO: Should create a new revision here rather than in the caller
+    bundle = sbom.utilities.path_to_bundle(pathlib.Path(artifact_path))
+    if not bundle:
+        raise SBOMConversionError("Could not load bundle")
+    sbom.utilities.apply_patch("conversion to JSON", revision_str, bundle, [])
+    outputter = sbom.utilities.bundle_outputter(bundle)
+    text = outputter.output_as_string(indent=2)
+
+    try:
+        async with aiofiles.open(output_path, "w", encoding="utf-8") as f:
+            await f.write(text)
+        log.info(f"Successfully wrote JSON SBOM to {output_path}")
+    except Exception as write_err:
+        log.exception(f"Failed to write SBOM JSON to {output_path}: {write_err}")
+        raise SBOMConversionError(f"Failed to write SBOM to {output_path}: {write_err}") from write_err
+
+    return {
+        "message": "Successfully converted and saved CycloneDX SBOM",
+        "sbom": text,
+        "format": "CycloneDX",
+        "version": str(bundle.bom.version),
+    }
+
+
 async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> 
dict[str, Any]:
     """Core logic to generate CycloneDX SBOM on failure."""
     log.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")
diff --git a/atr/templates/draft-tools.html b/atr/templates/draft-tools.html
index 38851178..0dc4ef47 100644
--- a/atr/templates/draft-tools.html
+++ b/atr/templates/draft-tools.html
@@ -36,4 +36,12 @@
     <p>Generate a CycloneDX Software Bill of Materials (SBOM) file for this 
artifact.</p>
     {{ sbom_form|safe }}
   {% endif %}
+  {% if file_path.endswith(".cdx.xml") and 
is_viewing_as_admin_fn(current_user.uid) %}
+    <h3>Convert SBOM</h3>
+    <div class="alert-info">
+      <p>NOTE: This functionality is in early release.</p>
+    </div>
+    <p>Convert this XML CycloneDX Software Bill of Materials (SBOM) into a 
JSON equivalent.</p>
+    {{ sbom_convert_form|safe }}
+  {% endif %}
 {% endblock content %}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to