shahar1 commented on code in PR #65198:
URL: https://github.com/apache/airflow/pull/65198#discussion_r3223538767


##########
providers/google/tests/unit/google/cloud/log/test_stackdriver_task_handler.py:
##########
@@ -50,6 +56,231 @@ def clean_stackdriver_handlers():
             del handler
 
 
+class TestStackdriverRemoteLogIO:
+    def setup_method(self):
+        self.local_log_location = str(Path(tempfile.gettempdir()) / 
"local/stackdriver/logs")

Review Comment:
   nit: It is better to use pytest's native `tmp_path` fixture here instead of `tempfile.gettempdir()`:
   https://docs.pytest.org/en/stable/how-to/tmp_path.html



##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -88,10 +298,10 @@ class StackdriverTaskHandler(logging.Handler):
     :param labels: (Optional) Mapping of labels for the entry.
     """
 
-    LABEL_TASK_ID = "task_id"
-    LABEL_DAG_ID = "dag_id"
-    LABEL_LOGICAL_DATE = "logical_date" if AIRFLOW_V_3_0_PLUS else 
"execution_date"
-    LABEL_TRY_NUMBER = "try_number"
+    LABEL_TASK_ID = LABEL_TASK_ID
+    LABEL_DAG_ID = LABEL_DAG_ID
+    LABEL_LOGICAL_DATE = LABEL_LOGICAL_DATE
+    LABEL_TRY_NUMBER = LABEL_TRY_NUMBER

Review Comment:
   ```suggestion
       # Re-export module-level constants for back-compat with external code
       # reading them off the class
       LABEL_TASK_ID = LABEL_TASK_ID
       LABEL_DAG_ID = LABEL_DAG_ID
       LABEL_LOGICAL_DATE = LABEL_LOGICAL_DATE
       LABEL_TRY_NUMBER = LABEL_TRY_NUMBER
   ```
   
   otherwise it reads like dead code and someone will try to delete it.



##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -245,112 +425,9 @@ def read(
 
         return [((self.task_instance_hostname, messages),)], [new_metadata]
 
-    def _prepare_log_filter(self, ti_labels: dict[str, str]) -> str:
-        """
-        Prepare the filter that chooses which log entries to fetch.
-
-        More information:
-        
https://cloud.google.com/logging/docs/reference/v2/rest/v2/entries/list#body.request_body.FIELDS.filter
-        https://cloud.google.com/logging/docs/view/advanced-queries
-
-        :param ti_labels: Task Instance's labels that will be used to search 
for logs
-        :return: logs filter
-        """
-
-        def escape_label_key(key: str) -> str:
-            return f'"{key}"' if "." in key else key
-
-        def escale_label_value(value: str) -> str:
-            escaped_value = value.replace("\\", "\\\\").replace('"', '\\"')
-            return f'"{escaped_value}"'
-
-        _, project = self._credentials_and_project
-        log_filters = [
-            f"resource.type={escale_label_value(self.resource.type)}",
-            f'logName="projects/{project}/logs/{self.gcp_log_name}"',
-        ]
-
-        for key, value in self.resource.labels.items():
-            
log_filters.append(f"resource.labels.{escape_label_key(key)}={escale_label_value(value)}")
-
-        for key, value in ti_labels.items():
-            
log_filters.append(f"labels.{escape_label_key(key)}={escale_label_value(value)}")
-        return "\n".join(log_filters)
-
-    def _read_logs(
-        self, log_filter: str, next_page_token: str | None, all_pages: bool
-    ) -> tuple[str, bool, str | None]:
-        """
-        Send requests to the Stackdriver service and downloads logs.
-
-        :param log_filter: Filter specifying the logs to be downloaded.
-        :param next_page_token: The token of the page from which the log 
download will start.
-            If None is passed, it will start from the first page.
-        :param all_pages: If True is passed, all subpages will be downloaded. 
Otherwise, only the first
-            page will be downloaded
-        :return: A token that contains the following items:
-            * string with logs
-            * Boolean value describing whether there are more logs,
-            * token of the next page
-        """
-        messages = []
-        new_messages, next_page_token = self._read_single_logs_page(
-            log_filter=log_filter,
-            page_token=next_page_token,
-        )
-        messages.append(new_messages)
-        if all_pages:
-            while next_page_token:
-                new_messages, next_page_token = self._read_single_logs_page(
-                    log_filter=log_filter, page_token=next_page_token
-                )
-                messages.append(new_messages)
-                if not messages:
-                    break
-
-            end_of_log = True
-            next_page_token = None
-        else:
-            end_of_log = not bool(next_page_token)
-        return "\n".join(messages), end_of_log, next_page_token
-
-    def _read_single_logs_page(self, log_filter: str, page_token: str | None = 
None) -> tuple[str, str]:
-        """
-        Send requests to the Stackdriver service and downloads single pages 
with logs.
-
-        :param log_filter: Filter specifying the logs to be downloaded.
-        :param page_token: The token of the page to be downloaded. If None is 
passed, the first page will be
-            downloaded.
-        :return: Downloaded logs and next page token
-        """
-        _, project = self._credentials_and_project
-        request = ListLogEntriesRequest(
-            resource_names=[f"projects/{project}"],
-            filter=log_filter,
-            page_token=page_token,
-            order_by="timestamp asc",
-            page_size=1000,
-        )
-        response = 
self._logging_service_client.list_log_entries(request=request)
-        page: ListLogEntriesResponse = next(response.pages)
-        messages: list[str] = []
-        for entry in page.entries:
-            if "message" in (entry.json_payload or {}):
-                messages.append(entry.json_payload["message"])  # type: ignore
-            elif entry.text_payload:
-                messages.append(entry.text_payload)
-        return "\n".join(messages), page.next_page_token
-
     @classmethod
     def _task_instance_to_labels(cls, ti: TaskInstance) -> dict[str, str]:
-        return {
-            cls.LABEL_TASK_ID: ti.task_id,
-            cls.LABEL_DAG_ID: ti.dag_id,
-            cls.LABEL_LOGICAL_DATE: str(ti.logical_date.isoformat())
-            if AIRFLOW_V_3_0_PLUS
-            else str(ti.execution_date.isoformat()),
-            cls.LABEL_TRY_NUMBER: str(ti.try_number),
-        }
+        return _task_instance_to_labels(ti)

Review Comment:
   Subclass-override regression: the old code built the dict with `cls.LABEL_*`;
   the new code delegates to the module-level function, which uses the
   module-level constants, so a subclass that overrides `LABEL_*` no longer
   affects the generated labels.
   
   Compare (taken from the current **`_task_instance_to_labels`**):
   ```python
   return {
           LABEL_TASK_ID: ti.task_id,
           ...
   ```
   
   with previous:
   ```python
   return {
           cls.LABEL_TASK_ID: ti.task_id,
           ...
   ```



##########
providers/google/src/airflow/providers/google/cloud/log/stackdriver_task_handler.py:
##########
@@ -56,6 +68,204 @@
     ["https://www.googleapis.com/auth/logging.read", "https://www.googleapis.com/auth/logging.write"]
 )
 
+LABEL_TASK_ID = "task_id"
+LABEL_DAG_ID = "dag_id"
+LABEL_LOGICAL_DATE = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
+LABEL_TRY_NUMBER = "try_number"
+
+
+@attrs.define(kw_only=True)
+class StackdriverRemoteLogIO(LoggingMixin):
+    """Remote log IO that streams logs to and reads from Google Cloud 
Stackdriver Logging."""
+
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool = True
+
+    gcp_key_path: str | None = None
+    scopes: Collection[str] | None = _DEFAULT_SCOPESS
+    gcp_log_name: str = DEFAULT_LOGGER_NAME
+    transport_type: type[Transport] = BackgroundThreadTransport
+    resource: Resource = _GLOBAL_RESOURCE
+    labels: dict[str, str] | None = None
+
+    @cached_property
+    def credentials_and_project(self) -> tuple[Credentials, str]:
+        credentials, project = get_credentials_and_project_id(
+            key_path=self.gcp_key_path, scopes=self.scopes, 
disable_logging=True
+        )
+        return credentials, project
+
+    @cached_property
+    def _client(self) -> gcp_logging.Client:
+        """The Cloud Library API client."""
+        credentials, project = self.credentials_and_project
+        return gcp_logging.Client(
+            credentials=credentials,
+            project=project,
+            client_info=CLIENT_INFO,
+        )
+
+    @cached_property
+    def _logging_service_client(self) -> LoggingServiceV2Client:
+        """The Cloud logging service v2 client."""
+        credentials, _ = self.credentials_and_project
+        return LoggingServiceV2Client(
+            credentials=credentials,
+            client_info=CLIENT_INFO,
+        )
+
+    @cached_property
+    def transport(self) -> Transport:
+        """Object responsible for sending data to Stackdriver."""
+        return self.transport_type(self._client, self.gcp_log_name)
+
+    @cached_property
+    def processors(self) -> tuple[structlog.typing.Processor, ...]:
+        import structlog.stdlib
+
+        from airflow.sdk.log import relative_path_from_logger
+
+        log_record_factory = getLogRecordFactory()
+        _transport = self.transport
+
+        def proc(
+            logger: structlog.typing.WrappedLogger,
+            method_name: str,
+            event: structlog.typing.EventDict,
+        ):
+            if not logger or not relative_path_from_logger(logger):
+                return event
+
+            name = event.get("logger_name") or event.get("logger", "")
+            level = structlog.stdlib.NAME_TO_LEVEL.get(method_name.lower(), 
logging.INFO)
+            msg = copy.copy(event)
+            created = None
+            if ts := msg.pop("timestamp", None):
+                with contextlib.suppress(Exception):
+                    created = datetime.fromisoformat(ts)
+            record = log_record_factory(
+                name,
+                level,
+                pathname="",
+                lineno=0,
+                msg=msg,
+                args=(),
+                exc_info=None,
+                func=None,
+                sinfo=None,
+            )
+            if created is not None:
+                ct = created.timestamp()
+                record.created = ct
+                record.msecs = int((ct - int(ct)) * 1000) + 0.0
+            _transport.send(
+                record, str(msg.get("event", "")), resource=self.resource, 
labels=self.labels or {}
+            )

Review Comment:
   Here you send only the static labels (without TI-derived ones), but in lines 
188-193 you filter by task_id/dag_id/try_number - so reads won't return 
anything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to