topherinternational commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2314519678


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +656,57 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
+def getattr_nested(obj, item, default):
+    """
+    Get item from obj but return default if not found.
+
+    E.g. calling ``getattr_nested(a, 'b.c', "NA")`` will return
+    ``a.b.c`` if such a value exists, and "NA" otherwise.
+
+    :meta private:
+    """
+    try:
+        return attrgetter(item)(obj)
+    except AttributeError:
+        return default
+
+
+@attrs.define(kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    host: str
+    target_index: str
+    base_log_folder: Path = attrs.field(converter=Path)
+    delete_local_copy: bool
+
+    processors = ()

Review Comment:
   @ashb I took a quick look at how we'd make this a processor in the future
and got stuck: the `upload()` method takes the `ti` as an argument, but the
process function takes only the `event_dict`. We need the `ti` to construct the
`log_id`, which has to be on every log event that gets indexed to ES/OS.
   
   Can we expect each log event from the task logger to contain the `ti`, or
otherwise give us the `log_id` components? (Or is that a necessary future
change?)
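
To make the question concrete, here is a rough sketch of the two options as I understand them. It assumes the processor uses the structlog-style `(logger, method_name, event_dict)` signature and that `log_id` is rendered from a template over `dag_id`, `task_id`, `run_id`, `map_index` and `try_number`. Both are assumptions on my part, and the names `LOG_ID_TEMPLATE`, `LOG_ID_FIELDS`, `add_log_id` and `make_log_id_processor` are hypothetical, not part of this PR.

```python
from typing import Any, Callable, MutableMapping

EventDict = MutableMapping[str, Any]
Processor = Callable[[Any, str, EventDict], EventDict]

# Hypothetical template and field list; the real ones would come from the
# handler's configuration, not from constants like these.
LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}"
LOG_ID_FIELDS = ("dag_id", "task_id", "run_id", "map_index", "try_number")


def add_log_id(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
    """Option A: build log_id from components already present on the event.

    This only works if the task logger puts every component on each event.
    Events that lack a component are passed through unchanged.
    """
    if all(field in event_dict for field in LOG_ID_FIELDS):
        components = {field: event_dict[field] for field in LOG_ID_FIELDS}
        event_dict["log_id"] = LOG_ID_TEMPLATE.format(**components)
    return event_dict


def make_log_id_processor(log_id: str) -> Processor:
    """Option B: close over a log_id computed once, wherever the ti is known."""

    def processor(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
        # Do not overwrite a log_id that an earlier processor already set.
        event_dict.setdefault("log_id", log_id)
        return event_dict

    return processor
```

Option A needs the components on every event, which is the open question above. Option B avoids that but needs a hook where the `ti` is available when the processor chain is built.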
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
