This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/main by this push:
     new 759f0c6  Move most logging paraphernalia to a new loggers module
759f0c6 is described below

commit 759f0c6dd22c517f512b60ff439974a4977d434d
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jan 23 18:58:06 2026 +0000

    Move most logging paraphernalia to a new loggers module
---
 atr/loggers.py | 100 ++++++++++++++++++++++++++++++++++++++++++
 atr/server.py  |  80 +++++++--------------------------
 atr/worker.py  | 136 ++++++++++++++++++++++-----------------------------------
 3 files changed, 167 insertions(+), 149 deletions(-)

diff --git a/atr/loggers.py b/atr/loggers.py
new file mode 100644
index 0000000..55b2b50
--- /dev/null
+++ b/atr/loggers.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import logging.handlers
+import queue
+from typing import TYPE_CHECKING
+
+import structlog
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+
+def configure_structlog(shared_processors: Sequence[structlog.types.Processor]) -> None:
+    structlog.configure(
+        processors=[
+            *shared_processors,
+            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+        ],
+        wrapper_class=structlog.stdlib.BoundLogger,
+        context_class=dict,
+        logger_factory=structlog.stdlib.LoggerFactory(),
+        cache_logger_on_first_use=True,
+    )
+
+
+def create_json_formatter(
+    shared_processors: Sequence[structlog.types.Processor],
+) -> structlog.stdlib.ProcessorFormatter:
+    return structlog.stdlib.ProcessorFormatter(
+        processors=[
+            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+            structlog.processors.JSONRenderer(),
+        ],
+        foreign_pre_chain=list(shared_processors),
+    )
+
+
+def create_output_formatter(
+    shared_processors: Sequence[structlog.types.Processor],
+    renderer: structlog.types.Processor,
+) -> structlog.stdlib.ProcessorFormatter:
+    return structlog.stdlib.ProcessorFormatter(
+        processors=[
+            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+            renderer,
+        ],
+        foreign_pre_chain=list(shared_processors),
+    )
+
+
+def setup_dedicated_file_logger(
+    logger_name: str,
+    file_path: str,
+    processors: Sequence[structlog.types.Processor],
+    queue_handler_class: type[logging.handlers.QueueHandler] = logging.handlers.QueueHandler,
+) -> logging.handlers.QueueListener:
+    handler = logging.FileHandler(file_path, encoding="utf-8")
+    handler.setFormatter(create_json_formatter(processors))
+
+    log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
+    listener = logging.handlers.QueueListener(log_queue, handler)
+    listener.start()
+
+    logger = logging.getLogger(logger_name)
+    logger.setLevel(logging.INFO)
+    logger.handlers.clear()
+    logger.addHandler(queue_handler_class(log_queue))
+    logger.propagate = False
+
+    return listener
+
+
+def shared_processors() -> list[structlog.types.Processor]:
+    return [
+        structlog.contextvars.merge_contextvars,
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.add_logger_name,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        structlog.processors.TimeStamper(fmt="iso"),
+        structlog.processors.StackInfoRenderer(),
+        structlog.processors.UnicodeDecoder(),
+    ]
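
For reference, here is roughly how the new helpers compose. This is a
minimal sketch rather than code from the repository: the logger name and
file paths are hypothetical, and the call order mirrors what
_setup_logging in atr/worker.py does further down.

    import logging
    import os

    import atr.loggers as loggers

    # Build the shared processor chain once and reuse it for every formatter.
    processors = loggers.shared_processors()

    # Root logging: one handler rendering stdlib records as JSON lines.
    os.makedirs("logs", exist_ok=True)
    handler = logging.FileHandler("logs/example.log")  # hypothetical path
    handler.setFormatter(loggers.create_json_formatter(processors))
    logging.basicConfig(level=logging.INFO, handlers=[handler], force=True)

    # Route structlog events through the stdlib formatters configured above.
    loggers.configure_structlog(processors)

    # Dedicated logger writing JSON to its own file behind a queue; keep the
    # returned listener so it can be stopped at shutdown.
    listener = loggers.setup_dedicated_file_logger(
        "example.audit",           # hypothetical logger name
        "logs/example-audit.log",  # hypothetical file path
        processors,
    )
    logging.getLogger("example.audit").info("an audit event")
    listener.stop()  # processes remaining queued records, then joins the thread
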
diff --git a/atr/server.py b/atr/server.py
index 948cb19..398f691 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -305,16 +305,9 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
 
     import structlog
 
-    # Shared processors for structlog (run before formatting)
-    shared_processors: list[structlog.types.Processor] = [
-        structlog.contextvars.merge_contextvars,
-        structlog.stdlib.add_log_level,
-        structlog.stdlib.add_logger_name,
-        structlog.stdlib.PositionalArgumentsFormatter(),
-        structlog.processors.TimeStamper(fmt="iso"),
-        structlog.processors.StackInfoRenderer(),
-        structlog.processors.UnicodeDecoder(),
-    ]
+    import atr.loggers as loggers
+
+    shared_processors = loggers.shared_processors()
 
     # Output handler: pretty console for dev (Debug and Allow Tests), JSON for non-dev (Docker, etc.)
     output_handler = logging.StreamHandler(sys.stderr)
@@ -322,16 +315,9 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
         renderer: structlog.types.Processor = structlog.dev.ConsoleRenderer(colors=True)
     else:
         renderer = structlog.processors.JSONRenderer()
-    output_handler.setFormatter(
-        structlog.stdlib.ProcessorFormatter(
-            processors=[
-                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
-                renderer,
-            ],
-            foreign_pre_chain=shared_processors,
-        )
-    )
     # Queue-based logging for thread safety
+    output_handler.setFormatter(loggers.create_output_formatter(shared_processors, renderer))
+
     log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
     handlers: list[logging.Handler] = [output_handler]
     if util.is_dev_environment():
@@ -347,61 +333,25 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
         force=True,
     )
 
-    structlog.configure(
-        processors=[
-            *shared_processors,
-            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
-        ],
-        wrapper_class=structlog.stdlib.BoundLogger,
-        context_class=dict,
-        logger_factory=structlog.stdlib.LoggerFactory(),
-        cache_logger_on_first_use=True,
-    )
+    loggers.configure_structlog(shared_processors)
 
     # Audit logger - JSON to dedicated file via queue
-    audit_handler = logging.FileHandler(app_config.STORAGE_AUDIT_LOG_FILE, encoding="utf-8")
-    audit_handler.setFormatter(
-        structlog.stdlib.ProcessorFormatter(
-            processors=[
-                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
-                structlog.processors.JSONRenderer(),
-            ],
-            foreign_pre_chain=shared_processors,
-        )
+    audit_listener = loggers.setup_dedicated_file_logger(
+        "atr.storage.audit",
+        app_config.STORAGE_AUDIT_LOG_FILE,
+        shared_processors,
     )
-    audit_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
-    audit_listener = logging.handlers.QueueListener(audit_queue, audit_handler)
-    audit_listener.start()
     app.extensions["audit_listener"] = audit_listener
 
-    audit_logger = logging.getLogger("atr.storage.audit")
-    audit_logger.setLevel(logging.INFO)
-    audit_logger.handlers.clear()
-    audit_logger.addHandler(logging.handlers.QueueHandler(audit_queue))
-    audit_logger.propagate = False
-
     # Request logs
-    request_handler = logging.FileHandler(app_config.REQUEST_LOG_FILE, encoding="utf-8")
-    request_handler.setFormatter(
-        structlog.stdlib.ProcessorFormatter(
-            processors=[
-                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
-                structlog.processors.JSONRenderer(),
-            ],
-            foreign_pre_chain=shared_processors,
-        )
+    request_listener = loggers.setup_dedicated_file_logger(
+        "atr.request",
+        app_config.REQUEST_LOG_FILE,
+        shared_processors,
+        queue_handler_class=log.StructlogQueueHandler,
     )
-    request_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
-    request_listener = logging.handlers.QueueListener(request_queue, request_handler)
-    request_listener.start()
     app.extensions["request_listener"] = request_listener
 
-    request_logger = logging.getLogger("atr.request")
-    request_logger.setLevel(logging.INFO)
-    request_logger.handlers.clear()
-    request_logger.addHandler(log.StructlogQueueHandler(request_queue))
-    request_logger.propagate = False
-
     # Enable debug output for atr.* in DEBUG mode
     if config_mode == config.Mode.Debug:
         logging.getLogger(atr.__name__).setLevel(logging.DEBUG)
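
Both listeners returned by setup_dedicated_file_logger are stored in
app.extensions, presumably so the server can stop them during shutdown.
A hedged sketch of such a teardown, using Quart's after_serving hook; the
actual shutdown path is outside this diff:

    @app.after_serving
    async def _stop_log_listeners() -> None:
        # Stop the background QueueListener threads so queued records are
        # written out before the process exits.
        for key in ("audit_listener", "request_listener"):
            listener = app.extensions.get(key)
            if listener is not None:
                listener.stop()
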
diff --git a/atr/worker.py b/atr/worker.py
index be0268c..850882f 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -92,49 +92,66 @@ def main() -> None:
     log.info("Exiting worker process")
 
 
-def _setup_logging() -> None:
-    import logging.handlers
+async def _execute_check_task(
+    handler: Callable[..., Awaitable[results.Results | None]],
+    task_args: list[str] | dict[str, Any],
+    task_id: int,
+    task_type: str,
+) -> results.Results | None:
+    log.debug(f"Handler {handler.__name__} expects checks.FunctionArguments, 
fetching full task details")
+    async with db.session() as data:
+        task_obj = await data.task(id=task_id).demand(ValueError(f"Task {task_id} disappeared during processing"))
 
-    import structlog
+    # Validate required fields from the Task object itself
+    if task_obj.project_name is None:
+        raise ValueError(f"Task {task_id} is missing required project_name")
+    if task_obj.version_name is None:
+        raise ValueError(f"Task {task_id} is missing required version_name")
+    if task_obj.revision_number is None:
+        raise ValueError(f"Task {task_id} is missing required revision_number")
 
-    os.makedirs("logs", exist_ok=True)
-    # Configure logging
-    shared_processors: list[structlog.types.Processor] = [
-        structlog.contextvars.merge_contextvars,
-        structlog.stdlib.add_log_level,
-        structlog.stdlib.add_logger_name,
-        structlog.stdlib.PositionalArgumentsFormatter(),
-        structlog.processors.TimeStamper(fmt="iso"),
-        structlog.processors.StackInfoRenderer(),
-        structlog.processors.UnicodeDecoder(),
-    ]
-    output_handler = logging.FileHandler("logs/atr-worker.log")
-    renderer = structlog.processors.JSONRenderer()
-    output_handler.setFormatter(
-        structlog.stdlib.ProcessorFormatter(
-            processors=[
-                structlog.stdlib.ProcessorFormatter.remove_processors_meta,
-                renderer,
-            ],
-            foreign_pre_chain=shared_processors,
+    if not isinstance(task_args, dict):
+        raise TypeError(
+            f"Task {task_id} ({task_type}) has non-dict raw args {task_args} 
which should represent keyword_args"
         )
-    )
 
-    logging.basicConfig(level=logging.INFO, handlers=[output_handler], force=True)
+    async def recorder_factory() -> checks.Recorder:
+        return await checks.Recorder.create(
+            checker=handler,
+            project_name=task_obj.project_name or "",
+            version_name=task_obj.version_name or "",
+            revision_number=task_obj.revision_number or "",
+            primary_rel_path=task_obj.primary_rel_path,
+        )
 
-    structlog.configure(
-        processors=[
-            *shared_processors,
-            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
-        ],
-        wrapper_class=structlog.stdlib.BoundLogger,
-        context_class=dict,
-        logger_factory=structlog.stdlib.LoggerFactory(),
-        cache_logger_on_first_use=True,
+    function_arguments = checks.FunctionArguments(
+        recorder=recorder_factory,
+        asf_uid=task_obj.asf_uid,
+        project_name=task_obj.project_name or "",
+        version_name=task_obj.version_name or "",
+        revision_number=task_obj.revision_number,
+        primary_rel_path=task_obj.primary_rel_path,
+        extra_args=task_args,
     )
+    log.debug(f"Calling {handler.__name__} with structured arguments: 
{function_arguments}")
+    handler_result = await handler(function_arguments)
+    return handler_result
+
+
+def _setup_logging() -> None:
+    import logging
+
+    import atr.loggers as loggers
+
+    os.makedirs("logs", exist_ok=True)
+
+    shared_processors = loggers.shared_processors()
+    output_handler = logging.FileHandler("logs/atr-worker.log")
+    output_handler.setFormatter(loggers.create_json_formatter(shared_processors))
 
+    logging.basicConfig(level=logging.INFO, handlers=[output_handler], force=True)
 
-# Task functions
+    loggers.configure_structlog(shared_processors)
 
 
 async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any], str] | None:
@@ -226,52 +243,6 @@ async def _task_process(task_id: int, task_type: str, task_args: list[str] | dic
     await _task_result_process(task_id, task_results, status, error)
 
 
-async def _execute_check_task(
-    handler: Callable[..., Awaitable[results.Results | None]],
-    task_args: list[str] | dict[str, Any],
-    task_id: int,
-    task_type: str,
-) -> results.Results | None:
-    log.debug(f"Handler {handler.__name__} expects checks.FunctionArguments, 
fetching full task details")
-    async with db.session() as data:
-        task_obj = await data.task(id=task_id).demand(ValueError(f"Task {task_id} disappeared during processing"))
-
-    # Validate required fields from the Task object itself
-    if task_obj.project_name is None:
-        raise ValueError(f"Task {task_id} is missing required project_name")
-    if task_obj.version_name is None:
-        raise ValueError(f"Task {task_id} is missing required version_name")
-    if task_obj.revision_number is None:
-        raise ValueError(f"Task {task_id} is missing required revision_number")
-
-    if not isinstance(task_args, dict):
-        raise TypeError(
-            f"Task {task_id} ({task_type}) has non-dict raw args {task_args} 
which should represent keyword_args"
-        )
-
-    async def recorder_factory() -> checks.Recorder:
-        return await checks.Recorder.create(
-            checker=handler,
-            project_name=task_obj.project_name or "",
-            version_name=task_obj.version_name or "",
-            revision_number=task_obj.revision_number or "",
-            primary_rel_path=task_obj.primary_rel_path,
-        )
-
-    function_arguments = checks.FunctionArguments(
-        recorder=recorder_factory,
-        asf_uid=task_obj.asf_uid,
-        project_name=task_obj.project_name or "",
-        version_name=task_obj.version_name or "",
-        revision_number=task_obj.revision_number,
-        primary_rel_path=task_obj.primary_rel_path,
-        extra_args=task_args,
-    )
-    log.debug(f"Calling {handler.__name__} with structured arguments: 
{function_arguments}")
-    handler_result = await handler(function_arguments)
-    return handler_result
-
-
 async def _task_result_process(
     task_id: int, task_results: results.Results | None, status: sql.TaskStatus, error: str | None = None
 ) -> None:
@@ -290,9 +261,6 @@ async def _task_result_process(
                     task_obj.error = error
 
 
-# Worker functions
-
-
 async def _worker_loop_run() -> None:
     """Main worker loop."""
     processed = 0
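
One property worth noting: after loggers.configure_structlog runs,
structlog events and plain stdlib records flow through the same pipeline.
structlog events are wrapped for the stdlib ProcessorFormatter, and stdlib
records pass through the same processors via foreign_pre_chain. A small
illustration with a hypothetical logger name:

    import logging

    import structlog

    # Both calls end up as JSON lines carrying the ISO timestamp, level,
    # and logger name added by the shared processor chain.
    structlog.get_logger("atr.example").info("task_claimed", task_id=42)
    logging.getLogger("atr.example").info("claimed task %s", 42)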

