This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 330dd07d1dc7ec5eae2c009e4b678c909509aae1
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu Feb 24 08:12:12 2022 +0100

    Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
    
    The finished dagrun was still being seen as running when we call
    dag.get_num_active_runs, because the session was not flushed. This PR fixes it.
    
    (cherry picked from commit feea143af9b1db3b1f8cd8d29677f0b2b2ab757a)
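
Background note (an editorial aside, not part of the commit): with SQLAlchemy, attribute
changes on ORM objects stay pending in the session until they are flushed, so a SQL query
issued before the flush (for example with autoflush disabled, or from another session) can
still see the old row values. That is the symptom described above: a DagRun that was just
marked finished is still counted as running by dag.get_num_active_runs. Below is a minimal,
self-contained sketch of that behaviour; the DagRunStub model, the in-memory SQLite engine
and the autoflush=False session are illustrative assumptions, not Airflow code.

    # Illustrative sketch only (not Airflow code): why an explicit session.flush()
    # may be needed before counting "active" rows.
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker  # SQLAlchemy 1.4+

    Base = declarative_base()

    class DagRunStub(Base):  # hypothetical stand-in for airflow.models.DagRun
        __tablename__ = "dag_run_stub"
        id = Column(Integer, primary_key=True)
        state = Column(String, nullable=False)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine, autoflush=False)()

    run = DagRunStub(state="running")
    session.add(run)
    session.flush()  # the INSERT is now visible to queries in this transaction

    run.state = "success"  # finished in Python, but the UPDATE is still pending

    def num_active_runs():
        # rough analogue of dag.get_num_active_runs: count rows still marked "running"
        return session.query(DagRunStub).filter(DagRunStub.state == "running").count()

    assert num_active_runs() == 1  # stale: the finished run is still seen as running
    session.flush()                # emit the pending UPDATE
    assert num_active_runs() == 0  # the count now reflects the finished state

This also matches the comment the commit adds after session.merge(self): an additional
flush at that point was noted to increase the query count by about 20, so it is
deliberately left out there.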
---
 airflow/models/dagrun.py         |  2 ++
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 20ec7cd..c42604d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -604,11 +604,13 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
                 self.dag_hash,
             )
+            session.flush()
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 
         session.merge(self)
+        # We do not flush here for performance reasons (it increases the query count by about 20)
 
         return schedulable_tis, callback
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 7185720..168d452 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1180,6 +1180,41 @@ class TestSchedulerJob:
         assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
         assert orm_dag.next_dagrun_create_after is None
 
+    def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
+        """
+        Test that run creation does not get stuck once max_active_runs has been reached
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        with dag_maker(max_active_runs=1, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        dag_run = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            session=session,
+        )
+
+        # Reach max_active_runs
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Complete the dagrun
+        # Add dag_run back into the session (_do_scheduling does an expunge_all)
+        dag_run = session.merge(dag_run)
+        session.refresh(dag_run)
+        dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS
+
+        # Create a new run
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Assert that a new run has been created
+        dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(dag_runs) == 2
+
     def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
         """
         Test if a dagrun will not be scheduled if max_dag_runs
