jason810496 commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2335228822


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +602,79 @@ def _get_result(self, hit: dict[Any, Any], 
parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+    def _get_log_message(self, hit: Hit) -> str:
+        """Get log message from hit, supporting both Airflow 2.x and 3.x 
formats."""
+        if hasattr(hit, "event"):
+            return hit.event
+        if hasattr(hit, "message"):
+            return hit.message
+        return ""
+
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    json_format: bool = False
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200"
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        # Convert the runtimeTI to the real TaskInstance that via fetching 
from DB
+        ti = TaskInstance.get_task_instance(
+            ti.dag_id, ti.run_id, ti.task_id, ti.map_index if ti.map_index is 
not None else -1
+        )  # type: ignore[assignment]

Review Comment:
   >  i'm hitting error while trying to create Runtime Task Instance:
   
   This is the difficulty I described earlier, and this is why Ash mentioned 
"This can't? shouldn't? work in Airflow 3".
   
   Additionally, if it's really too hard to resolve, another solution is to 
raise a discussion about whether the new ElasticSearch TaskLogHandler needs 
to support `LogTemplate` at all. Only ElasticSearch and OpenSearch use the 
`LogTemplate` DB model, which is a bit strange IMO, so maybe we can discuss 
whether to remove it. The provider would still be compatible with Airflow 2, 
but for Airflow 3 we should limit the ES provider to the versions after 
`LogTemplate` is removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to