This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 5e2550b Add a cache module with admin functions, and tests
5e2550b is described below
commit 5e2550b86b735cf07c8cec3d5beffe3b98f571e9
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jan 23 15:48:58 2026 +0000
Add a cache module with admin functions, and tests
---
atr/cache.py | 97 ++++++++++++++++++++++++++++++++++
tests/unit/test_cache.py | 132 +++++++++++++++++++++++++++++++++++++++++++++++
tests/unit/test_util.py | 107 +++++++++++++++++---------------------
3 files changed, 276 insertions(+), 60 deletions(-)
diff --git a/atr/cache.py b/atr/cache.py
new file mode 100644
index 0000000..fef42c5
--- /dev/null
+++ b/atr/cache.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import datetime
+import pathlib
+from typing import Final
+
+import aiofiles
+import asfquart
+import pydantic
+
+import atr.config as config
+import atr.ldap as ldap
+import atr.log as log
+import atr.models.schema as schema
+
+# Fifth prime after 3600
+ADMINS_POLL_INTERVAL_SECONDS: Final[int] = 3631
+
+
+class AdminsCache(schema.Strict):
+ refreshed: datetime.datetime = schema.description("When the cache was last
refreshed")
+ admins: frozenset[str] = schema.description("Set of admin user IDs from
LDAP")
+
+
+async def admins_read_from_file() -> AdminsCache | None:
+ cache_path = _admins_path()
+ if not cache_path.exists():
+ return None
+ try:
+ async with aiofiles.open(cache_path) as f:
+ raw_data = await f.read()
+ return AdminsCache.model_validate_json(raw_data)
+ except (pydantic.ValidationError, OSError) as e:
+ log.warning(f"Failed to read admin users cache: {e}")
+ return None
+
+
+async def admins_refresh_loop() -> None:
+ while True:
+ await asyncio.sleep(ADMINS_POLL_INTERVAL_SECONDS)
+ try:
+ users = await ldap.fetch_admin_users()
+ await admins_save_to_file(users)
+ _admins_update_app_extensions(users)
+ log.info(f"Admin users cache refreshed: {len(users)} users")
+ except Exception as e:
+ log.warning(f"Admin refresh failed: {e}")
+
+
+async def admins_save_to_file(admins: frozenset[str]) -> None:
+ cache_path = _admins_path()
+ cache_path.parent.mkdir(parents=True, exist_ok=True)
+ cache_data = AdminsCache(refreshed=datetime.datetime.now(datetime.UTC),
admins=admins)
+ async with aiofiles.open(cache_path, "w") as f:
+ await f.write(cache_data.model_dump_json())
+
+
+async def admins_startup_load() -> None:
+ cache_data = await admins_read_from_file()
+ if cache_data is not None:
+ _admins_update_app_extensions(cache_data.admins)
+ log.info(f"Loaded {len(cache_data.admins)} admin users from cache
(refreshed: {cache_data.refreshed})")
+ return
+ log.info("No admin users cache found, fetching from LDAP")
+ try:
+ users = await ldap.fetch_admin_users()
+ await admins_save_to_file(users)
+ _admins_update_app_extensions(users)
+ log.info(f"Fetched {len(users)} admin users from LDAP")
+ except Exception as e:
+ log.warning(f"Failed to fetch admin users from LDAP at startup: {e}")
+
+
+def _admins_path() -> pathlib.Path:
+ return pathlib.Path(config.get().STATE_DIR) / "cache" / "admins.json"
+
+
+def _admins_update_app_extensions(admins: frozenset[str]) -> None:
+ app = asfquart.APP
+ app.extensions["admins"] = admins
+ app.extensions["admins_refreshed"] = datetime.datetime.now(datetime.UTC)
diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py
new file mode 100644
index 0000000..4aba1f9
--- /dev/null
+++ b/tests/unit/test_cache.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import json
+import pathlib
+from typing import TYPE_CHECKING
+
+import pydantic
+import pytest
+
+import atr.cache as cache
+
+if TYPE_CHECKING:
+ from pytest import MonkeyPatch
+
+
+class _MockConfig:
+ def __init__(self, state_dir: pathlib.Path):
+ self.STATE_DIR = str(state_dir)
+
+
[email protected]
+def state_dir(tmp_path: pathlib.Path, monkeypatch: "MonkeyPatch") ->
pathlib.Path:
+ monkeypatch.setattr("atr.config.get", lambda: _MockConfig(tmp_path))
+ return tmp_path
+
+
+def test_admins_cache_rejects_missing_admins():
+ with pytest.raises(pydantic.ValidationError):
+ cache.AdminsCache.model_validate({"refreshed": "2025-01-01T00:00:00Z"})
+
+
+def test_admins_cache_rejects_missing_refreshed():
+ with pytest.raises(pydantic.ValidationError):
+ cache.AdminsCache.model_validate({"admins": ["alice"]})
+
+
+def test_admins_cache_roundtrip_json():
+ original = cache.AdminsCache(
+ refreshed=datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.UTC),
+ admins=frozenset({"alice", "bob", "charlie"}),
+ )
+ json_str = original.model_dump_json()
+ restored = cache.AdminsCache.model_validate_json(json_str)
+ assert restored.refreshed == original.refreshed
+ assert restored.admins == original.admins
+
+
+def test_admins_cache_serializes_to_json():
+ data = cache.AdminsCache(
+ refreshed=datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.UTC),
+ admins=frozenset({"alice", "bob"}),
+ )
+ json_str = data.model_dump_json()
+ parsed = json.loads(json_str)
+ assert "refreshed" in parsed
+ assert "admins" in parsed
+ assert set(parsed["admins"]) == {"alice", "bob"}
+
+
+def test_admins_cache_validates_with_good_data():
+ data = cache.AdminsCache(
+ refreshed=datetime.datetime.now(datetime.UTC),
+ admins=frozenset({"alice", "bob"}),
+ )
+ assert isinstance(data.refreshed, datetime.datetime)
+ assert data.admins == frozenset({"alice", "bob"})
+
+
[email protected]
+async def test_admins_read_from_file_returns_none_for_invalid_json(state_dir:
pathlib.Path):
+ cache_path = state_dir / "cache" / "admins.json"
+ cache_path.parent.mkdir(parents=True)
+ cache_path.write_text("not valid json {{{")
+ result = await cache.admins_read_from_file()
+ assert result is None
+
+
[email protected]
+async def
test_admins_read_from_file_returns_none_for_invalid_schema(state_dir:
pathlib.Path):
+ cache_path = state_dir / "cache" / "admins.json"
+ cache_path.parent.mkdir(parents=True)
+ cache_path.write_text('{"wrong_field": "value"}')
+ result = await cache.admins_read_from_file()
+ assert result is None
+
+
[email protected]
+async def test_admins_read_from_file_returns_none_for_missing(state_dir:
pathlib.Path):
+ result = await cache.admins_read_from_file()
+ assert result is None
+
+
[email protected]
+async def test_admins_save_and_read_roundtrip(state_dir: pathlib.Path):
+ original_admins = frozenset({"alice", "bob", "charlie"})
+ await cache.admins_save_to_file(original_admins)
+ result = await cache.admins_read_from_file()
+ assert result is not None
+ assert result.admins == original_admins
+
+
[email protected]
+async def test_admins_save_to_file_creates_file(state_dir: pathlib.Path):
+ admins = frozenset({"alice", "bob"})
+ await cache.admins_save_to_file(admins)
+ cache_path = state_dir / "cache" / "admins.json"
+ assert cache_path.exists()
+
+
[email protected]
+async def test_admins_save_to_file_creates_parent_dirs(state_dir:
pathlib.Path):
+ cache_dir = state_dir / "cache"
+ assert not cache_dir.exists()
+ await cache.admins_save_to_file(frozenset({"alice"}))
+ assert cache_dir.exists()
+ assert cache_dir.is_dir()
diff --git a/tests/unit/test_util.py b/tests/unit/test_util.py
index 0a8c814..3253aa0 100644
--- a/tests/unit/test_util.py
+++ b/tests/unit/test_util.py
@@ -18,87 +18,74 @@
import os
import pathlib
import stat
-import tempfile
import atr.util as util
-def test_chmod_files_does_not_change_directory_permissions():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- subdir = tmp_path / "subdir"
- subdir.mkdir()
- os.chmod(subdir, 0o700)
- test_file = subdir / "test.txt"
- test_file.write_text("content")
+def test_chmod_files_does_not_change_directory_permissions(tmp_path:
pathlib.Path):
+ subdir = tmp_path / "subdir"
+ subdir.mkdir()
+ os.chmod(subdir, 0o700)
+ test_file = subdir / "test.txt"
+ test_file.write_text("content")
- util.chmod_files(tmp_path, 0o444)
+ util.chmod_files(tmp_path, 0o444)
- dir_mode = stat.S_IMODE(subdir.stat().st_mode)
- assert dir_mode == 0o700
+ dir_mode = stat.S_IMODE(subdir.stat().st_mode)
+ assert dir_mode == 0o700
-def test_chmod_files_handles_empty_directory():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- util.chmod_files(tmp_path, 0o444)
+def test_chmod_files_handles_empty_directory(tmp_path: pathlib.Path):
+ util.chmod_files(tmp_path, 0o444)
-def test_chmod_files_handles_multiple_files():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- files = [tmp_path / f"file{i}.txt" for i in range(5)]
- for f in files:
- f.write_text("content")
- os.chmod(f, 0o644)
+def test_chmod_files_handles_multiple_files(tmp_path: pathlib.Path):
+ files = [tmp_path / f"file{i}.txt" for i in range(5)]
+ for f in files:
+ f.write_text("content")
+ os.chmod(f, 0o644)
- util.chmod_files(tmp_path, 0o400)
+ util.chmod_files(tmp_path, 0o400)
- for f in files:
- file_mode = stat.S_IMODE(f.stat().st_mode)
- assert file_mode == 0o400
+ for f in files:
+ file_mode = stat.S_IMODE(f.stat().st_mode)
+ assert file_mode == 0o400
-def test_chmod_files_handles_nested_directories():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- nested_dir = tmp_path / "subdir" / "nested"
- nested_dir.mkdir(parents=True)
- file1 = tmp_path / "root.txt"
- file2 = tmp_path / "subdir" / "mid.txt"
- file3 = nested_dir / "deep.txt"
- for f in [file1, file2, file3]:
- f.write_text("content")
- os.chmod(f, 0o644)
+def test_chmod_files_handles_nested_directories(tmp_path: pathlib.Path):
+ nested_dir = tmp_path / "subdir" / "nested"
+ nested_dir.mkdir(parents=True)
+ file1 = tmp_path / "root.txt"
+ file2 = tmp_path / "subdir" / "mid.txt"
+ file3 = nested_dir / "deep.txt"
+ for f in [file1, file2, file3]:
+ f.write_text("content")
+ os.chmod(f, 0o644)
- util.chmod_files(tmp_path, 0o444)
+ util.chmod_files(tmp_path, 0o444)
- for f in [file1, file2, file3]:
- file_mode = stat.S_IMODE(f.stat().st_mode)
- assert file_mode == 0o444
+ for f in [file1, file2, file3]:
+ file_mode = stat.S_IMODE(f.stat().st_mode)
+ assert file_mode == 0o444
-def test_chmod_files_sets_custom_permissions():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- test_file = tmp_path / "test.txt"
- test_file.write_text("content")
- os.chmod(test_file, 0o644)
+def test_chmod_files_sets_custom_permissions(tmp_path: pathlib.Path):
+ test_file = tmp_path / "test.txt"
+ test_file.write_text("content")
+ os.chmod(test_file, 0o644)
- util.chmod_files(tmp_path, 0o400)
+ util.chmod_files(tmp_path, 0o400)
- file_mode = stat.S_IMODE(test_file.stat().st_mode)
- assert file_mode == 0o400
+ file_mode = stat.S_IMODE(test_file.stat().st_mode)
+ assert file_mode == 0o400
-def test_chmod_files_sets_default_permissions():
- with tempfile.TemporaryDirectory() as tmp_dir:
- tmp_path = pathlib.Path(tmp_dir)
- test_file = tmp_path / "test.txt"
- test_file.write_text("content")
- os.chmod(test_file, 0o644)
+def test_chmod_files_sets_default_permissions(tmp_path: pathlib.Path):
+ test_file = tmp_path / "test.txt"
+ test_file.write_text("content")
+ os.chmod(test_file, 0o644)
- util.chmod_files(tmp_path, 0o444)
+ util.chmod_files(tmp_path, 0o444)
- file_mode = stat.S_IMODE(test_file.stat().st_mode)
- assert file_mode == 0o444
+ file_mode = stat.S_IMODE(test_file.stat().st_mode)
+ assert file_mode == 0o444
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]