Repository: incubator-airflow Updated Branches: refs/heads/master d1f94fe20 -> 9c0c4264c
[AIRFLOW-2178] Add handling on SLA miss errors Closes #3173 from d3cay1/airflow2178-master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c0c4264 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c0c4264 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c0c4264 Branch: refs/heads/master Commit: 9c0c4264c3ecdee2d11c0be9d2a151ea423dd3d9 Parents: d1f94fe Author: David Klosowski <dav...@thinknear.com> Authored: Wed Apr 4 09:19:59 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Wed Apr 4 09:19:59 2018 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 27 +++++++++----- tests/jobs.py | 103 +++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 109 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 13fc2a2..6241717 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -24,7 +24,6 @@ import os import psutil import signal import six -import socket import sys import threading import time @@ -43,7 +42,6 @@ from time import sleep from airflow import configuration as conf from airflow import executors, models, settings from airflow.exceptions import AirflowException -from airflow.logging_config import configure_logging from airflow.models import DAG, DagRun from airflow.settings import Stats from airflow.task.task_runner import get_task_runner @@ -672,8 +670,13 @@ class SchedulerJob(BaseJob): if dag.sla_miss_callback: # Execute the alert callback self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ') - dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis) - 
notification_sent = True + try: + dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, + blocking_tis) + notification_sent = True + except Exception: + self.log.exception("Could not call sla_miss_callback for DAG %s", + dag.dag_id) email_content = """\ Here's a list of tasks that missed their SLAs: <pre><code>{task_list}\n<code></pre> @@ -691,12 +694,16 @@ class SchedulerJob(BaseJob): if email not in emails: emails.append(email) if emails and len(slas): - send_email( - emails, - "[airflow] SLA miss on DAG=" + dag.dag_id, - email_content) - email_sent = True - notification_sent = True + try: + send_email( + emails, + "[airflow] SLA miss on DAG=" + dag.dag_id, + email_content) + email_sent = True + notification_sent = True + except Exception: + self.log.exception("Could not send SLA Miss email notification for" + " DAG %s", dag.dag_id) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index ace593a..1e411e2 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -23,7 +23,6 @@ import multiprocessing import os import shutil import six -import socket import threading import time import unittest @@ -46,7 +45,7 @@ from airflow.utils.timeout import timeout from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths from airflow.utils.net import get_hostname -from mock import Mock, patch +from mock import Mock, patch, MagicMock, PropertyMock from sqlalchemy.orm.session import make_transient from tests.executors.test_executor import TestExecutor @@ -95,7 +94,7 @@ class BackfillJobTest(unittest.TestCase): target_dag.clear() scheduler = SchedulerJob() - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(target_dag, queue=queue) self.assertFalse(queue.append.called) @@ -108,7 +107,7 @@ 
class BackfillJobTest(unittest.TestCase): job.run() scheduler = SchedulerJob() - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(target_dag, queue=queue) self.assertTrue(queue.append.called) @@ -1944,7 +1943,7 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(dag, queue=queue) queue.append.assert_called_with( @@ -1976,7 +1975,7 @@ class SchedulerJobTest(unittest.TestCase): dag_id='test_scheduler_do_not_schedule_removed_task', start_date=DEFAULT_DATE) - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -2002,7 +2001,7 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -2034,7 +2033,7 @@ class SchedulerJobTest(unittest.TestCase): session.commit() session.close() - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -2072,7 +2071,7 @@ class SchedulerJobTest(unittest.TestCase): dag=dag, owner='airflow') - queue = mock.Mock() + queue = Mock() scheduler._process_task_instances(dag, queue=queue) tis = dr.get_task_instances() @@ -2210,7 +2209,7 @@ class SchedulerJobTest(unittest.TestCase): # Reduce max_active_runs to 1 dag.max_active_runs = 1 - queue = mock.Mock() + queue = Mock() # and schedule them in, so we can check how many # tasks are put on the queue (should be one, not 3) scheduler._process_task_instances(dag, queue=queue) @@ -2384,7 +2383,7 @@ class SchedulerJobTest(unittest.TestCase): session = settings.Session() # Mock the callback function so we can verify that it was not called - sla_callback = mock.MagicMock() + sla_callback = MagicMock() # Create dag with a start of 2 days ago, but an sla of 1 day 
ago so we'll already have an sla_miss on the books test_start_date = days_ago(2) @@ -2417,6 +2416,88 @@ class SchedulerJobTest(unittest.TestCase): sla_callback.assert_not_called() + def test_scheduler_sla_miss_callback_exception(self): + """ + Test that the scheduler gracefully logs an exception if there is a problem + calling the sla_miss_callback + """ + session = settings.Session() + + sla_callback = MagicMock(side_effect=RuntimeError('Could not call function')) + + test_start_date = days_ago(2) + dag = DAG(dag_id='test_sla_miss', + sla_miss_callback=sla_callback, + default_args={'start_date': test_start_date}) + + task = DummyOperator(task_id='dummy', + dag=dag, + owner='airflow', + sla=datetime.timedelta(hours=1)) + + session.merge(models.TaskInstance(task=task, + execution_date=test_start_date, + state='Success')) + + # Create an SlaMiss record for the dummy task (notification_sent/email_sent left at their defaults) + session.merge(models.SlaMiss(task_id='dummy', + dag_id='test_sla_miss', + execution_date=test_start_date)) + + # Now call manage_slas and see if the sla_miss callback gets called + scheduler = SchedulerJob(dag_id='test_sla_miss', + **self.default_scheduler_args) + + with mock.patch('airflow.jobs.SchedulerJob.log', + new_callable=PropertyMock) as mock_log: + scheduler.manage_slas(dag=dag, session=session) + sla_callback.assert_called() + mock_log().exception.assert_called_with( + 'Could not call sla_miss_callback for DAG %s', + 'test_sla_miss') + + @mock.patch("airflow.utils.email.send_email") + def test_scheduler_sla_miss_email_exception(self, mock_send_email): + """ + Test that the scheduler gracefully logs an exception if there is a problem + sending an email + """ + session = settings.Session() + + # Make send_email raise so we can verify the failure is logged rather than propagated + mock_send_email.side_effect = RuntimeError('Could not send an email') + + test_start_date = days_ago(2) + dag = DAG(dag_id='test_sla_miss', + default_args={'start_date': test_start_date, + 'sla':
datetime.timedelta(days=1)}) + + task = DummyOperator(task_id='dummy', + dag=dag, + owner='airflow', + email='t...@test.com', + sla=datetime.timedelta(hours=1)) + + session.merge(models.TaskInstance(task=task, + execution_date=test_start_date, + state='Success')) + + # Create an SlaMiss record for the dummy task (notification_sent/email_sent left at their defaults) + session.merge(models.SlaMiss(task_id='dummy', + dag_id='test_sla_miss', + execution_date=test_start_date)) + + scheduler = SchedulerJob(dag_id='test_sla_miss', + num_runs=1, + **self.default_scheduler_args) + + with mock.patch('airflow.jobs.SchedulerJob.log', + new_callable=PropertyMock) as mock_log: + scheduler.manage_slas(dag=dag, session=session) + mock_log().exception.assert_called_with( + 'Could not send SLA Miss email notification for DAG %s', + 'test_sla_miss') + def test_retry_still_in_executor(self): """ Checks if the scheduler does not put a task in limbo, when a task is retried