This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a43d8b004d Reject negative `default_retention_days` in task SDK and core API routes (#67890)
7a43d8b004d is described below

commit 7a43d8b004d1bd5841d116bfca0f9b765fc7cc35
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jun 3 10:07:23 2026 +0530

    Reject negative `default_retention_days` in task SDK and core API routes (#67890)
---
 .../api_fastapi/core_api/routes/public/task_store.py        | 11 +++++++++--
 .../api_fastapi/core_api/routes/public/test_task_store.py   | 13 +++++++++++++
 task-sdk/src/airflow/sdk/execution_time/context.py          |  7 ++++++-
 task-sdk/tests/task_sdk/execution_time/test_context.py      |  6 ++++++
 4 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
index 32c16241dfd..f7ff94c25fb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
@@ -55,13 +55,20 @@ def _resolve_expires_at(expires_at: datetime | None | Literal["default"]) -> dat
     """
     Resolve the expires_at value from the request body.
 
-    - ``"default"``: apply configured default_retention_days
+    - ``"default"``: apply configured ``[state_store] default_retention_days``.
+      ``0`` means never expire. Negative values raise HTTP 400.
     - ``None``: never expire
     - datetime: use as-is
     """
     if expires_at == "default":
         days = conf.getint("state_store", "default_retention_days")
-        return datetime.now(tz=timezone.utc) + timedelta(days=days)
+        if days < 0:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail=f"[state_store] default_retention_days must be >= 0, got {days}. "
+                "Set to 0 to disable expiry.",
+            )
+        return None if days == 0 else datetime.now(tz=timezone.utc) + timedelta(days=days)
     return expires_at
 
 
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
index 30184d09c00..46f71add33a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
@@ -268,6 +268,19 @@ class TestSetTaskState(TestTaskStateEndpoint):
         resp = test_client.get(f"{BASE_URL}/job_id").json()
         assert resp["expires_at"] == "2026-01-08T00:00:00Z"
 
+    def test_new_key_default_retention_zero_never_expires(self, test_client):
+        """PUT with expires_at=default and default_retention_days=0 stores a key that never expires."""
+        with conf_vars({("state_store", "default_retention_days"): "0"}):
+            test_client.put(f"{BASE_URL}/job_id", json={"value": "v", "expires_at": "default"})
+        assert test_client.get(f"{BASE_URL}/job_id").json()["expires_at"] is None
+
+    def test_new_key_negative_retention_days_returns_400(self, test_client):
+        """PUT with expires_at=default and default_retention_days<0 returns HTTP 400."""
+        with conf_vars({("state_store", "default_retention_days"): "-1"}):
+            resp = test_client.put(f"{BASE_URL}/job_id", json={"value": "v", "expires_at": "default"})
+        assert resp.status_code == 400
+        assert "default_retention_days" in resp.json()["detail"]
+
     def test_new_key_never_expiry(self, test_client):
         """PUT with expires_at=null stores a key that never expires."""
         test_client.put(f"{BASE_URL}/job_id", json={"value": "v", "expires_at": None})
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 7a601e6f320..d39238ab3b2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -563,7 +563,12 @@ class TaskStoreAccessor:
             expires_at = now + retention
         else:
             days = conf.getint("state_store", "default_retention_days")
-            expires_at = None if days <= 0 else now + timedelta(days=days)
+            if days < 0:
+                raise ValueError(
+                    f"[state_store] default_retention_days must be >= 0, got {days}. "
+                    "Set to 0 to disable expiry."
+                )
+            expires_at = None if days == 0 else now + timedelta(days=days)
 
         # if custom backend is configured, store the value on the custom backend, and return the reference
         # to the stored value to store in the DB
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 9bcaa513d90..a7d20c43bcf 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1213,6 +1213,12 @@ class TestTaskStoreAccessor:
             SetTaskStore(ti_id=self.TI_ID, key="job_id", value="app_001", expires_at=None)
         )
 
+    def test_set_raises_on_negative_retention_days(self, mock_supervisor_comms):
+        """set() raises ValueError when default_retention_days is negative."""
+        with conf_vars({("state_store", "default_retention_days"): "-1"}):
+            with pytest.raises(ValueError, match="default_retention_days must be >= 0"):
+                TaskStoreAccessor(ti_id=self.TI_ID, scope=self.SCOPE).set("job_id", "app_001")
+
     def test_delete_operation(self, mock_supervisor_comms):
         mock_supervisor_comms.send.return_value = OKResponse(ok=True)
 

Reply via email to