Owen-CH-Leung commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2287783246


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +610,66 @@ def _get_result(self, hit: dict[Any, Any], 
parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200";
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", 
"index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        if local_loc.is_file() and self.write_stdout:
+            # Intentionally construct the log_id and offset field
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if local_loc.is_file() and self.write_to_es:
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            success = self._write_to_es(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, ti: RuntimeTI) -> list[dict[str, Any]]:
         logs = log.split("\n")
         parsed_logs = []
+        offset = 1
         for line in logs:
             # Make sure line is not empty
             if line.strip():
-                parsed_logs.append(json.loads(line))
+                # construct log_id which is 
{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+                # also construct the offset field (default is 'offset')
+                log_dict = json.loads(line)
+                log_id = 
f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"

Review Comment:
   I just tried it. `str(ti.id)` returns a UUID, which doesn't fit the
specific log_id requirement for Elasticsearch: the log_id has to follow the
`{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}` format.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to