kaxil commented on code in PR #60804:
URL: https://github.com/apache/airflow/pull/60804#discussion_r3067247838
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,114 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, no caching is used.
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL is used.
+ """
self.load_op_links = load_op_links
+ self._cache_size = cache_size
+ self._cache_ttl = cache_ttl
+ self._disable_cache = cache_size == 0
+
+ self._lock: RLock | None = None
+ self._use_cache = False
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+
+ # Initialize cache if cache_size is provided
+ if cache_size and cache_size > 0:
Review Comment:
Consolidated into a single `_use_cache` flag in the latest revision. No more
`_disable_cache`.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,114 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, no caching is used.
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL is used.
+ """
self.load_op_links = load_op_links
+ self._cache_size = cache_size
+ self._cache_ttl = cache_ttl
+ self._disable_cache = cache_size == 0
+
+ self._lock: RLock | None = None
+ self._use_cache = False
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+
+ # Initialize cache if cache_size is provided
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag or self._disable_cache:
+ return dag
+ if self._use_cache and self._lock:
+ try:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+ except MemoryError:
+ # Re-raise MemoryError to avoid masking OOM conditions
+ raise
+ except Exception:
+ log.warning("Failed to cache DAG %s", serdag.dag_id, exc_info=True)
+ else:
self._dags[serdag.dag_version_id] = dag
return dag
def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
- if dag := self._dags.get(version_id):
- return dag
+ if not self._disable_cache:
+ if self._lock:
+ with self._lock:
+ dag = self._dags.get(version_id)
+ else:
+ dag = self._dags.get(version_id)
+ if dag:
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_hit")
+ return dag
+ if self._use_cache:
Review Comment:
Done -- same change as above.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,114 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, no caching is used.
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL is used.
+ """
self.load_op_links = load_op_links
+ self._cache_size = cache_size
+ self._cache_ttl = cache_ttl
+ self._disable_cache = cache_size == 0
+
+ self._lock: RLock | None = None
+ self._use_cache = False
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+
+ # Initialize cache if cache_size is provided
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag or self._disable_cache:
+ return dag
+ if self._use_cache and self._lock:
+ try:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+ except MemoryError:
+ # Re-raise MemoryError to avoid masking OOM conditions
+ raise
+ except Exception:
+ log.warning("Failed to cache DAG %s", serdag.dag_id, exc_info=True)
+ else:
self._dags[serdag.dag_version_id] = dag
return dag
def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
- if dag := self._dags.get(version_id):
- return dag
+ if not self._disable_cache:
+ if self._lock:
+ with self._lock:
+ dag = self._dags.get(version_id)
+ else:
+ dag = self._dags.get(version_id)
+ if dag:
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_hit")
+ return dag
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_miss")
dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
if not dag_version:
return None
if not (serdag := dag_version.serialized_dag):
return None
+ if self._lock and not self._disable_cache:
+ with self._lock:
+ if dag := self._dags.get(version_id):
+ return dag
Review Comment:
The double-checked locking is still needed to deduplicate concurrent DB
queries. Without it, N threads that miss on the same version_id each query the
DB independently. With it, only the first thread queries; the rest find it
cached after acquiring the lock. Now also emits `cache_hit` metric in this path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]