Copilot commented on code in PR #64364:
URL: https://github.com/apache/airflow/pull/64364#discussion_r3025329268
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
@staticmethod
def format_url(host: str) -> str:
+ return _format_url(host)
+
+
[email protected](kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ write_to_opensearch: bool = False
+ delete_local_copy: bool = False
+ host: str = "localhost"
+ port: int | None = 9200
+ username: str = ""
+ password: str = ""
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ base_log_folder: Path = attrs.field(converter=Path)
+ log_id_template: str = (
+ conf.get("opensearch", "log_id_template", fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ self.host = _format_url(self.host)
+ self.port = self.port if self.port is not None else
(urlparse(self.host).port or 9200)
+ self.client = _create_opensearch_client(
+ self.host,
+ self.port,
+ self.username,
+ self.password,
+ get_os_kwargs_from_config(),
+ )
+ self.index_patterns_callable = conf.get("opensearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns = conf.get("opensearch", "index_patterns",
fallback="_all")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Emit structured task logs to stdout and/or write them directly to
OpenSearch."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+ if not local_loc.is_file():
+ return
+
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
+ if self.write_stdout:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if self.write_to_opensearch:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ success = self._write_to_opensearch(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
+ def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+ parsed_logs = []
+ offset = 1
+ for line in log.split("\n"):
+ if not line.strip():
+ continue
+ log_dict = json.loads(line)
+ log_dict.update({"log_id": log_id, self.offset_field: offset})
+ offset += 1
+ parsed_logs.append(log_dict)
+ return parsed_logs
+
+ def _write_to_opensearch(self, log_lines: list[dict[str, Any]]) -> bool:
"""
- Format the given host string to ensure it starts with 'http' and check
if it represents a valid URL.
+ Write logs to OpenSearch; return `True` on success and `False` on
failure.
- :params host: The host string to format and check.
+ :param log_lines: The parsed log lines to write to OpenSearch.
"""
- parsed_url = urlparse(host)
+ bulk_actions = [{"_index": self.target_index, "_source": log} for log
in log_lines]
+ try:
+ _ = helpers.bulk(self.client, bulk_actions)
+ return True
+ except helpers.BulkIndexError as bie:
+ self.log.exception("Bulk upload failed for %d log(s)",
len(bie.errors))
+ for error in bie.errors:
+ self.log.exception(error)
+ return False
+ except Exception as e:
+ self.log.exception("Unable to insert logs into OpenSearch. Reason:
%s", str(e))
+ return False
+
+ def read(self, _relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo,
LogMessages]:
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
+ self.log.info("Reading log %s from Opensearch", log_id)
+ response = self._os_read(log_id, 0, ti)
+ if response is not None and response.hits:
+ logs_by_host = self._group_logs_by_host(response)
+ else:
+ logs_by_host = None
- if parsed_url.scheme not in ("http", "https"):
- host = "http://" + host
- parsed_url = urlparse(host)
+ if logs_by_host is None:
+ missing_log_message = (
+ f"*** Log {log_id} not found in Opensearch. "
+ "If your task started recently, please wait a moment and
reload this page. "
+ "Otherwise, the logs for this task instance may have been
removed."
+ )
+ return [], [missing_log_message]
- if not parsed_url.netloc:
- raise ValueError(f"'{host}' is not a valid URL.")
+ header = ["".join([host for host in logs_by_host.keys()])]
+ message = []
+ for hits in logs_by_host.values():
+ for hit in hits:
+ message.append(json.dumps(_build_log_fields(hit.to_dict())))
+ return header, message
Review Comment:
`header` concatenates all host keys into a single string with no separator
(and returns it as a single-element list). For multi-host results this produces
an incorrect/ambiguous `LogSourceInfo`. Return a list of host strings (e.g.,
`list(logs_by_host.keys())`) or join with a delimiter if a single string is
required by the consumer.
##########
providers/opensearch/docs/logging/index.rst:
##########
@@ -25,6 +25,8 @@ Available only with Airflow>=3.0
Airflow can be configured to read task logs from Opensearch and optionally
write logs to stdout in standard or json format. These logs can later be
collected and forwarded to the cluster using tools like fluentd, logstash or
others.
+Airflow also supports writing logs to OpenSearch directly without requiring
additional software like fluentd or logstash. To enable this feature, set
``write_to_opensearch`` and ``json_format`` to ``True`` and ``write_stdout`` to
``False`` in ``airflow.cfg``. If you set both ``write_to_opensearch`` and
``delete_local_logs`` in the ``logging`` section to ``True``, Airflow deletes
the local copy of task logs after the direct write succeeds.
Review Comment:
The docs use the key name `write_to_opensearch`, but the provider config
additions introduce `write_to_os`, and the core config template reads
`WRITE_TO_OS`. This mismatch will confuse users and can lead to non-working
configurations. Please make the docs match the actual config key(s) supported
(or add backwards-compatible alias handling in code if both names are intended
to work).
##########
providers/opensearch/docs/logging/index.rst:
##########
@@ -52,6 +54,24 @@ To output task logs to stdout in JSON format, the following
config could be used
write_stdout = True
json_format = True
+To output task logs to OpenSearch directly, the following config could be
used: (set ``delete_local_logs`` to ``True`` if you do not want to retain a
local copy of the task log)
+
+.. code-block:: ini
+
+ [logging]
+ remote_logging = True
+ delete_local_logs = False
+
+ [opensearch]
+ host = <host>
+ port = <port>
+ username = <username>
+ password = <password>
+ write_stdout = False
+ json_format = True
+ write_to_opensearch = True
+ target_index = [name of the index to store logs]
Review Comment:
The docs use the key name `write_to_opensearch`, but the provider config
additions introduce `write_to_os`, and the core config template reads
`WRITE_TO_OS`. This mismatch will confuse users and can lead to non-working
configurations. Please make the docs match the actual config key(s) supported
(or add backwards-compatible alias handling in code if both names are intended
to work).
##########
airflow-core/src/airflow/config_templates/airflow_local_settings.py:
##########
@@ -306,34 +306,40 @@ def _default_conn_name_from(mod_path, hook_name):
)
elif OPENSEARCH_HOST:
- OPENSEARCH_END_OF_LOG_MARK: str =
conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
- OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
+ from airflow.providers.opensearch.log.os_task_handler import
OpensearchRemoteLogIO
+
+ OPENSEARCH_PORT = conf.getint("opensearch", "PORT", fallback=9200)
OPENSEARCH_USERNAME: str = conf.get_mandatory_value("opensearch",
"USERNAME")
OPENSEARCH_PASSWORD: str = conf.get_mandatory_value("opensearch",
"PASSWORD")
OPENSEARCH_WRITE_STDOUT: bool = conf.getboolean("opensearch",
"WRITE_STDOUT")
+ OPENSEARCH_WRITE_TO_OPENSEARCH: bool = conf.getboolean(
+ "opensearch",
+ "WRITE_TO_OS",
+ fallback=conf.getboolean("opensearch", "WRITE_TO_OS",
fallback=False),
+ )
OPENSEARCH_JSON_FORMAT: bool = conf.getboolean("opensearch",
"JSON_FORMAT")
- OPENSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("opensearch",
"JSON_FIELDS")
+ OPENSEARCH_TARGET_INDEX: str = conf.get_mandatory_value("opensearch",
"TARGET_INDEX")
OPENSEARCH_HOST_FIELD: str = conf.get_mandatory_value("opensearch",
"HOST_FIELD")
OPENSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("opensearch",
"OFFSET_FIELD")
Review Comment:
A few config issues here are likely to cause user-facing breakage: (1) the
`fallback` for `WRITE_TO_OS` redundantly reads the same key again (so it never
falls back meaningfully); (2) `TARGET_INDEX`, `HOST_FIELD`, and `OFFSET_FIELD`
are now mandatory, but the provider config declares defaults (and previous
setups likely relied on defaults). Recommend: use a single
`conf.getboolean(..., fallback=False)` for the flag, align the option name with
the documented key (`write_to_os` vs `write_to_opensearch`), and read
`TARGET_INDEX`/`HOST_FIELD`/`OFFSET_FIELD` with `fallback=` to their documented
defaults.
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -469,9 +583,10 @@ def concat_logs(hits: list[Hit]):
for hit in hits
]
else:
- message = [(host, concat_logs(hits)) for host, hits in
logs_by_host.items()] # type: ignore[misc]
+ message = [(host, self.concat_logs(hits)) for host, hits in
logs_by_host.items()] # type: ignore[misc]
else:
message = []
+ metadata["end_of_log"] = True
Review Comment:
Setting `metadata["end_of_log"] = True` whenever `logs_by_host` is falsy
marks the log as complete even when no lines were returned (e.g., logs not
indexed yet). This can prematurely stop UI polling/streaming depending on how
`LogMetadata.end_of_log` is interpreted upstream. Consider leaving `end_of_log`
unchanged/False here and letting the existing "missing log" timeout logic
determine when to end.
```suggestion
```
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
@staticmethod
def format_url(host: str) -> str:
+ return _format_url(host)
+
+
[email protected](kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ write_to_opensearch: bool = False
+ delete_local_copy: bool = False
+ host: str = "localhost"
+ port: int | None = 9200
+ username: str = ""
+ password: str = ""
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ base_log_folder: Path = attrs.field(converter=Path)
+ log_id_template: str = (
+ conf.get("opensearch", "log_id_template", fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ self.host = _format_url(self.host)
+ self.port = self.port if self.port is not None else
(urlparse(self.host).port or 9200)
+ self.client = _create_opensearch_client(
+ self.host,
+ self.port,
+ self.username,
+ self.password,
+ get_os_kwargs_from_config(),
+ )
+ self.index_patterns_callable = conf.get("opensearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns = conf.get("opensearch", "index_patterns",
fallback="_all")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Emit structured task logs to stdout and/or write them directly to
OpenSearch."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+ if not local_loc.is_file():
+ return
+
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
+ if self.write_stdout:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if self.write_to_opensearch:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ success = self._write_to_opensearch(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
+ def _parse_raw_log(self, log: str, log_id: str) -> list[dict[str, Any]]:
+ parsed_logs = []
+ offset = 1
+ for line in log.split("\n"):
+ if not line.strip():
+ continue
+ log_dict = json.loads(line)
+ log_dict.update({"log_id": log_id, self.offset_field: offset})
+ offset += 1
+ parsed_logs.append(log_dict)
+ return parsed_logs
Review Comment:
`_parse_raw_log` assumes every line is valid JSON. If `json_format` is
accidentally disabled (or a non-JSON line is present), `json.loads` will raise
and can break log upload entirely. Consider catching `json.JSONDecodeError` and
either (a) logging + skipping invalid lines, or (b) wrapping raw text into a
structured payload (e.g., setting `event` to the raw line) when `json_format`
is False.
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
@staticmethod
def format_url(host: str) -> str:
+ return _format_url(host)
+
+
[email protected](kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ write_to_opensearch: bool = False
+ delete_local_copy: bool = False
+ host: str = "localhost"
+ port: int | None = 9200
+ username: str = ""
+ password: str = ""
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ base_log_folder: Path = attrs.field(converter=Path)
+ log_id_template: str = (
+ conf.get("opensearch", "log_id_template", fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ self.host = _format_url(self.host)
+ self.port = self.port if self.port is not None else
(urlparse(self.host).port or 9200)
+ self.client = _create_opensearch_client(
+ self.host,
+ self.port,
+ self.username,
+ self.password,
+ get_os_kwargs_from_config(),
+ )
+ self.index_patterns_callable = conf.get("opensearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns = conf.get("opensearch", "index_patterns",
fallback="_all")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Emit structured task logs to stdout and/or write them directly to
OpenSearch."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+ if not local_loc.is_file():
+ return
+
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
+ if self.write_stdout:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if self.write_to_opensearch:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ success = self._write_to_opensearch(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
Review Comment:
`delete_local_copy` currently deletes the *entire directory* containing the
log file via `shutil.rmtree(...)`. This can remove other task logs (or even the
whole base log dir, depending on the path layout). Prefer deleting only the
uploaded file (e.g., `local_loc.unlink(missing_ok=True)`) and optionally
cleaning up empty parent directories safely.
```suggestion
# Delete only the uploaded log file and optionally clean up
empty parent directories.
try:
local_loc.unlink(missing_ok=True) # type:
ignore[call-arg]
except TypeError:
# For Python versions without the 'missing_ok' parameter.
if local_loc.exists():
local_loc.unlink()
# Best-effort removal of now-empty parent directories up to
the base log folder.
with contextlib.suppress(OSError):
base_dir = Path(self.base_log_folder)
parent = local_loc.parent
while parent != base_dir and parent.is_dir():
# Stop if directory is not empty.
if any(parent.iterdir()):
break
parent.rmdir()
parent = parent.parent
```
##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -664,18 +764,185 @@ def log_name(self) -> str:
@staticmethod
def format_url(host: str) -> str:
+ return _format_url(host)
+
+
[email protected](kw_only=True)
+class OpensearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ write_to_opensearch: bool = False
+ delete_local_copy: bool = False
+ host: str = "localhost"
+ port: int | None = 9200
+ username: str = ""
+ password: str = ""
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ base_log_folder: Path = attrs.field(converter=Path)
+ log_id_template: str = (
+ conf.get("opensearch", "log_id_template", fallback="")
+ or "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
+ )
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ self.host = _format_url(self.host)
+ self.port = self.port if self.port is not None else
(urlparse(self.host).port or 9200)
+ self.client = _create_opensearch_client(
+ self.host,
+ self.port,
+ self.username,
+ self.password,
+ get_os_kwargs_from_config(),
+ )
+ self.index_patterns_callable = conf.get("opensearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns = conf.get("opensearch", "index_patterns",
fallback="_all")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Emit structured task logs to stdout and/or write them directly to
OpenSearch."""
+ path = Path(path)
+ local_loc = path if path.is_absolute() else
self.base_log_folder.joinpath(path)
+ if not local_loc.is_file():
+ return
+
+ log_id = _render_log_id(self.log_id_template, ti, ti.try_number) #
type: ignore[arg-type]
+ if self.write_stdout:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if self.write_to_opensearch:
+ log_lines = self._parse_raw_log(local_loc.read_text(), log_id)
+ success = self._write_to_opensearch(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
Review Comment:
When both `write_stdout` and `write_to_opensearch` are enabled, the log file
is read and parsed twice (`local_loc.read_text()` + `_parse_raw_log(...)`).
Read the file once and reuse the parsed lines to avoid redundant I/O and JSON
parsing.
```suggestion
if self.write_stdout or self.write_to_opensearch:
raw_log = local_loc.read_text()
log_lines = self._parse_raw_log(raw_log, log_id)
if self.write_stdout:
for line in log_lines:
sys.stdout.write(json.dumps(line) + "\n")
sys.stdout.flush()
if self.write_to_opensearch:
success = self._write_to_opensearch(log_lines)
if success and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]