This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new a197a75  Move functions for processing archives to their own module
a197a75 is described below

commit a197a755e2da3e6bba7a1ded149f35d946ff09a5
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jun 23 18:44:29 2025 +0100

    Move functions for processing archives to their own module
---
 atr/{tasks/sbom.py => archives.py} | 157 +++++---------------------------
 atr/tasks/__init__.py              |   3 +-
 atr/tasks/checks/rat.py            |   4 +-
 atr/tasks/checks/targz.py          |  22 +----
 atr/tasks/sbom.py                  | 181 +------------------------------------
 5 files changed, 36 insertions(+), 331 deletions(-)

diff --git a/atr/tasks/sbom.py b/atr/archives.py
similarity index 53%
copy from atr/tasks/sbom.py
copy to atr/archives.py
index c4967f5..5a0959d 100644
--- a/atr/tasks/sbom.py
+++ b/atr/archives.py
@@ -15,41 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import asyncio
-import json
 import logging
 import os
+import os.path
 import tarfile
-from typing import Any, Final
+from typing import Final
 
-import aiofiles
-
-import atr.config as config
-import atr.schema as schema
-import atr.tasks.checks as checks
-import atr.tasks.checks.targz as targz
-import atr.util as util
-
-_CONFIG: Final = config.get()
 _LOGGER: Final = logging.getLogger(__name__)
 
 
-class GenerateCycloneDX(schema.Strict):
-    """Arguments for the task to generate a CycloneDX SBOM."""
-
-    artifact_path: str = schema.description("Absolute path to the artifact")
-    output_path: str = schema.description("Absolute path where the generated 
SBOM JSON should be written")
-
+class ExtractionError(Exception):
+    pass
 
-class SBOMGenerationError(Exception):
-    """Custom exception for SBOM generation failures."""
 
-    def __init__(self, message: str, details: dict[str, Any] | None = None) -> 
None:
-        super().__init__(message)
-        self.details = details or {}
-
-
-def archive_extract_safe(
+def targz_extract(
     archive_path: str,
     extract_dir: str,
     max_size: int,
@@ -68,24 +47,28 @@ def archive_extract_safe(
                     break
 
     except tarfile.ReadError as e:
-        raise SBOMGenerationError(f"Failed to read archive: {e}", 
{"archive_path": archive_path}) from e
+        raise ExtractionError(f"Failed to read archive: {e}", {"archive_path": 
archive_path}) from e
 
     return total_extracted
 
 
[email protected]_model(GenerateCycloneDX)
-async def generate_cyclonedx(args: GenerateCycloneDX) -> str | None:
-    """Generate a CycloneDX SBOM for the given artifact and write it to the 
output path."""
-    try:
-        result_data = await _generate_cyclonedx_core(args.artifact_path, 
args.output_path)
-        _LOGGER.info(f"Successfully generated CycloneDX SBOM for 
{args.artifact_path}")
-        msg = result_data["message"]
-        if not isinstance(msg, str):
-            raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
-        return msg
-    except SBOMGenerationError as e:
-        _LOGGER.error(f"SBOM generation failed for {args.artifact_path}: 
{e.details}")
-        raise
+def targz_total_size(tgz_path: str, chunk_size: int = 4096) -> int:
+    """Verify a .tar.gz file and compute its uncompressed size."""
+    total_size = 0
+
+    with tarfile.open(tgz_path, mode="r|gz") as tf:
+        for member in tf:
+            # Do not skip metadata here
+            total_size += member.size
+            # Verify file by extraction
+            if member.isfile():
+                f = tf.extractfile(member)
+                if f is not None:
+                    while True:
+                        data = f.read(chunk_size)
+                        if not data:
+                            break
+    return total_size
 
 
 def _archive_extract_safe_process_file(
@@ -122,7 +105,7 @@ def _archive_extract_safe_process_file(
                     # Clean up the partial file before raising
                     target.close()
                     os.unlink(target_path)
-                    raise SBOMGenerationError(
+                    raise ExtractionError(
                         f"Extraction exceeded maximum size limit of {max_size} 
bytes",
                         {"max_size": max_size, "current_size": 
total_extracted},
                     )
@@ -145,7 +128,7 @@ def archive_extract_member(
 
     # Check whether extraction would exceed the size limit
     if member.isreg() and ((total_extracted + member.size) > max_size):
-        raise SBOMGenerationError(
+        raise ExtractionError(
             f"Extraction would exceed maximum size limit of {max_size} bytes",
             {"max_size": max_size, "current_size": total_extracted, 
"file_size": member.size},
         )
@@ -227,96 +210,6 @@ def _archive_extract_safe_process_symlink(member: 
tarfile.TarInfo, extract_dir:
         _LOGGER.warning("Failed to create symlink %s -> %s: %s", target_path, 
link_target, e)
 
 
-async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> 
dict[str, Any]:
-    """Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on 
failure."""
-    _LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} -> 
{output_path}")
-
-    async with util.async_temporary_directory(prefix="cyclonedx_sbom_") as 
temp_dir:
-        _LOGGER.info(f"Created temporary directory: {temp_dir}")
-
-        # Find and validate the root directory
-        try:
-            root_dir = await asyncio.to_thread(targz.root_directory, 
artifact_path)
-        except targz.RootDirectoryError as e:
-            raise SBOMGenerationError(f"Archive root directory issue: {e}", 
{"artifact_path": artifact_path}) from e
-        except Exception as e:
-            raise SBOMGenerationError(
-                f"Failed to determine archive root directory: {e}", 
{"artifact_path": artifact_path}
-            ) from e
-
-        extract_dir = os.path.join(temp_dir, root_dir)
-
-        # Extract the archive to the temporary directory
-        # TODO: Ideally we'd have task dependencies or archive caching
-        _LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
-        extracted_size = await asyncio.to_thread(
-            archive_extract_safe,
-            artifact_path,
-            str(temp_dir),
-            max_size=_CONFIG.MAX_EXTRACT_SIZE,
-            chunk_size=_CONFIG.EXTRACT_CHUNK_SIZE,
-        )
-        _LOGGER.info(f"Extracted {extracted_size} bytes into {extract_dir}")
-
-        # Run syft to generate the CycloneDX SBOM
-        syft_command = ["syft", extract_dir, "-o", "cyclonedx-json"]
-        _LOGGER.info(f"Running syft: {' '.join(syft_command)}")
-
-        try:
-            process = await asyncio.create_subprocess_exec(
-                *syft_command,
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-            )
-            stdout, stderr = await asyncio.wait_for(process.communicate(), 
timeout=300)
-
-            stdout_str = stdout.decode("utf-8").strip() if stdout else ""
-            stderr_str = stderr.decode("utf-8").strip() if stderr else ""
-
-            if process.returncode != 0:
-                _LOGGER.error(f"syft command failed with code 
{process.returncode}")
-                _LOGGER.error(f"syft stderr: {stderr_str}")
-                _LOGGER.error(f"syft stdout: {stdout_str[:1000]}...")
-                raise SBOMGenerationError(
-                    f"syft command failed with code {process.returncode}",
-                    {"returncode": process.returncode, "stderr": stderr_str, 
"stdout": stdout_str[:1000]},
-                )
-
-            # Parse the JSON output from syft
-            try:
-                sbom_data = json.loads(stdout_str)
-                _LOGGER.info(f"Successfully parsed syft output for 
{artifact_path}")
-
-                # Write the SBOM data to the specified output path
-                try:
-                    async with aiofiles.open(output_path, "w", 
encoding="utf-8") as f:
-                        await f.write(json.dumps(sbom_data, indent=2))
-                    _LOGGER.info(f"Successfully wrote SBOM to {output_path}")
-                except Exception as write_err:
-                    _LOGGER.exception(f"Failed to write SBOM JSON to 
{output_path}: {write_err}")
-                    raise SBOMGenerationError(f"Failed to write SBOM to 
{output_path}: {write_err}") from write_err
-
-                return {
-                    "message": "Successfully generated and saved CycloneDX 
SBOM",
-                    "sbom": sbom_data,
-                    "format": "CycloneDX",
-                    "components": len(sbom_data.get("components", [])),
-                }
-            except json.JSONDecodeError as e:
-                _LOGGER.error(f"Failed to parse syft output as JSON: {e}")
-                raise SBOMGenerationError(
-                    f"Failed to parse syft output: {e}",
-                    {"error": str(e), "syft_output": stdout_str[:1000]},
-                ) from e
-
-        except TimeoutError:
-            _LOGGER.error("syft command timed out after 5 minutes")
-            raise SBOMGenerationError("syft command timed out after 5 minutes")
-        except FileNotFoundError:
-            _LOGGER.error("syft command not found. Is it installed and in 
PATH?")
-            raise SBOMGenerationError("syft command not found")
-
-
 def _safe_path(base_dir: str, *paths: str) -> str | None:
     """Return an absolute path within the base_dir built from the given paths, 
or None if it escapes."""
     target = os.path.abspath(os.path.join(base_dir, *paths))
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 0fde156..15a8101 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -194,9 +194,10 @@ async def tar_gz_checks(release: models.Release, revision: 
str, path: str) -> li
 async def zip_checks(release: models.Release, revision: str, path: str) -> 
list[models.Task]:
     """Create check tasks for a .zip file."""
     tasks = [
-        queued(models.TaskType.ZIPFORMAT_INTEGRITY, release, revision, path),
         queued(models.TaskType.LICENSE_FILES, release, revision, path),
         queued(models.TaskType.LICENSE_HEADERS, release, revision, path),
+        # queued(models.TaskType.RAT_CHECK, release, revision, path),
+        queued(models.TaskType.ZIPFORMAT_INTEGRITY, release, revision, path),
         queued(models.TaskType.ZIPFORMAT_STRUCTURE, release, revision, path),
     ]
     return tasks
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 90f8094..4449a9b 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -23,10 +23,10 @@ import tempfile
 import xml.etree.ElementTree as ElementTree
 from typing import Any, Final
 
+import atr.archives as archives
 import atr.config as config
 import atr.tasks.checks as checks
 import atr.tasks.checks.targz as targz
-import atr.tasks.sbom as sbom
 
 _CONFIG: Final = config.get()
 _JAVA_MEMORY_ARGS: Final[list[str]] = []
@@ -169,7 +169,7 @@ def _check_core_logic(
 
             # Extract the archive to the temporary directory
             _LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
-            extracted_size = sbom.archive_extract_safe(
+            extracted_size = archives.targz_extract(
                 artifact_path, temp_dir, max_size=max_extract_size, 
chunk_size=chunk_size
             )
             _LOGGER.info(f"Extracted {extracted_size} bytes")
diff --git a/atr/tasks/checks/targz.py b/atr/tasks/checks/targz.py
index f0103a1..656edfe 100644
--- a/atr/tasks/checks/targz.py
+++ b/atr/tasks/checks/targz.py
@@ -20,6 +20,7 @@ import logging
 import tarfile
 from typing import Final
 
+import atr.archives as archives
 import atr.tasks.checks as checks
 
 _LOGGER: Final = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ async def integrity(args: checks.FunctionArguments) -> str | 
None:
 
     chunk_size = 4096
     try:
-        size = await asyncio.to_thread(_integrity_core, 
str(artifact_abs_path), chunk_size)
+        size = await asyncio.to_thread(archives.targz_total_size, 
str(artifact_abs_path), chunk_size)
         await recorder.success("Able to read all entries of the archive using 
tarfile", {"size": size})
     except Exception as e:
         await recorder.failure("Unable to read all entries of the archive 
using tarfile", {"error": str(e)})
@@ -104,22 +105,3 @@ async def structure(args: checks.FunctionArguments) -> str 
| None:
     except Exception as e:
         await recorder.failure("Unable to verify archive structure", {"error": 
str(e)})
     return None
-
-
-def _integrity_core(tgz_path: str, chunk_size: int = 4096) -> int:
-    """Verify a .tar.gz file and compute its uncompressed size."""
-    total_size = 0
-
-    with tarfile.open(tgz_path, mode="r|gz") as tf:
-        for member in tf:
-            # Do not skip metadata here
-            total_size += member.size
-            # Verify file by extraction
-            if member.isfile():
-                f = tf.extractfile(member)
-                if f is not None:
-                    while True:
-                        data = f.read(chunk_size)
-                        if not data:
-                            break
-    return total_size
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index c4967f5..d136ad1 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -19,11 +19,11 @@ import asyncio
 import json
 import logging
 import os
-import tarfile
 from typing import Any, Final
 
 import aiofiles
 
+import atr.archives as archives
 import atr.config as config
 import atr.schema as schema
 import atr.tasks.checks as checks
@@ -49,30 +49,6 @@ class SBOMGenerationError(Exception):
         self.details = details or {}
 
 
-def archive_extract_safe(
-    archive_path: str,
-    extract_dir: str,
-    max_size: int,
-    chunk_size: int,
-) -> int:
-    """Safe archive extraction."""
-    total_extracted = 0
-
-    try:
-        with tarfile.open(archive_path, mode="r|gz") as tf:
-            for member in tf:
-                keep_going, total_extracted = archive_extract_member(
-                    tf, member, extract_dir, total_extracted, max_size, 
chunk_size
-                )
-                if not keep_going:
-                    break
-
-    except tarfile.ReadError as e:
-        raise SBOMGenerationError(f"Failed to read archive: {e}", 
{"archive_path": archive_path}) from e
-
-    return total_extracted
-
-
 @checks.with_model(GenerateCycloneDX)
 async def generate_cyclonedx(args: GenerateCycloneDX) -> str | None:
     """Generate a CycloneDX SBOM for the given artifact and write it to the 
output path."""
@@ -83,152 +59,13 @@ async def generate_cyclonedx(args: GenerateCycloneDX) -> 
str | None:
         if not isinstance(msg, str):
             raise SBOMGenerationError(f"Invalid message type: {type(msg)}")
         return msg
-    except SBOMGenerationError as e:
-        _LOGGER.error(f"SBOM generation failed for {args.artifact_path}: 
{e.details}")
+    except (archives.ExtractionError, SBOMGenerationError) as e:
+        _LOGGER.error(f"SBOM generation failed for {args.artifact_path}: {e}")
         raise
 
 
-def _archive_extract_safe_process_file(
-    tf: tarfile.TarFile,
-    member: tarfile.TarInfo,
-    extract_dir: str,
-    total_extracted: int,
-    max_size: int,
-    chunk_size: int,
-) -> int:
-    """Process a single file member during safe archive extraction."""
-    target_path = os.path.join(extract_dir, member.name)
-    if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
-        _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
-        return 0
-
-    os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
-    source = tf.extractfile(member)
-    if source is None:
-        # Should not happen if member.isreg() is true
-        _LOGGER.warning(f"Could not extract file object for member: 
{member.name}")
-        return 0
-
-    extracted_file_size = 0
-    try:
-        with open(target_path, "wb") as target:
-            while chunk := source.read(chunk_size):
-                target.write(chunk)
-                extracted_file_size += len(chunk)
-
-                # Check size limits during extraction
-                if (total_extracted + extracted_file_size) > max_size:
-                    # Clean up the partial file before raising
-                    target.close()
-                    os.unlink(target_path)
-                    raise SBOMGenerationError(
-                        f"Extraction exceeded maximum size limit of {max_size} 
bytes",
-                        {"max_size": max_size, "current_size": 
total_extracted},
-                    )
-    finally:
-        source.close()
-
-    return extracted_file_size
-
-
-def archive_extract_member(
-    tf: tarfile.TarFile, member: tarfile.TarInfo, extract_dir: str, 
total_extracted: int, max_size: int, chunk_size: int
-) -> tuple[bool, int]:
-    if member.name and member.name.split("/")[-1].startswith("._"):
-        # Metadata convention
-        return False, 0
-
-    # Skip any character device, block device, or FIFO
-    if member.isdev():
-        return False, 0
-
-    # Check whether extraction would exceed the size limit
-    if member.isreg() and ((total_extracted + member.size) > max_size):
-        raise SBOMGenerationError(
-            f"Extraction would exceed maximum size limit of {max_size} bytes",
-            {"max_size": max_size, "current_size": total_extracted, 
"file_size": member.size},
-        )
-
-    # Extract directories directly
-    if member.isdir():
-        # Ensure the path is safe before extracting
-        target_path = os.path.join(extract_dir, member.name)
-        if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
-            _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
-            return False, 0
-        tf.extract(member, extract_dir, numeric_owner=True)
-
-    elif member.isreg():
-        extracted_size = _archive_extract_safe_process_file(
-            tf, member, extract_dir, total_extracted, max_size, chunk_size
-        )
-        total_extracted += extracted_size
-
-    elif member.issym():
-        _archive_extract_safe_process_symlink(member, extract_dir)
-
-    elif member.islnk():
-        _archive_extract_safe_process_hardlink(member, extract_dir)
-
-    return True, total_extracted
-
-
-def _archive_extract_safe_process_hardlink(member: tarfile.TarInfo, 
extract_dir: str) -> None:
-    """Safely create a hard link from the TarInfo entry."""
-    target_path = _safe_path(extract_dir, member.name)
-    if target_path is None:
-        _LOGGER.warning(f"Skipping potentially unsafe hard link path: 
{member.name}")
-        return
-
-    link_target = member.linkname or ""
-    source_path = _safe_path(extract_dir, link_target)
-    if source_path is None or not os.path.exists(source_path):
-        _LOGGER.warning(f"Skipping hard link with invalid target: 
{member.name} -> {link_target}")
-        return
-
-    os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
-    try:
-        if os.path.lexists(target_path):
-            return
-        os.link(source_path, target_path)
-    except (OSError, NotImplementedError) as e:
-        _LOGGER.warning(f"Failed to create hard link {target_path} -> 
{source_path}: {e}")
-
-
-def _archive_extract_safe_process_symlink(member: tarfile.TarInfo, 
extract_dir: str) -> None:
-    """Safely create a symbolic link from the TarInfo entry."""
-    target_path = _safe_path(extract_dir, member.name)
-    if target_path is None:
-        _LOGGER.warning(f"Skipping potentially unsafe symlink path: 
{member.name}")
-        return
-
-    link_target = member.linkname or ""
-
-    # Reject absolute targets to avoid links outside the tree
-    if os.path.isabs(link_target):
-        _LOGGER.warning(f"Skipping symlink with absolute target: {member.name} 
-> {link_target}")
-        return
-
-    # Ensure that the resolved link target stays within the extraction 
directory
-    resolved_target = _safe_path(os.path.dirname(target_path), link_target)
-    if resolved_target is None:
-        _LOGGER.warning(f"Skipping symlink pointing outside tree: 
{member.name} -> {link_target}")
-        return
-
-    os.makedirs(os.path.dirname(target_path), exist_ok=True)
-
-    try:
-        if os.path.lexists(target_path):
-            return
-        os.symlink(link_target, target_path)
-    except (OSError, NotImplementedError) as e:
-        _LOGGER.warning("Failed to create symlink %s -> %s: %s", target_path, 
link_target, e)
-
-
 async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> 
dict[str, Any]:
-    """Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on 
failure."""
+    """Core logic to generate CycloneDX SBOM, raising on failure."""
     _LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} -> 
{output_path}")
 
     async with util.async_temporary_directory(prefix="cyclonedx_sbom_") as 
temp_dir:
@@ -250,7 +87,7 @@ async def _generate_cyclonedx_core(artifact_path: str, 
output_path: str) -> dict
         # TODO: Ideally we'd have task dependencies or archive caching
         _LOGGER.info(f"Extracting {artifact_path} to {temp_dir}")
         extracted_size = await asyncio.to_thread(
-            archive_extract_safe,
+            archives.targz_extract,
             artifact_path,
             str(temp_dir),
             max_size=_CONFIG.MAX_EXTRACT_SIZE,
@@ -315,11 +152,3 @@ async def _generate_cyclonedx_core(artifact_path: str, 
output_path: str) -> dict
         except FileNotFoundError:
             _LOGGER.error("syft command not found. Is it installed and in 
PATH?")
             raise SBOMGenerationError("syft command not found")
-
-
-def _safe_path(base_dir: str, *paths: str) -> str | None:
-    """Return an absolute path within the base_dir built from the given paths, 
or None if it escapes."""
-    target = os.path.abspath(os.path.join(base_dir, *paths))
-    if target.startswith(os.path.abspath(base_dir)):
-        return target
-    return None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to