This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a3e8c22712e4c5c3fd212d9eac60bd1b92194fc2 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Tue Jul 20 18:48:35 2021 +0100 Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889) This change adds a pytest fixture to create a dag and dagrun, then uses it in the local task job tests. Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> (cherry picked from commit 7c0d8a2f83cc6db25bdddcf6cecb6fb56f05f02f) --- tests/jobs/test_local_task_job.py | 77 ++++++++++++++++----------------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index bd82a47..13d2136 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -800,55 +800,42 @@ class TestLocalTaskJob: assert retry_callback_called.value == 1 assert task_terminated_externally.value == 1 - def test_process_sigterm_works_with_retries(self, dag_maker): - """ - Test that ensures that task runner sets tasks to retry when they(task runner) - receive sigterm - """ - # use shared memory value so we can properly track value change even if - # it's been updated across processes. 
- retry_callback_called = Value('i', 0) - task_terminated_externally = Value('i', 1) - shared_mem_lock = Lock() - - def retry_callback(context): - with shared_mem_lock: - retry_callback_called.value += 1 - assert context['dag_run'].dag_id == 'test_mark_failure_2' + def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self): + """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" + dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) + op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True) - def task_function(ti): - time.sleep(60) - # This should not happen -- the state change should be noticed and the task should get killed - with shared_mem_lock: - task_terminated_externally.value = 0 + session = settings.Session() + orm_dag = DagModel( + dag_id=dag.dag_id, + has_task_concurrency_limits=False, + next_dagrun=dag.start_date, + next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), + is_active=True, + is_paused=True, + ) + session.add(orm_dag) + session.flush() + # Write Dag to DB + dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) + dagbag.bag_dag(dag, root_dag=dag) + dagbag.sync_to_db() - with dag_maker(dag_id='test_mark_failure_2'): - task = PythonOperator( - task_id='test_on_failure', - python_callable=task_function, - retries=1, - retry_delay=timedelta(seconds=2), - on_retry_callback=retry_callback, - ) - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) - ti.refresh_from_db() + dr = dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + session=session, + ) + assert dr.state == State.RUNNING + ti = TaskInstance(op1, dr.execution_date) job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) job1.task_runner = StandardTaskRunner(job1) - job1.task_runner.start() - settings.engine.dispose() - process 
= multiprocessing.Process(target=job1.run) - process.start() - for _ in range(0, 25): - ti.refresh_from_db() - if ti.state == State.RUNNING and ti.pid is not None: - break - time.sleep(0.2) - os.kill(process.pid, signal.SIGTERM) - process.join() - ti.refresh_from_db() - assert ti.state == State.UP_FOR_RETRY - assert retry_callback_called.value == 1 - assert task_terminated_externally.value == 1 + job1.run() + session.add(dr) + session.refresh(dr) + assert dr.state == State.SUCCESS @pytest.fixture()