This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new fd58c3b Add a bulk download task
fd58c3b is described below
commit fd58c3b17dae25a0d5a40f9cc4d808c8337f3291
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Mar 4 20:00:24 2025 +0200
Add a bulk download task
---
atr/routes/package.py | 171 +++++++----
atr/routes/release.py | 50 +++-
atr/tasks/bulk.py | 618 ++++++++++++++++++++++++++++++++++++++++
atr/templates/package-add.html | 161 +++++++++++
atr/templates/release-bulk.html | 263 +++++++++++++++++
atr/worker.py | 105 +++++++
docs/conventions.html | 7 +
docs/conventions.md | 14 +
docs/plan.html | 1 +
docs/plan.md | 1 +
10 files changed, 1339 insertions(+), 52 deletions(-)
diff --git a/atr/routes/package.py b/atr/routes/package.py
index e3cc6c6..0670247 100644
--- a/atr/routes/package.py
+++ b/atr/routes/package.py
@@ -40,7 +40,6 @@ from werkzeug.wrappers.response import Response
from asfquart import APP
from asfquart.auth import Requirements, require
from asfquart.base import ASFQuartException
-from asfquart.session import ClientSession
from asfquart.session import read as session_read
from atr.db import get_session
from atr.db.models import (
@@ -165,54 +164,6 @@ async def package_add_artifact_info_get(
return artifact_sha3, await compute_sha512(uploads_path / artifact_sha3),
artifact_size
-async def package_add_post(session: ClientSession, request: Request) ->
Response:
- """Handle POST request for adding a package to a release."""
- try:
- release_key, artifact_file, checksum_file, signature_file,
artifact_type = await package_add_validate(request)
- except FlashError as e:
- logging.exception("FlashError:")
- await flash(f"{e!s}", "error")
- return redirect(url_for("root_package_add"))
- # This must come here to appease the type checker
- if artifact_file.filename is None:
- await flash("Release artifact filename is required", "error")
- return redirect(url_for("root_package_add"))
-
- # Save files and create package record in one transaction
- async with get_session() as db_session:
- async with db_session.begin():
- # Process and save the files
- try:
- try:
- artifact_sha3, artifact_size, artifact_sha512,
signature_sha3 = await package_add_session_process(
- db_session, release_key, artifact_file, checksum_file,
signature_file
- )
- except FlashError as e:
- logging.exception("FlashError:")
- await flash(f"{e!s}", "error")
- return redirect(url_for("root_package_add"))
-
- # Create the package record
- package = Package(
- artifact_sha3=artifact_sha3,
- artifact_type=artifact_type,
- filename=artifact_file.filename,
- signature_sha3=signature_sha3,
- sha512=artifact_sha512,
- release_key=release_key,
- uploaded=datetime.datetime.now(datetime.UTC),
- bytes_size=artifact_size,
- )
- db_session.add(package)
-
- except Exception as e:
- await flash(f"Error processing files: {e!s}", "error")
- return redirect(url_for("root_package_add"))
-
- # Otherwise redirect to review page
- return redirect(url_for("root_candidate_review"))
-
-
async def package_add_session_process(
db_session: AsyncSession,
release_key: str,
@@ -350,6 +301,118 @@ async def package_files_delete(package: Package,
uploads_path: Path) -> None:
# Release functions
+async def package_add_bulk_validate(form: MultiDict, request: Request) ->
tuple[str, str, list[str], bool, int]:
+ """Validate bulk package addition form data."""
+ release_key = form.get("release_key")
+ if (not release_key) or (not isinstance(release_key, str)):
+ raise FlashError("Release key is required")
+
+ url = form.get("url")
+ if (not url) or (not isinstance(url, str)):
+ raise FlashError("URL is required")
+
+ # Validate URL format
+ if not url.startswith(("http://", "https://")):
+ raise FlashError("URL must start with http:// or https://")
+
+ # Get selected file types
+ file_types = form.getlist("file_types")
+ if not file_types:
+ raise FlashError("At least one file type must be selected")
+
+ # Validate file types
+ valid_types = {".tar.gz", ".tgz", ".zip", ".jar"}
+ if not all(ft in valid_types for ft in file_types):
+ raise FlashError("Invalid file type selected")
+
+ # Get require signatures flag
+ require_signatures = bool(form.get("require_signatures"))
+
+ # Get max depth
+ try:
+ max_depth = int(form.get("max_depth", "1"))
+ if not 1 <= max_depth <= 10:
+ raise ValueError()
+ except (TypeError, ValueError):
+ raise FlashError("Maximum depth must be between 1 and 10 inclusive")
+
+ return release_key, url, file_types, require_signatures, max_depth
+
+
+async def package_add_single_post(form: MultiDict, request: Request) ->
Response:
+ """Process single package upload submission."""
+ try:
+ release_key, artifact_file, checksum_file, signature_file,
artifact_type = await package_add_validate(request)
+ except FlashError as e:
+ logging.exception("FlashError:")
+ await flash(f"{e!s}", "error")
+ return redirect(url_for("root_package_add"))
+ # This must come here to appease the type checker
+ if artifact_file.filename is None:
+ await flash("Release artifact filename is required", "error")
+ return redirect(url_for("root_package_add"))
+
+ # Save files and create package record in one transaction
+ async with get_session() as db_session:
+ async with db_session.begin():
+ # Process and save the files
+ try:
+ try:
+ artifact_sha3, artifact_size, artifact_sha512,
signature_sha3 = await package_add_session_process(
+ db_session, release_key, artifact_file, checksum_file,
signature_file
+ )
+ except FlashError as e:
+ logging.exception("FlashError:")
+ await flash(f"{e!s}", "error")
+ return redirect(url_for("root_package_add"))
+
+ # Create the package record
+ package = Package(
+ artifact_sha3=artifact_sha3,
+ artifact_type=artifact_type,
+ filename=artifact_file.filename,
+ signature_sha3=signature_sha3,
+ sha512=artifact_sha512,
+ release_key=release_key,
+ uploaded=datetime.datetime.now(datetime.UTC),
+ bytes_size=artifact_size,
+ )
+ db_session.add(package)
+
+ except Exception as e:
+ await flash(f"Error processing files: {e!s}", "error")
+ return redirect(url_for("root_package_add"))
+
+ # Otherwise redirect to review page
+ return redirect(url_for("root_candidate_review"))
+
+
+async def package_add_bulk_post(form: MultiDict, request: Request) -> Response:
+ """Process bulk package URL submission."""
+ try:
+ release_key, url, file_types, require_signatures, max_depth = await
package_add_bulk_validate(form, request)
+ except FlashError as e:
+ logging.exception("FlashError:")
+ await flash(f"{e!s}", "error")
+ return redirect(url_for("root_package_add"))
+
+ # Create a task for bulk downloading
+ max_concurrency = 5
+ async with get_session() as db_session:
+ async with db_session.begin():
+ task = Task(
+ status=TaskStatus.QUEUED,
+ task_type="package_bulk_download",
+ task_args=[release_key, url, file_types, require_signatures,
max_depth, max_concurrency],
+ )
+ db_session.add(task)
+ # Flush to get the task ID
+ await db_session.flush()
+
+ await flash("Started downloading packages from URL", "success")
+ return redirect(url_for("release_bulk_status", task_id=task.id))
+
+
@app_route("/package/add", methods=["GET", "POST"])
@require(Requirements.committer)
async def root_package_add() -> Response | str:
@@ -358,9 +421,15 @@ async def root_package_add() -> Response | str:
if session is None:
raise ASFQuartException("Not authenticated", errorcode=401)
- # For POST requests, handle the file upload
+ # For POST requests, handle the form submission
if request.method == "POST":
- return await package_add_post(session, request)
+ form = await get_form(request)
+ form_type = form.get("form_type")
+
+ if form_type == "bulk":
+ return await package_add_bulk_post(form, request)
+ else:
+ return await package_add_single_post(form, request)
# Get the storage_key from the query parameters (if redirected from create)
storage_key = request.args.get("storage_key")
diff --git a/atr/routes/release.py b/atr/routes/release.py
index 98c2961..082e25b 100644
--- a/atr/routes/release.py
+++ b/atr/routes/release.py
@@ -24,7 +24,7 @@ from typing import cast
import aiofiles
import aiofiles.os
-from quart import Request, flash, redirect, request, url_for
+from quart import Request, flash, redirect, render_template, request, url_for
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy.orm.attributes import InstrumentedAttribute
@@ -41,6 +41,8 @@ from atr.db.models import (
PMC,
Package,
Release,
+ Task,
+ TaskStatus,
)
from atr.routes import FlashError, app_route
from atr.util import get_release_storage_dir
@@ -154,3 +156,49 @@ async def root_release_delete() -> Response:
await flash("Release deleted successfully", "success")
return redirect(url_for("root_candidate_review"))
+
+
+@app_route("/release/bulk/<int:task_id>", methods=["GET"])
+async def release_bulk_status(task_id: int) -> str | Response:
+ """Show status for a bulk download task."""
+ session = await session_read()
+ if (session is None) or (session.uid is None):
+ await flash("You must be logged in to view bulk download status.",
"error")
+ return redirect(url_for("root_login"))
+
+ async with get_session() as db_session:
+ # Query for the task with the given ID
+ query = select(Task).where(Task.id == task_id)
+ result = await db_session.execute(query)
+ task = result.scalar_one_or_none()
+
+ if not task:
+ await flash(f"Task with ID {task_id} not found.", "error")
+ return redirect(url_for("root_candidate_review"))
+
+ # Verify this is a bulk download task
+ if task.task_type != "package_bulk_download":
+ await flash(f"Task with ID {task_id} is not a bulk download
task.", "error")
+ return redirect(url_for("root_candidate_review"))
+
+ # If result is a list or tuple with a single item, extract it
+ if isinstance(task.result, list | tuple) and (len(task.result) == 1):
+ task.result = task.result[0]
+
+ # Get the release associated with this task if available
+ release = None
+ # Debug print the task.task_args using the logger
+ logging.debug(f"Task args: {task.task_args}")
+ if task.task_args and isinstance(task.task_args, dict) and
("release_key" in task.task_args):
+ release_query = select(Release).where(Release.storage_key ==
task.task_args["release_key"])
+ release_result = await db_session.execute(release_query)
+ release = release_result.scalar_one_or_none()
+
+ # Check whether the user has permission to view this task
+ # Either they're a PMC member or committer for the release's PMC
+ if release and release.pmc:
+ if (session.uid not in release.pmc.pmc_members) and
(session.uid not in release.pmc.committers):
+ await flash("You don't have permission to view this
task.", "error")
+ return redirect(url_for("root_candidate_review"))
+
+ return await render_template("release-bulk.html", task=task,
release=release, TaskStatus=TaskStatus)
diff --git a/atr/tasks/bulk.py b/atr/tasks/bulk.py
new file mode 100644
index 0000000..f6e65ee
--- /dev/null
+++ b/atr/tasks/bulk.py
@@ -0,0 +1,618 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import json
+import logging
+import os
+from dataclasses import dataclass
+from html.parser import HTMLParser
+from typing import Any
+from urllib.parse import urljoin
+
+import aiofiles
+import aiohttp
+from sqlalchemy import text
+from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker,
create_async_engine
+
+# Configure detailed logging
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
+
+# Create file handler for tasks-bulk.log
+file_handler = logging.FileHandler("tasks-bulk.log")
+file_handler.setLevel(logging.DEBUG)
+
+# Create formatter with detailed information
+formatter = logging.Formatter(
+ "[%(asctime)s.%(msecs)03d] [%(process)d] [%(levelname)s]
[%(name)s:%(funcName)s:%(lineno)d] %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
+# Ensure parent loggers don't duplicate messages
+logger.propagate = False
+
+logger.info("Bulk download module imported")
+
+global_db_connection: async_sessionmaker | None = None
+global_task_id: int | None = None
+
+
+@dataclass
+class Args:
+ release_key: str
+ base_url: str
+ file_types: list[str]
+ require_sigs: bool
+ max_depth: int
+ max_concurrent: int
+
+ @staticmethod
+ def from_list(args: list[str]) -> "Args":
+ """Parse command line arguments."""
+ logger.debug(f"Parsing arguments: {args}")
+
+ if len(args) != 6:
+ logger.error(f"Invalid number of arguments: {len(args)}, expected
6")
+ raise ValueError("Invalid number of arguments")
+
+ release_key = args[0]
+ base_url = args[1]
+ file_types = args[2]
+ require_sigs = args[3]
+ max_depth = args[4]
+ max_concurrent = args[5]
+
+ logger.debug(
+ f"Extracted values - release_key: {release_key}, base_url:
{base_url}, "
+ f"file_types: {file_types}, require_sigs: {require_sigs}, "
+ f"max_depth: {max_depth}, max_concurrent: {max_concurrent}"
+ )
+
+ if not isinstance(release_key, str):
+ logger.error(f"Release key must be a string, got
{type(release_key)}")
+ raise ValueError("Release key must be a string")
+ if not isinstance(base_url, str):
+ logger.error(f"Base URL must be a string, got {type(base_url)}")
+ raise ValueError("Base URL must be a string")
+ if not isinstance(file_types, list):
+ logger.error(f"File types must be a list, got {type(file_types)}")
+ raise ValueError("File types must be a list")
+ for arg in file_types:
+ if not isinstance(arg, str):
+ logger.error(f"File types must be a list of strings, got
{type(arg)}")
+ raise ValueError("File types must be a list of strings")
+ if not isinstance(require_sigs, bool):
+ logger.error(f"Require sigs must be a boolean, got
{type(require_sigs)}")
+ raise ValueError("Require sigs must be a boolean")
+ if not isinstance(max_depth, int):
+ logger.error(f"Max depth must be an integer, got
{type(max_depth)}")
+ raise ValueError("Max depth must be an integer")
+ if not isinstance(max_concurrent, int):
+ logger.error(f"Max concurrent must be an integer, got
{type(max_concurrent)}")
+ raise ValueError("Max concurrent must be an integer")
+
+ logger.debug("All argument validations passed")
+
+ args_obj = Args(
+ release_key=release_key,
+ base_url=base_url,
+ file_types=file_types,
+ require_sigs=require_sigs,
+ max_depth=max_depth,
+ max_concurrent=max_concurrent,
+ )
+
+ logger.info(f"Args object created: {args_obj}")
+ return args_obj
+
+
+async def database_message(msg: str, progress: tuple[int, int] | None = None)
-> None:
+ """Update database with message and progress."""
+ logger.debug(f"Updating database with message: '{msg}', progress:
{progress}")
+ try:
+ task_id = await database_task_id_get()
+ if task_id:
+ logger.debug(f"Found task_id: {task_id}, updating with message")
+ await database_task_update(task_id, msg, progress)
+ else:
+ logger.warning("No task ID found, skipping database update")
+ except Exception as e:
+ # We don't raise here
+ # We continue even if database updates fail
+ # But in this case, the user won't be informed on the update page
+ logger.exception(f"Failed to update database: {e}")
+ logger.info(f"Continuing despite database error. Message was: '{msg}'")
+
+
+async def get_db_session() -> AsyncSession:
+ """Get a reusable database session."""
+ global global_db_connection
+
+ try:
+ # Create connection only if it doesn't exist already
+ if global_db_connection is None:
+ db_url = "sqlite+aiosqlite:///atr.db"
+ logger.debug(f"Creating database engine: {db_url}")
+
+ engine = create_async_engine(db_url)
+ global_db_connection = async_sessionmaker(engine,
class_=AsyncSession, expire_on_commit=False)
+
+ connection: AsyncSession = global_db_connection()
+ return connection
+ except Exception as e:
+ logger.exception(f"Error creating database session: {e}")
+ raise
+
+
+async def database_task_id_get() -> int | None:
+ """Get current task ID asynchronously with caching."""
+ global global_task_id
+ logger.debug("Attempting to get current task ID")
+
+ # Return cached ID if available
+ if global_task_id is not None:
+ logger.debug(f"Using cached task ID: {global_task_id}")
+ return global_task_id
+
+ try:
+ from os import getpid
+
+ process_id = getpid()
+ logger.debug(f"Current process ID: {process_id}")
+ task_id = await database_task_pid_lookup(process_id)
+
+ if task_id:
+ logger.info(f"Found task ID: {task_id} for process ID:
{process_id}")
+ # Cache the task ID for future use
+ global_task_id = task_id
+ else:
+ logger.warning(f"No task found for process ID: {process_id}")
+
+ return task_id
+ except Exception as e:
+ logger.exception(f"Error getting task ID: {e}")
+ return None
+
+
+async def database_task_pid_lookup(process_id: int) -> int | None:
+ """Look up task ID by process ID asynchronously."""
+ logger.debug(f"Looking up task ID for process ID: {process_id}")
+
+ try:
+ async with await get_db_session() as session:
+ logger.debug(f"Executing SQL query to find task for PID:
{process_id}")
+ # Look for ACTIVE task with our PID
+ result = await session.execute(
+ text("""
+ SELECT id FROM task
+ WHERE pid = :pid AND status = 'ACTIVE'
+ LIMIT 1
+ """),
+ {"pid": process_id},
+ )
+ logger.debug("SQL query executed, fetching results")
+ row = result.fetchone()
+ if row:
+ logger.info(f"Found task ID: {row[0]} for process ID:
{process_id}")
+ row_one = row[0]
+ if not isinstance(row_one, int):
+ logger.error(f"Task ID is not an integer: {row_one}")
+ raise ValueError("Task ID is not an integer")
+ return row_one
+ else:
+ logger.warning(f"No ACTIVE task found for process ID:
{process_id}")
+ return None
+ except Exception as e:
+ logger.exception(f"Error looking up task by PID: {e}")
+ return None
+
+
+async def database_task_update(task_id: int, msg: str, progress: tuple[int,
int] | None) -> None:
+ """Update task in database with message and progress."""
+ logger.debug(f"Updating task {task_id} with message: '{msg}', progress:
{progress}")
+ # Convert progress to percentage
+ progress_pct = database_progress_percentage_calculate(progress)
+ logger.debug(f"Calculated progress percentage: {progress_pct}%")
+ await database_task_update_execute(task_id, msg, progress_pct)
+
+
+async def database_task_update_execute(task_id: int, msg: str, progress_pct:
int) -> None:
+ """Execute database update with message and progress."""
+ logger.debug(f"Executing database update for task {task_id}, message:
'{msg}', progress: {progress_pct}%")
+
+ try:
+ async with await get_db_session() as session:
+ logger.debug(f"Executing SQL UPDATE for task ID: {task_id}")
+
+ # Store progress info in the result column as JSON
+ result_data = json.dumps({"message": msg, "progress":
progress_pct})
+
+ await session.execute(
+ text("""
+ UPDATE task
+ SET result = :result
+ WHERE id = :task_id
+ """),
+ {
+ "result": result_data,
+ "task_id": task_id,
+ },
+ )
+ await session.commit()
+ logger.info(f"Successfully updated task {task_id} with progress
{progress_pct}%")
+ except Exception as e:
+ # Continue even if database update fails
+ logger.exception(f"Error updating task {task_id} in database: {e}")
+
+
+def database_progress_percentage_calculate(progress: tuple[int, int] | None)
-> int:
+ """Calculate percentage from progress tuple."""
+ logger.debug(f"Calculating percentage from progress tuple: {progress}")
+ if progress is None:
+ logger.debug("Progress is None, returning 0%")
+ return 0
+
+ current, total = progress
+
+ # Avoid division by zero
+ if total == 0:
+ logger.warning("Total is zero in progress tuple, avoiding division by
zero")
+ return 0
+
+ percentage = min(100, int((current / total) * 100))
+ logger.debug(f"Calculated percentage: {percentage}% ({current}/{total})")
+ return percentage
+
+
+def download(args: list[str]) -> tuple[str, str | None, tuple[Any, ...]]:
+ """Download bulk package from URL."""
+ # Returns (status, error, result)
+ # This is the main task entry point, called by worker.py
+ # This function should probably be called artifacts_download
+ logger.info(f"Starting bulk download task with args: {args}")
+ try:
+ logger.debug("Delegating to download_core function")
+ status, error, result = download_core(args)
+ logger.info(f"Download completed with status: {status}")
+ return status, error, result
+ except Exception as e:
+ logger.exception(f"Error in download function: {e}")
+ # Return a tuple with a dictionary that matches what the template
expects
+ return "FAILED", str(e), ({"message": f"Error: {e}", "progress": 0},)
+
+
+def download_core(args_list: list[str]) -> tuple[str, str | None, tuple[Any,
...]]:
+ """Download bulk package from URL."""
+ logger.info("Starting download_core")
+ try:
+ logger.debug(f"Parsing arguments: {args_list}")
+ args = Args.from_list(args_list)
+ logger.info(f"Args parsed successfully:
release_key={args.release_key}, base_url={args.base_url}")
+
+ # Create async resources
+ logger.debug("Creating async queue and semaphore")
+ queue: asyncio.Queue[str] = asyncio.Queue()
+ semaphore = asyncio.Semaphore(args.max_concurrent)
+ loop = asyncio.get_event_loop()
+
+ # Start URL crawling
+ loop.run_until_complete(database_message(f"Crawling URLs from
{args.base_url}"))
+
+ logger.info("Starting artifact_urls coroutine")
+ signatures, artifacts = loop.run_until_complete(artifact_urls(args,
queue, semaphore))
+ logger.info(f"Found {len(signatures)} signatures and {len(artifacts)}
artifacts")
+
+ # Update progress for download phase
+ loop.run_until_complete(database_message(f"Found {len(artifacts)}
artifacts to download"))
+
+ # Download artifacts
+ logger.info("Starting artifacts_download coroutine")
+ artifacts_downloaded =
loop.run_until_complete(artifacts_download(artifacts, semaphore))
+ files_downloaded = len(artifacts_downloaded)
+
+ # Return a result dictionary
+ # This matches what we have in templates/release-bulk.html
+ return (
+ "COMPLETED",
+ None,
+ (
+ {
+ "message": f"Successfully downloaded {files_downloaded}
artifacts",
+ "progress": 100,
+ "url": args.base_url,
+ "file_types": args.file_types,
+ "files_downloaded": files_downloaded,
+ },
+ ),
+ )
+
+ except Exception as e:
+ logger.exception(f"Error in download_core: {e}")
+ return (
+ "FAILED",
+ str(e),
+ (
+ {
+ "message": f"Failed to download from {args_list[1] if
len(args_list) > 1 else 'unknown URL'}",
+ "progress": 0,
+ },
+ ),
+ )
+
+
+async def artifact_urls(args: Args, queue: asyncio.Queue, semaphore:
asyncio.Semaphore) -> tuple[list[str], list[str]]:
+ logger.info(f"Starting URL crawling from {args.base_url}")
+ await database_message(f"Crawling artifact URLs from {args.base_url}")
+ signatures: list[str] = []
+ artifacts: list[str] = []
+ seen: set[str] = set()
+
+ logger.debug(f"Adding base URL to queue: {args.base_url}")
+ await queue.put(args.base_url)
+
+ logger.debug("Starting crawl loop")
+ depth = 0
+ # Start with just the base URL
+ urls_at_current_depth = 1
+ urls_at_next_depth = 0
+
+ while (not queue.empty()) and (depth < args.max_depth):
+ logger.debug(f"Processing depth {depth + 1}/{args.max_depth}, queue
size: {queue.qsize()}")
+
+ # Process all URLs at the current depth before moving to the next
+ for _ in range(urls_at_current_depth):
+ if queue.empty():
+ break
+
+ url = await queue.get()
+ logger.debug(f"Processing URL: {url}")
+
+ if url_excluded(seen, url, args):
+ continue
+
+ seen.add(url)
+ logger.debug(f"Checking URL for file types: {args.file_types}")
+
+ # If not a target file type, try to parse HTML links
+ if not check_matches(args, url, artifacts, signatures):
+ logger.debug(f"URL is not a target file, parsing HTML: {url}")
+ try:
+ new_urls = await download_html(url, semaphore)
+ logger.debug(f"Found {len(new_urls)} new URLs in {url}")
+ for new_url in new_urls:
+ if new_url not in seen:
+ logger.debug(f"Adding new URL to queue: {new_url}")
+ await queue.put(new_url)
+ urls_at_next_depth += 1
+ except Exception as e:
+ logger.warning(f"Error parsing HTML from {url}: {e}")
+ # Move to next depth
+ depth += 1
+ urls_at_current_depth = urls_at_next_depth
+ urls_at_next_depth = 0
+
+ # Update database with progress message
+ progress_msg = f"Crawled {len(seen)} URLs, found {len(artifacts)}
artifacts (depth {depth}/{args.max_depth})"
+ await database_message(progress_msg, progress=(30 + min(50, depth *
10), 100))
+ logger.debug(f"Moving to depth {depth + 1}, {urls_at_current_depth}
URLs to process")
+
+ logger.info(f"URL crawling complete. Found {len(artifacts)} artifacts and
{len(signatures)} signatures")
+ return signatures, artifacts
+
+
+def check_matches(args: Args, url: str, artifacts: list[str], signatures:
list[str]) -> bool:
+ for type in args.file_types:
+ if url.endswith(type):
+ logger.info(f"Found artifact: {url}")
+ artifacts.append(url)
+ return True
+ elif url.endswith(type + ".asc"):
+ logger.info(f"Found signature: {url}")
+ signatures.append(url)
+ return True
+ return False
+
+
+def url_excluded(seen: set[str], url: str, args: Args) -> bool:
+ # Filter for sorting URLs to avoid redundant crawling
+ sorting_patterns = ["?C=N;O=", "?C=M;O=", "?C=S;O=", "?C=D;O="]
+
+ if not url.startswith(args.base_url):
+ logger.debug(f"Skipping URL outside base URL scope: {url}")
+ return True
+
+ if url in seen:
+ logger.debug(f"Skipping already seen URL: {url}")
+ return True
+
+ # Skip sorting URLs to avoid redundant crawling
+ if any(pattern in url for pattern in sorting_patterns):
+ logger.debug(f"Skipping sorting URL: {url}")
+ return True
+
+ return False
+
+
+async def download_html(url: str, semaphore: asyncio.Semaphore) -> list[str]:
+ """Download HTML and extract links."""
+ logger.debug(f"Downloading HTML from: {url}")
+ try:
+ return await download_html_core(url, semaphore)
+ except Exception as e:
+ logger.error(f"Error downloading HTML from {url}: {e}")
+ return []
+
+
+async def download_html_core(url: str, semaphore: asyncio.Semaphore) ->
list[str]:
+ """Core HTML download and link extraction logic."""
+ logger.debug(f"Starting HTML download core for {url}")
+ async with semaphore:
+ logger.debug(f"Acquired semaphore for {url}")
+
+ urls = []
+ async with aiohttp.ClientSession() as session:
+ logger.debug(f"Created HTTP session for {url}")
+
+ async with session.get(url) as response:
+ if response.status != 200:
+ logger.warning(f"HTTP {response.status} for {url}")
+ return []
+
+ logger.debug(f"Received HTTP 200 for {url}, content type:
{response.content_type}")
+ if not response.content_type.startswith("text/html"):
+ logger.debug(f"Not HTML content: {response.content_type},
skipping link extraction")
+ return []
+
+ logger.debug(f"Reading HTML content from {url}")
+ html = await response.text()
+
+ urls = extract_links_from_html(html, url)
+ logger.debug(f"Extracted {len(urls)} processed links from
{url}")
+
+ return urls
+
+
+class LinkExtractor(HTMLParser):
+ def __init__(self) -> None:
+ super().__init__()
+ self.links: list[str] = []
+
+ def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]])
-> None:
+ if tag == "a":
+ for attr, value in attrs:
+ if attr == "href" and value:
+ self.links.append(value)
+
+
+def extract_links_from_html(html: str, base_url: str) -> list[str]:
+ """Extract links from HTML content using html.parser."""
+ parser = LinkExtractor()
+ parser.feed(html)
+ raw_links = parser.links
+ logger.debug(f"Found {len(raw_links)} raw links in {base_url}")
+
+ processed_urls = []
+ for link in raw_links:
+ processed_url = urljoin(base_url, link)
+ # Filter out URLs that don't start with the base URL
+ # We also check this elsewhere amongst other checks
+ # But it's good to filter them early
+ if processed_url.startswith(base_url):
+ processed_urls.append(processed_url)
+ else:
+ logger.debug(f"Skipping URL outside base URL scope:
{processed_url}")
+
+ return processed_urls
+
+
+async def artifacts_download(artifacts: list[str], semaphore:
asyncio.Semaphore) -> list[str]:
+ """Download artifacts with progress tracking."""
+ size = len(artifacts)
+ logger.info(f"Starting download of {size} artifacts")
+ downloaded = []
+
+ for i, artifact in enumerate(artifacts):
+ progress_percent = int((i / size) * 100) if (size > 0) else 100
+ progress_msg = f"Downloading {i + 1}/{size} artifacts"
+ logger.info(f"{progress_msg}: {artifact}")
+ await database_message(progress_msg, progress=(progress_percent, 100))
+
+ success = await artifact_download(artifact, semaphore)
+ if success:
+ logger.debug(f"Successfully downloaded: {artifact}")
+ downloaded.append(artifact)
+ else:
+ logger.warning(f"Failed to download: {artifact}")
+
+ logger.info(f"Download complete. Successfully downloaded
{len(downloaded)}/{size} artifacts")
+ await database_message(f"Downloaded {len(downloaded)} artifacts",
progress=(100, 100))
+ return downloaded
+
+
+async def artifact_download(url: str, semaphore: asyncio.Semaphore) -> bool:
+ logger.debug(f"Starting download of artifact: {url}")
+ try:
+ success = await artifact_download_core(url, semaphore)
+ if success:
+ logger.info(f"Successfully downloaded artifact: {url}")
+ else:
+ logger.warning(f"Failed to download artifact: {url}")
+ return success
+ except Exception as e:
+ logger.exception(f"Error downloading artifact {url}: {e}")
+ return False
+
+
+async def artifact_download_core(url: str, semaphore: asyncio.Semaphore) ->
bool:
+ logger.debug(f"Starting core download process for {url}")
+ async with semaphore:
+ logger.debug(f"Acquired semaphore for {url}")
+ # TODO: We flatten the hierarchy to get the filename
+ # We should preserve the hierarchy
+ filename = url.split("/")[-1]
+ if filename.startswith("."):
+            raise ValueError(f"Invalid filename: {filename}")
+ local_path = os.path.join("downloads", filename)
+
+ # Create download directory if it doesn't exist
+ # TODO: Check whether local_path itself exists first
+ os.makedirs("downloads", exist_ok=True)
+ logger.debug(f"Downloading {url} to {local_path}")
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ logger.debug(f"Created HTTP session for {url}")
+ async with session.get(url) as response:
+ if response.status != 200:
+ logger.warning(f"Failed to download {url}: HTTP
{response.status}")
+ return False
+
+ total_size = int(response.headers.get("Content-Length", 0))
+ if total_size:
+ logger.info(f"Content-Length: {total_size} bytes for
{url}")
+
+ chunk_size = 8192
+ downloaded = 0
+ logger.debug(f"Writing file to {local_path} with chunk
size {chunk_size}")
+
+ async with aiofiles.open(local_path, "wb") as f:
+ async for chunk in
response.content.iter_chunked(chunk_size):
+ await f.write(chunk)
+ downloaded += len(chunk)
+ # if total_size:
+ # progress = (downloaded / total_size) * 100
+ # if downloaded % (chunk_size * 128) == 0:
+ # logger.debug(
+                        #         f"Download progress for {url}:"
+ # f" {progress:.1f}%
({downloaded}/{total_size} bytes)"
+ # )
+
+ logger.info(f"Download complete: {url} -> {local_path}
({downloaded} bytes)")
+ return True
+
+ except Exception as e:
+ logger.exception(f"Error during download of {url}: {e}")
+ # Remove partial download if an error occurred
+ if os.path.exists(local_path):
+ logger.debug(f"Removing partial download: {local_path}")
+ try:
+ os.remove(local_path)
+ except Exception as del_err:
+ logger.error(f"Error removing partial download
{local_path}: {del_err}")
+ return False
diff --git a/atr/templates/package-add.html b/atr/templates/package-add.html
index 5d762fe..05f94d1 100644
--- a/atr/templates/package-add.html
+++ b/atr/templates/package-add.html
@@ -75,6 +75,51 @@
.radio-group input[type="radio"] {
cursor: pointer;
}
+
+ .checkbox-group {
+ margin-bottom: 0.5rem;
+ }
+
+ .checkbox-group div {
+ margin: 0.5rem 0;
+ }
+
+ .checkbox-group label {
+ margin-left: 0.5rem;
+ cursor: pointer;
+ }
+
+ .checkbox-group input[type="checkbox"] {
+ cursor: pointer;
+ }
+
+ .form-separator {
+ margin: 2rem 0;
+ border-top: 1px solid #ddd;
+ text-align: center;
+ position: relative;
+ }
+
+ .form-separator span {
+ background: #fff;
+ padding: 0 1rem;
+ color: #666;
+ position: relative;
+ top: -0.75rem;
+ }
+
+ input[type="url"],
+ input[type="number"] {
+ width: 100%;
+ max-width: 600px;
+ padding: 0.375rem;
+ border: 1px solid #ced4da;
+ border-radius: 0.25rem;
+ }
+
+ input[type="number"] {
+ width: 100px;
+ }
</style>
{% endblock stylesheets %}
@@ -86,6 +131,7 @@
</p>
<form method="post" enctype="multipart/form-data" class="striking">
+ <input type="hidden" name="form_type" value="single" />
<table class="form-table">
<tbody>
<tr>
@@ -188,4 +234,119 @@
</tbody>
</table>
</form>
+
+ <div class="form-separator">
+ <span>Or add multiple packages from a URL</span>
+ </div>
+
+ <form method="post" class="striking">
+ <input type="hidden" name="form_type" value="bulk" />
+ <table class="form-table">
+ <tbody>
+ <tr>
+ <th>
+ <label for="bulk_release_key">Release:</label>
+ </th>
+ <td>
+ <select id="bulk_release_key" name="release_key" required>
+ <option value="">Select a release...</option>
+ {% for release in releases %}
+ <option value="{{ release.storage_key }}"
+ {% if release.storage_key == selected_release
%}selected{% endif %}>
+ {{ release.pmc.display_name }} - {{
release.product_line.product_name if release.product_line else "unknown" }} -
{{ release.version }}
+ </option>
+ {% endfor %}
+ </select>
+ {% if not releases %}<p class="error-message">No releases found
that you can add packages to.</p>{% endif %}
+ </td>
+ </tr>
+
+ <tr>
+ <th>
+ <label for="bulk_url">URL:</label>
+ </th>
+ <td>
+ <input type="url"
+ id="bulk_url"
+ name="url"
+ required
+ placeholder="https://example.org/path/to/packages/"
+ aria-describedby="url-help" />
+ <span id="url-help" class="help-text">Enter the URL of the
directory containing release packages</span>
+ </td>
+ </tr>
+
+ <tr>
+ <th>
+ <label>File types:</label>
+ </th>
+ <td>
+ <div class="checkbox-group">
+ <div>
+ <input type="checkbox"
+ id="type_targz"
+ name="file_types"
+ value=".tar.gz"
+ checked />
+ <label for="type_targz">.tar.gz files</label>
+ </div>
+ <div>
+ <input type="checkbox" id="type_tgz" name="file_types"
value=".tgz" checked />
+ <label for="type_tgz">.tgz files</label>
+ </div>
+ <div>
+ <input type="checkbox" id="type_zip" name="file_types"
value=".zip" />
+ <label for="type_zip">.zip files</label>
+ </div>
+ <div>
+ <input type="checkbox" id="type_jar" name="file_types"
value=".jar" />
+ <label for="type_jar">.jar files</label>
+ </div>
+ </div>
+ </td>
+ </tr>
+
+ <tr>
+ <th>
+ <label for="bulk_max_depth">Maximum depth:</label>
+ </th>
+ <td>
+ <input type="number"
+ id="bulk_max_depth"
+ name="max_depth"
+ value="1"
+ min="1"
+ max="10"
+ required
+ aria-describedby="depth-help" />
+ <span id="depth-help" class="help-text">Maximum request depth to
search for packages (1-10)</span>
+ </td>
+ </tr>
+
+ <tr>
+ <th>
+ <label for="bulk_require_signatures">Require signatures:</label>
+ </th>
+ <td>
+ <div class="checkbox-group">
+ <div>
+ <input type="checkbox"
+ id="bulk_require_signatures"
+ name="require_signatures"
+ checked />
+ <label for="bulk_require_signatures">Only download packages
that have corresponding .asc signature files</label>
+ </div>
+ </div>
+ </td>
+ </tr>
+
+ <tr>
+ <td></td>
+ <td>
+ <button type="submit" {% if not releases %}disabled{% endif %}>Add
packages from URL</button>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+ </form>
{% endblock content %}
diff --git a/atr/templates/release-bulk.html b/atr/templates/release-bulk.html
new file mode 100644
index 0000000..059460a
--- /dev/null
+++ b/atr/templates/release-bulk.html
@@ -0,0 +1,263 @@
+{% extends "layouts/base.html" %}
+
+{% block title %}
+ Bulk download status ~ ATR
+{% endblock title %}
+
+{% block description %}
+ View the status of a bulk download task.
+{% endblock description %}
+
+{% block head_extra %}
+ {% if task.status.value == "queued" or task.status.value == "active" %}
+ <meta http-equiv="refresh" content="2" />
+ {% endif %}
+{% endblock head_extra %}
+
+{% block stylesheets %}
+ {{ super() }}
+ <style>
+ nav.breadcrumbs {
+ margin-bottom: 1rem;
+ }
+
+ nav.breadcrumbs a {
+ text-decoration: none;
+ }
+
+ .task-container {
+ margin: 1rem 0;
+ }
+
+ .task-header {
+ display: flex;
+ justify-content: space-between;
+ align-items: center;
+ margin-bottom: 1rem;
+ padding: 1rem;
+ background: #f8f9fa;
+ border: 1px solid #dee2e6;
+ border-radius: 4px;
+ }
+
+ .task-title {
+ font-weight: 600;
+ }
+
+ .task-status {
+ padding: 0.25rem 0.5rem;
+ border-radius: 4px;
+ font-size: 0.9em;
+ }
+
+ .status-queued {
+ background: #f8f9fa;
+ border: 1px solid #dee2e6;
+ }
+
+ .status-active {
+ background: #cff4fc;
+ border: 1px solid #9eeaf9;
+ }
+
+ .status-completed {
+ background: #d1e7dd;
+ border: 1px solid #a3cfbb;
+ }
+
+ .status-failed {
+ background: #f8d7da;
+ border: 1px solid #f5c6cb;
+ }
+
+ .task-details {
+ margin-top: 1rem;
+ padding: 1rem;
+ background: #fff;
+ border: 1px solid #dee2e6;
+ border-radius: 4px;
+ }
+
+ .detail-row {
+ display: flex;
+ margin-bottom: 1rem;
+ }
+
+ .detail-label {
+ width: 150px;
+ font-weight: 600;
+ }
+
+ .progress-container {
+ margin: 1rem 0;
+ }
+
+ /* Progress bar styling */
+ progress {
+ width: 100%;
+ height: 20px;
+ border-radius: 4px;
+ overflow: hidden;
+ border: 1px solid #dee2e6;
+ }
+
+ /* Styling the progress bar for different browsers */
+ progress::-webkit-progress-bar {
+ background-color: #f8f9fa;
+ border-radius: 4px;
+ }
+
+ progress::-webkit-progress-value {
+ background-color: #0366d6;
+ transition: width 0.3s ease;
+ }
+
+ progress::-moz-progress-bar {
+ background-color: #0366d6;
+ border-radius: 4px;
+ }
+
+ .progress-text {
+ margin-top: 0.5rem;
+ font-size: 0.9em;
+ color: #666;
+ }
+
+ .message-box {
+ margin-top: 1rem;
+ margin-bottom: 1rem;
+ padding: 1rem;
+ background: #f8f9fa;
+ border: 1px solid #dee2e6;
+ border-radius: 4px;
+ }
+
+ .error-box {
+ margin-top: 1rem;
+ padding: 1rem;
+ background: #f8d7da;
+ border: 1px solid #f5c6cb;
+ border-radius: 4px;
+ color: #842029;
+ }
+
+ .info-box {
+ margin-top: 1rem;
+ padding: 1rem;
+ background: #cff4fc;
+ border: 1px solid #9eeaf9;
+ border-radius: 4px;
+ }
+
+ .full-width {
+ width: 100%;
+ }
+ </style>
+{% endblock stylesheets %}
+
+{% block content %}
+ <div class="task-container">
+ <nav class="breadcrumbs">
+ <a href="{{ url_for('root_candidate_review') }}">Release candidates</a>
+ {% if release %}
+ <span>→</span>
+ <span>{{ release.pmc.display_name }}</span>
+ <span>→</span>
+ <span>{{ release.product_line.product_name if release.product_line
else "Unknown product" }}</span>
+ <span>→</span>
+ <span>{{ release.version }}</span>
+ {% endif %}
+ <span>→</span>
+ <span>Bulk download status</span>
+ </nav>
+
+ <div class="task-header">
+ <div class="task-title">Task status</div>
+ <div class="task-status status-{{ task.status.value.lower() }}">
+ {%- if task.status.value == "queued" -%}
+ Pending
+ {%- elif task.status.value == "active" -%}
+ Running
+ {%- elif task.status.value == "completed" -%}
+ Completed
+ {%- elif task.status.value == "failed" -%}
+ Failed
+ {%- else -%}
+ {{ task.status.value }}
+ {%- endif -%}
+ </div>
+ </div>
+
+ <div class="task-details">
+ <div class="detail-row">
+ <div class="detail-label">Task ID</div>
+ <div>{{ task.id }}</div>
+ </div>
+
+ <div class="detail-row">
+ <div class="detail-label">Started</div>
+ <div>
+ {% if task.started %}
+ {{ task.started.strftime("%Y-%m-%d %H:%M:%S UTC") }}
+ {% else %}
+ Not started
+ {% endif %}
+ </div>
+ </div>
+
+ {% if task.completed %}
+ <div class="detail-row">
+ <div class="detail-label">Completed</div>
+ <div>{{ task.completed.strftime("%Y-%m-%d %H:%M:%S UTC") }}</div>
+ </div>
+ {% endif %}
+
+ {% if task.result %}
+ {% if task.result.progress is defined %}
+ <div class="progress-container">
+ <progress value="{{ task.result.progress }}" max="100"></progress>
+ <div class="progress-text">{{ task.result.progress }}%
complete</div>
+ </div>
+ {% endif %}
+
+ {% if task.result.message %}<div class="message-box">{{
task.result.message }}</div>{% endif %}
+
+ {% if task.status == TaskStatus.COMPLETED %}
+ <div class="detail-row">
+ <div class="detail-label">Summary</div>
+ <div class="full-width">
+ <table>
+ <tbody>
+ {% if task.result.url %}
+ <tr>
+ <th>URL</th>
+ <td>{{ task.result.url }}</td>
+ </tr>
+ {% endif %}
+ {% if task.result.file_types %}
+ <tr>
+ <th>File types</th>
+ <td>{{ task.result.file_types|join(", ") }}</td>
+ </tr>
+ {% endif %}
+ {% if task.result.files_downloaded %}
+ <tr>
+ <th>Files downloaded</th>
+ <td>{{ task.result.files_downloaded }}</td>
+ </tr>
+ {% endif %}
+ </tbody>
+ </table>
+ </div>
+ </div>
+ {% endif %}
+ {% endif %}
+
+ {% if task.error %}<div class="error-box">{{ task.error }}</div>{% endif
%}
+ </div>
+
+ {% if task.status in [TaskStatus.QUEUED, TaskStatus.ACTIVE] %}
+ <div class="info-box">This page will automatically refresh every 2
seconds to show the latest status.</div>
+ {% endif %}
+ </div>
+{% endblock content %}
diff --git a/atr/worker.py b/atr/worker.py
index 41f7a79..f5bb158 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -283,8 +283,112 @@ def task_generate_cyclonedx_sbom(args: list[str]) ->
tuple[str, str | None, tupl
return "COMPLETED", None, task_results
def task_bulk_download_debug(args: list[str] | dict) -> tuple[str, str | None, tuple[Any, ...]]:
    """Simulate a bulk download task by emitting staged fake progress updates.

    This is a debug handler only: nothing is downloaded. It walks through five
    canned progress messages, writing each one into the task row's ``result``
    column so the status page can poll and display them, then returns a
    synthetic completion summary.

    Args:
        args: Either a positional list ``[release_key, url, file_types,
            require_signatures]`` or a dict with those keys. Missing values
            fall back to ``"unknown"`` / ``[]`` / ``False``.

    Returns:
        A ``(status, error, results)`` tuple: ``("COMPLETED", None, (summary,))``
        on success, or ``("FAILED", message, (error_info,))`` on any exception.

    TODO: Remove once the real bulk download task is known to work.
    """
    logger.info(f"Bulk download debug task received args: {args}")

    try:
        # Extract parameters from args (support both list and dict inputs)
        if isinstance(args, list):
            # Positional layout: [release_key, url, file_types, require_signatures]
            # release_key (args[0]) is unused by this debug task
            url = args[1] if len(args) > 1 else "unknown"
            file_types = args[2] if len(args) > 2 else []
            require_signatures = args[3] if len(args) > 3 else False
        elif isinstance(args, dict):
            url = args.get("url", "unknown")
            file_types = args.get("file_types", [])
            require_signatures = args.get("require_signatures", False)
        else:
            # Defensive fallback: without this branch an unexpected args type
            # would leave url, file_types, and require_signatures unbound and
            # raise NameError further down instead of degrading gracefully
            logger.warning(f"Unexpected args type: {type(args)}")
            url = "unknown"
            file_types = []
            require_signatures = False

        # Progress messages to display over time
        progress_messages = [
            f"Connecting to {url}...",
            f"Connected to {url}. Scanning for {', '.join(file_types) if file_types else 'all'} files...",
            "Found 15 files matching criteria. Downloading...",
            "Downloaded 7/15 files (47%)...",
            "Downloaded 15/15 files (100%). Processing...",
        ]

        # The worker records its PID on the task row when claiming it, so we
        # can recover our own task id by matching pid + ACTIVE status
        current_pid = os.getpid()
        task_id = None

        with verify.db_session_get() as session:
            result = session.execute(
                text("SELECT id FROM task WHERE pid = :pid AND status = 'ACTIVE'"), {"pid": current_pid}
            )
            task_row = result.first()
            if task_row:
                task_id = task_row[0]

        if not task_id:
            logger.warning(f"Could not find active task for PID {current_pid}")

        # Process each progress message with a delay between them
        for i, message in enumerate(progress_messages):
            progress_pct = (i + 1) * 20

            update = {
                "message": message,
                "progress": progress_pct,
                "url": url,
                "timestamp": datetime.datetime.now(UTC).isoformat(),
            }

            # Log the progress
            logger.info(f"Progress update {i + 1}/{len(progress_messages)}: {message} ({progress_pct}%)")

            # Update the database with the current progress if we have a task_id
            if task_id:
                with verify.db_session_get() as session:
                    # Update the task with the current progress message
                    with session.begin():
                        session.execute(
                            text("""
                                UPDATE task
                                SET result = :result
                                WHERE id = :task_id AND status = 'ACTIVE'
                            """),
                            {"task_id": task_id, "result": json.dumps(update)},
                        )

            # Sleep before the next update, except for the last one
            if i < len(progress_messages) - 1:
                time.sleep(2.75)

        final_result = {
            "message": f"Successfully processed {url}",
            "progress": 100,
            "files_processed": 15,
            "files_downloaded": 15,
            "url": url,
            "file_types": file_types,
            "require_signatures": require_signatures,
            "completed_at": datetime.datetime.now(UTC).isoformat(),
        }

        return "COMPLETED", None, (final_result,)

    except Exception as e:
        logger.exception(f"Error in bulk download debug task: {e}")
        return "FAILED", str(e), ({"error": str(e), "message": f"Error: {e!s}", "progress": 0},)
+
+
def task_process(task_id: int, task_type: str, task_args: str) -> None:
"""Process a claimed task."""
+ # TODO: This does not go here permanently
+ # We need to move the other tasks into atr.tasks
+ from atr.tasks.bulk import download as bulk_download
+
logger.info(f"Processing task {task_id} ({task_type}) with args
{task_args}")
try:
args = json.loads(task_args)
@@ -299,6 +403,7 @@ def task_process(task_id: int, task_type: str, task_args:
str) -> None:
"verify_license_headers": task_verify_license_headers,
"verify_rat_license": task_verify_rat_license,
"generate_cyclonedx_sbom": task_generate_cyclonedx_sbom,
+ "package_bulk_download": bulk_download,
}
handler = task_handlers.get(task_type)
diff --git a/docs/conventions.html b/docs/conventions.html
index eb4c64b..7779be9 100644
--- a/docs/conventions.html
+++ b/docs/conventions.html
@@ -43,6 +43,13 @@ def verify_archive_integrity_do_something():
<h3>Keep cyclomatic complexity below 10</h3>
<p>We limit function complexity to a score of 10. If the linter complains,
your function is doing too much.</p>
<p>Cyclomatic complexity counts the number of independent paths through code:
more if/else branches, loops, and exception handlers means higher complexity.
Complex code is harder to test, maintain, and understand. The easiest way to
fix high complexity is usually to refactor a chunk of related logic into a
separate helper function.</p>
+<h3>Always use parentheses to group subexpressions in boolean expressions</h3>
+<p>Instead of this:</p>
+<pre><code class="language-python">a or b and c == d or e
+</code></pre>
+<p>Do:</p>
+<pre><code class="language-python">a or (b and (c == d)) or e
+</code></pre>
<h2>HTML</h2>
<h3>Use sentence case for headings</h3>
<p>We write headings like "This is a heading", and not "This is
a Heading" or "This Is A Heading". This follows the <a
href="https://en.wikipedia.org/wiki/Wikipedia:Manual_of_Style#Section_headings">Wikipedia
style for headings</a>. The same goes for button texts.</p>
diff --git a/docs/conventions.md b/docs/conventions.md
index dd8aa21..f007139 100644
--- a/docs/conventions.md
+++ b/docs/conventions.md
@@ -76,6 +76,20 @@ We limit function complexity to a score of 10. If the linter
complains, your fun
Cyclomatic complexity counts the number of independent paths through code:
more if/else branches, loops, and exception handlers means higher complexity.
Complex code is harder to test, maintain, and understand. The easiest way to
fix high complexity is usually to refactor a chunk of related logic into a
separate helper function.
+### Always use parentheses to group subexpressions in boolean expressions
+
+Instead of this:
+
+```python
+a or b and c == d or e
+```
+
+Do:
+
+```python
+a or (b and (c == d)) or e
+```
+
## HTML
### Use sentence case for headings
diff --git a/docs/plan.html b/docs/plan.html
index d7a6353..2ea18d7 100644
--- a/docs/plan.html
+++ b/docs/plan.html
@@ -79,6 +79,7 @@
<li>Use consistent task status values (pending, running, passed, issue,
error?)</li>
<li>Add a warning task result status</li>
<li>Allow dependencies between tasks to reduce duplication of effort</li>
+<li>Add UI to restart all waiting workers</li>
</ul>
</li>
<li>
diff --git a/docs/plan.md b/docs/plan.md
index 4695e87..3e67245 100644
--- a/docs/plan.md
+++ b/docs/plan.md
@@ -63,6 +63,7 @@ We aim to work on the task scheduler in parallel with the UX
improvements above.
- Use consistent task status values (pending, running, passed, issue,
error?)
- Add a warning task result status
- Allow dependencies between tasks to reduce duplication of effort
+ - Add UI to restart all waiting workers
2. Orchestrating scheduler and resource management
- [DONE] Implement process-based task isolation
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]