This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit effbee143fb5f1afdff25b6ec6065808297ad04b
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Fri Aug 13 11:01:58 2021 -0700

    Revert "Fix DAG run state not updated while DAG is paused (#16343)"

    This reverts commit 446e66b052a116f8371be331624c1e1b03299380.
---
 airflow/jobs/local_task_job.py    | 15 --------------
 tests/jobs/test_local_task_job.py | 42 ++-------------------------------------
 2 files changed, 2 insertions(+), 55 deletions(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 3afc801..3dca97e 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -158,8 +158,6 @@ class LocalTaskJob(BaseJob):
         if self.task_instance.state != State.SUCCESS:
             error = self.task_runner.deserialize_run_error()
         self.task_instance._run_finished_callback(error=error)
-        if not self.task_instance.test_mode:
-            self._update_dagrun_state_for_paused_dag()
 
     def on_kill(self):
         self.task_runner.terminate()
@@ -213,16 +211,3 @@ class LocalTaskJob(BaseJob):
                 error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
             ti._run_finished_callback(error=error)
             self.terminating = True
-
-    @provide_session
-    def _update_dagrun_state_for_paused_dag(self, session=None):
-        """
-        Checks for paused dags with DagRuns in the running state and
-        update the DagRun state if possible
-        """
-        dag = self.task_instance.task.dag
-        if dag.get_is_paused():
-            dag_run = self.task_instance.get_dagrun(session=session)
-            if dag_run:
-                dag_run.dag = dag
-                dag_run.update_state(session=session, execute_callbacks=True)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index ed43198..17a7285 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -44,7 +44,6 @@ from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.types import DagRunType
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_jobs, clear_db_runs
 from tests.test_utils.mock_executor import MockExecutor
@@ -668,43 +667,6 @@ class TestLocalTaskJob(unittest.TestCase):
         assert task_terminated_externally.value == 1
         assert not process.is_alive()
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
-        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
-
-        session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
-            has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
-        session.add(orm_dag)
-        session.flush()
-        # Write Dag to DB
-        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
-        dagbag.bag_dag(dag, root_dag=dag)
-        dagbag.sync_to_db()
-
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
-        assert dr.state == State.RUNNING
-        ti = TaskInstance(op1, dr.execution_date)
-        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
-        job1.task_runner = StandardTaskRunner(job1)
-        job1.run()
-        session.add(dr)
-        session.refresh(dr)
-        assert dr.state == State.SUCCESS
-
 
 @pytest.fixture()
 def clean_db_helper():
@@ -723,12 +685,12 @@ class TestLocalTaskJobPerformance:
         task = DummyOperator(task_id='test_state_succeeded1', dag=dag)
 
         dag.clear()
-        dag.create_dagrun(run_id=unique_prefix, execution_date=DEFAULT_DATE, state=State.NONE)
+        dag.create_dagrun(run_id=unique_prefix, state=State.NONE)
 
         ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
 
         mock_get_task_runner.return_value.return_code.side_effects = return_codes
 
         job = LocalTaskJob(task_instance=ti, executor=MockExecutor())
-        with assert_queries_count(16):
+        with assert_queries_count(15):
             job.run()