This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 330dd07d1dc7ec5eae2c009e4b678c909509aae1
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Feb 24 08:12:12 2022 +0100

    Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)

    The finished dagrun was still being seen as running when we called
    dag.get_num_active_runs because the session was not flushed.
    This PR fixes it.

    (cherry picked from commit feea143af9b1db3b1f8cd8d29677f0b2b2ab757a)
---
 airflow/models/dagrun.py         |  2 ++
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 20ec7cd..c42604d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -604,11 +604,13 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
                 self.dag_hash,
             )
+            session.flush()
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 
         session.merge(self)
+        # We do not flush here for performance reasons (it increases the query count by ~20)
 
         return schedulable_tis, callback
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 7185720..168d452 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1180,6 +1180,41 @@ class TestSchedulerJob:
         assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
         assert orm_dag.next_dagrun_create_after is None
 
+    def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
+        """
+        Test that new runs are still created once max_active_runs is reached and the running run completes
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        with dag_maker(max_active_runs=1, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        dag_run = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            session=session,
+        )
+
+        # Reach max_active_runs
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Complete the dagrun
+        # Add dag_run back in to the session (_do_scheduling does an expunge_all)
+        dag_run = session.merge(dag_run)
+        session.refresh(dag_run)
+        dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS
+
+        # Create a new run
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Assert that a new run was created
+        dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(dag_runs) == 2
+
     def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
         """
         Test if a dagrun will not be scheduled if max_dag_runs
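
For context on the SQLAlchemy behaviour the patch relies on: an attribute change made on a mapped
object only becomes visible to SQL queries in the same session once the pending change has been
flushed, which is why the added session.flush() lets dag.get_num_active_runs stop counting the
finished run. The sketch below is illustrative only and is not Airflow code; the Run model, the
in-memory SQLite engine, and autoflush=False are assumptions chosen to mimic the scheduler path
where the state change had not yet reached the database.

# Minimal sketch (not Airflow code) of why the added session.flush() matters:
# with pending, unflushed changes, a COUNT query still sees the old state.
# The Run model, SQLite engine, and autoflush=False are illustrative assumptions.
from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    state = Column(String, default="running")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine, autoflush=False) as session:
    run = Run(state="running")
    session.add(run)
    session.commit()

    # Mark the run finished in memory only; nothing has been written out yet.
    run.state = "success"

    count = session.query(func.count(Run.id)).filter(Run.state == "running").scalar()
    print(count)  # 1 -- the finished run still looks active to the query

    session.flush()  # analogous to the flush added in DagRun.update_state

    count = session.query(func.count(Run.id)).filter(Run.state == "running").scalar()
    print(count)  # 0 -- the query now sees the finished state

The comment in the second hunk explains the trade-off: the flush is added only where correctness
requires it (right after the run reaches a terminal state), not after the later session.merge,
to avoid the extra queries that flushing on every pass would cost.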