This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8223e8975197e6d96068127b4f5ff5b29ceafee5 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Wed Oct 20 21:25:46 2021 +0100 Prevent scheduler crash when serialized dag is missing (#19113) Scheduler._send_dag_callbacks_to_processor calls dag_run.get_dag which raises exception. This PR changes to calling dagbag.get_dag and changing Scheduler._send_dag_callbacks_to_processor args to accept dag instead of dag_run. (cherry picked from commit 5dc375aa7744f37c7a09f322cd9f4a221aa4ccbe) --- airflow/jobs/scheduler_job.py | 14 ++++++++------ tests/jobs/test_scheduler_job.py | 4 +--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 7684032..17fc55c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -766,7 +766,12 @@ class SchedulerJob(BaseJob): # Send the callbacks after we commit to ensure the context is up to date when it gets run for dag_run, callback_to_run in callback_tuples: - self._send_dag_callbacks_to_processor(dag_run, callback_to_run) + dag = self.dagbag.get_dag(dag_run.dag_id, session=session) + if not dag: + self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id) + continue + + self._send_dag_callbacks_to_processor(dag, callback_to_run) # Without this, the session has an invalid view of the DB session.expunge_all() @@ -990,7 +995,7 @@ class SchedulerJob(BaseJob): ) # Send SLA & DAG Success/Failure Callbacks to be executed - self._send_dag_callbacks_to_processor(dag_run, callback_to_execute) + self._send_dag_callbacks_to_processor(dag, callback_to_execute) return 0 @@ -1026,13 +1031,10 @@ class SchedulerJob(BaseJob): # Verify integrity also takes care of session.flush dag_run.verify_integrity(session=session) - def _send_dag_callbacks_to_processor( - self, dag_run: DagRun, callback: Optional[DagCallbackRequest] = None - ): + def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None): if not self.processor_agent: raise ValueError("Processor agent is not started.") - dag = dag_run.get_dag() self._send_sla_callbacks_to_processor(dag) if callback: self.processor_agent.send_callback_to_execute(callback) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 3813265..11a2fca 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1363,7 +1363,6 @@ class TestSchedulerJob: self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once() call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0] assert call_args[0].dag_id == dr.dag_id - assert call_args[0].execution_date == dr.execution_date assert call_args[1] is None session.rollback() @@ -1394,11 +1393,10 @@ class TestSchedulerJob: with mock.patch.object(settings, "USE_JOB_SCHEDULE", False): self.scheduler_job._do_scheduling(session) - # Verify Callback is not set (i.e is None) when no callbacks are set on DAG + # Verify Callback is set (i.e is None) when no callbacks are set on DAG self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once() call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0] assert call_args[0].dag_id == dr.dag_id - assert call_args[0].execution_date == dr.execution_date assert call_args[1] is not None assert call_args[1].msg == msg session.rollback()