This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 65a82aa10ad Rename config for state_backend on workers to state_store_backend (#68270)
65a82aa10ad is described below
commit 65a82aa10ada4a617e7d736b92f6a95092f72bb1
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 9 12:30:44 2026 +0530
Rename config for state_backend on workers to state_store_backend (#68270)
---
.../src/airflow/config_templates/config.yml | 4 ++--
task-sdk/src/airflow/sdk/execution_time/context.py | 22 +++++++++++-----------
.../tests/task_sdk/execution_time/test_context.py | 18 ++++++++++++------
.../task_sdk/execution_time/test_task_runner.py | 6 +++---
4 files changed, 28 insertions(+), 22 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index bba829b3d08..9ed0433c6c4 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1939,9 +1939,9 @@ workers:
sensitive: true
example: ~
default: ""
- state_backend:
+ state_store_backend:
description: |
- Full class name of a custom worker-side state backend. When set, task state values are
+ Full class name of a custom worker-side store backend. When set, task store values are
routed through this backend so large payloads or credentialed storage stay on worker
infrastructure. The Execution API still records a reference string in the database.
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py
index 25741dd3d97..fa83ce3a5f5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -486,9 +486,9 @@ class VariableAccessor:
@cache
-def _get_worker_state_backend() -> BaseStoreBackend | None:
+def _get_worker_state_store_backend() -> BaseStoreBackend | None:
"""Return the configured worker-side state backend, instantiated once and
cached."""
- class_name = conf.get("workers", "state_backend", fallback="")
+ class_name = conf.get("workers", "state_store_backend", fallback="")
if not class_name:
return None
from airflow.sdk._shared.module_loading import import_string
@@ -541,7 +541,7 @@ class TaskStoreAccessor:
raise AirflowRuntimeError(resp)
if isinstance(resp, TaskStoreResult):
stored = resp.value
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None and isinstance(stored, dict) and (ref :=
_unwrap_external_ref(stored)):
# unwrap the marker to get the ref, and retrieve the actual
value from the backend using the ref
return backend.deserialize_task_store_from_ref(ref)
@@ -590,7 +590,7 @@ class TaskStoreAccessor:
# if custom backend is configured, store the value on the custom
backend, and return the reference
# to the stored value to store in the DB
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
stored: JsonValue = value
if backend is not None:
ref: str = backend.serialize_task_store_to_ref(value=value,
key=key, ti_id=str(self._ti_id))
@@ -607,7 +607,7 @@ class TaskStoreAccessor:
# cleanup the DB ref first, if backend cleanup fails after this, the
ref is gone and
# deterministic keys are recoverable on next set().
SUPERVISOR_COMMS.send(DeleteTaskStore(ti_id=self._ti_id, key=key))
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None:
backend.delete(self._scope, key)
@@ -625,7 +625,7 @@ class TaskStoreAccessor:
# cleanup the DB ref first, if backend cleanup fails after this, the
ref is gone and
# deterministic keys are recoverable on next set().
SUPERVISOR_COMMS.send(ClearTaskStore(ti_id=self._ti_id,
all_map_indices=all_map_indices))
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None:
backend.clear(self._scope, all_map_indices=all_map_indices)
@@ -636,7 +636,7 @@ class TaskStoreAccessor:
Used by clear_on_success: the server already clears DB rows as part of
SucceedTask,
so the comms round-trip is redundant.
"""
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None:
backend.clear(self._scope)
@@ -689,7 +689,7 @@ class AssetStoreAccessor:
raise AirflowRuntimeError(resp)
if isinstance(resp, AssetStoreResult):
stored = resp.value
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None and isinstance(stored, dict) and (ref :=
_unwrap_external_ref(stored)):
# unwrap the marker to get the ref, and retrieve the actual
value from the backend using the ref
return backend.deserialize_asset_store_from_ref(ref)
@@ -714,7 +714,7 @@ class AssetStoreAccessor:
# if custom backend is configured, store the value on the custom
backend, and return the reference
# to the stored value to store in the DB
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
asset_ref = self._name or self._uri or ""
stored: JsonValue = value
if backend is not None:
@@ -746,7 +746,7 @@ class AssetStoreAccessor:
# DB ref first: if backend cleanup fails after this, the ref is gone
and
# deterministic keys are recoverable on next set().
SUPERVISOR_COMMS.send(msg)
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None:
backend.delete(AssetScope(name=self._name, uri=self._uri), key)
@@ -766,7 +766,7 @@ class AssetStoreAccessor:
elif self._uri:
msg = ClearAssetStoreByUri(uri=self._uri)
SUPERVISOR_COMMS.send(msg)
- backend = _get_worker_state_backend()
+ backend = _get_worker_state_store_backend()
if backend is not None:
backend.clear(AssetScope(name=self._name, uri=self._uri))
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py b/task-sdk/tests/task_sdk/execution_time/test_context.py
index a7d20c43bcf..1e668fb9ffd 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1262,7 +1262,7 @@ class TestTaskStoreAccessor:
backend.serialize_task_store_to_ref.return_value =
"s3://bucket/ti_123/job_id"
with (
-
patch("airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=backend),
+
patch("airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=backend),
conf_vars({("state_store", "default_retention_days"): "0"}),
):
TaskStoreAccessor(ti_id=self.TI_ID,
scope=self.SCOPE).set("job_id", "spark_001")
@@ -1285,7 +1285,9 @@ class TestTaskStoreAccessor:
backend = MagicMock(spec=BaseStoreBackend)
backend.deserialize_task_store_from_ref.return_value = {"rows": 123}
- with
patch("airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=backend):
+ with patch(
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=backend
+ ):
result = TaskStoreAccessor(ti_id=self.TI_ID,
scope=self.SCOPE).get("job_id")
assert result == {"rows": 123}
@@ -1414,7 +1416,9 @@ class TestAssetStoreAccessor:
backend = MagicMock(spec=BaseStoreBackend)
backend.serialize_asset_store_to_ref.return_value =
"s3://bucket/assets/orders/watermark"
- with
patch("airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=backend):
+ with patch(
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=backend
+ ):
AssetStoreAccessor(name=self.ASSET_NAME).set("watermark",
"2026-05-01")
mock_supervisor_comms.send.assert_called_once_with(
@@ -1434,7 +1438,9 @@ class TestAssetStoreAccessor:
backend = MagicMock(spec=BaseStoreBackend)
backend.deserialize_asset_store_from_ref.return_value = "2026-05-01"
- with
patch("airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=backend):
+ with patch(
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=backend
+ ):
result = AssetStoreAccessor(name=self.ASSET_NAME).get("watermark")
assert result == "2026-05-01"
@@ -1658,7 +1664,7 @@ class TestTaskStoreAccessorWithCustomBackend:
def backend(self):
b = InMemoryStoreBackend()
with mock.patch(
- "airflow.sdk.execution_time.context._get_worker_state_backend",
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=b,
):
yield b
@@ -1727,7 +1733,7 @@ class TestAssetStoreAccessorWithCustomBackend:
def backend(self):
b = InMemoryStoreBackend()
with mock.patch(
- "airflow.sdk.execution_time.context._get_worker_state_backend",
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=b,
):
yield b
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 29bc2558c0c..1968de74973 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5812,7 +5812,7 @@ class TestTaskInstanceStateOperations:
mock_backend.serialize_asset_store_to_ref.return_value =
"mem://my_asset/watermark"
with mock.patch(
- "airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=mock_backend
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=mock_backend
):
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
@@ -5847,7 +5847,7 @@ class TestTaskInstanceStateOperations:
mock_backend.serialize_task_store_to_ref.return_value = ref
with mock.patch(
- "airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=mock_backend
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=mock_backend
):
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
@@ -5878,7 +5878,7 @@ class TestTaskInstanceStateOperations:
runtime_ti = create_runtime_ti(task=task)
with mock.patch(
- "airflow.sdk.execution_time.context._get_worker_state_backend",
return_value=mock_backend
+
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
return_value=mock_backend
):
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())