This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 20f6edf5eb1847cbadef91603389295ed980dba7
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Sun Jul 25 16:19:18 2021 +0100

    Do not seek error file when it is closed (#17187)
    
    We do not check if error file is closed before we seek it, which causes 
exceptions.
    Sometimes, this error file does not exist e.g when the task state is 
changed externally.
    
    This change fixes it by returning None when the file is closed so that 
custom text can be used for error.
    
    Co-authored-by: Jed Cunningham 
<66968678+jedcunning...@users.noreply.github.com>
    (cherry picked from commit 01a0aca249eeaf71d182bf537b9d04121257ac09)
---
 airflow/models/taskinstance.py    |  2 ++
 tests/models/test_taskinstance.py | 13 +++++++++++++
 2 files changed, 15 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 38d7e22..41fa661 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -110,6 +110,8 @@ def set_current_context(context: Context):
 
 def load_error_file(fd: IO[bytes]) -> Optional[Union[str, Exception]]:
     """Load and return error from error file"""
+    if fd.closed:
+        return None
     fd.seek(0, os.SEEK_SET)
     data = fd.read()
     if not data:
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 0b063f9..63f0479 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -21,6 +21,7 @@ import os
 import time
 import unittest
 import urllib
+from tempfile import NamedTemporaryFile
 from typing import List, Optional, Union, cast
 from unittest import mock
 from unittest.mock import call, mock_open, patch
@@ -44,6 +45,7 @@ from airflow.models import (
     TaskReschedule,
     Variable,
 )
+from airflow.models.taskinstance import load_error_file, set_error_file
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import PythonOperator
@@ -113,6 +115,17 @@ class TestTaskInstance(unittest.TestCase):
     def tearDown(self):
         self.clean_db()
 
+    def test_load_error_file_returns_None_for_closed_file(self):
+        error_fd = NamedTemporaryFile()
+        error_fd.close()
+        assert load_error_file(error_fd) is None
+
+    def test_load_error_file_loads_correctly(self):
+        error_message = "some random error message"
+        with NamedTemporaryFile() as error_fd:
+            set_error_file(error_fd.name, error=error_message)
+            assert load_error_file(error_fd) == error_message
+
     def test_set_task_dates(self):
         """
         Test that tasks properly take start/end dates from DAGs

Reply via email to