jason810496 commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2329338036
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +602,79 @@ def _get_result(self, hit: dict[Any, Any],
parent_class=None) -> Hit:
callback: type[Hit] | Callable[..., Any] = getattr(doc_class,
"from_es", doc_class)
return callback(hit)
- def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+ def _get_log_message(self, hit: Hit) -> str:
+ """Get log message from hit, supporting both Airflow 2.x and 3.x
formats."""
+ if hasattr(hit, "event"):
+ return hit.event
+ if hasattr(hit, "message"):
+ return hit.message
+ return ""
+
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101
+ json_format: bool = False
+ write_stdout: bool = False
+ delete_local_copy: bool = False
+ host: str = "http://localhost:9200"
+ host_field: str = "host"
+ target_index: str = "airflow-logs"
+ offset_field: str = "offset"
+ write_to_es: bool = False
+ base_log_folder: Path = attrs.field(converter=Path)
+
+ processors = ()
+
+ def __attrs_post_init__(self):
+ es_kwargs = get_es_kwargs_from_config()
+ self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+ self.index_patterns_callable = conf.get("elasticsearch",
"index_patterns_callable", fallback="")
+ self.PAGE = 0
+ self.MAX_LINE_PER_PAGE = 1000
+ self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+ self._doc_type_map: dict[Any, Any] = {}
+ self._doc_type: list[Any] = []
+
+ def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+ """Write the log to ElasticSearch."""
+ path = Path(path)
+
+ if path.is_absolute():
+ local_loc = path
+ else:
+ local_loc = self.base_log_folder.joinpath(path)
+
+ # Convert the runtimeTI to the real TaskInstance that via fetching
from DB
+ ti = TaskInstance.get_task_instance(
+ ti.dag_id, ti.run_id, ti.task_id, ti.map_index if ti.map_index is
not None else -1
+ ) # type: ignore[assignment]
Review Comment:
Sorry Owen, I thought there was already an Elasticsearch integration for Breeze.
As a workaround, we can integrate Elasticsearch with Breeze by copying the
changes to `dev/breeze/src/airflow_breeze/global_constants.py` and
`scripts/ci/docker-compose/integration-elasticsearch.yml` from the [Add elasticsearch
Breeze integration #55365](https://github.com/apache/airflow/pull/55365) PR into
your local checkout. (It will still take some more effort to get #55365 merged, so
copying the changes locally _might_ be the fastest way right now.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]