amoghrajesh opened a new pull request, #67530:
URL: https://github.com/apache/airflow/pull/67530
##### Was generative AI tooling used to co-author this PR?
- [x] Yes - Claude Sonnet 4.6
### What problem are we solving?
When a custom worker backend (e.g. S3, GCS) stores a state value externally
and writes a reference string back to the DB, the UI has no way to tell whether
a value like `s3://bucket/ti_123/job_id` is:
- The user's actual state value (a plain string they stored), or
- An opaque reference to externally-stored data
Without this distinction, the UI would show the raw path as if it were the
value, which can be confusing and misleading.
### Current behaviour
Custom backends return a reference string from
`serialize_task_state_to_ref()`, which is stored verbatim in the DB. The DB
value column therefore holds either a plain JSON value or a reference string,
with no structural difference between the two, so the UI cannot tell them apart.
### Proposed change
When a custom worker backend is configured, the framework now automatically
wraps the reference returned by `serialize_task_state_to_ref()` in a typed
envelope before storing:
`{"__type": "ExternalState", "__var": "s3://bucket/ti_123/job_id"}`
On read, the framework detects the envelope, extracts `__var`, and passes
the raw ref to `deserialize_task_state_from_ref()` — the backend never sees the
envelope.
The default path (no custom backend) is unaffected — plain JSON values are
stored and returned as before.
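
The wrap/unwrap step described above can be sketched roughly as follows. This is only an illustration of the envelope shape shown in this description, not the code in this PR: the helper names `wrap_ref`, `is_external_ref` and `unwrap_ref` are hypothetical, and how the real implementation treats a user value that happens to have the same shape is not covered here.

```python
from __future__ import annotations

from typing import Any

# Marker value taken from the envelope example in this description.
EXTERNAL_STATE_TYPE = "ExternalState"


def wrap_ref(ref: str) -> dict[str, str]:
    """Wrap a backend-returned reference in the typed envelope (hypothetical helper)."""
    return {"__type": EXTERNAL_STATE_TYPE, "__var": ref}


def is_external_ref(stored: Any) -> bool:
    """Return True only for a dict with exactly the envelope keys and marker."""
    return (
        isinstance(stored, dict)
        and set(stored) == {"__type", "__var"}
        and stored["__type"] == EXTERNAL_STATE_TYPE
        and isinstance(stored["__var"], str)
    )


def unwrap_ref(stored: Any) -> str | None:
    """Return the raw ref for an envelope, or None for a plain JSON value."""
    return stored["__var"] if is_external_ref(stored) else None


envelope = wrap_ref("s3://bucket/ti_123/job_id")
raw_ref = unwrap_ref(envelope)  # this is what the backend would receive on read
```

With this shape, a plain string such as `"s3://bucket/ti_123/job_id"` and an envelope carrying the same string are structurally different, which is the distinction the UI needs.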
### Testing
Created a custom worker-side backend based on the local file system:
```python
from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING

from airflow.sdk.state import BaseStateBackend

if TYPE_CHECKING:
    from datetime import datetime

    from pydantic import JsonValue
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.orm import Session

    from airflow_shared.state import StateScope

BASE_DIR = Path("/tmp/airflow_state")


class FileStateBackend(BaseStateBackend):
    """Stores task/asset state values as local JSON files; returns the path as the ref."""

    def serialize_task_state_to_ref(self, *, value: JsonValue, key: str, ti_id: str) -> str:
        path = BASE_DIR / f"ti_{ti_id}" / f"{key}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(value))
        return str(path)

    def deserialize_task_state_from_ref(self, stored: str) -> JsonValue:
        return json.loads(Path(stored).read_text())

    def serialize_asset_state_to_ref(self, *, value: JsonValue, key: str, asset_ref: str) -> str:
        safe = asset_ref.replace("/", "_").replace(":", "")
        path = BASE_DIR / "assets" / safe / f"{key}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(value))
        return str(path)

    def deserialize_asset_state_from_ref(self, stored: str) -> JsonValue:
        return json.loads(Path(stored).read_text())

    def get(self, scope: StateScope, key: str, *, session: Session | None = None) -> str | None:
        raise NotImplementedError(
            "FileStateBackend is a worker-side backend; server uses MetastoreStateBackend"
        )

    def set(
        self,
        scope: StateScope,
        key: str,
        value: str,
        *,
        expires_at: datetime | None = None,
        session: Session | None = None,
    ) -> None:
        raise NotImplementedError

    def delete(self, scope: StateScope, key: str, *, session: Session | None = None) -> None:
        raise NotImplementedError

    def clear(
        self, scope: StateScope, *, all_map_indices: bool = False, session: Session | None = None
    ) -> None:
        raise NotImplementedError

    async def aget(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> str | None:
        raise NotImplementedError

    async def aset(
        self,
        scope: StateScope,
        key: str,
        value: str,
        *,
        expires_at: datetime | None = None,
        session: AsyncSession | None = None,
    ) -> None:
        raise NotImplementedError

    async def adelete(self, scope: StateScope, key: str, *, session: AsyncSession | None = None) -> None:
        raise NotImplementedError

    async def aclear(
        self, scope: StateScope, *, all_map_indices: bool = False, session: AsyncSession | None = None
    ) -> None:
        raise NotImplementedError
```
Ran breeze with: `export AIRFLOW__WORKERS__STATE_BACKEND=file_state_backend.FileStateBackend`
DAG:
```python
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import dag, task, Context
from datetime import datetime, timedelta


@dag(schedule=None, start_date=datetime(2026, 4, 23), catchup=True)
def simple_task_state():
    @task
    def my_task(**context: Context):
        task_state = context["task_state"]
        task_state.set("job_id", "12345")
        task_state.set("secret-dict", {"key": "value"})
        print("Fetching task states I stored earlier")
        print("job_id:", task_state.get("job_id"))
        print("secret-dict:", task_state.get("secret-dict"))

    my_task()


simple_task_state()
```
The task is agnostic to the envelope.
<img width="2504" height="987" alt="image"
src="https://github.com/user-attachments/assets/810d4b94-2a84-4884-84ca-7d9bdeb61f05"
/>
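
To illustrate why the task never sees the envelope, here is a minimal round-trip sketch. Everything in it is an assumption made for illustration: `InMemoryBackend` stands in for a worker-side backend such as `FileStateBackend`, `FakeTaskState` plays the framework's role, and its `db` dict stands in for the metadata DB. None of these names exist in Airflow.

```python
from __future__ import annotations

import json
from typing import Any


class InMemoryBackend:
    """Hypothetical stand-in for a worker-side backend; keeps blobs in a dict."""

    def __init__(self) -> None:
        self.blobs: dict[str, str] = {}

    def serialize_task_state_to_ref(self, *, value: Any, key: str, ti_id: str) -> str:
        ref = f"mem://ti_{ti_id}/{key}"
        self.blobs[ref] = json.dumps(value)
        return ref

    def deserialize_task_state_from_ref(self, stored: str) -> Any:
        return json.loads(self.blobs[stored])


class FakeTaskState:
    """Hypothetical accessor that wraps on write and unwraps on read."""

    def __init__(self, backend: InMemoryBackend, ti_id: str) -> None:
        self.backend = backend
        self.ti_id = ti_id
        self.db: dict[str, Any] = {}  # stand-in for the DB value column

    def set(self, key: str, value: Any) -> None:
        ref = self.backend.serialize_task_state_to_ref(value=value, key=key, ti_id=self.ti_id)
        # Only the envelope is stored; the value itself lives in the backend.
        self.db[key] = {"__type": "ExternalState", "__var": ref}

    def get(self, key: str) -> Any:
        stored = self.db[key]
        # The backend receives the raw ref, never the envelope.
        return self.backend.deserialize_task_state_from_ref(stored["__var"])


task_state = FakeTaskState(InMemoryBackend(), ti_id="123")
task_state.set("job_id", "12345")
task_state.set("secret-dict", {"key": "value"})
```

In this sketch the caller reads back the same values it stored, while `task_state.db` holds only envelopes, mirroring the behaviour shown for the DAG above.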
The file system now contains the files generated by the custom backend:
```shell
[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat
job_id.json secret-dict.json
[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat job_id.json
"12345"
[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$ cat secret-dict.json
{"key": "value"}
[Breeze:3.10.20] root@4bdd2c99b18b:/tmp/airflow_state/ti_019e62fc-9e44-715b-8cb9-cfd6865f8c83$
```
The Core API returns the external reference for the UI to build upon:
<img width="2559" height="987" alt="image"
src="https://github.com/user-attachments/assets/c0fb9c05-5a86-48c1-9d1f-0f6174bb7d89"
/>