This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 127c0725b9 Revert "Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443)" (#36453) 127c0725b9 is described below commit 127c0725b9eb7c8be015ac10d74f963e3d6383ae Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Wed Dec 27 19:50:53 2023 +0100 Revert "Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443)" (#36453) This reverts commit 75faf1115d990746784e25280c0b326b3b557b86. --- .../providers/google/cloud/log/gcs_task_handler.py | 41 ++++++++++++++++++++-- .../google/cloud/log/test_gcs_task_handler.py | 9 +++-- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index abc2bc8845..9921bb8753 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -26,6 +26,7 @@ from typing import TYPE_CHECKING, Collection # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] +from packaging.version import Version from airflow.configuration import conf from airflow.exceptions import AirflowNotFoundException @@ -47,6 +48,18 @@ _DEFAULT_SCOPESS = frozenset( logger = logging.getLogger(__name__) +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not. + + TODO: delete this function when min airflow version >= 2.6. + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return False + return conf.getboolean("logging", "delete_local_logs") + + class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads task instance logs. @@ -95,8 +108,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): self.gcp_keyfile_dict = gcp_keyfile_dict self.scopes = gcp_scopes self.project_id = project_id - self.delete_local_copy = kwargs.get( - "delete_local_copy", conf.getboolean("logging", "delete_local_logs") + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() ) @cached_property @@ -205,6 +218,30 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): messages.append(f"Unable to read remote log {e}") return messages, logs + def _read(self, ti, try_number, metadata=None): + """ + Read logs of given task instance and try_number from GCS. + + If failed, read the log from task instance host machine. + + todo: when min airflow version >= 2.6, remove this method + + :param ti: task instance object + :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for steaming log reading and auto-tailing. + """ + if hasattr(super(), "_read_remote_logs"): + # from Airflow 2.6, we don't implement the `_read` method. + # if parent has _read_remote_logs, we're >= 2.6 + return super()._read(ti, try_number, metadata) + + messages, logs = self._read_remote_logs(ti, try_number, metadata) + if not logs: + return super()._read(ti, try_number, metadata) + + return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True} + def gcs_write(self, log, remote_log_location) -> bool: """ Write the log to the remote location and return `True`; fail silently and return `False` on error. diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index a3e929b985..2d4dd7340d 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -248,8 +248,8 @@ class TestGCSTaskHandler: ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy", - [(True, False), (False, True)], + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], ) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -265,9 +265,12 @@ class TestGCSTaskHandler: local_log_location, delete_local_copy, expected_existence_of_local_copy, + airflow_version, ): mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" - with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): handler = GCSTaskHandler( base_log_folder=local_log_location, gcs_log_folder="gs://bucket/remote/log/location",