val2k edited a comment on issue #13542:
URL: https://github.com/apache/airflow/issues/13542#issuecomment-1029786174


   @jpkoponen I adapted the @danmactough DAG to make it automatic and fit our use case. It filters task instances that have been stuck in the queued state for more than 5 minutes and simply deletes them. (In my case, changing the `try_number` and the state has no other effect than queuing the DAG again.)
   
   ```python
   import os
   from datetime import datetime, timedelta
   from pprint import pprint
   
   from airflow import DAG
   from airflow.models.dagrun import DagRun
   from airflow.models.taskinstance import TaskInstance
   from airflow.operators.python import PythonOperator
   from airflow.utils import timezone
   from airflow.utils.db import provide_session
   from airflow.utils.state import State
   
   DAG_NAME = os.path.splitext(os.path.basename(__file__))[0]
   DEFAULT_ARGS = {
       "owner": "airflow",
       "depends_on_past": False,
       "email_on_failure": False,
       "email_on_retry": False,
       "execution_timeout": timedelta(minutes=10),
       "retries": 0,
   }
   
   @provide_session
   def unstick_dag_callable(dag_run, session=None, **kwargs):
       # Select task instances that have been stuck in the queued state
       # for more than 5 minutes.
       stuck_filter = [
           TaskInstance.state == State.QUEUED,
           TaskInstance.queued_dttm < timezone.utcnow() - timedelta(minutes=5),
       ]
   
       tis = session.query(TaskInstance).filter(*stuck_filter).all()
       print(f"Task instances: {tis}")
       print(f"Updating {len(tis)} task instances")
   
       for ti in tis:
           # run_id alone is not unique across DAGs (scheduled runs share the
           # same run_id pattern), so filter on dag_id as well.
           dr = (
               session.query(DagRun)
               .filter(DagRun.dag_id == ti.dag_id, DagRun.run_id == ti.dag_run.run_id)
               .first()
           )
   
           dagrun = (
               dict(
                   id=dr.id,
                   dag_id=dr.dag_id,
                   execution_date=dr.execution_date,
                   start_date=dr.start_date,
                   end_date=dr.end_date,
                   _state=dr._state,
                   run_id=dr.run_id,
                   creating_job_id=dr.creating_job_id,
                   external_trigger=dr.external_trigger,
                   run_type=dr.run_type,
                   conf=dr.conf,
                   last_scheduling_decision=dr.last_scheduling_decision,
                   dag_hash=dr.dag_hash,
               )
               if dr
               else {}
           )
           pprint(
               dict(
                   task_id=ti.task_id,
                   job_id=ti.job_id,
                   key=ti.key,
                   dag_id=ti.dag_id,
                   execution_date=ti.execution_date,
                   state=ti.state,
                   dag_run={**dagrun},
               )
           )
   
           # Mark the parent run failed (guarding against a missing DagRun row)
           # and delete the stuck task instance outright.
           if dr is not None:
               dr.state = State.FAILED
           print(f"Deleting {str(ti)}.")
           session.delete(ti)
   
       session.commit()
       print("Done.")
   
   
   with DAG(
       DAG_NAME,
       description="Utility DAG to fix TaskInstances stuck in queued state",
       default_args=DEFAULT_ARGS,
       schedule_interval="*/5 * * * *",
       start_date=datetime(year=2021, month=8, day=1),
       max_active_runs=1,
       catchup=False,
       default_view="graph",
       is_paused_upon_creation=False,
   ) as dag:
       PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable)
   ```
   The DAG runs every 5 minutes, and I have never caught it stuck in the queued state itself.
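   
   For completeness, here is a minimal sketch of the alternative mentioned in the parentheses above: resetting the stuck task instances instead of deleting them, so the scheduler picks them up again. This is an untested assumption on my part; it reuses the `tis` query and `session` from `unstick_dag_callable`, and behavior may vary across Airflow versions.
   
   ```python
   # Minimal sketch, assuming the same `session` and `tis` as in
   # unstick_dag_callable above. Setting the state back to None makes
   # the scheduler treat the task as not yet run and schedule it again,
   # instead of deleting the row.
   for ti in tis:
       ti.state = State.NONE
   session.commit()
   ```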

