This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch another-remove-backcompat-gcs in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4e970bbfcd176035e24497b773ec7ba0d69b0fa1 Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Wed Dec 27 10:40:01 2023 +0100 Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443) Co-authored-by: Andrey Anshin <andrey.ans...@taragol.is> (cherry picked from commit 75faf1115d990746784e25280c0b326b3b557b86) --- .../providers/google/cloud/log/gcs_task_handler.py | 41 ++-------------------- .../google/cloud/log/test_gcs_task_handler.py | 9 ++--- 2 files changed, 5 insertions(+), 45 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 9921bb8753..abc2bc8845 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -26,7 +26,6 @@ from typing import TYPE_CHECKING, Collection # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] -from packaging.version import Version from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException @@ -48,18 +47,6 @@ _DEFAULT_SCOPESS = frozenset( logger = logging.getLogger(__name__) -def get_default_delete_local_copy(): - """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. - - TODO: delete this function when min airflow version >= 2.6. - """ - from airflow.version import version - - if Version(version) < Version("2.6"): - return False - return conf.getboolean("logging", "delete_local_logs") - - class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads task instance logs. @@ -108,8 +95,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): self.gcp_keyfile_dict = gcp_keyfile_dict self.scopes = gcp_scopes self.project_id = project_id - self.delete_local_copy = ( - kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + self.delete_local_copy = kwargs.get( + "delete_local_copy", conf.getboolean("logging", "delete_local_logs") ) @cached_property @@ -218,30 +205,6 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): messages.append(f"Unable to read remote log {e}") return messages, logs - def _read(self, ti, try_number, metadata=None): - """ - Read logs of given task instance and try_number from GCS. - - If failed, read the log from task instance host machine. - - todo: when min airflow version >= 2.6, remove this method - - :param ti: task instance object - :param try_number: task instance try_number to read logs from - :param metadata: log metadata, - can be used for steaming log reading and auto-tailing. - """ - if hasattr(super(), "_read_remote_logs"): - # from Airflow 2.6, we don't implement the `_read` method. - # if parent has _read_remote_logs, we're >= 2.6 - return super()._read(ti, try_number, metadata) - - messages, logs = self._read_remote_logs(ti, try_number, metadata) - if not logs: - return super()._read(ti, try_number, metadata) - - return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True} - def gcs_write(self, log, remote_log_location) -> bool: """ Write the log to the remote location and return `True`; fail silently and return `False` on error. diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 2d4dd7340d..a3e929b985 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -248,8 +248,8 @@ class TestGCSTaskHandler: ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy, airflow_version", - [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], + "delete_local_copy, expected_existence_of_local_copy", + [(True, False), (False, True)], ) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -265,12 +265,9 @@ class TestGCSTaskHandler: local_log_location, delete_local_copy, expected_existence_of_local_copy, - airflow_version, ): mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" - with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( - "airflow.version.version", airflow_version - ): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}): handler = GCSTaskHandler( base_log_folder=local_log_location, gcs_log_folder="gs://bucket/remote/log/location",