XD-DENG commented on code in PR #62893:
URL: https://github.com/apache/airflow/pull/62893#discussion_r2956944816


##########
airflow-core/tests/unit/jobs/test_scheduler_job.py:
##########
@@ -5130,6 +5130,64 @@ def 
test_scheduler_create_dag_runs_does_not_crash_on_deserialization_error(self,
                 f"Expected deserialization error log, got: 
{scheduler_messages}"
             )
 
+    def 
test_schedule_all_dag_runs_does_not_crash_on_single_dag_run_error(self, 
dag_maker, caplog, session):
+        """Test that _schedule_all_dag_runs continues processing other DAG runs
+        when one DAG run raises an exception during scheduling.
+
+        Previously, _schedule_all_dag_runs used a list comprehension that would
+        abort entirely if any single _schedule_dag_run call raised, crashing
+        the entire scheduler and stopping scheduling for ALL DAGs.
+
+        While the specific scenario used to reproduce this (a TaskInstance with
+        state=UP_FOR_RETRY and end_date=NULL) is nearly impossible under normal
+        operation, the lack of per-dag-run fault isolation means ANY unexpected
+        exception from ANY dag run would have the same catastrophic effect.
+        """
+        # Create two DAGs with running DAG runs
+        with dag_maker(dag_id="good_dag", schedule="@once"):
+            EmptyOperator(task_id="good_task")
+        good_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        with dag_maker(dag_id="bad_dag", schedule="@once"):
+            EmptyOperator(task_id="bad_task")
+        bad_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        session.flush()
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job, 
executors=[self.null_exec])
+
+        caplog.clear()
+        with (
+            caplog.at_level("ERROR", 
logger="airflow.jobs.scheduler_job_runner"),
+            patch.object(
+                self.job_runner,
+                "_schedule_dag_run",
+                side_effect=[

Review Comment:
   Thanks @kaxil!
   
   Addressed in https://github.com/apache/airflow/pull/62893/commits/0658cb8b498419796ac287298bf0e4a04eb4efad



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2197,7 +2197,13 @@ def _schedule_all_dag_runs(
         session: Session,
     ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
         """Make scheduling decisions for all `dag_runs`."""
-        callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
+        callback_tuples = []
+        for run in dag_runs:
+            try:
+                callback = self._schedule_dag_run(run, session=session)
+                callback_tuples.append((run, callback))
+            except Exception:

Review Comment:
   Thanks a lot @kaxil!
   
   Addressed in https://github.com/apache/airflow/pull/62893/commits/0658cb8b498419796ac287298bf0e4a04eb4efad



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to