[AIRFLOW-1294] Backfills can lose tasks to execute In backfills we can lose tasks to execute because a task may set its own state to NONE when concurrency limits are reached; this makes those tasks fall outside the scope the backfill is managing, hence they will not be executed.
Dear Airflow maintainers, Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! ### JIRA - [X] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-1294 ### Description - [X] Here are some details about my PR, including screenshots of any UI changes: In backfills we can lose tasks to execute due to a task setting its own state to NONE if concurrency limits are reached; this makes them fall outside of the scope the backfill is managing, hence they will not be executed. ### Tests - [X] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Should be covered by current tests, will adjust if required. ### Commits - [X] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how" mistercrunch aoen saguziel This is a simplified fix that should be easier to digest in 1.8.2. It does not address all underlying issues as in https://github.com/apache/incubator-airflow/pull/2356 , but those can be addressed separately and in smaller bits. 
Closes #2360 from bolkedebruin/fix_race_backfill_2 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/570b2ed3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/570b2ed3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/570b2ed3 Branch: refs/heads/v1-8-test Commit: 570b2ed3ef01123dace11b620b4fcafde3bcd8b8 Parents: 3f48d48 Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Tue Jun 13 08:21:26 2017 -0700 Committer: Maxime Beauchemin <maximebeauche...@gmail.com> Committed: Tue Jun 13 08:23:33 2017 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 47 ++++++++++++++++++++--------- tests/jobs.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/570b2ed3/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 457966f..c517350 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1668,7 +1668,8 @@ class BackfillJob(BaseJob): def _update_counters(self, started, succeeded, skipped, failed, tasks_to_run): """ - Updates the counters per state of the tasks that were running + Updates the counters per state of the tasks that were running. Can re-add + to tasks to run in case required. :param started: :param succeeded: :param skipped: @@ -1700,6 +1701,20 @@ class BackfillJob(BaseJob): .format(ti)) started.pop(key) tasks_to_run[key] = ti + # special case: The state of the task can be set to NONE by the task itself + # when it reaches concurrency limits. It could also happen when the state + # is changed externally, e.g. by clearing tasks from the ui. 
We need to cover + # for that as otherwise those tasks would fall outside of the scope of + # the backfill suddenly. + elif ti.state == State.NONE: + self.logger.warning("FIXME: task instance {} state was set to " + "None externally or reaching concurrency limits. " + "Re-adding task to queue.".format(ti)) + session = settings.Session() + ti.set_state(State.SCHEDULED, session=session) + session.close() + started.pop(key) + tasks_to_run[key] = ti def _manage_executor_state(self, started): """ @@ -1909,19 +1924,23 @@ class BackfillJob(BaseJob): verbose=True): ti.refresh_from_db(lock_for_update=True, session=session) if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY: - # Skip scheduled state, we are executing immediately - ti.state = State.QUEUED - session.merge(ti) - self.logger.debug('Sending {} to executor'.format(ti)) - executor.queue_task_instance( - ti, - mark_success=self.mark_success, - pickle_id=pickle_id, - ignore_task_deps=self.ignore_task_deps, - ignore_depends_on_past=ignore_depends_on_past, - pool=self.pool) - started[key] = ti - tasks_to_run.pop(key) + if executor.has_task(ti): + self.logger.debug("Task Instance {} already in executor " + "waiting for queue to clear".format(ti)) + else: + self.logger.debug('Sending {} to executor'.format(ti)) + # Skip scheduled state, we are executing immediately + ti.state = State.QUEUED + session.merge(ti) + executor.queue_task_instance( + ti, + mark_success=self.mark_success, + pickle_id=pickle_id, + ignore_task_deps=self.ignore_task_deps, + ignore_depends_on_past=ignore_depends_on_past, + pool=self.pool) + started[key] = ti + tasks_to_run.pop(key) session.commit() continue http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/570b2ed3/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 21102e6..cb1766b 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -445,6 +445,91 @@ class BackfillJobTest(unittest.TestCase): 
subdag.clear() dag.clear() + def test_update_counters(self): + dag = DAG( + dag_id='test_manage_executor_state', + start_date=DEFAULT_DATE) + + task1 = DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow') + + job = BackfillJob(dag=dag) + + session = settings.Session() + dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session) + ti = TI(task1, dr.execution_date) + ti.refresh_from_db() + + started = {} + tasks_to_run = {} + failed = set() + succeeded = set() + started = {} + skipped = set() + + # test for success + ti.set_state(State.SUCCESS, session) + started[ti.key] = ti + job._update_counters(started=started, succeeded=succeeded, + skipped=skipped, failed=failed, + tasks_to_run=tasks_to_run) + self.assertTrue(len(started) == 0) + self.assertTrue(len(succeeded) == 1) + self.assertTrue(len(skipped) == 0) + self.assertTrue(len(failed) == 0) + self.assertTrue(len(tasks_to_run) == 0) + + succeeded.clear() + + # test for skipped + ti.set_state(State.SKIPPED, session) + started[ti.key] = ti + job._update_counters(started=started, succeeded=succeeded, + skipped=skipped, failed=failed, + tasks_to_run=tasks_to_run) + self.assertTrue(len(started) == 0) + self.assertTrue(len(succeeded) == 0) + self.assertTrue(len(skipped) == 1) + self.assertTrue(len(failed) == 0) + self.assertTrue(len(tasks_to_run) == 0) + + skipped.clear() + + # test for failed + ti.set_state(State.FAILED, session) + started[ti.key] = ti + job._update_counters(started=started, succeeded=succeeded, + skipped=skipped, failed=failed, + tasks_to_run=tasks_to_run) + self.assertTrue(len(started) == 0) + self.assertTrue(len(succeeded) == 0) + self.assertTrue(len(skipped) == 0) + self.assertTrue(len(failed) == 1) + self.assertTrue(len(tasks_to_run) == 0) + + failed.clear() + + # test for reschedule + # test for failed + ti.set_state(State.NONE, session) + started[ti.key] = ti + job._update_counters(started=started, 
succeeded=succeeded, + skipped=skipped, failed=failed, + tasks_to_run=tasks_to_run) + self.assertTrue(len(started) == 0) + self.assertTrue(len(succeeded) == 0) + self.assertTrue(len(skipped) == 0) + self.assertTrue(len(failed) == 0) + self.assertTrue(len(tasks_to_run) == 1) + + session.close() + class LocalTaskJobTest(unittest.TestCase): def setUp(self):