This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new c2da557 Fixes #320
c2da557 is described below
commit c2da557e943aae7b7889f2de5e78eeb306b7014a
Author: Andrew K. Musselman <[email protected]>
AuthorDate: Thu Nov 20 15:40:00 2025 -0800
Fixes #320
---
atr/server.py | 7 +++++--
atr/svn/pubsub.py | 58 ++++++++++++++++++++++++++++++++++++++++---------------
2 files changed, 47 insertions(+), 18 deletions(-)
diff --git a/atr/server.py b/atr/server.py
index 12ef32b..34a721f 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -22,6 +22,7 @@ import contextlib
import datetime
import os
import queue
+import urllib.parse
from collections.abc import Iterable
from typing import Any
@@ -173,8 +174,10 @@ def app_setup_lifecycle(app: base.QuartApp) -> None:
pubsub_url = conf.PUBSUB_URL
pubsub_user = conf.PUBSUB_USER
pubsub_password = conf.PUBSUB_PASSWORD
+ parsed_pubsub_url = urllib.parse.urlparse(pubsub_url) if pubsub_url
else None
+ valid_pubsub_url = bool(parsed_pubsub_url and parsed_pubsub_url.scheme
and parsed_pubsub_url.netloc)
- if pubsub_url and pubsub_user and pubsub_password:
+ if valid_pubsub_url and pubsub_url and pubsub_user and pubsub_password:
log.info("Starting PubSub SVN listener")
listener = pubsub.SVNListener(
working_copy_root=conf.SVN_STORAGE_DIR,
@@ -188,7 +191,7 @@ def app_setup_lifecycle(app: base.QuartApp) -> None:
else:
log.info(
"PubSub SVN listener not started: pubsub_url=%s pubsub_user=%s
pubsub_password=%s",
- bool(pubsub_url),
+ bool(valid_pubsub_url),
bool(pubsub_user),
# Essential to use bool(...) here to avoid logging the password
bool(pubsub_password),
diff --git a/atr/svn/pubsub.py b/atr/svn/pubsub.py
index 2fe1d69..9c53658 100644
--- a/atr/svn/pubsub.py
+++ b/atr/svn/pubsub.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import asyncio
import os
import pathlib
import urllib.parse
@@ -54,24 +55,49 @@ class SVNListener:
"""Run forever, processing PubSub payloads as they arrive."""
# TODO: Add reconnection logic here?
# Or does asfpy.pubsub.listen() already do this?
- log.info("SVNListener.start() called")
- async for payload in asfpy.pubsub.listen(
- # TODO: Upstream this change to BAT
- urllib.parse.urljoin(self.url, self.topics),
- username=self.username,
- password=self.password,
- ):
- if (payload is None) or ("stillalive" in payload):
- continue
+ if not self.url:
+ log.error("PubSub URL is not configured")
+ log.warning("SVNListener disabled: no URL provided")
+ return
- pubsub_path = str(payload.get("pubsub_path", ""))
- if not pubsub_path.startswith(_WATCHED_PREFIXES):
- # Ignore commits outside dist/dev or dist/release
- continue
+ if (not self.username) or (not self.password):
+ log.error("PubSub credentials not configured")
+ log.warning("SVNListener disabled: missing credentials")
+ return
+
+ if not self.url.startswith(("http://", "https://")):
+ log.error(
+ "Invalid PubSub URL: %r. Expected full URL like
'https://pubsub.apache.org:2069'",
+ self.url,
+ )
+ log.warning("SVNListener disabled due to invalid URL")
+ return
+
+ full_url = urllib.parse.urljoin(self.url, self.topics)
+ log.info(f"SVNListener starting with URL: {full_url}")
+
+ try:
+ async for payload in asfpy.pubsub.listen(
+ full_url,
+ username=self.username,
+ password=self.password,
+ ):
+ if (payload is None) or ("stillalive" in payload):
+ continue
- log.debug("PubSub payload: %s", payload)
- await self._process_payload(payload)
- log.info("SVNListener.start() finished")
+ pubsub_path = str(payload.get("pubsub_path", ""))
+ if not pubsub_path.startswith(_WATCHED_PREFIXES):
+ # Ignore commits outside dist/dev or dist/release
+ continue
+ log.debug("PubSub payload: %s", payload)
+ await self._process_payload(payload)
+ except asyncio.CancelledError:
+ log.info("SVNListener cancelled, shutting down gracefully")
+ raise
+ except Exception as exc:
+ log.error("SVNListener error: %s", exc, exc_info=True)
+ finally:
+ log.info("SVNListener.start() finished")
async def _process_payload(self, payload: dict) -> None:
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]