This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c2763f806 Remove remaining Airflow 2.5 backcompat code from Google 
Provider (#36366)
2c2763f806 is described below

commit 2c2763f806517ae514d5614d519966da02ff4371
Author: Andrey Anshin <[email protected]>
AuthorDate: Fri Dec 22 20:39:10 2023 +0400

    Remove remaining Airflow 2.5 backcompat code from Google Provider (#36366)
---
 .../providers/google/cloud/log/gcs_task_handler.py | 41 ++--------------------
 .../google/cloud/log/stackdriver_task_handler.py   | 13 ++-----
 .../google/cloud/log/test_gcs_task_handler.py      |  9 ++---
 3 files changed, 7 insertions(+), 56 deletions(-)

diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py 
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index 9921bb8753..abc2bc8845 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -26,7 +26,6 @@ from typing import TYPE_CHECKING, Collection
 
 # not sure why but mypy complains on missing `storage` but it is clearly there 
and is importable
 from google.cloud import storage  # type: ignore[attr-defined]
-from packaging.version import Version
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowNotFoundException
@@ -48,18 +47,6 @@ _DEFAULT_SCOPESS = frozenset(
 logger = logging.getLogger(__name__)
 
 
-def get_default_delete_local_copy():
-    """Load delete_local_logs conf if Airflow version > 2.6 and return False 
if not.
-
-    TODO: delete this function when min airflow version >= 2.6.
-    """
-    from airflow.version import version
-
-    if Version(version) < Version("2.6"):
-        return False
-    return conf.getboolean("logging", "delete_local_logs")
-
-
 class GCSTaskHandler(FileTaskHandler, LoggingMixin):
     """
     GCSTaskHandler is a python log handler that handles and reads task 
instance logs.
@@ -108,8 +95,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         self.gcp_keyfile_dict = gcp_keyfile_dict
         self.scopes = gcp_scopes
         self.project_id = project_id
-        self.delete_local_copy = (
-            kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else 
get_default_delete_local_copy()
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
         )
 
     @cached_property
@@ -218,30 +205,6 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             messages.append(f"Unable to read remote log {e}")
         return messages, logs
 
-    def _read(self, ti, try_number, metadata=None):
-        """
-        Read logs of given task instance and try_number from GCS.
-
-        If failed, read the log from task instance host machine.
-
-        todo: when min airflow version >= 2.6, remove this method
-
-        :param ti: task instance object
-        :param try_number: task instance try_number to read logs from
-        :param metadata: log metadata,
-                         can be used for steaming log reading and auto-tailing.
-        """
-        if hasattr(super(), "_read_remote_logs"):
-            # from Airflow 2.6, we don't implement the `_read` method.
-            # if parent has _read_remote_logs, we're >= 2.6
-            return super()._read(ti, try_number, metadata)
-
-        messages, logs = self._read_remote_logs(ti, try_number, metadata)
-        if not logs:
-            return super()._read(ti, try_number, metadata)
-
-        return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), 
{"end_of_log": True}
-
     def gcs_write(self, log, remote_log_location) -> bool:
         """
         Write the log to the remote location and return `True`; fail silently 
and return `False` on error.
diff --git a/airflow/providers/google/cloud/log/stackdriver_task_handler.py 
b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
index bf0e3bf4fb..f33e621f30 100644
--- a/airflow/providers/google/cloud/log/stackdriver_task_handler.py
+++ b/airflow/providers/google/cloud/log/stackdriver_task_handler.py
@@ -30,21 +30,13 @@ from google.cloud.logging_v2.types import 
ListLogEntriesRequest, ListLogEntriesR
 
 from airflow.providers.google.cloud.utils.credentials_provider import 
get_credentials_and_project_id
 from airflow.providers.google.common.consts import CLIENT_INFO
+from airflow.utils.log.trigger_handler import ctx_indiv_trigger
 
 if TYPE_CHECKING:
-    from contextvars import ContextVar
-
     from google.auth.credentials import Credentials
 
     from airflow.models import TaskInstance
 
-try:
-    # todo: remove this conditional import when min airflow version >= 2.6
-    ctx_indiv_trigger: ContextVar | None
-    from airflow.utils.log.trigger_handler import ctx_indiv_trigger
-except ImportError:
-    ctx_indiv_trigger = None
-
 DEFAULT_LOGGER_NAME = "airflow"
 _GLOBAL_RESOURCE = Resource(type="global", labels={})
 
@@ -174,8 +166,7 @@ class StackdriverTaskHandler(logging.Handler):
         """
         message = self.format(record)
         ti = None
-        # todo: remove ctx_indiv_trigger is not None check when min airflow 
version >= 2.6
-        if ctx_indiv_trigger is not None and getattr(record, 
ctx_indiv_trigger.name, None):
+        if getattr(record, ctx_indiv_trigger.name, None):
             ti = getattr(record, "task_instance", None)  # trigger context
         labels = self._get_labels(ti)
         self._transport.send(record, message, resource=self.resource, 
labels=labels)
diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py 
b/tests/providers/google/cloud/log/test_gcs_task_handler.py
index 2d4dd7340d..a3e929b985 100644
--- a/tests/providers/google/cloud/log/test_gcs_task_handler.py
+++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py
@@ -248,8 +248,8 @@ class TestGCSTaskHandler:
         )
 
     @pytest.mark.parametrize(
-        "delete_local_copy, expected_existence_of_local_copy, airflow_version",
-        [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, 
"2.5.0"), (False, True, "2.5.0")],
+        "delete_local_copy, expected_existence_of_local_copy",
+        [(True, False), (False, True)],
     )
     @mock.patch(
         
"airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
@@ -265,12 +265,9 @@ class TestGCSTaskHandler:
         local_log_location,
         delete_local_copy,
         expected_existence_of_local_copy,
-        airflow_version,
     ):
         mock_blob.from_string.return_value.download_as_bytes.return_value = 
b"CONTENT"
-        with conf_vars({("logging", "delete_local_logs"): 
str(delete_local_copy)}), mock.patch(
-            "airflow.version.version", airflow_version
-        ):
+        with conf_vars({("logging", "delete_local_logs"): 
str(delete_local_copy)}):
             handler = GCSTaskHandler(
                 base_log_folder=local_log_location,
                 gcs_log_folder="gs://bucket/remote/log/location",

Reply via email to