val2k edited a comment on issue #13542: URL: https://github.com/apache/airflow/issues/13542#issuecomment-1029786174
@jpkoponen I adapted the @danmactough DAG to make it automatic and fit our use case. It filters DAGs that have been stuck for more than 5 minutes and simply deletes them. (In my case, changing the `try_number` and the state has no other effect than queuing the DAG again). ```python import os import requests import time import json from datetime import datetime, timedelta from pprint import pprint from airflow import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.operators.python import PythonOperator from airflow.utils import timezone from airflow.utils.db import provide_session from airflow.utils.state import State from dependencies.utils.var import DATADOG_API_KEY DAG_NAME = os.path.splitext(os.path.basename(__file__))[0] DEFAULT_ARGS = { "owner": "airflow", "depends_on_past": False, "email_on_failure": False, "email_on_retry": False, "execution_timeout": timedelta(minutes=10), "retries": 0, } @provide_session def unstick_dag_callable(dag_run, session, **kwargs): filter = [ TaskInstance.state == State.QUEUED, TaskInstance.queued_dttm < datetime.now(timezone.utc) - timedelta(minutes=5) ] tis = session.query(TaskInstance).filter(*filter).all() print(f"Task instances: {tis}") print(f"Updating {len(tis)} task instances") for ti in tis: dr = ( session.query(DagRun) .filter(DagRun.run_id == ti.dag_run.run_id) .first() ) dagrun = ( dict( id=dr.id, dag_id=dr.dag_id, execution_date=dr.execution_date, start_date=dr.start_date, end_date=dr.end_date, _state=dr._state, run_id=dr.run_id, creating_job_id=dr.creating_job_id, external_trigger=dr.external_trigger, run_type=dr.run_type, conf=dr.conf, last_scheduling_decision=dr.last_scheduling_decision, dag_hash=dr.dag_hash, ) if dr else {} ) pprint( dict( task_id=ti.task_id, job_id=ti.job_id, key=ti.key, dag_id=ti.dag_id, execution_date=ti.execution_date, state=ti.state, dag_run={**dagrun}, ) ) dr.state = State.FAILED print(f"Deleting {str(ti)}.") session.delete(ti) 
session.commit() print("Done.") with DAG( DAG_NAME, description="Utility DAG to fix TaskInstances stuck in queued state", default_args=DEFAULT_ARGS, schedule_interval="*/5 * * * *", start_date=datetime(year=2021, month=8, day=1), max_active_runs=1, catchup=False, default_view="graph", is_paused_upon_creation=False, ) as dag: PythonOperator(task_id="unstick_dag", python_callable=unstick_dag_callable) ``` The DAG runs every 5 minutes and I never caught it in a queued state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org