Copilot commented on code in PR #64364:
URL: https://github.com/apache/airflow/pull/64364#discussion_r3025329268


##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
 
     @staticmethod
     def format_url(host: str) -> str:
+        return _format_url(host)
+
+
+@attrs.define(kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    write_to_opensearch: bool = False
+    delete_local_copy: bool = False
+    host: str = "localhost"
+    port: int | None = 9200
+    username: str = ""
+    password: str = ""
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    base_log_folder: Path = attrs.field(converter=Path)
+    log_id_template: str = (
+        conf.get("opensearch", "log_id_template", fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+    )
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        self.host = _format_url(self.host)
+        self.port = self.port if self.port is not None else 
(urlparse(self.host).port or 9200)
+        self.client = _create_opensearch_client(
+            self.host,
+            self.port,
+            self.username,
+            self.password,
+            get_os_kwargs_from_config(),
+        )
+        self.index_patterns_callable = conf.get("opensearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns = conf.get("opensearch", "index_patterns", 
fallback="_all")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Emit structured task logs to stdout and/or write them directly to 
OpenSearch."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+        if not local_loc.is_file():
+            return
+
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        if self.write_stdout:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if self.write_to_opensearch:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            success = self._write_to_opensearch(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+        parsed_logs = []
+        offset = 1
+        for line in log.split("\n"):
+            if not line.strip():
+                continue
+            log_dict = json.loads(line)
+            log_dict.update({"log_id": log_id, self.offset_field: offset})
+            offset += 1
+            parsed_logs.append(log_dict)
+        return parsed_logs
+
+    def _write_to_opensearch(self, log_lines: list[dict[str, Any]]) -> bool:
         """
-        Format the given host string to ensure it starts with 'http' and check 
if it represents a valid URL.
+        Write logs to OpenSearch; return `True` on success and `False` on 
failure.
 
-        :params host: The host string to format and check.
+        :param log_lines: The parsed log lines to write to OpenSearch.
         """
-        parsed_url = urlparse(host)
+        bulk_actions = [{"_index": self.target_index, "_source": log} for log 
in log_lines]
+        try:
+            _ = helpers.bulk(self.client, bulk_actions)
+            return True
+        except helpers.BulkIndexError as bie:
+            self.log.exception("Bulk upload failed for %d log(s)", 
len(bie.errors))
+            for error in bie.errors:
+                self.log.exception(error)
+            return False
+        except Exception as e:
+            self.log.exception("Unable to insert logs into OpenSearch. Reason: 
%s", str(e))
+            return False
+
+    def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, 
LogMessages]:
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        self.log.info("Reading log %s from Opensearch", log_id)
+        response = self._os_read(log_id, 0, ti)
+        if response is not None and response.hits:
+            logs_by_host = self._group_logs_by_host(response)
+        else:
+            logs_by_host = None
 
-        if parsed_url.scheme not in ("http", "https"):
-            host = "http://" + host
-            parsed_url = urlparse(host)
+        if logs_by_host is None:
+            missing_log_message = (
+                f"*** Log {log_id} not found in Opensearch. "
+                "If your task started recently, please wait a moment and 
reload this page. "
+                "Otherwise, the logs for this task instance may have been 
removed."
+            )
+            return [], [missing_log_message]
 
-        if not parsed_url.netloc:
-            raise ValueError(f"'{host}' is not a valid URL.")
+        header = ["".join([host for host in logs_by_host.keys()])]
+        message = []
+        for hits in logs_by_host.values():
+            for hit in hits:
+                message.append(json.dumps(_build_log_fields(hit.to_dict())))
+        return header, message

Review Comment:
   `header` concatenates all host keys into a single string with no separator 
(and returns it as a single-element list). For multi-host results this produces 
an incorrect/ambiguous `LogSourceInfo`. Return a list of host strings (e.g., 
`list(logs_by_host.keys())`) or join with a delimiter if a single string is 
required by the consumer.



##########
providers/opensearch/docs/logging/index.rst:
##########
@@ -25,6 +25,8 @@ Available only with Airflow>=3.0
 
 Airflow can be configured to read task logs from Opensearch and optionally 
write logs to stdout in standard or json format. These logs can later be 
collected and forwarded to the cluster using tools like fluentd, logstash or 
others.
 
+Airflow also supports writing logs to OpenSearch directly without requiring 
additional software like fluentd or logstash. To enable this feature, set 
``write_to_opensearch`` and ``json_format`` to ``True`` and ``write_stdout`` to 
``False`` in ``airflow.cfg``. If you set both ``write_to_opensearch`` and 
``delete_local_logs`` in the ``logging`` section to ``True``, Airflow deletes 
the local copy of task logs after the direct write succeeds.

Review Comment:
   The docs use the key name `write_to_opensearch`, but the provider config 
additions introduce `write_to_os`, and the core config template reads 
`WRITE_TO_OS`. This mismatch will confuse users and can lead to non-working 
configurations. Please make the docs match the actual config key(s) supported 
(or add backwards-compatible alias handling in code if both names are intended 
to work).



##########
providers/opensearch/docs/logging/index.rst:
##########
@@ -52,6 +54,24 @@ To output task logs to stdout in JSON format, the following 
config could be used
     write_stdout = True
     json_format = True
 
+To output task logs to OpenSearch directly, the following config could be 
used: (set ``delete_local_logs`` to ``True`` if you do not want to retain a 
local copy of the task log)
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+    delete_local_logs = False
+
+    [opensearch]
+    host = <host>
+    port = <port>
+    username = <username>
+    password = <password>
+    write_stdout = False
+    json_format = True
+    write_to_opensearch = True
+    target_index = [name of the index to store logs]

Review Comment:
   The docs use the key name `write_to_opensearch`, but the provider config 
additions introduce `write_to_os`, and the core config template reads 
`WRITE_TO_OS`. This mismatch will confuse users and can lead to non-working 
configurations. Please make the docs match the actual config key(s) supported 
(or add backwards-compatible alias handling in code if both names are intended 
to work).



##########
airflow-core/src/airflow/config_templates/airflow_local_settings.py:
##########
@@ -306,34 +306,40 @@ def _default_conn_name_from(mod_path, hook_name):
         )
 
     elif OPENSEARCH_HOST:
-        OPENSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
-        OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
+        from airflow.providers.opensearch.log.os_task_handler import 
OpensearchRemoteLogIO
+
+        OPENSEARCH_PORT = conf.getint("opensearch", "PORT", fallback=9200)
         OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch", 
"USERNAME")
         OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch", 
"PASSWORD")
         OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch", 
"WRITE_STDOUT")
+        OPENSEARCH_WRITE_TO_OPENSEARCH: bool = conf.getboolean(
+            "opensearch",
+            "WRITE_TO_OS",
+            fallback=conf.getboolean("opensearch", "WRITE_TO_OS", 
fallback=False),
+        )
         OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch", 
"JSON_FORMAT")
-        OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch", 
"JSON_FIELDS")
+        OPENSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("opensearch", 
"TARGET_INDEX")
         OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch", 
"HOST_FIELD")
         OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch", 
"OFFSET_FIELD")

Review Comment:
   A few config issues here are likely to cause user-facing breakage: (1) the 
`fallback` for `WRITE_TO_OS` redundantly reads the same key again (so it never 
falls back meaningfully); (2) `TARGET_INDEX`, `HOST_FIELD`, and `OFFSET_FIELD` 
are now mandatory, but the provider config declares defaults (and previous 
setups likely relied on defaults). Recommend: use a single 
`conf.getboolean(..., fallback=False)` for the flag, align the option name with 
the documented key (`write_to_os` vs `write_to_opensearch`), and read 
`TARGET_INDEX`/`HOST_FIELD`/`OFFSET_FIELD` with `fallback=` to their documented 
defaults.



##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -469,9 +583,10 @@ def concat_logs(hits: list[Hit]):
                     for hit in hits
                 ]
             else:
-                message = [(host, concat_logs(hits)) for host, hits in 
logs_by_host.items()]  # type: ignore[misc]
+                message = [(host, self.concat_logs(hits)) for host, hits in 
logs_by_host.items()]  # type: ignore[misc]
         else:
             message = []
+            metadata["end_of_log"] = True

Review Comment:
   Setting `metadata["end_of_log"] = True` whenever `logs_by_host` is falsy 
marks the log as complete even when no lines were returned (e.g., logs not 
indexed yet). This can prematurely stop UI polling/streaming depending on how 
`LogMetadata.end_of_log` is interpreted upstream. Consider leaving `end_of_log` 
unchanged/False here and letting the existing "missing log" timeout logic 
determine when to end.
   ```suggestion
   
   ```



##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
 
     @staticmethod
     def format_url(host: str) -> str:
+        return _format_url(host)
+
+
+@attrs.define(kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    write_to_opensearch: bool = False
+    delete_local_copy: bool = False
+    host: str = "localhost"
+    port: int | None = 9200
+    username: str = ""
+    password: str = ""
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    base_log_folder: Path = attrs.field(converter=Path)
+    log_id_template: str = (
+        conf.get("opensearch", "log_id_template", fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+    )
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        self.host = _format_url(self.host)
+        self.port = self.port if self.port is not None else 
(urlparse(self.host).port or 9200)
+        self.client = _create_opensearch_client(
+            self.host,
+            self.port,
+            self.username,
+            self.password,
+            get_os_kwargs_from_config(),
+        )
+        self.index_patterns_callable = conf.get("opensearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns = conf.get("opensearch", "index_patterns", 
fallback="_all")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Emit structured task logs to stdout and/or write them directly to 
OpenSearch."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+        if not local_loc.is_file():
+            return
+
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        if self.write_stdout:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if self.write_to_opensearch:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            success = self._write_to_opensearch(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+        parsed_logs = []
+        offset = 1
+        for line in log.split("\n"):
+            if not line.strip():
+                continue
+            log_dict = json.loads(line)
+            log_dict.update({"log_id": log_id, self.offset_field: offset})
+            offset += 1
+            parsed_logs.append(log_dict)
+        return parsed_logs

Review Comment:
   `_parse_raw_log` assumes every line is valid JSON. If `json_format` is 
accidentally disabled (or a non-JSON line is present), `json.loads` will raise 
and can break log upload entirely. Consider catching `json.JSONDecodeError` and 
either (a) logging + skipping invalid lines, or (b) wrapping raw text into a 
structured payload (e.g., setting `event` to the raw line) when `json_format` 
is False.



##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
 
     @staticmethod
     def format_url(host: str) -> str:
+        return _format_url(host)
+
+
+@attrs.define(kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    write_to_opensearch: bool = False
+    delete_local_copy: bool = False
+    host: str = "localhost"
+    port: int | None = 9200
+    username: str = ""
+    password: str = ""
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    base_log_folder: Path = attrs.field(converter=Path)
+    log_id_template: str = (
+        conf.get("opensearch", "log_id_template", fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+    )
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        self.host = _format_url(self.host)
+        self.port = self.port if self.port is not None else 
(urlparse(self.host).port or 9200)
+        self.client = _create_opensearch_client(
+            self.host,
+            self.port,
+            self.username,
+            self.password,
+            get_os_kwargs_from_config(),
+        )
+        self.index_patterns_callable = conf.get("opensearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns = conf.get("opensearch", "index_patterns", 
fallback="_all")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Emit structured task logs to stdout and/or write them directly to 
OpenSearch."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+        if not local_loc.is_file():
+            return
+
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        if self.write_stdout:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if self.write_to_opensearch:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            success = self._write_to_opensearch(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+

Review Comment:
   `delete_local_copy` currently deletes the *entire directory* containing the 
log file via `shutil.rmtree(...)`. This can remove other task logs (or even the 
whole base log dir, depending on the path layout). Prefer deleting only the 
uploaded file (e.g., `local_loc.unlink(missing_ok=True)`) and optionally 
cleaning up empty parent directories safely.
   ```suggestion
                   # Delete only the uploaded log file and optionally clean up 
empty parent directories.
                   try:
                       local_loc.unlink(missing_ok=True)  # type: 
ignore[call-arg]
                   except TypeError:
                       # For Python versions without the 'missing_ok' parameter.
                       if local_loc.exists():
                           local_loc.unlink()
   
                   # Best-effort removal of now-empty parent directories up to 
the base log folder.
                   with contextlib.suppress(OSError):
                       base_dir = Path(self.base_log_folder)
                       parent = local_loc.parent
                       while parent != base_dir and parent.is_dir():
                           # Stop if directory is not empty.
                           if any(parent.iterdir()):
                               break
                           parent.rmdir()
                           parent = parent.parent
   ```



##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
 
     @staticmethod
     def format_url(host: str) -> str:
+        return _format_url(host)
+
+
+@attrs.define(kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    write_to_opensearch: bool = False
+    delete_local_copy: bool = False
+    host: str = "localhost"
+    port: int | None = 9200
+    username: str = ""
+    password: str = ""
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    base_log_folder: Path = attrs.field(converter=Path)
+    log_id_template: str = (
+        conf.get("opensearch", "log_id_template", fallback="")
+        or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+    )
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        self.host = _format_url(self.host)
+        self.port = self.port if self.port is not None else 
(urlparse(self.host).port or 9200)
+        self.client = _create_opensearch_client(
+            self.host,
+            self.port,
+            self.username,
+            self.password,
+            get_os_kwargs_from_config(),
+        )
+        self.index_patterns_callable = conf.get("opensearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns = conf.get("opensearch", "index_patterns", 
fallback="_all")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Emit structured task logs to stdout and/or write them directly to 
OpenSearch."""
+        path = Path(path)
+        local_loc = path if path.is_absolute() else 
self.base_log_folder.joinpath(path)
+        if not local_loc.is_file():
+            return
+
+        log_id = _render_log_id(self.log_id_template, ti, ti.try_number)  # 
type: ignore[arg-type]
+        if self.write_stdout:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if self.write_to_opensearch:
+            log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+            success = self._write_to_opensearch(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+

Review Comment:
   When both `write_stdout` and `write_to_opensearch` are enabled, the log file 
is read and parsed twice (`local_loc.read_text()` + `_parse_raw_log(...)`). 
Read the file once and reuse the parsed lines to avoid redundant I/O and JSON 
parsing.
   ```suggestion
   
           if self.write_stdout or self.write_to_opensearch:
               raw_log = local_loc.read_text()
               log_lines = self._parse_raw_log(raw_log, log_id)
   
               if self.write_stdout:
                   for line in log_lines:
                       sys.stdout.write(json.dumps(line) + "\n")
                       sys.stdout.flush()
   
               if self.write_to_opensearch:
                   success = self._write_to_opensearch(log_lines)
                   if success and self.delete_local_copy:
                       shutil.rmtree(os.path.dirname(local_loc))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to