This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ef9a0c8b4a10455c1fd18ed881bd16b4219f018d
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Aug 2 07:37:40 2021 +0100

    Improve `dag_maker` fixture (#17324)

    This PR improves the dag_maker fixture to enable creation of dagrun, dag and dag_model separately

    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
    (cherry picked from commit 5c1e09cafacea922b9281e901db7da7cadb3e9be)
---
 tests/jobs/test_local_task_job.py | 21 ++++++---------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2e28332..c18e6e5 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -836,34 +836,25 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+        with dag_maker(dag_id='test_dags') as dag:
+            op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
+        dag_maker.make_dagmodel(
             has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
             next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
             is_active=True,
             is_paused=True,
         )
-        session.add(orm_dag)
-        session.flush()
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)
         dagbag.sync_to_db()
 
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        assert dr.state == State.RUNNING
         ti = TaskInstance(op1, dr.execution_date)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())