This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8223e8975197e6d96068127b4f5ff5b29ceafee5
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Wed Oct 20 21:25:46 2021 +0100

    Prevent scheduler crash when serialized dag is missing (#19113)
    
    Scheduler._send_dag_callbacks_to_processor calls dag_run.get_dag, which
    raises an exception when the serialized DAG is missing. This PR changes
    the caller to use dagbag.get_dag and changes the
    Scheduler._send_dag_callbacks_to_processor args to accept dag instead of
    dag_run.
    
    (cherry picked from commit 5dc375aa7744f37c7a09f322cd9f4a221aa4ccbe)
---
 airflow/jobs/scheduler_job.py    | 14 ++++++++------
 tests/jobs/test_scheduler_job.py |  4 +---
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7684032..17fc55c 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -766,7 +766,12 @@ class SchedulerJob(BaseJob):
 
             # Send the callbacks after we commit to ensure the context is up to date when it gets run
             for dag_run, callback_to_run in callback_tuples:
-                self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
+                dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
+                if not dag:
+                    self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
+                    continue
+
+                self._send_dag_callbacks_to_processor(dag, callback_to_run)
 
             # Without this, the session has an invalid view of the DB
             session.expunge_all()
@@ -990,7 +995,7 @@ class SchedulerJob(BaseJob):
             )
 
             # Send SLA & DAG Success/Failure Callbacks to be executed
-            self._send_dag_callbacks_to_processor(dag_run, callback_to_execute)
+            self._send_dag_callbacks_to_processor(dag, callback_to_execute)
 
             return 0
 
@@ -1026,13 +1031,10 @@ class SchedulerJob(BaseJob):
         # Verify integrity also takes care of session.flush
         dag_run.verify_integrity(session=session)
 
-    def _send_dag_callbacks_to_processor(
-        self, dag_run: DagRun, callback: Optional[DagCallbackRequest] = None
-    ):
+    def _send_dag_callbacks_to_processor(self, dag: DAG, callback: Optional[DagCallbackRequest] = None):
         if not self.processor_agent:
             raise ValueError("Processor agent is not started.")
 
-        dag = dag_run.get_dag()
         self._send_sla_callbacks_to_processor(dag)
         if callback:
             self.processor_agent.send_callback_to_execute(callback)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 3813265..11a2fca 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1363,7 +1363,6 @@ class TestSchedulerJob:
         self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
         call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0]
         assert call_args[0].dag_id == dr.dag_id
-        assert call_args[0].execution_date == dr.execution_date
         assert call_args[1] is None
 
         session.rollback()
@@ -1394,11 +1393,10 @@ class TestSchedulerJob:
         with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
             self.scheduler_job._do_scheduling(session)
 
-        # Verify Callback is not set (i.e is None) when no callbacks are set on DAG
+        # Verify Callback is set (i.e is None) when no callbacks are set on DAG
         self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once()
         call_args = self.scheduler_job._send_dag_callbacks_to_processor.call_args[0]
         assert call_args[0].dag_id == dr.dag_id
-        assert call_args[0].execution_date == dr.execution_date
         assert call_args[1] is not None
         assert call_args[1].msg == msg
         session.rollback()

Reply via email to