This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65a82aa10ad Rename config for state_backend on workers to 
state_store_backend (#68270)
65a82aa10ad is described below

commit 65a82aa10ada4a617e7d736b92f6a95092f72bb1
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Jun 9 12:30:44 2026 +0530

    Rename config for state_backend on workers to state_store_backend (#68270)
---
 .../src/airflow/config_templates/config.yml        |  4 ++--
 task-sdk/src/airflow/sdk/execution_time/context.py | 22 +++++++++++-----------
 .../tests/task_sdk/execution_time/test_context.py  | 18 ++++++++++++------
 .../task_sdk/execution_time/test_task_runner.py    |  6 +++---
 4 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index bba829b3d08..9ed0433c6c4 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1939,9 +1939,9 @@ workers:
       sensitive: true
       example: ~
       default: ""
-    state_backend:
+    state_store_backend:
       description: |
-        Full class name of a custom worker-side state backend. When set, task 
state values are
+        Full class name of a custom worker-side store backend. When set, task 
store values are
         routed through this backend so large payloads or credentialed storage 
stay on worker
         infrastructure. The Execution API still records a reference string in 
the database.
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 25741dd3d97..fa83ce3a5f5 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -486,9 +486,9 @@ class VariableAccessor:
 
 
 @cache
-def _get_worker_state_backend() -> BaseStoreBackend | None:
+def _get_worker_state_store_backend() -> BaseStoreBackend | None:
     """Return the configured worker-side state backend, instantiated once and 
cached."""
-    class_name = conf.get("workers", "state_backend", fallback="")
+    class_name = conf.get("workers", "state_store_backend", fallback="")
     if not class_name:
         return None
     from airflow.sdk._shared.module_loading import import_string
@@ -541,7 +541,7 @@ class TaskStoreAccessor:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, TaskStoreResult):
             stored = resp.value
-            backend = _get_worker_state_backend()
+            backend = _get_worker_state_store_backend()
             if backend is not None and isinstance(stored, dict) and (ref := 
_unwrap_external_ref(stored)):
                 # unwrap the marker to get the ref, and retrieve the actual 
value from the backend using the ref
                 return backend.deserialize_task_store_from_ref(ref)
@@ -590,7 +590,7 @@ class TaskStoreAccessor:
 
         # if custom backend is configured, store the value on the custom 
backend, and return the reference
         # to the stored value to store in the DB
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         stored: JsonValue = value
         if backend is not None:
             ref: str = backend.serialize_task_store_to_ref(value=value, 
key=key, ti_id=str(self._ti_id))
@@ -607,7 +607,7 @@ class TaskStoreAccessor:
         # cleanup the DB ref first, if backend cleanup fails after this, the 
ref is gone and
         # deterministic keys are recoverable on next set().
         SUPERVISOR_COMMS.send(DeleteTaskStore(ti_id=self._ti_id, key=key))
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         if backend is not None:
             backend.delete(self._scope, key)
 
@@ -625,7 +625,7 @@ class TaskStoreAccessor:
         # cleanup the DB ref first, if backend cleanup fails after this, the 
ref is gone and
         # deterministic keys are recoverable on next set().
         SUPERVISOR_COMMS.send(ClearTaskStore(ti_id=self._ti_id, 
all_map_indices=all_map_indices))
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         if backend is not None:
             backend.clear(self._scope, all_map_indices=all_map_indices)
 
@@ -636,7 +636,7 @@ class TaskStoreAccessor:
         Used by clear_on_success: the server already clears DB rows as part of 
SucceedTask,
         so the comms round-trip is redundant.
         """
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         if backend is not None:
             backend.clear(self._scope)
 
@@ -689,7 +689,7 @@ class AssetStoreAccessor:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, AssetStoreResult):
             stored = resp.value
-            backend = _get_worker_state_backend()
+            backend = _get_worker_state_store_backend()
             if backend is not None and isinstance(stored, dict) and (ref := 
_unwrap_external_ref(stored)):
                 # unwrap the marker to get the ref, and retrieve the actual 
value from the backend using the ref
                 return backend.deserialize_asset_store_from_ref(ref)
@@ -714,7 +714,7 @@ class AssetStoreAccessor:
 
         # if custom backend is configured, store the value on the custom 
backend, and return the reference
         # to the stored value to store in the DB
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         asset_ref = self._name or self._uri or ""
         stored: JsonValue = value
         if backend is not None:
@@ -746,7 +746,7 @@ class AssetStoreAccessor:
         # DB ref first: if backend cleanup fails after this, the ref is gone 
and
         # deterministic keys are recoverable on next set().
         SUPERVISOR_COMMS.send(msg)
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         if backend is not None:
             backend.delete(AssetScope(name=self._name, uri=self._uri), key)
 
@@ -766,7 +766,7 @@ class AssetStoreAccessor:
         elif self._uri:
             msg = ClearAssetStoreByUri(uri=self._uri)
         SUPERVISOR_COMMS.send(msg)
-        backend = _get_worker_state_backend()
+        backend = _get_worker_state_store_backend()
         if backend is not None:
             backend.clear(AssetScope(name=self._name, uri=self._uri))
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py 
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index a7d20c43bcf..1e668fb9ffd 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1262,7 +1262,7 @@ class TestTaskStoreAccessor:
         backend.serialize_task_store_to_ref.return_value = 
"s3://bucket/ti_123/job_id"
 
         with (
-            
patch("airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=backend),
+            
patch("airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=backend),
             conf_vars({("state_store", "default_retention_days"): "0"}),
         ):
             TaskStoreAccessor(ti_id=self.TI_ID, 
scope=self.SCOPE).set("job_id", "spark_001")
@@ -1285,7 +1285,9 @@ class TestTaskStoreAccessor:
         backend = MagicMock(spec=BaseStoreBackend)
         backend.deserialize_task_store_from_ref.return_value = {"rows": 123}
 
-        with 
patch("airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=backend):
+        with patch(
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=backend
+        ):
             result = TaskStoreAccessor(ti_id=self.TI_ID, 
scope=self.SCOPE).get("job_id")
 
         assert result == {"rows": 123}
@@ -1414,7 +1416,9 @@ class TestAssetStoreAccessor:
         backend = MagicMock(spec=BaseStoreBackend)
         backend.serialize_asset_store_to_ref.return_value = 
"s3://bucket/assets/orders/watermark"
 
-        with 
patch("airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=backend):
+        with patch(
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=backend
+        ):
             AssetStoreAccessor(name=self.ASSET_NAME).set("watermark", 
"2026-05-01")
 
         mock_supervisor_comms.send.assert_called_once_with(
@@ -1434,7 +1438,9 @@ class TestAssetStoreAccessor:
         backend = MagicMock(spec=BaseStoreBackend)
         backend.deserialize_asset_store_from_ref.return_value = "2026-05-01"
 
-        with 
patch("airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=backend):
+        with patch(
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=backend
+        ):
             result = AssetStoreAccessor(name=self.ASSET_NAME).get("watermark")
 
         assert result == "2026-05-01"
@@ -1658,7 +1664,7 @@ class TestTaskStoreAccessorWithCustomBackend:
     def backend(self):
         b = InMemoryStoreBackend()
         with mock.patch(
-            "airflow.sdk.execution_time.context._get_worker_state_backend",
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
             return_value=b,
         ):
             yield b
@@ -1727,7 +1733,7 @@ class TestAssetStoreAccessorWithCustomBackend:
     def backend(self):
         b = InMemoryStoreBackend()
         with mock.patch(
-            "airflow.sdk.execution_time.context._get_worker_state_backend",
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend",
             return_value=b,
         ):
             yield b
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 29bc2558c0c..1968de74973 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -5812,7 +5812,7 @@ class TestTaskInstanceStateOperations:
         mock_backend.serialize_asset_store_to_ref.return_value = 
"mem://my_asset/watermark"
 
         with mock.patch(
-            "airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=mock_backend
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=mock_backend
         ):
             run(runtime_ti, context=runtime_ti.get_template_context(), 
log=mock.MagicMock())
 
@@ -5847,7 +5847,7 @@ class TestTaskInstanceStateOperations:
         mock_backend.serialize_task_store_to_ref.return_value = ref
 
         with mock.patch(
-            "airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=mock_backend
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=mock_backend
         ):
             run(runtime_ti, context=runtime_ti.get_template_context(), 
log=mock.MagicMock())
 
@@ -5878,7 +5878,7 @@ class TestTaskInstanceStateOperations:
         runtime_ti = create_runtime_ti(task=task)
 
         with mock.patch(
-            "airflow.sdk.execution_time.context._get_worker_state_backend", 
return_value=mock_backend
+            
"airflow.sdk.execution_time.context._get_worker_state_store_backend", 
return_value=mock_backend
         ):
             run(runtime_ti, context=runtime_ti.get_template_context(), 
log=mock.MagicMock())
 

Reply via email to