This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 7a8773993451bdd1ddbafb3e11cf9d66fa23f9b8
Author: Sean B. Palmer <[email protected]>
AuthorDate: Wed Apr 1 19:01:25 2026 +0100

    Migrate code from asfpy to the pubsub module
---
 atr/pubsub.py  | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 pyproject.toml |   1 +
 2 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/atr/pubsub.py b/atr/pubsub.py
index 3c23cf93..c3cb7b3a 100644
--- a/atr/pubsub.py
+++ b/atr/pubsub.py
@@ -16,17 +16,28 @@
 # under the License.
 
 import asyncio
+import json
 import os
 import pathlib
 import urllib.parse
+from collections.abc import AsyncGenerator
 from typing import Any
 
-import asfpy.pubsub
+import aiohttp
 
 import atr.ldap as ldap
 import atr.log as log
 import atr.svn.commits as commits
 
+# The server sends keepalives every 5 seconds, so we should see
+# activity well within this timeout period.
+_DEFAULT_INACTIVITY_TIMEOUT = 11
+### for debug:
+# _DEFAULT_INACTIVITY_TIMEOUT = 4.5
+
+# Default read buffer size. The maximum payload size in pypubsub is 256 KB
+# (plus metadata and JSON overhead).
+_DEFAULT_READ_BUFFER_SIZE = 300 * 1024
+
 
 def is_ldap_payload(payload: dict[str, Any]) -> bool:
     return "ldap" in payload.get("pubsub_topics", [])
def is_commit_payload(payload: dict[str, Any]) -> bool:
    """Return True when the payload was published under the "commit" pubsub topic."""
    return "commit" in payload.get("pubsub_topics", [])
 
 
+#
+# TYPICAL USAGE:
+#
+#   async for payload in listen(PUBSUB_URL):
+#
+# This will produce a series of payloads, forever.
+#
+# NOTE: this listener is intended for pypubsub, which terminates
+#   payloads with a newline. The old svnpubsub used NUL characters,
+#   so this client will not work with that server.
+#
+
+
async def listen(
    pubsub_url: str,
    username: str | None = None,
    password: str | None = None,
    sock_read: float | None = None,
    buffersize: int | None = None,
) -> AsyncGenerator[Any | None]:
    """Connect to a pypubsub server and yield decoded payloads, forever.

    Opens a streaming HTTP connection to ``pubsub_url`` and yields one
    decoded JSON payload per message. On connection failure or timeout it
    reconnects automatically, with a backoff that starts at 0s and is
    capped at 30s. The generator never returns on its own; it can yield
    ``None`` when the underlying connection hits EOF (see
    ``_process_connection``).

    Args:
        pubsub_url: URL of the pypubsub stream to subscribe to.
        username: optional HTTP Basic Auth user; if given, ``password``
            is required.
        password: HTTP Basic Auth password, required when ``username``
            is set.
        sock_read: inactivity timeout in seconds between socket reads;
            defaults to ``_DEFAULT_INACTIVITY_TIMEOUT`` (the server
            sends keepalives roughly every 5s).
        buffersize: aiohttp read buffer size in bytes; defaults to
            ``_DEFAULT_READ_BUFFER_SIZE``.

    Raises:
        ValueError: if ``username`` is given without ``password``.
    """
    if username:
        if password is None:
            raise ValueError("PubSub password is required")
        auth = aiohttp.BasicAuth(username, password)
    else:
        auth = None

    # sock_read bounds the silence between reads; keepalives should
    # arrive well inside the default window.
    if sock_read is None:
        sock_read = _DEFAULT_INACTIVITY_TIMEOUT
    ct = aiohttp.ClientTimeout(sock_read=sock_read)

    if buffersize is None:
        buffersize = _DEFAULT_READ_BUFFER_SIZE

    async with aiohttp.ClientSession(auth=auth, timeout=ct, read_bufsize=buffersize) as session:
        # Retry immediately, and then back it off.
        delay = 0.0

        ### tbd: look at event loop, to see if it has been halted
        while True:
            log.debug("Opening new connection...")
            try:
                async for payload in _process_connection(session, pubsub_url):
                    # NOTE(review): a falsy payload (None on EOF) is
                    # currently passed through to the caller unchanged.
                    if not payload:
                        pass  ### tbd?: event loop killed or hit EOF

                    # We got a payload, so reset the DELAY.
                    delay = 0.0

                    yield payload

            # Only these exception types trigger a reconnect; anything
            # else (e.g. a JSON decode failure) propagates to the caller.
            except (
                ConnectionRefusedError,
                aiohttp.ClientConnectorError,
                aiohttp.ServerTimeoutError,
                aiohttp.ClientPayloadError,
            ) as e:
                log.error(f"Connection failed ({type(e).__name__}: {e}), reconnecting in {delay} seconds")
                await asyncio.sleep(delay)

                # Back off on the delay. Step it up from 0s, doubling each
                # time, and top out at 30s retry. Steps: 0, 2, 6, 14, 30.
                delay = min(30.0, (delay + 1.0) * 2)
+
+
async def _process_connection(
    session: aiohttp.ClientSession,
    pubsub_url: str,
) -> AsyncGenerator[Any | None]:
    """Stream payloads from a single pubsub connection until EOF or error.

    Yields one decoded JSON object per newline-terminated payload, and a
    single ``None`` (then exhausts) when the server closes the stream.

    Args:
        session: an open aiohttp client session (auth/timeout configured).
        pubsub_url: URL of the pypubsub stream to read from.

    Raises:
        aiohttp.ClientPayloadError: re-raised from the ``ValueError``
            that ``readuntil()`` uses to report an overlong payload, so
            that the caller's reconnect handler treats it like any other
            connection failure.
    """
    # Connect to pubsub and listen for payloads.
    async with session.get(pubsub_url) as conn:
        while True:
            # The pubsub server defines stream payloads as:
            #    ENCODED_JSON(payload)+"\n"
            #
            # Due to the encoding, bare newlines will not occur
            # within the encoded part. Thus, we can read content
            # until we find a newline.
            #
            # Note: this newline is in RAW, but the json loader
            # ignores it.
            try:
                raw = await conn.content.readuntil(b"\n")
            except ValueError as e:
                log.error(f'Saw "{e}"; re-raising as ClientPayloadError to close/reconnect')
                raise aiohttp.ClientPayloadError("re-raised from ValueError in readuntil()") from e

            if not raw:
                # EOF: signal the caller, then stop this generator.
                # Falling through would call json.loads(b""), which
                # raises JSONDecodeError -- an exception the caller's
                # reconnect logic does not catch.
                yield None
                return

            yield json.loads(raw)
+
+
 class PubSubListener:
     def __init__(
         self,
@@ -76,7 +180,7 @@ class PubSubListener:
         log.info(f"PubSubListener starting with URL: {full_url}")
 
         try:
-            async for payload in asfpy.pubsub.listen(
+            async for payload in listen(
                 full_url,
                 username=self.username,
                 password=self.password,
diff --git a/pyproject.toml b/pyproject.toml
index 08893f90..c105a0a2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,6 +48,7 @@ dependencies = [
   "quart-rate-limiter>=0.12.1",
   "quart-schema[pydantic]~=0.21",
   "quart-wtforms~=1.0.3",
+  "requests>=2.33.0",
   "rich>=14.0.0,<15.0.0",
   "semver>=3.0.4",
   "sqlmodel~=0.0.24",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to