This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b3a8bfaaee Resolve testSchedulerJob internal warning (#39090)
b3a8bfaaee is described below

commit b3a8bfaaee72d6e90cdc29f1b559e06df81a320a
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Sat May 4 22:53:38 2024 +0800

    Resolve testSchedulerJob internal warning (#39090)
    
    * Resolve testSchedulerJob internal warning
    
    * Fix test_start_queued_dagruns_do_follow_execution_date_order
    
    * raise warnings when concurrency param is used
    
    * replace concurrency with max_active_tasks, remove the deprecation 
handling in conftest.py
    
    * Fix the extra new line
---
 tests/deprecations_ignore.yml    | 16 ----------
 tests/jobs/test_scheduler_job.py | 67 +++++++++++++++++++++++++++-------------
 2 files changed, 45 insertions(+), 38 deletions(-)

diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 131ea00c67..f1fafcca73 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -79,22 +79,6 @@
 - 
tests/jobs/test_backfill_job.py::TestBackfillJob::test_reset_orphaned_tasks_with_orphans
 - 
tests/jobs/test_backfill_job.py::TestBackfillJob::test_subdag_clear_parentdag_downstream_clear
 - tests/jobs/test_backfill_job.py::TestBackfillJob::test_update_counters
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_adopt_or_reset_orphaned_tasks
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_fail
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_root_fail_unfinished
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_dagrun_success
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_do_schedule_max_active_runs_dag_timed_out
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_infinite_pool
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_no_dagruns_would_stuck_in_running
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_not_enough_pool_slots
-- 
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_start_queued_dagruns_do_follow_execution_date_order
-- tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_zombie_message
 - 
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_fallback_task
 - 
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_not_file_task
 - 
tests/jobs/test_triggerer_job_logging.py::test_configure_trigger_log_handler_root_old_file_task
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 389172bed1..f122f12265 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -94,6 +94,7 @@ ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, 
"elastic_dag.py")
 
 TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
 TRY_NUMBER = 1
 
 
@@ -967,7 +968,7 @@ class TestSchedulerJob:
 
     def test_infinite_pool(self, dag_maker):
         dag_id = "SchedulerJobTest.test_infinite_pool"
-        with dag_maker(dag_id=dag_id, concurrency=16):
+        with dag_maker(dag_id=dag_id, max_active_tasks=16):
             EmptyOperator(task_id="dummy", pool="infinite_pool")
 
         scheduler_job = Job()
@@ -994,7 +995,7 @@ class TestSchedulerJob:
 
     def test_not_enough_pool_slots(self, caplog, dag_maker):
         dag_id = "SchedulerJobTest.test_test_not_enough_pool_slots"
-        with dag_maker(dag_id=dag_id, concurrency=16):
+        with dag_maker(dag_id=dag_id, max_active_tasks=16):
             EmptyOperator(task_id="cannot_run", pool="some_pool", pool_slots=4)
             EmptyOperator(task_id="can_run", pool="some_pool", pool_slots=1)
 
@@ -1704,6 +1705,7 @@ class TestSchedulerJob:
         with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag:
             op1 = EmptyOperator(task_id="op1")
 
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dr = dag_maker.create_dagrun()
         dr2 = dag.create_dagrun(
             run_type=DagRunType.BACKFILL_JOB,
@@ -1711,6 +1713,7 @@ class TestSchedulerJob:
             execution_date=DEFAULT_DATE + datetime.timedelta(1),
             start_date=DEFAULT_DATE,
             session=session,
+            data_interval=data_interval,
         )
         scheduler_job = Job()
         session.add(scheduler_job)
@@ -2401,12 +2404,13 @@ class TestSchedulerJob:
         dag = self.dagbag.get_dag(dag_id)
         dagrun_info = dag.next_dagrun_info(None)
         assert dagrun_info is not None
-
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=dagrun_info.logical_date,
             state=None,
             session=session,
+            data_interval=data_interval,
         )
 
         if advance_execution_date:
@@ -2416,6 +2420,7 @@ class TestSchedulerJob:
                 execution_date=dr.data_interval_end,
                 state=None,
                 session=session,
+                data_interval=data_interval,
             )
         ex_date = dr.execution_date
 
@@ -2492,10 +2497,12 @@ class TestSchedulerJob:
         # Run both the failed and successful tasks
         dag_id = "test_dagrun_states_root_fail_unfinished"
         dag = self.dagbag.get_dag(dag_id)
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dr = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
             state=None,
+            data_interval=data_interval,
         )
         self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
 
@@ -3977,12 +3984,14 @@ class TestSchedulerJob:
         assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + 
timedelta(minutes=2)
 
         # Trigger the Dag externally
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dr = dag.create_dagrun(
             state=State.RUNNING,
             execution_date=timezone.utcnow(),
             run_type=DagRunType.MANUAL,
             session=session,
             external_trigger=True,
+            data_interval=data_interval,
         )
         assert dr is not None
         # Run DAG.bulk_write_to_db -- this is run when in 
DagFileProcessor.process_file
@@ -4060,13 +4069,14 @@ class TestSchedulerJob:
             )
 
         session = settings.Session()
-
+        data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         run1 = dag.create_dagrun(
             run_type=DagRunType.SCHEDULED,
             execution_date=DEFAULT_DATE,
             state=State.RUNNING,
             start_date=timezone.utcnow() - timedelta(seconds=2),
             session=session,
+            data_interval=data_interval,
         )
 
         run1_ti = run1.get_task_instance(task1.task_id, session)
@@ -4077,6 +4087,7 @@ class TestSchedulerJob:
             execution_date=DEFAULT_DATE + timedelta(seconds=10),
             state=State.QUEUED,
             session=session,
+            data_interval=data_interval,
         )
 
         scheduler_job = Job()
@@ -4349,9 +4360,9 @@ class TestSchedulerJob:
 
     def test_start_queued_dagruns_do_follow_execution_date_order(self, 
dag_maker):
         session = settings.Session()
-        with dag_maker("test_dag1", max_active_runs=1) as dag:
+        with dag_maker("test_dag1", max_active_runs=1):
             EmptyOperator(task_id="mytask")
-        date = dag.following_schedule(DEFAULT_DATE)
+        date = DEFAULT_DATE
         for i in range(30):
             dr = dag_maker.create_dagrun(
                 run_id=f"dagrun_{i}", run_type=DagRunType.SCHEDULED, 
state=State.QUEUED, execution_date=date
@@ -4393,14 +4404,18 @@ class TestSchedulerJob:
         session = settings.Session()
         # first dag and dagruns
         date = timezone.datetime(2016, 1, 1)
+        logical_date = timezone.coerce_datetime(date)
         with dag_maker("test_dagrun_states_are_correct_1", max_active_runs=1, 
start_date=date) as dag:
             task1 = EmptyOperator(task_id="dummy_task")
 
         dr1_running = dag_maker.create_dagrun(run_id="dr1_run_1", 
execution_date=date)
+        data_interval = dag.infer_automated_data_interval(logical_date)
         dag_maker.create_dagrun(
             run_id="dr1_run_2",
             state=State.QUEUED,
-            execution_date=dag.following_schedule(dr1_running.execution_date),
+            execution_date=dag.next_dagrun_info(
+                last_automated_dagrun=data_interval, restricted=False
+            ).data_interval.start,
         )
         # second dag and dagruns
         date = timezone.datetime(2020, 1, 1)
@@ -4434,7 +4449,7 @@ class TestSchedulerJob:
         scheduler_job.executor = MockExecutor(do_update=False)
         self.job_runner.processor_agent = 
mock.MagicMock(spec=DagFileProcessorAgent)
 
-        ti = TaskInstance(task=task1, execution_date=DEFAULT_DATE)
+        ti = TaskInstance(task=task1, run_id=dr1_running.run_id)
         ti.refresh_from_db()
         ti.state = State.SUCCESS
         session.merge(ti)
@@ -4783,12 +4798,13 @@ class TestSchedulerJob:
             session.query(Job).delete()
             dag = dagbag.get_dag("example_branch_operator")
             dag.sync_to_db()
-
+            data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
 
             scheduler_job = Job()
@@ -4849,11 +4865,13 @@ class TestSchedulerJob:
             dag = dagbag.get_dag("example_branch_operator")
             dag.sync_to_db()
 
+            data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
 
             scheduler_job = Job(executor=MockExecutor())
@@ -4917,12 +4935,13 @@ class TestSchedulerJob:
             session.query(Job).delete()
             dag = dagbag.get_dag("test_example_bash_operator")
             dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
-
+            data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
                 execution_date=DEFAULT_DATE,
                 run_type=DagRunType.SCHEDULED,
                 session=session,
+                data_interval=data_interval,
             )
             task = dag.get_task(task_id="run_this_last")
 
@@ -4998,7 +5017,7 @@ class TestSchedulerJob:
             assert active_dag_count == 1
 
     @mock.patch.object(settings, "USE_JOB_SCHEDULE", False)
-    def run_scheduler_until_dagrun_terminal(self, job_runner: 
SchedulerJobRunner):
+    def run_scheduler_until_dagrun_terminal(self):
         """
         Run a scheduler until any dag run reaches a terminal state, or the 
scheduler becomes "idle".
 
@@ -5025,23 +5044,23 @@ class TestSchedulerJob:
         num_finished_events: deque[int] = deque([], 3)
 
         do_scheduling_spy = mock.patch.object(
-            job_runner,
+            self.job_runner,
             "_do_scheduling",
-            side_effect=spy_on_return(job_runner._do_scheduling, 
num_queued_tis),
+            side_effect=spy_on_return(self.job_runner._do_scheduling, 
num_queued_tis),
         )
         executor_events_spy = mock.patch.object(
-            job_runner,
+            self.job_runner,
             "_process_executor_events",
-            side_effect=spy_on_return(job_runner._process_executor_events, 
num_finished_events),
+            
side_effect=spy_on_return(self.job_runner._process_executor_events, 
num_finished_events),
         )
 
         orig_set_state = DagRun.set_state
 
-        def watch_set_state(self: DagRun, state, **kwargs):
+        def watch_set_state(dr: DagRun, state, **kwargs):
             if state in (DagRunState.SUCCESS, DagRunState.FAILED):
                 # Stop the scheduler
-                job_runner.num_runs = 1
-            orig_set_state(self, state, **kwargs)  # type: ignore[call-arg]
+                self.job_runner.num_runs = 1  # type: ignore[attr-defined]
+            orig_set_state(dr, state, **kwargs)  # type: ignore[call-arg]
 
         def watch_heartbeat(*args, **kwargs):
             if len(num_queued_tis) < 3 or len(num_finished_events) < 3:
@@ -5053,10 +5072,11 @@ class TestSchedulerJob:
             ), "Scheduler has stalled without setting the DagRun state!"
 
         set_state_spy = mock.patch.object(DagRun, "set_state", 
new=watch_set_state)
-        heartbeat_spy = mock.patch.object(job_runner, "heartbeat", 
new=watch_heartbeat)
+        heartbeat_spy = mock.patch.object(self.job_runner.job, "heartbeat", 
new=watch_heartbeat)
 
+        # with heartbeat_spy, set_state_spy, do_scheduling_spy, 
executor_events_spy:
         with heartbeat_spy, set_state_spy, do_scheduling_spy, 
executor_events_spy:
-            run_job(job_runner.job, execute_callable=job_runner._execute)
+            run_job(self.job_runner.job, 
execute_callable=self.job_runner._execute)
 
     @pytest.mark.long_running
     @pytest.mark.parametrize("dag_id", ["test_mapped_classic", 
"test_mapped_taskflow"])
@@ -5068,19 +5088,22 @@ class TestSchedulerJob:
         self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py"))
         dag = self.dagbag.get_dag(dag_id)
         assert dag
+        logical_date = timezone.coerce_datetime(timezone.utcnow() - 
datetime.timedelta(days=2))
+        data_interval = dag.infer_automated_data_interval(logical_date)
         dr = dag.create_dagrun(
+            run_id=f"{dag_id}_1",
             run_type=DagRunType.MANUAL,
             start_date=timezone.utcnow(),
             state=State.RUNNING,
-            execution_date=timezone.utcnow() - datetime.timedelta(days=2),
             session=session,
+            data_interval=data_interval,
         )
 
         executor = SequentialExecutor()
 
         job = Job(executor=executor)
         self.job_runner = SchedulerJobRunner(job=job, subdir=dag.fileloc)
-        self.run_scheduler_until_dagrun_terminal(job)
+        self.run_scheduler_until_dagrun_terminal()
 
         dr.refresh_from_db(session)
         assert dr.state == DagRunState.SUCCESS

Reply via email to