This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 501235f  Migrate state files with extensive checks
501235f is described below

commit 501235f3b0fe07523e91f636caba8f08f2c9aec4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jan 16 16:50:41 2026 +0000

    Migrate state files with extensive checks
---
 atr/server.py | 297 +++++++++++++++++++++++++++++++---------------------------
 1 file changed, 158 insertions(+), 139 deletions(-)

diff --git a/atr/server.py b/atr/server.py
index 9dde3d0..5859c72 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -21,9 +21,11 @@ import asyncio
 import contextlib
 import datetime
 import fcntl
+import multiprocessing
 import os
 import pathlib
 import queue
+import sys
 import urllib.parse
 from collections.abc import Iterable
 from typing import Any, Final
@@ -60,6 +62,19 @@ import atr.util as util
 # We should probably find a cleaner way to do this
 app: base.QuartApp | None = None

+# (old, new) paths relative to STATE_DIR, migrated in list order, so check the order carefully to avoid conflicts
+_MIGRATIONS: Final[list[tuple[str, str]]] = [
+    # Audit
+    ("storage-audit.log", "audit/storage-audit.log"),
+    # Cache
+    ("routes.json", "cache/routes.json"),
+    ("user_session_cache.json", "cache/user_session_cache.json"),
+    # Database, with its SQLite -shm and -wal sidecar files when they are present
+    ("atr.db", "database/atr.db"),
+    ("atr.db-shm", "database/atr.db-shm"),
+    ("atr.db-wal", "database/atr.db-wal"),
+]
+
 _SWAGGER_UI_TEMPLATE: Final[str] = """<!DOCTYPE html>
 <html lang="en">
 <head>
@@ -396,7 +411,17 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
     if os.sep != "/":
         raise RuntimeError('ATR requires a POSIX compatible filesystem where os.sep is "/"')
     config_mode = config.get_mode()
-    _migrate_state_directory(app_config)
+
+    # Custom configuration for the database path is no longer supported
+    configured_path = app_config.SQLITE_DB_PATH
+    if configured_path != "database/atr.db":
+        print("!!!", file=sys.stderr)
+        print("ERROR: Custom values of SQLITE_DB_PATH are no longer supported!", file=sys.stderr)
+        print("Please unset SQLITE_DB_PATH to allow the server to start", file=sys.stderr)
+        print("!!!", file=sys.stderr)
+        sys.exit(1)
+
+    _migrate_state(app_config)
     _app_dirs_setup(app_config)
     log.performance_init()
     app = _app_create_base(app_config)
@@ -439,50 +464,6 @@ def _create_app(app_config: type[config.AppConfig]) -> base.QuartApp:
     return app


-def _get_parent_process_age() -> float:
-    import datetime
-    import subprocess
-    import time
-
-    ppid = os.getppid()
-
-    try:
-        with open(f"/proc/{ppid}/stat") as f:
-            stat = f.read().split()
-        starttime_ticks = int(stat[21])
-        ticks_per_sec = os.sysconf("SC_CLK_TCK")
-        with open("/proc/stat") as f:
-            for line in f:
-                if line.startswith("btime "):
-                    boot_time = int(line.split()[1])
-                    break
-            else:
-                return 0.0
-        process_start = boot_time + (starttime_ticks / ticks_per_sec)
-        return time.time() - process_start
-    except (FileNotFoundError, IndexError, ValueError, OSError):
-        pass
-
-    try:
-        result = subprocess.run(
-            ["ps", "-o", "lstart=", "-p", str(ppid)],
-            capture_output=True,
-            text=True,
-        )
-        if result.returncode == 0:
-            start_str = result.stdout.strip()
-            for fmt in ["%a %b %d %H:%M:%S %Y", "%a %d %b %H:%M:%S %Y"]:
-                try:
-                    dt = datetime.datetime.strptime(start_str, fmt)
-                    return time.time() - dt.timestamp()
-                except ValueError:
-                    continue
-    except OSError:
-        pass
-
-    return 0.0
-
-
 async def _initialise_test_environment() -> None:
     if not config.get().ALLOW_TESTS:
         return
@@ -515,100 +496,124 @@ async def _initialise_test_environment() -> None:
             await data.commit()
 
 
-async def _register_recurrent_tasks() -> None:
-    """Schedule recurring tasks"""
-    # Start scheduled tasks 5 min after server start
-    await asyncio.sleep(300)
-    try:
-        await tasks.clear_scheduled()
-        metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
-        log.info(f"Scheduled remote metadata update with ID {metadata.id}")
-        await asyncio.sleep(60)
-        workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True)
-        log.info(f"Scheduled workflow status update with ID {workflow.id}")
+def _is_hot_reload() -> bool:
+    proc = multiprocessing.current_process()
+    if proc.name == "MainProcess":
+        # The main process is never treated as a hot reload, whether or not --reload was given
+        return False
+    if "--reload" not in sys.argv:
+        # Reloading is off
+        return False
+    return True

-    except Exception as e:
-        log.exception(f"Failed to schedule recurrent tasks: {e!s}")

+def _migrate_path(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
+    # Keep track of ancestor directories that we create
+    root_to_leaf_created: list[pathlib.Path] = []

-def _migrate_audit(state_dir: pathlib.Path) -> None:
-    _migrate_file(
-        state_dir / "storage-audit.log",
-        state_dir / "audit" / "storage-audit.log",
-    )
+    try:
+        # Create any missing ancestor directories of new_path, recording them so that we can roll back on failure
+        focused_ancestor_directory = new_path.parent
+        leaf_to_root_to_create = []
+        while not focused_ancestor_directory.exists():
+            leaf_to_root_to_create.append(focused_ancestor_directory)
+            focused_ancestor_directory = focused_ancestor_directory.parent
+
+        # It is not safe to run the rest of this function across filesystems
+        # Now that we have the closest existing ancestor, we can check its device ID
+        if os.stat(old_path).st_dev != os.stat(focused_ancestor_directory).st_dev:
+            raise RuntimeError(f"Cannot migrate across filesystems: {old_path} -> {new_path}")
+
+        # Start from the root, and create towards the leaf
+        for ancestor_directory in reversed(leaf_to_root_to_create):
+            ancestor_directory.mkdir()
+            root_to_leaf_created.append(ancestor_directory)
+
+        # Perform the actual migration as safely as possible
+        _migrate_path_by_type(old_path, new_path)

+    except Exception as e:
+        # Roll back created directories from leaf to root; rmdir errors (e.g. not empty) must not mask e
+        for created_directory in reversed(root_to_leaf_created):
+            with contextlib.suppress(OSError):
+                created_directory.rmdir()

-def _migrate_cache(state_dir: pathlib.Path) -> None:
-    _migrate_file(
-        state_dir / "routes.json",
-        state_dir / "cache" / "routes.json",
-    )
-    _migrate_file(
-        state_dir / "user_session_cache.json",
-        state_dir / "cache" / "user_session_cache.json",
-    )
+        if isinstance(e, FileNotFoundError):
+            # We check all paths before attempting to migrate
+            # So if a file mysteriously disappears, we should raise an error
+            raise RuntimeError(f"Migration path disappeared before migration: {old_path}") from e
+        raise
 
 
-def _migrate_database(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> None:
-    configured_path = app_config.SQLITE_DB_PATH
-    if configured_path not in ("atr.db", "database/atr.db"):
-        raise RuntimeError(
-            f"SQLITE_DB_PATH is set to '{configured_path}' but migration only supports "
-            f"the default value 'atr.db'. Please manually migrate your database to "
-            f"'database/atr.db' and update SQLITE_DB_PATH, or remove the custom setting."
-        )
-    _migrate_file(
-        state_dir / "atr.db",
-        state_dir / "database" / "atr.db",
-    )
-    for suffix in ["-shm", "-wal"]:
-        old_path = state_dir / f"atr.db{suffix}"
-        new_path = state_dir / "database" / f"atr.db{suffix}"
-        if old_path.exists():
-            _migrate_file(old_path, new_path)
-
-
-def _migrate_directory(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
-    if old_path.exists() and (not new_path.exists()):
-        old_path.rename(new_path)
-        print(f"Migrated directory: {old_path} -> {new_path}")
-    elif old_path.exists() and new_path.exists():
-        raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist")
-    else:
-        print(f"No directory migration needed: {old_path}")
+def _migrate_path_by_type(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
+    # Migrate a regular file
+    if old_path.is_file():
+        try:
+            # Hard linking fails if new_path already exists
+            os.link(old_path, new_path)
+        except FileExistsError:
+            # If the migration was interrupted, there may be two hard links
+            # If they link to the same inode, we can remove old_path
+            # If not, then it's a real conflict
+            if not os.path.samefile(old_path, new_path):
+                # The inodes are different, so this is a real conflict
+                raise RuntimeError(f"Migration conflict: {new_path} already exists")
+            # Otherwise, the inodes are the same, so this is a partial migration
+            # We fall through to complete the migration, but report the detection first
+            print(f"Partial migration detected: {old_path} -> {new_path}")
+
+        # Hard linking was successful, so we can remove old_path
+        try:
+            os.unlink(old_path)
+        except FileNotFoundError:
+            # Some other process must have deleted old_path
+            print(f"Migration path removed by a third party during migration: {old_path}")
+            # We do not return here, because the file is migrated
+        print(f"Migrated file: {old_path} -> {new_path}")

+    # Migrate a directory
+    elif old_path.is_dir():
+        if new_path.exists():
+            # This is a TOCTOU susceptible check, but os.rename has further safeguards
+            raise RuntimeError(f"Migration conflict: {new_path} already exists")
+        try:
+            # We assume that old_path is not replaced by a file before this rename
+            # If new_path is a file or a non-empty directory, this raises an OSError (or a subclass)
+            # If new_path is an empty directory, it is replaced (accepted, given the check above)
+            os.rename(old_path, new_path)
+        except FileNotFoundError:  # A vanished path is not a conflict, so let the caller report it
+            raise
+        except OSError as e:
+            # In this case, new_path was probably a directory and not empty
+            raise RuntimeError(f"Migration conflict: {new_path} already exists") from e
+        print(f"Migrated directory: {old_path} -> {new_path}")

-def _migrate_file(old_path: pathlib.Path, new_path: pathlib.Path) -> None:
-    if old_path.exists() and (not new_path.exists()):
-        new_path.parent.mkdir(parents=True, exist_ok=True)
-        old_path.rename(new_path)
-        print(f"Migrated file: {old_path} -> {new_path}")
-    elif old_path.exists() and new_path.exists():
-        raise RuntimeError(f"Migration conflict: both {old_path} and {new_path} exist")
     else:
-        print(f"No file migration needed: {old_path}")
+        raise RuntimeError(f"Migration path is neither a file nor a directory: {old_path}")
 
 
-def _migrate_state_directory(app_config: type[config.AppConfig]) -> None:
+def _migrate_state(app_config: type[config.AppConfig]) -> None:
+    # It's okay to use synchronous code in this function and in any functions that it calls
     state_dir = pathlib.Path(app_config.STATE_DIR)

-    pending = _pending_migrations(state_dir, app_config)
-    if pending:
-        parent_age = _get_parent_process_age()
-        if parent_age > 10.0:
-            import sys
-
-            print("=" * 70, file=sys.stderr)
-            print("ERROR: State directory migration required but hot reload detected", file=sys.stderr)
-            print(f"Parent process age: {parent_age:.1f}s", file=sys.stderr)
-            print("Pending migrations:", file=sys.stderr)
-            for p in pending:
-                print(f"  - {p}", file=sys.stderr)
-            print("", file=sys.stderr)
-            print("Please restart the server, not hot reload, to apply migrations", file=sys.stderr)
-            print("=" * 70, file=sys.stderr)
-            sys.exit(1)
+    # Are there migrations to apply?
+    pending_migrations = _pending_migrations(state_dir)
+    if not pending_migrations:
+        return

+    # Are we hot reloading?
+    if _is_hot_reload():
+        print("!!!", file=sys.stderr)
+        print("ERROR: Cannot migrate files during hot reload!", file=sys.stderr)
+        print("The following files need to be migrated:", file=sys.stderr)
+        for old_path, new_path in sorted(pending_migrations):
+            print(f"  - {old_path} -> {new_path}", file=sys.stderr)
+        print("", file=sys.stderr)
+        print("Restart the server to apply the migrations", file=sys.stderr)
+        print("!!!", file=sys.stderr)
+        sys.exit(1)
+
+    # Serialise migrations across processes; another process may finish some while we wait for the lock
     runtime_dir = state_dir / "runtime"
     runtime_dir.mkdir(parents=True, exist_ok=True)
     lock_path = runtime_dir / "migration.lock"
@@ -616,28 +621,42 @@ def _migrate_state_directory(app_config: type[config.AppConfig]) -> None:
     with open(lock_path, "w") as lock_file:
         fcntl.flock(lock_file, fcntl.LOCK_EX)
         try:
-            _migrate_audit(state_dir)
-            _migrate_cache(state_dir)
-            _migrate_database(state_dir, app_config)
+            _migrate_state_files(state_dir, _pending_migrations(state_dir))
         finally:
             fcntl.flock(lock_file, fcntl.LOCK_UN)
 
 
-def _pending_migrations(state_dir: pathlib.Path, app_config: type[config.AppConfig]) -> list[str]:
-    pending = []
-    if (state_dir / "storage-audit.log").exists():
-        pending.append("storage-audit.log -> audit/storage-audit.log")
-    if (state_dir / "routes.json").exists():
-        pending.append("routes.json -> cache/routes.json")
-    if (state_dir / "user_session_cache.json").exists():
-        pending.append("user_session_cache.json -> cache/user_session_cache.json")
-    configured_path = app_config.SQLITE_DB_PATH
-    if configured_path in ("atr.db", "database/atr.db"):
-        if (state_dir / "atr.db").exists():
-            pending.append("atr.db -> database/atr.db")
+def _migrate_state_files(state_dir: pathlib.Path, pending_migrations: set[tuple[str, str]]) -> None:
+    for old_path, new_path in _MIGRATIONS:
+        if (old_path, new_path) not in pending_migrations:
+            continue
+        _migrate_path(state_dir / old_path, state_dir / new_path)
+
+
+def _pending_migrations(state_dir: pathlib.Path) -> set[tuple[str, str]]:
+    pending: set[tuple[str, str]] = set()
+    for old_path, new_path in _MIGRATIONS:
+        if (state_dir / old_path).exists():
+            pending.add((old_path, new_path))
     return pending
 
 
+async def _register_recurrent_tasks() -> None:
+    """Schedule recurring tasks"""
+    # Start scheduled tasks 5 min after server start
+    await asyncio.sleep(300)
+    try:
+        await tasks.clear_scheduled()
+        metadata = await tasks.metadata_update(asf_uid="system", schedule_next=True)
+        log.info(f"Scheduled remote metadata update with ID {metadata.id}")
+        await asyncio.sleep(60)
+        workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True)
+        log.info(f"Scheduled workflow status update with ID {workflow.id}")
+
+    except Exception as e:
+        log.exception(f"Failed to schedule recurrent tasks: {e!s}")
+
+
 def _register_routes(app: base.QuartApp) -> None:
     # Add a global error handler to show helpful error messages with tracebacks
     @app.errorhandler(Exception)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to