This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new e8d0162 Add storage code to upload a KEYS file
e8d0162 is described below
commit e8d0162d4eab008ad171ca87f7adcca526a2cfa3
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jul 18 17:13:21 2025 +0100
Add storage code to upload a KEYS file
---
atr/blueprints/admin/admin.py | 6 +-
atr/storage/__init__.py | 189 +++++++++++++++++++++++++++++++++++++++++-
atr/storage/writers/keys.py | 176 ++++++++++++++++++++++++++++++++++++---
3 files changed, 357 insertions(+), 14 deletions(-)
diff --git a/atr/blueprints/admin/admin.py b/atr/blueprints/admin/admin.py
index 3c46c88..73ae8a6 100644
--- a/atr/blueprints/admin/admin.py
+++ b/atr/blueprints/admin/admin.py
@@ -660,9 +660,11 @@ async def admin_test() -> quart.wrappers.response.Response:
async with storage.write(asf_uid) as write:
wacm = write.as_committee_member("tooling").writer_or_raise()
start = time.perf_counter_ns()
- fingerprints = list(wacm.keys.upload(keys_file_text))
+ outcomes = await wacm.keys.upload(keys_file_text)
end = time.perf_counter_ns()
- logging.info(f"Upload of {len(fingerprints)} keys took {end - start}
ns")
+ logging.info(f"Upload of {outcomes.ok_count} keys took {end - start}
ns")
+ for oe in outcomes.exceptions():
+ logging.error(f"Error uploading key: {oe}")
return quart.Response(str(wacm), mimetype="text/plain")
diff --git a/atr/storage/__init__.py b/atr/storage/__init__.py
index db96f61..36fed44 100644
--- a/atr/storage/__init__.py
+++ b/atr/storage/__init__.py
@@ -15,9 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
+
import contextlib
-from collections.abc import AsyncGenerator
-from typing import Final, TypeVar
+from typing import TYPE_CHECKING, Final, NoReturn, TypeVar
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncGenerator, Callable, Sequence
import atr.db as db
import atr.storage.writers as writers
@@ -214,6 +218,187 @@ class Write:
return AccessOutcomeWrite(wacm)
+# Outcome
+
+E = TypeVar("E", bound=Exception)
+T = TypeVar("T", bound=object)
+
+
+class OutcomeCore[T]:
+ @property
+ def ok(self) -> bool:
+ raise NotImplementedError("ok is not implemented")
+
+ @property
+ def name(self) -> str | None:
+ raise NotImplementedError("name is not implemented")
+
+ def result_or_none(self) -> T | None:
+ raise NotImplementedError("result_or_none is not implemented")
+
+ def result_or_raise(self, exception_class: type[E] | None = None) -> T:
+ raise NotImplementedError("result_or_raise is not implemented")
+
+ def exception_or_none(self) -> Exception | None:
+ raise NotImplementedError("exception_or_none is not implemented")
+
+ def exception_type_or_none(self) -> type[Exception] | None:
+ raise NotImplementedError("exception_type_or_none is not implemented")
+
+
+class OutcomeResult[T](OutcomeCore[T]):
+ __result: T
+
+ def __init__(self, result: T, name: str | None = None):
+ self.__result = result
+ self.__name = name
+
+ @property
+ def ok(self) -> bool:
+ return True
+
+ @property
+ def name(self) -> str | None:
+ return self.__name
+
+ def result_or_none(self) -> T | None:
+ return self.__result
+
+ def result_or_raise(self, exception_class: type[Exception] | None = None)
-> T:
+ return self.__result
+
+ def exception_or_none(self) -> Exception | None:
+ return None
+
+ def exception_type_or_none(self) -> type[Exception] | None:
+ return None
+
+
+class OutcomeError[T, E: Exception](OutcomeCore[T]):
+ __exception: E
+
+ def __init__(self, exception: E, name: str | None = None):
+ self.__exception = exception
+ self.__name = name
+
+ @property
+ def ok(self) -> bool:
+ return False
+
+ @property
+ def name(self) -> str | None:
+ return self.__name
+
+ def result_or_none(self) -> T | None:
+ return None
+
+ def result_or_raise(self, exception_class: type[Exception] | None = None)
-> NoReturn:
+ if exception_class is not None:
+ raise exception_class(str(self.__exception)) from self.__exception
+ raise self.__exception
+
+ def exception_or_none(self) -> Exception | None:
+ return self.__exception
+
+ def exception_type_or_none(self) -> type[Exception] | None:
+ return type(self.__exception)
+
+
+class Outcomes[T]:
+ __outcomes: list[OutcomeResult[T] | OutcomeError[T, Exception]]
+
+ def __init__(self, *outcomes: OutcomeResult[T] | OutcomeError[T,
Exception]):
+ self.__outcomes = list(outcomes)
+
+ @property
+ def any_ok(self) -> bool:
+ return any(outcome.ok for outcome in self.__outcomes)
+
+ @property
+ def all_ok(self) -> bool:
+ return all(outcome.ok for outcome in self.__outcomes)
+
+ def append(self, result_or_error: T | Exception, name: str | None = None)
-> None:
+ if isinstance(result_or_error, Exception):
+ self.__outcomes.append(OutcomeError(result_or_error, name))
+ else:
+ self.__outcomes.append(OutcomeResult(result_or_error, name))
+
+ def extend(self, result_or_error_list: Sequence[T | Exception]) -> None:
+ for result_or_error in result_or_error_list:
+ self.append(result_or_error)
+
+ @property
+ def exception_count(self) -> int:
+ return sum(1 for outcome in self.__outcomes if isinstance(outcome,
OutcomeError))
+
+ def exceptions(self) -> list[Exception]:
+ exceptions_list = []
+ for outcome in self.__outcomes:
+ if isinstance(outcome, OutcomeError):
+ exception_or_none = outcome.exception_or_none()
+ if exception_or_none is not None:
+ exceptions_list.append(exception_or_none)
+ return exceptions_list
+
+ def named_results(self) -> dict[str, T]:
+ named = {}
+ for outcome in self.__outcomes:
+ if (outcome.name is not None) and outcome.ok:
+ named[outcome.name] = outcome.result_or_raise()
+ return named
+
+ def names(self) -> list[str | None]:
+ return [outcome.name for outcome in self.__outcomes if (outcome.name
is not None)]
+
+ def results_or_raise(self, exception_class: type[Exception] | None = None)
-> list[T]:
+ return [outcome.result_or_raise(exception_class) for outcome in
self.__outcomes]
+
+ def results(self) -> list[T]:
+ return [outcome.result_or_raise() for outcome in self.__outcomes if
outcome.ok]
+
+ @property
+ def ok_count(self) -> int:
+ return sum(1 for outcome in self.__outcomes if outcome.ok)
+
+ def outcomes(self) -> list[OutcomeResult[T] | OutcomeError[T, Exception]]:
+ return self.__outcomes
+
+ def update_results(self, f: Callable[[T], T]) -> None:
+ for i, outcome in enumerate(self.__outcomes):
+ if isinstance(outcome, OutcomeResult):
+ try:
+ result = f(outcome.result_or_raise())
+ except Exception as e:
+ self.__outcomes[i] = OutcomeError(e, outcome.name)
+ else:
+ self.__outcomes[i] = OutcomeResult(result, outcome.name)
+
+ # def update_results_batch(self, f: Callable[[Sequence[T]], Sequence[T]])
-> None:
+ # results = self.collect_results()
+ # new_results = iter(f(results))
+ # new_outcomes = []
+ # for outcome in self.__outcomes:
+ # match outcome:
+ # case OutcomeResult():
+ # new_outcomes.append(OutcomeResult(next(new_results),
outcome.name))
+ # case OutcomeError():
+ #
new_outcomes.append(OutcomeError(outcome.exception_or_none(), outcome.name))
+ # self.__outcomes[:] = new_outcomes
+
+ # def update_results_stream(self, f: Callable[[Sequence[T]], Generator[T |
Exception]]) -> None:
+ # results = self.collect_results()
+ # new_results = f(results)
+ # new_outcomes = []
+ # for outcome in self.__outcomes:
+ # match outcome:
+ # case OutcomeResult():
+ # new_outcomes.append(OutcomeResult(next(new_results),
outcome.name))
+ # case OutcomeError():
+ #
new_outcomes.append(OutcomeError(outcome.exception_or_none(), outcome.name))
+ # self.__outcomes[:] = new_outcomes
+
+
# Context managers
diff --git a/atr/storage/writers/keys.py b/atr/storage/writers/keys.py
index 6a7cdf6..4ba7f77 100644
--- a/atr/storage/writers/keys.py
+++ b/atr/storage/writers/keys.py
@@ -18,15 +18,40 @@
# Removing this will cause circular imports
from __future__ import annotations
+import asyncio
import logging
-from typing import TYPE_CHECKING
+import tempfile
+from typing import TYPE_CHECKING, NoReturn
+
+import pgpy
+import pgpy.constants as constants
+import sqlalchemy.dialects.sqlite as sqlite
import atr.db as db
+import atr.models.sql as sql
import atr.storage as storage
+import atr.user as user
import atr.util as util
if TYPE_CHECKING:
- from collections.abc import Generator
+ KeyOutcomes = storage.Outcomes[sql.PublicSigningKey]
+
+
+class PostParseError(Exception):
+ def __init__(self, key: sql.PublicSigningKey, original_error: Exception):
+ self.__key = key
+ self.__original_error = original_error
+
+ def __str__(self) -> str:
+ return f"PostParseError: {self.__original_error}"
+
+ @property
+ def key(self) -> sql.PublicSigningKey:
+ return self.__key
+
+ @property
+ def original_error(self) -> Exception:
+ return self.__original_error
class CommitteeMember:
@@ -40,15 +65,146 @@ class CommitteeMember:
self.__data = data
self.__asf_uid = asf_uid
self.__committee_name = committee_name
+ self.__key_block_models_cache = {}
+
+ async def committee(self) -> sql.Committee:
+ return await self.__data.committee(name=self.__committee_name,
_public_signing_keys=True).demand(
+ storage.AccessError(f"Committee not found:
{self.__committee_name}")
+ )
- def upload(self, keys_file_text: str) -> Generator[str]:
- for key_block in util.parse_key_blocks(keys_file_text):
+ async def upload(self, keys_file_text: str) -> KeyOutcomes:
+ outcomes = storage.Outcomes[sql.PublicSigningKey]()
+ try:
+ ldap_data = await util.email_to_uid_map()
+ key_blocks = util.parse_key_blocks(keys_file_text)
+ except Exception as e:
+ outcomes.append(e)
+ return outcomes
+ for key_block in key_blocks:
try:
- for fingerprint in self.__load_key_block(key_block):
- yield fingerprint.lower()
+ key_models = await asyncio.to_thread(self.__block_models,
key_block, ldap_data)
+ outcomes.extend(key_models)
except Exception as e:
- logging.error(f"Error loading key block: {e}")
+ outcomes.append(e)
+ # Try adding the keys to the database
+ # If the database transaction fails, every successful outcome is replaced with a PostParseError
+ return await self.__database_add_models(outcomes)
+
+ def __block_models(self, key_block: str, ldap_data: dict[str, str]) ->
list[sql.PublicSigningKey | Exception]:
+ # This cache is only held for the session
+ if key_block in self.__key_block_models_cache:
+ return self.__key_block_models_cache[key_block]
+
+ with tempfile.NamedTemporaryFile(delete=True) as tmpfile:
+ tmpfile.write(key_block.encode())
+ tmpfile.flush()
+ keyring = pgpy.PGPKeyring()
+ fingerprints = keyring.load(tmpfile.name)
+ models = []
+ for fingerprint in fingerprints:
+ try:
+ model = self.__keyring_fingerprint_model(keyring,
fingerprint, ldap_data)
+ if model is None:
+ # Was not a primary key, so skip it
+ continue
+ models.append(model)
+ except Exception as e:
+ models.append(e)
+ self.__key_block_models_cache[key_block] = models
+ return models
+
+ async def __database_add_models(self, outcomes: KeyOutcomes) ->
KeyOutcomes:
+ # Try to upsert all models and link to the committee in one transaction
+ try:
+ key_models = outcomes.results()
+
+ await self.__data.begin_immediate()
+ committee = await self.committee()
+
+ persisted_fingerprints: set[str] = set()
+ for model in key_models:
+ merged_key: sql.PublicSigningKey = await
self.__data.merge(model)
+ persisted_fingerprints.add(merged_key.fingerprint)
+ await self.__data.flush()
+
+ existing_fingerprints = {k.fingerprint for k in
committee.public_signing_keys}
+ new_fingerprints = persisted_fingerprints - existing_fingerprints
+
+ if new_fingerprints:
+ insert_values = [
+ {"committee_name": self.__committee_name,
"key_fingerprint": fp} for fp in new_fingerprints
+ ]
+ stmt = sqlite.insert(sql.KeyLink).values(insert_values)
+ stmt =
stmt.on_conflict_do_nothing(index_elements=["committee_name",
"key_fingerprint"])
+ await self.__data.execute(stmt)
+
+ await self.__data.commit()
+ except Exception as e:
+ # This logging exists so that ruff does not treat e as unused and remove it
+ logging.info(f"Post-parse error: {e}")
+
+ def raise_post_parse_error(model: sql.PublicSigningKey) ->
NoReturn:
+ nonlocal e
+ raise PostParseError(model, e)
+
+ outcomes.update_results(raise_post_parse_error)
+ return outcomes
+
+ def __keyring_fingerprint_model(
+ self, keyring: pgpy.PGPKeyring, fingerprint: str, ldap_data: dict[str,
str]
+ ) -> sql.PublicSigningKey | None:
+ with keyring.key(fingerprint) as key:
+ if not key.is_primary:
+ return None
+ uids = [uid.userid for uid in key.userids]
+ asf_uid = self.__uids_asf_uid(uids, ldap_data)
+ key_size = key.key_size
+ length = 0
+ if isinstance(key_size, constants.EllipticCurveOID):
+ if isinstance(key_size.key_size, int):
+ length = key_size.key_size
+ else:
+ raise ValueError(f"Key size is not an integer:
{type(key_size.key_size)}, {key_size.key_size}")
+ elif isinstance(key_size, int):
+ length = key_size
+ else:
+ raise ValueError(f"Key size is not an integer:
{type(key_size)}, {key_size}")
+ return sql.PublicSigningKey(
+ fingerprint=str(key.fingerprint).lower(),
+ algorithm=key.key_algorithm.value,
+ length=length,
+ created=key.created,
+ latest_self_signature=key.expires_at,
+ expires=key.expires_at,
+ primary_declared_uid=uids[0],
+ secondary_declared_uids=uids[1:],
+ apache_uid=asf_uid,
+ ascii_armored_key=str(key),
+ )
+
+ def __uids_asf_uid(self, uids: list[str], ldap_data: dict[str, str]) ->
str | None:
+ # Test data
+ test_key_uids = [
+ "Apache Tooling (For test use only)
<[email protected]>",
+ ]
+ is_admin = user.is_admin(self.__asf_uid)
+ if (uids == test_key_uids) and is_admin:
+ # Allow the test key
+ # TODO: We should fix the test key, not add an exception for it
+ # But the admin check probably makes this safe enough
+ return self.__asf_uid
- def __load_key_block(self, key_block: str) -> Generator[str]:
- if False:
- yield ""
+ # Regular data
+ emails = []
+ for uid in uids:
+ # This returns a lower case email address, whatever the case of
the input
+ if email := util.email_from_uid(uid):
+ if email.endswith("@apache.org"):
+ return email.removesuffix("@apache.org")
+ emails.append(email)
+ # We did not find a direct @apache.org email address
+ # Therefore, search cached LDAP data
+ for email in emails:
+ if email in ldap_data:
+ return ldap_data[email]
+ return None
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]