This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b16439e156077deec4ee287c58ef0ede6621c6f6 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Mon Oct 23 19:39:37 2023 +0100 Fix Scheduler crash looping when dagrun creation fails (#35135) (cherry picked from commit 98618f0c4f1dfd4718741291c29d4adb6f0c0140) --- airflow/jobs/scheduler_job_runner.py | 30 +++++++++++++++++++----------- tests/jobs/test_scheduler_job.py | 25 +++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index c8494c8731..3b65dbfafa 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1170,17 +1170,25 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin): # create a new one. This is so that in the next Scheduling loop we try to create new runs # instead of falling in a loop of Integrity Error. if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns: - dag.create_dagrun( - run_type=DagRunType.SCHEDULED, - execution_date=dag_model.next_dagrun, - state=DagRunState.QUEUED, - data_interval=data_interval, - external_trigger=False, - session=session, - dag_hash=dag_hash, - creating_job_id=self.job.id, - ) - active_runs_of_dags[dag.dag_id] += 1 + try: + dag.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=dag_model.next_dagrun, + state=DagRunState.QUEUED, + data_interval=data_interval, + external_trigger=False, + session=session, + dag_hash=dag_hash, + creating_job_id=self.job.id, + ) + active_runs_of_dags[dag.dag_id] += 1 + # Exceptions like ValueError, ParamValidationError, etc. are raised by + # dag.create_dagrun() when dag is misconfigured. The scheduler should not + # crash due to misconfigured dags. We should log any exception encountered + # and continue to the next dag. + except Exception: + self.log.exception("Failed creating DagRun for %s", dag.dag_id) + continue if self._should_update_dag_next_dagruns( dag, dag_model, diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index dd9b0d9635..57a02e782a 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5121,6 +5121,31 @@ class TestSchedulerJob: ] assert orphaned_datasets == ["ds2", "ds4"] + def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog): + """Test that if dagrun creation throws an exception, the scheduler doesn't crash""" + + with dag_maker("testdag1", serialized=True): + BashOperator(task_id="task", bash_command="echo 1") + + dm1 = dag_maker.dag_model + # Here, the next_dagrun is set to None, which will cause an exception + dm1.next_dagrun = None + session.add(dm1) + session.flush() + + with dag_maker("testdag2", serialized=True): + BashOperator(task_id="task", bash_command="echo 1") + dm2 = dag_maker.dag_model + + scheduler_job = Job() + job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull) + # In the dagmodel list, the first dag should fail, but the second one should succeed + job_runner._create_dag_runs([dm1, dm2], session) + assert "Failed creating DagRun for testdag1" in caplog.text + assert not DagRun.find(dag_id="testdag1", session=session) + # Check if the second dagrun was created + assert DagRun.find(dag_id="testdag2", session=session) + @pytest.mark.need_serialized_dag def test_schedule_dag_run_with_upstream_skip(dag_maker, session):