This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0678a94220f90940a3d84b5c89b17e3c7e9d2dae Author: Sam Wheating <sam.wheat...@shopify.com> AuthorDate: Fri Aug 6 03:35:13 2021 -0700 Handle and log exceptions raised during task callback (#17347) Add missing exception handling in success/retry/failure callbacks (cherry picked from commit faf9f731fa8810e05f868ffec989ea042381ada4) --- airflow/models/taskinstance.py | 15 ++++++++++++--- tests/models/test_taskinstance.py | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 41fa661..b99fa34 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1390,18 +1390,27 @@ class TaskInstance(Base, LoggingMixin): if task.on_failure_callback is not None: context = self.get_template_context() context["exception"] = error - task.on_failure_callback(context) + try: + task.on_failure_callback(context) + except Exception: + self.log.exception("Error when executing on_failure_callback") elif self.state == State.SUCCESS: task = self.task if task.on_success_callback is not None: context = self.get_template_context() - task.on_success_callback(context) + try: + task.on_success_callback(context) + except Exception: + self.log.exception("Error when executing on_success_callback") elif self.state == State.UP_FOR_RETRY: task = self.task if task.on_retry_callback is not None: context = self.get_template_context() context["exception"] = error - task.on_retry_callback(context) + try: + task.on_retry_callback(context) + except Exception: + self.log.exception("Error when executing on_retry_callback") @provide_session def run( diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 63f0479..021809b 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -1657,6 +1657,45 @@ class TestTaskInstance(unittest.TestCase): ti.refresh_from_db() assert ti.state == State.SUCCESS + @parameterized.expand( + [ + (State.SUCCESS, "Error when executing on_success_callback"), + (State.UP_FOR_RETRY, "Error when executing on_retry_callback"), + (State.FAILED, "Error when executing on_failure_callback"), + ] + ) + def test_finished_callbacks_handle_and_log_exception(self, finished_state, expected_message): + called = completed = False + + def on_finish_callable(context): + nonlocal called, completed + called = True + raise KeyError + completed = True + + dag = DAG( + 'test_success_callback_handles_exception', + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=10), + ) + task = DummyOperator( + task_id='op', + email='t...@test.test', + on_success_callback=on_finish_callable, + on_retry_callback=on_finish_callable, + on_failure_callback=on_finish_callable, + dag=dag, + ) + + ti = TI(task=task, execution_date=datetime.datetime.now()) + ti._log = mock.Mock() + ti.state = finished_state + ti._run_finished_callback() + + assert called + assert not completed + ti.log.exception.assert_called_once_with(expected_message) + def test_handle_failure(self): start_date = timezone.datetime(2016, 6, 1) dag = models.DAG(dag_id="test_handle_failure", schedule_interval=None, start_date=start_date)