This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 177da9016b Enhance docs for zombie tasks (#35825)
177da9016b is described below

commit 177da9016bbedcfa49c08256fdaf2fb537b97d6c
Author: Pankaj Koti <pankajkoti...@gmail.com>
AuthorDate: Sat Nov 25 23:16:21 2023 +0530

    Enhance docs for zombie tasks (#35825)
    
    closes: #35698
---
 airflow/jobs/scheduler_job_runner.py        |  3 ++
 docs/apache-airflow/core-concepts/tasks.rst | 77 ++++++++++++++++++++++++++++-
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 1264af56a5..b9d18098f6 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1711,6 +1711,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired 
triggers", num_timed_out_tasks)
 
+    # [START find_zombies]
     def _find_zombies(self) -> None:
         """
         Find zombie task instances and create a TaskCallbackRequest to be handled by the DAG processor.
@@ -1763,6 +1764,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.job.executor.send_callback(request)
             Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": 
ti.task_id})
 
+    # [END find_zombies]
+
     @staticmethod
     def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
         zombie_message_details = {
diff --git a/docs/apache-airflow/core-concepts/tasks.rst b/docs/apache-airflow/core-concepts/tasks.rst
index 5e305e2fe8..d72c14a27e 100644
--- a/docs/apache-airflow/core-concepts/tasks.rst
+++ b/docs/apache-airflow/core-concepts/tasks.rst
@@ -243,9 +243,82 @@ Zombie/Undead Tasks
 
 No system runs perfectly, and task instances are expected to die once in a while. Airflow detects two kinds of task/process mismatch:
 
-* *Zombie tasks* are tasks that are supposed to be running but suddenly died (e.g. their process was killed, or the machine died). Airflow will find these periodically, clean them up, and either fail or retry the task depending on its settings.
+* *Zombie tasks* are ``TaskInstances`` stuck in a ``running`` state despite their associated jobs being inactive
+  (e.g. their process didn't send a recent heartbeat as it got killed, or the machine died). Airflow will find these
+  periodically, clean them up, and either fail or retry the task depending on its settings.
+
+* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task
+  Instances via the UI. Airflow will find them periodically and terminate them.
+
+
+Below is the code snippet from the Airflow scheduler that runs periodically to detect zombie/undead tasks.
+
+.. exampleinclude:: /../../airflow/jobs/scheduler_job_runner.py
+    :language: python
+    :start-after: [START find_zombies]
+    :end-before: [END find_zombies]
+
+
+The criteria used in the above snippet to detect zombie tasks are explained below:
+
+1. **Task Instance State**
+
+    Only task instances in the RUNNING state are considered potential zombies.
+
+2. **Job State and Heartbeat Check**
+
+    Zombie tasks are identified if the associated job is not in the RUNNING state, or if the latest heartbeat of the
+    job is earlier than the calculated time threshold (``limit_dttm``, the current time minus the configured zombie
+    threshold). The heartbeat is a mechanism to indicate that a task or job is still alive and running.
+
+3. **Job Type**
+
+    The job associated with the task must be of type ``LocalTaskJob``.
+
+4. **Queued by Job ID**
+
+    Only task instances that were queued by the scheduler job currently performing the check are considered.
+
+These conditions collectively help identify running tasks that may be zombies based on their state, associated job
+state, heartbeat status, job type, and the specific job that queued them. If a task meets these criteria, it is
+considered a potential zombie, and further actions, such as logging and sending a callback request, are taken. A
+simplified sketch of how these conditions combine into a single query is shown below.
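+
+The following is an illustrative sketch, not the exact scheduler source: it shows how the four criteria above could
+be combined into one SQLAlchemy query. The threshold value and the ``current_scheduler_job_id`` placeholder are
+assumptions made for the example; the real code derives them from the scheduler's config and its own job row.
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from sqlalchemy import or_, select
+
+    from airflow.jobs.job import Job
+    from airflow.models.taskinstance import TaskInstance as TI
+    from airflow.utils import timezone
+    from airflow.utils.session import create_session
+    from airflow.utils.state import JobState, TaskInstanceState
+
+    # Illustrative values; the scheduler reads the threshold from
+    # [scheduler] scheduler_zombie_task_threshold and uses its own job id.
+    zombie_threshold_secs = 300
+    current_scheduler_job_id = 1
+
+    # Heartbeats older than this moment are considered stale.
+    limit_dttm = timezone.utcnow() - timedelta(seconds=zombie_threshold_secs)
+
+    with create_session() as session:
+        zombie_tis = session.scalars(
+            select(TI)
+            .join(Job, TI.job_id == Job.id)
+            # 1. Only task instances in the RUNNING state.
+            .where(TI.state == TaskInstanceState.RUNNING)
+            # 2. The associated job stopped running or has a stale heartbeat.
+            .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
+            # 3. The job must be a LocalTaskJob.
+            .where(Job.job_type == "LocalTaskJob")
+            # 4. Only tasks queued by the current scheduler job.
+            .where(TI.queued_by_job_id == current_scheduler_job_id)
+        ).all()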
+
+Reproducing zombie tasks locally
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you'd like to reproduce zombie tasks for development or testing purposes, follow the steps below:
+
+1. Set the below environment variables for your local Airflow setup (alternatively, you could tweak the
+   corresponding config values in airflow.cfg). These values make the task's job heartbeat only every 600 seconds,
+   while any heartbeat older than 2 seconds marks the job as a zombie and the detection check runs every 5 seconds,
+   so a long-running task is flagged almost immediately:
+
+.. code-block:: bash
+
+    export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
+    export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
+    export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
+
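+The equivalent ``airflow.cfg`` entries, assuming the standard mapping of ``AIRFLOW__SCHEDULER__*`` environment
+variables to keys in the ``[scheduler]`` section, would be:
+
+.. code-block:: ini
+
+    [scheduler]
+    local_task_job_heartbeat_sec = 600
+    scheduler_zombie_task_threshold = 2
+    zombie_detection_interval = 5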
+
+2. Have a DAG with a task that takes about 10 minutes to complete (i.e. a long-running task). For example, you could
+   use the below DAG:
+
+.. code-block:: python
+
+    from airflow.decorators import dag
+    from airflow.operators.bash import BashOperator
+    from datetime import datetime
+
+
+    @dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
+    def sleep_dag():
+        t1 = BashOperator(
+            task_id="sleep_10_minutes",
+            bash_command="sleep 600",
+        )
+
+
+    sleep_dag()
+
+
+Run the above DAG (for example, via the CLI commands shown below) and wait for a while. You should see the task
+instance become a zombie and then be killed by the scheduler.
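+
+One way to trigger it, assuming the DAG file is already in your configured DAGs folder and a scheduler is running,
+is via the Airflow CLI:
+
+.. code-block:: bash
+
+    airflow dags unpause sleep_dag
+    airflow dags trigger sleep_dag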
 
-* *Undead tasks* are tasks that are *not* supposed to be running but are, often caused when you manually edit Task Instances via the UI. Airflow will find them periodically and terminate them.
 
 
 Executor Configuration
