This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7a43d8b004d Reject negative `default_retention_days` in task SDK and
core API routes (#67890)
7a43d8b004d is described below
commit 7a43d8b004d1bd5841d116bfca0f9b765fc7cc35
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jun 3 10:07:23 2026 +0530
Reject negative `default_retention_days` in task SDK and core API routes
(#67890)
---
.../api_fastapi/core_api/routes/public/task_store.py | 11 +++++++++--
.../api_fastapi/core_api/routes/public/test_task_store.py | 13 +++++++++++++
task-sdk/src/airflow/sdk/execution_time/context.py | 7 ++++++-
task-sdk/tests/task_sdk/execution_time/test_context.py | 6 ++++++
4 files changed, 34 insertions(+), 3 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
index 32c16241dfd..f7ff94c25fb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_store.py
@@ -55,13 +55,20 @@ def _resolve_expires_at(expires_at: datetime | None |
Literal["default"]) -> dat
"""
Resolve the expires_at value from the request body.
- - ``"default"``: apply configured default_retention_days
+ - ``"default"``: apply configured ``[state_store] default_retention_days``.
+ ``0`` means never expire. Negative values raise HTTP 400.
- ``None``: never expire
- datetime: use as-is
"""
if expires_at == "default":
days = conf.getint("state_store", "default_retention_days")
- return datetime.now(tz=timezone.utc) + timedelta(days=days)
+ if days < 0:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail=f"[state_store] default_retention_days must be >= 0,
got {days}. "
+ "Set to 0 to disable expiry.",
+ )
+ return None if days == 0 else datetime.now(tz=timezone.utc) +
timedelta(days=days)
return expires_at
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
index 30184d09c00..46f71add33a 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_store.py
@@ -268,6 +268,19 @@ class TestSetTaskState(TestTaskStateEndpoint):
resp = test_client.get(f"{BASE_URL}/job_id").json()
assert resp["expires_at"] == "2026-01-08T00:00:00Z"
+ def test_new_key_default_retention_zero_never_expires(self, test_client):
+ """PUT with expires_at=default and default_retention_days=0 stores a
key that never expires."""
+ with conf_vars({("state_store", "default_retention_days"): "0"}):
+ test_client.put(f"{BASE_URL}/job_id", json={"value": "v",
"expires_at": "default"})
+ assert test_client.get(f"{BASE_URL}/job_id").json()["expires_at"] is
None
+
+ def test_new_key_negative_retention_days_returns_400(self, test_client):
+ """PUT with expires_at=default and default_retention_days<0 returns
HTTP 400."""
+ with conf_vars({("state_store", "default_retention_days"): "-1"}):
+ resp = test_client.put(f"{BASE_URL}/job_id", json={"value": "v",
"expires_at": "default"})
+ assert resp.status_code == 400
+ assert "default_retention_days" in resp.json()["detail"]
+
def test_new_key_never_expiry(self, test_client):
"""PUT with expires_at=null stores a key that never expires."""
test_client.put(f"{BASE_URL}/job_id", json={"value": "v",
"expires_at": None})
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 7a601e6f320..d39238ab3b2 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -563,7 +563,12 @@ class TaskStoreAccessor:
expires_at = now + retention
else:
days = conf.getint("state_store", "default_retention_days")
- expires_at = None if days <= 0 else now + timedelta(days=days)
+ if days < 0:
+ raise ValueError(
+ f"[state_store] default_retention_days must be >= 0, got
{days}. "
+ "Set to 0 to disable expiry."
+ )
+ expires_at = None if days == 0 else now + timedelta(days=days)
# if custom backend is configured, store the value on the custom
backend, and return the reference
# to the stored value to store in the DB
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 9bcaa513d90..a7d20c43bcf 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1213,6 +1213,12 @@ class TestTaskStoreAccessor:
SetTaskStore(ti_id=self.TI_ID, key="job_id", value="app_001",
expires_at=None)
)
+ def test_set_raises_on_negative_retention_days(self,
mock_supervisor_comms):
+ """set() raises ValueError when default_retention_days is negative."""
+ with conf_vars({("state_store", "default_retention_days"): "-1"}):
+ with pytest.raises(ValueError, match="default_retention_days must
be >= 0"):
+ TaskStoreAccessor(ti_id=self.TI_ID,
scope=self.SCOPE).set("job_id", "app_001")
+
def test_delete_operation(self, mock_supervisor_comms):
mock_supervisor_comms.send.return_value = OKResponse(ok=True)