amoghrajesh opened a new pull request, #66586:
URL: https://github.com/apache/airflow/pull/66586

   
   closes: https://github.com/apache/airflow/issues/66460
   
   
   ### What?
   
   Task state can be written during a run. For operators that don't need that state after a successful run (the work is done and the state is no longer useful), there needs to be a way to clean it up automatically without waiting for the global retention period.
   
   ### Current behaviour
   Task state rows persist in the `task_state` table after a task succeeds. The only cleanup path is the global, retention-based `airflow state-store cleanup` command: https://github.com/apache/airflow/pull/66463
   
   ### Proposed change
   Adds a `clear_on_success = False` config option under `[state_store]`. When 
enabled, task state is automatically cleared on three paths:
   
   1. Worker-reported success: fires after a task transitions to SUCCESS via the worker reporting back through the execution API. Logs "Cleared task state on success" in the API server logs. The worker also logs a notice in the task log so operators can see it.
   
   2. Manual mark-as-success (TI level): a hook in the core API that fires when an operator marks an individual TI as SUCCESS via the UI or API. Applies to both single-TI and task-group patches.
   
   3. Manual mark-as-success (DAG run level): a hook in the DAG run core API that fires when an operator marks a whole DAG run as SUCCESS. Clears state for all TIs in the run, because marking a run SUCCESS means the whole run is done, regardless of which TIs were already in SUCCESS.
   
   **Cleanup is fire-and-forget: a failure never rolls back or fails the state 
update.**
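
A minimal sketch of the fire-and-forget behaviour described above, not the code in this PR. The helper name `clear_state_best_effort` and its `clear_fn` / `enabled` parameters are hypothetical; only the "Cleared task state on success" log message comes from the description.

```python
import logging

log = logging.getLogger(__name__)


def clear_state_best_effort(clear_fn, *, enabled: bool) -> bool:
    """Run ``clear_fn`` if cleanup is enabled; never let its failure propagate.

    Hypothetical helper. Returns True only when cleanup ran and succeeded.
    """
    if not enabled:
        # clear_on_success is off: leave task state untouched.
        return False
    try:
        clear_fn()
    except Exception:
        # The SUCCESS transition has already been recorded; a cleanup
        # failure is logged and swallowed so it cannot fail that update.
        log.exception("Failed to clear task state on success")
        return False
    log.info("Cleared task state on success")
    return True
```

The caller records the state transition first and only then invokes the helper, so the worst case of a cleanup failure is a leftover row that the retention-based cleanup removes later.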
   
   ### Design decisions worth flagging
   
   * Cleanup is server-side only for now, i.e. it runs on the API server against the backends configured there, not on workers. Worker-side support for custom backends is deferred to https://github.com/apache/airflow/issues/66337, and worker-side cleanup will be handled there.
   
   * Default off: `clear_on_success = False`, because state often has observability value after success (submitted job IDs, watermarks the UI can display). Operators opt in when they know their tasks don't need post-success visibility.
   
   ### User implications / backcompat
   New config option `[state_store] clear_on_success` (boolean, default `False`). No behaviour change unless explicitly enabled.
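
As an illustration of how the option maps to the environment variable used in the tests in this description, here is a small standalone sketch. It relies on Airflow's `AIRFLOW__{SECTION}__{KEY}` naming convention; the helper names are hypothetical, and the boolean parsing is a simplified stand-in, not Airflow's own config parser.

```python
import os


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def read_bool_option(section, key, default=False, environ=None) -> bool:
    """Simplified stand-in: read a boolean option from the environment.

    Falls back to ``default`` when the variable is unset, which mirrors
    the "no behaviour change unless explicitly enabled" guarantee.
    """
    environ = os.environ if environ is None else environ
    raw = environ.get(airflow_env_var(section, key))
    if raw is None:
        return default
    return raw.strip().lower() in {"true", "t", "1"}


if __name__ == "__main__":
    print(airflow_env_var("state_store", "clear_on_success"))
    print(read_bool_option("state_store", "clear_on_success"))
```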
   
   
   ### Testing
   
   Created this operator, which stores a few task state keys:
   
   ```python
   from airflow.sdk import BaseOperator


   class StoreStateOperator(BaseOperator):
       """
       Writes a few keys into task_state and prints them back.
   
       Useful for manually verifying clear_on_success: run the task, check the
       task_state table, then mark it as success — rows should disappear when
       clear_on_success=True.
       """
   
       def execute(self, context):
           task_state = context["task_state"]
   
           task_state.set("job_id", "app_9999")
           task_state.set("checkpoint", "step_3")
           task_state.set("watermark", "2026-05-08T00:00:00Z")
   
           print("Stored 3 keys in task_state:")
           for key in ("job_id", "checkpoint", "watermark"):
               print(f"  {key} = {task_state.get(key)}")
   ```
   
   DAG:
   ```python
   from airflow.sdk import dag
   from datetime import datetime
   
   from aip103_operators import StoreStateOperator
   
   
   @dag(schedule=None, start_date=datetime(2026, 4, 23), catchup=True)
   def my_stateful_dag():
       StoreStateOperator(task_id="t1")
   
   my_stateful_dag()
   
   ```
   
   #### Test 1: Set `export AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS=true` and test execution
   
   Ran the DAG. Without cleanup, this is how the task state would have looked:
   
   <img width="3446" height="1760" alt="image (82)" src="https://github.com/user-attachments/assets/69888b3e-1c01-4559-9f25-4ca3622b81c8" />
   
   
   <img width="2842" height="1752" alt="image (80)" src="https://github.com/user-attachments/assets/2d0481df-4af4-4790-8578-c7538a01f796" />
   
   
   But this is how it actually looks, because cleanup happens:
   
   <img width="2842" height="1752" alt="image (81)" src="https://github.com/user-attachments/assets/2d6d9b4a-d02d-46de-a5fe-21c0c134badf" />
   
   
   
   #### Test 2: Set `export AIRFLOW__STATE_STORE__CLEAR_ON_SUCCESS=true` and test manual mark-as-success
   
   Operator slightly modified to fail intentionally:
   
   ```python
   from airflow.sdk import BaseOperator


   class StoreStateOperator(BaseOperator):
       """
       Writes a few keys into task_state and prints them back.
   
       Useful for manually verifying clear_on_success: run the task, check the
       task_state table, then mark it as success — rows should disappear when
       clear_on_success=True.
       """
   
       def execute(self, context):
           task_state = context["task_state"]
   
           task_state.set("job_id", "app_9999")
           task_state.set("checkpoint", "step_3")
           task_state.set("watermark", "2026-05-08T00:00:00Z")
   
           print("Stored 3 keys in task_state:")
           for key in ("job_id", "checkpoint", "watermark"):
               print(f"  {key} = {task_state.get(key)}")
   
           raise Exception("Intentional failure.")
   ```
   
   State before DAG run:
   <img width="3446" height="1760" alt="image (83)" src="https://github.com/user-attachments/assets/e457b3d8-891e-4f7b-848c-31076fe4a50d" />
   
   
   Marking TI as success:
   <img width="3446" height="1760" alt="image (85)" src="https://github.com/user-attachments/assets/dfb12285-e77a-462e-9549-f7f1d2e52ca3" />
   
   
   After:
   <img width="3446" height="1760" alt="image (84)" src="https://github.com/user-attachments/assets/38913569-c16f-452e-aba6-cd0485fc7ce1" />
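
The manual checks above could also be modelled in memory. The sketch below is only an illustration of the DAG-run-level rule (marking a run SUCCESS clears state for every TI in that run and leaves other runs alone). `InMemoryTaskState` and `mark_run_success` are hypothetical names, not part of Airflow or of this PR.

```python
from collections import defaultdict


class InMemoryTaskState:
    """Hypothetical stand-in for the task_state table, keyed by (run_id, task_id)."""

    def __init__(self):
        self._rows = defaultdict(dict)

    def set(self, run_id, task_id, key, value):
        self._rows[(run_id, task_id)][key] = value

    def keys_for(self, run_id, task_id):
        # .get() avoids creating an empty entry in the defaultdict.
        return sorted(self._rows.get((run_id, task_id), {}))

    def clear_run(self, run_id):
        # Drop state for every TI in the run, whatever its current state.
        for row_key in [k for k in self._rows if k[0] == run_id]:
            del self._rows[row_key]


def mark_run_success(store, run_id, *, clear_on_success):
    """Model of the run-level hook: clear only when the option is enabled."""
    if clear_on_success:
        store.clear_run(run_id)


if __name__ == "__main__":
    store = InMemoryTaskState()
    store.set("run_1", "t1", "job_id", "app_9999")
    store.set("run_1", "t2", "checkpoint", "step_3")
    store.set("run_2", "t1", "watermark", "2026-05-08T00:00:00Z")
    mark_run_success(store, "run_1", clear_on_success=True)
    print(store.keys_for("run_1", "t1"), store.keys_for("run_2", "t1"))
```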
   
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [ ] Yes (please specify the tool below)
   
   

