o-nikolas commented on code in PR #66633:
URL: https://github.com/apache/airflow/pull/66633#discussion_r3230014314
##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -118,16 +119,29 @@ def handler(self) -> watchtower.CloudWatchLogHandler:
json_serialize_default=_json_serialize or json_serialize_legacy,
)
+ @property
+ def handler(self) -> watchtower.CloudWatchLogHandler:
+ # Defensive self-healing: if the handler was killed by logging.shutdown()
+ # (shutting_down=True), recreate it. This can happen if dictConfig() is called
+ # after the handler was first created, since dictConfig calls
+ # _clearExistingHandlers() -> logging.shutdown() on all existing handlers.
+ if self._handler is None or self._handler.shutting_down:
+ self._handler = self._create_handler()
+ return self._handler
+
@cached_property
def processors(self) -> tuple[structlog.typing.Processor, ...]:
from logging import getLogRecordFactory
import structlog.stdlib
logRecordFactory = getLogRecordFactory()
- # The handler MUST be initted here, before the processor is actually used to log anything.
- # Otherwise, logging that occurs during the creation of the handler can create infinite loops.
- _handler = self.handler
+ # Eagerly init the handler to avoid infinite loops from logging during handler creation.
+ # We do NOT capture it in a closure variable — instead we access self.handler each time
+ # so that if the handler is killed by logging.shutdown() and recreated, the processor
+ # always uses the live instance rather than a dead one.#
Review Comment:
```suggestion
# always uses the live instance rather than a dead one.
```
##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -118,16 +119,29 @@ def handler(self) -> watchtower.CloudWatchLogHandler:
json_serialize_default=_json_serialize or json_serialize_legacy,
)
+ @property
+ def handler(self) -> watchtower.CloudWatchLogHandler:
+ # Defensive self-healing: if the handler was killed by logging.shutdown()
+ # (shutting_down=True), recreate it. This can happen if dictConfig() is called
+ # after the handler was first created, since dictConfig calls
+ # _clearExistingHandlers() -> logging.shutdown() on all existing handlers.
Review Comment:
Do we still need to be this defensive if the other issues are fixed?
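For context on the scenario the code comment describes, here is a minimal sketch using only the standard library. `FakeRemoteHandler` and `RemoteLogOwner` are hypothetical stand-ins, not the provider's classes, and the `shutting_down` flag only imitates what the diff checks for. The sketch relies on CPython's behaviour that a full (non-incremental) `dictConfig()` closes all previously created handlers.

```python
import logging
import logging.config


class FakeRemoteHandler(logging.Handler):
    """Hypothetical stand-in for a remote handler exposing `shutting_down`."""

    def __init__(self) -> None:
        super().__init__()
        self.shutting_down = False

    def emit(self, record: logging.LogRecord) -> None:
        pass  # a real handler would queue the record for upload

    def close(self) -> None:
        # Imitate a handler that refuses further work once closed.
        self.shutting_down = True
        super().close()


class RemoteLogOwner:
    """Hypothetical owner object mirroring the property shown in the diff."""

    def __init__(self) -> None:
        self._handler = None

    @property
    def handler(self) -> FakeRemoteHandler:
        # Recreate the handler if it was never built or has been closed.
        if self._handler is None or self._handler.shutting_down:
            self._handler = FakeRemoteHandler()
        return self._handler


owner = RemoteLogOwner()
first = owner.handler
# A non-incremental dictConfig closes every handler created so far.
logging.config.dictConfig({"version": 1, "disable_existing_loggers": False})
second = owner.handler  # the property notices the closed handler and replaces it
```

Whether this recovery path is still worth keeping once handler creation is reordered is exactly the open question above; the sketch only shows what the property guards against.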
##########
providers/amazon/src/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py:
##########
@@ -118,16 +119,29 @@ def handler(self) -> watchtower.CloudWatchLogHandler:
json_serialize_default=_json_serialize or json_serialize_legacy,
)
+ @property
+ def handler(self) -> watchtower.CloudWatchLogHandler:
+ # Defensive self-healing: if the handler was killed by logging.shutdown()
+ # (shutting_down=True), recreate it. This can happen if dictConfig() is called
+ # after the handler was first created, since dictConfig calls
+ # _clearExistingHandlers() -> logging.shutdown() on all existing handlers.
+ if self._handler is None or self._handler.shutting_down:
+ self._handler = self._create_handler()
+ return self._handler
+
@cached_property
def processors(self) -> tuple[structlog.typing.Processor, ...]:
from logging import getLogRecordFactory
import structlog.stdlib
logRecordFactory = getLogRecordFactory()
- # The handler MUST be initted here, before the processor is actually used to log anything.
- # Otherwise, logging that occurs during the creation of the handler can create infinite loops.
- _handler = self.handler
+ # Eagerly init the handler to avoid infinite loops from logging during handler creation.
+ # We do NOT capture it in a closure variable — instead we access self.handler each time
+ # so that if the handler is killed by logging.shutdown() and recreated, the processor
+ # always uses the live instance rather than a dead one.#
+ _ = self.handler
Review Comment:
Why bother assigning this to anything at all? Is that needed?
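As a small illustration of the point, a bare attribute access is enough to run a property getter, so the assignment is not needed for the eager initialization itself. The `Demo` class below is hypothetical and only mirrors the shape of the property in the diff. One possible reason for the `_ =` form is that some linters flag bare attribute expressions as useless statements, but that is an assumption about the author's intent.

```python
class Demo:
    """Hypothetical class with a lazily created handler, as in the diff."""

    def __init__(self) -> None:
        self._handler = None
        self.created = 0  # counts how many times the handler was built

    @property
    def handler(self) -> object:
        if self._handler is None:
            self.created += 1
            self._handler = object()
        return self._handler


demo = Demo()
demo.handler  # bare expression statement: the getter still runs
demo.handler  # second access reuses the existing handler
```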
##########
task-sdk/src/airflow/sdk/log.py:
##########
@@ -118,8 +118,14 @@ def configure_logging(
if mask_secrets:
extra_processors += (mask_logs,)
- if (remote := load_remote_log_handler()) and (remote_processors := getattr(remote, "processors")):
- extra_processors += remote_processors
+ # NOTE: Do NOT call getattr(remote, "processors") here.
+ # Accessing remote.processors triggers creation of the watchtower CloudWatchLogHandler
Review Comment:
Again, make this generic, or make it clear that you're using the AWS CloudWatch
handler as an example of what can happen. There are many other provider
handlers, not just CloudWatch.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
Review Comment:
@ashb or @amoghrajesh, one of you should have a look at these changes.
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2209,9 +2235,29 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog
with _remote_logging_conn(client):
processors = logging_processors(json_output=json_logs)
+
logger = structlog.wrap_logger(underlying_logger, processors=processors, logger_name="task").bind()
- return logger, log_file_descriptor
+ try:
+ yield logger, log_file_descriptor
+ finally:
+ # Flush and close the remote handler now — AFTER the supervisor has
+ # drained all task log messages from the subprocess pipe (i.e. after
+ # process.wait() has returned).
+ #
+ # Without this, the only thing that ever closes the handler is
+ # Python's logging.shutdown() at process exit, which fires after
+ # supervise_task() returns. Any messages still queued in the handler
+ # at that point are silently dropped, producing:
+ #
+ # WatchtowerWarning: "Received message after logging system shutdown"
Review Comment:
We shouldn't mention provider-specific exceptions and warnings in core/SDK
code without making it clear that this is just one example of what one
provider-based logger will emit.
```suggestion
# at that point are silently dropped. The AWS CloudWatch logger, for example, will emit:
#
# WatchtowerWarning: "Received message after logging system shutdown"
```
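The flush-in-`finally` pattern from this hunk can be sketched generically, without reference to any particular provider. `BufferingHandler` and `configured_logger` below are hypothetical names for illustration only; the real code yields a structlog logger and a file descriptor, which this stdlib-only sketch does not attempt to reproduce.

```python
import contextlib
import logging


class BufferingHandler(logging.Handler):
    """Hypothetical handler that queues messages until flush() is called."""

    def __init__(self) -> None:
        super().__init__()
        self.pending: list[str] = []
        self.delivered: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.pending.append(record.getMessage())

    def flush(self) -> None:
        # Stand-in for shipping queued messages to a remote backend.
        self.delivered.extend(self.pending)
        self.pending.clear()


@contextlib.contextmanager
def configured_logger(handler: logging.Handler):
    logger = logging.getLogger("sketch.task")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        yield logger
    finally:
        # Flush and close while the process is still fully alive, rather
        # than relying on logging.shutdown() at interpreter exit.
        handler.flush()
        handler.close()
        logger.removeHandler(handler)


handler = BufferingHandler()
with configured_logger(handler) as log:
    log.info("last message from task")
```

Nothing in the sketch names a provider, which is the property the core/SDK comment should have as well.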