[AIRFLOW-1294] Backfills can lose tasks to execute

In backfills we can lose tasks to execute because a task can set its
own state to NONE when concurrency limits are reached. Those tasks
then fall outside the scope the backfill is managing, so they will
not be executed.

Dear Airflow maintainers,

Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!

### JIRA
- [X] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-1294

### Description
- [X] Here are some details about my PR, including
screenshots of any UI changes:

In backfills we can lose tasks to execute because a task can set its
own state to NONE when concurrency limits are reached. Those tasks
then fall outside the scope the backfill is managing, so they will
not be executed.
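
To illustrate the mechanism, here is a minimal sketch of the guard this patch adds to `BackfillJob._update_counters` (the standalone helper name here is hypothetical; the real change is inlined in the method, as shown in the diff below):

```python
# Illustrative sketch only; the actual change lives inside
# BackfillJob._update_counters (see the diff below).
from airflow import settings
from airflow.utils.state import State


def requeue_reset_task_instances(started, tasks_to_run, logger):
    """Re-add task instances whose state was reset to NONE, e.g. by the
    task itself on hitting concurrency limits or by clearing tasks from
    the UI, so they stay within the scope of the backfill."""
    for key, ti in list(started.items()):
        if ti.state == State.NONE:
            logger.warning("Task instance %s was reset to None; "
                           "re-adding it to the queue", ti)
            session = settings.Session()
            # Put the task back into SCHEDULED so the backfill loop picks
            # it up again instead of silently dropping it.
            ti.set_state(State.SCHEDULED, session=session)
            session.close()
            started.pop(key)
            tasks_to_run[key] = ti
```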

### Tests
- [X] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
Should be covered by current tests; will adjust if required.

### Commits
- [X] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

mistercrunch aoen saguziel This is a simplified fix that should be easier
to digest in 1.8.2. It does not address all of the underlying issues tackled in
https://github.com/apache/incubator-airflow/pull/2356, but those can be
addressed separately and in smaller bits.

Closes #2360 from bolkedebruin/fix_race_backfill_2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/570b2ed3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/570b2ed3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/570b2ed3

Branch: refs/heads/v1-8-test
Commit: 570b2ed3ef01123dace11b620b4fcafde3bcd8b8
Parents: 3f48d48
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jun 13 08:21:26 2017 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Tue Jun 13 08:23:33 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 47 ++++++++++++++++++++---------
 tests/jobs.py   | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 118 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/570b2ed3/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 457966f..c517350 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1668,7 +1668,8 @@ class BackfillJob(BaseJob):
 
     def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run):
         """
-        Updates the counters per state of the tasks that were running
+        Updates the counters per state of the tasks that were running. Can
+        re-add tasks to tasks_to_run when required.
         :param started:
         :param succeeded:
         :param skipped:
@@ -1700,6 +1701,20 @@ class BackfillJob(BaseJob):
                                     .format(ti))
                 started.pop(key)
                 tasks_to_run[key] = ti
+            # special case: The state of the task can be set to NONE by the task itself
+            # when it reaches concurrency limits. It could also happen when the state
+            # is changed externally, e.g. by clearing tasks from the ui. We need to cover
+            # for that as otherwise those tasks would fall outside of the scope of
+            # the backfill suddenly.
+            elif ti.state == State.NONE:
+                self.logger.warning("FIXME: task instance {} state was set to "
+                                    "None externally or reaching concurrency limits. "
+                                    "Re-adding task to queue.".format(ti))
+                session = settings.Session()
+                ti.set_state(State.SCHEDULED, session=session)
+                session.close()
+                started.pop(key)
+                tasks_to_run[key] = ti
 
     def _manage_executor_state(self, started):
         """
@@ -1909,19 +1924,23 @@ class BackfillJob(BaseJob):
                             verbose=True):
                         ti.refresh_from_db(lock_for_update=True, session=session)
                         if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
-                            # Skip scheduled state, we are executing immediately
-                            ti.state = State.QUEUED
-                            session.merge(ti)
-                            self.logger.debug('Sending {} to executor'.format(ti))
-                            executor.queue_task_instance(
-                                ti,
-                                mark_success=self.mark_success,
-                                pickle_id=pickle_id,
-                                ignore_task_deps=self.ignore_task_deps,
-                                ignore_depends_on_past=ignore_depends_on_past,
-                                pool=self.pool)
-                            started[key] = ti
-                            tasks_to_run.pop(key)
+                            if executor.has_task(ti):
+                                self.logger.debug("Task Instance {} already in executor "
+                                                  "waiting for queue to clear".format(ti))
+                            else:
+                                self.logger.debug('Sending {} to executor'.format(ti))
+                                # Skip scheduled state, we are executing immediately
+                                ti.state = State.QUEUED
+                                session.merge(ti)
+                                executor.queue_task_instance(
+                                    ti,
+                                    mark_success=self.mark_success,
+                                    pickle_id=pickle_id,
+                                    ignore_task_deps=self.ignore_task_deps,
+                                    ignore_depends_on_past=ignore_depends_on_past,
+                                    pool=self.pool)
+                                started[key] = ti
+                                tasks_to_run.pop(key)
                         session.commit()
                         continue
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/570b2ed3/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 21102e6..cb1766b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -445,6 +445,91 @@ class BackfillJobTest(unittest.TestCase):
         subdag.clear()
         dag.clear()
 
+    def test_update_counters(self):
+        dag = DAG(
+            dag_id='test_manage_executor_state',
+            start_date=DEFAULT_DATE)
+
+        task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        job = BackfillJob(dag=dag)
+
+        session = settings.Session()
+        dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX,
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE,
+                               session=session)
+        ti = TI(task1, dr.execution_date)
+        ti.refresh_from_db()
+
+        started = {}
+        tasks_to_run = {}
+        failed = set()
+        succeeded = set()
+        started = {}
+        skipped = set()
+
+        # test for success
+        ti.set_state(State.SUCCESS, session)
+        started[ti.key] = ti
+        job._update_counters(started=started, succeeded=succeeded,
+                                     skipped=skipped, failed=failed,
+                                     tasks_to_run=tasks_to_run)
+        self.assertTrue(len(started) == 0)
+        self.assertTrue(len(succeeded) == 1)
+        self.assertTrue(len(skipped) == 0)
+        self.assertTrue(len(failed) == 0)
+        self.assertTrue(len(tasks_to_run) == 0)
+
+        succeeded.clear()
+
+        # test for skipped
+        ti.set_state(State.SKIPPED, session)
+        started[ti.key] = ti
+        job._update_counters(started=started, succeeded=succeeded,
+                                     skipped=skipped, failed=failed,
+                                     tasks_to_run=tasks_to_run)
+        self.assertTrue(len(started) == 0)
+        self.assertTrue(len(succeeded) == 0)
+        self.assertTrue(len(skipped) == 1)
+        self.assertTrue(len(failed) == 0)
+        self.assertTrue(len(tasks_to_run) == 0)
+
+        skipped.clear()
+
+        # test for failed
+        ti.set_state(State.FAILED, session)
+        started[ti.key] = ti
+        job._update_counters(started=started, succeeded=succeeded,
+                                     skipped=skipped, failed=failed,
+                                     tasks_to_run=tasks_to_run)
+        self.assertTrue(len(started) == 0)
+        self.assertTrue(len(succeeded) == 0)
+        self.assertTrue(len(skipped) == 0)
+        self.assertTrue(len(failed) == 1)
+        self.assertTrue(len(tasks_to_run) == 0)
+
+        failed.clear()
+
+        # test for reschedule
+        # a task reset to NONE should be re-added to tasks_to_run
+        ti.set_state(State.NONE, session)
+        started[ti.key] = ti
+        job._update_counters(started=started, succeeded=succeeded,
+                                     skipped=skipped, failed=failed,
+                                     tasks_to_run=tasks_to_run)
+        self.assertTrue(len(started) == 0)
+        self.assertTrue(len(succeeded) == 0)
+        self.assertTrue(len(skipped) == 0)
+        self.assertTrue(len(failed) == 0)
+        self.assertTrue(len(tasks_to_run) == 1)
+
+        session.close()
+
 
 class LocalTaskJobTest(unittest.TestCase):
     def setUp(self):
