This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new e8d0162  Add storage code to upload a KEYS file
e8d0162 is described below

commit e8d0162d4eab008ad171ca87f7adcca526a2cfa3
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jul 18 17:13:21 2025 +0100

    Add storage code to upload a KEYS file
---
 atr/blueprints/admin/admin.py |   6 +-
 atr/storage/__init__.py       | 189 +++++++++++++++++++++++++++++++++++++++++-
 atr/storage/writers/keys.py   | 176 ++++++++++++++++++++++++++++++++++++---
 3 files changed, 357 insertions(+), 14 deletions(-)

diff --git a/atr/blueprints/admin/admin.py b/atr/blueprints/admin/admin.py
index 3c46c88..73ae8a6 100644
--- a/atr/blueprints/admin/admin.py
+++ b/atr/blueprints/admin/admin.py
@@ -660,9 +660,11 @@ async def admin_test() -> quart.wrappers.response.Response:
     async with storage.write(asf_uid) as write:
         wacm = write.as_committee_member("tooling").writer_or_raise()
         start = time.perf_counter_ns()
-        fingerprints = list(wacm.keys.upload(keys_file_text))
+        outcomes = await wacm.keys.upload(keys_file_text)
         end = time.perf_counter_ns()
-        logging.info(f"Upload of {len(fingerprints)} keys took {end - start} ns")
+        logging.info(f"Upload of {outcomes.ok_count} keys took {end - start} ns")
+    for oe in outcomes.exceptions():
+        logging.error(f"Error uploading key: {oe}")
     return quart.Response(str(wacm), mimetype="text/plain")
 
 
diff --git a/atr/storage/__init__.py b/atr/storage/__init__.py
index db96f61..36fed44 100644
--- a/atr/storage/__init__.py
+++ b/atr/storage/__init__.py
@@ -15,9 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from __future__ import annotations
+
 import contextlib
-from collections.abc import AsyncGenerator
-from typing import Final, TypeVar
+from typing import TYPE_CHECKING, Final, NoReturn, TypeVar
+
+if TYPE_CHECKING:
+    from collections.abc import AsyncGenerator, Callable, Sequence
 
 import atr.db as db
 import atr.storage.writers as writers
@@ -214,6 +218,187 @@ class Write:
         return AccessOutcomeWrite(wacm)
 
 
+# Outcome
+
+E = TypeVar("E", bound=Exception)
+T = TypeVar("T", bound=object)
+
+
+class OutcomeCore[T]:
+    @property
+    def ok(self) -> bool:
+        raise NotImplementedError("ok is not implemented")
+
+    @property
+    def name(self) -> str | None:
+        raise NotImplementedError("name is not implemented")
+
+    def result_or_none(self) -> T | None:
+        raise NotImplementedError("result_or_none is not implemented")
+
+    def result_or_raise(self, exception_class: type[E] | None = None) -> T:
+        raise NotImplementedError("result_or_raise is not implemented")
+
+    def exception_or_none(self) -> Exception | None:
+        raise NotImplementedError("exception_or_none is not implemented")
+
+    def exception_type_or_none(self) -> type[Exception] | None:
+        raise NotImplementedError("exception_type_or_none is not implemented")
+
+
+class OutcomeResult[T](OutcomeCore[T]):
+    __result: T
+
+    def __init__(self, result: T, name: str | None = None):
+        self.__result = result
+        self.__name = name
+
+    @property
+    def ok(self) -> bool:
+        return True
+
+    @property
+    def name(self) -> str | None:
+        return self.__name
+
+    def result_or_none(self) -> T | None:
+        return self.__result
+
+    def result_or_raise(self, exception_class: type[Exception] | None = None) -> T:
+        return self.__result
+
+    def exception_or_none(self) -> Exception | None:
+        return None
+
+    def exception_type_or_none(self) -> type[Exception] | None:
+        return None
+
+
+class OutcomeError[T, E: Exception](OutcomeCore[T]):
+    __exception: E
+
+    def __init__(self, exception: E, name: str | None = None):
+        self.__exception = exception
+        self.__name = name
+
+    @property
+    def ok(self) -> bool:
+        return False
+
+    @property
+    def name(self) -> str | None:
+        return self.__name
+
+    def result_or_none(self) -> T | None:
+        return None
+
+    def result_or_raise(self, exception_class: type[Exception] | None = None) -> NoReturn:
+        if exception_class is not None:
+            raise exception_class(str(self.__exception)) from self.__exception
+        raise self.__exception
+
+    def exception_or_none(self) -> Exception | None:
+        return self.__exception
+
+    def exception_type_or_none(self) -> type[Exception] | None:
+        return type(self.__exception)
+
+
+class Outcomes[T]:
+    __outcomes: list[OutcomeResult[T] | OutcomeError[T, Exception]]
+
+    def __init__(self, *outcomes: OutcomeResult[T] | OutcomeError[T, Exception]):
+        self.__outcomes = list(outcomes)
+
+    @property
+    def any_ok(self) -> bool:
+        return any(outcome.ok for outcome in self.__outcomes)
+
+    @property
+    def all_ok(self) -> bool:
+        return all(outcome.ok for outcome in self.__outcomes)
+
+    def append(self, result_or_error: T | Exception, name: str | None = None) -> None:
+        if isinstance(result_or_error, Exception):
+            self.__outcomes.append(OutcomeError(result_or_error, name))
+        else:
+            self.__outcomes.append(OutcomeResult(result_or_error, name))
+
+    def extend(self, result_or_error_list: Sequence[T | Exception]) -> None:
+        for result_or_error in result_or_error_list:
+            self.append(result_or_error)
+
+    @property
+    def exception_count(self) -> int:
+        return sum(1 for outcome in self.__outcomes if isinstance(outcome, OutcomeError))
+
+    def exceptions(self) -> list[Exception]:
+        exceptions_list = []
+        for outcome in self.__outcomes:
+            if isinstance(outcome, OutcomeError):
+                exception_or_none = outcome.exception_or_none()
+                if exception_or_none is not None:
+                    exceptions_list.append(exception_or_none)
+        return exceptions_list
+
+    def named_results(self) -> dict[str, T]:
+        named = {}
+        for outcome in self.__outcomes:
+            if (outcome.name is not None) and outcome.ok:
+                named[outcome.name] = outcome.result_or_raise()
+        return named
+
+    def names(self) -> list[str | None]:
+        return [outcome.name for outcome in self.__outcomes if (outcome.name is not None)]
+
+    def results_or_raise(self, exception_class: type[Exception] | None = None) -> list[T]:
+        return [outcome.result_or_raise(exception_class) for outcome in self.__outcomes]
+
+    def results(self) -> list[T]:
+        return [outcome.result_or_raise() for outcome in self.__outcomes if outcome.ok]
+
+    @property
+    def ok_count(self) -> int:
+        return sum(1 for outcome in self.__outcomes if outcome.ok)
+
+    def outcomes(self) -> list[OutcomeResult[T] | OutcomeError[T, Exception]]:
+        return self.__outcomes
+
+    def update_results(self, f: Callable[[T], T]) -> None:
+        for i, outcome in enumerate(self.__outcomes):
+            if isinstance(outcome, OutcomeResult):
+                try:
+                    result = f(outcome.result_or_raise())
+                except Exception as e:
+                    self.__outcomes[i] = OutcomeError(e, outcome.name)
+                else:
+                    self.__outcomes[i] = OutcomeResult(result, outcome.name)
+
+    # def update_results_batch(self, f: Callable[[Sequence[T]], Sequence[T]]) -> None:
+    #     results = self.collect_results()
+    #     new_results = iter(f(results))
+    #     new_outcomes = []
+    #     for outcome in self.__outcomes:
+    #         match outcome:
+    #             case OutcomeResult():
+    #                 new_outcomes.append(OutcomeResult(next(new_results), outcome.name))
+    #             case OutcomeError():
+    #                 new_outcomes.append(OutcomeError(outcome.exception_or_none(), outcome.name))
+    #     self.__outcomes[:] = new_outcomes
+
+    # def update_results_stream(self, f: Callable[[Sequence[T]], Generator[T | Exception]]) -> None:
+    #     results = self.collect_results()
+    #     new_results = f(results)
+    #     new_outcomes = []
+    #     for outcome in self.__outcomes:
+    #         match outcome:
+    #             case OutcomeResult():
+    #                 new_outcomes.append(OutcomeResult(next(new_results), outcome.name))
+    #             case OutcomeError():
+    #                 new_outcomes.append(OutcomeError(outcome.exception_or_none(), outcome.name))
+    #     self.__outcomes[:] = new_outcomes
+
+
 # Context managers
 
 
diff --git a/atr/storage/writers/keys.py b/atr/storage/writers/keys.py
index 6a7cdf6..4ba7f77 100644
--- a/atr/storage/writers/keys.py
+++ b/atr/storage/writers/keys.py
@@ -18,15 +18,40 @@
 # Removing this will cause circular imports
 from __future__ import annotations
 
+import asyncio
 import logging
-from typing import TYPE_CHECKING
+import tempfile
+from typing import TYPE_CHECKING, NoReturn
+
+import pgpy
+import pgpy.constants as constants
+import sqlalchemy.dialects.sqlite as sqlite
 
 import atr.db as db
+import atr.models.sql as sql
 import atr.storage as storage
+import atr.user as user
 import atr.util as util
 
 if TYPE_CHECKING:
-    from collections.abc import Generator
+    KeyOutcomes = storage.Outcomes[sql.PublicSigningKey]
+
+
+class PostParseError(Exception):
+    def __init__(self, key: sql.PublicSigningKey, original_error: Exception):
+        self.__key = key
+        self.__original_error = original_error
+
+    def __str__(self) -> str:
+        return f"PostParseError: {self.__original_error}"
+
+    @property
+    def key(self) -> sql.PublicSigningKey:
+        return self.__key
+
+    @property
+    def original_error(self) -> Exception:
+        return self.__original_error
 
 
 class CommitteeMember:
@@ -40,15 +65,146 @@ class CommitteeMember:
         self.__data = data
         self.__asf_uid = asf_uid
         self.__committee_name = committee_name
+        self.__key_block_models_cache = {}
+
+    async def committee(self) -> sql.Committee:
+        return await self.__data.committee(name=self.__committee_name, _public_signing_keys=True).demand(
+            storage.AccessError(f"Committee not found: {self.__committee_name}")
+        )
 
-    def upload(self, keys_file_text: str) -> Generator[str]:
-        for key_block in util.parse_key_blocks(keys_file_text):
+    async def upload(self, keys_file_text: str) -> KeyOutcomes:
+        outcomes = storage.Outcomes[sql.PublicSigningKey]()
+        try:
+            ldap_data = await util.email_to_uid_map()
+            key_blocks = util.parse_key_blocks(keys_file_text)
+        except Exception as e:
+            outcomes.append(e)
+            return outcomes
+        for key_block in key_blocks:
             try:
-                for fingerprint in self.__load_key_block(key_block):
-                    yield fingerprint.lower()
+                key_models = await asyncio.to_thread(self.__block_models, key_block, ldap_data)
+                outcomes.extend(key_models)
             except Exception as e:
-                logging.error(f"Error loading key block: {e}")
+                outcomes.append(e)
+        # Try adding the keys to the database
+        # If not, all keys will be replaced with a PostParseError
+        return await self.__database_add_models(outcomes)
+
+    def __block_models(self, key_block: str, ldap_data: dict[str, str]) -> list[sql.PublicSigningKey | Exception]:
+        # This cache is only held for the session
+        if key_block in self.__key_block_models_cache:
+            return self.__key_block_models_cache[key_block]
+
+        with tempfile.NamedTemporaryFile(delete=True) as tmpfile:
+            tmpfile.write(key_block.encode())
+            tmpfile.flush()
+            keyring = pgpy.PGPKeyring()
+            fingerprints = keyring.load(tmpfile.name)
+            models = []
+            for fingerprint in fingerprints:
+                try:
+                    model = self.__keyring_fingerprint_model(keyring, fingerprint, ldap_data)
+                    if model is None:
+                        # Was not a primary key, so skip it
+                        continue
+                    models.append(model)
+                except Exception as e:
+                    models.append(e)
+            self.__key_block_models_cache[key_block] = models
+            return models
+
+    async def __database_add_models(self, outcomes: KeyOutcomes) -> KeyOutcomes:
+        # Try to upsert all models and link to the committee in one transaction
+        try:
+            key_models = outcomes.results()
+
+            await self.__data.begin_immediate()
+            committee = await self.committee()
+
+            persisted_fingerprints: set[str] = set()
+            for model in key_models:
+                merged_key: sql.PublicSigningKey = await self.__data.merge(model)
+                persisted_fingerprints.add(merged_key.fingerprint)
+            await self.__data.flush()
+
+            existing_fingerprints = {k.fingerprint for k in committee.public_signing_keys}
+            new_fingerprints = persisted_fingerprints - existing_fingerprints
+
+            if new_fingerprints:
+                insert_values = [
+                    {"committee_name": self.__committee_name, "key_fingerprint": fp} for fp in new_fingerprints
+                ]
+                stmt = sqlite.insert(sql.KeyLink).values(insert_values)
+                stmt = stmt.on_conflict_do_nothing(index_elements=["committee_name", "key_fingerprint"])
+                await self.__data.execute(stmt)
+
+            await self.__data.commit()
+        except Exception as e:
+            # This logging is just so that ruff does not erase e
+            logging.info(f"Post-parse error: {e}")
+
+            def raise_post_parse_error(model: sql.PublicSigningKey) -> NoReturn:
+                nonlocal e
+                raise PostParseError(model, e)
+
+            outcomes.update_results(raise_post_parse_error)
+        return outcomes
+
+    def __keyring_fingerprint_model(
+        self, keyring: pgpy.PGPKeyring, fingerprint: str, ldap_data: dict[str, str]
+    ) -> sql.PublicSigningKey | None:
+        with keyring.key(fingerprint) as key:
+            if not key.is_primary:
+                return None
+            uids = [uid.userid for uid in key.userids]
+            asf_uid = self.__uids_asf_uid(uids, ldap_data)
+            key_size = key.key_size
+            length = 0
+            if isinstance(key_size, constants.EllipticCurveOID):
+                if isinstance(key_size.key_size, int):
+                    length = key_size.key_size
+                else:
+                    raise ValueError(f"Key size is not an integer: {type(key_size.key_size)}, {key_size.key_size}")
+            elif isinstance(key_size, int):
+                length = key_size
+            else:
+                raise ValueError(f"Key size is not an integer: {type(key_size)}, {key_size}")
+            return sql.PublicSigningKey(
+                fingerprint=str(key.fingerprint).lower(),
+                algorithm=key.key_algorithm.value,
+                length=length,
+                created=key.created,
+                latest_self_signature=key.expires_at,
+                expires=key.expires_at,
+                primary_declared_uid=uids[0],
+                secondary_declared_uids=uids[1:],
+                apache_uid=asf_uid,
+                ascii_armored_key=str(key),
+            )
+
+    def __uids_asf_uid(self, uids: list[str], ldap_data: dict[str, str]) -> str | None:
+        # Test data
+        test_key_uids = [
+            "Apache Tooling (For test use only) 
<[email protected]>",
+        ]
+        is_admin = user.is_admin(self.__asf_uid)
+        if (uids == test_key_uids) and is_admin:
+            # Allow the test key
+            # TODO: We should fix the test key, not add an exception for it
+            # But the admin check probably makes this safe enough
+            return self.__asf_uid
 
-    def __load_key_block(self, key_block: str) -> Generator[str]:
-        if False:
-            yield ""
+        # Regular data
+        emails = []
+        for uid in uids:
+            # This returns a lower case email address, whatever the case of the input
+            if email := util.email_from_uid(uid):
+                if email.endswith("@apache.org"):
+                    return email.removesuffix("@apache.org")
+                emails.append(email)
+        # We did not find a direct @apache.org email address
+        # Therefore, search cached LDAP data
+        for email in emails:
+            if email in ldap_data:
+                return ldap_data[email]
+        return None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to