This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch sbp in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 7a8773993451bdd1ddbafb3e11cf9d66fa23f9b8 Author: Sean B. Palmer <[email protected]> AuthorDate: Wed Apr 1 19:01:25 2026 +0100 Migrate code from asfpy to the pubsub module --- atr/pubsub.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- pyproject.toml | 1 + 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/atr/pubsub.py b/atr/pubsub.py index 3c23cf93..c3cb7b3a 100644 --- a/atr/pubsub.py +++ b/atr/pubsub.py @@ -16,17 +16,28 @@ # under the License. import asyncio +import json import os import pathlib import urllib.parse +from collections.abc import AsyncGenerator from typing import Any -import asfpy.pubsub +import aiohttp import atr.ldap as ldap import atr.log as log import atr.svn.commits as commits +# The server sends keepalives every 5 seconds, so we should see +# activity well within this timeout period. +_DEFAULT_INACTIVITY_TIMEOUT = 11 +### for debug: +# _DEFAULT_INACTIVITY_TIMEOUT = 4.5 + +# Default read buffer size. Max payload size in pypubsub is 256kb (plus metadata and JSON overhead) +_DEFAULT_READ_BUFFER_SIZE = 300 * 1024 + def is_ldap_payload(payload: dict[str, Any]) -> bool: return "ldap" in payload.get("pubsub_topics", []) @@ -36,6 +47,99 @@ def is_commit_payload(payload: dict[str, Any]) -> bool: return "commit" in payload.get("pubsub_topics", []) +# +# TYPICAL USAGE: +# +# async for payload in listen(PUBSUB_URL): +# +# This will produce a series of payloads, forever. +# +# NOTE: this listener is intended for pypubsub, which terminates +# payloads with a newline. The old svnpubsub used NUL characters, +# so this client will not work with that server. +# + + +async def listen( + pubsub_url: str, + username: str | None = None, + password: str | None = None, + sock_read: float | None = None, + buffersize: int | None = None, +) -> AsyncGenerator[Any | None]: + if username: + if password is None: + raise ValueError("PubSub password is required") + auth = aiohttp.BasicAuth(username, password) + else: + auth = None + + if sock_read is None: + sock_read = _DEFAULT_INACTIVITY_TIMEOUT + ct = aiohttp.ClientTimeout(sock_read=sock_read) + + if buffersize is None: + buffersize = _DEFAULT_READ_BUFFER_SIZE + + async with aiohttp.ClientSession(auth=auth, timeout=ct, read_bufsize=buffersize) as session: + # Retry immediately, and then back it off. + delay = 0.0 + + ### tbd: look at event loop, to see if it has been halted + while True: + log.debug("Opening new connection...") + try: + async for payload in _process_connection(session, pubsub_url): + if not payload: + pass ### tbd?: event loop killed or hit EOF + + # We got a payload, so reset the DELAY. + delay = 0.0 + + yield payload + + except ( + ConnectionRefusedError, + aiohttp.ClientConnectorError, + aiohttp.ServerTimeoutError, + aiohttp.ClientPayloadError, + ) as e: + log.error(f"Connection failed ({type(e).__name__}: {e}), reconnecting in {delay} seconds") + await asyncio.sleep(delay) + + # Back off on the delay. Step it up from 0s, doubling each + # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30. + delay = min(30.0, (delay + 1.0) * 2) + + +async def _process_connection(session, pubsub_url): + # Connect to pubsub and listen for payloads. + async with session.get(pubsub_url) as conn: + # print('LIMITS:', conn.content.get_read_buffer_limits()) + + while True: + # The pubsub server defines stream payloads as: + # ENCODED_JSON(payload)+"\n" + # + # Due to the encoding, bare newlines will not occur + # within the encoded part. Thus, we can read content + # until we find a newline. + # + # Note: this newline is in RAW, but the json loader + # ignores it. + try: + raw = await conn.content.readuntil(b"\n") + except ValueError as e: + log.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect') + raise aiohttp.ClientPayloadError("re-raised from ValueError in readuntil()") + + if not raw: + # We just hit EOF. + yield None + + yield json.loads(raw) + + class PubSubListener: def __init__( self, @@ -76,7 +180,7 @@ class PubSubListener: log.info(f"PubSubListener starting with URL: {full_url}") try: - async for payload in asfpy.pubsub.listen( + async for payload in listen( full_url, username=self.username, password=self.password, diff --git a/pyproject.toml b/pyproject.toml index 08893f90..c105a0a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ dependencies = [ "quart-rate-limiter>=0.12.1", "quart-schema[pydantic]~=0.21", "quart-wtforms~=1.0.3", + "requests>=2.33.0", "rich>=14.0.0,<15.0.0", "semver>=3.0.4", "sqlmodel~=0.0.24", --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
