This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 20d8142869 Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027)
20d8142869 is described below

commit 20d81428699db240b65f72a92183255c24e8c19b
Author: Daniel Dyląg <bi...@users.noreply.github.com>
AuthorDate: Tue Sep 5 15:01:33 2023 +0200

    Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027)
    
    * Fix manual task triggering scheduled tasks
    
    Fixes #33949
    
    * fix static checks
    
    * static checks
    
    * add unit test
    
    * static check
    
    * Undo renaming
    
    * Update airflow/jobs/scheduler_job_runner.py
    
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
    
    * use keyword-only arguments for last_dag_run and total_active_runs
    
    ---------
    
    Co-authored-by: daniel.dylag <danieldylag1...@gmail.com>
    Co-authored-by: Tzu-ping Chung <uranu...@gmail.com>
---
 airflow/jobs/scheduler_job_runner.py | 33 +++++++++++++++++-------
 tests/jobs/test_scheduler_job.py     | 50 ++++++++++++++++++++++++++++++++----
 2 files changed, 69 insertions(+), 14 deletions(-)
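
For context, the new guard in _should_update_dag_next_dagruns only lets a
finished automated run (scheduled or backfill) trigger planning of the next
dag run. Below is a minimal sketch of that check; it is not the actual Airflow
implementation, the helper name run_allows_next_dagrun_planning is made up for
illustration, and it assumes an Airflow install providing DagRunType,
DagRunState and State.finished_dr_states as used in the diff that follows.

    from airflow.utils.state import DagRunState, State
    from airflow.utils.types import DagRunType

    def run_allows_next_dagrun_planning(run_state, run_type) -> bool:
        # Only a finished run created by the scheduler (or a backfill) should
        # cause the dag's next-dagrun fields to be recalculated.
        return run_state in State.finished_dr_states and run_type in (
            DagRunType.SCHEDULED,
            DagRunType.BACKFILL_JOB,
        )

    # A successful manual run no longer pushes the next scheduled run forward:
    assert run_allows_next_dagrun_planning(DagRunState.SUCCESS, DagRunType.MANUAL) is False
    assert run_allows_next_dagrun_planning(DagRunState.SUCCESS, DagRunType.SCHEDULED) is True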

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 1507a6f06f..6c4652ef1a 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1189,7 +1189,11 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
                 )
                 active_runs_of_dags[dag.dag_id] += 1
             if self._should_update_dag_next_dagruns(
-                dag, dag_model, active_runs_of_dags[dag.dag_id], session=session
+                dag,
+                dag_model,
+                last_dag_run=None,
+                total_active_runs=active_runs_of_dags[dag.dag_id],
+                session=session,
             ):
                 dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
@@ -1297,9 +1301,22 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
                 )
 
     def _should_update_dag_next_dagruns(
-        self, dag: DAG, dag_model: DagModel, total_active_runs: int | None = None, *, session: Session
+        self,
+        dag: DAG,
+        dag_model: DagModel,
+        *,
+        last_dag_run: DagRun | None = None,
+        total_active_runs: int | None = None,
+        session: Session,
     ) -> bool:
         """Check if the dag's next_dagruns_create_after should be updated."""
+        # If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run.
+        # In such case, schedule next only if last_dag_run is finished and was an automated run.
+        if last_dag_run and not (
+            last_dag_run.state in State.finished_dr_states
+            and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]
+        ):
+            return False
         # If the DAG never schedules skip save runtime
         if not dag.timetable.can_be_scheduled:
             return False
@@ -1434,8 +1451,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
                 session.merge(task_instance)
             session.flush()
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
-            # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
+
+            if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
                 dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
 
             callback_to_execute = DagCallbackRequest(
@@ -1462,11 +1479,9 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
             return callback
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
-        # Check if DAG not scheduled then skip interval calculation to same scheduler runtime
-        if dag_run.state in State.finished_dr_states:
-            # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
-                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+
+        if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
+            dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
         # IDs in a single query, but it turns out that can be _very very slow_
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 94cf3b6728..4fdb006a38 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1756,10 +1756,7 @@ class TestSchedulerJob:
             # Need to use something that doesn't immediately get marked as success by the scheduler
             BashOperator(task_id="task", bash_command="true")
 
-        dag_run = dag_maker.create_dagrun(
-            state=State.RUNNING,
-            session=session,
-        )
+        dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED)
 
         # Reach max_active_runs
         for _ in range(3):
@@ -3458,7 +3455,50 @@ class TestSchedulerJob:
         self.job_runner = SchedulerJobRunner(job=scheduler_job)
 
         assert excepted is self.job_runner._should_update_dag_next_dagruns(
-            dag, dag_model, number_running, session=session
+            dag, dag_model, total_active_runs=number_running, session=session
+        )
+
+    @pytest.mark.parametrize(
+        "run_type, should_update",
+        [
+            (DagRunType.MANUAL, False),
+            (DagRunType.SCHEDULED, True),
+            (DagRunType.BACKFILL_JOB, True),
+            (DagRunType.DATASET_TRIGGERED, False),
+        ],
+        ids=[
+            DagRunType.MANUAL.name,
+            DagRunType.SCHEDULED.name,
+            DagRunType.BACKFILL_JOB.name,
+            DagRunType.DATASET_TRIGGERED.name,
+        ],
+    )
+    def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker):
+        """Test that whether next dagrun is updated depends on run type"""
+        with dag_maker(
+            dag_id="test_should_update_dag_next_dagruns_after_run_type",
+            schedule="*/1 * * * *",
+            max_active_runs=10,
+        ) as dag:
+            EmptyOperator(task_id="dummy")
+
+        dag_model = dag_maker.dag_model
+
+        run = dag_maker.create_dagrun(
+            run_id="run",
+            run_type=run_type,
+            execution_date=DEFAULT_DATE,
+            start_date=timezone.utcnow(),
+            state=State.SUCCESS,
+            session=session,
+        )
+
+        session.flush()
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        assert should_update is self.job_runner._should_update_dag_next_dagruns(
+            dag, dag_model, last_dag_run=run, total_active_runs=0, session=session
         )
 
     def test_create_dag_runs(self, dag_maker):
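
A side note on the refactor above: the bare * in the new
_should_update_dag_next_dagruns signature makes last_dag_run and
total_active_runs keyword-only, so every call site has to spell them out. A
tiny standalone illustration of that Python feature (the should_update
function here is hypothetical, not part of Airflow):

    def should_update(dag, dag_model, *, last_dag_run=None, total_active_runs=None):
        # Arguments after the bare * can only be passed by keyword.
        return last_dag_run is not None or bool(total_active_runs)

    should_update("dag", "dag_model", total_active_runs=3)  # OK
    # should_update("dag", "dag_model", None, 3)  # TypeError: too many positional arguments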
