This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ca51dbe372dbc8c4f635b04b894339236b12d6cc
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Jul 22 09:40:33 2021 +0100

    Use dag_maker fixture in test_backfill_job.py (#17118)
    
    This change uses the dag_maker fixture in tests
    
    fixup! Use dag_maker fixture in test_backfill_job.py
    
    fixup! fixup! Use dag_maker fixture in test_backfill_job.py
    
    (cherry picked from commit 9f043cf7dbd83ce22a6af10a9d005a00de6f02b7)
---
 tests/jobs/test_backfill_job.py | 334 ++++++++++++++++++----------------------
 1 file changed, 151 insertions(+), 183 deletions(-)

diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 62fe153..9302911 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -37,7 +37,7 @@ from airflow.exceptions import (
     TaskConcurrencyLimitReached,
 )
 from airflow.jobs.backfill_job import BackfillJob
-from airflow.models import DAG, DagBag, Pool, TaskInstance as TI
+from airflow.models import DagBag, Pool, TaskInstance as TI
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.operators.dummy import DummyOperator
@@ -59,6 +59,41 @@ def dag_bag():
     return DagBag(include_examples=True)
 
 
+@pytest.fixture
+def get_dummy_dag_and_run(dag_maker):
+    def _get_dummy_dag_and_run(
+        dag_id='test_dag', pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None, task_id='op', **kwargs
+    ):
+        with dag_maker(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
+            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
+
+        return dag, dag_maker.dag_run
+
+    return _get_dummy_dag_and_run
+
+
+@pytest.fixture
+def get_dag_test_max_active_limits(dag_maker):
+    def _get_dag_test_max_active_limits(dag_id='test_dag', max_active_runs=1, **kwargs):
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule_interval="@hourly",
+            max_active_runs=max_active_runs,
+            **kwargs,
+        ) as dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+
+            op1 >> op2 >> op3
+            op4 >> op3
+        return dag, dag_maker.dag_run
+
+    return _get_dag_test_max_active_limits
+
+
 class TestBackfillJob:
     @staticmethod
     def clean_db():
@@ -71,15 +106,6 @@ class TestBackfillJob:
         self.parser = cli_parser.get_parser()
         self.dagbag = dag_bag
 
-    def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None):
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag)
-
-        dag.clear()
-        return dag
-
     def _times_called_with(self, method, class_):
         count = 0
         for args in method.call_args_list:
@@ -87,13 +113,8 @@ class TestBackfillJob:
                 count += 1
         return count
 
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
-
-        dag_run = dag.create_dagrun(
-            run_id='test',
-            state=State.RUNNING,
-        )
+    def test_unfinished_dag_runs_set_to_failed(self, get_dummy_dag_and_run):
+        dag, dag_run = get_dummy_dag_and_run(dag_id='dummy_dag')
 
         job = BackfillJob(
             dag=dag,
@@ -108,13 +129,8 @@ class TestBackfillJob:
 
         assert State.FAILED == dag_run.state
 
-    def test_dag_run_with_finished_tasks_set_to_success(self):
-        dag = self._get_dummy_dag('dummy_dag')
-
-        dag_run = dag.create_dagrun(
-            run_id='test',
-            state=State.RUNNING,
-        )
+    def test_dag_run_with_finished_tasks_set_to_success(self, get_dummy_dag_and_run):
+        dag, dag_run = get_dummy_dag_and_run(dag_id='dummy_dag')
 
         for ti in dag_run.get_task_instances():
             ti.set_state(State.SUCCESS)
@@ -273,8 +289,8 @@ class TestBackfillJob:
             for task_id in expected_execution_order
         ] == executor.sorted_tasks
 
-    def test_backfill_conf(self):
-        dag = self._get_dummy_dag('test_backfill_conf')
+    def test_backfill_conf(self, get_dummy_dag_and_run):
+        dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_conf')
 
         executor = MockExecutor()
 
@@ -288,15 +304,18 @@ class TestBackfillJob:
         )
         job.run()
 
-        dr = DagRun.find(dag_id='test_backfill_conf')
+        # We ignore the first dag_run created by fixture
+        dr = DagRun.find(
+            dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1)
+        )
 
         assert conf_ == dr[0].conf
 
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
-    def test_backfill_respect_task_concurrency_limit(self, mock_log):
+    def test_backfill_respect_task_concurrency_limit(self, mock_log, get_dummy_dag_and_run):
         task_concurrency = 2
-        dag = self._get_dummy_dag(
-            'test_backfill_respect_task_concurrency_limit',
+        dag, _ = get_dummy_dag_and_run(
+            dag_id='test_backfill_respect_task_concurrency_limit',
             task_concurrency=task_concurrency,
         )
 
@@ -345,9 +364,9 @@ class TestBackfillJob:
         assert times_task_concurrency_limit_reached_in_debug > 0
 
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
-    def test_backfill_respect_dag_concurrency_limit(self, mock_log):
+    def test_backfill_respect_dag_concurrency_limit(self, mock_log, get_dummy_dag_and_run):
 
-        dag = self._get_dummy_dag('test_backfill_respect_concurrency_limit')
+        dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_respect_concurrency_limit')
         dag.concurrency = 2
 
         executor = MockExecutor()
@@ -396,11 +415,11 @@ class TestBackfillJob:
         assert times_dag_concurrency_limit_reached_in_debug > 0
 
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
-    def test_backfill_respect_default_pool_limit(self, mock_log):
+    def test_backfill_respect_default_pool_limit(self, mock_log, get_dummy_dag_and_run):
         default_pool_slots = 2
         set_default_pool_slots(default_pool_slots)
 
-        dag = self._get_dummy_dag('test_backfill_with_no_pool_limit')
+        dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_with_no_pool_limit')
 
         executor = MockExecutor()
 
@@ -450,8 +469,8 @@ class TestBackfillJob:
         assert 0 == times_task_concurrency_limit_reached_in_debug
         assert times_pool_limit_reached_in_debug > 0
 
-    def test_backfill_pool_not_found(self):
-        dag = self._get_dummy_dag(
+    def test_backfill_pool_not_found(self, get_dummy_dag_and_run):
+        dag, _ = get_dummy_dag_and_run(
             dag_id='test_backfill_pool_not_found',
             pool='king_pool',
         )
@@ -471,7 +490,7 @@ class TestBackfillJob:
             return
 
     @patch('airflow.jobs.backfill_job.BackfillJob.log')
-    def test_backfill_respect_pool_limit(self, mock_log):
+    def test_backfill_respect_pool_limit(self, mock_log, get_dummy_dag_and_run):
         session = settings.Session()
 
         slots = 2
@@ -482,7 +501,7 @@ class TestBackfillJob:
         session.add(pool)
         session.commit()
 
-        dag = self._get_dummy_dag(
+        dag, _ = get_dummy_dag_and_run(
             dag_id='test_backfill_respect_pool_limit',
             pool=pool.pool,
         )
@@ -531,16 +550,10 @@ class TestBackfillJob:
         assert 0 == times_dag_concurrency_limit_reached_in_debug
         assert times_pool_limit_reached_in_debug > 0
 
-    def test_backfill_run_rescheduled(self):
-        dag = DAG(dag_id='test_backfill_run_rescheduled', start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(
-                task_id='test_backfill_run_rescheduled_task-1',
-                dag=dag,
-            )
-
-        dag.clear()
+    def test_backfill_run_rescheduled(self, get_dummy_dag_and_run):
+        dag, _ = get_dummy_dag_and_run(
+            dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1"
+        )
 
         executor = MockExecutor()
 
@@ -568,13 +581,10 @@ class TestBackfillJob:
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
-    def test_backfill_rerun_failed_tasks(self):
-        dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag)
-
-        dag.clear()
+    def test_backfill_rerun_failed_tasks(self, get_dummy_dag_and_run):
+        dag, _ = get_dummy_dag_and_run(
+            dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
+        )
 
         executor = MockExecutor()
 
@@ -602,17 +612,15 @@ class TestBackfillJob:
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
-    def test_backfill_rerun_upstream_failed_tasks(self):
-        dag = DAG(
-            dag_id='test_backfill_rerun_upstream_failed', start_date=DEFAULT_DATE, schedule_interval='@daily'
-        )
+    def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
 
-        with dag:
-            op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1', dag=dag)
-            op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2', dag=dag)
+        with dag_maker(
+            dag_id='test_backfill_rerun_upstream_failed', start_date=DEFAULT_DATE, schedule_interval='@daily'
+        ) as dag:
+            op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1')
+            op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2')
             op1.set_upstream(op2)
 
-        dag.clear()
         executor = MockExecutor()
 
         job = BackfillJob(
@@ -639,13 +647,10 @@ class TestBackfillJob:
         ti.refresh_from_db()
         assert ti.state == State.SUCCESS
 
-    def test_backfill_rerun_failed_tasks_without_flag(self):
-        dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag)
-
-        dag.clear()
+    def test_backfill_rerun_failed_tasks_without_flag(self, get_dummy_dag_and_run):
+        dag, _ = get_dummy_dag_and_run(
+            dag_id='test_backfill_rerun_failed', task_id='test_backfill_rerun_failed_task-1'
+        )
 
         executor = MockExecutor()
 
@@ -672,8 +677,8 @@ class TestBackfillJob:
         with pytest.raises(AirflowException):
             job.run()
 
-    def test_backfill_retry_intermittent_failed_task(self):
-        dag = DAG(
+    def test_backfill_retry_intermittent_failed_task(self, dag_maker):
+        with dag_maker(
             dag_id='test_intermittent_failure_job',
             start_date=DEFAULT_DATE,
             schedule_interval="@daily",
@@ -681,9 +686,8 @@ class TestBackfillJob:
                 'retries': 2,
                 'retry_delay': datetime.timedelta(seconds=0),
             },
-        )
-        task1 = DummyOperator(task_id="task1", dag=dag)
-        dag.clear()
+        ) as dag:
+            task1 = DummyOperator(task_id="task1")
 
         executor = MockExecutor(parallelism=16)
         executor.mock_task_results[
@@ -700,8 +704,8 @@ class TestBackfillJob:
         )
         job.run()
 
-    def test_backfill_retry_always_failed_task(self):
-        dag = DAG(
+    def test_backfill_retry_always_failed_task(self, dag_maker):
+        with dag_maker(
             dag_id='test_always_failure_job',
             start_date=DEFAULT_DATE,
             schedule_interval="@daily",
@@ -709,9 +713,8 @@ class TestBackfillJob:
                 'retries': 1,
                 'retry_delay': datetime.timedelta(seconds=0),
             },
-        )
-        task1 = DummyOperator(task_id="task1", dag=dag)
-        dag.clear()
+        ) as dag:
+            task1 = DummyOperator(task_id="task1")
 
         executor = MockExecutor(parallelism=16)
         executor.mock_task_results[
@@ -727,14 +730,13 @@ class TestBackfillJob:
         with pytest.raises(BackfillUnfinished):
             job.run()
 
-    def test_backfill_ordered_concurrent_execute(self):
-        dag = DAG(
+    def test_backfill_ordered_concurrent_execute(self, dag_maker):
+
+        with dag_maker(
             dag_id='test_backfill_ordered_concurrent_execute',
             start_date=DEFAULT_DATE,
             schedule_interval="@daily",
-        )
-
-        with dag:
+        ) as dag:
             op1 = DummyOperator(task_id='leave1')
             op2 = DummyOperator(task_id='leave2')
             op3 = DummyOperator(task_id='upstream_level_1')
@@ -746,8 +748,6 @@ class TestBackfillJob:
             op4.set_downstream(op5)
             op3.set_downstream(op4)
 
-        dag.clear()
-
         executor = MockExecutor(parallelism=16)
         job = BackfillJob(
             dag=dag,
@@ -881,31 +881,10 @@ class TestBackfillJob:
         parsed_args = self.parser.parse_args(args)
         assert 0.5 == parsed_args.delay_on_limit
 
-    def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1):
-        dag = DAG(
-            dag_id=dag_id,
-            start_date=DEFAULT_DATE,
-            schedule_interval="@hourly",
-            max_active_runs=max_active_runs,
+    def test_backfill_max_limit_check_within_limit(self, get_dag_test_max_active_limits):
+        dag, _ = get_dag_test_max_active_limits(
+            dag_id='test_backfill_max_limit_check_within_limit', max_active_runs=16
         )
-
-        with dag:
-            op1 = DummyOperator(task_id='leave1')
-            op2 = DummyOperator(task_id='leave2')
-            op3 = DummyOperator(task_id='upstream_level_1')
-            op4 = DummyOperator(task_id='upstream_level_2')
-
-            op1 >> op2 >> op3
-            op4 >> op3
-
-        dag.clear()
-        return dag
-
-    def test_backfill_max_limit_check_within_limit(self):
-        dag = self._get_dag_test_max_active_limits(
-            'test_backfill_max_limit_check_within_limit', max_active_runs=16
-        )
-
         start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
@@ -919,9 +898,9 @@ class TestBackfillJob:
         assert 2 == len(dagruns)
         assert all(run.state == State.SUCCESS for run in dagruns)
 
-    def test_backfill_max_limit_check(self):
+    def test_backfill_max_limit_check(self, get_dag_test_max_active_limits):
         dag_id = 'test_backfill_max_limit_check'
-        run_id = 'test_dagrun'
+        run_id = 'test_dag_run'
         start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
@@ -932,16 +911,12 @@ class TestBackfillJob:
             # this session object is different than the one in the main thread
             with create_session() as thread_session:
                 try:
-                    dag = self._get_dag_test_max_active_limits(dag_id)
-
-                    # Existing dagrun that is not within the backfill range
-                    dag.create_dagrun(
+                    dag, _ = get_dag_test_max_active_limits(
+                        # Existing dagrun that is not within the backfill range
+                        dag_id=dag_id,
                         run_id=run_id,
-                        state=State.RUNNING,
                         execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
-                        start_date=DEFAULT_DATE,
                     )
-
                     thread_session.commit()
                     cond.notify()
                 finally:
@@ -985,18 +960,11 @@ class TestBackfillJob:
             finally:
                 dag_run_created_cond.release()
 
-    def test_backfill_max_limit_check_no_count_existing(self):
-        dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_no_count_existing')
+    def test_backfill_max_limit_check_no_count_existing(self, get_dag_test_max_active_limits):
         start_date = DEFAULT_DATE
         end_date = DEFAULT_DATE
-
         # Existing dagrun that is within the backfill range
-        dag.create_dagrun(
-            run_id="test_existing_backfill",
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-        )
+        dag, _ = get_dag_test_max_active_limits(dag_id='test_backfill_max_limit_check_no_count_existing')
 
         executor = MockExecutor()
         job = BackfillJob(
@@ -1012,8 +980,8 @@ class TestBackfillJob:
         assert 1 == len(dagruns)
         assert State.SUCCESS == dagruns[0].state
 
-    def test_backfill_max_limit_check_complete_loop(self):
-        dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_complete_loop')
+    def test_backfill_max_limit_check_complete_loop(self, get_dag_test_max_active_limits):
+        dag, _ = get_dag_test_max_active_limits(dag_id='test_backfill_max_limit_check_complete_loop')
         start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
@@ -1031,10 +999,14 @@ class TestBackfillJob:
         assert success_expected == success_dagruns
         assert 0 == running_dagruns  # no dag_runs in running state are left
 
-    def test_sub_set_subdag(self):
-        dag = DAG('test_sub_set_subdag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+    def test_sub_set_subdag(self, dag_maker):
 
-        with dag:
+        with dag_maker(
+            'test_sub_set_subdag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'},
+            execution_date=DEFAULT_DATE,
+        ) as dag:
             op1 = DummyOperator(task_id='leave1')
             op2 = DummyOperator(task_id='leave2')
             op3 = DummyOperator(task_id='upstream_level_1')
@@ -1046,10 +1018,7 @@ class TestBackfillJob:
             op4.set_downstream(op5)
             op3.set_downstream(op4)
 
-        dag.clear()
-        dr = dag.create_dagrun(
-            run_id="test", state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE
-        )
+        dr = dag_maker.dag_run
 
         executor = MockExecutor()
         sub_dag = dag.sub_dag(task_ids_or_regex="leave*", include_downstream=False, include_upstream=False)
@@ -1069,14 +1038,13 @@ class TestBackfillJob:
             else:
                 assert State.NONE == ti.state
 
-    def test_backfill_fill_blanks(self):
-        dag = DAG(
+    def test_backfill_fill_blanks(self, dag_maker):
+        with dag_maker(
             'test_backfill_fill_blanks',
             start_date=DEFAULT_DATE,
             default_args={'owner': 'owner1'},
-        )
-
-        with dag:
+            execution_date=DEFAULT_DATE,
+        ) as dag:
             op1 = DummyOperator(task_id='op1')
             op2 = DummyOperator(task_id='op2')
             op3 = DummyOperator(task_id='op3')
@@ -1084,10 +1052,8 @@ class TestBackfillJob:
             op5 = DummyOperator(task_id='op5')
             op6 = DummyOperator(task_id='op6')
 
-        dag.clear()
-        dr = dag.create_dagrun(
-            run_id='test', state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE
-        )
+        dr = dag_maker.dag_run
+
         executor = MockExecutor()
 
         session = settings.Session()
@@ -1262,21 +1228,15 @@ class TestBackfillJob:
         subdag.clear()
         dag.clear()
 
-    def test_update_counters(self):
-        dag = DAG(dag_id='test_manage_executor_state', start_date=DEFAULT_DATE)
-
-        task1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
-
+    def test_update_counters(self, dag_maker):
+        with dag_maker(
+            dag_id='test_manage_executor_state', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE
+        ) as dag:
+            task1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+        dr = dag_maker.dag_run
         job = BackfillJob(dag=dag)
-
         session = settings.Session()
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
+
         ti = TI(task1, dr.execution_date)
         ti.refresh_from_db()
 
@@ -1356,20 +1316,20 @@ class TestBackfillJob:
 
         session.close()
 
-    def test_dag_get_run_dates(self):
-        def get_test_dag_for_backfill(schedule_interval=None):
-            dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval=schedule_interval)
+    def test_dag_get_run_dates(self, dag_maker):
+        with dag_maker(
+            dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly"
+        ) as test_dag:
             DummyOperator(
                 task_id='dummy',
-                dag=dag,
                 owner='airflow',
             )
-            return dag
 
-        test_dag = get_test_dag_for_backfill()
-        assert [DEFAULT_DATE] == test_dag.get_run_dates(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-
-        test_dag = get_test_dag_for_backfill(schedule_interval="@hourly")
+        assert [DEFAULT_DATE] == test_dag.get_run_dates(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            align=True,
+        )
         assert [
             DEFAULT_DATE - datetime.timedelta(hours=3),
             DEFAULT_DATE - datetime.timedelta(hours=2),
@@ -1410,24 +1370,26 @@ class TestBackfillJob:
         dag.clear()
         session.close()
 
-    def test_reset_orphaned_tasks_with_orphans(self):
+    def test_reset_orphaned_tasks_with_orphans(self, dag_maker):
         """Create dagruns and ensure only ones with correct states are reset."""
         prefix = 'backfill_job_test_test_reset_orphaned_tasks'
         states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS]
         states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE]
 
-        dag = DAG(dag_id=prefix, start_date=DEFAULT_DATE, schedule_interval="@daily")
         tasks = []
-        for i in range(len(states)):
-            task_id = f"{prefix}_task_{i}"
-            task = DummyOperator(task_id=task_id, dag=dag)
-            tasks.append(task)
+        with dag_maker(
+            dag_id=prefix, start_date=DEFAULT_DATE, schedule_interval="@daily", run_id='test1'
+        ) as dag:
+            for i in range(len(states)):
+                task_id = f"{prefix}_task_{i}"
+                task = DummyOperator(task_id=task_id)
+                tasks.append(task)
 
         session = settings.Session()
         job = BackfillJob(dag=dag)
 
         # create dagruns
-        dr1 = dag.create_dagrun(run_id='test1', state=State.RUNNING)
+        dr1 = dag_maker.dag_run
         dr2 = dag.create_dagrun(run_id='test2', state=State.SUCCESS)
 
         # create taskinstances and set states
@@ -1472,17 +1434,23 @@ class TestBackfillJob:
         for state, ti in zip(states, dr2_tis):
             assert state == ti.state
 
-    def test_reset_orphaned_tasks_specified_dagrun(self):
+    def test_reset_orphaned_tasks_specified_dagrun(self, dag_maker):
         """Try to reset when we specify a dagrun and ensure nothing else is."""
         dag_id = 'test_reset_orphaned_tasks_specified_dagrun'
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
         task_id = dag_id + '_task'
-        DummyOperator(task_id=task_id, dag=dag)
+        with dag_maker(
+            dag_id=dag_id,
+            start_date=DEFAULT_DATE,
+            schedule_interval='@daily',
+            state=State.SUCCESS,
+            run_id='test1',
+        ) as dag:
+            DummyOperator(task_id=task_id, dag=dag)
 
         job = BackfillJob(dag=dag)
         session = settings.Session()
         # make two dagruns, only reset for one
-        dr1 = dag.create_dagrun(run_id='test1', state=State.SUCCESS)
+        dr1 = dag_maker.dag_run  # Already created in dag_maker with state=SUCCESS
         dr2 = dag.create_dagrun(run_id='test2', state=State.RUNNING)
         ti1 = dr1.get_task_instances(session=session)[0]
         ti2 = dr2.get_task_instances(session=session)[0]
@@ -1502,10 +1470,10 @@ class TestBackfillJob:
         assert State.SCHEDULED == ti1.state
         assert State.NONE == ti2.state
 
-    def test_job_id_is_assigned_to_dag_run(self):
+    def test_job_id_is_assigned_to_dag_run(self, dag_maker):
         dag_id = 'test_job_id_is_assigned_to_dag_run'
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-        DummyOperator(task_id="dummy_task", dag=dag)
+        with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') as dag:
+            DummyOperator(task_id="dummy_task", dag=dag)
 
         job = BackfillJob(
             dag=dag, executor=MockExecutor(), start_date=datetime.datetime.now() - datetime.timedelta(days=1)

Reply via email to