This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-releases-client.git
The following commit(s) were added to refs/heads/main by this push:
new d60aa9f Add reusable functions to call API endpoints
d60aa9f is described below
commit d60aa9fd3fab3b923214fdf0395cb4608d3a32c5
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Jul 15 19:47:13 2025 +0100
Add reusable functions to call API endpoints
---
pyproject.toml | 4 +-
src/atrclient/client.py | 350 ++++++++++++++++++++++++------------------------
uv.lock | 4 +-
3 files changed, 180 insertions(+), 178 deletions(-)
diff --git a/pyproject.toml b/pyproject.toml
index 5bda5ad..8dbbede 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@ build-backend = "hatchling.build"
[project]
name = "apache-trusted-releases"
-version = "0.20250715.1754"
+version = "0.20250715.1846"
description = "ATR CLI and Python API"
readme = "README.md"
requires-python = ">=3.13"
@@ -72,4 +72,4 @@ select = [
]
[tool.uv]
-exclude-newer = "2025-07-15T17:54:00Z"
+exclude-newer = "2025-07-15T18:46:00Z"
diff --git a/src/atrclient/client.py b/src/atrclient/client.py
index c0eb39d..0799eac 100755
--- a/src/atrclient/client.py
+++ b/src/atrclient/client.py
@@ -34,7 +34,8 @@ import re
import signal
import sys
import time
-from typing import TYPE_CHECKING, Annotated, Any, Literal, NoReturn, TypeGuard
+from collections.abc import Callable
+from typing import TYPE_CHECKING, Annotated, Any, Literal, NoReturn,
TypeGuard, TypeVar
import aiohttp
import cyclopts
@@ -75,6 +76,146 @@ YAML_SCHEMA: strictyaml.Map = strictyaml.Map(
JSON = dict[str, Any] | list[Any] | str | int | float | bool | None
+class ApiCore:
+ def __init__(self, path: str):
+ host, verify_ssl = config_host_get()
+ self.url = f"https://{host}/api{path}"
+ self.verify_ssl = verify_ssl
+
+
+class ApiGet(ApiCore):
+ def get(self, *args: str, **kwargs: str | None) -> JSON:
+ url = self.url + "/" + "/".join(args)
+ for value in kwargs.values():
+ if value is not None:
+ url += f"/{value}"
+ jwt_value = None
+ return asyncio.run(web_get(url, jwt_value, self.verify_ssl))
+
+
+class ApiPost(ApiCore):
+ def post(self, args: models.schema.Strict) -> JSON:
+ jwt_value = config_jwt_usable()
+ return asyncio.run(web_post(self.url, args, jwt_value,
self.verify_ssl))
+
+
+A = TypeVar("A", bound=models.schema.Strict)
+R = TypeVar("R", bound=models.api.Results)
+
+ApiGetFunction = Callable[..., R]
+ApiPostFunction = Callable[[ApiPost, A], R]
+
+
+def api_get(path: str) -> Callable[[ApiGetFunction], Callable[..., R]]:
+ def decorator(func: ApiGetFunction) -> Callable[..., R]:
+ def wrapper(*args: str, **kwargs: str | None) -> R:
+ api_instance = ApiGet(path)
+ try:
+ response = func(api_instance, *args, **kwargs)
+ except (pydantic.ValidationError, models.api.ResultsTypeError) as
e:
+ show_error_and_exit(f"Unexpected API GET response: {e}")
+ return response
+
+ return wrapper
+
+ return decorator
+
+
+def api_post(path: str) -> Callable[[ApiPostFunction], Callable[[A], R]]:
+ def decorator(func: ApiPostFunction) -> Callable[[A], R]:
+ def wrapper(args: A) -> R:
+ api_instance = ApiPost(path)
+ try:
+ response = func(api_instance, args)
+ except (pydantic.ValidationError, models.api.ResultsTypeError) as
e:
+ show_error_and_exit(f"Unexpected API POST response: {e}")
+ return response
+
+ return wrapper
+
+ return decorator
+
+
+@api_post("/announce")
+def api_announce(api: ApiPost, args: models.api.AnnounceArgs) ->
models.api.AnnounceResults:
+ response = api.post(args)
+ return models.api.validate_announce(response)
+
+
+@api_get("/checks/list")
+def api_checks_list(api: ApiGet, project: str, version: str, revision: str) ->
models.api.ChecksListResults:
+ response = api.get(project, version, revision)
+ return models.api.validate_checks_list(response)
+
+
+@api_get("/checks/ongoing")
+def api_checks_ongoing(
+ api: ApiGet, project: str, version: str, revision: str | None = None
+) -> models.api.ChecksOngoingResults:
+ response = api.get(project, version, revision=revision)
+ return models.api.validate_checks_ongoing(response)
+
+
+@api_post("/draft/delete")
+def api_draft_delete(api: ApiPost, args: models.api.DraftDeleteArgs) ->
models.api.DraftDeleteResults:
+ response = api.post(args)
+ return models.api.validate_draft_delete(response)
+
+
+@api_get("/list")
+def api_list(api: ApiGet, project: str, version: str) ->
models.api.ListResults:
+ response = api.get(project, version)
+ return models.api.validate_list(response)
+
+
+@api_post("/releases/create")
+def api_releases_create(api: ApiPost, args: models.api.ReleasesCreateArgs) ->
models.api.ReleasesCreateResults:
+ response = api.post(args)
+ return models.api.validate_releases_create(response)
+
+
+@api_post("/releases/delete")
+def api_releases_delete(api: ApiPost, args: models.api.ReleasesDeleteArgs) ->
models.api.ReleasesDeleteResults:
+ response = api.post(args)
+ return models.api.validate_releases_delete(response)
+
+
+@api_get("/releases/project")
+def api_releases_project(api: ApiGet, project: str) ->
models.api.ReleasesProjectResults:
+ response = api.get(project)
+ return models.api.validate_releases_project(response)
+
+
+@api_get("/releases/version")
+def api_releases_version(api: ApiGet, project: str, version: str) ->
models.api.ReleasesVersionResults:
+ response = api.get(project, version)
+ return models.api.validate_releases_version(response)
+
+
+@api_get("/revisions")
+def api_revisions(api: ApiGet, project: str, version: str) ->
models.api.RevisionsResults:
+ response = api.get(project, version)
+ return models.api.validate_revisions(response)
+
+
+@api_post("/upload")
+def api_upload(api: ApiPost, args: models.api.UploadArgs) ->
models.api.UploadResults:
+ response = api.post(args)
+ return models.api.validate_upload(response)
+
+
+@api_post("/vote/resolve")
+def api_vote_resolve(api: ApiPost, args: models.api.VoteResolveArgs) ->
models.api.VoteResolveResults:
+ response = api.post(args)
+ return models.api.validate_vote_resolve(response)
+
+
+@api_post("/vote/start")
+def api_vote_start(api: ApiPost, args: models.api.VoteStartArgs) ->
models.api.VoteStartResults:
+ response = api.post(args)
+ return models.api.validate_vote_start(response)
+
+
@APP.command(name="announce", help="Announce a release.")
def app_announce(
project: str,
@@ -86,9 +227,7 @@ def app_announce(
body: Annotated[str | None, cyclopts.Parameter(alias="-b", name="--body")]
= None,
path_suffix: Annotated[str | None, cyclopts.Parameter(alias="-p",
name="--path-suffix")] = None,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- announce = models.api.AnnounceArgs(
+ announce_args = models.api.AnnounceArgs(
project=project,
version=version,
revision=revision,
@@ -97,12 +236,7 @@ def app_announce(
body=body or f"Release {project} {version} has been announced.",
path_suffix=path_suffix or "",
)
- url = f"https://{host}/api/announce"
- response = asyncio.run(web_post(url, announce, jwt_value, verify_ssl))
- try:
- announce = models.api.validate_announce(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ announce = api_announce(announce_args)
print(announce.success)
@@ -114,14 +248,7 @@ def app_checks_exceptions(
/,
members: Annotated[bool, cyclopts.Parameter(alias="-m", name="--members")]
= False,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/checks/list/{project}/{version}/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- checks_list = models.api.validate_checks_list(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ checks_list = api_checks_list(project, version, revision)
checks_display_status("exception", checks_list.checks, members=members)
@@ -133,14 +260,7 @@ def app_checks_failures(
/,
members: Annotated[bool, cyclopts.Parameter(alias="-m", name="--members")]
= False,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/checks/list/{project}/{version}/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- checks_list = models.api.validate_checks_list(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ checks_list = api_checks_list(project, version, revision)
checks_display_status("failure", checks_list.checks, members=members)
@@ -152,16 +272,7 @@ def app_checks_status(
revision: str | None = None,
verbose: Annotated[bool, cyclopts.Parameter(alias="-v", name="--verbose")]
= False,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
-
- release_url = f"https://{host}/api/releases/version/{project}/{version}"
- response = asyncio.run(web_get_public(release_url, verify_ssl))
- try:
- releases_version = models.api.validate_releases_version(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
-
+ releases_version = api_releases_version(project, version)
release = releases_version.release
# TODO: Handle the not found case better
if release.phase != "release_candidate_draft":
@@ -176,12 +287,7 @@ def app_checks_status(
show_error_and_exit(f"Unexpected API response:
{release.latest_revision_number}")
revision = release.latest_revision_number
- url = f"https://{host}/api/checks/list/{project}/{version}/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- checks_list = models.api.validate_checks_list(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ checks_list = api_checks_list(project, version, revision)
checks_display(checks_list.checks, verbose)
@@ -194,23 +300,15 @@ def app_checks_wait(
timeout: Annotated[float, cyclopts.Parameter(alias="-t",
name="--timeout")] = 60,
interval: Annotated[int, cyclopts.Parameter(alias="-i",
name="--interval")] = 500,
) -> None:
- host, verify_ssl = config_host_get()
+ _host, verify_ssl = config_host_get()
if verify_ssl is True:
if interval < 500:
show_error_and_exit("Interval must be at least 500ms.")
interval_seconds = interval / 1000
if interval_seconds > timeout:
show_error_and_exit("Interval must be less than timeout.")
- jwt_value = config_jwt_usable()
while True:
- url = f"https://{host}/api/checks/ongoing/{project}/{version}"
- if revision is not None:
- url += f"/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- checks_ongoing = models.api.validate_checks_ongoing(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ checks_ongoing = api_checks_ongoing(project, version, revision)
if checks_ongoing.ongoing == 0:
break
time.sleep(interval_seconds)
@@ -228,14 +326,7 @@ def app_checks_warnings(
/,
members: Annotated[bool, cyclopts.Parameter(alias="-m", name="--members")]
= False,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/checks/list/{project}/{version}/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- checks_list = models.api.validate_checks_list(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ checks_list = api_checks_list(project, version, revision)
checks_display_status("warning", checks_list.checks, members=members)
@@ -257,16 +348,8 @@ def app_config_path() -> None:
@APP_DEV.command(name="delete", help="Delete a release.")
def app_dev_delete(project: str, version: str, /) -> None:
- # Only ATR admins may do this
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- args = models.api.ReleasesDeleteArgs(project=project, version=version)
- url = f"https://{host}/api/releases/delete"
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- release_delete = models.api.validate_releases_delete(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ releases_delete_args = models.api.ReleasesDeleteArgs(project=project,
version=version)
+ release_delete = api_releases_delete(releases_delete_args)
print(release_delete.deleted)
@@ -345,15 +428,8 @@ def app_dev_user() -> None:
@APP_DRAFT.command(name="delete", help="Delete a draft release.")
def app_draft_delete(project: str, version: str, /) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- args = models.api.DraftDeleteArgs(project=project, version=version)
- url = f"https://{host}/api/draft/delete"
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- draft_delete = models.api.validate_draft_delete(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ draft_delete_args = models.api.DraftDeleteArgs(project=project,
version=version)
+ draft_delete = api_draft_delete(draft_delete_args)
print(draft_delete.success)
@@ -426,68 +502,34 @@ def app_jwt_show() -> None:
@APP.command(name="list", help="List all files within a release.")
def app_list(project: str, version: str, revision: str | None = None, /) ->
None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/list/{project}/{version}"
- if revision:
- url += f"/{revision}"
- response = asyncio.run(web_get(url, jwt_value, verify_ssl))
- try:
- list_results = models.api.validate_list(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ list_results = api_list(project, version, revision)
for rel_path in list_results.rel_paths:
print(rel_path)
@APP_RELEASE.command(name="info", help="Show information about a release.")
def app_release_info(project: str, version: str, /) -> None:
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/releases/version/{project}/{version}"
- response = asyncio.run(web_get_public(url, verify_ssl))
- try:
- releases_version = models.api.validate_releases_version(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ releases_version = api_releases_version(project, version)
print(releases_version.release.model_dump_json(indent=None))
@APP_RELEASE.command(name="list", help="List releases for a project.")
def app_release_list(project: str, /) -> None:
# TODO: Support showing all of a user's releases if no project is provided
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/releases/project/{project}"
- response = asyncio.run(web_get_public(url, verify_ssl))
- try:
- releases_project = models.api.validate_releases_project(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ releases_project = api_releases_project(project)
releases_display(releases_project.data)
@APP_RELEASE.command(name="start", help="Start a release.")
def app_release_start(project: str, version: str, /) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/releases/create"
- args = models.api.ReleasesCreateArgs(project=project, version=version)
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- releases_create = models.api.validate_releases_create(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ releases_create_args = models.api.ReleasesCreateArgs(project=project,
version=version)
+ releases_create = api_releases_create(releases_create_args)
print(releases_create.release.model_dump_json(indent=None))
@APP.command(name="revisions", help="List all revisions for a release.")
def app_revisions(project: str, version: str, /) -> None:
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/revisions/{project}/{version}"
- response = asyncio.run(web_get_public(url, verify_ssl))
- try:
- revisions = models.api.validate_revisions(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ revisions = api_revisions(project, version)
for revision in revisions.revisions:
print(revision)
@@ -521,25 +563,17 @@ def app_show(path: str, /) -> None:
@APP.command(name="upload", help="Upload a file to a release.")
def app_upload(project: str, version: str, path: str, filepath: str, /) ->
None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/upload"
-
- with open(filepath, "rb") as f:
- content = f.read()
+ with open(filepath, "rb") as fh:
+ content = fh.read()
- args = models.api.UploadArgs(
+ upload_args = models.api.UploadArgs(
project=project,
version=version,
relpath=path,
content=base64.b64encode(content).decode("utf-8"),
)
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- upload = models.api.validate_upload(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ upload = api_upload(upload_args)
print(upload.revision.model_dump_json(indent=None))
@@ -549,19 +583,12 @@ def app_vote_resolve(
version: str,
resolution: Literal["passed", "failed"],
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/vote/resolve"
- args = models.api.VoteResolveArgs(
+ vote_resolve_args = models.api.VoteResolveArgs(
project=project,
version=version,
resolution=resolution,
)
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- vote_resolve = models.api.validate_vote_resolve(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ vote_resolve = api_vote_resolve(vote_resolve_args)
print(vote_resolve.success)
@@ -576,14 +603,12 @@ def app_vote_start(
subject: Annotated[str | None, cyclopts.Parameter(alias="-s",
name="--subject")] = None,
body: Annotated[str | None, cyclopts.Parameter(alias="-b", name="--body")]
= None,
) -> None:
- jwt_value = config_jwt_usable()
- host, verify_ssl = config_host_get()
- url = f"https://{host}/api/vote/start"
body_text = None
if body:
- with open(body, encoding="utf-8") as f:
- body_text = f.read()
- args = models.api.VoteStartArgs(
+ with open(body, encoding="utf-8") as fh:
+ body_text = fh.read()
+
+ vote_start_args = models.api.VoteStartArgs(
project=project,
version=version,
revision=revision,
@@ -592,11 +617,7 @@ def app_vote_start(
subject=subject or f"[VOTE] Release {project} {version}",
body=body_text or f"Release {project} {version} is ready for voting.",
)
- response = asyncio.run(web_post(url, args, jwt_value, verify_ssl))
- try:
- vote_start = models.api.validate_vote_start(response)
- except (pydantic.ValidationError, models.api.ResultsTypeError) as e:
- show_error_and_exit(f"Unexpected API response: {response}\n{e}")
+ vote_start = api_vote_start(vote_start_args)
print(vote_start.task.model_dump_json(indent=None))
@@ -728,7 +749,6 @@ def config_jwt_refresh(asf_uid: str | None = None) -> str:
host, verify_ssl = config_host_get()
url = f"https://{host}/api/jwt"
-
args = models.api.JwtArgs(asfuid=asf_uid, pat=pat_value)
response = asyncio.run(web_post(url, args, jwt_token=None,
verify_ssl=verify_ssl))
try:
@@ -1004,9 +1024,11 @@ def timestamp_format(ts: int | str | None) -> str | None:
return str(ts)
-async def web_get(url: str, jwt_token: str, verify_ssl: bool = True) -> JSON:
+async def web_get(url: str, jwt_token: str | None, verify_ssl: bool = True) ->
JSON:
connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
- headers = {"Authorization": f"Bearer {jwt_token}"}
+ headers = {}
+ if jwt_token is not None:
+ headers["Authorization"] = f"Bearer {jwt_token}"
async with aiohttp.ClientSession(connector=connector, headers=headers) as
session:
async with session.get(url) as resp:
if resp.status != 200:
@@ -1025,26 +1047,6 @@ async def web_get(url: str, jwt_token: str, verify_ssl:
bool = True) -> JSON:
return data
-async def web_get_public(url: str, verify_ssl: bool = True) -> JSON:
- connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
- async with aiohttp.ClientSession(connector=connector) as session:
- async with session.get(url) as resp:
- if resp.status != 200:
- text = await resp.text()
- try:
- error_data = json.loads(text)
- if isinstance(error_data, dict) and "error" in error_data:
- show_error_and_exit(error_data["error"])
- else:
- show_error_and_exit(f"Request failed: {resp.status}
{text}")
- except json.JSONDecodeError:
- show_error_and_exit(f"Request failed: {resp.status}
{text}")
- data = await resp.json()
- if not is_json(data):
- show_error_and_exit(f"Unexpected API response: {data}")
- return data
-
-
async def web_post(url: str, args: models.schema.Strict, jwt_token: str |
None, verify_ssl: bool = True) -> JSON:
connector = None if verify_ssl else aiohttp.TCPConnector(ssl=False)
headers = {}
diff --git a/uv.lock b/uv.lock
index f578f60..2cd71f9 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2,7 +2,7 @@ version = 1
requires-python = ">=3.13"
[options]
-exclude-newer = "2025-07-15T17:54:00Z"
+exclude-newer = "2025-07-15T18:46:00Z"
[[package]]
name = "aiohappyeyeballs"
@@ -83,7 +83,7 @@ wheels = [
[[package]]
name = "apache-trusted-releases"
-version = "0.20250715.1754"
+version = "0.20250715.1846"
source = { editable = "." }
dependencies = [
{ name = "aiohttp" },
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]