[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
---------------------------------
    Description: 
The DAG below creates 20 identical simple tasks that depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Pausing the DAG, clearing all the DAG runs, and then unpausing the DAG leads to 
intermittent task failures.
{code:python}
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'airflow',
    'start_date': days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1,
)


def func():
    # No-op callable: the task body itself cannot fail.
    pass


# Chain 20 identical tasks in series: task_0 >> task_1 >> ... >> task_19.
prev_task = None
for i in range(20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the LocalExecutor.

 

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

On the second run, tasks that succeeded show 2 logs on the UI and have 2 
physical log files on disk. Tasks that failed show only 1 log on the UI, 
despite also having 2 physical log files on disk. (Presumably the UI reads from 
the Airflow DB, which for some reason isn't aware of the second log for the 
failed tasks.)

 

Anyway, I am more interested in the intermittent failures than in which logs 
are shown on the UI.

Here is an example of the second log file for a failed task attempt:
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for <TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
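
For anyone trying to replicate this, a small script can scan the task log 
files for the same failure signature. The following is only a sketch: the 
function name find_unrunnable_attempts and the regular expression are my own 
assumptions based on the two log entries above, not anything provided by 
Airflow, and the pattern may need adjusting for other log formats.

```python
import re

# Assumed pattern, derived only from the log entries quoted in this report.
DEP_NOT_MET = re.compile(
    r"Dependencies not met for <TaskInstance: (?P<task>\S+) "
    r"(?P<execution_date>\S+) \[(?P<state>\w+)\]>"
)


def find_unrunnable_attempts(log_text):
    """Return (task, execution_date, state) for each failed dependency check."""
    # Collapse all whitespace so entries wrapped across lines still match.
    flat = " ".join(log_text.split())
    return [
        (m.group("task"), m.group("execution_date"), m.group("state"))
        for m in DEP_NOT_MET.finditer(flat)
    ]


# Sample input copied from the log above, including the line wrapping.
SAMPLE = """
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for
<TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [scheduled]>,
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for
<TaskInstance: bug_testing_dag.task_1 2019-12-01T00:00:00+00:00 [failed]>,
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is
not a valid state for execution. The task must be cleared in order to be run.
"""

if __name__ == "__main__":
    for task, execution_date, state in find_unrunnable_attempts(SAMPLE):
        print(task, execution_date, state)
```

Run against a real log file, each returned tuple marks one attempt that the 
worker refused to start, together with the state the task instance was in at 
that moment.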
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance: because of the worker heartbeat, there is a delay 
between when a TaskInstance state is set to SUCCESS and when the worker is 
actually done with it. The scheduler thinks the next task can go from SCHEDULED 
to QUEUED, but the task does not start because the worker is still busy, so it 
is moved back from QUEUED to SCHEDULED. The task is still in the worker queue, 
however, which causes the failure above when the worker eventually tries to 
start it.
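
The suspected sequence can be illustrated with a toy model. This is a minimal 
sketch of the hypothesis above, not Airflow's scheduler or executor code: the 
class ToyScheduler, its methods, and the RUNNABLE_STATES set are all invented 
for illustration.

```python
from collections import deque

# Assumption for this toy model: a worker may only start a task that is queued.
RUNNABLE_STATES = {"queued"}


class ToyScheduler:
    """Hypothetical stand-in for the scheduler, metadata DB and worker queue."""

    def __init__(self):
        self.state = {}              # task_id -> state (stands in for the DB)
        self.worker_queue = deque()  # commands already handed to the worker

    def queue_task(self, task_id):
        # SCHEDULED -> QUEUED, and the command is handed to the worker queue.
        self.state[task_id] = "queued"
        self.worker_queue.append(task_id)

    def reset_unstarted(self, task_id):
        # The task did not start in time, so it goes back QUEUED -> SCHEDULED.
        # The command is NOT removed from worker_queue: the suspected race.
        if self.state[task_id] == "queued":
            self.state[task_id] = "scheduled"

    def worker_pick_up(self):
        # The worker finally gets to the command and checks the stored state.
        task_id = self.worker_queue.popleft()
        state = self.state[task_id]
        if state not in RUNNABLE_STATES:
            return "{}: Task is in the '{}' state which is not a valid state for execution".format(
                task_id, state)
        self.state[task_id] = "success"
        return "{}: ran".format(task_id)


def happy_path():
    s = ToyScheduler()
    s.queue_task("task_1")
    return s.worker_pick_up()


def race_path():
    s = ToyScheduler()
    s.queue_task("task_1")
    s.reset_unstarted("task_1")  # worker was still busy with the previous task
    return s.worker_pick_up()    # stale command is picked up afterwards


if __name__ == "__main__":
    print(happy_path())
    print(race_path())
```

In race_path the stale command is rejected because the stored state is already 
'scheduled' again, which matches the first entry in the log above. The model 
does not attempt to explain why the first run of the DAG is unaffected.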

However, what remains a mystery to me is why it works the first time the DAG 
run executes and not the second time. Perhaps it is something specific to my 
environment.

I'm going to try to debug this myself, but if anyone else can replicate this 
issue in their environment, it would help me understand whether it affects only 
me. Just install the DAG, let it run to completion once, then clear it and let 
it run again; you should start seeing random failures.

 

  was:
(Same text as the description above, except for the executor:)

I am using the SequentialExecutor.

 


> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> ----------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6190
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 1.10.6
>            Reporter: Bjorn Olsen
>            Assignee: Bjorn Olsen
>            Priority: Minor
>         Attachments: image-2019-12-06-13-55-33-974.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
