This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ca51dbe372dbc8c4f635b04b894339236b12d6cc Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Thu Jul 22 09:40:33 2021 +0100 Use dag_maker fixture in test_backfill_job.py (#17118) This change uses the dag_maker fixture in tests fixup! Use dag_maker fixture in test_backfill_job.py fixup! fixup! Use dag_maker fixture in test_backfill_job.py (cherry picked from commit 9f043cf7dbd83ce22a6af10a9d005a00de6f02b7) --- tests/jobs/test_backfill_job.py | 334 ++++++++++++++++++---------------------- 1 file changed, 151 insertions(+), 183 deletions(-) diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 62fe153..9302911 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -37,7 +37,7 @@ from airflow.exceptions import ( TaskConcurrencyLimitReached, ) from airflow.jobs.backfill_job import BackfillJob -from airflow.models import DAG, DagBag, Pool, TaskInstance as TI +from airflow.models import DagBag, Pool, TaskInstance as TI from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstanceKey from airflow.operators.dummy import DummyOperator @@ -59,6 +59,41 @@ def dag_bag(): return DagBag(include_examples=True) +@pytest.fixture +def get_dummy_dag_and_run(dag_maker): + def _get_dummy_dag_and_run( + dag_id='test_dag', pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None, task_id='op', **kwargs + ): + with dag_maker(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag: + DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency) + + return dag, dag_maker.dag_run + + return _get_dummy_dag_and_run + + +@pytest.fixture +def get_dag_test_max_active_limits(dag_maker): + def _get_dag_test_max_active_limits(dag_id='test_dag', max_active_runs=1, **kwargs): + with dag_maker( + dag_id=dag_id, + start_date=DEFAULT_DATE, + schedule_interval="@hourly", + max_active_runs=max_active_runs, + **kwargs, + ) as dag: + op1 = DummyOperator(task_id='leave1') + op2 = DummyOperator(task_id='leave2') + op3 = DummyOperator(task_id='upstream_level_1') + op4 = DummyOperator(task_id='upstream_level_2') + + op1 >> op2 >> op3 + op4 >> op3 + return dag, dag_maker.dag_run + + return _get_dag_test_max_active_limits + + class TestBackfillJob: @staticmethod def clean_db(): @@ -71,15 +106,6 @@ class TestBackfillJob: self.parser = cli_parser.get_parser() self.dagbag = dag_bag - def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None): - dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') - - with dag: - DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag) - - dag.clear() - return dag - def _times_called_with(self, method, class_): count = 0 for args in method.call_args_list: @@ -87,13 +113,8 @@ class TestBackfillJob: count += 1 return count - def test_unfinished_dag_runs_set_to_failed(self): - dag = self._get_dummy_dag('dummy_dag') - - dag_run = dag.create_dagrun( - run_id='test', - state=State.RUNNING, - ) + def test_unfinished_dag_runs_set_to_failed(self, get_dummy_dag_and_run): + dag, dag_run = get_dummy_dag_and_run(dag_id='dummy_dag') job = BackfillJob( dag=dag, @@ -108,13 +129,8 @@ class TestBackfillJob: assert State.FAILED == dag_run.state - def test_dag_run_with_finished_tasks_set_to_success(self): - dag = self._get_dummy_dag('dummy_dag') - - dag_run = dag.create_dagrun( - run_id='test', - state=State.RUNNING, - ) + def test_dag_run_with_finished_tasks_set_to_success(self, get_dummy_dag_and_run): + dag, dag_run = get_dummy_dag_and_run(dag_id='dummy_dag') for ti in dag_run.get_task_instances(): ti.set_state(State.SUCCESS) @@ -273,8 +289,8 @@ class TestBackfillJob: for task_id in expected_execution_order ] == executor.sorted_tasks - def test_backfill_conf(self): - dag = self._get_dummy_dag('test_backfill_conf') + def test_backfill_conf(self, get_dummy_dag_and_run): + dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_conf') executor = MockExecutor() @@ -288,15 +304,18 @@ class TestBackfillJob: ) job.run() - dr = DagRun.find(dag_id='test_backfill_conf') + # We ignore the first dag_run created by fixture + dr = DagRun.find( + dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1) + ) assert conf_ == dr[0].conf @patch('airflow.jobs.backfill_job.BackfillJob.log') - def test_backfill_respect_task_concurrency_limit(self, mock_log): + def test_backfill_respect_task_concurrency_limit(self, mock_log, get_dummy_dag_and_run): task_concurrency = 2 - dag = self._get_dummy_dag( - 'test_backfill_respect_task_concurrency_limit', + dag, _ = get_dummy_dag_and_run( + dag_id='test_backfill_respect_task_concurrency_limit', task_concurrency=task_concurrency, ) @@ -345,9 +364,9 @@ class TestBackfillJob: assert times_task_concurrency_limit_reached_in_debug > 0 @patch('airflow.jobs.backfill_job.BackfillJob.log') - def test_backfill_respect_dag_concurrency_limit(self, mock_log): + def test_backfill_respect_dag_concurrency_limit(self, mock_log, get_dummy_dag_and_run): - dag = self._get_dummy_dag('test_backfill_respect_concurrency_limit') + dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_respect_concurrency_limit') dag.concurrency = 2 executor = MockExecutor() @@ -396,11 +415,11 @@ class TestBackfillJob: assert times_dag_concurrency_limit_reached_in_debug > 0 @patch('airflow.jobs.backfill_job.BackfillJob.log') - def test_backfill_respect_default_pool_limit(self, mock_log): + def test_backfill_respect_default_pool_limit(self, mock_log, get_dummy_dag_and_run): default_pool_slots = 2 set_default_pool_slots(default_pool_slots) - dag = self._get_dummy_dag('test_backfill_with_no_pool_limit') + dag, _ = get_dummy_dag_and_run(dag_id='test_backfill_with_no_pool_limit') executor = MockExecutor() @@ -450,8 +469,8 @@ class TestBackfillJob: assert 0 == times_task_concurrency_limit_reached_in_debug assert times_pool_limit_reached_in_debug > 0 - def test_backfill_pool_not_found(self): - dag = self._get_dummy_dag( + def test_backfill_pool_not_found(self, get_dummy_dag_and_run): + dag, _ = get_dummy_dag_and_run( dag_id='test_backfill_pool_not_found', pool='king_pool', ) @@ -471,7 +490,7 @@ class TestBackfillJob: return @patch('airflow.jobs.backfill_job.BackfillJob.log') - def test_backfill_respect_pool_limit(self, mock_log): + def test_backfill_respect_pool_limit(self, mock_log, get_dummy_dag_and_run): session = settings.Session() slots = 2 @@ -482,7 +501,7 @@ class TestBackfillJob: session.add(pool) session.commit() - dag = self._get_dummy_dag( + dag, _ = get_dummy_dag_and_run( dag_id='test_backfill_respect_pool_limit', pool=pool.pool, ) @@ -531,16 +550,10 @@ class TestBackfillJob: assert 0 == times_dag_concurrency_limit_reached_in_debug assert times_pool_limit_reached_in_debug > 0 - def test_backfill_run_rescheduled(self): - dag = DAG(dag_id='test_backfill_run_rescheduled', start_date=DEFAULT_DATE, schedule_interval='@daily') - - with dag: - DummyOperator( - task_id='test_backfill_run_rescheduled_task-1', - dag=dag, - ) - - dag.clear() + def test_backfill_run_rescheduled(self, get_dummy_dag_and_run): + dag, _ = get_dummy_dag_and_run( + dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1" + ) executor = MockExecutor() @@ -568,13 +581,10 @@ class TestBackfillJob: ti.refresh_from_db() assert ti.state == State.SUCCESS - def test_backfill_rerun_failed_tasks(self): - dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily') - - with dag: - DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag) - - dag.clear() + def test_backfill_rerun_failed_tasks(self, get_dummy_dag_and_run): + dag, _ = get_dummy_dag_and_run( + dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1" + ) executor = MockExecutor() @@ -602,17 +612,15 @@ class TestBackfillJob: ti.refresh_from_db() assert ti.state == State.SUCCESS - def test_backfill_rerun_upstream_failed_tasks(self): - dag = DAG( - dag_id='test_backfill_rerun_upstream_failed', start_date=DEFAULT_DATE, schedule_interval='@daily' - ) + def test_backfill_rerun_upstream_failed_tasks(self, dag_maker): - with dag: - op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1', dag=dag) - op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2', dag=dag) + with dag_maker( + dag_id='test_backfill_rerun_upstream_failed', start_date=DEFAULT_DATE, schedule_interval='@daily' + ) as dag: + op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1') + op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2') op1.set_upstream(op2) - dag.clear() executor = MockExecutor() job = BackfillJob( @@ -639,13 +647,10 @@ class TestBackfillJob: ti.refresh_from_db() assert ti.state == State.SUCCESS - def test_backfill_rerun_failed_tasks_without_flag(self): - dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily') - - with dag: - DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag) - - dag.clear() + def test_backfill_rerun_failed_tasks_without_flag(self, get_dummy_dag_and_run): + dag, _ = get_dummy_dag_and_run( + dag_id='test_backfill_rerun_failed', task_id='test_backfill_rerun_failed_task-1' + ) executor = MockExecutor() @@ -672,8 +677,8 @@ class TestBackfillJob: with pytest.raises(AirflowException): job.run() - def test_backfill_retry_intermittent_failed_task(self): - dag = DAG( + def test_backfill_retry_intermittent_failed_task(self, dag_maker): + with dag_maker( dag_id='test_intermittent_failure_job', start_date=DEFAULT_DATE, schedule_interval="@daily", @@ -681,9 +686,8 @@ class TestBackfillJob: 'retries': 2, 'retry_delay': datetime.timedelta(seconds=0), }, - ) - task1 = DummyOperator(task_id="task1", dag=dag) - dag.clear() + ) as dag: + task1 = DummyOperator(task_id="task1") executor = MockExecutor(parallelism=16) executor.mock_task_results[ @@ -700,8 +704,8 @@ class TestBackfillJob: ) job.run() - def test_backfill_retry_always_failed_task(self): - dag = DAG( + def test_backfill_retry_always_failed_task(self, dag_maker): + with dag_maker( dag_id='test_always_failure_job', start_date=DEFAULT_DATE, schedule_interval="@daily", @@ -709,9 +713,8 @@ class TestBackfillJob: 'retries': 1, 'retry_delay': datetime.timedelta(seconds=0), }, - ) - task1 = DummyOperator(task_id="task1", dag=dag) - dag.clear() + ) as dag: + task1 = DummyOperator(task_id="task1") executor = MockExecutor(parallelism=16) executor.mock_task_results[ @@ -727,14 +730,13 @@ class TestBackfillJob: with pytest.raises(BackfillUnfinished): job.run() - def test_backfill_ordered_concurrent_execute(self): - dag = DAG( + def test_backfill_ordered_concurrent_execute(self, dag_maker): + + with dag_maker( dag_id='test_backfill_ordered_concurrent_execute', start_date=DEFAULT_DATE, schedule_interval="@daily", - ) - - with dag: + ) as dag: op1 = DummyOperator(task_id='leave1') op2 = DummyOperator(task_id='leave2') op3 = DummyOperator(task_id='upstream_level_1') @@ -746,8 +748,6 @@ class TestBackfillJob: op4.set_downstream(op5) op3.set_downstream(op4) - dag.clear() - executor = MockExecutor(parallelism=16) job = BackfillJob( dag=dag, @@ -881,31 +881,10 @@ class TestBackfillJob: parsed_args = self.parser.parse_args(args) assert 0.5 == parsed_args.delay_on_limit - def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1): - dag = DAG( - dag_id=dag_id, - start_date=DEFAULT_DATE, - schedule_interval="@hourly", - max_active_runs=max_active_runs, + def test_backfill_max_limit_check_within_limit(self, get_dag_test_max_active_limits): + dag, _ = get_dag_test_max_active_limits( + dag_id='test_backfill_max_limit_check_within_limit', max_active_runs=16 ) - - with dag: - op1 = DummyOperator(task_id='leave1') - op2 = DummyOperator(task_id='leave2') - op3 = DummyOperator(task_id='upstream_level_1') - op4 = DummyOperator(task_id='upstream_level_2') - - op1 >> op2 >> op3 - op4 >> op3 - - dag.clear() - return dag - - def test_backfill_max_limit_check_within_limit(self): - dag = self._get_dag_test_max_active_limits( - 'test_backfill_max_limit_check_within_limit', max_active_runs=16 - ) - start_date = DEFAULT_DATE - datetime.timedelta(hours=1) end_date = DEFAULT_DATE @@ -919,9 +898,9 @@ class TestBackfillJob: assert 2 == len(dagruns) assert all(run.state == State.SUCCESS for run in dagruns) - def test_backfill_max_limit_check(self): + def test_backfill_max_limit_check(self, get_dag_test_max_active_limits): dag_id = 'test_backfill_max_limit_check' - run_id = 'test_dagrun' + run_id = 'test_dag_run' start_date = DEFAULT_DATE - datetime.timedelta(hours=1) end_date = DEFAULT_DATE @@ -932,16 +911,12 @@ class TestBackfillJob: # this session object is different than the one in the main thread with create_session() as thread_session: try: - dag = self._get_dag_test_max_active_limits(dag_id) - - # Existing dagrun that is not within the backfill range - dag.create_dagrun( + dag, _ = get_dag_test_max_active_limits( + # Existing dagrun that is not within the backfill range + dag_id=dag_id, run_id=run_id, - state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1), - start_date=DEFAULT_DATE, ) - thread_session.commit() cond.notify() finally: @@ -985,18 +960,11 @@ class TestBackfillJob: finally: dag_run_created_cond.release() - def test_backfill_max_limit_check_no_count_existing(self): - dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_no_count_existing') + def test_backfill_max_limit_check_no_count_existing(self, get_dag_test_max_active_limits): start_date = DEFAULT_DATE end_date = DEFAULT_DATE - # Existing dagrun that is within the backfill range - dag.create_dagrun( - run_id="test_existing_backfill", - state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - ) + dag, _ = get_dag_test_max_active_limits(dag_id='test_backfill_max_limit_check_no_count_existing') executor = MockExecutor() job = BackfillJob( @@ -1012,8 +980,8 @@ class TestBackfillJob: assert 1 == len(dagruns) assert State.SUCCESS == dagruns[0].state - def test_backfill_max_limit_check_complete_loop(self): - dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_complete_loop') + def test_backfill_max_limit_check_complete_loop(self, get_dag_test_max_active_limits): + dag, _ = get_dag_test_max_active_limits(dag_id='test_backfill_max_limit_check_complete_loop') start_date = DEFAULT_DATE - datetime.timedelta(hours=1) end_date = DEFAULT_DATE @@ -1031,10 +999,14 @@ class TestBackfillJob: assert success_expected == success_dagruns assert 0 == running_dagruns # no dag_runs in running state are left - def test_sub_set_subdag(self): - dag = DAG('test_sub_set_subdag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}) + def test_sub_set_subdag(self, dag_maker): - with dag: + with dag_maker( + 'test_sub_set_subdag', + start_date=DEFAULT_DATE, + default_args={'owner': 'owner1'}, + execution_date=DEFAULT_DATE, + ) as dag: op1 = DummyOperator(task_id='leave1') op2 = DummyOperator(task_id='leave2') op3 = DummyOperator(task_id='upstream_level_1') @@ -1046,10 +1018,7 @@ class TestBackfillJob: op4.set_downstream(op5) op3.set_downstream(op4) - dag.clear() - dr = dag.create_dagrun( - run_id="test", state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE - ) + dr = dag_maker.dag_run executor = MockExecutor() sub_dag = dag.sub_dag(task_ids_or_regex="leave*", include_downstream=False, include_upstream=False) @@ -1069,14 +1038,13 @@ class TestBackfillJob: else: assert State.NONE == ti.state - def test_backfill_fill_blanks(self): - dag = DAG( + def test_backfill_fill_blanks(self, dag_maker): + with dag_maker( 'test_backfill_fill_blanks', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}, - ) - - with dag: + execution_date=DEFAULT_DATE, + ) as dag: op1 = DummyOperator(task_id='op1') op2 = DummyOperator(task_id='op2') op3 = DummyOperator(task_id='op3') @@ -1084,10 +1052,8 @@ class TestBackfillJob: op5 = DummyOperator(task_id='op5') op6 = DummyOperator(task_id='op6') - dag.clear() - dr = dag.create_dagrun( - run_id='test', state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE - ) + dr = dag_maker.dag_run + executor = MockExecutor() session = settings.Session() @@ -1262,21 +1228,15 @@ class TestBackfillJob: subdag.clear() dag.clear() - def test_update_counters(self): - dag = DAG(dag_id='test_manage_executor_state', start_date=DEFAULT_DATE) - - task1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow') - + def test_update_counters(self, dag_maker): + with dag_maker( + dag_id='test_manage_executor_state', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE + ) as dag: + task1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow') + dr = dag_maker.dag_run job = BackfillJob(dag=dag) - session = settings.Session() - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - session=session, - ) + ti = TI(task1, dr.execution_date) ti.refresh_from_db() @@ -1356,20 +1316,20 @@ class TestBackfillJob: session.close() - def test_dag_get_run_dates(self): - def get_test_dag_for_backfill(schedule_interval=None): - dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval=schedule_interval) + def test_dag_get_run_dates(self, dag_maker): + with dag_maker( + dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly" + ) as test_dag: DummyOperator( task_id='dummy', - dag=dag, owner='airflow', ) - return dag - test_dag = get_test_dag_for_backfill() - assert [DEFAULT_DATE] == test_dag.get_run_dates(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) - - test_dag = get_test_dag_for_backfill(schedule_interval="@hourly") + assert [DEFAULT_DATE] == test_dag.get_run_dates( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + align=True, + ) assert [ DEFAULT_DATE - datetime.timedelta(hours=3), DEFAULT_DATE - datetime.timedelta(hours=2), @@ -1410,24 +1370,26 @@ class TestBackfillJob: dag.clear() session.close() - def test_reset_orphaned_tasks_with_orphans(self): + def test_reset_orphaned_tasks_with_orphans(self, dag_maker): """Create dagruns and ensure only ones with correct states are reset.""" prefix = 'backfill_job_test_test_reset_orphaned_tasks' states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS] states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE] - dag = DAG(dag_id=prefix, start_date=DEFAULT_DATE, schedule_interval="@daily") tasks = [] - for i in range(len(states)): - task_id = f"{prefix}_task_{i}" - task = DummyOperator(task_id=task_id, dag=dag) - tasks.append(task) + with dag_maker( + dag_id=prefix, start_date=DEFAULT_DATE, schedule_interval="@daily", run_id='test1' + ) as dag: + for i in range(len(states)): + task_id = f"{prefix}_task_{i}" + task = DummyOperator(task_id=task_id) + tasks.append(task) session = settings.Session() job = BackfillJob(dag=dag) # create dagruns - dr1 = dag.create_dagrun(run_id='test1', state=State.RUNNING) + dr1 = dag_maker.dag_run dr2 = dag.create_dagrun(run_id='test2', state=State.SUCCESS) # create taskinstances and set states @@ -1472,17 +1434,23 @@ class TestBackfillJob: for state, ti in zip(states, dr2_tis): assert state == ti.state - def test_reset_orphaned_tasks_specified_dagrun(self): + def test_reset_orphaned_tasks_specified_dagrun(self, dag_maker): """Try to reset when we specify a dagrun and ensure nothing else is.""" dag_id = 'test_reset_orphaned_tasks_specified_dagrun' - dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') task_id = dag_id + '_task' - DummyOperator(task_id=task_id, dag=dag) + with dag_maker( + dag_id=dag_id, + start_date=DEFAULT_DATE, + schedule_interval='@daily', + state=State.SUCCESS, + run_id='test1', + ) as dag: + DummyOperator(task_id=task_id, dag=dag) job = BackfillJob(dag=dag) session = settings.Session() # make two dagruns, only reset for one - dr1 = dag.create_dagrun(run_id='test1', state=State.SUCCESS) + dr1 = dag_maker.dag_run # Already created in dag_maker with state=SUCCESS dr2 = dag.create_dagrun(run_id='test2', state=State.RUNNING) ti1 = dr1.get_task_instances(session=session)[0] ti2 = dr2.get_task_instances(session=session)[0] @@ -1502,10 +1470,10 @@ class TestBackfillJob: assert State.SCHEDULED == ti1.state assert State.NONE == ti2.state - def test_job_id_is_assigned_to_dag_run(self): + def test_job_id_is_assigned_to_dag_run(self, dag_maker): dag_id = 'test_job_id_is_assigned_to_dag_run' - dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') - DummyOperator(task_id="dummy_task", dag=dag) + with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') as dag: + DummyOperator(task_id="dummy_task", dag=dag) job = BackfillJob( dag=dag, executor=MockExecutor(), start_date=datetime.datetime.now() - datetime.timedelta(days=1)