This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new 96eb7f5 Fix form typing
96eb7f5 is described below
commit 96eb7f5115a8db9e591f4fe27878f4dbec9b6d2a
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Mar 14 17:27:53 2025 +0200
Fix form typing
---
atr/routes/{project.py => projects.py} | 95 +++++++++++++-------------
atr/server.py | 4 +-
atr/util.py | 121 ++++++++++++++++++++-------------
3 files changed, 120 insertions(+), 100 deletions(-)
diff --git a/atr/routes/project.py b/atr/routes/projects.py
similarity index 88%
rename from atr/routes/project.py
rename to atr/routes/projects.py
index 427843f..96176bc 100644
--- a/atr/routes/project.py
+++ b/atr/routes/projects.py
@@ -20,17 +20,59 @@
import http.client
import quart
-import quart_wtf
import werkzeug.wrappers.response as response
import wtforms
+import asfquart.base as base
import asfquart.session as session
import atr.db as db
import atr.db.models as models
import atr.db.service as service
import atr.routes as routes
-from asfquart import base
-from atr.util import unwrap
+import atr.util as util
+
+
+class CreateVotePolicyForm(util.QuartFormTyped):
+ project_name = wtforms.HiddenField("project_name")
+ mailto_addresses = wtforms.StringField(
+ "Email",
+ validators=[
+ wtforms.validators.InputRequired("Please provide a valid email
address"),
+ wtforms.validators.Email(),
+ ],
+ )
+ min_hours = wtforms.IntegerField(
+ "Minimum Voting Period:", widget=wtforms.widgets.NumberInput(min=0,
max=144), default=72
+ )
+ manual_vote = wtforms.BooleanField("Voting Process:")
+ release_checklist = wtforms.StringField("Release Checklist:",
widget=wtforms.widgets.TextArea())
+ pause_for_rm = wtforms.BooleanField("Pause for RM:")
+
+ submit = wtforms.SubmitField("Add")
+
+
+async def add_voting_policy(session: session.ClientSession, form:
CreateVotePolicyForm) -> response.Response:
+ name = form.project_name.data
+
+ async with db.session() as data:
+ async with data.begin():
+ pmc = await
data.committee(name=name).demand(base.ASFQuartException("PMC not found",
errorcode=404))
+ if pmc.name not in session.committees:
+ raise base.ASFQuartException(
+ f"You must be a PMC member of {pmc.display_name} to submit
a voting policy", errorcode=403
+ )
+
+ vote_policy = models.VotePolicy(
+ mailto_addresses=[util.unwrap(form.mailto_addresses.data)],
+ manual_vote=form.manual_vote.data,
+ min_hours=util.unwrap(form.min_hours.data),
+ release_checklist=util.unwrap(form.release_checklist.data),
+ pause_for_rm=form.pause_for_rm.data,
+ )
+ data.add(vote_policy)
+
+ # Redirect to the add package page with the storage token
+ return quart.redirect(quart.url_for("root_project_view",
project_name=name))
@routes.app_route("/projects")
@@ -63,12 +105,10 @@ async def root_project_voting_policy_add(name: str) ->
response.Response | str:
f"You must be a PMC member of {pmc.display_name} to submit a
voting policy", errorcode=403
)
- # TODO: the create_form method does not return the correct type but QuartForm
- # we should create our own baseclass that correctly add typing info
form = await CreateVotePolicyForm.create_form(data={"project_name":
pmc.name})
if await form.validate_on_submit():
- return await add_voting_policy(web_session, form) # pyright: ignore
[reportArgumentType]
+ return await add_voting_policy(web_session, form)
# For GET requests, show the form
return await quart.render_template(
@@ -77,46 +117,3 @@ async def root_project_voting_policy_add(name: str) ->
response.Response | str:
project=pmc,
form=form,
)
-
-
-class CreateVotePolicyForm(quart_wtf.QuartForm):
- project_name = wtforms.HiddenField("project_name")
- mailto_addresses = wtforms.StringField(
- "Email",
- validators=[
- wtforms.validators.InputRequired("Please provide a valid email
address"),
- wtforms.validators.Email(),
- ],
- )
- min_hours = wtforms.IntegerField(
- "Minimum Voting Period:", widget=wtforms.widgets.NumberInput(min=0,
max=144), default=72
- )
- manual_vote = wtforms.BooleanField("Voting Process:")
- release_checklist = wtforms.StringField("Release Checklist:",
widget=wtforms.widgets.TextArea())
- pause_for_rm = wtforms.BooleanField("Pause for RM:")
-
- submit = wtforms.SubmitField("Add")
-
-
-async def add_voting_policy(session: session.ClientSession, form:
CreateVotePolicyForm) -> response.Response:
- name = form.project_name.data
-
- async with db.session() as data:
- async with data.begin():
- pmc = await
data.committee(name=name).demand(base.ASFQuartException("PMC not found",
errorcode=404))
- if pmc.name not in session.committees:
- raise base.ASFQuartException(
- f"You must be a PMC member of {pmc.display_name} to submit
a voting policy", errorcode=403
- )
-
- vote_policy = models.VotePolicy(
- mailto_addresses=[unwrap(form.mailto_addresses.data)],
- manual_vote=form.manual_vote.data,
- min_hours=unwrap(form.min_hours.data),
- release_checklist=unwrap(form.release_checklist.data),
- pause_for_rm=form.pause_for_rm.data,
- )
- data.add(vote_policy)
-
- # Redirect to the add package page with the storage token
- return quart.redirect(quart.url_for("root_project_view",
project_name=name))
diff --git a/atr/server.py b/atr/server.py
index ac745e1..fd7a711 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -62,7 +62,7 @@ def register_routes(app: base.QuartApp) -> tuple[str, ...]:
import atr.routes.download as download
import atr.routes.keys as keys
import atr.routes.package as package
- import atr.routes.project as project
+ import atr.routes.projects as projects
import atr.routes.release as release
import atr.routes.root as root
import atr.routes.vote_policy as vote_policy
@@ -99,7 +99,7 @@ def register_routes(app: base.QuartApp) -> tuple[str, ...]:
download.__name__,
keys.__name__,
package.__name__,
- project.__name__,
+ projects.__name__,
release.__name__,
vote_policy.__name__,
root.__name__,
diff --git a/atr/util.py b/atr/util.py
index c5266b6..d51b3a9 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -25,26 +25,51 @@ from typing import Annotated, Any, TypeVar
import aiofiles
import pydantic
import pydantic_core
+import quart_wtf
+import quart_wtf.typing
import atr.config as config
+F = TypeVar("F", bound="QuartFormTyped")
T = TypeVar("T")
-@functools.cache
-def get_admin_users() -> set[str]:
- return set(config.get().ADMIN_USERS)
+# from https://github.com/pydantic/pydantic/discussions/8755#discussioncomment-8417979
+@dataclasses.dataclass
+class DictToList:
+ key: str
+
+ def __get_pydantic_core_schema__(
+ self,
+ source_type: Any,
+ handler: pydantic.GetCoreSchemaHandler,
+ ) -> pydantic_core.CoreSchema:
+ adapter = _get_dict_to_list_inner_type_adapter(source_type, self.key)
+ return pydantic_core.core_schema.no_info_before_validator_function(
+ _get_dict_to_list_validator(adapter, self.key),
+ handler(source_type),
+ )
-def is_admin(user_id: str | None) -> bool:
- """Check whether a user is an admin."""
- if user_id is None:
- return False
- return user_id in get_admin_users()
+class QuartFormTyped(quart_wtf.QuartForm):
+ """Quart form with type annotations."""
-def get_release_storage_dir() -> str:
- return str(config.get().RELEASE_STORAGE_DIR)
+ @classmethod
+ async def create_form(
+ cls: type[F],
+ formdata: object | quart_wtf.typing.FormData = quart_wtf.form._Auto,
+ obj: Any | None = None,
+ prefix: str = "",
+ data: dict | None = None,
+ meta: dict | None = None,
+ **kwargs: dict[str, Any],
+ ) -> F:
+ """Create a form instance with typing."""
+ form = await super().create_form(formdata, obj, prefix, data, meta,
**kwargs)
+ if not isinstance(form, cls):
+ raise TypeError(f"Form is not of type {cls.__name__}")
+ return form
def compute_sha3_256(file_data: bytes) -> str:
@@ -63,6 +88,43 @@ async def compute_sha512(file_path: pathlib.Path) -> str:
return sha512.hexdigest()
+@functools.cache
+def get_admin_users() -> set[str]:
+ return set(config.get().ADMIN_USERS)
+
+
+def get_release_storage_dir() -> str:
+ return str(config.get().RELEASE_STORAGE_DIR)
+
+
+def is_admin(user_id: str | None) -> bool:
+ """Check whether a user is an admin."""
+ if user_id is None:
+ return False
+ return user_id in get_admin_users()
+
+
+def unwrap(value: T | None, error_message: str = "unexpected None when
unwrapping value") -> T:
+ """
+ Will unwrap the given value or raise a ValueError if it is None
+
+ :param value: the optional value to unwrap
+ :param error_message: the error message when failing to unwrap
+ :return: the value or a ValueError if it is None
+ """
+ if value is None:
+ raise ValueError(error_message)
+ else:
+ return value
+
+
+def validate_as_type(value: Any, t: type[T]) -> T:
+ """Validate the given value as the given type."""
+ if not isinstance(value, t):
+ raise ValueError(f"Expected {t}, got {type(value)}")
+ return value
+
+
def _get_dict_to_list_inner_type_adapter(source_type: Any, key: str) ->
pydantic.TypeAdapter[dict[Any, Any]]:
root_adapter = pydantic.TypeAdapter(source_type)
schema = root_adapter.core_schema
@@ -109,42 +171,3 @@ def _get_dict_to_list_validator(inner_adapter:
pydantic.TypeAdapter[dict[Any, An
return val
return validator
-
-
-# from https://github.com/pydantic/pydantic/discussions/8755#discussioncomment-8417979
-@dataclasses.dataclass
-class DictToList:
- key: str
-
- def __get_pydantic_core_schema__(
- self,
- source_type: Any,
- handler: pydantic.GetCoreSchemaHandler,
- ) -> pydantic_core.CoreSchema:
- adapter = _get_dict_to_list_inner_type_adapter(source_type, self.key)
-
- return pydantic_core.core_schema.no_info_before_validator_function(
- _get_dict_to_list_validator(adapter, self.key),
- handler(source_type),
- )
-
-
-def validate_as_type(value: Any, t: type[T]) -> T:
- """Validate the given value as the given type."""
- if not isinstance(value, t):
- raise ValueError(f"Expected {t}, got {type(value)}")
- return value
-
-
-def unwrap(value: T | None, error_message: str = "unexpected None when
unwrapping value") -> T:
- """
- Will unwrap the given value or raise a ValueError if it is None
-
- :param value: the optional value to unwrap
- :param error_message: the error message when failing to unwrap
- :return: the value or a ValueError if it is None
- """
- if value is None:
- raise ValueError(error_message)
- else:
- return value
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]