This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b31f373ef Remove remaining Airflow 2.6 backcompat code from Amazon Provider (#36324)
2b31f373ef is described below

commit 2b31f373ef92c2b793f3f484192aa7b7fc88a7b6
Author: Andrey Anshin <andrey.ans...@taragol.is>
AuthorDate: Wed Dec 20 14:06:08 2023 +0400

    Remove remaining Airflow 2.6 backcompat code from Amazon Provider (#36324)
---
 .../providers/amazon/aws/log/s3_task_handler.py    | 46 +---------------------
 .../amazon/aws/log/test_s3_task_handler.py         | 39 ++----------------
 2 files changed, 6 insertions(+), 79 deletions(-)
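
For orientation (not part of the commit): the removed TODO notes below say this code should go once the provider's minimum Airflow version reaches 2.6, so the version-gated helper get_default_delete_local_copy() and the legacy _read() override are no longer needed. A minimal sketch of how the handler resolves delete_local_copy after this change, assuming Airflow 2.6+ where the [logging] delete_local_logs option always exists; the helper name resolve_delete_local_copy is hypothetical and shown only for illustration:

    from airflow.configuration import conf

    def resolve_delete_local_copy(**kwargs):
        # An explicit delete_local_copy argument wins; otherwise fall back to
        # the [logging] delete_local_logs option, with no Airflow version check.
        return kwargs.get("delete_local_copy", conf.getboolean("logging", "delete_local_logs"))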

diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py
index f3664f7c41..96cf54478a 100644
--- a/airflow/providers/amazon/aws/log/s3_task_handler.py
+++ b/airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -24,8 +24,6 @@ import shutil
 from functools import cached_property
 from typing import TYPE_CHECKING
 
-from packaging.version import Version
-
 from airflow.configuration import conf
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.utils.log.file_task_handler import FileTaskHandler
@@ -35,18 +33,6 @@ if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
 
 
-def get_default_delete_local_copy():
-    """Load delete_local_logs conf if Airflow version > 2.6 and return False if not.
-
-    TODO: delete this function when min airflow version >= 2.6
-    """
-    from airflow.version import version
-
-    if Version(version) < Version("2.6"):
-        return False
-    return conf.getboolean("logging", "delete_local_logs")
-
-
 class S3TaskHandler(FileTaskHandler, LoggingMixin):
     """
     S3TaskHandler is a python log handler that handles and reads task instance logs.
@@ -66,8 +52,8 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
         self._hook = None
         self.closed = False
         self.upload_on_close = True
-        self.delete_local_copy = (
-            kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
         )
 
     @cached_property
@@ -145,34 +131,6 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             messages.append(f"No logs found on s3 for ti={ti}")
         return messages, logs
 
-    def _read(self, ti, try_number, metadata=None):
-        """
-        Read logs of given task instance and try_number from S3 remote storage.
-
-        If failed, read the log from task instance host machine.
-
-        todo: when min airflow version >= 2.6 then remove this method (``_read``)
-
-        :param ti: task instance object
-        :param try_number: task instance try_number to read logs from
-        :param metadata: log metadata,
-                         can be used for steaming log reading and auto-tailing.
-        """
-        # from airflow 2.6 we no longer implement the _read method
-        if hasattr(super(), "_read_remote_logs"):
-            return super()._read(ti, try_number, metadata)
-        # if we get here, we're on airflow < 2.6 and we use this backcompat logic
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if logs:
-            return "".join(f"*** {x}\n" for x in messages) + "\n".join(logs), {"end_of_log": True}
-        else:
-            if metadata and metadata.get("log_pos", 0) > 0:
-                log_prefix = ""
-            else:
-                log_prefix = "*** Falling back to local log\n"
-            local_log, metadata = super()._read(ti, try_number, metadata)
-            return f"{log_prefix}{local_log}", metadata
-
     def s3_log_exists(self, remote_log_location: str) -> bool:
         """
         Check if remote_log_location exists in remote storage.
diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py
index 758bf1244a..67d5d3257f 100644
--- a/tests/providers/amazon/aws/log/test_s3_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py
@@ -148,33 +148,6 @@ class TestS3TaskHandler:
         assert actual == expected
         assert {"end_of_log": True, "log_pos": 0} == metadata[0]
 
-    def test_read_when_s3_log_missing_and_log_pos_missing_pre_26(self):
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        # mock that super class has no _read_remote_logs method
-        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
-            log, metadata = self.s3_task_handler.read(ti)
-        assert 1 == len(log)
-        assert log[0][0][-1].startswith("*** Falling back to local log")
-
-    def test_read_when_s3_log_missing_and_log_pos_zero_pre_26(self):
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        # mock that super class has no _read_remote_logs method
-        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
-            log, metadata = self.s3_task_handler.read(ti, metadata={"log_pos": 0})
-        assert 1 == len(log)
-        assert log[0][0][-1].startswith("*** Falling back to local log")
-
-    def test_read_when_s3_log_missing_and_log_pos_over_zero_pre_26(self):
-        ti = copy.copy(self.ti)
-        ti.state = TaskInstanceState.SUCCESS
-        # mock that super class has no _read_remote_logs method
-        with mock.patch("airflow.providers.amazon.aws.log.s3_task_handler.hasattr", return_value=False):
-            log, metadata = self.s3_task_handler.read(ti, metadata={"log_pos": 1})
-        assert 1 == len(log)
-        assert not log[0][0][-1].startswith("*** Falling back to local log")
-
     def test_s3_read_when_log_missing(self):
         handler = self.s3_task_handler
         url = "s3://bucket/foo"
@@ -240,15 +213,11 @@ class TestS3TaskHandler:
             boto3.resource("s3").Object("bucket", self.remote_log_key).get()
 
     @pytest.mark.parametrize(
-        "delete_local_copy, expected_existence_of_local_copy, airflow_version",
-        [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")],
+        "delete_local_copy, expected_existence_of_local_copy",
+        [(True, False), (False, True)],
     )
-    def test_close_with_delete_local_logs_conf(
-        self, delete_local_copy, expected_existence_of_local_copy, airflow_version
-    ):
-        with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch(
-            "airflow.version.version", airflow_version
-        ):
+    def test_close_with_delete_local_logs_conf(self, delete_local_copy, expected_existence_of_local_copy):
+        with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}):
+            handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
 
         handler.log.info("test")

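A rough illustration of why the _read() backcompat path could be dropped: on Airflow 2.6+ the FileTaskHandler base class dispatches to a handler's _read_remote_logs() when it is defined (which is what the removed hasattr check was probing for), so S3TaskHandler only needs to return the (messages, logs) pair. The class below is a hypothetical sketch of that contract, not the provider's actual code:

    from airflow.utils.log.file_task_handler import FileTaskHandler

    class RemoteLogHandlerSketch(FileTaskHandler):
        def _read_remote_logs(self, ti, try_number, metadata=None):
            # Return a (messages, logs) pair; the Airflow 2.6+ base _read()
            # consumes it directly, so no version-gated local-log fallback
            # like the removed S3TaskHandler._read() is required.
            messages = [f"Found logs in remote storage for ti={ti}"]
            logs = ["example remote log content"]
            return messages, logs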