Repository: incubator-airflow
Updated Branches:
  refs/heads/master 817296a7b -> c2b962ca9


[AIRFLOW-2558] Clear task/dag is clearing all executions

Closes #3465 from feng-tao/airflow_2588_new


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c2b962ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c2b962ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c2b962ca

Branch: refs/heads/master
Commit: c2b962ca9831cbf56f6dbb0147c97563b7ebbd8f
Parents: 817296a
Author: Tao feng <tf...@lyft.com>
Authored: Tue Jun 5 15:14:43 2018 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Tue Jun 5 15:14:43 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py |  1 -
 airflow/models.py  | 43 ++++++++++++++++++-------------------------
 tests/models.py    |  4 ++--
 3 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3bff685..2742df5 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -199,7 +199,6 @@ def backfill(args, dag=None):
                 end_date=args.end_date,
                 confirm_prompt=True,
                 include_subdags=False,
-                only_backfill_dagruns=True,
             )
 
         dag.run(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index eda4808..c26ec01 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -132,17 +132,15 @@ def clear_task_instances(tis,
                          session,
                          activate_dag_runs=True,
                          dag=None,
-                         only_backfill_dagruns=False,
                          ):
     """
     Clears a set of task instances, but makes sure the running ones
-    get killed. Reset backfill dag run state to removed if 
only_backfill_dagruns is set
+    get killed.
 
     :param tis: a list of task instances
     :param session: current session
     :param activate_dag_runs: flag to check for active dag run
     :param dag: DAG object
-    :param only_backfill_dagruns: flag for setting backfill state
     """
     job_ids = []
     for ti in tis:
@@ -176,13 +174,8 @@ def clear_task_instances(tis,
             DagRun.execution_date.in_({ti.execution_date for ti in tis}),
         ).all()
         for dr in drs:
-            if only_backfill_dagruns and dr.is_backfill:
-                # If the flag is set, we reset backfill dag run for retry.
-                # dont reset start date
-                dr.state = State.REMOVED
-            else:
-                dr.state = State.RUNNING
-                dr.start_date = timezone.utcnow()
+            dr.state = State.RUNNING
+            dr.start_date = timezone.utcnow()
 
 
 class DagBag(BaseDagBag, LoggingMixin):
@@ -3697,18 +3690,20 @@ class DAG(BaseDag, LoggingMixin):
             self,
             state=State.RUNNING,
             session=None,
-            only_backfill_dagruns=False):
-        drs = session.query(DagRun).filter_by(dag_id=self.dag_id).all()
+            start_date=None,
+            end_date=None,
+    ):
+        query = session.query(DagRun).filter_by(dag_id=self.dag_id)
+        if start_date:
+            query = query.filter(DagRun.execution_date >= start_date)
+        if end_date:
+            query = query.filter(DagRun.execution_date <= end_date)
+        drs = query.all()
+
         dirty_ids = []
         for dr in drs:
-            if only_backfill_dagruns:
-                if dr.is_backfill:
-                    dr.state = state
-                    dirty_ids.append(dr.dag_id)
-            else:
-                if not dr.is_backfill:
-                    dr.state = state
-                    dirty_ids.append(dr.dag_id)
+            dr.state = state
+            dirty_ids.append(dr.dag_id)
         DagStat.update(dirty_ids, session=session)
 
     @provide_session
@@ -3721,7 +3716,6 @@ class DAG(BaseDag, LoggingMixin):
             reset_dag_runs=True,
             dry_run=False,
             session=None,
-            only_backfill_dagruns=False,
     ):
         """
         Clears a set of task instances associated with the current dag for
@@ -3772,11 +3766,12 @@ class DAG(BaseDag, LoggingMixin):
             clear_task_instances(tis.all(),
                                  session,
                                  dag=self,
-                                 only_backfill_dagruns=only_backfill_dagruns,
                                  )
             if reset_dag_runs:
                 self.set_dag_runs_state(session=session,
-                                        
only_backfill_dagruns=only_backfill_dagruns)
+                                        start_date=start_date,
+                                        end_date=end_date,
+                                        )
         else:
             count = 0
             print("Bail. Nothing was cleared.")
@@ -3795,7 +3790,6 @@ class DAG(BaseDag, LoggingMixin):
             include_subdags=True,
             reset_dag_runs=True,
             dry_run=False,
-            only_backfill_dagruns=False,
     ):
         all_tis = []
         for dag in dags:
@@ -3836,7 +3830,6 @@ class DAG(BaseDag, LoggingMixin):
                           include_subdags=include_subdags,
                           reset_dag_runs=reset_dag_runs,
                           dry_run=False,
-                          only_backfill_dagruns=only_backfill_dagruns,
                           )
         else:
             count = 0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c2b962ca/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 87ba74a..3515516 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -639,14 +639,14 @@ class DagRunTest(unittest.TestCase):
 
         qry = session.query(TI).filter(
             TI.dag_id == dag.dag_id).all()
-        clear_task_instances(qry, session, only_backfill_dagruns=True)
+        clear_task_instances(qry, session)
         session.commit()
         ti0.refresh_from_db()
         dr0 = session.query(DagRun).filter(
             DagRun.dag_id == dag_id,
             DagRun.execution_date == now
         ).first()
-        self.assertEquals(dr0.state, State.REMOVED)
+        self.assertEquals(dr0.state, State.RUNNING)
 
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(

Reply via email to