This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new a197a75 Move functions for processing archives to their own module
a197a75 is described below
commit a197a755e2da3e6bba7a1ded149f35d946ff09a5
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jun 23 18:44:29 2025 +0100
Move functions for processing archives to their own module
---
atr/{tasks/sbom.py => archives.py} | 157 +++++---------------------------
atr/tasks/__init__.py | 3 +-
atr/tasks/checks/rat.py | 4 +-
atr/tasks/checks/targz.py | 22 +----
atr/tasks/sbom.py | 181 +------------------------------------
5 files changed, 36 insertions(+), 331 deletions(-)
diff --git a/atr/tasks/sbom.py b/atr/archives.py
similarity index 53%
copy from atr/tasks/sbom.py
copy to atr/archives.py
index c4967f5..5a0959d 100644
--- a/atr/tasks/sbom.py
+++ b/atr/archives.py
@@ -15,41 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-import asyncio
-import json
import logging
import os
+import os.path
import tarfile
-from typing import Any, Final
+from typing import Final
-import aiofiles
-
-import atr.config as config
-import atr.schema as schema
-import atr.tasks.checks as checks
-import atr.tasks.checks.targz as targz
-import atr.util as util
-
-_CONFIG: Final = config.get()
_LOGGER: Final = logging.getLogger(__name__)
-class GenerateCycloneDX(schema.Strict):
- """Arguments for the task to generate a CycloneDX SBOM."""
-
- artifact_path: str = schema.description("Absolute path to the artifact")
- output_path: str = schema.description("Absolute path where the generated
SBOM JSON should be written")
-
+class ExtractionError(Exception):
+ pass
-class SBOMGenerationError(Exception):
- """Custom exception for SBOM generation failures."""
- def __init__(self, message: str, details: dict[str, Any] | None = None) ->
None:
- super().__init__(message)
- self.details = details or {}
-
-
-def archive_extract_safe(
+def targz_extract(
archive_path: str,
extract_dir: str,
max_size: int,
@@ -68,24 +47,28 @@ def archive_extract_safe(
break
except tarfile.ReadError as e:
- raise SBOMGenerationError(f"Failed to read archive: {e}",
{"archive_path": archive_path}) from e
+ raise ExtractionError(f"Failed to read archive: {e}", {"archive_path":
archive_path}) from e
return total_extracted
[email protected]_model(GenerateCycloneDX)
-async def generate_cyclonedx(args: GenerateCycloneDX) -> str | None:
- """Generate a CycloneDX SBOM for the given artifact and write it to the
output path."""
- try:
- result_data = await _generate_cyclonedx_core(args.artifact_path,
args.output_path)
- _LOGGER.info(f"Successfully generated CycloneDX SBOM for
{args.artifact_path}")
- msg = result_data["message"]
- if not isinstance(msg, str):
- raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
- return msg
- except SBOMGenerationError as e:
- _LOGGER.error(f"SBOM generation failed for {args.artifact_path}:
{e.details}")
- raise
+def targz_total_size(tgz_path: str, chunk_size: int = 4096) -> int:
+ """Verify a .tar.gz file and compute its uncompressed size."""
+ total_size = 0
+
+ with tarfile.open(tgz_path, mode="r|gz") as tf:
+ for member in tf:
+ # Do not skip metadata here
+ total_size += member.size
+ # Verify file by extraction
+ if member.isfile():
+ f = tf.extractfile(member)
+ if f is not None:
+ while True:
+ data = f.read(chunk_size)
+ if not data:
+ break
+ return total_size
def _archive_extract_safe_process_file(
@@ -122,7 +105,7 @@ def _archive_extract_safe_process_file(
# Clean up the partial file before raising
target.close()
os.unlink(target_path)
- raise SBOMGenerationError(
+ raise ExtractionError(
f"Extraction exceeded maximum size limit of {max_size}
bytes",
{"max_size": max_size, "current_size":
total_extracted},
)
@@ -145,7 +128,7 @@ def archive_extract_member(
# Check whether extraction would exceed the size limit
if member.isreg() and ((total_extracted + member.size) > max_size):
- raise SBOMGenerationError(
+ raise ExtractionError(
f"Extraction would exceed maximum size limit of {max_size} bytes",
{"max_size": max_size, "current_size": total_extracted,
"file_size": member.size},
)
@@ -227,96 +210,6 @@ def _archive_extract_safe_process_symlink(member:
tarfile.TarInfo, extract_dir:
_LOGGER.warning("Failed to create symlink %s -> %s: %s", target_path,
link_target, e)
-async def _generate_cyclonedx_core(artifact_path: str, output_path: str) ->
dict[str, Any]:
- """Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on
failure."""
- _LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} ->
{output_path}")
-
- async with util.async_temporary_directory(prefix="cyclonedx_sbom_") as
temp_dir:
- _LOGGER.info(f"Created temporary directory: {temp_dir}")
-
- # Find and validate the root directory
- try:
- root_dir = await asyncio.to_thread(targz.root_directory,
artifact_path)
- except targz.RootDirectoryError as e:
- raise SBOMGenerationError(f"Archive root directory issue: {e}",
{"artifact_path": artifact_path}) from e
- except Exception as e:
- raise SBOMGenerationError(
- f"Failed to determine archive root directory: {e}",
{"artifact_path": artifact_path}
- ) from e
-
- extract_dir = os.path.join(temp_dir, root_dir)
-
- # Extract the archive to the temporary directory
- # TODO: Ideally we'd have task dependencies or archive caching
- _LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
- extracted_size = await asyncio.to_thread(
- archive_extract_safe,
- artifact_path,
- str(temp_dir),
- max_size=_CONFIG.MAX_EXTRACT_SIZE,
- chunk_size=_CONFIG.EXTRACT_CHUNK_SIZE,
- )
- _LOGGER.info(f"Extracted {extracted_size} bytes into {extract_dir}")
-
- # Run syft to generate the CycloneDX SBOM
- syft_command = ["syft", extract_dir, "-o", "cyclonedx-json"]
- _LOGGER.info(f"Running syft: {' '.join(syft_command)}")
-
- try:
- process = await asyncio.create_subprocess_exec(
- *syft_command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- stdout, stderr = await asyncio.wait_for(process.communicate(),
timeout=300)
-
- stdout_str = stdout.decode("utf-8").strip() if stdout else ""
- stderr_str = stderr.decode("utf-8").strip() if stderr else ""
-
- if process.returncode != 0:
- _LOGGER.error(f"syft command failed with code
{process.returncode}")
- _LOGGER.error(f"syft stderr: {stderr_str}")
- _LOGGER.error(f"syft stdout: {stdout_str[:1000]}...")
- raise SBOMGenerationError(
- f"syft command failed with code {process.returncode}",
- {"returncode": process.returncode, "stderr": stderr_str,
"stdout": stdout_str[:1000]},
- )
-
- # Parse the JSON output from syft
- try:
- sbom_data = json.loads(stdout_str)
- _LOGGER.info(f"Successfully parsed syft output for
{artifact_path}")
-
- # Write the SBOM data to the specified output path
- try:
- async with aiofiles.open(output_path, "w",
encoding="utf-8") as f:
- await f.write(json.dumps(sbom_data, indent=2))
- _LOGGER.info(f"Successfully wrote SBOM to {output_path}")
- except Exception as write_err:
- _LOGGER.exception(f"Failed to write SBOM JSON to
{output_path}: {write_err}")
- raise SBOMGenerationError(f"Failed to write SBOM to
{output_path}: {write_err}") from write_err
-
- return {
- "message": "Successfully generated and saved CycloneDX
SBOM",
- "sbom": sbom_data,
- "format": "CycloneDX",
- "components": len(sbom_data.get("components", [])),
- }
- except json.JSONDecodeError as e:
- _LOGGER.error(f"Failed to parse syft output as JSON: {e}")
- raise SBOMGenerationError(
- f"Failed to parse syft output: {e}",
- {"error": str(e), "syft_output": stdout_str[:1000]},
- ) from e
-
- except TimeoutError:
- _LOGGER.error("syft command timed out after 5 minutes")
- raise SBOMGenerationError("syft command timed out after 5 minutes")
- except FileNotFoundError:
- _LOGGER.error("syft command not found. Is it installed and in
PATH?")
- raise SBOMGenerationError("syft command not found")
-
-
def _safe_path(base_dir: str, *paths: str) -> str | None:
"""Return an absolute path within the base_dir built from the given paths,
or None if it escapes."""
target = os.path.abspath(os.path.join(base_dir, *paths))
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 0fde156..15a8101 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -194,9 +194,10 @@ async def tar_gz_checks(release: models.Release, revision:
str, path: str) -> li
async def zip_checks(release: models.Release, revision: str, path: str) ->
list[models.Task]:
"""Create check tasks for a .zip file."""
tasks = [
- queued(models.TaskType.ZIPFORMAT_INTEGRITY, release, revision, path),
queued(models.TaskType.LICENSE_FILES, release, revision, path),
queued(models.TaskType.LICENSE_HEADERS, release, revision, path),
+ # queued(models.TaskType.RAT_CHECK, release, revision, path),
+ queued(models.TaskType.ZIPFORMAT_INTEGRITY, release, revision, path),
queued(models.TaskType.ZIPFORMAT_STRUCTURE, release, revision, path),
]
return tasks
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 90f8094..4449a9b 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -23,10 +23,10 @@ import tempfile
import xml.etree.ElementTree as ElementTree
from typing import Any, Final
+import atr.archives as archives
import atr.config as config
import atr.tasks.checks as checks
import atr.tasks.checks.targz as targz
-import atr.tasks.sbom as sbom
_CONFIG: Final = config.get()
_JAVA_MEMORY_ARGS: Final[list[str]] = []
@@ -169,7 +169,7 @@ def _check_core_logic(
# Extract the archive to the temporary directory
_LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
- extracted_size = sbom.archive_extract_safe(
+ extracted_size = archives.targz_extract(
artifact_path, temp_dir, max_size=max_extract_size,
chunk_size=chunk_size
)
_LOGGER.info(f"Extracted {extracted_size} bytes")
diff --git a/atr/tasks/checks/targz.py b/atr/tasks/checks/targz.py
index f0103a1..656edfe 100644
--- a/atr/tasks/checks/targz.py
+++ b/atr/tasks/checks/targz.py
@@ -20,6 +20,7 @@ import logging
import tarfile
from typing import Final
+import atr.archives as archives
import atr.tasks.checks as checks
_LOGGER: Final = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ async def integrity(args: checks.FunctionArguments) -> str |
None:
chunk_size = 4096
try:
- size = await asyncio.to_thread(_integrity_core,
str(artifact_abs_path), chunk_size)
+ size = await asyncio.to_thread(archives.targz_total_size,
str(artifact_abs_path), chunk_size)
await recorder.success("Able to read all entries of the archive using
tarfile", {"size": size})
except Exception as e:
await recorder.failure("Unable to read all entries of the archive
using tarfile", {"error": str(e)})
@@ -104,22 +105,3 @@ async def structure(args: checks.FunctionArguments) -> str
| None:
except Exception as e:
await recorder.failure("Unable to verify archive structure", {"error":
str(e)})
return None
-
-
-def _integrity_core(tgz_path: str, chunk_size: int = 4096) -> int:
- """Verify a .tar.gz file and compute its uncompressed size."""
- total_size = 0
-
- with tarfile.open(tgz_path, mode="r|gz") as tf:
- for member in tf:
- # Do not skip metadata here
- total_size += member.size
- # Verify file by extraction
- if member.isfile():
- f = tf.extractfile(member)
- if f is not None:
- while True:
- data = f.read(chunk_size)
- if not data:
- break
- return total_size
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index c4967f5..d136ad1 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -19,11 +19,11 @@ import asyncio
import json
import logging
import os
-import tarfile
from typing import Any, Final
import aiofiles
+import atr.archives as archives
import atr.config as config
import atr.schema as schema
import atr.tasks.checks as checks
@@ -49,30 +49,6 @@ class SBOMGenerationError(Exception):
self.details = details or {}
-def archive_extract_safe(
- archive_path: str,
- extract_dir: str,
- max_size: int,
- chunk_size: int,
-) -> int:
- """Safe archive extraction."""
- total_extracted = 0
-
- try:
- with tarfile.open(archive_path, mode="r|gz") as tf:
- for member in tf:
- keep_going, total_extracted = archive_extract_member(
- tf, member, extract_dir, total_extracted, max_size,
chunk_size
- )
- if not keep_going:
- break
-
- except tarfile.ReadError as e:
- raise SBOMGenerationError(f"Failed to read archive: {e}",
{"archive_path": archive_path}) from e
-
- return total_extracted
-
-
@checks.with_model(GenerateCycloneDX)
async def generate_cyclonedx(args: GenerateCycloneDX) -> str | None:
"""Generate a CycloneDX SBOM for the given artifact and write it to the
output path."""
@@ -83,152 +59,13 @@ async def generate_cyclonedx(args: GenerateCycloneDX) ->
str | None:
if not isinstance(msg, str):
raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
return msg
- except SBOMGenerationError as e:
- _LOGGER.error(f"SBOM generation failed for {args.artifact_path}:
{e.details}")
+ except (archives.ExtractionError, SBOMGenerationError) as e:
+ _LOGGER.error(f"SBOM generation failed for {args.artifact_path}: {e}")
raise
-def _archive_extract_safe_process_file(
- tf: tarfile.TarFile,
- member: tarfile.TarInfo,
- extract_dir: str,
- total_extracted: int,
- max_size: int,
- chunk_size: int,
-) -> int:
- """Process a single file member during safe archive extraction."""
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
- _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
- return 0
-
- os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
- source = tf.extractfile(member)
- if source is None:
- # Should not happen if member.isreg() is true
- _LOGGER.warning(f"Could not extract file object for member:
{member.name}")
- return 0
-
- extracted_file_size = 0
- try:
- with open(target_path, "wb") as target:
- while chunk := source.read(chunk_size):
- target.write(chunk)
- extracted_file_size += len(chunk)
-
- # Check size limits during extraction
- if (total_extracted + extracted_file_size) > max_size:
- # Clean up the partial file before raising
- target.close()
- os.unlink(target_path)
- raise SBOMGenerationError(
- f"Extraction exceeded maximum size limit of {max_size}
bytes",
- {"max_size": max_size, "current_size":
total_extracted},
- )
- finally:
- source.close()
-
- return extracted_file_size
-
-
-def archive_extract_member(
- tf: tarfile.TarFile, member: tarfile.TarInfo, extract_dir: str,
total_extracted: int, max_size: int, chunk_size: int
-) -> tuple[bool, int]:
- if member.name and member.name.split("/")[-1].startswith("._"):
- # Metadata convention
- return False, 0
-
- # Skip any character device, block device, or FIFO
- if member.isdev():
- return False, 0
-
- # Check whether extraction would exceed the size limit
- if member.isreg() and ((total_extracted + member.size) > max_size):
- raise SBOMGenerationError(
- f"Extraction would exceed maximum size limit of {max_size} bytes",
- {"max_size": max_size, "current_size": total_extracted,
"file_size": member.size},
- )
-
- # Extract directories directly
- if member.isdir():
- # Ensure the path is safe before extracting
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
- _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
- return False, 0
- tf.extract(member, extract_dir, numeric_owner=True)
-
- elif member.isreg():
- extracted_size = _archive_extract_safe_process_file(
- tf, member, extract_dir, total_extracted, max_size, chunk_size
- )
- total_extracted += extracted_size
-
- elif member.issym():
- _archive_extract_safe_process_symlink(member, extract_dir)
-
- elif member.islnk():
- _archive_extract_safe_process_hardlink(member, extract_dir)
-
- return True, total_extracted
-
-
-def _archive_extract_safe_process_hardlink(member: tarfile.TarInfo,
extract_dir: str) -> None:
- """Safely create a hard link from the TarInfo entry."""
- target_path = _safe_path(extract_dir, member.name)
- if target_path is None:
- _LOGGER.warning(f"Skipping potentially unsafe hard link path:
{member.name}")
- return
-
- link_target = member.linkname or ""
- source_path = _safe_path(extract_dir, link_target)
- if source_path is None or not os.path.exists(source_path):
- _LOGGER.warning(f"Skipping hard link with invalid target:
{member.name} -> {link_target}")
- return
-
- os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
- try:
- if os.path.lexists(target_path):
- return
- os.link(source_path, target_path)
- except (OSError, NotImplementedError) as e:
- _LOGGER.warning(f"Failed to create hard link {target_path} ->
{source_path}: {e}")
-
-
-def _archive_extract_safe_process_symlink(member: tarfile.TarInfo,
extract_dir: str) -> None:
- """Safely create a symbolic link from the TarInfo entry."""
- target_path = _safe_path(extract_dir, member.name)
- if target_path is None:
- _LOGGER.warning(f"Skipping potentially unsafe symlink path:
{member.name}")
- return
-
- link_target = member.linkname or ""
-
- # Reject absolute targets to avoid links outside the tree
- if os.path.isabs(link_target):
- _LOGGER.warning(f"Skipping symlink with absolute target: {member.name}
-> {link_target}")
- return
-
- # Ensure that the resolved link target stays within the extraction
directory
- resolved_target = _safe_path(os.path.dirname(target_path), link_target)
- if resolved_target is None:
- _LOGGER.warning(f"Skipping symlink pointing outside tree:
{member.name} -> {link_target}")
- return
-
- os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
- try:
- if os.path.lexists(target_path):
- return
- os.symlink(link_target, target_path)
- except (OSError, NotImplementedError) as e:
- _LOGGER.warning("Failed to create symlink %s -> %s: %s", target_path,
link_target, e)
-
-
async def _generate_cyclonedx_core(artifact_path: str, output_path: str) ->
dict[str, Any]:
- """Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on
failure."""
+ """Core logic to generate CycloneDX SBOM, raising an error on failure."""
_LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} ->
{output_path}")
async with util.async_temporary_directory(prefix="cyclonedx_sbom_") as
temp_dir:
@@ -250,7 +87,7 @@ async def _generate_cyclonedx_core(artifact_path: str,
output_path: str) -> dict
# TODO: Ideally we'd have task dependencies or archive caching
_LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
extracted_size = await asyncio.to_thread(
- archive_extract_safe,
+ archives.targz_extract,
artifact_path,
str(temp_dir),
max_size=_CONFIG.MAX_EXTRACT_SIZE,
@@ -315,11 +152,3 @@ async def _generate_cyclonedx_core(artifact_path: str,
output_path: str) -> dict
except FileNotFoundError:
_LOGGER.error("syft command not found. Is it installed and in
PATH?")
raise SBOMGenerationError("syft command not found")
-
-
-def _safe_path(base_dir: str, *paths: str) -> str | None:
- """Return an absolute path within the base_dir built from the given paths,
or None if it escapes."""
- target = os.path.abspath(os.path.join(base_dir, *paths))
- if target.startswith(os.path.abspath(base_dir)):
- return target
- return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]