Repository: incubator-airflow
Updated Branches:
  refs/heads/master c2472ffa1 -> 4e79b830e
[AIRFLOW-1142] Do not reset orphaned state for backfills The scheduler could interfere with backfills when it resets the state of tasks that were considered orphaned. This patch prevents the scheduler from doing so and adds a guard in the backfill. Closes #2260 from bolkedebruin/AIRFLOW-1142 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4e79b830 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4e79b830 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4e79b830 Branch: refs/heads/master Commit: 4e79b830e3261b9d54fdbc7c9dcb510d36565986 Parents: c2472ff Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Thu Apr 27 21:17:25 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Thu Apr 27 21:17:25 2017 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 10 +++++++++- airflow/models.py | 10 +++++++++- tests/jobs.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 02449c5..e9b6094 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1358,7 +1358,8 @@ class SchedulerJob(BaseJob): active_runs = DagRun.find( state=State.RUNNING, external_trigger=False, - session=session + session=session, + no_backfills=True, ) for dr in active_runs: self.logger.info("Resetting {} {}".format(dr.dag_id, @@ -1855,6 +1856,13 @@ class BackfillJob(BaseJob): self.logger.debug("Task instance to run {} state {}" .format(ti, ti.state)) + # guard against externally modified tasks instances or + # in case max concurrency has been reached at task runtime + if ti.state == State.NONE: + 
self.logger.warning("FIXME: task instance {} state was set to " + "None externally. This should not happen") + ti.set_state(State.SCHEDULED, session=session) + # The task was already marked successful or skipped by a # different Job. Don't rerun it. if ti.state == State.SUCCESS: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d2e41cf..51beab8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4037,7 +4037,8 @@ class DagRun(Base): @staticmethod @provide_session def find(dag_id=None, run_id=None, execution_date=None, - state=None, external_trigger=None, session=None): + state=None, external_trigger=None, no_backfills=False, + session=None): """ Returns a set of dag runs for the given search criteria. :param dag_id: the dag_id to find dag runs for @@ -4050,6 +4051,9 @@ class DagRun(Base): :type state: State :param external_trigger: whether this dag run is externally triggered :type external_trigger: bool + :param no_backfills: return no backfills (True), return all (False). 
+ Defaults to False + :type no_backfills: bool :param session: database session :type session: Session """ @@ -4069,6 +4073,10 @@ class DagRun(Base): qry = qry.filter(DR.state == state) if external_trigger is not None: qry = qry.filter(DR.external_trigger == external_trigger) + if no_backfills: + # in order to prevent a circular dependency + from airflow.jobs import BackfillJob + qry = qry.filter(DR.run_id.notlike(BackfillJob.ID_PREFIX + '%')) dr = qry.order_by(DR.execution_date).all() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e79b830/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 26a105b..9ebea15 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -612,6 +612,48 @@ class SchedulerJobTest(unittest.TestCase): session.close() + def test_execute_helper_reset_orphaned_tasks(self): + session = settings.Session() + dag = DAG( + 'test_execute_helper_reset_orphaned_tasks', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}) + + with dag: + op1 = DummyOperator(task_id='op1') + + dag.clear() + dr = dag.create_dagrun(run_id=DagRun.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session) + dr2 = dag.create_dagrun(run_id=BackfillJob.ID_PREFIX, + state=State.RUNNING, + execution_date=DEFAULT_DATE + datetime.timedelta(1), + start_date=DEFAULT_DATE, + session=session) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.SCHEDULED + ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) + ti2.state = State.SCHEDULED + session.commit() + + processor = mock.MagicMock() + processor.get_last_finish_time.return_value = None + + scheduler = SchedulerJob(num_runs=0, run_duration=0) + executor = TestExecutor() + scheduler.executor = executor + + scheduler._execute_helper(processor_manager=processor) + + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + 
self.assertEqual(ti.state, State.NONE) + + ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) + self.assertEqual(ti2.state, State.SCHEDULED) + @provide_session def evaluate_dagrun( self,