Repository: incubator-airflow Updated Branches: refs/heads/master 7fa86f72c -> e9fe64af0
[AIRFLOW-695] Retries do not execute because dagrun is in FAILED state The scheduler checks the task instances without taking into account whether the executor has already reported back. In this case the executor reports back several iterations later, but the task is queued nevertheless. Because a task will not enter the executor queue while it is still considered running, the task state remains "queued" indefinitely, in limbo between the scheduler and the executor. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2e166b79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2e166b79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2e166b79 Branch: refs/heads/master Commit: 2e166b7928c5f66735c687a830f82ff9e1a733b6 Parents: 937142d Author: root <[email protected]> Authored: Sun Dec 18 20:19:58 2016 +0000 Committer: Bolke de Bruin <[email protected]> Committed: Tue Jan 3 14:21:43 2017 +0100 ---------------------------------------------------------------------- airflow/executors/base_executor.py | 9 ++++ airflow/jobs.py | 5 +++ tests/jobs.py | 76 +++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index d702ff2..7a4065e 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -76,6 +76,15 @@ class BaseExecutor(LoggingMixin): priority=task_instance.task.priority_weight_total, queue=task_instance.task.queue) + def has_task(self, task_instance): + """ + Checks if a task is either queued or running in this executor + :param task_instance: TaskInstance + :return: True if the 
task is known to this executor + """ + if task_instance.key in self.queued_tasks or task_instance.key in self.running: + return True + def sync(self): """ Sync will get called periodically by the heartbeat method. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 81c77a8..819d107 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -989,6 +989,11 @@ class SchedulerJob(BaseJob): # Can't schedule any more since there are no more open slots. break + if self.executor.has_task(task_instance): + self.logger.debug("Not handling task {} as the executor reports it is running" + .format(task_instance.key)) + continue + if simple_dag_bag.get_dag(task_instance.dag_id).is_paused: self.logger.info("Not executing queued {} since {} is paused" .format(task_instance, task_instance.dag_id)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2e166b79/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 62e88e5..d7dfbe7 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -21,6 +21,7 @@ import datetime import logging import os import unittest +import six from airflow import AirflowException, settings from airflow import models @@ -29,6 +30,7 @@ from airflow.executors import DEFAULT_EXECUTOR from airflow.jobs import BackfillJob, SchedulerJob from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.bash_operator import BashOperator from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -899,6 +901,80 @@ class SchedulerJobTest(unittest.TestCase): do_schedule() self.assertEquals(2, len(executor.queued_tasks)) + def test_retry_still_in_executor(self): + """ + 
Checks if the scheduler does not put a task in limbo, when a task is retried + but is still present in the executor. + """ + executor = TestExecutor() + dagbag = DagBag(executor=executor) + dagbag.dags.clear() + dagbag.executor = executor + + dag = DAG( + dag_id='test_retry_still_in_executor', + start_date=DEFAULT_DATE) + dag_task1 = BashOperator( + task_id='test_retry_handling_op', + bash_command='exit 1', + retries=1, + dag=dag, + owner='airflow') + + dag.clear() + dag.is_subdag = False + + session = settings.Session() + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag) + + @mock.patch('airflow.models.DagBag', return_value=dagbag) + @mock.patch('airflow.models.DagBag.collect_dags') + def do_schedule(function, function2): + # Use a empty file since the above mock will return the + # expected DAGs. Also specify only a single file so that it doesn't + # try to schedule the above DAG repeatedly. 
+ scheduler = SchedulerJob(num_runs=1, + executor=executor, + subdir=os.path.join(models.DAGS_FOLDER, + "no_dags.py")) + scheduler.heartrate = 0 + scheduler.run() + + do_schedule() + self.assertEquals(1, len(executor.queued_tasks)) + + def run_with_error(task): + try: + task.run() + except AirflowException: + pass + + ti_tuple = six.next(six.itervalues(executor.queued_tasks)) + (command, priority, queue, ti) = ti_tuple + ti.task = dag_task1 + + # fail execution + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti.try_number, 1) + + # do not schedule + do_schedule() + self.assertTrue(executor.has_task(ti)) + ti.refresh_from_db() + self.assertEqual(ti.state, State.UP_FOR_RETRY) + + # now the executor has cleared and it should be allowed the re-queue + executor.queued_tasks.clear() + do_schedule() + ti.refresh_from_db() + self.assertEqual(ti.state, State.QUEUED) + def test_scheduler_run_duration(self): """ Verifies that the scheduler run duration limit is followed.
