dstandish commented on code in PR #28626:
URL: https://github.com/apache/airflow/pull/28626#discussion_r1210678412


##########
airflow/providers/apache/hdfs/log/webhdfs_task_handler.py:
##########
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+import pathlib
+from typing import Any
+
+from airflow.compat.functools import cached_property
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class WebHDFSTaskHandler(FileTaskHandler, LoggingMixin):
+    """A TaskHandler that uploads logs to HDFS once done. During the run, it 
will show the executor logs."""
+
+    trigger_should_wrap = True
+
+    def __init__(
+        self,
+        base_log_folder: str,
+        webhdfs_log_folder: str,
+        filename_template: str | None = None,
+    ):
+        """
+        Set up the local log location and the remote WebHDFS base location.
+
+        :param base_log_folder: forwarded unchanged to the parent handler's ``__init__``.
+        :param webhdfs_log_folder: remote log folder; parsed with
+            ``WebHDFSHook.parse_webhdfs_url`` and stored as ``remote_base``.
+        :param filename_template: forwarded unchanged to the parent handler's ``__init__``.
+        """
+        super().__init__(base_log_folder=base_log_folder, 
filename_template=filename_template)
+        # Remote base location that close() joins the relative log path onto;
+        # this is whatever WebHDFSHook.parse_webhdfs_url returns for the folder.
+        self.remote_base = WebHDFSHook.parse_webhdfs_url(webhdfs_log_folder)
+        # Log file path relative to the local base; filled in by set_context().
+        self.log_relative_path = ""
+        # Whether close() should upload the local log file; decided in set_context().
+        self.upload_on_close = False
+        # Set to True at the end of close() so a repeated close() returns early.
+        self.closed = False
+
+    @cached_property
+    def hook(self) -> WebHDFSHook:
+        """Return a WebHDFSHook for the connection id set in ``[logging] REMOTE_LOG_CONN_ID``."""
+        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", 
"REMOTE_LOG_CONN_ID"))
+
+    def set_context(self, ti: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+
+        Records the log file path relative to ``local_base`` and decides
+        whether ``close()`` should upload the file to remote storage.
+
+        :param ti: task instance whose log file is being handled.
+        """
+        super().set_context(ti)
+
+        # The path of the local log file relative to the local base is kept
+        # so that close() can locate the file and build the matching remote
+        # (WebHDFS) location.
+        full_path = self.handler.baseFilename  # type: ignore[union-attr]
+        self.log_relative_path = 
pathlib.Path(full_path).relative_to(self.local_base).as_posix()
+        # Upload when this is a trigger log context (attribute may be absent
+        # on ``ti``, hence getattr with a False default) or when ``ti.raw``
+        # is falsy.
+        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
+        self.upload_on_close = is_trigger_log_context or not ti.raw
+
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors).
+        # Opening in "w" mode and writing nothing truncates the file.
+        if self.upload_on_close:
+            with open(self.handler.baseFilename, "w"):  # type: 
ignore[union-attr]
+                pass
+
+    @staticmethod
+    def encode_remote_path(remote_loc: str) -> str:
+        """
+        Encode a remote path for compatibility with WebHDFS.
+
+        This will replace all ":" and "+" with "-".
+
+        :param remote_loc: remote path (or path fragment) to sanitise.
+        :return: ``remote_loc`` with every ":" and "+" replaced by "-".
+        """
+        # We replace these characters with a hyphen instead of URL-encoding
+        # them because:
+        # - ":" is not a valid character in HDFS paths.
+        # - "+" is present in timezones from many template variables
+        # - WebHDFS does not handle urlencoded correctly between versions:
+        #   - https://issues.apache.org/jira/browse/HDFS-14423
+        #   - https://issues.apache.org/jira/browse/HDFS-14466
+        return remote_loc.replace(":", "-").replace("+", "-")
+
+    def close(self) -> None:
+        """
+        Close the handler and upload the local log file to remote HDFS storage.
+
+        The upload only happens when ``upload_on_close`` is set and the local
+        log file exists.
+        """
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super().close()
+
+        # NOTE(review): this early return leaves ``self.closed`` False, so a
+        # later close() call goes through super().close() again.
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        # Only the relative part is passed through encode_remote_path here;
+        # ``remote_base`` is joined on unmodified.
+        remote_loc = os.path.join(self.remote_base, 
self.encode_remote_path(self.log_relative_path))
+
+        if os.path.exists(local_loc):
+            log = pathlib.Path(local_loc).read_text()
+            # hdfs_write is defined outside the portion of the file shown
+            # here; presumably it writes ``log`` to ``remote_loc`` - confirm.
+            self.hdfs_write(log, remote_loc)
+
+        # Mark closed, so we don't double write if close is called twice
+        self.closed = True
+
+    def _read_remote_logs(
+        self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | 
None = None
+    ) -> tuple[list[str], list[str]]:
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        worker_log_rel_path = self._render_filename(ti, try_number)
+
+        remote_loc = os.path.join(self.remote_base, worker_log_rel_path)
+        remote_loc = self.encode_remote_path(remote_loc)
+
+        logs = []
+        messages = []
+        if self.hook.check_for_path(remote_loc):
+            messages.append(f"Found logs in webhdfs: {remote_loc}")
+            try:
+                messages.append(f"Reading remote log from HDFS: {remote_loc}.")
+                logs.extend(self.hook.read_file(remote_loc).splitlines())

Review Comment:
   ```suggestion
                   logs.append(self.hook.read_file(remote_loc))
   ```
   The `logs` list returned should contain one element per file (the whole file content), not one element per line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to