This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b16439e156077deec4ee287c58ef0ede6621c6f6
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Mon Oct 23 19:39:37 2023 +0100

    Fix Scheduler crash looping when dagrun creation fails (#35135)
    
    (cherry picked from commit 98618f0c4f1dfd4718741291c29d4adb6f0c0140)
---
 airflow/jobs/scheduler_job_runner.py | 30 +++++++++++++++++++-----------
 tests/jobs/test_scheduler_job.py     | 25 +++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 11 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index c8494c8731..3b65dbfafa 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1170,17 +1170,25 @@ class SchedulerJobRunner(BaseJobRunner[Job], LoggingMixin):
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
-                dag.create_dagrun(
-                    run_type=DagRunType.SCHEDULED,
-                    execution_date=dag_model.next_dagrun,
-                    state=DagRunState.QUEUED,
-                    data_interval=data_interval,
-                    external_trigger=False,
-                    session=session,
-                    dag_hash=dag_hash,
-                    creating_job_id=self.job.id,
-                )
-                active_runs_of_dags[dag.dag_id] += 1
+                try:
+                    dag.create_dagrun(
+                        run_type=DagRunType.SCHEDULED,
+                        execution_date=dag_model.next_dagrun,
+                        state=DagRunState.QUEUED,
+                        data_interval=data_interval,
+                        external_trigger=False,
+                        session=session,
+                        dag_hash=dag_hash,
+                        creating_job_id=self.job.id,
+                    )
+                    active_runs_of_dags[dag.dag_id] += 1
+                # Exceptions like ValueError, ParamValidationError, etc. are raised by
+                # dag.create_dagrun() when dag is misconfigured. The scheduler should not
+                # crash due to misconfigured dags. We should log any exception encountered
+                # and continue to the next dag.
+                except Exception:
+                    self.log.exception("Failed creating DagRun for %s", dag.dag_id)
+                    continue
             if self._should_update_dag_next_dagruns(
                 dag,
                 dag_model,
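
[The heart of the change is the log-and-continue guard: an exception while creating one DAG's run is logged with its traceback and the loop moves on, so a single misconfigured DAG can no longer crash-loop the scheduler. Below is a minimal, self-contained sketch of the same pattern; the names `create_run`, `create_runs`, and `item` are hypothetical stand-ins for `dag.create_dagrun()` and the scheduler's loop over DAG models, not Airflow code.]

    import logging

    log = logging.getLogger(__name__)

    def create_run(item: str) -> str:
        # Hypothetical stand-in for dag.create_dagrun(); raises for a
        # "misconfigured" item, the way a misconfigured DAG would.
        if item == "bad":
            raise ValueError(f"misconfigured: {item}")
        return f"run-for-{item}"

    def create_runs(items: list[str]) -> list[str]:
        created = []
        for item in items:
            try:
                created.append(create_run(item))
            except Exception:
                # Log the full traceback and keep going, mirroring the
                # scheduler's guard above; one bad item must not abort the loop.
                log.exception("Failed creating run for %s", item)
                continue
        return created

    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        # 'bad' is logged and skipped; the other runs are still created.
        print(create_runs(["good", "bad", "also-good"]))
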
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index dd9b0d9635..57a02e782a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -5121,6 +5121,31 @@ class TestSchedulerJob:
         ]
         assert orphaned_datasets == ["ds2", "ds4"]
 
+    def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
+        """Test that if dagrun creation throws an exception, the scheduler doesn't crash"""
+
+        with dag_maker("testdag1", serialized=True):
+            BashOperator(task_id="task", bash_command="echo 1")
+
+        dm1 = dag_maker.dag_model
+        # Here, the next_dagrun is set to None, which will cause an exception
+        dm1.next_dagrun = None
+        session.add(dm1)
+        session.flush()
+
+        with dag_maker("testdag2", serialized=True):
+            BashOperator(task_id="task", bash_command="echo 1")
+        dm2 = dag_maker.dag_model
+
+        scheduler_job = Job()
+        job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
+        # In the dagmodel list, the first dag should fail, but the second one should succeed
+        job_runner._create_dag_runs([dm1, dm2], session)
+        assert "Failed creating DagRun for testdag1" in caplog.text
+        assert not DagRun.find(dag_id="testdag1", session=session)
+        # Check if the second dagrun was created
+        assert DagRun.find(dag_id="testdag2", session=session)
+
 
 @pytest.mark.need_serialized_dag
 def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
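
[The new test leans on pytest's built-in `caplog` fixture to assert that the failure was logged rather than raised. A minimal sketch of that technique in isolation, outside of Airflow; the helpers `guarded` and `boom` are hypothetical:]

    import logging

    log = logging.getLogger(__name__)

    def guarded(action):
        # Hypothetical guard mirroring the scheduler fix: log, don't re-raise.
        try:
            action()
        except Exception:
            log.exception("Failed creating DagRun for %s", "testdag1")

    def boom():
        raise ValueError("boom")

    def test_failure_is_logged_not_raised(caplog):
        # caplog is pytest's built-in log-capture fixture; log.exception()
        # records at ERROR level, so capture at that level.
        with caplog.at_level(logging.ERROR):
            guarded(boom)  # must not raise
        assert "Failed creating DagRun for testdag1" in caplog.text
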
