This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 9a1b5378066 [v2-10-test] Ensure teardown tasks are executed when DAG 
run is set to failed (#45530) (#45581)
9a1b5378066 is described below

commit 9a1b537806641296a67d51297d6f7711f7ec8412
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jan 11 20:27:50 2025 +0100

    [v2-10-test] Ensure teardown tasks are executed when DAG run is set to 
failed (#45530) (#45581)
    
    * [v2-10-test] Ensure teardown tasks are executed when DAG run is set to 
failed (#45530)
    
    * Ensure teardown tasks are executed when DAG run is set to failed
    
    * Also handle the case of setting DAG to success
    
    * Add some documentation to behavior changes
    
    * Add some documentation to behavior changes
    (cherry picked from commit 1e8977a2ea24e989c6c57ee3cb8e7b6bc4cf6c56)
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    * Remove type hints only working in Airflow 3
    
    ---------
    
    Co-authored-by: Jens Scheffler <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
---
 airflow/api/common/mark_tasks.py                 | 41 +++++++------
 docs/apache-airflow/howto/setup-and-teardown.rst |  8 ++-
 newsfragments/45530.significant.rst              | 12 ++++
 tests/api/common/test_mark_tasks.py              | 74 ++++++++++++++++++++++++
 4 files changed, 117 insertions(+), 18 deletions(-)

diff --git a/airflow/api/common/mark_tasks.py b/airflow/api/common/mark_tasks.py
index 58ca737a571..6b656e85a69 100644
--- a/airflow/api/common/mark_tasks.py
+++ b/airflow/api/common/mark_tasks.py
@@ -411,15 +411,18 @@ def set_dag_run_state_to_success(
         run_id = dag_run.run_id
     if not run_id:
         raise ValueError(f"Invalid dag_run_id: {run_id}")
+
+    # Mark all task instances of the dag run to success - except for teardown 
as they need to complete work.
+    normal_tasks = [task for task in dag.tasks if not task.is_teardown]
+
     # Mark the dag run to success.
-    if commit:
+    if commit and len(normal_tasks) == len(dag.tasks):
         _set_dag_run_state(dag.dag_id, run_id, DagRunState.SUCCESS, session)
 
-    # Mark all task instances of the dag run to success.
-    for task in dag.tasks:
+    for task in normal_tasks:
         task.dag = dag
     return set_state(
-        tasks=dag.tasks,
+        tasks=normal_tasks,
         run_id=run_id,
         state=TaskInstanceState.SUCCESS,
         commit=commit,
@@ -466,10 +469,6 @@ def set_dag_run_state_to_failed(
     if not run_id:
         raise ValueError(f"Invalid dag_run_id: {run_id}")
 
-    # Mark the dag run to failed.
-    if commit:
-        _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)
-
     running_states = (
         TaskInstanceState.RUNNING,
         TaskInstanceState.DEFERRED,
@@ -478,25 +477,26 @@ def set_dag_run_state_to_failed(
 
     # Mark only RUNNING task instances.
     task_ids = [task.task_id for task in dag.tasks]
-    tis = session.scalars(
+    running_tis: list[TaskInstance] = session.scalars(
         select(TaskInstance).where(
             TaskInstance.dag_id == dag.dag_id,
             TaskInstance.run_id == run_id,
             TaskInstance.task_id.in_(task_ids),
             TaskInstance.state.in_(running_states),
         )
-    )
+    ).all()
 
-    task_ids_of_running_tis = [task_instance.task_id for task_instance in tis]
+    # Do not kill teardown tasks
+    task_ids_of_running_tis = [ti.task_id for ti in running_tis if not 
dag.task_dict[ti.task_id].is_teardown]
 
-    tasks = []
+    running_tasks = []
     for task in dag.tasks:
         if task.task_id in task_ids_of_running_tis:
             task.dag = dag
-            tasks.append(task)
+            running_tasks.append(task)
 
     # Mark non-finished tasks as SKIPPED.
-    tis = session.scalars(
+    pending_tis: list[TaskInstance] = session.scalars(
         select(TaskInstance).filter(
             TaskInstance.dag_id == dag.dag_id,
             TaskInstance.run_id == run_id,
@@ -510,12 +510,19 @@ def set_dag_run_state_to_failed(
         )
     ).all()
 
+    # Do not skip teardown tasks
+    pending_normal_tis = [ti for ti in pending_tis if not 
dag.task_dict[ti.task_id].is_teardown]
+
     if commit:
-        for ti in tis:
+        for ti in pending_normal_tis:
             ti.set_state(TaskInstanceState.SKIPPED)
 
-    return tis + set_state(
-        tasks=tasks,
+        # Mark the dag run to failed if there is no pending teardown (else 
this would not be scheduled later).
+        if not any(dag.task_dict[ti.task_id].is_teardown for ti in 
(running_tis + pending_tis)):
+            _set_dag_run_state(dag.dag_id, run_id, DagRunState.FAILED, session)
+
+    return pending_normal_tis + set_state(
+        tasks=running_tasks,
         run_id=run_id,
         state=TaskInstanceState.FAILED,
         commit=commit,
diff --git a/docs/apache-airflow/howto/setup-and-teardown.rst 
b/docs/apache-airflow/howto/setup-and-teardown.rst
index 7afb3c4a350..c802c8bedaf 100644
--- a/docs/apache-airflow/howto/setup-and-teardown.rst
+++ b/docs/apache-airflow/howto/setup-and-teardown.rst
@@ -24,8 +24,9 @@ Key features of setup and teardown tasks:
 
   * If you clear a task, its setups and teardowns will be cleared.
   * By default, teardown tasks are ignored for the purpose of evaluating dag 
run state.
-  * A teardown task will run if its setup was successful, even if its work 
tasks failed.
+  * A teardown task will run if its setup was successful, even if its work 
tasks failed. But it will skip if the setup was skipped.
   * Teardown tasks are ignored when setting dependencies against task groups.
+  * Teardown will also be carried out if the DAG run is manually set to 
"failed" or "success" to ensure resources will be cleaned up.
 
 How setup and teardown works
 """"""""""""""""""""""""""""
@@ -231,3 +232,8 @@ Trigger rule behavior for teardowns
 """""""""""""""""""""""""""""""""""
 
 Teardowns use a (non-configurable) trigger rule called ALL_DONE_SETUP_SUCCESS. 
 With this rule, as long as all upstreams are done and at least one directly 
connected setup is successful, the teardown will run.  If all of a teardown's 
setups were skipped or failed, those states will propagate to the teardown.
+
+Side-effect on manual DAG state changes
+"""""""""""""""""""""""""""""""""""""""
+
+As teardown tasks are often used to clean up resources, they also need to run 
if the DAG run is manually terminated. For the purpose of early termination, a user 
can manually mark the DAG run as "success" or "failed", which kills all tasks 
before completion. If the DAG contains teardown tasks, they will still be 
executed. Therefore, as a side effect of allowing teardown tasks to be scheduled, a 
DAG run will not be immediately set to a terminal state when the user requests it.
diff --git a/newsfragments/45530.significant.rst 
b/newsfragments/45530.significant.rst
new file mode 100644
index 00000000000..7e2ae8e8ac6
--- /dev/null
+++ b/newsfragments/45530.significant.rst
@@ -0,0 +1,12 @@
+Ensure teardown tasks are executed when DAG run is set to failed
+
+Previously, when a DAG run was manually set to "failed" or "success", 
the terminal state was applied to all tasks.
+But this was a gap for cases when setup and teardown tasks were defined: If 
teardown tasks were used to clean up infrastructure
+or other resources, they were also skipped and thus resources could stay 
allocated.
+
+As of now, when setup tasks have already been executed and the DAG is manually 
set to "failed" or "success", teardown
+tasks are still executed. Teardown tasks are skipped if the setup was also skipped.
+
+As a side effect this means if the DAG contains teardown tasks, then the 
manual marking of DAG as "failed" or "success"
+will need to keep the DAG in running state to ensure that teardown tasks will 
be scheduled. They would not be scheduled
+if the DAG is directly set to "failed" or "success".
diff --git a/tests/api/common/test_mark_tasks.py 
b/tests/api/common/test_mark_tasks.py
new file mode 100644
index 00000000000..0cf58ee74a6
--- /dev/null
+++ b/tests/api/common/test_mark_tasks.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+
+from airflow.api.common.mark_tasks import set_dag_run_state_to_failed, 
set_dag_run_state_to_success
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.state import TaskInstanceState
+
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
+pytestmark = pytest.mark.db_test
+
+
+def test_set_dag_run_state_to_failed(dag_maker):
+    with dag_maker("TEST_DAG_1"):
+        with EmptyOperator(task_id="teardown").as_teardown():
+            EmptyOperator(task_id="running")
+            EmptyOperator(task_id="pending")
+    dr = dag_maker.create_dagrun()
+    for ti in dr.get_task_instances():
+        if ti.task_id == "running":
+            ti.set_state(TaskInstanceState.RUNNING)
+    dag_maker.session.flush()
+    assert dr.dag
+
+    updated_tis: list[TaskInstance] = set_dag_run_state_to_failed(
+        dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session
+    )
+    assert len(updated_tis) == 2
+    task_dict = {ti.task_id: ti for ti in updated_tis}
+    assert task_dict["running"].state == TaskInstanceState.FAILED
+    assert task_dict["pending"].state == TaskInstanceState.SKIPPED
+    assert "teardown" not in task_dict
+
+
+def test_set_dag_run_state_to_success(dag_maker):
+    with dag_maker("TEST_DAG_1"):
+        with EmptyOperator(task_id="teardown").as_teardown():
+            EmptyOperator(task_id="running")
+            EmptyOperator(task_id="pending")
+    dr = dag_maker.create_dagrun()
+    for ti in dr.get_task_instances():
+        if ti.task_id == "running":
+            ti.set_state(TaskInstanceState.RUNNING)
+    dag_maker.session.flush()
+    assert dr.dag
+
+    updated_tis: list[TaskInstance] = set_dag_run_state_to_success(
+        dag=dr.dag, run_id=dr.run_id, commit=True, session=dag_maker.session
+    )
+    assert len(updated_tis) == 2
+    task_dict = {ti.task_id: ti for ti in updated_tis}
+    assert task_dict["running"].state == TaskInstanceState.SUCCESS
+    assert task_dict["pending"].state == TaskInstanceState.SUCCESS
+    assert "teardown" not in task_dict

Reply via email to