This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new af09a4f Simplify the client configuration code
af09a4f is described below
commit af09a4fb644298bae0ba38421a646a920fbc9761
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jul 4 19:29:33 2025 +0100
Simplify the client configuration code
---
client/atr | 215 ++++++++++++++++++++++++++++---------------------------------
1 file changed, 99 insertions(+), 116 deletions(-)
diff --git a/client/atr b/client/atr
index f5e51d4..19fad3c 100755
--- a/client/atr
+++ b/client/atr
@@ -53,10 +53,8 @@ import datetime
import json
import logging
import os
-import pathlib
import sys
-import tempfile
-from typing import Any
+from typing import TYPE_CHECKING, Any, Literal
import aiohttp # type: ignore[import-not-found]
import cyclopts # type: ignore[import-not-found]
@@ -65,11 +63,14 @@ import jwt # type: ignore[import-not-found]
import platformdirs # type: ignore[import-not-found]
import strictyaml # type: ignore[import-not-found]
+if TYPE_CHECKING:
+ import pathlib
+ from collections.abc import Generator
+
APP: cyclopts.App = cyclopts.App()
CONFIG: cyclopts.App = cyclopts.App(name="config", help="Configuration
operations.")
JWT: cyclopts.App = cyclopts.App(name="jwt", help="JWT operations.")
LOGGER = logging.getLogger(__name__)
-PAT: cyclopts.App = cyclopts.App(name="pat", help="Personal Access Token
operations.")
RELEASE: cyclopts.App = cyclopts.App(name="release", help="Release
operations.")
YAML_DEFAULTS: dict[str, Any] = {"asf": {}, "atr": {}, "tokens": {}}
YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
@@ -87,18 +88,17 @@ YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
APP.command(CONFIG)
APP.command(JWT)
-APP.command(PAT)
APP.command(RELEASE)
@CONFIG.command(name="file", help="Display the configuration file contents.")
def app_config_file() -> None:
- cfg = config_path()
- if not cfg.exists():
+ path = config_path()
+ if not path.exists():
LOGGER.error("No configuration file found.")
sys.exit(1)
- with cfg.open("r", encoding="utf-8") as fh:
+ with path.open("r", encoding="utf-8") as fh:
for chunk in fh:
print(chunk, end="")
@@ -115,42 +115,31 @@ def app_drop(path: str) -> None:
LOGGER.error("Not a valid configuration key")
sys.exit(1)
- with config_lock():
- cfg = config_read()
- if not config_nested_drop(cfg, parts):
+ with config_lock(write=True) as config:
+ present, _ = config_walk(config, parts, "drop")
+ if not present:
LOGGER.error(f"Could not find {path} in the configuration file")
sys.exit(1)
- config_write(cfg)
LOGGER.info(f"Removed {path}.")
-@PAT.command(name="drop", help="Remove the stored Personal Access Token.")
-def app_pat_drop() -> None:
- with config_lock():
- cfg = config_read()
- if cfg.get("tokens", {}).pop("pat", None) is not None:
- if not cfg["tokens"]:
- cfg.pop("tokens")
- config_write(cfg)
- LOGGER.info("Personal Access Token removed.")
- else:
- LOGGER.error("No Personal Access Token stored.")
- sys.exit(1)
-
-
@JWT.command(name="dump", help="Show decoded JWT payload from stored config.")
def app_jwt_dump() -> None:
- with config_lock():
- cfg = config_read()
- jwt_token = cfg.get("tokens", {}).get("jwt")
+ with config_lock() as config:
+ jwt_value = config_get(config, ["tokens", "jwt"])
- if jwt_token is None:
+ if jwt_value is None:
LOGGER.error("No JWT stored in configuration.")
sys.exit(1)
+ header = jwt.get_unverified_header(jwt_value)
+ if header != {"alg": "HS256", "typ": "JWT"}:
+ LOGGER.error("Invalid JWT header.")
+ sys.exit(1)
+
try:
- payload = jwt.decode(jwt_token, options={"verify_signature": False})
+ payload = jwt.decode(jwt_value, options={"verify_signature": False})
except jwt.PyJWTError as e:
LOGGER.error(f"Failed to decode JWT: {e}")
sys.exit(1)
@@ -160,16 +149,15 @@ def app_jwt_dump() -> None:
@JWT.command(name="info", help="Show JWT payload in human-readable form.")
def app_jwt_info() -> None:
- with config_lock():
- cfg = config_read()
- token = cfg.get("tokens", {}).get("jwt")
+ with config_lock() as config:
+ jwt_value = config_get(config, ["tokens", "jwt"])
- if token is None:
+ if jwt_value is None:
LOGGER.error("No JWT stored in configuration.")
sys.exit(1)
try:
- payload = jwt.decode(token, options={"verify_signature": False})
+ payload = jwt.decode(jwt_value, options={"verify_signature": False})
except jwt.PyJWTError as e:
LOGGER.error(f"Failed to decode JWT: {e}")
sys.exit(1)
@@ -185,32 +173,28 @@ def app_jwt_info() -> None:
@JWT.command(name="refresh", help="Fetch a JWT using the stored PAT and store
it in config.")
def app_jwt_refresh(asf_uid: str | None = None) -> None:
- with config_lock():
- cfg = config_read()
- pat_token = cfg.get("tokens", {}).get("pat")
+ with config_lock() as config:
+ pat_value = config_get(config, ["tokens", "pat"])
- if pat_token is None:
+ if pat_value is None:
LOGGER.error("No Personal Access Token stored.")
sys.exit(1)
- host = cfg.get("atr", {}).get("host", "release-test.apache.org")
+ host = config.get("atr", {}).get("host", "release-test.apache.org")
url = f"https://{host}/api/jwt"
if asf_uid is None:
- asf_uid = cfg.get("asf", {}).get("uid")
+ asf_uid = config.get("asf", {}).get("uid")
if asf_uid is None:
LOGGER.error("No ASF UID provided and asf.uid not configured.")
sys.exit(1)
verify_ssl = not host.startswith("127.0.0.1")
- jwt_token = asyncio.run(web_fetch(url, asf_uid, pat_token, verify_ssl))
+ jwt_token = asyncio.run(web_fetch(url, asf_uid, pat_value, verify_ssl))
- with config_lock():
- cfg = config_read()
- cfg.setdefault("tokens", {})
- cfg["tokens"]["jwt"] = jwt_token
- config_write(cfg)
+ with config_lock(write=True) as config:
+ config_set(config, ["tokens", "jwt"], jwt_token)
print(jwt_token)
@@ -222,21 +206,20 @@ def app_jwt_show() -> None:
@RELEASE.command(name="start", help="Start a release.")
def app_release_start(project: str, version: str) -> None:
- with config_lock():
- cfg = config_read()
- jwt_token = cfg.get("tokens", {}).get("jwt")
+ with config_lock() as config:
+ jwt_value = config_get(config, ["tokens", "jwt"])
- if jwt_token is None:
+ if jwt_value is None:
LOGGER.error("No JWT stored in configuration.")
sys.exit(1)
- host = cfg.get("atr", {}).get("host", "release-test.apache.org")
+ host = config.get("atr", {}).get("host", "release-test.apache.org")
url = f"https://{host}/api/releases/create"
payload: dict[str, str] = {"project_name": project, "version": version}
verify_ssl = not host.startswith("127.0.0.1")
- result = asyncio.run(web_post(url, payload, jwt_token, verify_ssl))
+ result = asyncio.run(web_post(url, payload, jwt_value, verify_ssl))
print(result)
@@ -247,10 +230,8 @@ def app_set(path: str, value: str) -> None:
LOGGER.error("Not a valid configuration key.")
sys.exit(1)
- with config_lock():
- cfg = config_read()
- config_nested_set(cfg, parts, value)
- config_write(cfg)
+ with config_lock(write=True) as config:
+ config_set(config, path.split("."), value)
LOGGER.info(f"Set {path} to {value}.")
@@ -262,8 +243,8 @@ def app_show(path: str) -> None:
LOGGER.error("Not a valid configuration key.")
sys.exit(1)
- with config_lock():
- value = config_nested_get(config_read(), parts)
+ with config_lock() as config:
+ value = config_get(config, parts)
if value is None:
LOGGER.error(f"Could not find {path} in the configuration file.")
@@ -272,45 +253,22 @@ def app_show(path: str) -> None:
print(value)
+def config_drop(cfg: dict[str, Any], parts: list[str]) -> None:
+ config_walk(cfg, parts, "drop")
+
+
+def config_get(cfg: dict[str, Any], parts: list[str]) -> Any | None:
+ return config_walk(cfg, parts, "get")[1]
+
+
@contextlib.contextmanager
-def config_lock():
+def config_lock(write: bool = False) -> Generator[dict[str, Any]]:
lock = filelock.FileLock(str(config_path()) + ".lock")
with lock:
- yield
-
-
-def config_nested_drop(cfg: dict[str, Any], parts: list[str]) -> bool:
- stack: list[tuple[dict[str, Any], str]] = []
- cur: dict[str, Any] = cfg
- for key in parts[:-1]:
- if key not in cur or not isinstance(cur[key], dict):
- return False
- stack.append((cur, key))
- cur = cur[key]
- if parts[-1] not in cur:
- return False
- cur.pop(parts[-1])
- while stack:
- parent, key = stack.pop()
- if not parent[key]:
- parent.pop(key)
- return True
-
-
-def config_nested_get(cfg: dict[str, Any], parts: list[str]) -> Any | None:
- cur: Any = cfg
- for key in parts:
- if not isinstance(cur, dict) or key not in cur:
- return None
- cur = cur[key]
- return cur
-
-
-def config_nested_set(cfg: dict[str, Any], parts: list[str], value: str) ->
None:
- cur: dict[str, Any] = cfg
- for key in parts[:-1]:
- cur = cur.setdefault(key, {})
- cur[parts[-1]] = value
+ config = config_read()
+ yield config
+ if write is True:
+ config_write(config)
def config_path() -> pathlib.Path:
@@ -327,26 +285,51 @@ def config_read() -> dict[str, Any]:
return YAML_DEFAULTS.copy()
-def config_write(data: dict[str, Any]) -> None:
- config_file = config_path()
-
- if not any(bool(v) for v in data.values()):
- if config_file.exists():
- config_file.unlink()
- return
+def config_set(cfg: dict[str, Any], parts: list[str], val: Any) -> None:
+ config_walk(cfg, parts, "set", val)
+
+
+def config_walk(
+ config: dict[str, Any], parts: list[str], op: Literal["drop", "get",
"set"], value: Any | None = None
+) -> tuple[bool, Any | None]:
+ match (op, parts):
+ case ("get", [k, *tail]) if tail:
+ return config_walk(config.get(k, {}), tail, op)
+ case ("get", [k]):
+ return (k in config), config.get(k)
+ case ("set", [k, *tail]) if tail:
+ child = config.setdefault(k, {})
+ changed, _ = config_walk(child, tail, op, value)
+ return changed, value
+ case ("set", [k]):
+ changed = config.get(k) != value
+ config[k] = value
+ return changed, value
+ case ("drop", [k, *tail]) if tail:
+ if (k not in config) or (not isinstance(config[k], dict)):
+ return False, None
+ changed, removed_value = config_walk(config[k], tail, op)
+ if changed and not config[k]:
+ config.pop(k)
+ return changed, removed_value
+ case ("drop", [k]):
+ if k in config:
+ removed_value = config.pop(k)
+ return True, removed_value
+ return False, None
+ raise ValueError(f"Invalid operation: {op} with parts: {parts}")
- config_file.parent.mkdir(parents=True, exist_ok=True)
- yaml_doc = strictyaml.as_document(data, YAML_SCHEMA)
- yaml_str = yaml_doc.as_yaml()
-
- with tempfile.NamedTemporaryFile("w", encoding="utf-8",
dir=config_file.parent, delete=False) as tmp:
- tmp.write(yaml_str)
- tmp.flush()
- os.fsync(tmp.fileno())
- temp_path = pathlib.Path(tmp.name)
-
- os.replace(temp_path, config_file)
+def config_write(data: dict[str, Any]) -> None:
+ path = config_path()
+ if not any(data.values()):
+ if path.exists():
+ path.unlink()
+ return
+ tmp = path.with_suffix(".tmp")
+ tmp.parent.mkdir(parents=True, exist_ok=True)
+ tmp.write_text(strictyaml.as_document(data, YAML_SCHEMA).as_yaml(),
"utf-8")
+ os.replace(tmp, path)
def main() -> None:
@@ -366,7 +349,7 @@ def timestamp_format(ts: int | str | None) -> str | None:
async def web_fetch(url: str, asfuid: str, pat_token: str, verify_ssl: bool =
True) -> str:
- connector = aiohttp.TCPConnector(ssl=verify_ssl) if (not verify_ssl) else
None
+ connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
payload = {"asfuid": asfuid, "pat": pat_token}
async with session.post(url, json=payload) as resp:
@@ -382,7 +365,7 @@ async def web_fetch(url: str, asfuid: str, pat_token: str,
verify_ssl: bool = Tr
async def web_post(url: str, payload: dict[str, Any], jwt_token: str,
verify_ssl: bool = True) -> Any:
- connector = aiohttp.TCPConnector(ssl=verify_ssl) if (not verify_ssl) else
None
+ connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
headers = {"Authorization": f"Bearer {jwt_token}"}
async with aiohttp.ClientSession(connector=connector, headers=headers) as
session:
async with session.post(url, json=payload) as resp:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]