Owen-CH-Leung commented on code in PR #64364:
URL: https://github.com/apache/airflow/pull/64364#discussion_r3079909174


##########
providers/opensearch/src/airflow/providers/opensearch/log/os_task_handler.py:
##########
@@ -225,23 +310,54 @@ def __init__(
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark.strip()
         self.write_stdout = write_stdout
+        self.write_to_opensearch = write_to_opensearch
         self.json_format = json_format
         self.json_fields = [label.strip() for label in json_fields.split(",")]
         self.host = self.format_url(host)
         self.host_field = host_field
         self.offset_field = offset_field
+        self.target_index = target_index
         self.index_patterns = index_patterns
         self.index_patterns_callable = index_patterns_callable
         self.context_set = False
-        self.client = OpenSearch(
-            hosts=[{"host": host, "port": port}],
-            http_auth=(username, password),
-            **os_kwargs,
+        self.client = _create_opensearch_client(
+            self.host,
+            port,
+            username,
+            password,
+            cast("dict[str, Any]", os_kwargs),
         )
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
+        )
+        self.log_id_template = log_id_template
         self.formatter: logging.Formatter
-        self.handler: logging.FileHandler | logging.StreamHandler
+        self.handler: logging.FileHandler | logging.StreamHandler | None = None
         self._doc_type_map: dict[Any, Any] = {}
         self._doc_type: list[Any] = []
+        self.io = OpensearchRemoteLogIO(
+            host=self.host,
+            port=port,
+            username=username,
+            password=password,
+            write_to_opensearch=self.write_to_opensearch,
+            target_index=self.target_index,
+            write_stdout=self.write_stdout,
+            offset_field=self.offset_field,
+            host_field=self.host_field,
+            base_log_folder=base_log_folder,
+            delete_local_copy=self.delete_local_copy,
+            json_format=self.json_format,
+            log_id_template=self.log_id_template,
+        )
+        if AIRFLOW_V_3_0_PLUS:
+            if AIRFLOW_V_3_2_PLUS:
+                from airflow.logging_config import _ActiveLoggingConfig, 
get_remote_task_log
+
+                if get_remote_task_log() is None:
+                    _ActiveLoggingConfig.set(self.io, None)

Review Comment:
   Thanks @ashb. I think removing those lines are safe since the sdk has 
already done the log handler setting when the provider is initialized. I'll 
file a PR soon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to