jedcunningham commented on code in PR #46503:
URL: https://github.com/apache/airflow/pull/46503#discussion_r1968204065


##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
 from __future__ import annotations
 
 import fcntl
+import logging
+import os
+import shutil
 import tempfile
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
 from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
 
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+    from pendulum import DateTime
+
+log = logging.getLogger(__name__)
+
+STALE_BUNDLE_TRACKING_FOLDER = Path(
+    tempfile.gettempdir(),
+    "airflow",
+    "dag_bundles",
+    "_tracking",
+)
+
+STALE_BUNDLE_CHECK_INTERVAL: int = conf.getint(
+    "dag_processor", "stale_bundle_cleanup_interval", fallback=10 * 60
+)
+"""How frequently (in seconds) a worker should check for stale bundles."""
+
+
+def get_bundle_storage_root_path():
+    if configured_location := conf.get("dag_processor", 
"dag_bundle_storage_path", fallback=None):
+        return Path(configured_location)
+    else:
+        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")
+
+
+def get_bundle_tracking_dir(bundle_name: str) -> Path:
+    return STALE_BUNDLE_TRACKING_FOLDER / bundle_name
+
+
+def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
+    tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
+    return Path(tracking_dir, version)
+
+
+def get_bundle_base_folder(bundle_type: str, bundle_name: str) -> Path:
+    return get_bundle_storage_root_path() / bundle_type / bundle_name
+
+
+def get_bundle_versions_base_folder(bundle_type: str, bundle_name: str) -> 
Path:
+    return get_bundle_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name) / "versions"
+
+
+def get_bundle_version_path(bundle_type: str, bundle_name: str, version: str) 
-> Path:
+    base_folder = get_bundle_versions_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name)
+    return base_folder / version
+
+
+@dataclass(frozen=True)
+class TrackedBundleVersionInfo:
+    """
+    Internal info class for stale bundle cleanup.
+
+    :meta private:
+    """
+
+    lock_file_path: Path
+    version: str = field(compare=False)
+    dt: DateTime = field(compare=False)
+
+
+class BundleUsageTrackingManager:
+    """
+    Utility helper for removing stale bundles.
+
+    :meta private:
+    """
+
+    def _parse_dt(self, val) -> DateTime | None:
+        try:
+            return pendulum.parse(val)
+        except ParserError:
+            return None
+
+    @staticmethod
+    def _remove_last_n(val: list[TrackedBundleVersionInfo]) -> 
list[TrackedBundleVersionInfo]:
+        min_versions_to_keep = conf.getint(
+            section="dag_processor",
+            key="stale_bundle_cleanup_min_versions",
+            fallback=10,
+        )
+        return sorted(val, key=attrgetter("dt"), 
reverse=True)[min_versions_to_keep:]
+
+    @staticmethod
+    def _remove_recent(val: list[TrackedBundleVersionInfo]) -> 
list[TrackedBundleVersionInfo]:
+        age_threshold = conf.getint(
+            section="dag_processor",
+            key="stale_bundle_cleanup_age_threshold",

Review Comment:
   This option also needs to be added to `config.yml`, with its default moved there.



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
 from __future__ import annotations
 
 import fcntl
+import logging
+import os
+import shutil
 import tempfile
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
 from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
 
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+    from pendulum import DateTime
+
+log = logging.getLogger(__name__)
+
+STALE_BUNDLE_TRACKING_FOLDER = Path(
+    tempfile.gettempdir(),
+    "airflow",
+    "dag_bundles",
+    "_tracking",
+)
+
+STALE_BUNDLE_CHECK_INTERVAL: int = conf.getint(
+    "dag_processor", "stale_bundle_cleanup_interval", fallback=10 * 60
+)
+"""How frequently (in seconds) a worker should check for stale bundles."""
+
+
+def get_bundle_storage_root_path():
+    if configured_location := conf.get("dag_processor", 
"dag_bundle_storage_path", fallback=None):
+        return Path(configured_location)
+    else:
+        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")
+
+
+def get_bundle_tracking_dir(bundle_name: str) -> Path:
+    return STALE_BUNDLE_TRACKING_FOLDER / bundle_name
+
+
+def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
+    tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
+    return Path(tracking_dir, version)
+
+
+def get_bundle_base_folder(bundle_type: str, bundle_name: str) -> Path:
+    return get_bundle_storage_root_path() / bundle_type / bundle_name
+
+
+def get_bundle_versions_base_folder(bundle_type: str, bundle_name: str) -> 
Path:
+    return get_bundle_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name) / "versions"
+
+
+def get_bundle_version_path(bundle_type: str, bundle_name: str, version: str) 
-> Path:
+    base_folder = get_bundle_versions_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name)
+    return base_folder / version
+
+
+@dataclass(frozen=True)
+class TrackedBundleVersionInfo:
+    """
+    Internal info class for stale bundle cleanup.
+
+    :meta private:
+    """
+
+    lock_file_path: Path
+    version: str = field(compare=False)
+    dt: DateTime = field(compare=False)
+
+
+class BundleUsageTrackingManager:
+    """
+    Utility helper for removing stale bundles.
+
+    :meta private:
+    """
+
+    def _parse_dt(self, val) -> DateTime | None:
+        try:
+            return pendulum.parse(val)
+        except ParserError:
+            return None
+
+    @staticmethod
+    def _remove_last_n(val: list[TrackedBundleVersionInfo]) -> 
list[TrackedBundleVersionInfo]:
+        min_versions_to_keep = conf.getint(
+            section="dag_processor",
+            key="stale_bundle_cleanup_min_versions",

Review Comment:
   Also needs to move to `config.yml`.



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
 from __future__ import annotations
 
 import fcntl
+import logging
+import os
+import shutil
 import tempfile
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
 from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
 
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+    from pendulum import DateTime
+
+log = logging.getLogger(__name__)
+
+STALE_BUNDLE_TRACKING_FOLDER = Path(
+    tempfile.gettempdir(),
+    "airflow",
+    "dag_bundles",
+    "_tracking",
+)
+
+STALE_BUNDLE_CHECK_INTERVAL: int = conf.getint(
+    "dag_processor", "stale_bundle_cleanup_interval", fallback=10 * 60

Review Comment:
   This needs to be added to `config.yml`, and the default moved there too.



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -62,6 +278,14 @@ def __init__(
         self.refresh_interval = refresh_interval
         self.is_initialized: bool = False
 
+        self.base_folder = get_bundle_base_folder(bundle_name=self.name, 
bundle_type=self.bundle_type)
+        """Base directory for all bundle files"""
+
+        self.versions_path = get_bundle_versions_base_folder(

Review Comment:
   It's a bit odd that one attribute is named `_folder` and the other `_path`, imo.



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -48,6 +260,10 @@ class BaseDagBundle(ABC):
     """
 
     supports_versioning: bool = False
+
+    bundle_type: str
+    """This is used in generating some the paths used for local storage."""

Review Comment:
   ```suggestion
       """This is used in generating some paths used for local storage."""
   ```
   
   Might be good to have an example...



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
 from __future__ import annotations
 
 import fcntl
+import logging
+import os
+import shutil
 import tempfile
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
 from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
 
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+    from pendulum import DateTime
+
+log = logging.getLogger(__name__)
+
+STALE_BUNDLE_TRACKING_FOLDER = Path(

Review Comment:
   Should this be based on `dag_bundle_storage_path` instead?



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -149,3 +362,72 @@ def lock(self):
             finally:
                 fcntl.flock(lock_file, fcntl.LOCK_UN)
                 self._locked = False
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}(name={self.name})"
+
+
+class BundleVersionLock:
+    """
+    Lock version of bundle when in use to prevent deletion.
+
+    :meta private:
+    """
+
+    def __init__(self, bundle_name, bundle_version, **kwargs):
+        super().__init__(**kwargs)
+        self.bundle_name = bundle_name
+        self.version = bundle_version
+        self.lock_file_path = get_bundle_tracking_file(
+            bundle_name=self.bundle_name,
+            version=self.version,
+        )
+        self.lock_file = None
+
+    def _log_exc(self, msg):
+        log.exception(
+            "% name=%s version=%s lock_file=%s",
+            msg,
+            self.bundle_name,
+            self.version,
+            self.lock_file_path,
+        )
+
+    def _update_version_file(self):
+        """Create a version file containing last-used timestamp."""
+        self.lock_file_path.parent.mkdir(parents=True, exist_ok=True)
+
+        with tempfile.TemporaryDirectory() as td:
+            temp_file = Path(td, self.lock_file_path)

Review Comment:
   Would using `TemporaryFile` or `NamedTemporaryFile` be a bit cleaner? Might 
need `delete=False` on the latter?



##########
airflow/dag_processing/bundles/base.py:
##########
@@ -18,12 +18,224 @@
 from __future__ import annotations
 
 import fcntl
+import logging
+import os
+import shutil
 import tempfile
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
+from dataclasses import dataclass, field
+from datetime import timedelta
+from fcntl import LOCK_SH, flock
+from operator import attrgetter
 from pathlib import Path
+from typing import TYPE_CHECKING
+
+from pendulum.parsing import ParserError
+from sqlalchemy_utils.types.enriched_datetime.pendulum_datetime import pendulum
 
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+if TYPE_CHECKING:
+    from pendulum import DateTime
+
+log = logging.getLogger(__name__)
+
+STALE_BUNDLE_TRACKING_FOLDER = Path(
+    tempfile.gettempdir(),
+    "airflow",
+    "dag_bundles",
+    "_tracking",
+)
+
+STALE_BUNDLE_CHECK_INTERVAL: int = conf.getint(
+    "dag_processor", "stale_bundle_cleanup_interval", fallback=10 * 60
+)
+"""How frequently (in seconds) a worker should check for stale bundles."""
+
+
+def get_bundle_storage_root_path():
+    if configured_location := conf.get("dag_processor", 
"dag_bundle_storage_path", fallback=None):
+        return Path(configured_location)
+    else:
+        return Path(tempfile.gettempdir(), "airflow", "dag_bundles")
+
+
+def get_bundle_tracking_dir(bundle_name: str) -> Path:
+    return STALE_BUNDLE_TRACKING_FOLDER / bundle_name
+
+
+def get_bundle_tracking_file(bundle_name: str, version: str) -> Path:
+    tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
+    return Path(tracking_dir, version)
+
+
+def get_bundle_base_folder(bundle_type: str, bundle_name: str) -> Path:
+    return get_bundle_storage_root_path() / bundle_type / bundle_name
+
+
+def get_bundle_versions_base_folder(bundle_type: str, bundle_name: str) -> 
Path:
+    return get_bundle_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name) / "versions"
+
+
+def get_bundle_version_path(bundle_type: str, bundle_name: str, version: str) 
-> Path:
+    base_folder = get_bundle_versions_base_folder(bundle_type=bundle_type, 
bundle_name=bundle_name)
+    return base_folder / version
+
+
+@dataclass(frozen=True)
+class TrackedBundleVersionInfo:
+    """
+    Internal info class for stale bundle cleanup.
+
+    :meta private:
+    """
+
+    lock_file_path: Path
+    version: str = field(compare=False)
+    dt: DateTime = field(compare=False)
+
+
+class BundleUsageTrackingManager:
+    """
+    Utility helper for removing stale bundles.
+
+    :meta private:
+    """
+
+    def _parse_dt(self, val) -> DateTime | None:
+        try:
+            return pendulum.parse(val)
+        except ParserError:
+            return None
+
+    @staticmethod
+    def _remove_last_n(val: list[TrackedBundleVersionInfo]) -> 
list[TrackedBundleVersionInfo]:
+        min_versions_to_keep = conf.getint(
+            section="dag_processor",
+            key="stale_bundle_cleanup_min_versions",
+            fallback=10,
+        )
+        return sorted(val, key=attrgetter("dt"), 
reverse=True)[min_versions_to_keep:]
+
+    @staticmethod
+    def _remove_recent(val: list[TrackedBundleVersionInfo]) -> 
list[TrackedBundleVersionInfo]:
+        age_threshold = conf.getint(
+            section="dag_processor",
+            key="stale_bundle_cleanup_age_threshold",
+            fallback=60 * 60 * 24,
+        )
+        ret = []
+        now = pendulum.now(tz=pendulum.UTC)
+        cutoff = now - timedelta(seconds=age_threshold)
+        for item in val:
+            if item.dt < cutoff:
+                ret.append(item)
+        return ret
+
+    def _find_all_tracking_files(self, bundle_name) -> 
list[TrackedBundleVersionInfo] | None:
+        tracking_dir = get_bundle_tracking_dir(bundle_name=bundle_name)
+        found: list[TrackedBundleVersionInfo] = []
+        if not tracking_dir.exists():
+            log.debug("bundle usage tracking directory does not exist. 
tracking_dir=%s", tracking_dir)
+            return None
+        for file in tracking_dir.iterdir():
+            log.debug("found bundle tracking file, path=%s", file)
+            version = file.name
+            dt_str = file.read_text()
+            dt = self._parse_dt(val=dt_str)
+            if not dt:
+                log.error(
+                    "could not parse val as datetime bundle_name=%s val=%s 
version=%s",
+                    bundle_name,
+                    dt_str,
+                    version,
+                )
+                continue
+            found.append(TrackedBundleVersionInfo(lock_file_path=file, 
version=version, dt=dt))
+        return found
+
+    @staticmethod
+    def _remove_stale_bundle(bundle_type: str, bundle_name: str, info: 
TrackedBundleVersionInfo) -> None:
+        bundle_version_path = get_bundle_version_path(
+            bundle_type=bundle_type,
+            bundle_name=bundle_name,
+            version=info.version,
+        )
+
+        def log_info(msg):
+            log.info(
+                "%s bundle_name=%s bundle_version=%s bundle_path=%s 
lock_file=%s",
+                msg,
+                bundle_name,
+                info.version,
+                bundle_version_path,
+                info.lock_file_path,
+            )
+
+        try:
+            log_info("removing stale bundle.")
+            with open(info.lock_file_path, "a") as f:
+                flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)  # exclusive lock, do 
not wait
+                # remove the actual bundle copy
+                shutil.rmtree(bundle_version_path)
+                # remove the lock file
+                os.remove(info.lock_file_path)
+        except BlockingIOError:
+            log_info("could not obtain lock. stale bundle will not be 
removed.")
+            return
+
+    def _find_candidates(self, found):
+        """Remove the recently used bundles."""
+        candidates = self._remove_last_n(found)
+        candidates = self._remove_recent(candidates)

Review Comment:
   Doesn't this mean we don't necessarily keep the last n?
   
   Feels like maybe it should be "keep newer than x or at least the last n"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to