amoghrajesh commented on code in PR #62103:
URL: https://github.com/apache/airflow/pull/62103#discussion_r2828166394


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -319,6 +318,17 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL
         self.bound_logger = logger
         return logger
 
+    def __del__(self):

Review Comment:
   My main concern with using `__del__` is that if an exception occurs during
cleanup, it is not propagated: Python ignores it and only prints a warning to
stderr, so the failure passes silently.
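
To illustrate the concern, here is a minimal sketch. The names `LeakyCloser` and `run_demo` are hypothetical and not part of the PR; the only real API used is `sys.unraisablehook`, which the interpreter calls for exceptions it cannot propagate. The sketch assumes CPython's reference-counting behavior for when `__del__` runs.

```python
import gc
import sys


class LeakyCloser:
    """Hypothetical class whose finalizer fails during cleanup."""

    def __del__(self):
        raise RuntimeError("cleanup failed")


def run_demo():
    seen = []
    previous_hook = sys.unraisablehook
    # Record what the interpreter reports instead of letting it print to stderr.
    sys.unraisablehook = lambda info: seen.append(type(info.exc_value).__name__)
    try:
        obj = LeakyCloser()
        del obj       # CPython runs __del__ here once the refcount drops to zero
        gc.collect()  # nudge interpreters that finalize lazily
        survived = True  # reached: the RuntimeError never propagated to the caller
    finally:
        sys.unraisablehook = previous_hook
    return survived, seen
```

The caller never sees the `RuntimeError`; it only shows up through the hook, which by default writes to stderr.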



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -319,6 +318,17 @@ def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedL
         self.bound_logger = logger
         return logger
 
+    def __del__(self):
+        # Explicitly close the file descriptor when the logger is garbage collected.
+        if raw_logger := getattr(self.bound_logger, "_logger", None):
+            file_handle = getattr(raw_logger, "_file", None)
+        else:
+            return
+
+        if file_handle and not file_handle.closed:
+            file_handle.flush()
+            file_handle.close()

Review Comment:
   Can we handle this the same way it is done for the DAG processor? See
https://github.com/apache/airflow/pull/47574
   
   
   In short, something like the following, where we store the file handle and
close it explicitly so that we don't diverge from that approach?
   ```diff
   diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
   --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py  (revision 10cd08dff8916b93f8c3f94bc34265bb7544fde4)
   +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py  (date 1771510381992)
   @@ -30,7 +30,7 @@
    from datetime import datetime
    from socket import socket
    from traceback import format_exception
   -from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
   +from typing import IO, TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
    
    import anyio
    import attrs
   @@ -302,6 +302,8 @@
    
        bound_logger: WrappedLogger = attrs.field(init=False, repr=False)
    
   +    _filehandle: IO[Any] = attrs.field(init=False, repr=False)
   +
        def __call__(self, processors: Iterable[structlog.typing.Processor]) -> WrappedLogger:
            if hasattr(self, "bound_logger"):
                return self.bound_logger
   @@ -312,13 +314,20 @@
    
            pretty_logs = False
            if pretty_logs:
   -            underlying_logger: WrappedLogger = structlog.WriteLogger(log_file.open("w", buffering=1))
   +            self._filehandle = log_file.open("w", buffering=1)
   +            underlying_logger: WrappedLogger = structlog.WriteLogger(self._filehandle)
            else:
   -            underlying_logger = structlog.BytesLogger(log_file.open("wb"))
   +            self._filehandle = log_file.open("wb")
   +            underlying_logger = structlog.BytesLogger(self._filehandle)
            logger = structlog.wrap_logger(underlying_logger, processors=processors).bind()
            self.bound_logger = logger
            return logger
    
   +    def close(self):
   +        """Explicitly close the underlying log file handle."""
   +        if hasattr(self, "_filehandle"):
   +            self._filehandle.close()
   +
        def upload_to_remote(self):
            from airflow.sdk.log import upload_to_remote
    
   @@ -421,10 +430,9 @@
                for id in msg.finished or ():
                    self.running_triggers.discard(id)
                    self.cancelling_triggers.discard(id)
                    # Remove logger from the cache, and since structlog doesn't have an explicit close method, we
                    # only need to remove the last reference to it to close the open FH
                    if factory := self.logger_cache.pop(id, None):
                        factory.upload_to_remote()
   +                    factory.close()
    
                response = messages.TriggerStateSync(
                    to_create=[],
   
   ```
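
For reference, the ownership pattern in the diff can be tried in isolation. This is only a sketch under simplifying assumptions: `LoggerFactory`, `finish_trigger`, and `demo` are hypothetical stand-ins, and it uses a plain class with a raw file handle rather than attrs, structlog, or the actual Airflow factory.

```python
import tempfile
from pathlib import Path
from typing import IO, Any


class LoggerFactory:
    """Hypothetical stand-in for the trigger logger factory; not the Airflow class."""

    def __init__(self, log_file: Path):
        self.log_file = log_file

    def __call__(self) -> IO[Any]:
        # Open lazily and remember the handle so close() can find it later.
        if not hasattr(self, "_filehandle"):
            self._filehandle = self.log_file.open("wb")
        return self._filehandle

    def close(self) -> None:
        # Safe to call if the file was never opened, and safe to call twice.
        if hasattr(self, "_filehandle"):
            self._filehandle.close()


def finish_trigger(cache: dict, trigger_id: int) -> None:
    # Mirrors the cleanup step: drop the cache entry, then close deterministically.
    if factory := cache.pop(trigger_id, None):
        factory.close()


def demo() -> bool:
    with tempfile.TemporaryDirectory() as tmp:
        factory = LoggerFactory(Path(tmp) / "trigger.log")
        handle = factory()
        handle.write(b"event\n")
        cache = {1: factory}
        finish_trigger(cache, 1)
        return handle.closed and 1 not in cache
```

Because the close happens at a known call site rather than at garbage collection, any error from it surfaces in the caller instead of being swallowed by a finalizer.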


