ashb commented on code in PR #53821:
URL: https://github.com/apache/airflow/pull/53821#discussion_r2281632395


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -189,6 +208,21 @@ def __init__(
         self.handler: logging.FileHandler | logging.StreamHandler | None = None
         self._doc_type_map: dict[Any, Any] = {}
         self._doc_type: list[Any] = []
+        self.io = ElasticsearchRemoteLogIO(
+            host=self.host,
+            target_index=self.target_index,
+            write_stdout=self.write_stdout,
+            write_to_es=self.write_to_es,
+            offset_field=self.offset_field,
+            host_field=self.host_field,
+            base_log_folder=base_log_folder,
+            delete_local_copy=self.delete_local_copy,
+        )
+        # Airflow 3 introduce REMOTE_TASK_LOG for handling remote logging
+        # REMOTE_TASK_LOG should be explicitly set in airflow_local_settings.py when trying to use ESTaskHandler
+        # Before airflow 3.1, REMOTE_TASK_LOG is not set when trying to use ES TaskHandler.
+        if AIRFLOW_V_3_0_PLUS and alc.REMOTE_TASK_LOG is None:
+            alc.REMOTE_TASK_LOG = self.io

Review Comment:
   Why do we need to know if we are on Airflow 3 or not? None of the other 
logging providers needed this check...



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -661,13 +610,66 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit:
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class)
         return callback(hit)
 
-    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+
[email protected](kw_only=True)
+class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
+    write_stdout: bool = False
+    delete_local_copy: bool = False
+    host: str = "http://localhost:9200"
+    host_field: str = "host"
+    target_index: str = "airflow-logs"
+    offset_field: str = "offset"
+    write_to_es: bool = False
+    base_log_folder: Path = attrs.field(converter=Path)
+
+    processors = ()
+
+    def __attrs_post_init__(self):
+        es_kwargs = get_es_kwargs_from_config()
+        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="")
+        self.PAGE = 0
+        self.MAX_LINE_PER_PAGE = 1000
+        self.index_patterns: str = conf.get("elasticsearch", "index_patterns")
+        self._doc_type_map: dict[Any, Any] = {}
+        self._doc_type: list[Any] = []
+
+    def upload(self, path: os.PathLike | str, ti: RuntimeTI):
+        """Write the log to ElasticSearch."""
+        path = Path(path)
+
+        if path.is_absolute():
+            local_loc = path
+        else:
+            local_loc = self.base_log_folder.joinpath(path)
+
+        if local_loc.is_file() and self.write_stdout:
+            # Intentionally construct the log_id and offset field
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            for line in log_lines:
+                sys.stdout.write(json.dumps(line) + "\n")
+                sys.stdout.flush()
+
+        if local_loc.is_file() and self.write_to_es:
+            log_lines = self._parse_raw_log(local_loc.read_text(), ti)
+            success = self._write_to_es(log_lines)
+            if success and self.delete_local_copy:
+                shutil.rmtree(os.path.dirname(local_loc))
+
+    def _parse_raw_log(self, log: str, ti: RuntimeTI) -> list[dict[str, Any]]:
         logs = log.split("\n")
         parsed_logs = []
+        offset = 1
         for line in logs:
             # Make sure line is not empty
             if line.strip():
-                parsed_logs.append(json.loads(line))
+                # construct log_id which is {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
+                # also construct the offset field (default is 'offset')
+                log_dict = json.loads(line)
+                log_id = f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"

Review Comment:
   This might be the easiest approach on Airflow 3. 
   ```suggestion
                   log_id = str(ti.id)
   ```



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -686,17 +688,130 @@ def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
             self.log.exception("Unable to insert logs into Elasticsearch. Reason: %s", str(e))
             return False
 
+    def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
+        log_id = f"{ti.dag_id}-{ti.task_id}-{ti.run_id}-{ti.map_index}-{ti.try_number}"

Review Comment:
   (Though even if we do write with `ti.id`, if that doesn't turn up any 
results we should fall back to this old pattern so that we can still read old 
logs, or logs written via fluentd etc.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to