This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit eadbfe503ee28e958cfc9745a0b7714f50af3ff3 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Thu Jun 10 14:29:30 2021 +0100 Run mini scheduler in LocalTaskJob during task exit (#16289) Currently, the chances of tasks being killed by the LocalTaskJob heartbeat are high. This is because, after marking a task successful/failed in Taskinstance.py and the mini scheduler is enabled, we start running the mini scheduler. Whenever the mini scheduling takes time and meets the next job heartbeat, the heartbeat detects that this task has succeeded with no return code because LocalTaskJob.handle_task_exit was not called after the task succeeded. Hence, the heartbeat thinks that this task was externally marked failed/successful. This change resolves this by moving the mini scheduler to LocalTaskJob at the handle_task_exit method, ensuring that the task will no longer be killed by the next heartbeat. (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588) --- tests/jobs/test_local_task_job.py | 37 ------------------------------------- 1 file changed, 37 deletions(-) diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 94f894d..bd82a47 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -850,43 +850,6 @@ class TestLocalTaskJob: assert retry_callback_called.value == 1 assert task_terminated_externally.value == 1 - def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self): - """Test that with DAG paused, DagRun state will update when the tasks finishes the run""" - dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE) - op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True) - - session = settings.Session() - orm_dag = DagModel( - dag_id=dag.dag_id, - has_task_concurrency_limits=False, - next_dagrun=dag.start_date, - next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE), - is_active=True, - is_paused=True, - ) - 
session.add(orm_dag) - session.flush() - # Write Dag to DB - dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False) - dagbag.bag_dag(dag, root_dag=dag) - dagbag.sync_to_db() - - dr = dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - state=State.RUNNING, - execution_date=DEFAULT_DATE, - start_date=DEFAULT_DATE, - session=session, - ) - assert dr.state == State.RUNNING - ti = TaskInstance(op1, dr.execution_date) - job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) - job1.task_runner = StandardTaskRunner(job1) - job1.run() - session.add(dr) - session.refresh(dr) - assert dr.state == State.SUCCESS - @pytest.fixture() def clean_db_helper():