Owen-CH-Leung commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2287783246
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +610,66 @@ def _get_result(self, hit: dict[Any, Any],
parent_class=None) -> Hit:
callback: type[Hit] | Callable[..., Any] = getattr(doc_class,
"from_es", doc_class)
return callback(hit)
- def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
+ write_stdout: bool = False
+ delete_local_copy: bool = False
+ host: str = "http://localhost:9200"
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ write_to_es: bool = False
+ base_log_folder: Path = attrs.field(converter=Path)
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ es_kwargs = get_es_kwargs_from_config()
+ self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+ self.index_patterns_callable = conf.get("elasticsearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Write the log to ElasticSearch."""
+ path = Path(path)
+
+ if path.is_absolute():
+ local_loc = path
+ else:
+ local_loc = self.base_log_folder.joinpath(path)
+
+ if local_loc.is_file() and self.write_stdout:
+ # Intentionally construct the log_id and offset field
+ log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+ for line in log_lines:
+ sys.stdout.write(json.dumps(line) + "\n")
+ sys.stdout.flush()
+
+ if local_loc.is_file() and self.write_to_es:
+ log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+ success = self._write_to_es(log_lines)
+ if success and self.delete_local_copy:
+ shutil.rmtree(os.path.dirname(local_loc))
+
+ def _parse_raw_log(self, log: str, ti: RuntimeTI) -> list[dict[str, Any]]:
logs = log.split("\n")
parsed_logs = []
+ offset = 1
for line in logs:
# Make sure line is not empty
if line.strip():
- parsed_logs.append(json.loads(line))
+ # construct log_id which is
{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+ # also construct the offset field (default is 'offset')
+ log_dict = json.loads(line)
+ log_id =
f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"
Review Comment:
I just tried it. `str(ti.id)` returns a UUID-like string, which doesn't fit
the specific requirements of Elasticsearch.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org