This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eadbfe503ee28e958cfc9745a0b7714f50af3ff3
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)
    
    Currently, the chances of a task being killed by the LocalTaskJob
    heartbeat are high.
    
    This is because, after a task is marked successful/failed in
    taskinstance.py and the mini scheduler is enabled, we start running the
    mini scheduler inside the task process. Whenever the mini scheduling
    takes long enough to meet the next job heartbeat, the heartbeat sees
    that the task has finished but finds no return code, because
    LocalTaskJob.handle_task_exit was not called after the task succeeded.
    Hence, the heartbeat concludes that the task was externally marked
    failed/successful and terminates the process.
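    
    A simplified sketch of the heartbeat-side race, for illustration only
    (the method and attribute names follow LocalTaskJob, but the condensed
    body below is an assumption, not the actual implementation):
    
        # Hypothetical, condensed view of the heartbeat check.
        def heartbeat_callback(self, session=None):
            self.task_instance.refresh_from_db()
            ti = self.task_instance
            if ti.state != State.RUNNING:
                # The task process already set SUCCESS/FAILED, but the mini
                # scheduler is still running inside it, so handle_task_exit
                # has not reported a return code yet. The job assumes the
                # state was set externally and kills the live process.
                self.task_runner.terminate()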
    
    This change resolves the issue by moving the mini scheduler into
    LocalTaskJob.handle_task_exit, ensuring that the task will no longer
    be killed by the next heartbeat.
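    
    A minimal sketch of the new control flow (condensed; the helper name
    _run_mini_scheduler_on_child_tasks and the config gate below are
    assumptions based on this commit's description, so consult the
    cherry-picked commit for the exact code):
    
        # Hypothetical, condensed view of the fix (not the verbatim patch).
        def handle_task_exit(self, return_code):
            self.log.info("Task exited with return code %s", return_code)
            self.task_instance.refresh_from_db()
            # ... existing success/failure handling ...
            # The mini scheduler now runs here, after the exit is recorded,
            # instead of inside TaskInstance, so the next heartbeat cannot
            # mistake the finished task for an externally changed one.
            if conf.getboolean('scheduler', 'schedule_after_task_execution'):
                self._run_mini_scheduler_on_child_tasks()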
    
    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
 tests/jobs/test_local_task_job.py | 37 -------------------------------------
 1 file changed, 37 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 94f894d..bd82a47 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -850,43 +850,6 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
-        """Test that with DAG paused, DagRun state will update when the tasks 
finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
-
-        session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
-            has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
-        session.add(orm_dag)
-        session.flush()
-        # Write Dag to DB
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
-        assert dr.state == State.RUNNING
-        ti = TaskInstance(op1, dr.execution_date)
-        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job1.task_runner = StandardTaskRunner(job1)
-        job1.run()
-        session.add(dr)
-        session.refresh(dr)
-        assert dr.state == State.SUCCESS
-
 
 @pytest.fixture()
 def clean_db_helper():
