This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 20f6edf5eb1847cbadef91603389295ed980dba7 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Sun Jul 25 16:19:18 2021 +0100 Do not seek error file when it is closed (#17187) We do not check if the error file is closed before we seek it, which causes exceptions. Sometimes, this error file does not exist, e.g. when the task state is changed externally. This change fixes it by returning None when the file is closed so that custom text can be used for the error. Co-authored-by: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> (cherry picked from commit 01a0aca249eeaf71d182bf537b9d04121257ac09) --- airflow/models/taskinstance.py | 2 ++ tests/models/test_taskinstance.py | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 38d7e22..41fa661 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -110,6 +110,8 @@ def set_current_context(context: Context): def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]: """Load and return error from error file""" + if fd.closed: + return None fd.seek(0, os.SEEK_SET) data = fd.read() if not data: diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 0b063f9..63f0479 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -21,6 +21,7 @@ import os import time import unittest import urllib +from tempfile import NamedTemporaryFile from typing import List, Optional, Union, cast from unittest import mock from unittest.mock import call, mock_open, patch @@ -44,6 +45,7 @@ from airflow.models import ( TaskReschedule, Variable, ) +from airflow.models.taskinstance import load_error_file, set_error_file from airflow.operators.bash import BashOperator from airflow.operators.dummy import DummyOperator from airflow.operators.python import PythonOperator @@ -113,6 +115,17 @@ class TestTaskInstance(unittest.TestCase): def 
tearDown(self): self.clean_db() + def test_load_error_file_returns_None_for_closed_file(self): + error_fd = NamedTemporaryFile() + error_fd.close() + assert load_error_file(error_fd) is None + + def test_load_error_file_loads_correctly(self): + error_message = "some random error message" + with NamedTemporaryFile() as error_fd: + set_error_file(error_fd.name, error=error_message) + assert load_error_file(error_fd) == error_message + def test_set_task_dates(self): """ Test that tasks properly take start/end dates from DAGs