Repository: incubator-airflow Updated Branches: refs/heads/master 817296a7b -> c2b962ca9
[AIRFLOW-2558] Clear task/dag is clearing all executions Closes #3465 from feng-tao/airflow_2588_new Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c2b962ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c2b962ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c2b962ca Branch: refs/heads/master Commit: c2b962ca9831cbf56f6dbb0147c97563b7ebbd8f Parents: 817296a Author: Tao feng <tf...@lyft.com> Authored: Tue Jun 5 15:14:43 2018 -0700 Committer: Maxime Beauchemin <maximebeauche...@gmail.com> Committed: Tue Jun 5 15:14:43 2018 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 1 - airflow/models.py | 43 ++++++++++++++++++------------------------- tests/models.py | 4 ++-- 3 files changed, 20 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 3bff685..2742df5 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -199,7 +199,6 @@ def backfill(args, dag=None): end_date=args.end_date, confirm_prompt=True, include_subdags=False, - only_backfill_dagruns=True, ) dag.run( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index eda4808..c26ec01 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -132,17 +132,15 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None, - only_backfill_dagruns=False, ): """ Clears a set of task instances, but makes sure the running ones - get killed. Reset backfill dag run state to removed if only_backfill_dagruns is set + get killed. :param tis: a list of task instances :param session: current session :param activate_dag_runs: flag to check for active dag run :param dag: DAG object - :param only_backfill_dagruns: flag for setting backfill state """ job_ids = [] for ti in tis: @@ -176,13 +174,8 @@ def clear_task_instances(tis, DagRun.execution_date.in_({ti.execution_date for ti in tis}), ).all() for dr in drs: - if only_backfill_dagruns and dr.is_backfill: - # If the flag is set, we reset backfill dag run for retry. - # dont reset start date - dr.state = State.REMOVED - else: - dr.state = State.RUNNING - dr.start_date = timezone.utcnow() + dr.state = State.RUNNING + dr.start_date = timezone.utcnow() class DagBag(BaseDagBag, LoggingMixin): @@ -3697,18 +3690,20 @@ class DAG(BaseDag, LoggingMixin): self, state=State.RUNNING, session=None, - only_backfill_dagruns=False): - drs = session.query(DagRun).filter_by(dag_id=self.dag_id).all() + start_date=None, + end_date=None, + ): + query = session.query(DagRun).filter_by(dag_id=self.dag_id) + if start_date: + query = query.filter(DagRun.execution_date >= start_date) + if end_date: + query = query.filter(DagRun.execution_date <= end_date) + drs = query.all() + dirty_ids = [] for dr in drs: - if only_backfill_dagruns: - if dr.is_backfill: - dr.state = state - dirty_ids.append(dr.dag_id) - else: - if not dr.is_backfill: - dr.state = state - dirty_ids.append(dr.dag_id) + dr.state = state + dirty_ids.append(dr.dag_id) DagStat.update(dirty_ids, session=session) @provide_session @@ -3721,7 +3716,6 @@ class DAG(BaseDag, LoggingMixin): reset_dag_runs=True, dry_run=False, session=None, - only_backfill_dagruns=False, ): """ Clears a set of task instances associated with the current dag for @@ -3772,11 +3766,12 @@ class DAG(BaseDag, LoggingMixin): clear_task_instances(tis.all(), session, dag=self, - only_backfill_dagruns=only_backfill_dagruns, ) if reset_dag_runs: self.set_dag_runs_state(session=session, - only_backfill_dagruns=only_backfill_dagruns) + start_date=start_date, + end_date=end_date, + ) else: count = 0 print("Bail. Nothing was cleared.") @@ -3795,7 +3790,6 @@ class DAG(BaseDag, LoggingMixin): include_subdags=True, reset_dag_runs=True, dry_run=False, - only_backfill_dagruns=False, ): all_tis = [] for dag in dags: @@ -3836,7 +3830,6 @@ class DAG(BaseDag, LoggingMixin): include_subdags=include_subdags, reset_dag_runs=reset_dag_runs, dry_run=False, - only_backfill_dagruns=only_backfill_dagruns, ) else: count = 0 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 87ba74a..3515516 100644 --- a/tests/models.py +++ b/tests/models.py @@ -639,14 +639,14 @@ class DagRunTest(unittest.TestCase): qry = session.query(TI).filter( TI.dag_id == dag.dag_id).all() - clear_task_instances(qry, session, only_backfill_dagruns=True) + clear_task_instances(qry, session) session.commit() ti0.refresh_from_db() dr0 = session.query(DagRun).filter( DagRun.dag_id == dag_id, DagRun.execution_date == now ).first() - self.assertEquals(dr0.state, State.REMOVED) + self.assertEquals(dr0.state, State.RUNNING) def test_id_for_date(self): run_id = models.DagRun.id_for_date(