This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b9f3a5392f Extend task context logging support for remote logging 
using GCP GCS (#32970)
b9f3a5392f is described below

commit b9f3a5392f1b2fb8043ca56b2c8ded20f40e2297
Author: Pankaj Koti <pankajkoti...@gmail.com>
AuthorDate: Fri Nov 17 20:17:13 2023 +0530

    Extend task context logging support for remote logging using GCP GCS 
(#32970)
    
    With the addition of the task context logging feature in PR #32646,
    this PR extends the feature to GCP Cloud Storage when it is set as
    the remote logging store. Here, backward compatibility is ensured for
    older versions of Airflow that do not have the feature included in
    Airflow Core.
---
 airflow/providers/google/cloud/log/gcs_task_handler.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py 
b/airflow/providers/google/cloud/log/gcs_task_handler.py
index cebe9a829e..39d0f072a8 100644
--- a/airflow/providers/google/cloud/log/gcs_task_handler.py
+++ b/airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -22,7 +22,7 @@ import os
 import shutil
 from functools import cached_property
 from pathlib import Path
-from typing import Collection
+from typing import TYPE_CHECKING, Collection
 
 # not sure why but mypy complains on missing `storage` but it is clearly there 
and is importable
 from google.cloud import storage  # type: ignore[attr-defined]
@@ -36,6 +36,9 @@ from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.logging_mixin import LoggingMixin
 
+if TYPE_CHECKING:
+    from airflow.models.taskinstance import TaskInstance
+
 _DEFAULT_SCOPESS = frozenset(
     [
         "https://www.googleapis.com/auth/devstorage.read_write",
@@ -96,6 +99,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
         **kwargs,
     ):
         super().__init__(base_log_folder, filename_template)
+        self.handler: logging.FileHandler | None = None
         self.remote_base = gcs_log_folder
         self.log_relative_path = ""
         self.closed = False
@@ -137,15 +141,21 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             project=self.project_id if self.project_id else project_id,
         )
 
-    def set_context(self, ti):
-        super().set_context(ti)
+    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) 
-> None:
+        if getattr(self, "supports_task_context_logging", False):
+            super().set_context(ti, identifier=identifier)
+        else:
+            super().set_context(ti)
         # Log relative path is used to construct local and remote
         # log path to upload log files into GCS and read from the
         # remote location.
+        if TYPE_CHECKING:
+            assert self.handler is not None
+
         full_path = self.handler.baseFilename
         self.log_relative_path = 
Path(full_path).relative_to(self.local_base).as_posix()
         is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
-        self.upload_on_close = is_trigger_log_context or not ti.raw
+        self.upload_on_close = is_trigger_log_context or not getattr(ti, 
"raw", None)
 
     def close(self):
         """Close and upload local log file to remote storage GCS."""

Reply via email to