This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new 327d594 Add commands to inspect the content of JWTs
327d594 is described below
commit 327d594890615bf8ff13fb8141785550f8445764
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jul 4 17:21:35 2025 +0100
Add commands to inspect the content of JWTs
---
client/atr | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 86 insertions(+), 8 deletions(-)
diff --git a/client/atr b/client/atr
index 0670054..5eea599 100755
--- a/client/atr
+++ b/client/atr
@@ -24,6 +24,7 @@
# "cyclopts",
# "filelock",
# "platformdirs",
+# "pyjwt",
# "strictyaml",
# ]
# [tool.uv]
@@ -48,6 +49,8 @@ from __future__ import annotations
import asyncio
import contextlib
+import datetime
+import json
import logging
import os
import pathlib
@@ -58,11 +61,13 @@ from typing import Any
import aiohttp # type: ignore[import-not-found]
import cyclopts # type: ignore[import-not-found]
import filelock # type: ignore[import-not-found]
+import jwt # type: ignore[import-not-found]
import platformdirs # type: ignore[import-not-found]
import strictyaml # type: ignore[import-not-found]
APP: cyclopts.App = cyclopts.App()
CONFIG: cyclopts.App = cyclopts.App(name="config", help="Configuration
operations.")
+JWT: cyclopts.App = cyclopts.App(name="jwt", help="JWT operations.")
LOGGER = logging.getLogger(__name__)
PAT: cyclopts.App = cyclopts.App(name="pat", help="Personal Access Token
operations.")
RELEASE: cyclopts.App = cyclopts.App(name="release", help="Release
operations.")
@@ -71,11 +76,17 @@ YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
{
strictyaml.Optional("atr"):
strictyaml.Map({strictyaml.Optional("host"): strictyaml.Str()}),
strictyaml.Optional("asf"):
strictyaml.Map({strictyaml.Optional("uid"): strictyaml.Str()}),
- strictyaml.Optional("tokens"):
strictyaml.Map({strictyaml.Optional("pat"): strictyaml.Str()}),
+ strictyaml.Optional("tokens"): strictyaml.Map(
+ {
+ strictyaml.Optional("pat"): strictyaml.Str(),
+ strictyaml.Optional("jwt"): strictyaml.Str(),
+ }
+ ),
}
)
APP.command(CONFIG)
+APP.command(JWT)
APP.command(PAT)
APP.command(RELEASE)
@@ -128,12 +139,56 @@ def app_pat_drop() -> None:
sys.exit(1)
[email protected](name="jwt", help="Fetch a JWT using the stored PAT.")
-def app_pat_jwt(asf_uid: str | None = None) -> None:
[email protected](name="dump", help="Show decoded JWT payload from stored config.")
+def app_jwt_dump() -> None:
with config_lock():
cfg = config_read()
+ jwt_token = cfg.get("tokens", {}).get("jwt")
+
+ if jwt_token is None:
+ LOGGER.error("No JWT stored in configuration.")
+ sys.exit(1)
+
+ try:
+ payload = jwt.decode(jwt_token, options={"verify_signature": False})
+ except jwt.PyJWTError as e:
+ LOGGER.error(f"Failed to decode JWT: {e}")
+ sys.exit(1)
+
+ print(json.dumps(payload, indent=None))
+
+
[email protected](name="info", help="Show JWT payload in human-readable form.")
+def app_jwt_info() -> None:
+ with config_lock():
+ cfg = config_read()
+ token = cfg.get("tokens", {}).get("jwt")
+
+ if token is None:
+ LOGGER.error("No JWT stored in configuration.")
+ sys.exit(1)
+
+ try:
+ payload = jwt.decode(token, options={"verify_signature": False})
+ except jwt.PyJWTError as e:
+ LOGGER.error(f"Failed to decode JWT: {e}")
+ sys.exit(1)
+
+ lines: list[str] = []
+ for key, val in payload.items():
+ if key in ("exp", "iat", "nbf"):
+ val = timestamp_format(val)
+ lines.append(f"{key.title()}: {val}")
+
+ print("\n".join(lines))
+
+
[email protected](name="refresh", help="Fetch a JWT using the stored PAT and store
it in config.")
+def app_jwt_refresh(asf_uid: str | None = None) -> None:
+ with config_lock():
+ cfg = config_read()
+ pat_token = cfg.get("tokens", {}).get("pat")
- pat_token = cfg.get("tokens", {}).get("pat")
if pat_token is None:
LOGGER.error("No Personal Access Token stored.")
sys.exit(1)
@@ -149,8 +204,20 @@ def app_pat_jwt(asf_uid: str | None = None) -> None:
sys.exit(1)
verify_ssl = not host.startswith("127.0.0.1")
- jwt = asyncio.run(web_fetch(url, asf_uid, pat_token, verify_ssl))
- print(jwt)
+ jwt_token = asyncio.run(web_fetch(url, asf_uid, pat_token, verify_ssl))
+
+ with config_lock():
+ cfg = config_read()
+ cfg.setdefault("tokens", {})
+ cfg["tokens"]["jwt"] = jwt_token
+ config_write(cfg)
+
+ print(jwt_token)
+
+
[email protected](name="show", help="Show stored JWT token.")
+def app_jwt_show() -> None:
+ return app_show("tokens.jwt")
@RELEASE.command(name="add", help="Add a release.")
@@ -241,8 +308,8 @@ def config_read() -> dict[str, Any]:
if config_file.exists():
try:
return strictyaml.load(config_file.read_text(), YAML_SCHEMA).data
- except strictyaml.YAMLValidationError as exc:
- raise RuntimeError(f"Invalid atr.yaml: {exc}") from exc
+ except strictyaml.YAMLValidationError as e:
+ raise RuntimeError(f"Invalid atr.yaml: {e}") from e
return YAML_DEFAULTS.copy()
@@ -273,6 +340,17 @@ def main() -> None:
APP()
+def timestamp_format(ts: int | str | None) -> str | None:
+ if ts is None:
+ return None
+ try:
+ t = int(ts)
+ dt = datetime.datetime.fromtimestamp(t, datetime.UTC)
+ return dt.strftime("%d %b %Y at %H:%M:%S UTC")
+ except Exception:
+ return str(ts)
+
+
async def web_fetch(url: str, asfuid: str, pat_token: str, verify_ssl: bool =
True) -> str:
connector = aiohttp.TCPConnector(ssl=verify_ssl) if not verify_ssl else
None
async with aiohttp.ClientSession(connector=connector) as session:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]