amoghrajesh opened a new pull request, #66586:
URL: https://github.com/apache/airflow/pull/66586
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
closes: https://github.com/apache/airflow/issues/66460
### What?
Task state can be written during a run. For operators that don't need that
state after a successful run (the work is done and the state is no longer
useful), there needs to be a way to clean it up automatically instead of
waiting for the global retention period.
### Current behaviour
Task state rows persist in the `task_state` table after a task succeeds. The
only cleanup path is the global, retention-based `airflow state-store cleanup`
command: https://github.com/apache/airflow/pull/66463
### Proposed change
Adds a `clear_on_success` config option (default `False`) under `[state_store]`.
When enabled, task state is automatically cleared on three paths:
1. Worker-reported success: fires after a task transitions to SUCCESS via
the worker reporting back through the execution API. The API server logs
"Cleared task state on success", and the worker also logs a notice in the task
log so operators can see it.
2. Manual mark-as-success (TI level): a hook in the core API fires when an
operator marks an individual TI as SUCCESS via the UI or API. This applies to
both single-TI and task-group patches.
3. Manual mark-as-success (DAG run level): a hook in the DAG run core API
fires when an operator marks a whole DAG run as SUCCESS. State is cleared for
all TIs in the run, regardless of which TIs were already in SUCCESS, because
marking a run SUCCESS means the whole run is done.
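The three paths differ mainly in which rows they clear: paths 1 and 2 clear only the TIs that were just moved to SUCCESS, while path 3 clears every TI in the run. A minimal sketch of that selection logic, assuming an in-memory dict stands in for the `task_state` table and that rows are keyed by `(dag_id, run_id, task_id)`. `TIKey`, `keys_to_clear_for_ti_patch`, `keys_to_clear_for_run_success` and `clear_task_state` are hypothetical names, not the PR's actual code:

```python
from __future__ import annotations

from dataclasses import dataclass


# Hypothetical row key for the task_state table; the real schema may differ
# (for example, it may also include map_index).
@dataclass(frozen=True)
class TIKey:
    dag_id: str
    run_id: str
    task_id: str


def keys_to_clear_for_ti_patch(patched: list[TIKey], new_state: str) -> list[TIKey]:
    """Paths 1 and 2: clear only the TIs that were just moved to SUCCESS."""
    return list(patched) if new_state == "success" else []


def keys_to_clear_for_run_success(
    store: dict[TIKey, dict[str, str]], dag_id: str, run_id: str
) -> list[TIKey]:
    """Path 3: clear every TI in the run, whatever state it was in before."""
    return [key for key in store if key.dag_id == dag_id and key.run_id == run_id]


def clear_task_state(
    store: dict[TIKey, dict[str, str]], keys: list[TIKey], *, clear_on_success: bool
) -> int:
    """Delete the rows for `keys` if the flag is on; return how many were removed."""
    if not clear_on_success:
        return 0  # default: state is kept after success
    removed = 0
    for key in keys:
        if key in store:
            del store[key]
            removed += 1
    return removed
```

Collecting the keys into a list before deleting keeps the run-level path from mutating the store while iterating over it.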
**Cleanup is fire-and-forget: a failure never rolls back or fails the state
update.**
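The fire-and-forget contract can be illustrated as a wrapper that runs cleanup only after the state change has been applied and logs, rather than raises, any error. This is a minimal sketch, not the PR's implementation: `clear_state_fire_and_forget`, `mark_success`, the dict-based TI and the failure log message are all hypothetical; only the "Cleared task state on success" message comes from this PR.

```python
from __future__ import annotations

import logging
from typing import Callable

log = logging.getLogger(__name__)


def clear_state_fire_and_forget(clear_fn: Callable[[], None], *, ti_id: str) -> bool:
    """Run a cleanup callable; log failures instead of propagating them.

    Returns True on success and False on failure. Any Exception subclass is
    swallowed, so the caller's state update is never rolled back.
    """
    try:
        clear_fn()
    except Exception:
        # Hypothetical message; the PR's actual failure log may differ.
        log.exception("Could not clear task state for %s; keeping it", ti_id)
        return False
    log.info("Cleared task state on success")
    return True


def mark_success(ti: dict[str, str], clear_fn: Callable[[], None]) -> str:
    """Hypothetical caller: apply the state change first, then attempt cleanup."""
    ti["state"] = "success"
    clear_state_fire_and_forget(clear_fn, ti_id=ti["id"])
    return ti["state"]  # still "success" even if cleanup failed
```

Ordering matters here: because the state is set before cleanup is attempted, a failing state backend can only leave stale rows behind, never a TI stuck in its previous state.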
### Design decision worth flagging
* Cleanup is server-side only for now: it runs on the API server, for the
backends configured there, not on workers. Worker-side support for custom
backends is deferred to https://github.com/apache/airflow/issues/66337, and
worker-side cleanup will be handled there.
* Default off: `clear_on_success = False`, because state often has
observability value after success (submitted job IDs, watermarks the UI can
display). Operators opt in when they know their tasks don't need post-success
visibility.
### User implications / backcompat
New config option `[state_store] clear_on_success` (boolean, default `False`).
No behaviour change unless it is explicitly enabled.
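Both tests below enable the option through `AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS`, following Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable convention; the equivalent `airflow.cfg` entry would be `clear_on_success = True` under `[state_store]`. The sketch below emulates that precedence (environment variable, then config file, then the `False` default) using only the standard library. It is an illustration, not Airflow's configuration code: inside Airflow the flag would normally be read with `conf.getboolean("state_store", "clear_on_success")`, and the accepted boolean spellings here are an assumption.

```python
from __future__ import annotations

import configparser
import os
from collections.abc import Mapping

ENV_VAR = "AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS"


def clear_on_success_enabled(
    cfg: configparser.ConfigParser, environ: Mapping[str, str] = os.environ
) -> bool:
    """Resolve [state_store] clear_on_success: env var, then config file, then False."""
    raw = environ.get(ENV_VAR)
    if raw is None:
        # fallback covers both a missing [state_store] section and a missing key
        raw = cfg.get("state_store", "clear_on_success", fallback="False")
    value = raw.strip().lower()
    if value in {"true", "t", "1"}:
        return True
    if value in {"false", "f", "0"}:
        return False
    raise ValueError(f"Invalid boolean for [state_store] clear_on_success: {raw!r}")
```

Because the environment variable wins over the file, a deployment can keep the safe default in `airflow.cfg` and opt in per component, as the tests do with `export`.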
### Testing
Created the following operator, which stores a few task state keys:
```python
from airflow.sdk import BaseOperator


class StoreStateOperator(BaseOperator):
    """
    Writes a few keys into task_state and prints them back.

    Useful for manually verifying clear_on_success: run the task, check the
    task_state table, then mark it as success. Rows should disappear when
    clear_on_success=True.
    """

    def execute(self, context):
        task_state = context["task_state"]
        task_state.set("job_id", "app_9999")
        task_state.set("checkpoint", "step_3")
        task_state.set("watermark", "2026-05-08T00:00:00Z")
        print("Stored 3 keys in task_state:")
        for key in ("job_id", "checkpoint", "watermark"):
            print(f"  {key} = {task_state.get(key)}")
```
DAG:
```python
from datetime import datetime

from airflow.sdk import dag

from aip103_operators import StoreStateOperator


@dag(schedule=None, start_date=datetime(2026, 4, 23), catchup=True)
def my_stateful_dag():
    StoreStateOperator(task_id="t1")


my_stateful_dag()
```
#### Test 1: Set `export AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS=true` and test normal execution
Ran the DAG. Without cleanup, this is how it would have looked:
<img width="3446" height="1760" alt="image (82)"
src="https://github.com/user-attachments/assets/69888b3e-1c01-4559-9f25-4ca3622b81c8"
/>
<img width="2842" height="1752" alt="image (80)"
src="https://github.com/user-attachments/assets/2d0481df-4af4-4790-8578-c7538a01f796"
/>
But because cleanup runs on success, this is how it actually looks:
<img width="2842" height="1752" alt="image (81)"
src="https://github.com/user-attachments/assets/2d6d9b4a-d02d-46de-a5fe-21c0c134badf"
/>
#### Test 2: Set `export AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS=true` and test manual mark-as-success
The same operator, slightly modified to fail intentionally:
```python
from airflow.sdk import BaseOperator


class StoreStateOperator(BaseOperator):
    """
    Writes a few keys into task_state and prints them back.

    Useful for manually verifying clear_on_success: run the task, check the
    task_state table, then mark it as success. Rows should disappear when
    clear_on_success=True.
    """

    def execute(self, context):
        task_state = context["task_state"]
        task_state.set("job_id", "app_9999")
        task_state.set("checkpoint", "step_3")
        task_state.set("watermark", "2026-05-08T00:00:00Z")
        print("Stored 3 keys in task_state:")
        for key in ("job_id", "checkpoint", "watermark"):
            print(f"  {key} = {task_state.get(key)}")
        # Fail on purpose so the TI can then be marked as success manually.
        raise Exception("Intentional failure.")
```
State before DAG run:
<img width="3446" height="1760" alt="image (83)"
src="https://github.com/user-attachments/assets/e457b3d8-891e-4f7b-848c-31076fe4a50d"
/>
Marking TI as success:
<img width="3446" height="1760" alt="image (85)"
src="https://github.com/user-attachments/assets/dfb12285-e77a-462e-9549-f7f1d2e52ca3"
/>
After:
<img width="3446" height="1760" alt="image (84)"
src="https://github.com/user-attachments/assets/38913569-c16f-452e-aba6-cd0485fc7ce1"
/>
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)
---
* Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information. Note: commit author/co-author name and email in commits
become permanently public when merged.
* For fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
* When adding dependency, check compliance with the [ASF 3rd Party License
Policy](https://www.apache.org/legal/resolved.html#category-x).
* For significant user-facing changes create newsfragment:
`{pr_number}.significant.rst`, in
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
You can add this file in a follow-up commit after the PR is created so you
know the PR number.
--
This is an automated message from the Apache Git Service.