This is an automated email from the ASF dual-hosted git repository.
arm pushed a commit to branch cyclonedx_xml
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/cyclonedx_xml by this push:
new 4ade5457 #901 - add support for SBOM conversion
4ade5457 is described below
commit 4ade54573324266f38800864407f946bc0ec9469
Author: Alastair McFarlane <[email protected]>
AuthorDate: Thu Mar 26 13:59:19 2026 +0000
#901 - add support for SBOM conversion
---
atr/analysis.py | 1 +
atr/get/draft.py | 13 ++++++++
atr/models/results.py | 11 ++++++
atr/models/sql.py | 1 +
atr/post/draft.py | 76 +++++++++++++++++++++++++++++++++++++++++-
atr/sbom/constants/licenses.py | 1 +
atr/sbom/utilities.py | 6 ++++
atr/storage/writers/sbom.py | 27 +++++++++++++++
atr/tasks/__init__.py | 2 ++
atr/tasks/sbom.py | 59 ++++++++++++++++++++++++++++++++
atr/templates/draft-tools.html | 8 +++++
11 files changed, 204 insertions(+), 1 deletion(-)
diff --git a/atr/analysis.py b/atr/analysis.py
index 8ee7d7d2..2ab5ad13 100755
--- a/atr/analysis.py
+++ b/atr/analysis.py
@@ -134,6 +134,7 @@ SKIPPABLE_SUFFIXES: Final[list[str]] = [
STANDALONE_METADATA_SUFFIXES: Final[frozenset[str]] = frozenset(
{
".cdx.json",
+ ".cdx.xml",
}
)
diff --git a/atr/get/draft.py b/atr/get/draft.py
index 538915db..12d6c70c 100644
--- a/atr/get/draft.py
+++ b/atr/get/draft.py
@@ -90,6 +90,18 @@ async def tools(
submit_classes="btn-outline-secondary",
empty=True,
)
+ sbom_convert_form = form.render(
+ model_cls=form.Empty,
+ action=util.as_url(
+ post.draft.sbomconvert,
+ project_key=str(project_key),
+ version_key=str(version_key),
+ file_path=str(file_path),
+ ),
+ submit_label="Convert XML SBOM (.cdx.xml)",
+ submit_classes="btn-outline-secondary",
+ empty=True,
+ )
return await template.render(
"draft-tools.html",
@@ -102,4 +114,5 @@ async def tools(
format_file_size=util.format_file_size,
sha512_form=sha512_form,
sbom_form=sbom_form,
+ sbom_convert_form=sbom_convert_form,
)
diff --git a/atr/models/results.py b/atr/models/results.py
index 99f145f0..cd04e226 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -163,6 +163,16 @@ class SBOMAugment(schema.Strict):
)
+class SBOMConvert(schema.Strict):
+ kind: Literal["sbom_convert"] = schema.Field(alias="kind")
+ path: str = schema.description("The path to the converted SBOM file")
+ bom_version: int | None = schema.Field(
+ default=None,
+ strict=False,
+ description="BOM Version produced by the convert task",
+ )
+
+
class SBOMQsScore(schema.Strict):
kind: Literal["sbom_qs_score"] = schema.Field(alias="kind")
project_key: safe.ProjectKey = schema.description("Project name")
@@ -241,6 +251,7 @@ Results = Annotated[
| MessageSend
| MetadataUpdate
| SBOMAugment
+ | SBOMConvert
| SBOMGenerateCycloneDX
| SBOMOSVScan
| SBOMQsScore
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 64e20091..ddebe510 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -214,6 +214,7 @@ class TaskType(enum.StrEnum):
QUARANTINE_VALIDATE = "quarantine_validate"
RAT_CHECK = "rat_check"
SBOM_AUGMENT = "sbom_augment"
+ SBOM_CONVERT = "sbom_convert"
SBOM_GENERATE_CYCLONEDX = "sbom_generate_cyclonedx"
SBOM_OSV_SCAN = "sbom_osv_scan"
SBOM_QS_SCORE = "sbom_qs_score"
diff --git a/atr/post/draft.py b/atr/post/draft.py
index 3666679f..ebfd9ec3 100644
--- a/atr/post/draft.py
+++ b/atr/post/draft.py
@@ -253,7 +253,7 @@ async def sbomgen(
or rel_path.name.endswith(".jar")
):
raise base.ASFQuartException(
- f"SBOM generation requires .tar.gz, .tgz, .zip or .jar files.
Received: {file_path!s}", errorcode=400
+ f"SBOM generation requires .tar.gz, .tgz, .zip or .jar files.
Received: {rel_path.name}", errorcode=400
)
try:
@@ -309,3 +309,77 @@ async def sbomgen(
project_key=str(project_key),
version_key=str(version_key),
)
+
+
[email protected]
+async def sbomconvert(
+ session: web.Committer,
+ _draft_sbomconvert: Literal["draft/sbomconvert"],
+ project_key: safe.ProjectKey,
+ version_key: safe.VersionKey,
+ file_path: safe.RelPath,
+ empty_form: form.Empty,
+) -> web.WerkzeugResponse:
+ """
+ URL: /draft/sbomconvert/<project_key>/<version_key>/<file_path>
+ Convert an XML CycloneDX SBOM file into JSON, creating a new revision.
+ """
+ rel_path = file_path.as_path()
+
+ # Check that the file is a .cdx.xml file before continuing
+ if not rel_path.name.endswith(".cdx.xml"):
+ raise base.ASFQuartException(f"SBOM converter requires .cdx.xml file.
Received: {rel_path.name}", errorcode=400)
+
+ try:
+ description = "SBOM conversion through web interface"
+ async with storage.write(session) as write:
+ wacp = await write.as_project_committee_participant(project_key)
+
+ async def modify(path: pathlib.Path, old_rev: sql.Revision | None)
-> None:
+ path_in_new_revision = path / rel_path
+ sbom_path_rel = rel_path.with_suffix(".cdx.json").name
+ sbom_path_in_new_revision = path / rel_path.parent /
sbom_path_rel
+
+ # Check that the source file exists in the new revision
+ if not await aiofiles.os.path.exists(path_in_new_revision):
+ log.error(f"Source file {rel_path} not found in new
revision for SBOM generation.")
+ raise web.FlashError("Source artifact file not found in
the new revision.")
+
+ # Check that the SBOM file does not already exist in the new
revision
+ if await aiofiles.os.path.exists(sbom_path_in_new_revision):
+ raise base.ASFQuartException("SBOM file already exists",
errorcode=400)
+
+ # This shouldn't happen as we need a revision to kick the task
off from
+ if old_rev is None:
+ raise web.FlashError("Internal error: Revision not found")
+
+ # Create and queue the task, using paths within the new
revision
+ sbom_task = await wacp.sbom.convert_cyclonedx(
+ project_key,
+ version_key,
+ old_rev.safe_number,
+ str(path_in_new_revision),
+ str(sbom_path_in_new_revision),
+ )
+ success = await interaction.wait_for_task(sbom_task)
+ if not success:
+ raise web.FlashError("Internal error: SBOM conversion
timed out")
+
+ result = await wacp.revision.create_revision_with_quarantine(
+ project_key, version_key, session.uid,
description=description, modify=modify
+ )
+
+ except Exception as e:
+ log.exception("Error generating SBOM:")
+ await quart.flash(f"Error generating SBOM: {e!s}", "error")
+ return await session.redirect(get.compose.selected,
project_key=str(project_key), version_key=str(version_key))
+
+ success = f"SBOM generated for {rel_path.name}"
+ if isinstance(result, sql.Quarantined):
+ success += ". Archive validation in progress."
+ return await session.redirect(
+ get.compose.selected,
+ success=success,
+ project_key=str(project_key),
+ version_key=str(version_key),
+ )
diff --git a/atr/sbom/constants/licenses.py b/atr/sbom/constants/licenses.py
index 299c2521..0e0fcee6 100644
--- a/atr/sbom/constants/licenses.py
+++ b/atr/sbom/constants/licenses.py
@@ -62,6 +62,7 @@ LICENSES: Final[dict[str, list[str]]] = {
"PHP-3.01",
"PostgreSQL",
"Python-2.0",
+ "PSF-2.0",
"SMLNJ",
"TCL",
"UPL-1.0",
diff --git a/atr/sbom/utilities.py b/atr/sbom/utilities.py
index 7bfad13a..0227a2b5 100644
--- a/atr/sbom/utilities.py
+++ b/atr/sbom/utilities.py
@@ -96,6 +96,12 @@ async def bundle_to_vuln_patch(
return patch_ops
+def bundle_outputter(bundle_value: models.bundle.Bundle) -> BaseOutput:
+ return make_outputter(
+ bom=bundle_value.bom, output_format=OutputFormat.JSON,
schema_version=bundle_value.spec_version
+ )
+
+
def cdx_severity_to_osv(severity: list[dict[str, str | float]]) -> tuple[str |
None, list[dict[str, str]]]:
severities = [
{
diff --git a/atr/storage/writers/sbom.py b/atr/storage/writers/sbom.py
index 326a0d0b..f204d824 100644
--- a/atr/storage/writers/sbom.py
+++ b/atr/storage/writers/sbom.py
@@ -105,6 +105,33 @@ class CommitteeParticipant(FoundationCommitter):
await self.__data.refresh(sbom_task)
return sbom_task
+ async def convert_cyclonedx(
+ self,
+ project_key: safe.ProjectKey,
+ version_key: safe.VersionKey,
+ revision_number: safe.RevisionNumber,
+ file_path: str,
+ sbom_path: str,
+ ) -> sql.Task:
+ sbom_task = sql.Task(
+ task_type=sql.TaskType.SBOM_CONVERT,
+ task_args=sbom.ConvertCycloneDX(
+ artifact_path=file_path,
+ revision=revision_number,
+ output_path=sbom_path,
+ ).model_dump(),
+ asf_uid=util.unwrap(self.__asf_uid),
+ added=datetime.datetime.now(datetime.UTC),
+ status=sql.TaskStatus.QUEUED,
+ project_key=str(project_key),
+ version_key=str(version_key),
+ revision_number=str(revision_number),
+ )
+ self.__data.add(sbom_task)
+ await self.__data.commit()
+ await self.__data.refresh(sbom_task)
+ return sbom_task
+
async def generate_cyclonedx(
self,
project_key: safe.ProjectKey,
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 7f3879b4..a2cc27a2 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -323,6 +323,8 @@ def resolve(task_type: sql.TaskType) -> Callable[...,
Awaitable[results.Results
return rat.check
case sql.TaskType.SBOM_AUGMENT:
return sbom.augment
+ case sql.TaskType.SBOM_CONVERT:
+ return sbom.convert_cyclonedx
case sql.TaskType.SBOM_GENERATE_CYCLONEDX:
return sbom.generate_cyclonedx
case sql.TaskType.SBOM_OSV_SCAN:
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index ada5607c..14d5b8ee 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -40,6 +40,14 @@ import atr.util as util
_CONFIG: Final = config.get()
+class ConvertCycloneDX(schema.Strict):
+ """Arguments for the task to generate a CycloneDX SBOM."""
+
+ artifact_path: str = schema.description("Absolute path to the artifact")
+ output_path: str = schema.description("Absolute path where the generated
SBOM JSON should be written")
+ revision: safe.RevisionNumber = schema.description("Revision number")
+
+
class GenerateCycloneDX(schema.Strict):
"""Arguments for the task to generate a CycloneDX SBOM."""
@@ -47,6 +55,14 @@ class GenerateCycloneDX(schema.Strict):
output_path: str = schema.description("Absolute path where the generated
SBOM JSON should be written")
+class SBOMConversionError(Exception):
+ """Custom exception for SBOM conversion failures."""
+
+ def __init__(self, message: str, details: dict[str, Any] | None = None) ->
None:
+ super().__init__(message)
+ self.details = details or {}
+
+
class SBOMGenerationError(Exception):
"""Custom exception for SBOM generation failures."""
@@ -128,6 +144,21 @@ async def augment(args: FileArgs) -> results.Results |
None:
)
[email protected]_model(ConvertCycloneDX)
+async def convert_cyclonedx(args: ConvertCycloneDX) -> results.Results | None:
+ """Generate a JSON CycloneDX SBOM from a given XML SBOM."""
+ try:
+ result_data = await _convert_cyclonedx_core(args.artifact_path,
args.output_path, str(args.revision))
+ log.info(f"Successfully converted CycloneDX SBOM for
{args.artifact_path}")
+ msg = result_data["message"]
+ if not isinstance(msg, str):
+ raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
+ return results.SBOMConvert(kind="sbom_convert", path=args.output_path,
bom_version=result_data.get("version"))
+ except (archives.ExtractionError, SBOMGenerationError) as e:
+ log.error(f"SBOM generation failed for {args.artifact_path}: {e}")
+ raise
+
+
@checks.with_model(GenerateCycloneDX)
async def generate_cyclonedx(args: GenerateCycloneDX) -> results.Results |
None:
"""Generate a CycloneDX SBOM for the given artifact and write it to the
output path."""
@@ -329,6 +360,34 @@ def _extracted_dir(temp_dir: str) -> str | None:
return extract_dir
+async def _convert_cyclonedx_core(artifact_path: str, output_path: str,
revision_str: str) -> dict[str, Any]:
+ """Core logic to convert XML CycloneDX SBOM to JSON."""
+ log.info(f"Generating CycloneDX JSON SBOM for {artifact_path} ->
{output_path}")
+
+ # TODO: Should create a new revision here rather than in the caller
+ bundle = sbom.utilities.path_to_bundle(pathlib.Path(artifact_path))
+ if not bundle:
+ raise SBOMScoringError("Could not load bundle")
+ sbom.utilities.apply_patch("conversion to JSON", revision_str, bundle, [])
+ outputter = sbom.utilities.bundle_outputter(bundle)
+ text = outputter.output_as_string(indent=2)
+
+ try:
+ async with aiofiles.open(output_path, "w", encoding="utf-8") as f:
+ await f.write(text)
+ log.info(f"Successfully wrote JSON SBOM to {output_path}")
+ except Exception as write_err:
+ log.exception(f"Failed to write SBOM JSON to {output_path}:
{write_err}")
+ raise SBOMConversionError(f"Failed to write SBOM to {output_path}:
{write_err}") from write_err
+
+ return {
+ "message": "Successfully generated and saved CycloneDX SBOM",
+ "sbom": text,
+ "format": "CycloneDX",
+ "version": str(bundle.bom.version),
+ }
+
+
async def _generate_cyclonedx_core(artifact_path: str, output_path: str) ->
dict[str, Any]:
"""Core logic to generate CycloneDX SBOM on failure."""
log.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")
diff --git a/atr/templates/draft-tools.html b/atr/templates/draft-tools.html
index 38851178..0dc4ef47 100644
--- a/atr/templates/draft-tools.html
+++ b/atr/templates/draft-tools.html
@@ -36,4 +36,12 @@
<p>Generate a CycloneDX Software Bill of Materials (SBOM) file for this
artifact.</p>
{{ sbom_form|safe }}
{% endif %}
+ {% if file_path.endswith(".cdx.xml") and
is_viewing_as_admin_fn(current_user.uid) %}
+ <h3>Convert SBOM</h3>
+ <div class="alert-info">
+ <p>NOTE: This functionality is in early release.</p>
+ </div>
+ <p>Convert this XML CycloneDX Software Bill of Materials (SBOM) into a
JSON equivalent.</p>
+ {{ sbom_convert_form|safe }}
+ {% endif %}
{% endblock content %}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]