This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b3a8bfaaee Resolve testSchedulerJob internal warning (#39090) b3a8bfaaee is described below commit b3a8bfaaee72d6e90cdc29f1b559e06df81a320a Author: Owen Leung <owen.leu...@gmail.com> AuthorDate: Sat May 4 22:53:38 2024 +0800 Resolve testSchedulerJob internal warning (#39090) * Resolve testSchedulerJob internal warning * Fix test_start_queued_dagruns_do_follow_execution_date_order * raise warnings when concurrency param is used * replace concurrency with max_active_tasks, remove the deprecation handling in conftest.py * Fix the extra new line --- tests/deprecations_ignore.yml | 16 ---------- tests/jobs/test_scheduler_job.py | 67 +++++++++++++++++++++++++++------------- 2 files changed, 45 insertions(+), 38 deletions(-) diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 131ea00c67..f1fafcca73 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -79,22 +79,6 @@ - tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans - tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear - tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_adopt_or_reset_orphaned_tasks -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_fail -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail_unfinished -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_success -- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_do_schedule_max_active_runs_dag_timed_out -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_infinite_pool -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_no_dagruns_would_stuck_in_running -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_not_enough_pool_slots -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_start_queued_dagruns_do_follow_execution_date_order -- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_zombie_message - tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_fallback_task - tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_not_file_task - tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_old_file_task diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 389172bed1..f122f12265 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -94,6 +94,7 @@ ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py") TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"] DEFAULT_DATE = timezone.datetime(2016, 1, 1) +DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE) TRY_NUMBER = 1 @@ -967,7 +968,7 @@ class TestSchedulerJob: def test_infinite_pool(self, dag_maker): dag_id = "SchedulerJobTest.test_infinite_pool" - with dag_maker(dag_id=dag_id, concurrency=16): + with dag_maker(dag_id=dag_id, max_active_tasks=16): EmptyOperator(task_id="dummy", pool="infinite_pool") scheduler_job = Job() @@ -994,7 +995,7 @@ class TestSchedulerJob: def test_not_enough_pool_slots(self, caplog, dag_maker): dag_id = "SchedulerJobTest.test_test_not_enough_pool_slots" - with dag_maker(dag_id=dag_id, 
concurrency=16): + with dag_maker(dag_id=dag_id, max_active_tasks=16): EmptyOperator(task_id="cannot_run", pool="some_pool", pool_slots=4) EmptyOperator(task_id="can_run", pool="some_pool", pool_slots=1) @@ -1704,6 +1705,7 @@ class TestSchedulerJob: with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag: op1 = EmptyOperator(task_id="op1") + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dr = dag_maker.create_dagrun() dr2 = dag.create_dagrun( run_type=DagRunType.BACKFILL_JOB, @@ -1711,6 +1713,7 @@ class TestSchedulerJob: execution_date=DEFAULT_DATE + datetime.timedelta(1), start_date=DEFAULT_DATE, session=session, + data_interval=data_interval, ) scheduler_job = Job() session.add(scheduler_job) @@ -2401,12 +2404,13 @@ class TestSchedulerJob: dag = self.dagbag.get_dag(dag_id) dagrun_info = dag.next_dagrun_info(None) assert dagrun_info is not None - + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dr = dag.create_dagrun( run_type=DagRunType.SCHEDULED, execution_date=dagrun_info.logical_date, state=None, session=session, + data_interval=data_interval, ) if advance_execution_date: @@ -2416,6 +2420,7 @@ class TestSchedulerJob: execution_date=dr.data_interval_end, state=None, session=session, + data_interval=data_interval, ) ex_date = dr.execution_date @@ -2492,10 +2497,12 @@ class TestSchedulerJob: # Run both the failed and successful tasks dag_id = "test_dagrun_states_root_fail_unfinished" dag = self.dagbag.get_dag(dag_id) + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dr = dag.create_dagrun( run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE, state=None, + data_interval=data_interval, ) self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id) @@ -3977,12 +3984,14 @@ class TestSchedulerJob: assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=2) # Trigger the Dag externally + data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dr = dag.create_dagrun( state=State.RUNNING, execution_date=timezone.utcnow(), run_type=DagRunType.MANUAL, session=session, external_trigger=True, + data_interval=data_interval, ) assert dr is not None # Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file @@ -4060,13 +4069,14 @@ class TestSchedulerJob: ) session = settings.Session() - + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) run1 = dag.create_dagrun( run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE, state=State.RUNNING, start_date=timezone.utcnow() - timedelta(seconds=2), session=session, + data_interval=data_interval, ) run1_ti = run1.get_task_instance(task1.task_id, session) @@ -4077,6 +4087,7 @@ class TestSchedulerJob: execution_date=DEFAULT_DATE + timedelta(seconds=10), state=State.QUEUED, session=session, + data_interval=data_interval, ) scheduler_job = Job() @@ -4349,9 +4360,9 @@ class TestSchedulerJob: def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker): session = settings.Session() - with dag_maker("test_dag1", max_active_runs=1) as dag: + with dag_maker("test_dag1", max_active_runs=1): EmptyOperator(task_id="mytask") - date = dag.following_schedule(DEFAULT_DATE) + date = DEFAULT_DATE for i in range(30): dr = dag_maker.create_dagrun( run_id=f"dagrun_{i}", run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date @@ -4393,14 +4404,18 @@ class TestSchedulerJob: session = settings.Session() # first dag and dagruns date = timezone.datetime(2016, 1, 1) + logical_date = timezone.coerce_datetime(date) with dag_maker("test_dagrun_states_are_correct_1", max_active_runs=1, start_date=date) as dag: task1 = EmptyOperator(task_id="dummy_task") dr1_running = dag_maker.create_dagrun(run_id="dr1_run_1", execution_date=date) + data_interval = dag.infer_automated_data_interval(logical_date) dag_maker.create_dagrun( run_id="dr1_run_2", state=State.QUEUED, - 
execution_date=dag.following_schedule(dr1_running.execution_date), + execution_date=dag.next_dagrun_info( + last_automated_dagrun=data_interval, restricted=False + ).data_interval.start, ) # second dag and dagruns date = timezone.datetime(2020, 1, 1) @@ -4434,7 +4449,7 @@ class TestSchedulerJob: scheduler_job.executor = MockExecutor(do_update=False) self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) - ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE) + ti = TaskInstance(task=task1, run_id=dr1_running.run_id) ti.refresh_from_db() ti.state = State.SUCCESS session.merge(ti) @@ -4783,12 +4798,13 @@ class TestSchedulerJob: session.query(Job).delete() dag = dagbag.get_dag("example_branch_operator") dag.sync_to_db() - + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dag_run = dag.create_dagrun( state=DagRunState.RUNNING, execution_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, session=session, + data_interval=data_interval, ) scheduler_job = Job() @@ -4849,11 +4865,13 @@ class TestSchedulerJob: dag = dagbag.get_dag("example_branch_operator") dag.sync_to_db() + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dag_run = dag.create_dagrun( state=DagRunState.RUNNING, execution_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, session=session, + data_interval=data_interval, ) scheduler_job = Job(executor=MockExecutor()) @@ -4917,12 +4935,13 @@ class TestSchedulerJob: session.query(Job).delete() dag = dagbag.get_dag("test_example_bash_operator") dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER) - + data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE) dag_run = dag.create_dagrun( state=DagRunState.RUNNING, execution_date=DEFAULT_DATE, run_type=DagRunType.SCHEDULED, session=session, + data_interval=data_interval, ) task = dag.get_task(task_id="run_this_last") @@ -4998,7 +5017,7 @@ class TestSchedulerJob: assert active_dag_count == 1 @mock.patch.object(settings, 
"USE_JOB_SCHEDULE", False) - def run_scheduler_until_dagrun_terminal(self, job_runner: SchedulerJobRunner): + def run_scheduler_until_dagrun_terminal(self): """ Run a scheduler until any dag run reaches a terminal state, or the scheduler becomes "idle". @@ -5025,23 +5044,23 @@ class TestSchedulerJob: num_finished_events: deque[int] = deque([], 3) do_scheduling_spy = mock.patch.object( - job_runner, + self.job_runner, "_do_scheduling", - side_effect=spy_on_return(job_runner._do_scheduling, num_queued_tis), + side_effect=spy_on_return(self.job_runner._do_scheduling, num_queued_tis), ) executor_events_spy = mock.patch.object( - job_runner, + self.job_runner, "_process_executor_events", - side_effect=spy_on_return(job_runner._process_executor_events, num_finished_events), + side_effect=spy_on_return(self.job_runner._process_executor_events, num_finished_events), ) orig_set_state = DagRun.set_state - def watch_set_state(self: DagRun, state, **kwargs): + def watch_set_state(dr: DagRun, state, **kwargs): if state in (DagRunState.SUCCESS, DagRunState.FAILED): # Stop the scheduler - job_runner.num_runs = 1 - orig_set_state(self, state, **kwargs) # type: ignore[call-arg] + self.job_runner.num_runs = 1 # type: ignore[attr-defined] + orig_set_state(dr, state, **kwargs) # type: ignore[call-arg] def watch_heartbeat(*args, **kwargs): if len(num_queued_tis) < 3 or len(num_finished_events) < 3: @@ -5053,10 +5072,11 @@ class TestSchedulerJob: ), "Scheduler has stalled without setting the DagRun state!" 
set_state_spy = mock.patch.object(DagRun, "set_state", new=watch_set_state) - heartbeat_spy = mock.patch.object(job_runner, "heartbeat", new=watch_heartbeat) + heartbeat_spy = mock.patch.object(self.job_runner.job, "heartbeat", new=watch_heartbeat) + # with heartbeat_spy, set_state_spy, do_scheduling_spy, executor_events_spy: with heartbeat_spy, set_state_spy, do_scheduling_spy, executor_events_spy: - run_job(job_runner.job, execute_callable=job_runner._execute) + run_job(self.job_runner.job, execute_callable=self.job_runner._execute) @pytest.mark.long_running @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"]) @@ -5068,19 +5088,22 @@ class TestSchedulerJob: self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py")) dag = self.dagbag.get_dag(dag_id) assert dag + logical_date = timezone.coerce_datetime(timezone.utcnow() - datetime.timedelta(days=2)) + data_interval = dag.infer_automated_data_interval(logical_date) dr = dag.create_dagrun( + run_id=f"{dag_id}_1", run_type=DagRunType.MANUAL, start_date=timezone.utcnow(), state=State.RUNNING, - execution_date=timezone.utcnow() - datetime.timedelta(days=2), session=session, + data_interval=data_interval, ) executor = SequentialExecutor() job = Job(executor=executor) self.job_runner = SchedulerJobRunner(job=job, subdir=dag.fileloc) - self.run_scheduler_until_dagrun_terminal(job) + self.run_scheduler_until_dagrun_terminal() dr.refresh_from_db(session) assert dr.state == DagRunState.SUCCESS