This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 30eef68 Use a wrapper for plain text responses
30eef68 is described below
commit 30eef6845e9450ab4b59fdd4beed080d527842a3
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Oct 27 19:24:10 2025 +0000
Use a wrapper for plain text responses
---
atr/admin/__init__.py | 18 ++++----
atr/routes/download.py | 13 +++---
atr/routes/draft.py | 3 +-
atr/routes/keys.py | 5 ++-
atr/routes/preview.py | 7 ++--
atr/routes/tokens.py | 109 +++++++++++++++++++++----------------------------
atr/web.py | 5 +++
7 files changed, 77 insertions(+), 83 deletions(-)
diff --git a/atr/admin/__init__.py b/atr/admin/__init__.py
index 676d572..dd23ae7 100644
--- a/atr/admin/__init__.py
+++ b/atr/admin/__init__.py
@@ -190,7 +190,7 @@ async def configuration(session: web.Committer) ->
quart.wrappers.response.Respo
values.append(f"{name}={val}")
values.sort()
- return quart.Response("\n".join(values), mimetype="text/plain")
+ return web.TextResponse("\n".join(values))
@admin.get("/consistency")
@@ -433,7 +433,7 @@ async def env(session: web.Committer) ->
quart.wrappers.response.Response:
env_vars = []
for key, value in os.environ.items():
env_vars.append(f"{key}={value}")
- return quart.Response("\n".join(env_vars), mimetype="text/plain")
+ return web.TextResponse("\n".join(env_vars))
@admin.get("/keys/check")
@@ -455,10 +455,10 @@ async def _keys_check(session: web.Committer) ->
quart.Response:
try:
result = await _check_keys()
- return quart.Response(result, mimetype="text/plain")
+ return web.TextResponse(result)
except Exception as e:
log.exception("Exception during key check:")
- return quart.Response(f"Exception during key check: {e!s}", mimetype="text/plain")
+ return web.TextResponse(f"Exception during key check: {e!s}")
@admin.get("/keys/regenerate-all")
@@ -496,7 +496,7 @@ async def _keys_regenerate_all(session: web.Committer) ->
quart.Response:
for oce in outcomes.errors():
response_lines.append(f"Error regenerating: {type(oce).__name__}
{oce}")
- return quart.Response("\n".join(response_lines), mimetype="text/plain")
+ return web.TextResponse("\n".join(response_lines))
@admin.get("/keys/update")
@@ -598,10 +598,10 @@ async def _ongoing_tasks(
) -> quart.wrappers.response.Response:
try:
ongoing = await interaction.tasks_ongoing(project_name, version_name,
revision)
- return quart.Response(str(ongoing), mimetype="text/plain")
+ return web.TextResponse(str(ongoing))
except Exception:
log.exception(f"Error fetching ongoing task count for {project_name}
{version_name} rev {revision}:")
- return quart.Response("", mimetype="text/plain")
+ return web.TextResponse("")
@admin.get("/performance")
@@ -736,7 +736,7 @@ async def task_times(
ms_elapsed = (task.completed - task.started).total_seconds() * 1000
values.append(f"{task.task_type} {ms_elapsed:.2f}ms")
- return quart.Response("\n".join(values), mimetype="text/plain")
+ return web.TextResponse("\n".join(values))
@admin.get("/test")
@@ -769,7 +769,7 @@ async def test(session: web.Committer) ->
quart.wrappers.response.Response:
log.info(f"Inserted: {inserted_count}")
log.info(f"Linked: {linked_count}")
log.info(f"InsertedAndLinked: {inserted_and_linked_count}")
- return quart.Response(str(wacm), mimetype="text/plain")
+ return web.TextResponse(str(wacm))
@admin.get("/toggle-view")
diff --git a/atr/routes/download.py b/atr/routes/download.py
index 8a8614d..1dbb5c5 100644
--- a/atr/routes/download.py
+++ b/atr/routes/download.py
@@ -37,6 +37,7 @@ import atr.routes.mapping as mapping
import atr.routes.root as root
import atr.template as template
import atr.util as util
+import atr.web as web
@route.committer("/download/all/<project_name>/<version_name>")
@@ -109,11 +110,11 @@ async def urls_selected(
ValueError("Release not found")
)
url_list_str = await _generate_file_url_list(release)
- return quart.Response(url_list_str, mimetype="text/plain")
+ return web.TextResponse(url_list_str)
except ValueError as e:
- return quart.Response(f"Error: {e}", status=404, mimetype="text/plain")
+ return web.TextResponse(f"Error: {e}", status=404)
except Exception as e:
- return quart.Response(f"Internal server error: {e}", status=500, mimetype="text/plain")
+ return web.TextResponse(f"Internal server error: {e}", status=500)
@route.committer("/download/zip/<project_name>/<version_name>")
@@ -123,9 +124,9 @@ async def zip_selected(
try:
release = await session.release(project_name=project_name,
version_name=version_name, phase=None)
except ValueError as e:
- return quart.Response(f"Error: {e}", status=404, mimetype="text/plain")
+ return web.TextResponse(f"Error: {e}", status=404)
except Exception as e:
- return quart.Response(f"Server error: {e}", status=500, mimetype="text/plain")
+ return web.TextResponse(f"Server error: {e}", status=500)
base_dir = util.release_directory(release)
files_to_zip = []
@@ -135,7 +136,7 @@ async def zip_selected(
if await aiofiles.os.path.isfile(full_item_path):
files_to_zip.append({"file": str(full_item_path), "name":
str(rel_path)})
except FileNotFoundError:
- return quart.Response("Error: Release directory not found.", status=404, mimetype="text/plain")
+ return web.TextResponse("Error: Release directory not found.", status=404)
async def stream_zip(file_list: list[dict[str, str]]) ->
AsyncGenerator[bytes]:
aiozip = zipstream.AioZipStream(file_list, chunksize=32768)
diff --git a/atr/routes/draft.py b/atr/routes/draft.py
index a6a944f..f1b6932 100644
--- a/atr/routes/draft.py
+++ b/atr/routes/draft.py
@@ -39,6 +39,7 @@ import atr.routes.upload as upload
import atr.storage as storage
import atr.template as template
import atr.util as util
+import atr.web as web
if TYPE_CHECKING:
import werkzeug.wrappers.response as response
@@ -412,4 +413,4 @@ async def vote_preview(
vote_duration=vote_duration,
),
)
- return quart.Response(body, mimetype="text/plain")
+ return web.TextResponse(body)
diff --git a/atr/routes/keys.py b/atr/routes/keys.py
index 738a6b0..6cbda29 100644
--- a/atr/routes/keys.py
+++ b/atr/routes/keys.py
@@ -42,6 +42,7 @@ import atr.storage.types as types
import atr.template as template
import atr.user as user
import atr.util as util
+import atr.web as web
class AddOpenPGPKeyForm(forms.Typed):
@@ -282,13 +283,13 @@ async def details(session: route.CommitterSession,
fingerprint: str) -> str | re
@route.committer("/keys/export/<committee_name>")
-async def export(session: route.CommitterSession, committee_name: str) -> quart.Response:
+async def export(session: route.CommitterSession, committee_name: str) -> web.TextResponse:
"""Export a KEYS file for a specific committee."""
async with storage.write() as write:
wafc = write.as_foundation_committer()
keys_file_text = await wafc.keys.keys_file_text(committee_name)
- return quart.Response(keys_file_text, mimetype="text/plain")
+ return web.TextResponse(keys_file_text)
@route.committer("/keys/import/<project_name>/<version_name>",
methods=["POST"])
diff --git a/atr/routes/preview.py b/atr/routes/preview.py
index 87889e7..e29e130 100644
--- a/atr/routes/preview.py
+++ b/atr/routes/preview.py
@@ -30,6 +30,7 @@ import atr.routes.root as root
import atr.storage as storage
import atr.template as template
import atr.util as util
+import atr.web as web
if asfquart.APP is ...:
raise RuntimeError("APP is not set")
@@ -64,7 +65,7 @@ async def announce_preview(
if form.errors:
error_details = "; ".join([f"{field}: {', '.join(errs)}" for
field, errs in form.errors.items()])
error_message = f"{error_message}: {error_details}"
- return quart.Response(f"Error: {error_message}", status=400, mimetype="text/plain")
+ return web.TextResponse(f"Error: {error_message}", status=400)
try:
# Construct options and generate body
@@ -76,11 +77,11 @@ async def announce_preview(
)
preview_body = await
construct.announce_release_body(str(form.body.data), options)
- return quart.Response(preview_body, mimetype="text/plain")
+ return web.TextResponse(preview_body)
except Exception as e:
log.exception("Error generating announcement preview:")
- return quart.Response(f"Error generating preview: {e!s}", status=500, mimetype="text/plain")
+ return web.TextResponse(f"Error generating preview: {e!s}", status=500)
@route.committer("/preview/delete", methods=["POST"])
diff --git a/atr/routes/tokens.py b/atr/routes/tokens.py
index 7250828..55f2ca0 100644
--- a/atr/routes/tokens.py
+++ b/atr/routes/tokens.py
@@ -28,26 +28,10 @@ import sqlmodel
import werkzeug.datastructures as datastructures
import werkzeug.wrappers.response as response
import wtforms.fields.core as core
-from htpy import (
- Element,
- code,
- div,
- form,
- h1,
- h2,
- p,
- pre,
- strong,
- table,
- tbody,
- td,
- th,
- thead,
- tr,
-)
import atr.db as db
import atr.forms as forms
+import atr.htm as htm
import atr.jwtoken as jwtoken
import atr.log as log
import atr.models.sql as sql
@@ -55,11 +39,12 @@ import atr.route as route
import atr.storage as storage
import atr.template as templates
import atr.util as util
+import atr.web as web
_EXPIRY_DAYS: Final[int] = 180
-type Fragment = Element | core.Field | str
+type Fragment = htm.Element | core.Field | str
class AddTokenForm(forms.Typed):
@@ -81,7 +66,7 @@ async def jwt_post(session: route.CommitterSession) ->
quart.Response:
await util.validate_empty_form()
jwt_token = jwtoken.issue(session.uid)
- return quart.Response(jwt_token, mimetype="text/plain")
+ return web.TextResponse(jwt_token)
@route.committer("/tokens", methods=["GET", "POST"])
@@ -109,7 +94,7 @@ async def tokens(session: route.CommitterSession) -> str |
response.Response:
tokens_table = _build_tokens_table(tokens_list)
issue_jwt = [
- p[
+ htm.p[
"""Generate a JSON Web Token (JWT) to authenticate calls to ATR's
private API routes. Treat the token like a password and include it
in the Authorization header as a Bearer token when invoking the
@@ -117,35 +102,35 @@ async def tokens(session: route.CommitterSession) -> str
| response.Response:
# p["Example"],
],
issue_form_elem,
- pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50"),
+ htm.pre(id="jwt-output", class_="d-none mt-2 p-3 atr-word-wrap border rounded w-50"),
]
if most_recent_pat and most_recent_pat.last_used:
issue_jwt.append(
- p(".mt-3")[
+ htm.p(".mt-3")[
"You most recently used a PAT to issue a JWT at ",
- strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
+ htm.strong[util.format_datetime(most_recent_pat.last_used) + "Z"],
", using the PAT labelled ",
- code[most_recent_pat.label or "[Untitled]"],
+ htm.code[most_recent_pat.label or "[Untitled]"],
".",
]
)
- content_elem = div[
- h1["Tokens"],
- h2["Personal Access Tokens (PATs)"],
- p[
+ content_elem = htm.div[
+ htm.h1["Tokens"],
+ htm.h2["Personal Access Tokens (PATs)"],
+ htm.p[
"""Generate tokens for API access. For security, the plaintext
token is shown only once when you create it. You can revoke tokens
you no longer need."""
],
- div(".card.mb-4")[
- div(".card-header")["Generate new token"],
- div(".card-body")[add_form_elem],
+ htm.div(".card.mb-4")[
+ htm.div(".card-header")["Generate new token"],
+ htm.div(".card-body")[add_form_elem],
],
tokens_table,
- h2["JSON Web Token (JWT)"],
- div[issue_jwt],
+ htm.h2["JSON Web Token (JWT)"],
+ htm.div[issue_jwt],
]
end = time.perf_counter_ns()
log.info("Content elem built in %dms", (end - start) / 1_000_000)
@@ -169,9 +154,9 @@ def _as_markup(fragment: Fragment) -> markupsafe.Markup:
def _build_add_form_element(a_form: AddTokenForm) -> markupsafe.Markup:
- elem = form(method="post", action=util.as_url(tokens))[
+ elem = htm.form(method="post", action=util.as_url(tokens))[
_as_markup(a_form.csrf_token),
- div(".mb-3")[
+ htm.div(".mb-3")[
a_form.label.label,
a_form.label(class_="form-control"),
],
@@ -183,7 +168,7 @@ def _build_add_form_element(a_form: AddTokenForm) ->
markupsafe.Markup:
def _build_delete_form_element(token_id: int | None) -> markupsafe.Markup:
d_form = DeleteTokenForm()
d_form.token_id.data = "" if token_id is None else str(token_id)
- elem = form(".mb-0", method="post", action=util.as_url(tokens))[
+ elem = htm.form(".mb-0", method="post", action=util.as_url(tokens))[
_as_markup(d_form.csrf_token),
_as_markup(d_form.token_id),
d_form.submit(class_="btn btn-sm btn-danger"),
@@ -192,7 +177,7 @@ def _build_delete_form_element(token_id: int | None) ->
markupsafe.Markup:
def _build_issue_jwt_form_element(j_form: IssueJWTForm) -> markupsafe.Markup:
- elem = form("#issue-jwt-form", method="post", action=util.as_url(jwt_post))[
+ elem = htm.form("#issue-jwt-form", method="post", action=util.as_url(jwt_post))[
_as_markup(j_form.csrf_token),
j_form.submit(class_="btn btn-primary"),
]
@@ -201,30 +186,30 @@ def _build_issue_jwt_form_element(j_form: IssueJWTForm)
-> markupsafe.Markup:
def _build_tokens_table(tokens_list: list[sql.PersonalAccessToken]) ->
markupsafe.Markup:
if not tokens_list:
- return _as_markup(p["No tokens found."])
+ return _as_markup(htm.p["No tokens found."])
rows = [
- tr(".align-middle")[
- td[t.label or ""],
- td[util.format_datetime(t.created)],
- td[util.format_datetime(t.expires)],
- td[util.format_datetime(t.last_used) if t.last_used else "Never"],
- td[_build_delete_form_element(t.id)],
+ htm.tr(".align-middle")[
+ htm.td[t.label or ""],
+ htm.td[util.format_datetime(t.created)],
+ htm.td[util.format_datetime(t.expires)],
+ htm.td[util.format_datetime(t.last_used) if t.last_used else "Never"],
+ htm.td[_build_delete_form_element(t.id)],
]
for t in tokens_list
]
- table_elem = table(".table.table-striped")[
- thead[
- tr[
- th["Label"],
- th["Created"],
- th["Expires"],
- th["Last used"],
- th[""],
+ table_elem = htm.table(".table.table-striped")[
+ htm.thead[
+ htm.tr[
+ htm.th["Label"],
+ htm.th["Created"],
+ htm.th["Expires"],
+ htm.th["Last used"],
+ htm.th[""],
]
],
- tbody[rows],
+ htm.tbody[rows],
]
return _as_markup(table_elem)
@@ -272,13 +257,13 @@ async def _handle_add_token_post(
if await add_form.validate_on_submit():
label_val = str(add_form.label.data) if add_form.label.data else None
plaintext = await _create_token(session.uid, label_val)
- success_msg = div[
- p[
- strong["Your new token"],
+ success_msg = htm.div[
+ htm.p[
+ htm.strong["Your new token"],
" is ",
- code(".bg-light.border.rounded.px-1")[plaintext],
+ htm.code(".bg-light.border.rounded.px-1")[plaintext],
],
- p(".mb-0")["Copy it now as you will not be able to see it again."],
+ htm.p(".mb-0")["Copy it now as you will not be able to see it again."],
]
await quart.flash(_as_markup(success_msg), "success")
return await session.redirect(tokens)
@@ -306,12 +291,12 @@ async def _handle_issue_jwt_post(
issue_form = await IssueJWTForm.create_form(data=request_form)
if await issue_form.validate_on_submit():
jwt_token = jwtoken.issue(session.uid)
- success_msg = div[
- p[
- strong["Your new JWT"],
+ success_msg = htm.div[
+ htm.p[
+ htm.strong["Your new JWT"],
" is:",
],
- p[code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
+ htm.p[htm.code(".bg-light.border.rounded.px-1.atr-word-wrap")[jwt_token],],
]
await quart.flash(_as_markup(success_msg), "success")
return await session.redirect(tokens)
diff --git a/atr/web.py b/atr/web.py
index 4fec710..1bfa057 100644
--- a/atr/web.py
+++ b/atr/web.py
@@ -180,6 +180,11 @@ class RouteFunction(Protocol[R]):
def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[R]: ...
+class TextResponse(quart.Response):
+ def __init__(self, text: str, status: int = 200) -> None:
+ super().__init__(text, status=status, mimetype="text/plain")
+
+
async def redirect[R](
route: RouteFunction[R], success: str | None = None, error: str | None =
None, **kwargs: Any
) -> response.Response:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]