This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 20d8142869 Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027) 20d8142869 is described below commit 20d81428699db240b65f72a92183255c24e8c19b Author: Daniel Dyląg <bi...@users.noreply.github.com> AuthorDate: Tue Sep 5 15:01:33 2023 +0200 Fix scheduler logic to plan new dag runs by ignoring manual runs (#34027) * Fix manual task triggering scheduled tasks Fixes #33949 * fix static checks * static checks * add unit test * static check * Undo renaming * Update airflow/jobs/scheduler_job_runner.py Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> * use keyword-only arguments for last_dag_run and total_active_runs --------- Co-authored-by: daniel.dylag <danieldylag1...@gmail.com> Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> --- airflow/jobs/scheduler_job_runner.py | 33 +++++++++++++++++------- tests/jobs/test_scheduler_job.py | 50 ++++++++++++++++++++++++++++++++---- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 1507a6f06f..6c4652ef1a 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1189,7 +1189,11 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin): ) active_runs_of_dags[dag.dag_id] += 1 if self._should_update_dag_next_dagruns( - dag, dag_model, active_runs_of_dags[dag.dag_id], session=session + dag, + dag_model, + last_dag_run=None, + total_active_runs=active_runs_of_dags[dag.dag_id], + session=session, ): dag_model.calculate_dagrun_date_fields(dag, data_interval) # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in @@ -1297,9 +1301,22 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin): ) def _should_update_dag_next_dagruns( - self, dag: DAG, dag_model: DagModel, total_active_runs: int | None = None, *, session: Session + self, + dag: DAG, + dag_model: DagModel, 
+ *, + last_dag_run: DagRun | None = None, + total_active_runs: int | None = None, + session: Session, ) -> bool: """Check if the dag's next_dagruns_create_after should be updated.""" + # If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run. + # In such case, schedule next only if last_dag_run is finished and was an automated run. + if last_dag_run and not ( + last_dag_run.state in State.finished_dr_states + and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB] + ): + return False # If the DAG never schedules skip save runtime if not dag.timetable.can_be_scheduled: return False @@ -1434,8 +1451,8 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin): session.merge(task_instance) session.flush() self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) - # Work out if we should allow creating a new DagRun now? - if self._should_update_dag_next_dagruns(dag, dag_model, session=session): + + if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) callback_to_execute = DagCallbackRequest( @@ -1462,11 +1479,9 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin): return callback # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - # Check if DAG not scheduled then skip interval calculation to same scheduler runtime - if dag_run.state in State.finished_dr_states: - # Work out if we should allow creating a new DagRun now? 
- if self._should_update_dag_next_dagruns(dag, dag_model, session=session): - dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) + + if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session): + dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run)) # This will do one query per dag run. We "could" build up a complex # query to update all the TIs across all the execution dates and dag # IDs in a single query, but it turns out that can be _very very slow_ diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 94cf3b6728..4fdb006a38 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1756,10 +1756,7 @@ class TestSchedulerJob: # Need to use something that doesn't immediately get marked as success by the scheduler BashOperator(task_id="task", bash_command="true") - dag_run = dag_maker.create_dagrun( - state=State.RUNNING, - session=session, - ) + dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED) # Reach max_active_runs for _ in range(3): @@ -3458,7 +3455,50 @@ class TestSchedulerJob: self.job_runner = SchedulerJobRunner(job=scheduler_job) assert excepted is self.job_runner._should_update_dag_next_dagruns( - dag, dag_model, number_running, session=session + dag, dag_model, total_active_runs=number_running, session=session + ) + + @pytest.mark.parametrize( + "run_type, should_update", + [ + (DagRunType.MANUAL, False), + (DagRunType.SCHEDULED, True), + (DagRunType.BACKFILL_JOB, True), + (DagRunType.DATASET_TRIGGERED, False), + ], + ids=[ + DagRunType.MANUAL.name, + DagRunType.SCHEDULED.name, + DagRunType.BACKFILL_JOB.name, + DagRunType.DATASET_TRIGGERED.name, + ], + ) + def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker): + """Test that whether next dagrun is updated depends on run type""" + with dag_maker( + 
dag_id="test_should_update_dag_next_dagruns_after_run_type", + schedule="*/1 * * * *", + max_active_runs=10, + ) as dag: + EmptyOperator(task_id="dummy") + + dag_model = dag_maker.dag_model + + run = dag_maker.create_dagrun( + run_id="run", + run_type=run_type, + execution_date=DEFAULT_DATE, + start_date=timezone.utcnow(), + state=State.SUCCESS, + session=session, + ) + + session.flush() + scheduler_job = Job(executor=self.null_exec) + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + assert should_update is self.job_runner._should_update_dag_next_dagruns( + dag, dag_model, last_dag_run=run, total_active_runs=0, session=session ) def test_create_dag_runs(self, dag_maker):