uranusjr commented on code in PR #66586:
URL: https://github.com/apache/airflow/pull/66586#discussion_r3219471573
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py:
##########
@@ -47,15 +48,44 @@
)
from airflow.api_fastapi.core_api.security import GetUserDep
from airflow.api_fastapi.core_api.services.public.common import BulkService
+from airflow.configuration import conf
from airflow.listeners.listener import get_listener_manager
from airflow.models.dag import DagModel
from airflow.models.taskinstance import TaskInstance as TI
from airflow.serialization.definitions.dag import SerializedDAG
+from airflow.state import get_state_backend
from airflow.utils.state import TaskInstanceState
log = structlog.get_logger(__name__)
+def _clear_task_state_on_success(tis: Sequence[TI], session: Session) -> None:
+ """Clear task state rows for each TI if clear_on_success is enabled."""
+ if not conf.getboolean("state_store", "clear_on_success", fallback=False):
+ return
+ backend = get_state_backend()
+ for ti in tis:
+ scope = TaskScope(
+ dag_id=ti.dag_id, run_id=ti.run_id, task_id=ti.task_id,
map_index=ti.map_index or -1
+ )
+ try:
+ backend.clear(scope=scope, session=session) # type:
ignore[call-arg] # @provide_session adds session kwarg at runtime;
BaseStateBackend signature omits it so mypy can't see it
Review Comment:
Or maybe the other way around: use `create_session` in the subclass instead
(and not take a `session` argument at all)? This makes session handling a bit
hairy, but it’s better if it works.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]