serkef commented on a change in pull request #5517: [AIRFLOW-4292] Cleanup and improve SLA code
URL: https://github.com/apache/airflow/pull/5517#discussion_r302275896
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -434,86 +434,89 @@ def manage_slas(self, dag, session=None):
                     dttm = dag.following_schedule(dttm)
         session.commit()
 
+        # Identify tasks should send notification
         slas = (
             session
             .query(SlaMiss)
             .filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == 
dag.dag_id)  # noqa pylint: disable=singleton-comparison
             .all()
         )
 
-        if slas:
-            sla_dates = [sla.execution_date for sla in slas]
-            qry = (
-                session
-                .query(TI)
-                .filter(
-                    TI.state != State.SUCCESS,
-                    TI.execution_date.in_(sla_dates),
-                    TI.dag_id == dag.dag_id
-                ).all()
-            )
-            blocking_tis = []
-            for ti in qry:
-                if ti.task_id in dag.task_ids:
-                    ti.task = dag.get_task(ti.task_id)
-                    blocking_tis.append(ti)
-                else:
-                    session.delete(ti)
-                    session.commit()
-
-            task_list = "\n".join([
-                sla.task_id + ' on ' + sla.execution_date.isoformat()
-                for sla in slas])
-            blocking_task_list = "\n".join([
-                ti.task_id + ' on ' + ti.execution_date.isoformat()
-                for ti in blocking_tis])
-            # Track whether email or any alert notification sent
-            # We consider email or the alert callback as notifications
-            email_sent = False
-            notification_sent = False
-            if dag.sla_miss_callback:
-                # Execute the alert callback
-                self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL 
BACK ')
-                try:
-                    dag.sla_miss_callback(dag, task_list, blocking_task_list, 
slas,
-                                          blocking_tis)
-                    notification_sent = True
-                except Exception:
-                    self.log.exception("Could not call sla_miss_callback for 
DAG %s",
-                                       dag.dag_id)
-            email_content = """\
-            Here's a list of tasks that missed their SLAs:
-            <pre><code>{task_list}\n<code></pre>
-            Blocking tasks:
-            <pre><code>{blocking_task_list}\n{bug}<code></pre>
-            """.format(task_list=task_list, 
blocking_task_list=blocking_task_list,
-                       bug=asciiart.bug)
-            emails = set()
-            for task in dag.tasks:
-                if task.email:
-                    if isinstance(task.email, str):
-                        emails |= set(get_email_address_list(task.email))
-                    elif isinstance(task.email, (list, tuple)):
-                        emails |= set(task.email)
-            if emails:
-                try:
-                    send_email(
-                        emails,
-                        "[airflow] SLA miss on DAG=" + dag.dag_id,
-                        email_content)
-                    email_sent = True
-                    notification_sent = True
-                except Exception:
-                    self.log.exception("Could not send SLA Miss email 
notification for"
-                                       " DAG %s", dag.dag_id)
-            # If we sent any notification, update the sla_miss table
-            if notification_sent:
-                for sla in slas:
-                    if email_sent:
-                        sla.email_sent = True
-                    sla.notification_sent = True
-                    session.merge(sla)
-            session.commit()
+        if not slas:
+            return
+
+        sla_dates = {sla.execution_date for sla in slas}
+        qry = (
+            session
+            .query(TI)
+            .filter(
+                TI.state != State.SUCCESS,
+                TI.execution_date.in_(sla_dates),
+                TI.dag_id == dag.dag_id
+            ).all()
+        )
+        blocking_tis = []
+        for ti in qry:
+            if ti.task_id in dag.task_ids:
+                ti.task = dag.get_task(ti.task_id)
+                blocking_tis.append(ti)
+            else:
+                session.delete(ti)
+                session.commit()
+
+        task_list = "\n".join(
+            sla.task_id + ' on ' + sla.execution_date.isoformat()
 
 Review comment:
   This changes the list comprehension passed to `"\n".join(...)` into a generator expression.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to