amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3271635522


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1455,6 +1465,10 @@ def _handle_current_task_success(
     if conf.getboolean("state_store", "clear_on_success"):
         log.info("Task state will be cleared by the server because 
clear_on_success is enabled.")
 
+        if _get_worker_state_backend() is not None:
+            # clear the task state keys for custom state backends configured 
on worker side
+            context["task_state"].clear()

Review Comment:
   Umm your sparked a thought in me.  I was under the impression that "task 
states" only need to be cleared for `clear_on_success` because those are bound 
to _tasks_ and asset states are intentionally not cleared on task success, 
server-side or worker-side cos asset state is designed to persist across task 
runs (e.g. watermarks, file counts that accumulate), so clearing it on success 
would not be beneficial by default. If there's a case for clearing asset state 
on success in the future, it should be a separate opt-in config (e.g. 
[state_store] clear_asset_state_on_success) as per me. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to