This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 96eefca6d2f fix: elasticsearch / opensearch logging exception details are missing in task log tab (#63739)
96eefca6d2f is described below

commit 96eefca6d2f59205df9b71f31e70cda7d73ffc0d
Author: Subham <[email protected]>
AuthorDate: Sun Mar 22 20:33:15 2026 +0530

    fix: elasticsearch / opensearch logging exception details are missing in task log tab (#63739)
---
 .../src/airflow/example_dags/example_failed_dag.py |  39 ++++
 .../test_remote_logging_elasticsearch.py           |  42 ++++
 .../providers/elasticsearch/log/es_task_handler.py |  57 ++++-
 .../elasticsearch/log/test_es_remote_log_io.py     |  33 ++-
 .../unit/elasticsearch/log/test_es_task_handler.py | 256 +++++++++++++++++++++
 .../providers/opensearch/log/os_task_handler.py    |  54 ++++-
 .../unit/opensearch/log/test_os_task_handler.py    | 217 +++++++++++++++++
 7 files changed, 686 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/example_dags/example_failed_dag.py b/airflow-core/src/airflow/example_dags/example_failed_dag.py
new file mode 100644
index 00000000000..5b8cae60f27
--- /dev/null
+++ b/airflow-core/src/airflow/example_dags/example_failed_dag.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Example DAG demonstrating task failure to generate exception traces in logs."""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow.sdk import DAG, task
+
+
+@task
+def fail_task():
+    """A task that always fails to generate error_detail."""
+    raise RuntimeError("This is a test exception for stacktrace rendering")
+
+
+with DAG(
+    "example_failed_dag",
+    schedule="@once",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=["example"],
+) as dag:
+    fail_task()
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
index 5c07a04203a..c17f5109f80 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
@@ -101,3 +101,45 @@ class TestRemoteLoggingElasticsearch:
         assert any(self.expected_message in event for event in events), (
             f"Expected task logs to contain {self.expected_message!r}, got 
events: {events}"
         )
+
+    def test_remote_logging_elasticsearch_error_detail(self):
+        """Test that log error_detail is retrieved correctly from 
Elasticsearch."""
+        dag_id = "example_failed_dag"
+        task_id = "fail_task"
+
+        self.airflow_client.un_pause_dag(dag_id)
+        resp = self.airflow_client.trigger_dag(
+            dag_id,
+            json={"logical_date": datetime.now(timezone.utc).isoformat()},
+        )
+        run_id = resp["dag_run_id"]
+        state = self.airflow_client.wait_for_dag_run(dag_id=dag_id, 
run_id=run_id)
+
+        assert state == "failed"
+
+        # Logs might take some time to appear in ES
+        task_logs_content = []
+        for _ in range(self.max_retries):
+            task_logs_resp = self.airflow_client.get_task_logs(
+                dag_id=dag_id,
+                task_id=task_id,
+                run_id=run_id,
+            )
+            task_logs_content = task_logs_resp.get("content", [])
+            # Search for the log entry with error_detail
+            if any("error_detail" in item for item in task_logs_content if 
isinstance(item, dict)):
+                break
+            time.sleep(self.retry_interval_in_seconds)
+
+        error_entries = [
+            item for item in task_logs_content if isinstance(item, dict) and 
"error_detail" in item
+        ]
+        assert len(error_entries) > 0, (
+            f"Expected error_detail in logs, but none found. Logs: 
{task_logs_content}"
+        )
+
+        error_detail = error_entries[0]["error_detail"]
+        assert isinstance(error_detail, list), f"Expected error_detail to be a 
list, got {type(error_detail)}"
+        assert len(error_detail) > 0, "Expected error_detail to have at least 
one exception"
+        assert error_detail[0]["exc_type"] == "RuntimeError"
+        assert "This is a test exception for stacktrace rendering" in 
error_detail[0]["exc_value"]
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 60080658ae9..f063749e523 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -79,7 +79,56 @@ LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
 # not exist, the task handler should use the log_id_template attribute instead.
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+    """Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string."""
+    if not error_detail:
+        return None
+    if not isinstance(error_detail, list):
+        return str(error_detail)
+
+    lines: list[str] = ["Traceback (most recent call last):"]
+    for exc_info in error_detail:
+        if not isinstance(exc_info, dict):
+            lines.append(str(exc_info))
+            continue
+        if exc_info.get("is_cause"):
+            lines.append("\nThe above exception was the direct cause of the following exception:\n")
+            lines.append("Traceback (most recent call last):")
+        for frame in exc_info.get("frames", []):
+            lines.append(
+                f'  File "{frame.get("filename", "<unknown>")}", line {frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
+            )
+        exc_type = exc_info.get("exc_type", "")
+        exc_value = exc_info.get("exc_value", "")
+        if exc_type:
+            lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
+    return "\n".join(lines)
+
+
+def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
+    """Filter an ES hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage."""
+    fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS or k == "@timestamp"}
+
+    # Map @timestamp to timestamp
+    if "@timestamp" in fields and "timestamp" not in fields:
+        fields["timestamp"] = fields.pop("@timestamp")
+
+    # Map levelname to level
+    if "levelname" in fields and "level" not in fields:
+        fields["level"] = fields.pop("levelname")
+
+    # Airflow 3 StructuredLogMessage requires 'event'
+    if "event" not in fields:
+        fields["event"] = fields.pop("message", "")
+
+    # Clean up error_detail if it's empty
+    if "error_detail" in fields and not fields["error_detail"]:
+        fields.pop("error_detail")
+    return fields
+
 
 VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys())
 # Remove `self` from the valid set of kwargs
@@ -356,9 +405,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
                 # Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
                 message = header + [
-                    StructuredLogMessage(
-                        **{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
-                    )
+                    StructuredLogMessage(**_build_log_fields(hit.to_dict()))
                     for hits in logs_by_host.values()
                     for hit in hits
                 ]
@@ -668,7 +715,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
         # Structured log messages
         for hits in logs_by_host.values():
             for hit in hits:
-                filtered = {k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
+                filtered = _build_log_fields(hit.to_dict())
                 message.append(json.dumps(filtered))
 
         return header, message
diff --git a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
index aa8e91f812d..89a12a3a39b 100644
--- a/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
+++ b/providers/elasticsearch/tests/integration/elasticsearch/log/test_es_remote_log_io.py
@@ -24,7 +24,7 @@ from unittest.mock import patch
 import elasticsearch
 import pytest
 
-from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchRemoteLogIO
+from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchRemoteLogIO, _render_log_id
 
 # The ES service hostname as defined in scripts/ci/docker-compose/integration-elasticsearch.yml
 ES_HOST = "http://elasticsearch:9200"
@@ -101,8 +101,8 @@ class TestElasticsearchRemoteLogIOIntegration:
         expected_messages = ["start", "processing", "end"]
         for expected, log_message in zip(expected_messages, log_messages):
             log_entry = json.loads(log_message)
-            assert "message" in log_entry
-            assert log_entry["message"] == expected
+            assert "event" in log_entry
+            assert log_entry["event"] == expected
 
     def test_read_missing_log(self, ti):
         """Verify that a missing log returns the expected error message.
@@ -118,3 +118,30 @@ class TestElasticsearchRemoteLogIOIntegration:
         assert log_source_info == []
         assert len(log_messages) == 1
         assert "not found in Elasticsearch" in log_messages[0]
+
+    def test_read_error_detail_integration(self, ti):
+        """Verify that error_detail is correctly retrieved and formatted in 
integration tests."""
+        # Manually index a log entry with error_detail
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "Woopsie. Something went wrong.",
+            }
+        ]
+        body = {
+            "event": "Task failed with exception",
+            "log_id": _render_log_id(self.elasticsearch_io.log_id_template, 
ti, ti.try_number),
+            "offset": 1,
+            "error_detail": error_detail,
+        }
+        self.elasticsearch_io.client.index(index=self.target_index, body=body)
+        self.elasticsearch_io.client.indices.refresh(index=self.target_index)
+
+        log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+        assert len(log_messages) == 1
+        log_entry = json.loads(log_messages[0])
+        assert "error_detail" in log_entry
+        assert log_entry["error_detail"] == error_detail
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index 6698f27efa3..8d6a0ba616c 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -1060,3 +1060,259 @@ class TestElasticsearchRemoteLogIO:
         assert log_source_info == []
         assert f"*** Log {log_id} not found in Elasticsearch" in 
log_messages[0]
         mocked_count.assert_called_once()
+
+    def test_read_error_detail(self, ti):
+        """Verify that error_detail is correctly retrieved and formatted."""
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "Woopsie. Something went wrong.",
+            }
+        ]
+        body = {
+            "event": "Task failed with exception",
+            "log_id": _render_log_id(self.elasticsearch_io.log_id_template, 
ti, ti.try_number),
+            "offset": 1,
+            "error_detail": error_detail,
+        }
+
+        from airflow.providers.elasticsearch.log.es_response import Hit
+
+        mock_hit = Hit({"_source": body})
+        with (
+            patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
+            patch.object(
+                self.elasticsearch_io,
+                "_group_logs_by_host",
+                return_value={"http://localhost:9200": [mock_hit]},
+            ),
+        ):
+            mock_es_read.return_value = mock.MagicMock()
+            mock_es_read.return_value.hits = [mock_hit]
+
+            log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+
+            assert len(log_messages) == 1
+            log_entry = json.loads(log_messages[0])
+            assert "error_detail" in log_entry
+            assert log_entry["error_detail"] == error_detail
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+    """Unit tests for _format_error_detail."""
+
+    def test_returns_none_for_empty(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        assert _format_error_detail(None) is None
+        assert _format_error_detail([]) is None
+
+    def test_returns_string_for_non_list(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        assert _format_error_detail("raw string") == "raw string"
+
+    def test_formats_single_exception(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [
+                    {"filename": "/app/task.py", "lineno": 13, "name": 
"log_and_raise"},
+                ],
+                "exc_type": "RuntimeError",
+                "exc_value": "Something went wrong.",
+                "exceptions": [],
+                "is_group": False,
+            }
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert "Traceback (most recent call last):" in result
+        assert 'File "/app/task.py", line 13, in log_and_raise' in result
+        assert "RuntimeError: Something went wrong." in result
+
+    def test_formats_chained_exceptions(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": True,
+                "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+                "exc_type": "ValueError",
+                "exc_value": "original",
+                "exceptions": [],
+            },
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "wrapped",
+                "exceptions": [],
+            },
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert "direct cause" in result
+        assert "ValueError: original" in result
+        assert "RuntimeError: wrapped" in result
+
+    def test_exc_type_without_value(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [],
+                "exc_type": "StopIteration",
+                "exc_value": "",
+            }
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert result.endswith("StopIteration")
+
+    def test_non_dict_items_are_stringified(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_format_error_detail
+
+        result = _format_error_detail(["unexpected string item"])
+        assert result is not None
+        assert "unexpected string item" in result
+
+
+class TestBuildStructuredLogFields:
+    """Unit tests for _build_log_fields."""
+
+    def test_filters_to_allowed_fields(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"event": "hello", "level": "info", "unknown_field": "should be 
dropped"}
+        result = _build_log_fields(hit)
+        assert "event" in result
+        assert "level" in result
+        assert "unknown_field" not in result
+
+    def test_message_mapped_to_event(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+        fields = _build_log_fields(hit)
+        assert fields["event"] == "plain message"
+        assert "message" not in fields  # Ensure it is popped if used as event
+
+    def test_message_preserved_if_event_exists(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"event": "structured event", "message": "plain message"}
+        fields = _build_log_fields(hit)
+        assert fields["event"] == "structured event"
+        # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide 
with event
+        assert fields["message"] == "plain message"
+
+    def test_levelname_mapped_to_level(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"event": "msg", "levelname": "ERROR"}
+        result = _build_log_fields(hit)
+        assert result["level"] == "ERROR"
+        assert "levelname" not in result
+
+    def test_at_timestamp_mapped_to_timestamp(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+        result = _build_log_fields(hit)
+        assert result["timestamp"] == "2024-01-01T00:00:00Z"
+        assert "@timestamp" not in result
+
+    def test_error_detail_is_kept_as_list(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/dag.py", "lineno": 10, "name": 
"run"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "Woopsie.",
+            }
+        ]
+        hit = {
+            "event": "Task failed with exception",
+            "error_detail": error_detail,
+        }
+        result = _build_log_fields(hit)
+        assert result["error_detail"] == error_detail
+
+    def test_error_detail_dropped_when_empty(self):
+        from airflow.providers.elasticsearch.log.es_task_handler import 
_build_log_fields
+
+        hit = {"event": "msg", "error_detail": []}
+        result = _build_log_fields(hit)
+        assert "error_detail" not in result
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage 
only exists in Airflow 3+")
+    @elasticmock
+    def test_read_includes_error_detail_in_structured_message(self):
+        """End-to-end: a hit with error_detail should surface it in the 
returned StructuredLogMessage."""
+        from airflow.providers.elasticsearch.log.es_task_handler import 
ElasticsearchTaskHandler
+
+        local_log_location = "local/log/location"
+        handler = ElasticsearchTaskHandler(
+            base_log_folder=local_log_location,
+            end_of_log_mark="end_of_log\n",
+            write_stdout=False,
+            json_format=False,
+            json_fields="asctime,filename,lineno,levelname,message,exc_text",
+        )
+
+        es = elasticsearch.Elasticsearch("http://localhost:9200")
+        log_id = "test_dag-test_task-test_run--1-1"
+        body = {
+            "event": "Task failed with exception",
+            "log_id": log_id,
+            "offset": 1,
+            "error_detail": [
+                {
+                    "is_cause": False,
+                    "frames": [
+                        {"filename": "/opt/airflow/dags/fail.py", "lineno": 
13, "name": "log_and_raise"}
+                    ],
+                    "exc_type": "RuntimeError",
+                    "exc_value": "Woopsie. Something went wrong.",
+                }
+            ],
+        }
+        es.index(index="test_index", doc_type="log", body=body, id=1)
+
+        # Patch the IO layer to return our fake document
+        mock_hit_dict = body.copy()
+
+        from airflow.providers.elasticsearch.log.es_response import 
ElasticSearchResponse, Hit
+
+        mock_hit = Hit({"_source": mock_hit_dict})
+        mock_response = mock.MagicMock(spec=ElasticSearchResponse)
+        mock_response.hits = [mock_hit]
+        mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+        mock_response.__bool__ = mock.Mock(return_value=True)
+        mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
+
+        with mock.patch.object(handler.io, "_es_read", 
return_value=mock_response):
+            with mock.patch.object(handler.io, "_group_logs_by_host", 
return_value={"localhost": [mock_hit]}):
+                # Build StructuredLogMessages
+                from airflow.providers.elasticsearch.log.es_task_handler 
import _build_log_fields
+                from airflow.utils.log.file_task_handler import 
StructuredLogMessage
+
+                fields = _build_log_fields(mock_hit.to_dict())
+                msg = StructuredLogMessage(**fields)
+
+                assert msg.event == "Task failed with exception"
+                assert hasattr(msg, "error_detail")
+                assert msg.error_detail == body["error_detail"]
diff --git a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
index c76980e5106..05f0ff90cbf 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py
@@ -58,7 +58,55 @@ else:
 
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
-TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger"]
+TASK_LOG_FIELDS = ["timestamp", "event", "level", "chan", "logger", "error_detail", "message", "levelname"]
+
+
+def _format_error_detail(error_detail: Any) -> str | None:
+    """Render the structured ``error_detail`` written by the Airflow 3 supervisor as a traceback string."""
+    if not error_detail:
+        return None
+    if not isinstance(error_detail, list):
+        return str(error_detail)
+
+    lines: list[str] = ["Traceback (most recent call last):"]
+    for exc_info in error_detail:
+        if not isinstance(exc_info, dict):
+            lines.append(str(exc_info))
+            continue
+        if exc_info.get("is_cause"):
+            lines.append("\nThe above exception was the direct cause of the following exception:\n")
+            lines.append("Traceback (most recent call last):")
+        for frame in exc_info.get("frames", []):
+            lines.append(
+                f'  File "{frame.get("filename", "<unknown>")}", line {frame.get("lineno", "?")}, in {frame.get("name", "<unknown>")}'
+            )
+        exc_type = exc_info.get("exc_type", "")
+        exc_value = exc_info.get("exc_value", "")
+        if exc_type:
+            lines.append(f"{exc_type}: {exc_value}" if exc_value else exc_type)
+    return "\n".join(lines)
+
+
+def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
+    """Filter an OpenSearch hit to ``TASK_LOG_FIELDS`` and ensure compatibility with StructuredLogMessage."""
+    fields = {k: v for k, v in hit_dict.items() if k.lower() in TASK_LOG_FIELDS or k == "@timestamp"}
+
+    # Map @timestamp to timestamp
+    if "@timestamp" in fields and "timestamp" not in fields:
+        fields["timestamp"] = fields.pop("@timestamp")
+
+    # Map levelname to level
+    if "levelname" in fields and "level" not in fields:
+        fields["level"] = fields.pop("levelname")
+
+    # Airflow 3 StructuredLogMessage requires 'event'
+    if "event" not in fields:
+        fields["event"] = fields.pop("message", "")
+
+    # Clean up error_detail if it's empty
+    if "error_detail" in fields and not fields["error_detail"]:
+        fields.pop("error_detail")
+    return fields
 
 
 def getattr_nested(obj, item, default):
@@ -416,9 +464,7 @@ class OpensearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin)
 
                 # Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
                 message = header + [
-                    StructuredLogMessage(
-                        **{k: v for k, v in hit.to_dict().items() if k.lower() in TASK_LOG_FIELDS}
-                    )
+                    StructuredLogMessage(**_build_log_fields(hit.to_dict()))
                     for hits in logs_by_host.values()
                     for hit in hits
                 ]
diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
index 4dc46c1d89b..15aba25ae8b 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
@@ -568,3 +568,220 @@ def test_retrieve_config_keys():
         # http_compress comes from config value
         assert "http_compress" in args_from_config
         assert "self" not in args_from_config
+
+
+# ---------------------------------------------------------------------------
+# Tests for the error_detail helpers (issue #63736)
+# ---------------------------------------------------------------------------
+
+
+class TestFormatErrorDetail:
+    """Unit tests for _format_error_detail."""
+
+    def test_returns_none_for_empty(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        assert _format_error_detail(None) is None
+        assert _format_error_detail([]) is None
+
+    def test_returns_string_for_non_list(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        assert _format_error_detail("raw string") == "raw string"
+
+    def test_formats_single_exception(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [
+                    {"filename": "/app/task.py", "lineno": 13, "name": "log_and_raise"},
+                ],
+                "exc_type": "RuntimeError",
+                "exc_value": "Something went wrong.",
+                "exceptions": [],
+                "is_group": False,
+            }
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert "Traceback (most recent call last):" in result
+        assert 'File "/app/task.py", line 13, in log_and_raise' in result
+        assert "RuntimeError: Something went wrong." in result
+
+    def test_formats_chained_exceptions(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": True,
+                "frames": [{"filename": "/a.py", "lineno": 1, "name": "foo"}],
+                "exc_type": "ValueError",
+                "exc_value": "original",
+                "exceptions": [],
+            },
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/b.py", "lineno": 2, "name": "bar"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "wrapped",
+                "exceptions": [],
+            },
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert "direct cause" in result
+        assert "ValueError: original" in result
+        assert "RuntimeError: wrapped" in result
+
+    def test_exc_type_without_value(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [],
+                "exc_type": "StopIteration",
+                "exc_value": "",
+            }
+        ]
+        result = _format_error_detail(error_detail)
+        assert result is not None
+        assert result.endswith("StopIteration")
+
+    def test_non_dict_items_are_stringified(self):
+        from airflow.providers.opensearch.log.os_task_handler import _format_error_detail
+
+        result = _format_error_detail(["unexpected string item"])
+        assert result is not None
+        assert "unexpected string item" in result
+
+
+class TestBuildLogFields:
+    """Unit tests for _build_log_fields."""
+
+    def test_filters_to_allowed_fields(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"event": "hello", "level": "info", "unknown_field": "should be dropped"}
+        result = _build_log_fields(hit)
+        assert "event" in result
+        assert "level" in result
+        assert "unknown_field" not in result
+
+    def test_message_mapped_to_event(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
+        fields = _build_log_fields(hit)
+        assert fields["event"] == "plain message"
+        assert "message" not in fields  # Ensure it is popped if used as event
+
+    def test_message_preserved_if_event_exists(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"event": "structured event", "message": "plain message"}
+        fields = _build_log_fields(hit)
+        assert fields["event"] == "structured event"
+        # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide with event
+        assert fields["message"] == "plain message"
+
+    def test_levelname_mapped_to_level(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"event": "msg", "levelname": "ERROR"}
+        result = _build_log_fields(hit)
+        assert result["level"] == "ERROR"
+        assert "levelname" not in result
+
+    def test_at_timestamp_mapped_to_timestamp(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
+        result = _build_log_fields(hit)
+        assert result["timestamp"] == "2024-01-01T00:00:00Z"
+        assert "@timestamp" not in result
+
+    def test_error_detail_is_kept_as_list(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        error_detail = [
+            {
+                "is_cause": False,
+                "frames": [{"filename": "/dag.py", "lineno": 10, "name": "run"}],
+                "exc_type": "RuntimeError",
+                "exc_value": "Woopsie.",
+            }
+        ]
+        hit = {
+            "event": "Task failed with exception",
+            "error_detail": error_detail,
+        }
+        result = _build_log_fields(hit)
+        assert result["error_detail"] == error_detail
+
+    def test_error_detail_dropped_when_empty(self):
+        from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+
+        hit = {"event": "msg", "error_detail": []}
+        result = _build_log_fields(hit)
+        assert "error_detail" not in result
+
+    @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage only exists in Airflow 3+")
+    def test_read_includes_error_detail_in_structured_message(self):
+        """End-to-end: a hit with error_detail should surface it in the returned StructuredLogMessage."""
+        from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler
+
+        local_log_location = "local/log/location"
+        handler = OpensearchTaskHandler(
+            base_log_folder=local_log_location,
+            end_of_log_mark="end_of_log\n",
+            write_stdout=False,
+            json_format=False,
+            json_fields="asctime,filename,lineno,levelname,message,exc_text",
+            host="localhost",
+            port=9200,
+            username="admin",
+            password="password",
+        )
+
+        log_id = "test_dag-test_task-test_run--1-1"
+        body = {
+            "event": "Task failed with exception",
+            "log_id": log_id,
+            "offset": 1,
+            "error_detail": [
+                {
+                    "is_cause": False,
+                    "frames": [
+                        {"filename": "/opt/airflow/dags/fail.py", "lineno": 13, "name": "log_and_raise"}
+                    ],
+                    "exc_type": "RuntimeError",
+                    "exc_value": "Woopsie. Something went wrong.",
+                }
+            ],
+        }
+
+        # Instead of firing up an OpenSearch client, we patch the IO and response class
+        mock_hit_dict = body.copy()
+        from airflow.providers.opensearch.log.os_response import Hit, OpensearchResponse
+
+        mock_hit = Hit({"_source": mock_hit_dict})
+        mock_response = mock.MagicMock(spec=OpensearchResponse)
+        mock_response.hits = [mock_hit]
+        mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
+        mock_response.__bool__ = mock.Mock(return_value=True)
+        mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
+
+        with mock.patch.object(handler, "_os_read", return_value=mock_response):
+            with mock.patch.object(handler, "_group_logs_by_host", return_value={"localhost": [mock_hit]}):
+                from airflow.providers.opensearch.log.os_task_handler import _build_log_fields
+                from airflow.utils.log.file_task_handler import StructuredLogMessage
+
+                fields = _build_log_fields(mock_hit.to_dict())
+                msg = StructuredLogMessage(**fields)
+
+                assert msg.event == "Task failed with exception"
+                assert hasattr(msg, "error_detail")
+                assert msg.error_detail == body["error_detail"]


Reply via email to