topherinternational commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2314519678


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +656,57 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+def getattr_nested(obj, item, default):
+    """
+    Get item from obj but return default if not found.
+
+    E.g. calling ``getattr_nested(a, 'b.c', "NA")`` will return
+    ``a.b.c`` if such a value exists, and "NA" otherwise.
+
+    :meta private:
+    """
+    try:
+        return attrgetter(item)(obj)
+    except AttributeError:
+        return default
+
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    host: str
+    target_index: str
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool
+
+    processors = ()

Review Comment:
   @ashb I took a quick look at how we'd make this a processor in the future and got stuck on one point: the `upload()` method takes the `ti` as an argument, but the processor function takes only the `event_dict`, and we need the `ti` to construct the `log_id`, which must be on every log event that gets indexed to ES/OS. Can we expect each log event from the task logger to contain the `ti`, or otherwise give us the `log_id` components? (Or is that a necessary future change?)
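
   To make the question concrete, here is a minimal sketch of one possible workaround, not something this PR implements: a factory that binds the `ti` once and returns a processor that stamps `log_id` onto each event. It assumes a structlog-style processor signature `(logger, method_name, event_dict)`. The factory name `make_log_id_processor`, the field list, and the `-`-joined `log_id` layout are all hypothetical and chosen only for illustration. `getattr_nested` is copied from the diff above.

```python
from operator import attrgetter
from types import SimpleNamespace
from typing import Any, Callable


def getattr_nested(obj, item, default):
    """Same helper as in the diff: dotted attribute lookup with a default."""
    try:
        return attrgetter(item)(obj)
    except AttributeError:
        return default


def make_log_id_processor(ti: Any) -> Callable[[Any, str, dict], dict]:
    """Hypothetical factory: capture the ti once, return a per-event processor."""
    # Assumed log_id layout, for illustration only; missing fields become "NA".
    fields = ("dag_id", "task_id", "run_id", "map_index", "try_number")
    log_id = "-".join(str(getattr_nested(ti, name, "NA")) for name in fields)

    def add_log_id(logger: Any, method_name: str, event_dict: dict) -> dict:
        # The processor itself never sees the ti; it only uses the bound log_id.
        event_dict["log_id"] = log_id
        return event_dict

    return add_log_id


# Stand-in for a task instance; it deliberately has no map_index attribute.
ti = SimpleNamespace(dag_id="example_dag", task_id="extract", run_id="manual_1", try_number=1)
processor = make_log_id_processor(ti)
event = processor(None, "info", {"event": "task started"})
```

   This only works if something that knows the `ti` can build the processor list per task, which a class-level `processors = ()` attribute cannot do on its own. That gap is what the question above is about.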