kaxil commented on code in PR #60804:
URL: https://github.com/apache/airflow/pull/60804#discussion_r3083383168
##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -63,45 +67,239 @@ def test__read_dag_returns_none_when_no_dag(self):
assert result is None
assert "v1" not in self.db_dag_bag._dags
- def test_get_serialized_dag_model(self):
- """It should return the cached SerializedDagModel if already loaded."""
+ def test_get_dag_fetches_from_db_on_miss(self):
+ """It should query the DB and cache the result when not in cache."""
+ mock_dag = MagicMock(spec=SerializedDAG)
mock_serdag = MagicMock(spec=SerializedDagModel)
+ mock_serdag.dag = mock_dag
mock_serdag.dag_version_id = "v1"
mock_dag_version = MagicMock()
mock_dag_version.serialized_dag = mock_serdag
self.session.get.return_value = mock_dag_version
- self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
- result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+ result = self.db_dag_bag.get_dag("v1", session=self.session)
- assert result == mock_serdag
self.session.get.assert_called_once()
+ assert result == mock_dag
- def test_get_serialized_dag_model_returns_none_when_not_found(self):
+ def test_get_dag_returns_cached_on_hit(self):
+ """It should return cached DAG without querying DB."""
+ mock_dag = MagicMock(spec=SerializedDAG)
+ self.db_dag_bag._dags["v1"] = mock_dag
+
+ result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+ assert result == mock_dag
+ self.session.get.assert_not_called()
+
+ def test_get_dag_returns_none_when_not_found(self):
"""It should return None if version_id not found in DB."""
self.session.get.return_value = None
- result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+ result = self.db_dag_bag.get_dag("v1", session=self.session)
assert result is None
- def test_get_dag_calls_get_dag_model_and__read_dag(self):
- """It should call get_dag_model and then _read_dag."""
+
+class TestDBDagBagCache:
+ """Tests for DBDagBag optional caching behavior."""
+
+ def test_no_caching_by_default(self):
+ """Test that DBDagBag uses a simple dict without caching by default."""
+ dag_bag = DBDagBag()
+ assert dag_bag._use_cache is False
+ assert isinstance(dag_bag._dags, dict)
+
+ def test_lru_cache_enabled_with_cache_size(self):
+ """Test that LRU cache is enabled when cache_size is provided."""
+ dag_bag = DBDagBag(cache_size=10)
+ assert dag_bag._use_cache is True
+ assert isinstance(dag_bag._dags, LRUCache)
+
+ def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+ """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+ dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+ assert dag_bag._use_cache is True
+ assert isinstance(dag_bag._dags, TTLCache)
+
+ def test_zero_cache_size_uses_unbounded_dict(self):
+ """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+ dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+ assert dag_bag._use_cache is False
+ assert isinstance(dag_bag._dags, dict)
+
+ def test_clear_cache_with_caching(self):
+ """Test clear_cache() with caching enabled."""
+ dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+ mock_dag = MagicMock()
+ dag_bag._dags["version_1"] = mock_dag
+ dag_bag._dags["version_2"] = mock_dag
+ assert len(dag_bag._dags) == 2
+
+ count = dag_bag.clear_cache()
+ assert count == 2
+ assert len(dag_bag._dags) == 0
+
+ def test_clear_cache_without_caching(self):
+ """Test clear_cache() without caching enabled."""
+ dag_bag = DBDagBag()
+
+ mock_dag = MagicMock()
+ dag_bag._dags["version_1"] = mock_dag
+ assert len(dag_bag._dags) == 1
+
+ count = dag_bag.clear_cache()
+ assert count == 1
+ assert len(dag_bag._dags) == 0
+
+ def test_ttl_cache_expiry(self):
+ """Test that cached DAGs expire after TTL."""
+ # TTLCache defaults to time.monotonic which time_machine cannot control.
+ # Use time.time as the timer so time_machine can advance it.
+ dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+ dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+ with time_machine.travel("2025-01-01 00:00:00", tick=False):
+ dag_bag._dags["test_version_id"] = MagicMock()
+ assert "test_version_id" in dag_bag._dags
+
+ # Jump ahead beyond TTL
+ with time_machine.travel("2025-01-01 00:00:02", tick=False):
+ assert dag_bag._dags.get("test_version_id") is None
+
+ def test_lru_eviction(self):
+ """Test that LRU eviction works when cache is full."""
+ dag_bag = DBDagBag(cache_size=2)
+
+ dag_bag._dags["version_1"] = MagicMock()
+ dag_bag._dags["version_2"] = MagicMock()
+ dag_bag._dags["version_3"] = MagicMock()
+
+ # version_1 should be evicted (LRU)
+ assert dag_bag._dags.get("version_1") is None
+ assert dag_bag._dags.get("version_2") is not None
+ assert dag_bag._dags.get("version_3") is not None
+
+ def test_thread_safety_with_caching(self):
+ """Test concurrent access doesn't cause race conditions with caching enabled."""
+ dag_bag = DBDagBag(cache_size=100, cache_ttl=60)
+ errors = []
+ mock_session = MagicMock()
+
+ def make_dag_version(version_id):
+ serdag = MagicMock()
+ serdag.dag = MagicMock()
+ serdag.dag_version_id = version_id
+ return MagicMock(serialized_dag=serdag)
+
Review Comment:
dummy
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1606,6 +1606,30 @@ api:
type: string
example: ~
default: "False"
+ dag_cache_size:
+ description: |
+ Size of the LRU cache for SerializedDAG objects in the API server.
+ Set to 0 to use an unbounded dict (no eviction, pre-3.2 behavior). The cache is keyed by DAG version ID,
+ so lookups by DAG ID (e.g., viewing a DAG's details) always query
+ the database for the latest version, but the deserialized result is
+ cached for subsequent version-specific lookups.
+ version_added: 3.2.0
Review Comment:
Fixed in latest commit -- updated to 3.3.0.
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1606,6 +1606,30 @@ api:
type: string
example: ~
default: "False"
+ dag_cache_size:
+ description: |
+ Size of the LRU cache for SerializedDAG objects in the API server.
+ Set to 0 to use an unbounded dict (no eviction, pre-3.2 behavior). The cache is keyed by DAG version ID,
+ so lookups by DAG ID (e.g., viewing a DAG's details) always query
+ the database for the latest version, but the deserialized result is
+ cached for subsequent version-specific lookups.
+ version_added: 3.2.0
+ type: integer
+ example: ~
+ default: "64"
+ dag_cache_ttl:
+ description: |
+ Time-to-live (seconds) for cached SerializedDAG objects in the API server.
+ After this time, cached DAGs will be re-fetched from the database on next access.
+ Set to 0 to disable TTL (cache entries will only be evicted by LRU policy).
+
+ Note: After a DAG is updated, the API server may serve the previous version
+ until the cached entry expires. Lower values reduce staleness but increase
+ database load.
+ version_added: 3.2.0
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]