This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new af09a4f  Simplify the client configuration code
af09a4f is described below

commit af09a4fb644298bae0ba38421a646a920fbc9761
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jul 4 19:29:33 2025 +0100

    Simplify the client configuration code
---
 client/atr | 215 ++++++++++++++++++++++++++++---------------------------------
 1 file changed, 99 insertions(+), 116 deletions(-)

diff --git a/client/atr b/client/atr
index f5e51d4..19fad3c 100755
--- a/client/atr
+++ b/client/atr
@@ -53,10 +53,8 @@ import datetime
 import json
 import logging
 import os
-import pathlib
 import sys
-import tempfile
-from typing import Any
+from typing import TYPE_CHECKING, Any, Literal
 
 import aiohttp  # type: ignore[import-not-found]
 import cyclopts  # type: ignore[import-not-found]
@@ -65,11 +63,14 @@ import jwt  # type: ignore[import-not-found]
 import platformdirs  # type: ignore[import-not-found]
 import strictyaml  # type: ignore[import-not-found]
 
+if TYPE_CHECKING:
+    import pathlib
+    from collections.abc import Generator
+
 APP: cyclopts.App = cyclopts.App()
 CONFIG: cyclopts.App = cyclopts.App(name="config", help="Configuration 
operations.")
 JWT: cyclopts.App = cyclopts.App(name="jwt", help="JWT operations.")
 LOGGER = logging.getLogger(__name__)
-PAT: cyclopts.App = cyclopts.App(name="pat", help="Personal Access Token 
operations.")
 RELEASE: cyclopts.App = cyclopts.App(name="release", help="Release 
operations.")
 YAML_DEFAULTS: dict[str, Any] = {"asf": {}, "atr": {}, "tokens": {}}
 YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
@@ -87,18 +88,17 @@ YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
 
 APP.command(CONFIG)
 APP.command(JWT)
-APP.command(PAT)
 APP.command(RELEASE)
 
 
 @CONFIG.command(name="file", help="Display the configuration file contents.")
 def app_config_file() -> None:
-    cfg = config_path()
-    if not cfg.exists():
+    path = config_path()
+    if not path.exists():
         LOGGER.error("No configuration file found.")
         sys.exit(1)
 
-    with cfg.open("r", encoding="utf-8") as fh:
+    with path.open("r", encoding="utf-8") as fh:
         for chunk in fh:
             print(chunk, end="")
 
@@ -115,42 +115,31 @@ def app_drop(path: str) -> None:
         LOGGER.error("Not a valid configuration key")
         sys.exit(1)
 
-    with config_lock():
-        cfg = config_read()
-        if not config_nested_drop(cfg, parts):
+    with config_lock(write=True) as config:
+        present, _ = config_walk(config, parts, "drop")
+        if not present:
             LOGGER.error(f"Could not find {path} in the configuration file")
             sys.exit(1)
-        config_write(cfg)
 
     LOGGER.info(f"Removed {path}.")
 
 
[email protected](name="drop", help="Remove the stored Personal Access Token.")
-def app_pat_drop() -> None:
-    with config_lock():
-        cfg = config_read()
-        if cfg.get("tokens", {}).pop("pat", None) is not None:
-            if not cfg["tokens"]:
-                cfg.pop("tokens")
-            config_write(cfg)
-            LOGGER.info("Personal Access Token removed.")
-        else:
-            LOGGER.error("No Personal Access Token stored.")
-            sys.exit(1)
-
-
 @JWT.command(name="dump", help="Show decoded JWT payload from stored config.")
 def app_jwt_dump() -> None:
-    with config_lock():
-        cfg = config_read()
-        jwt_token = cfg.get("tokens", {}).get("jwt")
+    with config_lock() as config:
+        jwt_value = config_get(config, ["tokens", "jwt"])
 
-    if jwt_token is None:
+    if jwt_value is None:
         LOGGER.error("No JWT stored in configuration.")
         sys.exit(1)
 
+    header = jwt.get_unverified_header(jwt_value)
+    if header != {"alg": "HS256", "typ": "JWT"}:
+        LOGGER.error("Invalid JWT header.")
+        sys.exit(1)
+
     try:
-        payload = jwt.decode(jwt_token, options={"verify_signature": False})
+        payload = jwt.decode(jwt_value, options={"verify_signature": False})
     except jwt.PyJWTError as e:
         LOGGER.error(f"Failed to decode JWT: {e}")
         sys.exit(1)
@@ -160,16 +149,15 @@ def app_jwt_dump() -> None:
 
 @JWT.command(name="info", help="Show JWT payload in human-readable form.")
 def app_jwt_info() -> None:
-    with config_lock():
-        cfg = config_read()
-        token = cfg.get("tokens", {}).get("jwt")
+    with config_lock() as config:
+        jwt_value = config_get(config, ["tokens", "jwt"])
 
-    if token is None:
+    if jwt_value is None:
         LOGGER.error("No JWT stored in configuration.")
         sys.exit(1)
 
     try:
-        payload = jwt.decode(token, options={"verify_signature": False})
+        payload = jwt.decode(jwt_value, options={"verify_signature": False})
     except jwt.PyJWTError as e:
         LOGGER.error(f"Failed to decode JWT: {e}")
         sys.exit(1)
@@ -185,32 +173,28 @@ def app_jwt_info() -> None:
 
 @JWT.command(name="refresh", help="Fetch a JWT using the stored PAT and store 
it in config.")
 def app_jwt_refresh(asf_uid: str | None = None) -> None:
-    with config_lock():
-        cfg = config_read()
-        pat_token = cfg.get("tokens", {}).get("pat")
+    with config_lock() as config:
+        pat_value = config_get(config, ["tokens", "pat"])
 
-    if pat_token is None:
+    if pat_value is None:
         LOGGER.error("No Personal Access Token stored.")
         sys.exit(1)
 
-    host = cfg.get("atr", {}).get("host", "release-test.apache.org")
+    host = config.get("atr", {}).get("host", "release-test.apache.org")
     url = f"https://{host}/api/jwt";
 
     if asf_uid is None:
-        asf_uid = cfg.get("asf", {}).get("uid")
+        asf_uid = config.get("asf", {}).get("uid")
 
     if asf_uid is None:
         LOGGER.error("No ASF UID provided and asf.uid not configured.")
         sys.exit(1)
 
     verify_ssl = not host.startswith("127.0.0.1")
-    jwt_token = asyncio.run(web_fetch(url, asf_uid, pat_token, verify_ssl))
+    jwt_token = asyncio.run(web_fetch(url, asf_uid, pat_value, verify_ssl))
 
-    with config_lock():
-        cfg = config_read()
-        cfg.setdefault("tokens", {})
-        cfg["tokens"]["jwt"] = jwt_token
-        config_write(cfg)
+    with config_lock(write=True) as config:
+        config_set(config, ["tokens", "jwt"], jwt_token)
 
     print(jwt_token)
 
@@ -222,21 +206,20 @@ def app_jwt_show() -> None:
 
 @RELEASE.command(name="start", help="Start a release.")
 def app_release_start(project: str, version: str) -> None:
-    with config_lock():
-        cfg = config_read()
-        jwt_token = cfg.get("tokens", {}).get("jwt")
+    with config_lock() as config:
+        jwt_value = config_get(config, ["tokens", "jwt"])
 
-    if jwt_token is None:
+    if jwt_value is None:
         LOGGER.error("No JWT stored in configuration.")
         sys.exit(1)
 
-    host = cfg.get("atr", {}).get("host", "release-test.apache.org")
+    host = config.get("atr", {}).get("host", "release-test.apache.org")
     url = f"https://{host}/api/releases/create";
 
     payload: dict[str, str] = {"project_name": project, "version": version}
 
     verify_ssl = not host.startswith("127.0.0.1")
-    result = asyncio.run(web_post(url, payload, jwt_token, verify_ssl))
+    result = asyncio.run(web_post(url, payload, jwt_value, verify_ssl))
     print(result)
 
 
@@ -247,10 +230,8 @@ def app_set(path: str, value: str) -> None:
         LOGGER.error("Not a valid configuration key.")
         sys.exit(1)
 
-    with config_lock():
-        cfg = config_read()
-        config_nested_set(cfg, parts, value)
-        config_write(cfg)
+    with config_lock(write=True) as config:
+        config_set(config, path.split("."), value)
 
     LOGGER.info(f"Set {path} to {value}.")
 
@@ -262,8 +243,8 @@ def app_show(path: str) -> None:
         LOGGER.error("Not a valid configuration key.")
         sys.exit(1)
 
-    with config_lock():
-        value = config_nested_get(config_read(), parts)
+    with config_lock() as config:
+        value = config_get(config, parts)
 
     if value is None:
         LOGGER.error(f"Could not find {path} in the configuration file.")
@@ -272,45 +253,22 @@ def app_show(path: str) -> None:
     print(value)
 
 
+def config_drop(cfg: dict[str, Any], parts: list[str]) -> None:
+    config_walk(cfg, parts, "drop")
+
+
+def config_get(cfg: dict[str, Any], parts: list[str]) -> Any | None:
+    return config_walk(cfg, parts, "get")[1]
+
+
 @contextlib.contextmanager
-def config_lock():
+def config_lock(write: bool = False) -> Generator[dict[str, Any]]:
     lock = filelock.FileLock(str(config_path()) + ".lock")
     with lock:
-        yield
-
-
-def config_nested_drop(cfg: dict[str, Any], parts: list[str]) -> bool:
-    stack: list[tuple[dict[str, Any], str]] = []
-    cur: dict[str, Any] = cfg
-    for key in parts[:-1]:
-        if key not in cur or not isinstance(cur[key], dict):
-            return False
-        stack.append((cur, key))
-        cur = cur[key]
-    if parts[-1] not in cur:
-        return False
-    cur.pop(parts[-1])
-    while stack:
-        parent, key = stack.pop()
-        if not parent[key]:
-            parent.pop(key)
-    return True
-
-
-def config_nested_get(cfg: dict[str, Any], parts: list[str]) -> Any | None:
-    cur: Any = cfg
-    for key in parts:
-        if not isinstance(cur, dict) or key not in cur:
-            return None
-        cur = cur[key]
-    return cur
-
-
-def config_nested_set(cfg: dict[str, Any], parts: list[str], value: str) -> 
None:
-    cur: dict[str, Any] = cfg
-    for key in parts[:-1]:
-        cur = cur.setdefault(key, {})
-    cur[parts[-1]] = value
+        config = config_read()
+        yield config
+        if write is True:
+            config_write(config)
 
 
 def config_path() -> pathlib.Path:
@@ -327,26 +285,51 @@ def config_read() -> dict[str, Any]:
     return YAML_DEFAULTS.copy()
 
 
-def config_write(data: dict[str, Any]) -> None:
-    config_file = config_path()
-
-    if not any(bool(v) for v in data.values()):
-        if config_file.exists():
-            config_file.unlink()
-        return
+def config_set(cfg: dict[str, Any], parts: list[str], val: Any) -> None:
+    config_walk(cfg, parts, "set", val)
+
+
+def config_walk(
+    config: dict[str, Any], parts: list[str], op: Literal["drop", "get", 
"set"], value: Any | None = None
+) -> tuple[bool, Any | None]:
+    match (op, parts):
+        case ("get", [k, *tail]) if tail:
+            return config_walk(config.get(k, {}), tail, op)
+        case ("get", [k]):
+            return (k in config), config.get(k)
+        case ("set", [k, *tail]) if tail:
+            child = config.setdefault(k, {})
+            changed, _ = config_walk(child, tail, op, value)
+            return changed, value
+        case ("set", [k]):
+            changed = config.get(k) != value
+            config[k] = value
+            return changed, value
+        case ("drop", [k, *tail]) if tail:
+            if (k not in config) or (not isinstance(config[k], dict)):
+                return False, None
+            changed, removed_value = config_walk(config[k], tail, op)
+            if changed and not config[k]:
+                config.pop(k)
+            return changed, removed_value
+        case ("drop", [k]):
+            if k in config:
+                removed_value = config.pop(k)
+                return True, removed_value
+            return False, None
+    raise ValueError(f"Invalid operation: {op} with parts: {parts}")
 
-    config_file.parent.mkdir(parents=True, exist_ok=True)
 
-    yaml_doc = strictyaml.as_document(data, YAML_SCHEMA)
-    yaml_str = yaml_doc.as_yaml()
-
-    with tempfile.NamedTemporaryFile("w", encoding="utf-8", 
dir=config_file.parent, delete=False) as tmp:
-        tmp.write(yaml_str)
-        tmp.flush()
-        os.fsync(tmp.fileno())
-        temp_path = pathlib.Path(tmp.name)
-
-    os.replace(temp_path, config_file)
+def config_write(data: dict[str, Any]) -> None:
+    path = config_path()
+    if not any(data.values()):
+        if path.exists():
+            path.unlink()
+        return
+    tmp = path.with_suffix(".tmp")
+    tmp.parent.mkdir(parents=True, exist_ok=True)
+    tmp.write_text(strictyaml.as_document(data, YAML_SCHEMA).as_yaml(), 
"utf-8")
+    os.replace(tmp, path)
 
 
 def main() -> None:
@@ -366,7 +349,7 @@ def timestamp_format(ts: int | str | None) -> str | None:
 
 
 async def web_fetch(url: str, asfuid: str, pat_token: str, verify_ssl: bool = 
True) -> str:
-    connector = aiohttp.TCPConnector(ssl=verify_ssl) if (not verify_ssl) else 
None
+    connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
     async with aiohttp.ClientSession(connector=connector) as session:
         payload = {"asfuid": asfuid, "pat": pat_token}
         async with session.post(url, json=payload) as resp:
@@ -382,7 +365,7 @@ async def web_fetch(url: str, asfuid: str, pat_token: str, 
verify_ssl: bool = Tr
 
 
 async def web_post(url: str, payload: dict[str, Any], jwt_token: str, 
verify_ssl: bool = True) -> Any:
-    connector = aiohttp.TCPConnector(ssl=verify_ssl) if (not verify_ssl) else 
None
+    connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
     headers = {"Authorization": f"Bearer {jwt_token}"}
     async with aiohttp.ClientSession(connector=connector, headers=headers) as 
session:
         async with session.post(url, json=payload) as resp:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to