[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bjorn Olsen closed AIRFLOW-6190.
--------------------------------
    Resolution: Duplicate

Closing as Duplicate

> Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
> ----------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6190
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.10.6
>            Reporter: Bjorn Olsen
>            Assignee: Bjorn Olsen
>            Priority: Minor
>         Attachments: image-2019-12-06-13-55-33-974.png
>
>
> The DAG below creates 20 identical, simple tasks that depend on each other in series.
> Installing the DAG and executing all the DAG runs works perfectly the first time around.
> Then pausing the DAG, clearing all the DAG runs, and unpausing the DAG leads to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of DAG runs or not affect the set at all. This makes me suspect a timing issue between the executor and the scheduler (sometimes they align and sometimes they don't).
> {code:python}
> from airflow.models import DAG
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.dates import days_ago
>
> args = {
>     'owner': 'airflow',
>     'start_date': days_ago(5),
> }
>
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1,
> )
>
>
> def func():
>     # No-op callable: every task does nothing.
>     pass
>
>
> # Chain 20 identical tasks in series: task_0 >> task_1 >> ... >> task_19.
> prev_task = None
> for i in range(20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
>     prev_task = task
>
> if __name__ == "__main__":
>     dag.cli()
> {code}
> I am using the LocalExecutor with:
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
>
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>
> For the second attempt, the successful tasks show 2 logs in the UI and have 2 physical log files on disk. However, the failed tasks show only 1 log in the UI, despite there being 2 physical log files on disk. (Presumably the UI relies on the Airflow DB, which for some reason isn't aware of the second log for the failed tasks.)
>
> Anyway, I am more interested in the intermittent failures than in which logs are shown in the UI.
> Here is an example of the second log file for one of the failed task attempts:
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>
> At first I thought this was because the workers were still busy with the previous TaskInstance: because of the worker heartbeat, there is a delay between when a TaskInstance's state is set to SUCCESS and when the worker is actually done with it. The scheduler thinks the next task can move SCHEDULED -> QUEUED, but the task does not start because the worker is still busy, so it goes back QUEUED -> SCHEDULED. The task is still in the worker queue, however, which causes the failure above when the worker eventually tries to start it.
> However, what is a mystery to me is why it works the first time the dag_run runs, but not the second time. Perhaps it is something specific to my environment.
> I'm going to try to debug this myself, but if anyone else can replicate this issue in their environment, it would help me understand whether it is just affecting me.
> To reproduce: install the DAG, let it run to completion once, then clear it and let it run again. You should start seeing random failures.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
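The race suspected in the report can be illustrated with a toy, single-task simulation. This is a sketch of the hypothesis only, not Airflow's scheduler or LocalExecutor logic. `simulate`, `worker_free_at`, `queue_timeout` and `scheduler_every` are hypothetical names. The rule that a task left QUEUED for `queue_timeout` ticks is reset to SCHEDULED is an assumption that stands in for whatever moves the real task back. The model shows the key point of the hypothesis: resetting the state does not withdraw a command that is already waiting for the worker.

```python
from collections import deque

# Toy states; the strings mirror the state names that appear in the task logs.
SCHEDULED, QUEUED, SUCCESS = "scheduled", "queued", "success"


def simulate(worker_free_at, queue_timeout, scheduler_every=5, max_ticks=60):
    """Toy model of the hypothesised race for one task instance (NOT Airflow code).

    All parameters are hypothetical:
      worker_free_at:  tick at which the worker finishes the previous task
      queue_timeout:   ticks a task may stay QUEUED before the toy scheduler
                       resets it to SCHEDULED (an assumed rule)
      scheduler_every: ticks between scheduler heartbeats
    Returns a list of (tick, actor, event) tuples.
    """
    state = SCHEDULED
    queued_since = None
    # Commands sent to the worker. Resetting the state does NOT remove a
    # command that is already waiting here; that is the crux of the hypothesis.
    commands = deque()
    events = []
    for now in range(max_ticks):
        # Scheduler heartbeat: queue a SCHEDULED task, or reset a stale QUEUED one.
        if now % scheduler_every == 0:
            if state == SCHEDULED:
                state, queued_since = QUEUED, now
                commands.append(now)
                events.append((now, "scheduler", "scheduled -> queued"))
            elif state == QUEUED and now - queued_since >= queue_timeout:
                state, queued_since = SCHEDULED, None
                events.append((now, "scheduler", "queued -> scheduled"))
        # Worker: once free, takes the oldest waiting command and checks the
        # task state first, similar in spirit to the 'Task Instance State' check.
        if now >= worker_free_at and commands:
            commands.popleft()
            if state == QUEUED:
                state = SUCCESS  # the no-op task finishes instantly in this model
                events.append((now, "worker", "ran task"))
            else:
                events.append(
                    (now, "worker", f"rejected: task is in the '{state}' state"))
        if state == SUCCESS:
            break
    return events


if __name__ == "__main__":
    # Worker frees up quickly vs. only after the toy scheduler has reset the task.
    for free_at in (3, 12):
        print(f"worker free at tick {free_at}:")
        for event in simulate(worker_free_at=free_at, queue_timeout=10):
            print("   ", event)
```

With `worker_free_at=12`, the stale command reaches the worker while the task is back in SCHEDULED and is rejected, which matches the first log entry above. In this toy model the next heartbeat simply re-queues and runs the task. It does not reproduce how the real task then ends up FAILED.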