This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch pending_dist_changes in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit d7d89670f92ddb54e81104d66e55d19f50177702 Author: Alastair McFarlane <[email protected]> AuthorDate: Thu Jan 29 11:48:53 2026 +0000 #216 - Add pending distribution status and a background task to check it. Refactor some of the distribution logic out into a shared module, and split some of the shared module out into a separate module, to prevent circular references. --- atr/db/__init__.py | 9 + atr/get/checks.py | 2 +- atr/get/compose.py | 2 +- atr/models/__init__.py | 4 +- atr/models/results.py | 9 +- atr/models/sql.py | 5 +- atr/post/distribution.py | 1 + atr/shared/__init__.py | 215 +------------- atr/shared/distribution.py | 247 ++++++++++++++++ atr/shared/{__init__.py => web.py} | 70 +---- atr/storage/writers/distributions.py | 360 +++++------------------- atr/tasks/__init__.py | 30 ++ atr/tasks/distribution.py | 79 ++++++ migrations/versions/0042_2026.01.28_3e434625.py | 31 ++ 14 files changed, 489 insertions(+), 575 deletions(-) diff --git a/atr/db/__init__.py b/atr/db/__init__.py index 4ff46d7..80ebbc5 100644 --- a/atr/db/__init__.py +++ b/atr/db/__init__.py @@ -289,6 +289,9 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession): owner_namespace: Opt[str] = NOT_SET, package: Opt[str] = NOT_SET, version: Opt[str] = NOT_SET, + pending: Opt[bool] = NOT_SET, + _with_release: bool = False, + _with_release_project: bool = False, ) -> Query[sql.Distribution]: query = sqlmodel.select(sql.Distribution) if is_defined(release_name): @@ -301,6 +304,12 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession): query = query.where(sql.Distribution.package == package) if is_defined(version): query = query.where(sql.Distribution.version == version) + if is_defined(pending): + query = query.where(sql.Distribution.pending == pending) + if _with_release_project: + query = query.options(joined_load_nested(sql.Distribution.release, sql.Release.project)) + elif _with_release: + query = query.options(joined_load(sql.Distribution.release)) return Query(self, query) async def execute_query(self, query: 
sqlalchemy.sql.expression.Executable) -> sqlalchemy.engine.Result: diff --git a/atr/get/checks.py b/atr/get/checks.py index 1781b2f..df36f9e 100644 --- a/atr/get/checks.py +++ b/atr/get/checks.py @@ -162,7 +162,7 @@ async def selected_revision( ongoing_count = await interaction.tasks_ongoing(project_name, version_name, revision_number) - checks_summary_elem = shared._render_checks_summary(info, project_name, version_name) + checks_summary_elem = shared.web._render_checks_summary(info, project_name, version_name) checks_summary_html = str(checks_summary_elem) if checks_summary_elem else "" delete_file_forms: dict[str, str] = {} diff --git a/atr/get/compose.py b/atr/get/compose.py index 304f246..6e40087 100644 --- a/atr/get/compose.py +++ b/atr/get/compose.py @@ -39,4 +39,4 @@ async def selected(session: web.Committer, project_name: str, version_name: str) ).demand(base.ASFQuartException("Release does not exist", errorcode=404)) if release.phase != sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT: return await mapping.release_as_redirect(session, release) - return await shared.check(session, release) + return await shared.web.check(session, release) diff --git a/atr/models/__init__.py b/atr/models/__init__.py index 52c3cad..daca400 100644 --- a/atr/models/__init__.py +++ b/atr/models/__init__.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. -from . import api, distribution, helpers, results, schema, sql, tabulate, validation +from . 
import api, basic, distribution, helpers, results, schema, sql, tabulate, validation # If we use .__name__, pyright gives a warning -__all__ = ["api", "distribution", "helpers", "results", "schema", "sql", "tabulate", "validation"] +__all__ = ["api", "basic", "distribution", "helpers", "results", "schema", "sql", "tabulate", "validation"] diff --git a/atr/models/results.py b/atr/models/results.py index 1ecb4d9..eeb73b9 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -22,6 +22,12 @@ import pydantic from . import schema +class DistributionStatusCheck(schema.Strict): + """Result of the task to check pending distribution statuses.""" + + kind: Literal["distribution_status"] = schema.Field(alias="kind") + + class DistributionWorkflow(schema.Strict): """Result of the task to run a Github workflow.""" @@ -228,7 +234,8 @@ class MetadataUpdate(schema.Strict): Results = Annotated[ - DistributionWorkflow + DistributionStatusCheck + | DistributionWorkflow | DistributionWorkflowStatus | HashingCheck | MessageSend diff --git a/atr/models/sql.py b/atr/models/sql.py index 9887711..e830083 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -190,6 +190,7 @@ class TaskStatus(str, enum.Enum): class TaskType(str, enum.Enum): + DISTRIBUTION_STATUS = "distribution_status" DISTRIBUTION_WORKFLOW = "distribution_workflow" HASHING_CHECK = "hashing_check" KEYS_IMPORT_FILE = "keys_import_file" @@ -970,8 +971,10 @@ class Distribution(sqlmodel.SQLModel, table=True): package: str = sqlmodel.Field(primary_key=True, index=True) version: str = sqlmodel.Field(primary_key=True, index=True) staging: bool = sqlmodel.Field(default=False) + pending: bool = sqlmodel.Field(default=False) + retries: int = sqlmodel.Field(default=0) upload_date: datetime.datetime | None = sqlmodel.Field(default=None) - api_url: str + api_url: str | None = sqlmodel.Field(default=None) web_url: str | None = sqlmodel.Field(default=None) # The API response can be huge, e.g. 
from npm # So we do not store it in the database diff --git a/atr/post/distribution.py b/atr/post/distribution.py index 706bafa..438a16a 100644 --- a/atr/post/distribution.py +++ b/atr/post/distribution.py @@ -176,6 +176,7 @@ async def record_form_process_page( release_name=release.name, staging=staging, dd=dd, + allow_retries=True, ) except storage.AccessError as e: # Instead of calling record_form_page_new, redirect with error message diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py index 4c9e400..4090d59 100644 --- a/atr/shared/__init__.py +++ b/atr/shared/__init__.py @@ -15,18 +15,8 @@ # specific language governing permissions and limitations # under the License. -from typing import TYPE_CHECKING, Final +from typing import Final -import htpy - -import atr.db as db -import atr.db.interaction as interaction -import atr.form as form -import atr.get as get -import atr.htm as htm -import atr.models.results as results -import atr.models.sql as sql -import atr.post as post import atr.shared.announce as announce import atr.shared.distribution as distribution import atr.shared.draft as draft @@ -45,15 +35,7 @@ import atr.shared.upload as upload import atr.shared.user as user import atr.shared.vote as vote import atr.shared.voting as voting -import atr.storage as storage -import atr.storage.types as types -import atr.template as template -import atr.util as util -import atr.web as web - -if TYPE_CHECKING: - from collections.abc import Sequence - +import atr.shared.web as web # | 1 | RSA (Encrypt or Sign) [HAC] | # | 2 | RSA Encrypt-Only [HAC] | @@ -81,201 +63,9 @@ algorithms: Final[dict[int, str]] = { } -async def check( - session: web.Committer | None, - release: sql.Release, - task_mid: str | None = None, - vote_form: htm.Element | None = None, - resolve_form: htm.Element | None = None, - archive_url: str | None = None, - vote_task: sql.Task | None = None, - can_vote: bool = False, - can_resolve: bool = False, -) -> web.WerkzeugResponse | str: - base_path = 
util.release_directory(release) - - # TODO: This takes 180ms for providers - # We could cache it - paths = [path async for path in util.paths_recursive(base_path)] - paths.sort() - - async with storage.read(session) as read: - ragp = read.as_general_public() - info = await ragp.releases.path_info(release, paths) - - user_ssh_keys: Sequence[sql.SSHKey] = [] - asf_id: str | None = None - server_domain: str | None = None - server_host: str | None = None - - if session is not None: - asf_id = session.uid - server_domain = session.app_host.split(":", 1)[0] - server_host = session.app_host - async with db.session() as data: - user_ssh_keys = await data.ssh_key(asf_uid=session.uid).all() - - # Get the number of ongoing tasks for the current revision - ongoing_tasks_count = 0 - match await interaction.latest_info(release.project.name, release.version): - case (revision_number, revision_editor, revision_timestamp): - ongoing_tasks_count = await interaction.tasks_ongoing( - release.project.name, - release.version, - revision_number, - ) - case None: - revision_number = None - revision_editor = None - revision_timestamp = None - - delete_form = form.render( - model_cls=form.Empty, - action=util.as_url(get.compose.selected, project_name=release.project.name, version_name=release.version), - submit_label="Delete this draft", - submit_classes="btn btn-danger", - empty=True, - confirm="Are you sure you want to delete this draft? This cannot be undone.", - ) - - delete_file_forms: dict[str, htm.Element] = {} - for path in paths: - delete_file_forms[str(path)] = form.render( - model_cls=draft.DeleteFileForm, - action=util.as_url(post.draft.delete_file, project_name=release.project.name, version_name=release.version), - form_classes=".d-inline-block.m-0", - submit_classes="btn-sm btn-outline-danger", - submit_label="Delete", - empty=True, - defaults={"file_path": str(path)}, - # TODO: Add a static check for the confirm syntax - confirm=( - "Are you sure you want to delete this file? 
" - "This will also delete any associated metadata files. " - "This cannot be undone." - ), - ) - - empty_form = form.render( - model_cls=form.Empty, - action=util.as_url(post.draft.fresh, project_name=release.project.name, version_name=release.version), - submit_label="Restart all checks", - submit_classes="btn btn-primary", - ) - - vote_task_warnings = _warnings_from_vote_result(vote_task) - has_files = await util.has_files(release) - - has_any_errors = any(info.errors.get(path, []) for path in paths) if info else False - strict_checking = release.project.policy_strict_checking - strict_checking_errors = strict_checking and has_any_errors - - checks_summary_html = _render_checks_summary(info, release.project.name, release.version) - - return await template.render( - "check-selected.html", - project_name=release.project.name, - version_name=release.version, - release=release, - paths=paths, - info=info, - revision_editor=revision_editor, - revision_time=revision_timestamp, - revision_number=revision_number, - ongoing_tasks_count=ongoing_tasks_count, - delete_form=delete_form, - delete_file_forms=delete_file_forms, - asf_id=asf_id, - server_domain=server_domain, - server_host=server_host, - user_ssh_keys=user_ssh_keys, - format_datetime=util.format_datetime, - models=sql, - task_mid=task_mid, - vote_form=vote_form, - vote_task=vote_task, - archive_url=archive_url, - vote_task_warnings=vote_task_warnings, - empty_form=empty_form, - csrf_input=str(form.csrf_input()), - resolve_form=resolve_form, - has_files=has_files, - strict_checking_errors=strict_checking_errors, - can_vote=can_vote, - can_resolve=can_resolve, - checks_summary_html=checks_summary_html, - ) - - -def _checker_display_name(checker: str) -> str: - return checker.removeprefix("atr.tasks.checks.").replace("_", " ").replace(".", " ").title() - - -def _render_checks_summary(info: types.PathInfo | None, project_name: str, version_name: str) -> htm.Element | None: - if (info is None) or (not 
info.checker_stats): - return None - - card = htm.Block(htm.div, classes=".card.mb-4") - card.div(".card-header")[htpy.h5(".mb-0")["Checks summary"]] - - body = htm.Block(htm.div, classes=".card-body") - for i, stat in enumerate(info.checker_stats): - stripe_class = ".atr-stripe-odd" if ((i % 2) == 0) else ".atr-stripe-even" - details = htm.Block(htm.details, classes=f".mb-0.p-2{stripe_class}") - - summary_content: list[htm.Element | str] = [] - if stat.warning_count > 0: - summary_content.append(htpy.span(".badge.bg-warning.text-dark.me-2")[str(stat.warning_count)]) - if stat.failure_count > 0: - summary_content.append(htpy.span(".badge.bg-danger.me-2")[str(stat.failure_count)]) - summary_content.append(htpy.strong[_checker_display_name(stat.checker)]) - - details.summary[*summary_content] - - files_div = htm.Block(htm.div, classes=".mt-2.atr-checks-files") - all_files = set(stat.failure_files.keys()) | set(stat.warning_files.keys()) - for file_path in sorted(all_files): - report_url = f"/report/{project_name}/{version_name}/{file_path}" - error_count = stat.failure_files.get(file_path, 0) - warning_count = stat.warning_files.get(file_path, 0) - - file_content: list[htm.Element | str] = [] - if error_count > 0: - file_content.append(htpy.span(".badge.bg-danger.me-2")[util.plural(error_count, "error")]) - if warning_count > 0: - file_content.append( - htpy.span(".badge.bg-warning.text-dark.me-2")[util.plural(warning_count, "warning")] - ) - file_content.append(htpy.a(href=report_url)[htpy.strong[htpy.code[file_path]]]) - - files_div.div[*file_content] - - details.append(files_div.collect()) - body.append(details.collect()) - - card.append(body.collect()) - return card.collect() - - -def _warnings_from_vote_result(vote_task: sql.Task | None) -> list[str]: - # TODO: Replace this with a schema.Strict model - # But we'd still need to do some of this parsing and validation - # We should probably rethink how to send data through tasks - - if (not vote_task) or (not 
vote_task.result): - return ["No vote task result found."] - - vote_task_result = vote_task.result - if not isinstance(vote_task_result, results.VoteInitiate): - return ["Vote task result is not a results.VoteInitiate instance."] - - return vote_task_result.mail_send_warnings - - __all__ = [ "algorithms", "announce", - "check", "distribution", "draft", "finish", @@ -293,4 +83,5 @@ __all__ = [ "user", "vote", "voting", + "web", ] diff --git a/atr/shared/distribution.py b/atr/shared/distribution.py index 743c781..e0af07a 100644 --- a/atr/shared/distribution.py +++ b/atr/shared/distribution.py @@ -17,15 +17,20 @@ from __future__ import annotations +import datetime import enum +import aiohttp import pydantic import atr.db as db import atr.form as form import atr.htm as htm +import atr.models.basic as basic import atr.models.distribution as distribution import atr.models.sql as sql +import atr.util as util +from atr.storage import outcome class DistributionPlatform(enum.Enum): @@ -131,6 +136,25 @@ def html_tr_a(label: str, value: str | None) -> htm.Element: return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else "-"]] +async def json_from_distribution_platform( + api_url: str, platform: sql.DistributionPlatform, version: str +) -> outcome.Outcome[basic.JSON]: + try: + async with util.create_secure_session() as session: + async with session.get(api_url) as response: + response.raise_for_status() + response_json = await response.json() + result = basic.as_json(response_json) + except aiohttp.ClientError as e: + return outcome.Error(e) + match platform: + case sql.DistributionPlatform.NPM | sql.DistributionPlatform.NPM_SCOPED: + if version not in distribution.NpmResponse.model_validate(result).time: + e = RuntimeError(f"Version '{version}' not found") + return outcome.Error(e) + return outcome.Result(result) + + async def release_validated( project: str, version: str, committee: bool = False, staging: bool | None = None, release_policy: bool = False ) -> 
sql.Release: @@ -163,3 +187,226 @@ async def release_validated_and_committee( if committee is None: raise RuntimeError(f"Release {project} {version} has no committee") return release, committee + + +# async def __json_from_maven_cdn( +# self, api_url: str, group_id: str, artifact_id: str, version: str +# ) -> outcome.Outcome[models.basic.JSON]: +# import datetime +# +# try: +# async with util.create_secure_session() as session: +# async with session.get(api_url) as response: +# response.raise_for_status() +# +# # Use current time as timestamp since we're just validating the package exists +# timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() * 1000) +# +# # Convert to dict matching MavenResponse structure +# result_dict = { +# "response": { +# "start": 0, +# "docs": [ +# { +# "g": group_id, +# "a": artifact_id, +# "v": version, +# "timestamp": timestamp_ms, +# } +# ], +# } +# } +# result = models.basic.as_json(result_dict) +# return outcome.Result(result) +# except aiohttp.ClientError as e: +# return outcome.Error(e) + + +async def json_from_maven_xml(api_url: str, version: str) -> outcome.Outcome[basic.JSON]: + import datetime + import xml.etree.ElementTree as ET + + try: + async with util.create_secure_session() as session: + async with session.get(api_url) as response: + response.raise_for_status() + xml_text = await response.text() + + # Parse the XML + root = ET.fromstring(xml_text) + + # Extract versioning info + group = root.find("groupId") + artifact = root.find("artifactId") + versioning = root.find("versioning") + if versioning is None: + e = RuntimeError("No versioning element found in Maven metadata") + return outcome.Error(e) + + # Get lastUpdated timestamp (format: yyyyMMddHHmmss) + last_updated_elem = versioning.find("lastUpdated") + if (last_updated_elem is None) or (not last_updated_elem.text): + e = RuntimeError("No lastUpdated timestamp found in Maven metadata") + return outcome.Error(e) + + # Convert lastUpdated string to Unix 
timestamp in milliseconds + last_updated_str = last_updated_elem.text + dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S") + dt = dt.replace(tzinfo=datetime.UTC) + timestamp_ms = int(dt.timestamp() * 1000) + + # Verify the version exists + versions_elem = versioning.find("versions") + if versions_elem is not None: + versions = [v.text for v in versions_elem.findall("version") if v.text] + if version not in versions: + e = RuntimeError(f"Version '{version}' not found in Maven metadata") + return outcome.Error(e) + + # Convert to dict matching MavenResponse structure + result_dict = { + "response": { + "start": 0, + "docs": [ + { + "g": group.text if (group is not None) else "", + "a": artifact.text if (artifact is not None) else "", + "v": version, + "timestamp": timestamp_ms, + } + ], + } + } + result = basic.as_json(result_dict) + return outcome.Result(result) + except aiohttp.ClientError as e: + return outcome.Error(e) + except ET.ParseError as e: + return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}")) + + +def distribution_upload_date( # noqa: C901 + platform: sql.DistributionPlatform, + data: basic.JSON, + version: str, +) -> datetime.datetime | None: + match platform: + case sql.DistributionPlatform.ARTIFACT_HUB: + if not (versions := distribution.ArtifactHubResponse.model_validate(data).available_versions): + return None + return datetime.datetime.fromtimestamp(versions[0].ts, tz=datetime.UTC) + case sql.DistributionPlatform.DOCKER_HUB: + if not (pushed_at := distribution.DockerResponse.model_validate(data).tag_last_pushed): + return None + return datetime.datetime.fromisoformat(pushed_at.rstrip("Z")) + # case models.sql.DistributionPlatform.GITHUB: + # if not (published_at := GitHubResponse.model_validate(data).published_at): + # return None + # return datetime.datetime.fromisoformat(published_at.rstrip("Z")) + case sql.DistributionPlatform.MAVEN: + m = distribution.MavenResponse.model_validate(data) + docs = m.response.docs + 
if not docs: + return None + timestamp = docs[0].timestamp + if not timestamp: + return None + return datetime.datetime.fromtimestamp(timestamp / 1000, tz=datetime.UTC) + case sql.DistributionPlatform.NPM | sql.DistributionPlatform.NPM_SCOPED: + if not (times := distribution.NpmResponse.model_validate(data).time): + return None + # Versions can be in the form "1.2.3" or "v1.2.3", so we check for both + if not (upload_time := times.get(version) or times.get(f"v{version}")): + return None + return datetime.datetime.fromisoformat(upload_time.rstrip("Z")) + case sql.DistributionPlatform.PYPI: + if not (urls := distribution.PyPIResponse.model_validate(data).urls): + return None + if not (upload_time := urls[0].upload_time_iso_8601): + return None + return datetime.datetime.fromisoformat(upload_time.rstrip("Z")) + raise NotImplementedError(f"Platform {platform.name} is not yet supported") + + +def distribution_web_url( # noqa: C901 + platform: sql.DistributionPlatform, + data: basic.JSON, + version: str, +) -> str | None: + match platform: + case sql.DistributionPlatform.ARTIFACT_HUB: + ah = distribution.ArtifactHubResponse.model_validate(data) + repo_name = ah.repository.name if ah.repository else None + pkg_name = ah.name + ver = ah.version + if repo_name and pkg_name: + if ver: + return f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{ver}" + return f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{version}" + if ah.home_url: + return ah.home_url + for link in ah.links: + if link.url: + return link.url + return None + case sql.DistributionPlatform.DOCKER_HUB: + # The best we can do on Docker Hub is: + # f"https://hub.docker.com/_/{package}" + return None + # case models.sql.DistributionPlatform.GITHUB: + # gh = GitHubResponse.model_validate(data) + # return gh.html_url + case sql.DistributionPlatform.MAVEN: + return None + case sql.DistributionPlatform.NPM: + nr = distribution.NpmResponse.model_validate(data) + # return nr.homepage + return 
f"https://www.npmjs.com/package/{nr.name}/v/{version}" + case sql.DistributionPlatform.NPM_SCOPED: + nr = distribution.NpmResponse.model_validate(data) + # TODO: This is not correct + return nr.homepage + case sql.DistributionPlatform.PYPI: + info = distribution.PyPIResponse.model_validate(data).info + return info.release_url or info.project_url + raise NotImplementedError(f"Platform {platform.name} is not yet supported") + + +def get_api_url(dd: distribution.Data, staging: bool | None = None): + template_url = _template_url(dd, staging) + api_url = template_url.format( + owner_namespace=dd.owner_namespace, + package=dd.package, + version=dd.version, + ) + if dd.platform == sql.DistributionPlatform.MAVEN: + # We do this here because the CDNs break the namespace up into a / delimited URL + owner = (dd.owner_namespace or "").replace(".", "/") + api_url = template_url.format( + owner_namespace=owner, + package=dd.package, + version=dd.version, + ) + return api_url + + +def _template_url( + dd: distribution.Data, + staging: bool | None = None, +) -> str: + if staging is False: + return dd.platform.value.template_url + + supported = { + sql.DistributionPlatform.ARTIFACT_HUB, + sql.DistributionPlatform.PYPI, + sql.DistributionPlatform.MAVEN, + } + if dd.platform not in supported: + raise RuntimeError("Staging is currently supported only for ArtifactHub, PyPI and Maven Central.") + + template_url = dd.platform.value.template_staging_url + if template_url is None: + raise RuntimeError("This platform does not provide a staging API endpoint.") + + return template_url diff --git a/atr/shared/__init__.py b/atr/shared/web.py similarity index 81% copy from atr/shared/__init__.py copy to atr/shared/web.py index 4c9e400..66c4bc0 100644 --- a/atr/shared/__init__.py +++ b/atr/shared/web.py @@ -14,8 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING import htpy @@ -27,24 +26,7 @@ import atr.htm as htm import atr.models.results as results import atr.models.sql as sql import atr.post as post -import atr.shared.announce as announce -import atr.shared.distribution as distribution import atr.shared.draft as draft -import atr.shared.finish as finish -import atr.shared.ignores as ignores -import atr.shared.keys as keys -import atr.shared.manual as manual -import atr.shared.projects as projects -import atr.shared.resolve as resolve -import atr.shared.revisions as revisions -import atr.shared.sbom as sbom -import atr.shared.start as start -import atr.shared.test as test -import atr.shared.tokens as tokens -import atr.shared.upload as upload -import atr.shared.user as user -import atr.shared.vote as vote -import atr.shared.voting as voting import atr.storage as storage import atr.storage.types as types import atr.template as template @@ -55,32 +37,6 @@ if TYPE_CHECKING: from collections.abc import Sequence -# | 1 | RSA (Encrypt or Sign) [HAC] | -# | 2 | RSA Encrypt-Only [HAC] | -# | 3 | RSA Sign-Only [HAC] | -# | 16 | Elgamal (Encrypt-Only) [ELGAMAL] [HAC] | -# | 17 | DSA (Digital Signature Algorithm) [FIPS186] [HAC] | -# | 18 | ECDH public key algorithm | -# | 19 | ECDSA public key algorithm [FIPS186] | -# | 20 | Reserved (formerly Elgamal Encrypt or Sign) | -# | 21 | Reserved for Diffie-Hellman | -# | | (X9.42, as defined for IETF-S/MIME) | -# | 22 | EdDSA [I-D.irtf-cfrg-eddsa] | -# - https://lists.gnupg.org/pipermail/gnupg-devel/2017-April/032762.html - -algorithms: Final[dict[int, str]] = { - 1: "RSA", - 2: "RSA", - 3: "RSA", - 16: "Elgamal", - 17: "DSA", - 18: "ECDH", - 19: "ECDSA", - 21: "Diffie-Hellman", - 22: "EdDSA", -} - - async def check( session: web.Committer | None, release: sql.Release, @@ -270,27 +226,3 @@ def _warnings_from_vote_result(vote_task: sql.Task | None) -> list[str]: return ["Vote task result is not a 
results.VoteInitiate instance."] return vote_task_result.mail_send_warnings - - -__all__ = [ - "algorithms", - "announce", - "check", - "distribution", - "draft", - "finish", - "ignores", - "keys", - "manual", - "projects", - "resolve", - "revisions", - "sbom", - "start", - "test", - "tokens", - "upload", - "user", - "vote", - "voting", -] diff --git a/atr/storage/writers/distributions.py b/atr/storage/writers/distributions.py index 74fe074..312a471 100644 --- a/atr/storage/writers/distributions.py +++ b/atr/storage/writers/distributions.py @@ -19,16 +19,11 @@ from __future__ import annotations import datetime -import sqlite3 - -import aiohttp -import sqlalchemy.exc as exc import atr.db as db import atr.log as log -import atr.models.basic as basic -import atr.models.distribution as distribution -import atr.models.sql as sql +import atr.models as models +import atr.shared.distribution as distribution import atr.storage as storage import atr.storage.outcome as outcome import atr.tasks.gha as gha @@ -100,7 +95,7 @@ class CommitteeMember(CommitteeParticipant): async def automate( self, release_name: str, - platform: sql.DistributionPlatform, + platform: models.sql.DistributionPlatform, committee_name: str, owner_namespace: str | None, project_name: str, @@ -110,9 +105,9 @@ class CommitteeMember(CommitteeParticipant): package: str, version: str, staging: bool, - ) -> sql.Task: - dist_task = sql.Task( - task_type=sql.TaskType.DISTRIBUTION_WORKFLOW, + ) -> models.sql.Task: + dist_task = models.sql.Task( + task_type=models.sql.TaskType.DISTRIBUTION_WORKFLOW, task_args=gha.DistributionWorkflow( name=release_name, namespace=owner_namespace or "", @@ -129,7 +124,7 @@ class CommitteeMember(CommitteeParticipant): ).model_dump(), asf_uid=util.unwrap(self.__asf_uid), added=datetime.datetime.now(datetime.UTC), - status=sql.TaskStatus.QUEUED, + status=models.sql.TaskStatus.QUEUED, project_name=project_name, version_name=version_name, revision_number=revision_number, @@ -142,88 
+137,97 @@ class CommitteeMember(CommitteeParticipant): async def record( self, release_name: str, - platform: sql.DistributionPlatform, + platform: models.sql.DistributionPlatform, owner_namespace: str | None, package: str, version: str, staging: bool, + pending: bool, upload_date: datetime.datetime | None, - api_url: str, + api_url: str | None = None, web_url: str | None = None, - ) -> tuple[sql.Distribution, bool]: - distribution = sql.Distribution( + ) -> tuple[models.sql.Distribution, bool]: + existing = await self.__data.distribution(release_name, platform, owner_namespace or "", package, version).get() + dist = models.sql.Distribution( platform=platform, release_name=release_name, owner_namespace=owner_namespace or "", package=package, version=version, staging=staging, + pending=pending, + retries=0, upload_date=upload_date, api_url=api_url, web_url=web_url, ) - self.__data.add(distribution) - try: + if existing is None: + self.__data.add(dist) await self.__data.commit() - except exc.IntegrityError as e: - # "The names and numeric values for existing result codes are fixed and unchanging." 
- # https://www.sqlite.org/rescode.html - # e.orig.sqlite_errorcode == 1555 - # e.orig.sqlite_errorname == "SQLITE_CONSTRAINT_PRIMARYKEY" - match e.orig: - # TODO: Document this - case sqlite3.IntegrityError(sqlite_errorcode=sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY): - if not staging: - upgraded = await self.__upgrade_staging_to_final( - release_name, - platform, - owner_namespace, - package, - version, - upload_date, - api_url, - web_url, - ) - if upgraded is not None: - return upgraded, False - return distribution, False - raise e - return distribution, True + return dist, True + # If we're doing production and existing was for staging, upgrade it + if (not staging) and existing.staging: + upgraded = await self.__upgrade_staging_to_final( + release_name, + platform, + owner_namespace, + package, + version, + upload_date, + api_url, + web_url, + ) + if upgraded is not None: + return upgraded, False + if existing.pending: + if pending: + existing.retries = existing.retries + 1 + await self.__data.commit() + return existing, False + else: + existing.pending = False + await self.__data.commit() + return existing, False + return dist, False async def record_from_data( self, release_name: str, staging: bool, - dd: distribution.Data, - ) -> tuple[sql.Distribution, bool, distribution.Metadata]: - template_url = await self.__template_url(dd, staging) - api_url = template_url.format( - owner_namespace=dd.owner_namespace, - package=dd.package, - version=dd.version, - ) - if dd.platform == sql.DistributionPlatform.MAVEN: - # We do this here because the CDNs break the namespace up into a / delimited URL - owner = (dd.owner_namespace or "").replace(".", "/") - api_url = template_url.format( - owner_namespace=owner, - package=dd.package, - version=dd.version, - ) - api_oc = await self.__json_from_maven_xml(api_url, dd.version) + dd: models.distribution.Data, + allow_retries: bool = False, + ) -> tuple[models.sql.Distribution, bool, models.distribution.Metadata]: + api_url = 
distribution.get_api_url(dd, staging) + if dd.platform == models.sql.DistributionPlatform.MAVEN: + api_oc = await distribution.json_from_maven_xml(api_url, dd.version) else: - api_oc = await self.__json_from_distribution_platform(api_url, dd.platform, dd.version) + api_oc = await distribution.json_from_distribution_platform(api_url, dd.platform, dd.version) match api_oc: case outcome.Result(result): pass case outcome.Error(error): log.error(f"Failed to get API response from {api_url}: {error}") + if allow_retries: + dist, added = await self.record( + release_name=release_name, + platform=dd.platform, + owner_namespace=dd.owner_namespace, + package=dd.package, + version=dd.version, + staging=staging, + pending=True, + upload_date=None, + api_url=None, + web_url=None, + ) + if added: + raise storage.AccessError("Distribution could not be found, ATR will retry this automatically") raise storage.AccessError(f"Failed to get API response from distribution platform: {error}") - upload_date = self.__distribution_upload_date(dd.platform, result, dd.version) + upload_date = distribution.distribution_upload_date(dd.platform, result, dd.version) if upload_date is None: raise storage.AccessError("Failed to get upload date from distribution platform") - web_url = self.__distribution_web_url(dd.platform, result, dd.version) - metadata = distribution.Metadata( + web_url = distribution.distribution_web_url(dd.platform, result, dd.version) + metadata = models.distribution.Metadata( api_url=api_url, result=result, upload_date=upload_date, @@ -236,244 +240,24 @@ class CommitteeMember(CommitteeParticipant): package=dd.package, version=dd.version, staging=staging, + pending=False, upload_date=upload_date, api_url=api_url, web_url=web_url, ) return dist, added, metadata - def __distribution_upload_date( # noqa: C901 - self, - platform: sql.DistributionPlatform, - data: basic.JSON, - version: str, - ) -> datetime.datetime | None: - match platform: - case 
sql.DistributionPlatform.ARTIFACT_HUB: - if not (versions := distribution.ArtifactHubResponse.model_validate(data).available_versions): - return None - return datetime.datetime.fromtimestamp(versions[0].ts, tz=datetime.UTC) - case sql.DistributionPlatform.DOCKER_HUB: - if not (pushed_at := distribution.DockerResponse.model_validate(data).tag_last_pushed): - return None - return datetime.datetime.fromisoformat(pushed_at.rstrip("Z")) - # case sql.DistributionPlatform.GITHUB: - # if not (published_at := GitHubResponse.model_validate(data).published_at): - # return None - # return datetime.datetime.fromisoformat(published_at.rstrip("Z")) - case sql.DistributionPlatform.MAVEN: - m = distribution.MavenResponse.model_validate(data) - docs = m.response.docs - if not docs: - return None - timestamp = docs[0].timestamp - if not timestamp: - return None - return datetime.datetime.fromtimestamp(timestamp / 1000, tz=datetime.UTC) - case sql.DistributionPlatform.NPM | sql.DistributionPlatform.NPM_SCOPED: - if not (times := distribution.NpmResponse.model_validate(data).time): - return None - # Versions can be in the form "1.2.3" or "v1.2.3", so we check for both - if not (upload_time := times.get(version) or times.get(f"v{version}")): - return None - return datetime.datetime.fromisoformat(upload_time.rstrip("Z")) - case sql.DistributionPlatform.PYPI: - if not (urls := distribution.PyPIResponse.model_validate(data).urls): - return None - if not (upload_time := urls[0].upload_time_iso_8601): - return None - return datetime.datetime.fromisoformat(upload_time.rstrip("Z")) - raise NotImplementedError(f"Platform {platform.name} is not yet supported") - - def __distribution_web_url( # noqa: C901 - self, - platform: sql.DistributionPlatform, - data: basic.JSON, - version: str, - ) -> str | None: - match platform: - case sql.DistributionPlatform.ARTIFACT_HUB: - ah = distribution.ArtifactHubResponse.model_validate(data) - repo_name = ah.repository.name if ah.repository else None - pkg_name 
= ah.name - ver = ah.version - if repo_name and pkg_name: - if ver: - return f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{ver}" - return f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{version}" - if ah.home_url: - return ah.home_url - for link in ah.links: - if link.url: - return link.url - return None - case sql.DistributionPlatform.DOCKER_HUB: - # The best we can do on Docker Hub is: - # f"https://hub.docker.com/_/{package}" - return None - # case sql.DistributionPlatform.GITHUB: - # gh = GitHubResponse.model_validate(data) - # return gh.html_url - case sql.DistributionPlatform.MAVEN: - return None - case sql.DistributionPlatform.NPM: - nr = distribution.NpmResponse.model_validate(data) - # return nr.homepage - return f"https://www.npmjs.com/package/{nr.name}/v/{version}" - case sql.DistributionPlatform.NPM_SCOPED: - nr = distribution.NpmResponse.model_validate(data) - # TODO: This is not correct - return nr.homepage - case sql.DistributionPlatform.PYPI: - info = distribution.PyPIResponse.model_validate(data).info - return info.release_url or info.project_url - raise NotImplementedError(f"Platform {platform.name} is not yet supported") - - async def __json_from_distribution_platform( - self, api_url: str, platform: sql.DistributionPlatform, version: str - ) -> outcome.Outcome[basic.JSON]: - try: - async with util.create_secure_session() as session: - async with session.get(api_url) as response: - response.raise_for_status() - response_json = await response.json() - result = basic.as_json(response_json) - except aiohttp.ClientError as e: - return outcome.Error(e) - match platform: - case sql.DistributionPlatform.NPM | sql.DistributionPlatform.NPM_SCOPED: - if version not in distribution.NpmResponse.model_validate(result).time: - e = RuntimeError(f"Version '{version}' not found") - return outcome.Error(e) - return outcome.Result(result) - - async def __json_from_maven_cdn( - self, api_url: str, group_id: str, artifact_id: str, 
version: str - ) -> outcome.Outcome[basic.JSON]: - import datetime - - try: - async with util.create_secure_session() as session: - async with session.get(api_url) as response: - response.raise_for_status() - - # Use current time as timestamp since we're just validating the package exists - timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() * 1000) - - # Convert to dict matching MavenResponse structure - result_dict = { - "response": { - "start": 0, - "docs": [ - { - "g": group_id, - "a": artifact_id, - "v": version, - "timestamp": timestamp_ms, - } - ], - } - } - result = basic.as_json(result_dict) - return outcome.Result(result) - except aiohttp.ClientError as e: - return outcome.Error(e) - - async def __json_from_maven_xml(self, api_url: str, version: str) -> outcome.Outcome[basic.JSON]: - import datetime - import xml.etree.ElementTree as ET - - try: - async with util.create_secure_session() as session: - async with session.get(api_url) as response: - response.raise_for_status() - xml_text = await response.text() - - # Parse the XML - root = ET.fromstring(xml_text) - - # Extract versioning info - group = root.find("groupId") - artifact = root.find("artifactId") - versioning = root.find("versioning") - if versioning is None: - e = RuntimeError("No versioning element found in Maven metadata") - return outcome.Error(e) - - # Get lastUpdated timestamp (format: yyyyMMddHHmmss) - last_updated_elem = versioning.find("lastUpdated") - if (last_updated_elem is None) or (not last_updated_elem.text): - e = RuntimeError("No lastUpdated timestamp found in Maven metadata") - return outcome.Error(e) - - # Convert lastUpdated string to Unix timestamp in milliseconds - last_updated_str = last_updated_elem.text - dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S") - dt = dt.replace(tzinfo=datetime.UTC) - timestamp_ms = int(dt.timestamp() * 1000) - - # Verify the version exists - versions_elem = versioning.find("versions") - if versions_elem is not 
None: - versions = [v.text for v in versions_elem.findall("version") if v.text] - if version not in versions: - e = RuntimeError(f"Version '{version}' not found in Maven metadata") - return outcome.Error(e) - - # Convert to dict matching MavenResponse structure - result_dict = { - "response": { - "start": 0, - "docs": [ - { - "g": group.text if (group is not None) else "", - "a": artifact.text if (artifact is not None) else "", - "v": version, - "timestamp": timestamp_ms, - } - ], - } - } - result = basic.as_json(result_dict) - return outcome.Result(result) - except aiohttp.ClientError as e: - return outcome.Error(e) - except ET.ParseError as e: - return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}")) - - async def __template_url( - self, - dd: distribution.Data, - staging: bool | None = None, - ) -> str: - if staging is False: - return dd.platform.value.template_url - - supported = { - sql.DistributionPlatform.ARTIFACT_HUB, - sql.DistributionPlatform.PYPI, - sql.DistributionPlatform.MAVEN, - } - if dd.platform not in supported: - raise storage.AccessError("Staging is currently supported only for ArtifactHub, PyPI and Maven Central.") - - template_url = dd.platform.value.template_staging_url - if template_url is None: - raise storage.AccessError("This platform does not provide a staging API endpoint.") - - return template_url - async def __upgrade_staging_to_final( self, release_name: str, - platform: sql.DistributionPlatform, + platform: models.sql.DistributionPlatform, owner_namespace: str | None, package: str, version: str, upload_date: datetime.datetime | None, - api_url: str, + api_url: str | None, web_url: str | None, - ) -> sql.Distribution | None: + ) -> models.sql.Distribution | None: tag = f"{release_name} {platform} {owner_namespace or ''} {package} {version}" existing = await self.__data.distribution( release_name=release_name, @@ -494,7 +278,7 @@ class CommitteeMember(CommitteeParticipant): async def delete_distribution( self, 
release_name: str, - platform: sql.DistributionPlatform, + platform: models.sql.DistributionPlatform, owner_namespace: str, package: str, version: str, diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 38d57d6..ea613bb 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -31,6 +31,7 @@ import atr.tasks.checks.rat as rat import atr.tasks.checks.signature as signature import atr.tasks.checks.targz as targz import atr.tasks.checks.zipformat as zipformat +import atr.tasks.distribution as distribution import atr.tasks.gha as gha import atr.tasks.keys as keys import atr.tasks.message as message @@ -80,6 +81,33 @@ async def clear_scheduled(caller_data: db.Session | None = None) -> None: await data.commit() +async def distribution_status_check( + asf_uid: str, + caller_data: db.Session | None = None, + schedule: datetime.datetime | None = None, + schedule_next: bool = False, +) -> sql.Task: + """Queue a workflow status update task.""" + args = distribution.DistributionStatusCheckArgs(next_schedule_seconds=0, asf_uid=asf_uid) + if schedule_next: + args.next_schedule_seconds = 2 * 60 + async with db.ensure_session(caller_data) as data: + task = sql.Task( + status=sql.TaskStatus.QUEUED, + task_type=sql.TaskType.DISTRIBUTION_STATUS, + task_args=args.model_dump(), + asf_uid=asf_uid, + revision_number=None, + primary_rel_path=None, + ) + if schedule: + task.scheduled = schedule + data.add(task) + await data.commit() + await data.flush() + return task + + async def draft_checks( asf_uid: str, project_name: str, release_version: str, revision_number: str, caller_data: db.Session | None = None ) -> int: @@ -222,6 +250,8 @@ def queued( def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]: # noqa: C901 match task_type: + case sql.TaskType.DISTRIBUTION_STATUS: + return distribution.status_check case sql.TaskType.DISTRIBUTION_WORKFLOW: return gha.trigger_workflow case sql.TaskType.HASHING_CHECK: diff --git 
a/atr/tasks/distribution.py b/atr/tasks/distribution.py new file mode 100644 index 0000000..541f918 --- /dev/null +++ b/atr/tasks/distribution.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import datetime + +import pydantic + +import atr.db as db +import atr.log as log +import atr.models.distribution as distribution +import atr.models.results as results +import atr.models.schema as schema +import atr.storage as storage +import atr.tasks as tasks +import atr.tasks.checks as checks + +_RETRY_LIMIT = 5 + + +class DistributionStatusCheckArgs(schema.Strict): + """Arguments for the task to re-check distribution statuses.""" + + next_schedule_seconds: int = pydantic.Field(default=0, description="The next scheduled time") + asf_uid: str = schema.description("ASF UID of the user triggering the workflow") + + [email protected]_model(DistributionStatusCheckArgs) +async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | None = None) -> results.Results | None: + dists = [] + async with db.session() as data: + dists = await data.distribution(pending=True, _with_release=True, _with_release_project=True).all() + for dist in dists: + dd = distribution.Data( + platform=dist.platform, + 
owner_namespace=dist.owner_namespace, + package=dist.package, + version=dist.version, + details=False, + ) + try: + async with storage.write_as_committee_member(str(dist.release.project.committee_name), args.asf_uid) as w: + if dist.retries >= _RETRY_LIMIT: + await w.distributions.delete_distribution( + dist.release_name, dist.platform, dist.owner_namespace, dist.package, dist.version + ) + name = f"{dist.platform} {dist.owner_namespace} {dist.package} {dist.version}" + log.error(f"Distribution {name} failed {_RETRY_LIMIT} times, skipping") + continue + await w.distributions.record_from_data( + dist.release_name, + dist.staging, + dd, + ) + except storage.AccessError as e: + msg = f"Failed to record distribution: {e}" + log.error(msg) + raise RuntimeError(msg) + if args.next_schedule_seconds: + next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds) + await tasks.distribution_status_check(args.asf_uid, schedule=next_schedule, schedule_next=True) + log.info( + f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}", + ) + return results.DistributionStatusCheck( + kind="distribution_status", + ) diff --git a/migrations/versions/0042_2026.01.28_3e434625.py b/migrations/versions/0042_2026.01.28_3e434625.py new file mode 100644 index 0000000..0e96b86 --- /dev/null +++ b/migrations/versions/0042_2026.01.28_3e434625.py @@ -0,0 +1,31 @@ +"""columns for pending distributions + +Revision ID: 0042_2026.01.28_3e434625 +Revises: 0041_2026.01.22_d1e357f5 +Create Date: 2026-01-28 16:30:09.232235+00:00 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# Revision identifiers, used by Alembic +revision: str = "0042_2026.01.28_3e434625" +down_revision: str | None = "0041_2026.01.22_d1e357f5" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + with 
op.batch_alter_table("distribution", schema=None) as batch_op: + batch_op.add_column(sa.Column("pending", sa.Boolean(), nullable=False, server_default=sa.false())) + batch_op.add_column(sa.Column("retries", sa.Integer(), nullable=False, server_default="0")) + batch_op.alter_column("api_url", existing_type=sa.VARCHAR(), nullable=True) + + +def downgrade() -> None: + with op.batch_alter_table("distribution", schema=None) as batch_op: + batch_op.alter_column("api_url", existing_type=sa.VARCHAR(), nullable=False) + batch_op.drop_column("retries") + batch_op.drop_column("pending") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
