This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch pending_dist_changes
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit d7d89670f92ddb54e81104d66e55d19f50177702
Author: Alastair McFarlane <[email protected]>
AuthorDate: Thu Jan 29 11:48:53 2026 +0000

    #216 - Add pending distribution status and background task to check it.
Refactor some of the distribution logic out to shared module and some of shared 
module to prevent circular references.
---
 atr/db/__init__.py                              |   9 +
 atr/get/checks.py                               |   2 +-
 atr/get/compose.py                              |   2 +-
 atr/models/__init__.py                          |   4 +-
 atr/models/results.py                           |   9 +-
 atr/models/sql.py                               |   5 +-
 atr/post/distribution.py                        |   1 +
 atr/shared/__init__.py                          | 215 +-------------
 atr/shared/distribution.py                      | 247 ++++++++++++++++
 atr/shared/{__init__.py => web.py}              |  70 +----
 atr/storage/writers/distributions.py            | 360 +++++-------------------
 atr/tasks/__init__.py                           |  30 ++
 atr/tasks/distribution.py                       |  79 ++++++
 migrations/versions/0042_2026.01.28_3e434625.py |  31 ++
 14 files changed, 489 insertions(+), 575 deletions(-)

diff --git a/atr/db/__init__.py b/atr/db/__init__.py
index 4ff46d7..80ebbc5 100644
--- a/atr/db/__init__.py
+++ b/atr/db/__init__.py
@@ -289,6 +289,9 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
         owner_namespace: Opt[str] = NOT_SET,
         package: Opt[str] = NOT_SET,
         version: Opt[str] = NOT_SET,
+        pending: Opt[bool] = NOT_SET,
+        _with_release: bool = False,
+        _with_release_project: bool = False,
     ) -> Query[sql.Distribution]:
         query = sqlmodel.select(sql.Distribution)
         if is_defined(release_name):
@@ -301,6 +304,12 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
             query = query.where(sql.Distribution.package == package)
         if is_defined(version):
             query = query.where(sql.Distribution.version == version)
+        if is_defined(pending):
+            query = query.where(sql.Distribution.pending == pending)
+        if _with_release_project:
+            query = query.options(joined_load_nested(sql.Distribution.release, 
sql.Release.project))
+        elif _with_release:
+            query = query.options(joined_load(sql.Distribution.release))
         return Query(self, query)
 
     async def execute_query(self, query: sqlalchemy.sql.expression.Executable) 
-> sqlalchemy.engine.Result:
diff --git a/atr/get/checks.py b/atr/get/checks.py
index 1781b2f..df36f9e 100644
--- a/atr/get/checks.py
+++ b/atr/get/checks.py
@@ -162,7 +162,7 @@ async def selected_revision(
 
     ongoing_count = await interaction.tasks_ongoing(project_name, 
version_name, revision_number)
 
-    checks_summary_elem = shared._render_checks_summary(info, project_name, 
version_name)
+    checks_summary_elem = shared.web._render_checks_summary(info, 
project_name, version_name)
     checks_summary_html = str(checks_summary_elem) if checks_summary_elem else 
""
 
     delete_file_forms: dict[str, str] = {}
diff --git a/atr/get/compose.py b/atr/get/compose.py
index 304f246..6e40087 100644
--- a/atr/get/compose.py
+++ b/atr/get/compose.py
@@ -39,4 +39,4 @@ async def selected(session: web.Committer, project_name: str, 
version_name: str)
         ).demand(base.ASFQuartException("Release does not exist", 
errorcode=404))
     if release.phase != sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT:
         return await mapping.release_as_redirect(session, release)
-    return await shared.check(session, release)
+    return await shared.web.check(session, release)
diff --git a/atr/models/__init__.py b/atr/models/__init__.py
index 52c3cad..daca400 100644
--- a/atr/models/__init__.py
+++ b/atr/models/__init__.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from . import api, distribution, helpers, results, schema, sql, tabulate, 
validation
+from . import api, basic, distribution, helpers, results, schema, sql, 
tabulate, validation
 
 # If we use .__name__, pyright gives a warning
-__all__ = ["api", "distribution", "helpers", "results", "schema", "sql", 
"tabulate", "validation"]
+__all__ = ["api", "basic", "distribution", "helpers", "results", "schema", 
"sql", "tabulate", "validation"]
diff --git a/atr/models/results.py b/atr/models/results.py
index 1ecb4d9..eeb73b9 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -22,6 +22,12 @@ import pydantic
 from . import schema
 
 
+class DistributionStatusCheck(schema.Strict):
+    """Result of the task to check pending distribution statuses."""
+
+    kind: Literal["distribution_status"] = schema.Field(alias="kind")
+
+
 class DistributionWorkflow(schema.Strict):
     """Result of the task to run a Github workflow."""
 
@@ -228,7 +234,8 @@ class MetadataUpdate(schema.Strict):
 
 
 Results = Annotated[
-    DistributionWorkflow
+    DistributionStatusCheck
+    | DistributionWorkflow
     | DistributionWorkflowStatus
     | HashingCheck
     | MessageSend
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 9887711..e830083 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -190,6 +190,7 @@ class TaskStatus(str, enum.Enum):
 
 
 class TaskType(str, enum.Enum):
+    DISTRIBUTION_STATUS = "distribution_status"
     DISTRIBUTION_WORKFLOW = "distribution_workflow"
     HASHING_CHECK = "hashing_check"
     KEYS_IMPORT_FILE = "keys_import_file"
@@ -970,8 +971,10 @@ class Distribution(sqlmodel.SQLModel, table=True):
     package: str = sqlmodel.Field(primary_key=True, index=True)
     version: str = sqlmodel.Field(primary_key=True, index=True)
     staging: bool = sqlmodel.Field(default=False)
+    pending: bool = sqlmodel.Field(default=False)
+    retries: int = sqlmodel.Field(default=0)
     upload_date: datetime.datetime | None = sqlmodel.Field(default=None)
-    api_url: str
+    api_url: str | None = sqlmodel.Field(default=None)
     web_url: str | None = sqlmodel.Field(default=None)
     # The API response can be huge, e.g. from npm
     # So we do not store it in the database
diff --git a/atr/post/distribution.py b/atr/post/distribution.py
index 706bafa..438a16a 100644
--- a/atr/post/distribution.py
+++ b/atr/post/distribution.py
@@ -176,6 +176,7 @@ async def record_form_process_page(
                 release_name=release.name,
                 staging=staging,
                 dd=dd,
+                allow_retries=True,
             )
         except storage.AccessError as e:
             # Instead of calling record_form_page_new, redirect with error 
message
diff --git a/atr/shared/__init__.py b/atr/shared/__init__.py
index 4c9e400..4090d59 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/__init__.py
@@ -15,18 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import TYPE_CHECKING, Final
+from typing import Final
 
-import htpy
-
-import atr.db as db
-import atr.db.interaction as interaction
-import atr.form as form
-import atr.get as get
-import atr.htm as htm
-import atr.models.results as results
-import atr.models.sql as sql
-import atr.post as post
 import atr.shared.announce as announce
 import atr.shared.distribution as distribution
 import atr.shared.draft as draft
@@ -45,15 +35,7 @@ import atr.shared.upload as upload
 import atr.shared.user as user
 import atr.shared.vote as vote
 import atr.shared.voting as voting
-import atr.storage as storage
-import atr.storage.types as types
-import atr.template as template
-import atr.util as util
-import atr.web as web
-
-if TYPE_CHECKING:
-    from collections.abc import Sequence
-
+import atr.shared.web as web
 
 # |         1 | RSA (Encrypt or Sign) [HAC]                        |
 # |         2 | RSA Encrypt-Only [HAC]                             |
@@ -81,201 +63,9 @@ algorithms: Final[dict[int, str]] = {
 }
 
 
-async def check(
-    session: web.Committer | None,
-    release: sql.Release,
-    task_mid: str | None = None,
-    vote_form: htm.Element | None = None,
-    resolve_form: htm.Element | None = None,
-    archive_url: str | None = None,
-    vote_task: sql.Task | None = None,
-    can_vote: bool = False,
-    can_resolve: bool = False,
-) -> web.WerkzeugResponse | str:
-    base_path = util.release_directory(release)
-
-    # TODO: This takes 180ms for providers
-    # We could cache it
-    paths = [path async for path in util.paths_recursive(base_path)]
-    paths.sort()
-
-    async with storage.read(session) as read:
-        ragp = read.as_general_public()
-        info = await ragp.releases.path_info(release, paths)
-
-    user_ssh_keys: Sequence[sql.SSHKey] = []
-    asf_id: str | None = None
-    server_domain: str | None = None
-    server_host: str | None = None
-
-    if session is not None:
-        asf_id = session.uid
-        server_domain = session.app_host.split(":", 1)[0]
-        server_host = session.app_host
-        async with db.session() as data:
-            user_ssh_keys = await data.ssh_key(asf_uid=session.uid).all()
-
-    # Get the number of ongoing tasks for the current revision
-    ongoing_tasks_count = 0
-    match await interaction.latest_info(release.project.name, release.version):
-        case (revision_number, revision_editor, revision_timestamp):
-            ongoing_tasks_count = await interaction.tasks_ongoing(
-                release.project.name,
-                release.version,
-                revision_number,
-            )
-        case None:
-            revision_number = None
-            revision_editor = None
-            revision_timestamp = None
-
-    delete_form = form.render(
-        model_cls=form.Empty,
-        action=util.as_url(get.compose.selected, 
project_name=release.project.name, version_name=release.version),
-        submit_label="Delete this draft",
-        submit_classes="btn btn-danger",
-        empty=True,
-        confirm="Are you sure you want to delete this draft? This cannot be 
undone.",
-    )
-
-    delete_file_forms: dict[str, htm.Element] = {}
-    for path in paths:
-        delete_file_forms[str(path)] = form.render(
-            model_cls=draft.DeleteFileForm,
-            action=util.as_url(post.draft.delete_file, 
project_name=release.project.name, version_name=release.version),
-            form_classes=".d-inline-block.m-0",
-            submit_classes="btn-sm btn-outline-danger",
-            submit_label="Delete",
-            empty=True,
-            defaults={"file_path": str(path)},
-            # TODO: Add a static check for the confirm syntax
-            confirm=(
-                "Are you sure you want to delete this file? "
-                "This will also delete any associated metadata files. "
-                "This cannot be undone."
-            ),
-        )
-
-    empty_form = form.render(
-        model_cls=form.Empty,
-        action=util.as_url(post.draft.fresh, 
project_name=release.project.name, version_name=release.version),
-        submit_label="Restart all checks",
-        submit_classes="btn btn-primary",
-    )
-
-    vote_task_warnings = _warnings_from_vote_result(vote_task)
-    has_files = await util.has_files(release)
-
-    has_any_errors = any(info.errors.get(path, []) for path in paths) if info 
else False
-    strict_checking = release.project.policy_strict_checking
-    strict_checking_errors = strict_checking and has_any_errors
-
-    checks_summary_html = _render_checks_summary(info, release.project.name, 
release.version)
-
-    return await template.render(
-        "check-selected.html",
-        project_name=release.project.name,
-        version_name=release.version,
-        release=release,
-        paths=paths,
-        info=info,
-        revision_editor=revision_editor,
-        revision_time=revision_timestamp,
-        revision_number=revision_number,
-        ongoing_tasks_count=ongoing_tasks_count,
-        delete_form=delete_form,
-        delete_file_forms=delete_file_forms,
-        asf_id=asf_id,
-        server_domain=server_domain,
-        server_host=server_host,
-        user_ssh_keys=user_ssh_keys,
-        format_datetime=util.format_datetime,
-        models=sql,
-        task_mid=task_mid,
-        vote_form=vote_form,
-        vote_task=vote_task,
-        archive_url=archive_url,
-        vote_task_warnings=vote_task_warnings,
-        empty_form=empty_form,
-        csrf_input=str(form.csrf_input()),
-        resolve_form=resolve_form,
-        has_files=has_files,
-        strict_checking_errors=strict_checking_errors,
-        can_vote=can_vote,
-        can_resolve=can_resolve,
-        checks_summary_html=checks_summary_html,
-    )
-
-
-def _checker_display_name(checker: str) -> str:
-    return checker.removeprefix("atr.tasks.checks.").replace("_", " 
").replace(".", " ").title()
-
-
-def _render_checks_summary(info: types.PathInfo | None, project_name: str, 
version_name: str) -> htm.Element | None:
-    if (info is None) or (not info.checker_stats):
-        return None
-
-    card = htm.Block(htm.div, classes=".card.mb-4")
-    card.div(".card-header")[htpy.h5(".mb-0")["Checks summary"]]
-
-    body = htm.Block(htm.div, classes=".card-body")
-    for i, stat in enumerate(info.checker_stats):
-        stripe_class = ".atr-stripe-odd" if ((i % 2) == 0) else 
".atr-stripe-even"
-        details = htm.Block(htm.details, classes=f".mb-0.p-2{stripe_class}")
-
-        summary_content: list[htm.Element | str] = []
-        if stat.warning_count > 0:
-            
summary_content.append(htpy.span(".badge.bg-warning.text-dark.me-2")[str(stat.warning_count)])
-        if stat.failure_count > 0:
-            
summary_content.append(htpy.span(".badge.bg-danger.me-2")[str(stat.failure_count)])
-        
summary_content.append(htpy.strong[_checker_display_name(stat.checker)])
-
-        details.summary[*summary_content]
-
-        files_div = htm.Block(htm.div, classes=".mt-2.atr-checks-files")
-        all_files = set(stat.failure_files.keys()) | 
set(stat.warning_files.keys())
-        for file_path in sorted(all_files):
-            report_url = f"/report/{project_name}/{version_name}/{file_path}"
-            error_count = stat.failure_files.get(file_path, 0)
-            warning_count = stat.warning_files.get(file_path, 0)
-
-            file_content: list[htm.Element | str] = []
-            if error_count > 0:
-                
file_content.append(htpy.span(".badge.bg-danger.me-2")[util.plural(error_count, 
"error")])
-            if warning_count > 0:
-                file_content.append(
-                    
htpy.span(".badge.bg-warning.text-dark.me-2")[util.plural(warning_count, 
"warning")]
-                )
-            
file_content.append(htpy.a(href=report_url)[htpy.strong[htpy.code[file_path]]])
-
-            files_div.div[*file_content]
-
-        details.append(files_div.collect())
-        body.append(details.collect())
-
-    card.append(body.collect())
-    return card.collect()
-
-
-def _warnings_from_vote_result(vote_task: sql.Task | None) -> list[str]:
-    # TODO: Replace this with a schema.Strict model
-    # But we'd still need to do some of this parsing and validation
-    # We should probably rethink how to send data through tasks
-
-    if (not vote_task) or (not vote_task.result):
-        return ["No vote task result found."]
-
-    vote_task_result = vote_task.result
-    if not isinstance(vote_task_result, results.VoteInitiate):
-        return ["Vote task result is not a results.VoteInitiate instance."]
-
-    return vote_task_result.mail_send_warnings
-
-
 __all__ = [
     "algorithms",
     "announce",
-    "check",
     "distribution",
     "draft",
     "finish",
@@ -293,4 +83,5 @@ __all__ = [
     "user",
     "vote",
     "voting",
+    "web",
 ]
diff --git a/atr/shared/distribution.py b/atr/shared/distribution.py
index 743c781..e0af07a 100644
--- a/atr/shared/distribution.py
+++ b/atr/shared/distribution.py
@@ -17,15 +17,20 @@
 
 from __future__ import annotations
 
+import datetime
 import enum
 
+import aiohttp
 import pydantic
 
 import atr.db as db
 import atr.form as form
 import atr.htm as htm
+import atr.models.basic as basic
 import atr.models.distribution as distribution
 import atr.models.sql as sql
+import atr.util as util
+from atr.storage import outcome
 
 
 class DistributionPlatform(enum.Enum):
@@ -131,6 +136,25 @@ def html_tr_a(label: str, value: str | None) -> 
htm.Element:
     return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else 
"-"]]
 
 
+async def json_from_distribution_platform(
+    api_url: str, platform: sql.DistributionPlatform, version: str
+) -> outcome.Outcome[basic.JSON]:
+    try:
+        async with util.create_secure_session() as session:
+            async with session.get(api_url) as response:
+                response.raise_for_status()
+                response_json = await response.json()
+        result = basic.as_json(response_json)
+    except aiohttp.ClientError as e:
+        return outcome.Error(e)
+    match platform:
+        case sql.DistributionPlatform.NPM | 
sql.DistributionPlatform.NPM_SCOPED:
+            if version not in 
distribution.NpmResponse.model_validate(result).time:
+                e = RuntimeError(f"Version '{version}' not found")
+                return outcome.Error(e)
+    return outcome.Result(result)
+
+
 async def release_validated(
     project: str, version: str, committee: bool = False, staging: bool | None 
= None, release_policy: bool = False
 ) -> sql.Release:
@@ -163,3 +187,226 @@ async def release_validated_and_committee(
     if committee is None:
         raise RuntimeError(f"Release {project} {version} has no committee")
     return release, committee
+
+
+# async def __json_from_maven_cdn(
+#     self, api_url: str, group_id: str, artifact_id: str, version: str
+# ) -> outcome.Outcome[models.basic.JSON]:
+#     import datetime
+#
+#     try:
+#         async with util.create_secure_session() as session:
+#             async with session.get(api_url) as response:
+#                 response.raise_for_status()
+#
+#         # Use current time as timestamp since we're just validating the 
package exists
+#         timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() * 
1000)
+#
+#         # Convert to dict matching MavenResponse structure
+#         result_dict = {
+#             "response": {
+#                 "start": 0,
+#                 "docs": [
+#                     {
+#                         "g": group_id,
+#                         "a": artifact_id,
+#                         "v": version,
+#                         "timestamp": timestamp_ms,
+#                     }
+#                 ],
+#             }
+#         }
+#         result = models.basic.as_json(result_dict)
+#         return outcome.Result(result)
+#     except aiohttp.ClientError as e:
+#         return outcome.Error(e)
+
+
+async def json_from_maven_xml(api_url: str, version: str) -> 
outcome.Outcome[basic.JSON]:
+    import datetime
+    import xml.etree.ElementTree as ET
+
+    try:
+        async with util.create_secure_session() as session:
+            async with session.get(api_url) as response:
+                response.raise_for_status()
+                xml_text = await response.text()
+
+        # Parse the XML
+        root = ET.fromstring(xml_text)
+
+        # Extract versioning info
+        group = root.find("groupId")
+        artifact = root.find("artifactId")
+        versioning = root.find("versioning")
+        if versioning is None:
+            e = RuntimeError("No versioning element found in Maven metadata")
+            return outcome.Error(e)
+
+        # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
+        last_updated_elem = versioning.find("lastUpdated")
+        if (last_updated_elem is None) or (not last_updated_elem.text):
+            e = RuntimeError("No lastUpdated timestamp found in Maven 
metadata")
+            return outcome.Error(e)
+
+        # Convert lastUpdated string to Unix timestamp in milliseconds
+        last_updated_str = last_updated_elem.text
+        dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
+        dt = dt.replace(tzinfo=datetime.UTC)
+        timestamp_ms = int(dt.timestamp() * 1000)
+
+        # Verify the version exists
+        versions_elem = versioning.find("versions")
+        if versions_elem is not None:
+            versions = [v.text for v in versions_elem.findall("version") if 
v.text]
+            if version not in versions:
+                e = RuntimeError(f"Version '{version}' not found in Maven 
metadata")
+                return outcome.Error(e)
+
+        # Convert to dict matching MavenResponse structure
+        result_dict = {
+            "response": {
+                "start": 0,
+                "docs": [
+                    {
+                        "g": group.text if (group is not None) else "",
+                        "a": artifact.text if (artifact is not None) else "",
+                        "v": version,
+                        "timestamp": timestamp_ms,
+                    }
+                ],
+            }
+        }
+        result = basic.as_json(result_dict)
+        return outcome.Result(result)
+    except aiohttp.ClientError as e:
+        return outcome.Error(e)
+    except ET.ParseError as e:
+        return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}"))
+
+
+def distribution_upload_date(  # noqa: C901
+    platform: sql.DistributionPlatform,
+    data: basic.JSON,
+    version: str,
+) -> datetime.datetime | None:
+    match platform:
+        case sql.DistributionPlatform.ARTIFACT_HUB:
+            if not (versions := 
distribution.ArtifactHubResponse.model_validate(data).available_versions):
+                return None
+            return datetime.datetime.fromtimestamp(versions[0].ts, 
tz=datetime.UTC)
+        case sql.DistributionPlatform.DOCKER_HUB:
+            if not (pushed_at := 
distribution.DockerResponse.model_validate(data).tag_last_pushed):
+                return None
+            return datetime.datetime.fromisoformat(pushed_at.rstrip("Z"))
+        # case models.sql.DistributionPlatform.GITHUB:
+        #     if not (published_at := 
GitHubResponse.model_validate(data).published_at):
+        #         return None
+        #     return datetime.datetime.fromisoformat(published_at.rstrip("Z"))
+        case sql.DistributionPlatform.MAVEN:
+            m = distribution.MavenResponse.model_validate(data)
+            docs = m.response.docs
+            if not docs:
+                return None
+            timestamp = docs[0].timestamp
+            if not timestamp:
+                return None
+            return datetime.datetime.fromtimestamp(timestamp / 1000, 
tz=datetime.UTC)
+        case sql.DistributionPlatform.NPM | 
sql.DistributionPlatform.NPM_SCOPED:
+            if not (times := 
distribution.NpmResponse.model_validate(data).time):
+                return None
+            # Versions can be in the form "1.2.3" or "v1.2.3", so we check for 
both
+            if not (upload_time := times.get(version) or 
times.get(f"v{version}")):
+                return None
+            return datetime.datetime.fromisoformat(upload_time.rstrip("Z"))
+        case sql.DistributionPlatform.PYPI:
+            if not (urls := 
distribution.PyPIResponse.model_validate(data).urls):
+                return None
+            if not (upload_time := urls[0].upload_time_iso_8601):
+                return None
+            return datetime.datetime.fromisoformat(upload_time.rstrip("Z"))
+    raise NotImplementedError(f"Platform {platform.name} is not yet supported")
+
+
+def distribution_web_url(  # noqa: C901
+    platform: sql.DistributionPlatform,
+    data: basic.JSON,
+    version: str,
+) -> str | None:
+    match platform:
+        case sql.DistributionPlatform.ARTIFACT_HUB:
+            ah = distribution.ArtifactHubResponse.model_validate(data)
+            repo_name = ah.repository.name if ah.repository else None
+            pkg_name = ah.name
+            ver = ah.version
+            if repo_name and pkg_name:
+                if ver:
+                    return 
f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{ver}";
+                return 
f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{version}";
+            if ah.home_url:
+                return ah.home_url
+            for link in ah.links:
+                if link.url:
+                    return link.url
+            return None
+        case sql.DistributionPlatform.DOCKER_HUB:
+            # The best we can do on Docker Hub is:
+            # f"https://hub.docker.com/_/{package}";
+            return None
+        # case models.sql.DistributionPlatform.GITHUB:
+        #     gh = GitHubResponse.model_validate(data)
+        #     return gh.html_url
+        case sql.DistributionPlatform.MAVEN:
+            return None
+        case sql.DistributionPlatform.NPM:
+            nr = distribution.NpmResponse.model_validate(data)
+            # return nr.homepage
+            return f"https://www.npmjs.com/package/{nr.name}/v/{version}";
+        case sql.DistributionPlatform.NPM_SCOPED:
+            nr = distribution.NpmResponse.model_validate(data)
+            # TODO: This is not correct
+            return nr.homepage
+        case sql.DistributionPlatform.PYPI:
+            info = distribution.PyPIResponse.model_validate(data).info
+            return info.release_url or info.project_url
+    raise NotImplementedError(f"Platform {platform.name} is not yet supported")
+
+
+def get_api_url(dd: distribution.Data, staging: bool | None = None):
+    template_url = _template_url(dd, staging)
+    api_url = template_url.format(
+        owner_namespace=dd.owner_namespace,
+        package=dd.package,
+        version=dd.version,
+    )
+    if dd.platform == sql.DistributionPlatform.MAVEN:
+        # We do this here because the CDNs break the namespace up into a / 
delimited URL
+        owner = (dd.owner_namespace or "").replace(".", "/")
+        api_url = template_url.format(
+            owner_namespace=owner,
+            package=dd.package,
+            version=dd.version,
+        )
+    return api_url
+
+
+def _template_url(
+    dd: distribution.Data,
+    staging: bool | None = None,
+) -> str:
+    if staging is False:
+        return dd.platform.value.template_url
+
+    supported = {
+        sql.DistributionPlatform.ARTIFACT_HUB,
+        sql.DistributionPlatform.PYPI,
+        sql.DistributionPlatform.MAVEN,
+    }
+    if dd.platform not in supported:
+        raise RuntimeError("Staging is currently supported only for 
ArtifactHub, PyPI and Maven Central.")
+
+    template_url = dd.platform.value.template_staging_url
+    if template_url is None:
+        raise RuntimeError("This platform does not provide a staging API 
endpoint.")
+
+    return template_url
diff --git a/atr/shared/__init__.py b/atr/shared/web.py
similarity index 81%
copy from atr/shared/__init__.py
copy to atr/shared/web.py
index 4c9e400..66c4bc0 100644
--- a/atr/shared/__init__.py
+++ b/atr/shared/web.py
@@ -14,8 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING
 
 import htpy
 
@@ -27,24 +26,7 @@ import atr.htm as htm
 import atr.models.results as results
 import atr.models.sql as sql
 import atr.post as post
-import atr.shared.announce as announce
-import atr.shared.distribution as distribution
 import atr.shared.draft as draft
-import atr.shared.finish as finish
-import atr.shared.ignores as ignores
-import atr.shared.keys as keys
-import atr.shared.manual as manual
-import atr.shared.projects as projects
-import atr.shared.resolve as resolve
-import atr.shared.revisions as revisions
-import atr.shared.sbom as sbom
-import atr.shared.start as start
-import atr.shared.test as test
-import atr.shared.tokens as tokens
-import atr.shared.upload as upload
-import atr.shared.user as user
-import atr.shared.vote as vote
-import atr.shared.voting as voting
 import atr.storage as storage
 import atr.storage.types as types
 import atr.template as template
@@ -55,32 +37,6 @@ if TYPE_CHECKING:
     from collections.abc import Sequence
 
 
-# |         1 | RSA (Encrypt or Sign) [HAC]                        |
-# |         2 | RSA Encrypt-Only [HAC]                             |
-# |         3 | RSA Sign-Only [HAC]                                |
-# |        16 | Elgamal (Encrypt-Only) [ELGAMAL] [HAC]             |
-# |        17 | DSA (Digital Signature Algorithm) [FIPS186] [HAC]  |
-# |        18 | ECDH public key algorithm                          |
-# |        19 | ECDSA public key algorithm [FIPS186]               |
-# |        20 | Reserved (formerly Elgamal Encrypt or Sign)        |
-# |        21 | Reserved for Diffie-Hellman                        |
-# |           | (X9.42, as defined for IETF-S/MIME)                |
-# |        22 | EdDSA [I-D.irtf-cfrg-eddsa]                        |
-# - https://lists.gnupg.org/pipermail/gnupg-devel/2017-April/032762.html
-
-algorithms: Final[dict[int, str]] = {
-    1: "RSA",
-    2: "RSA",
-    3: "RSA",
-    16: "Elgamal",
-    17: "DSA",
-    18: "ECDH",
-    19: "ECDSA",
-    21: "Diffie-Hellman",
-    22: "EdDSA",
-}
-
-
 async def check(
     session: web.Committer | None,
     release: sql.Release,
@@ -270,27 +226,3 @@ def _warnings_from_vote_result(vote_task: sql.Task | None) 
-> list[str]:
         return ["Vote task result is not a results.VoteInitiate instance."]
 
     return vote_task_result.mail_send_warnings
-
-
-__all__ = [
-    "algorithms",
-    "announce",
-    "check",
-    "distribution",
-    "draft",
-    "finish",
-    "ignores",
-    "keys",
-    "manual",
-    "projects",
-    "resolve",
-    "revisions",
-    "sbom",
-    "start",
-    "test",
-    "tokens",
-    "upload",
-    "user",
-    "vote",
-    "voting",
-]
diff --git a/atr/storage/writers/distributions.py 
b/atr/storage/writers/distributions.py
index 74fe074..312a471 100644
--- a/atr/storage/writers/distributions.py
+++ b/atr/storage/writers/distributions.py
@@ -19,16 +19,11 @@
 from __future__ import annotations
 
 import datetime
-import sqlite3
-
-import aiohttp
-import sqlalchemy.exc as exc
 
 import atr.db as db
 import atr.log as log
-import atr.models.basic as basic
-import atr.models.distribution as distribution
-import atr.models.sql as sql
+import atr.models as models
+import atr.shared.distribution as distribution
 import atr.storage as storage
 import atr.storage.outcome as outcome
 import atr.tasks.gha as gha
@@ -100,7 +95,7 @@ class CommitteeMember(CommitteeParticipant):
     async def automate(
         self,
         release_name: str,
-        platform: sql.DistributionPlatform,
+        platform: models.sql.DistributionPlatform,
         committee_name: str,
         owner_namespace: str | None,
         project_name: str,
@@ -110,9 +105,9 @@ class CommitteeMember(CommitteeParticipant):
         package: str,
         version: str,
         staging: bool,
-    ) -> sql.Task:
-        dist_task = sql.Task(
-            task_type=sql.TaskType.DISTRIBUTION_WORKFLOW,
+    ) -> models.sql.Task:
+        dist_task = models.sql.Task(
+            task_type=models.sql.TaskType.DISTRIBUTION_WORKFLOW,
             task_args=gha.DistributionWorkflow(
                 name=release_name,
                 namespace=owner_namespace or "",
@@ -129,7 +124,7 @@ class CommitteeMember(CommitteeParticipant):
             ).model_dump(),
             asf_uid=util.unwrap(self.__asf_uid),
             added=datetime.datetime.now(datetime.UTC),
-            status=sql.TaskStatus.QUEUED,
+            status=models.sql.TaskStatus.QUEUED,
             project_name=project_name,
             version_name=version_name,
             revision_number=revision_number,
@@ -142,88 +137,97 @@ class CommitteeMember(CommitteeParticipant):
     async def record(
         self,
         release_name: str,
-        platform: sql.DistributionPlatform,
+        platform: models.sql.DistributionPlatform,
         owner_namespace: str | None,
         package: str,
         version: str,
         staging: bool,
+        pending: bool,
         upload_date: datetime.datetime | None,
-        api_url: str,
+        api_url: str | None = None,
         web_url: str | None = None,
-    ) -> tuple[sql.Distribution, bool]:
-        distribution = sql.Distribution(
+    ) -> tuple[models.sql.Distribution, bool]:
+        existing = await self.__data.distribution(release_name, platform, 
owner_namespace or "", package, version).get()
+        dist = models.sql.Distribution(
             platform=platform,
             release_name=release_name,
             owner_namespace=owner_namespace or "",
             package=package,
             version=version,
             staging=staging,
+            pending=pending,
+            retries=0,
             upload_date=upload_date,
             api_url=api_url,
             web_url=web_url,
         )
-        self.__data.add(distribution)
-        try:
+        if existing is None:
+            self.__data.add(dist)
             await self.__data.commit()
-        except exc.IntegrityError as e:
-            # "The names and numeric values for existing result codes are 
fixed and unchanging."
-            # https://www.sqlite.org/rescode.html
-            # e.orig.sqlite_errorcode == 1555
-            # e.orig.sqlite_errorname == "SQLITE_CONSTRAINT_PRIMARYKEY"
-            match e.orig:
-                # TODO: Document this
-                case 
sqlite3.IntegrityError(sqlite_errorcode=sqlite3.SQLITE_CONSTRAINT_PRIMARYKEY):
-                    if not staging:
-                        upgraded = await self.__upgrade_staging_to_final(
-                            release_name,
-                            platform,
-                            owner_namespace,
-                            package,
-                            version,
-                            upload_date,
-                            api_url,
-                            web_url,
-                        )
-                        if upgraded is not None:
-                            return upgraded, False
-                    return distribution, False
-            raise e
-        return distribution, True
+            return dist, True
+        # If we're doing production and existing was for staging, upgrade it
+        if (not staging) and existing.staging:
+            upgraded = await self.__upgrade_staging_to_final(
+                release_name,
+                platform,
+                owner_namespace,
+                package,
+                version,
+                upload_date,
+                api_url,
+                web_url,
+            )
+            if upgraded is not None:
+                return upgraded, False
+        if existing.pending:
+            if pending:
+                existing.retries = existing.retries + 1
+                await self.__data.commit()
+                return existing, False
+            else:
+                existing.pending = False
+                await self.__data.commit()
+                return existing, False
+        return dist, False
 
     async def record_from_data(
         self,
         release_name: str,
         staging: bool,
-        dd: distribution.Data,
-    ) -> tuple[sql.Distribution, bool, distribution.Metadata]:
-        template_url = await self.__template_url(dd, staging)
-        api_url = template_url.format(
-            owner_namespace=dd.owner_namespace,
-            package=dd.package,
-            version=dd.version,
-        )
-        if dd.platform == sql.DistributionPlatform.MAVEN:
-            # We do this here because the CDNs break the namespace up into a / 
delimited URL
-            owner = (dd.owner_namespace or "").replace(".", "/")
-            api_url = template_url.format(
-                owner_namespace=owner,
-                package=dd.package,
-                version=dd.version,
-            )
-            api_oc = await self.__json_from_maven_xml(api_url, dd.version)
+        dd: models.distribution.Data,
+        allow_retries: bool = False,
+    ) -> tuple[models.sql.Distribution, bool, models.distribution.Metadata]:
+        api_url = distribution.get_api_url(dd, staging)
+        if dd.platform == models.sql.DistributionPlatform.MAVEN:
+            api_oc = await distribution.json_from_maven_xml(api_url, 
dd.version)
         else:
-            api_oc = await self.__json_from_distribution_platform(api_url, 
dd.platform, dd.version)
+            api_oc = await 
distribution.json_from_distribution_platform(api_url, dd.platform, dd.version)
         match api_oc:
             case outcome.Result(result):
                 pass
             case outcome.Error(error):
                 log.error(f"Failed to get API response from {api_url}: 
{error}")
+                if allow_retries:
+                    dist, added = await self.record(
+                        release_name=release_name,
+                        platform=dd.platform,
+                        owner_namespace=dd.owner_namespace,
+                        package=dd.package,
+                        version=dd.version,
+                        staging=staging,
+                        pending=True,
+                        upload_date=None,
+                        api_url=None,
+                        web_url=None,
+                    )
+                    if added:
+                        raise storage.AccessError("Distribution could not be 
found; ATR will retry this automatically")
                 raise storage.AccessError(f"Failed to get API response from 
distribution platform: {error}")
-        upload_date = self.__distribution_upload_date(dd.platform, result, 
dd.version)
+        upload_date = distribution.distribution_upload_date(dd.platform, 
result, dd.version)
         if upload_date is None:
             raise storage.AccessError("Failed to get upload date from 
distribution platform")
-        web_url = self.__distribution_web_url(dd.platform, result, dd.version)
-        metadata = distribution.Metadata(
+        web_url = distribution.distribution_web_url(dd.platform, result, 
dd.version)
+        metadata = models.distribution.Metadata(
             api_url=api_url,
             result=result,
             upload_date=upload_date,
@@ -236,244 +240,24 @@ class CommitteeMember(CommitteeParticipant):
             package=dd.package,
             version=dd.version,
             staging=staging,
+            pending=False,
             upload_date=upload_date,
             api_url=api_url,
             web_url=web_url,
         )
         return dist, added, metadata
 
-    def __distribution_upload_date(  # noqa: C901
-        self,
-        platform: sql.DistributionPlatform,
-        data: basic.JSON,
-        version: str,
-    ) -> datetime.datetime | None:
-        match platform:
-            case sql.DistributionPlatform.ARTIFACT_HUB:
-                if not (versions := 
distribution.ArtifactHubResponse.model_validate(data).available_versions):
-                    return None
-                return datetime.datetime.fromtimestamp(versions[0].ts, 
tz=datetime.UTC)
-            case sql.DistributionPlatform.DOCKER_HUB:
-                if not (pushed_at := 
distribution.DockerResponse.model_validate(data).tag_last_pushed):
-                    return None
-                return datetime.datetime.fromisoformat(pushed_at.rstrip("Z"))
-            # case sql.DistributionPlatform.GITHUB:
-            #     if not (published_at := 
GitHubResponse.model_validate(data).published_at):
-            #         return None
-            #     return 
datetime.datetime.fromisoformat(published_at.rstrip("Z"))
-            case sql.DistributionPlatform.MAVEN:
-                m = distribution.MavenResponse.model_validate(data)
-                docs = m.response.docs
-                if not docs:
-                    return None
-                timestamp = docs[0].timestamp
-                if not timestamp:
-                    return None
-                return datetime.datetime.fromtimestamp(timestamp / 1000, 
tz=datetime.UTC)
-            case sql.DistributionPlatform.NPM | 
sql.DistributionPlatform.NPM_SCOPED:
-                if not (times := 
distribution.NpmResponse.model_validate(data).time):
-                    return None
-                # Versions can be in the form "1.2.3" or "v1.2.3", so we check 
for both
-                if not (upload_time := times.get(version) or 
times.get(f"v{version}")):
-                    return None
-                return datetime.datetime.fromisoformat(upload_time.rstrip("Z"))
-            case sql.DistributionPlatform.PYPI:
-                if not (urls := 
distribution.PyPIResponse.model_validate(data).urls):
-                    return None
-                if not (upload_time := urls[0].upload_time_iso_8601):
-                    return None
-                return datetime.datetime.fromisoformat(upload_time.rstrip("Z"))
-        raise NotImplementedError(f"Platform {platform.name} is not yet 
supported")
-
-    def __distribution_web_url(  # noqa: C901
-        self,
-        platform: sql.DistributionPlatform,
-        data: basic.JSON,
-        version: str,
-    ) -> str | None:
-        match platform:
-            case sql.DistributionPlatform.ARTIFACT_HUB:
-                ah = distribution.ArtifactHubResponse.model_validate(data)
-                repo_name = ah.repository.name if ah.repository else None
-                pkg_name = ah.name
-                ver = ah.version
-                if repo_name and pkg_name:
-                    if ver:
-                        return 
f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{ver}";
-                    return 
f"https://artifacthub.io/packages/helm/{repo_name}/{pkg_name}/{version}";
-                if ah.home_url:
-                    return ah.home_url
-                for link in ah.links:
-                    if link.url:
-                        return link.url
-                return None
-            case sql.DistributionPlatform.DOCKER_HUB:
-                # The best we can do on Docker Hub is:
-                # f"https://hub.docker.com/_/{package}";
-                return None
-            # case sql.DistributionPlatform.GITHUB:
-            #     gh = GitHubResponse.model_validate(data)
-            #     return gh.html_url
-            case sql.DistributionPlatform.MAVEN:
-                return None
-            case sql.DistributionPlatform.NPM:
-                nr = distribution.NpmResponse.model_validate(data)
-                # return nr.homepage
-                return f"https://www.npmjs.com/package/{nr.name}/v/{version}";
-            case sql.DistributionPlatform.NPM_SCOPED:
-                nr = distribution.NpmResponse.model_validate(data)
-                # TODO: This is not correct
-                return nr.homepage
-            case sql.DistributionPlatform.PYPI:
-                info = distribution.PyPIResponse.model_validate(data).info
-                return info.release_url or info.project_url
-        raise NotImplementedError(f"Platform {platform.name} is not yet 
supported")
-
-    async def __json_from_distribution_platform(
-        self, api_url: str, platform: sql.DistributionPlatform, version: str
-    ) -> outcome.Outcome[basic.JSON]:
-        try:
-            async with util.create_secure_session() as session:
-                async with session.get(api_url) as response:
-                    response.raise_for_status()
-                    response_json = await response.json()
-            result = basic.as_json(response_json)
-        except aiohttp.ClientError as e:
-            return outcome.Error(e)
-        match platform:
-            case sql.DistributionPlatform.NPM | 
sql.DistributionPlatform.NPM_SCOPED:
-                if version not in 
distribution.NpmResponse.model_validate(result).time:
-                    e = RuntimeError(f"Version '{version}' not found")
-                    return outcome.Error(e)
-        return outcome.Result(result)
-
-    async def __json_from_maven_cdn(
-        self, api_url: str, group_id: str, artifact_id: str, version: str
-    ) -> outcome.Outcome[basic.JSON]:
-        import datetime
-
-        try:
-            async with util.create_secure_session() as session:
-                async with session.get(api_url) as response:
-                    response.raise_for_status()
-
-            # Use current time as timestamp since we're just validating the 
package exists
-            timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() 
* 1000)
-
-            # Convert to dict matching MavenResponse structure
-            result_dict = {
-                "response": {
-                    "start": 0,
-                    "docs": [
-                        {
-                            "g": group_id,
-                            "a": artifact_id,
-                            "v": version,
-                            "timestamp": timestamp_ms,
-                        }
-                    ],
-                }
-            }
-            result = basic.as_json(result_dict)
-            return outcome.Result(result)
-        except aiohttp.ClientError as e:
-            return outcome.Error(e)
-
-    async def __json_from_maven_xml(self, api_url: str, version: str) -> 
outcome.Outcome[basic.JSON]:
-        import datetime
-        import xml.etree.ElementTree as ET
-
-        try:
-            async with util.create_secure_session() as session:
-                async with session.get(api_url) as response:
-                    response.raise_for_status()
-                    xml_text = await response.text()
-
-            # Parse the XML
-            root = ET.fromstring(xml_text)
-
-            # Extract versioning info
-            group = root.find("groupId")
-            artifact = root.find("artifactId")
-            versioning = root.find("versioning")
-            if versioning is None:
-                e = RuntimeError("No versioning element found in Maven 
metadata")
-                return outcome.Error(e)
-
-            # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
-            last_updated_elem = versioning.find("lastUpdated")
-            if (last_updated_elem is None) or (not last_updated_elem.text):
-                e = RuntimeError("No lastUpdated timestamp found in Maven 
metadata")
-                return outcome.Error(e)
-
-            # Convert lastUpdated string to Unix timestamp in milliseconds
-            last_updated_str = last_updated_elem.text
-            dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
-            dt = dt.replace(tzinfo=datetime.UTC)
-            timestamp_ms = int(dt.timestamp() * 1000)
-
-            # Verify the version exists
-            versions_elem = versioning.find("versions")
-            if versions_elem is not None:
-                versions = [v.text for v in versions_elem.findall("version") 
if v.text]
-                if version not in versions:
-                    e = RuntimeError(f"Version '{version}' not found in Maven 
metadata")
-                    return outcome.Error(e)
-
-            # Convert to dict matching MavenResponse structure
-            result_dict = {
-                "response": {
-                    "start": 0,
-                    "docs": [
-                        {
-                            "g": group.text if (group is not None) else "",
-                            "a": artifact.text if (artifact is not None) else 
"",
-                            "v": version,
-                            "timestamp": timestamp_ms,
-                        }
-                    ],
-                }
-            }
-            result = basic.as_json(result_dict)
-            return outcome.Result(result)
-        except aiohttp.ClientError as e:
-            return outcome.Error(e)
-        except ET.ParseError as e:
-            return outcome.Error(RuntimeError(f"Failed to parse Maven XML: 
{e}"))
-
-    async def __template_url(
-        self,
-        dd: distribution.Data,
-        staging: bool | None = None,
-    ) -> str:
-        if staging is False:
-            return dd.platform.value.template_url
-
-        supported = {
-            sql.DistributionPlatform.ARTIFACT_HUB,
-            sql.DistributionPlatform.PYPI,
-            sql.DistributionPlatform.MAVEN,
-        }
-        if dd.platform not in supported:
-            raise storage.AccessError("Staging is currently supported only for 
ArtifactHub, PyPI and Maven Central.")
-
-        template_url = dd.platform.value.template_staging_url
-        if template_url is None:
-            raise storage.AccessError("This platform does not provide a 
staging API endpoint.")
-
-        return template_url
-
     async def __upgrade_staging_to_final(
         self,
         release_name: str,
-        platform: sql.DistributionPlatform,
+        platform: models.sql.DistributionPlatform,
         owner_namespace: str | None,
         package: str,
         version: str,
         upload_date: datetime.datetime | None,
-        api_url: str,
+        api_url: str | None,
         web_url: str | None,
-    ) -> sql.Distribution | None:
+    ) -> models.sql.Distribution | None:
         tag = f"{release_name} {platform} {owner_namespace or ''} {package} 
{version}"
         existing = await self.__data.distribution(
             release_name=release_name,
@@ -494,7 +278,7 @@ class CommitteeMember(CommitteeParticipant):
     async def delete_distribution(
         self,
         release_name: str,
-        platform: sql.DistributionPlatform,
+        platform: models.sql.DistributionPlatform,
         owner_namespace: str,
         package: str,
         version: str,
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 38d57d6..ea613bb 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -31,6 +31,7 @@ import atr.tasks.checks.rat as rat
 import atr.tasks.checks.signature as signature
 import atr.tasks.checks.targz as targz
 import atr.tasks.checks.zipformat as zipformat
+import atr.tasks.distribution as distribution
 import atr.tasks.gha as gha
 import atr.tasks.keys as keys
 import atr.tasks.message as message
@@ -80,6 +81,33 @@ async def clear_scheduled(caller_data: db.Session | None = 
None) -> None:
         await data.commit()
 
 
+async def distribution_status_check(
+    asf_uid: str,
+    caller_data: db.Session | None = None,
+    schedule: datetime.datetime | None = None,
+    schedule_next: bool = False,
+) -> sql.Task:
+    """Queue a distribution status check task."""
+    args = distribution.DistributionStatusCheckArgs(next_schedule_seconds=0, 
asf_uid=asf_uid)
+    if schedule_next:
+        args.next_schedule_seconds = 2 * 60
+    async with db.ensure_session(caller_data) as data:
+        task = sql.Task(
+            status=sql.TaskStatus.QUEUED,
+            task_type=sql.TaskType.DISTRIBUTION_STATUS,
+            task_args=args.model_dump(),
+            asf_uid=asf_uid,
+            revision_number=None,
+            primary_rel_path=None,
+        )
+        if schedule:
+            task.scheduled = schedule
+        data.add(task)
+        await data.commit()
+        await data.flush()
+        return task
+
+
 async def draft_checks(
     asf_uid: str, project_name: str, release_version: str, revision_number: 
str, caller_data: db.Session | None = None
 ) -> int:
@@ -222,6 +250,8 @@ def queued(
 
 def resolve(task_type: sql.TaskType) -> Callable[..., 
Awaitable[results.Results | None]]:  # noqa: C901
     match task_type:
+        case sql.TaskType.DISTRIBUTION_STATUS:
+            return distribution.status_check
         case sql.TaskType.DISTRIBUTION_WORKFLOW:
             return gha.trigger_workflow
         case sql.TaskType.HASHING_CHECK:
diff --git a/atr/tasks/distribution.py b/atr/tasks/distribution.py
new file mode 100644
index 0000000..541f918
--- /dev/null
+++ b/atr/tasks/distribution.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+
+import pydantic
+
+import atr.db as db
+import atr.log as log
+import atr.models.distribution as distribution
+import atr.models.results as results
+import atr.models.schema as schema
+import atr.storage as storage
+import atr.tasks as tasks
+import atr.tasks.checks as checks
+
+_RETRY_LIMIT = 5
+
+
+class DistributionStatusCheckArgs(schema.Strict):
+    """Arguments for the task to re-check distribution statuses."""
+
+    next_schedule_seconds: int = pydantic.Field(default=0, description="Seconds 
until the next scheduled check; 0 disables rescheduling")
+    asf_uid: str = schema.description("ASF UID of the user triggering the 
workflow")
+
+
[email protected]_model(DistributionStatusCheckArgs)
+async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | 
None = None) -> results.Results | None:
+    dists = []
+    async with db.session() as data:
+        dists = await data.distribution(pending=True, _with_release=True, 
_with_release_project=True).all()
+    for dist in dists:
+        dd = distribution.Data(
+            platform=dist.platform,
+            owner_namespace=dist.owner_namespace,
+            package=dist.package,
+            version=dist.version,
+            details=False,
+        )
+        try:
+            async with 
storage.write_as_committee_member(str(dist.release.project.committee_name), 
args.asf_uid) as w:
+                if dist.retries >= _RETRY_LIMIT:
+                    await w.distributions.delete_distribution(
+                        dist.release_name, dist.platform, 
dist.owner_namespace, dist.package, dist.version
+                    )
+                    name = f"{dist.platform} {dist.owner_namespace} 
{dist.package} {dist.version}"
+                    log.error(f"Distribution {name} failed {_RETRY_LIMIT} 
times, skipping")
+                    continue
+                await w.distributions.record_from_data(
+                    dist.release_name,
+                    dist.staging,
+                    dd,
+                )
+        except storage.AccessError as e:
+            msg = f"Failed to record distribution: {e}"
+            log.error(msg)
+            raise RuntimeError(msg)
+    if args.next_schedule_seconds:
+        next_schedule = datetime.datetime.now(datetime.UTC) + 
datetime.timedelta(seconds=args.next_schedule_seconds)
+        await tasks.distribution_status_check(args.asf_uid, 
schedule=next_schedule, schedule_next=True)
+        log.info(
+            f"Scheduled next distribution status check for: 
{next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
+        )
+    return results.DistributionStatusCheck(
+        kind="distribution_status",
+    )
diff --git a/migrations/versions/0042_2026.01.28_3e434625.py 
b/migrations/versions/0042_2026.01.28_3e434625.py
new file mode 100644
index 0000000..0e96b86
--- /dev/null
+++ b/migrations/versions/0042_2026.01.28_3e434625.py
@@ -0,0 +1,31 @@
+"""columns for pending distributions
+
+Revision ID: 0042_2026.01.28_3e434625
+Revises: 0041_2026.01.22_d1e357f5
+Create Date: 2026-01-28 16:30:09.232235+00:00
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+# Revision identifiers, used by Alembic
+revision: str = "0042_2026.01.28_3e434625"
+down_revision: str | None = "0041_2026.01.22_d1e357f5"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("distribution", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("pending", sa.Boolean(), nullable=False, 
server_default=sa.false()))
+        batch_op.add_column(sa.Column("retries", sa.Integer(), nullable=False, 
server_default="0"))
+        batch_op.alter_column("api_url", existing_type=sa.VARCHAR(), 
nullable=True)
+
+
+def downgrade() -> None:
+    with op.batch_alter_table("distribution", schema=None) as batch_op:
+        batch_op.alter_column("api_url", existing_type=sa.VARCHAR(), 
nullable=False)
+        batch_op.drop_column("retries")
+        batch_op.drop_column("pending")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to