This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/main by this push:
new 759f0c6 Move most logging paraphernalia to a new loggers module
759f0c6 is described below
commit 759f0c6dd22c517f512b60ff439974a4977d434d
Author: Sean B. Palmer <[email protected]>
AuthorDate: Fri Jan 23 18:58:06 2026 +0000
Move most logging paraphernalia to a new loggers module
---
atr/loggers.py | 100 ++++++++++++++++++++++++++++++++++++++++++
atr/server.py | 80 +++++++--------------------------
atr/worker.py | 136 ++++++++++++++++++++++-----------------------------------
3 files changed, 167 insertions(+), 149 deletions(-)
diff --git a/atr/loggers.py b/atr/loggers.py
new file mode 100644
index 0000000..55b2b50
--- /dev/null
+++ b/atr/loggers.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import logging.handlers
+import queue
+from typing import TYPE_CHECKING
+
+import structlog
+
+if TYPE_CHECKING:
+ from collections.abc import Sequence
+
+
+def configure_structlog(shared_processors: Sequence[structlog.types.Processor]) -> None:
+ structlog.configure(
+ processors=[
+ *shared_processors,
+ structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
+ ],
+ wrapper_class=structlog.stdlib.BoundLogger,
+ context_class=dict,
+ logger_factory=structlog.stdlib.LoggerFactory(),
+ cache_logger_on_first_use=True,
+ )
+
+
+def create_json_formatter(
+ shared_processors: Sequence[structlog.types.Processor],
+) -> structlog.stdlib.ProcessorFormatter:
+ return structlog.stdlib.ProcessorFormatter(
+ processors=[
+ structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+ structlog.processors.JSONRenderer(),
+ ],
+ foreign_pre_chain=list(shared_processors),
+ )
+
+
+def create_output_formatter(
+ shared_processors: Sequence[structlog.types.Processor],
+ renderer: structlog.types.Processor,
+) -> structlog.stdlib.ProcessorFormatter:
+ return structlog.stdlib.ProcessorFormatter(
+ processors=[
+ structlog.stdlib.ProcessorFormatter.remove_processors_meta,
+ renderer,
+ ],
+ foreign_pre_chain=list(shared_processors),
+ )
+
+
+def setup_dedicated_file_logger(
+ logger_name: str,
+ file_path: str,
+ processors: Sequence[structlog.types.Processor],
+ queue_handler_class: type[logging.handlers.QueueHandler] = logging.handlers.QueueHandler,
+) -> logging.handlers.QueueListener:
+ handler = logging.FileHandler(file_path, encoding="utf-8")
+ handler.setFormatter(create_json_formatter(processors))
+
+ log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
+ listener = logging.handlers.QueueListener(log_queue, handler)
+ listener.start()
+
+ logger = logging.getLogger(logger_name)
+ logger.setLevel(logging.INFO)
+ logger.handlers.clear()
+ logger.addHandler(queue_handler_class(log_queue))
+ logger.propagate = False
+
+ return listener
+
+
+def shared_processors() -> list[structlog.types.Processor]:
+ return [
+ structlog.contextvars.merge_contextvars,
+ structlog.stdlib.add_log_level,
+ structlog.stdlib.add_logger_name,
+ structlog.stdlib.PositionalArgumentsFormatter(),
+ structlog.processors.TimeStamper(fmt="iso"),
+ structlog.processors.StackInfoRenderer(),
+ structlog.processors.UnicodeDecoder(),
+ ]
diff --git a/atr/server.py b/atr/server.py
index 948cb19..398f691 100644
--- a/atr/server.py
+++ b/atr/server.py
@@ -305,16 +305,9 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
import structlog
- # Shared processors for structlog (run before formatting)
- shared_processors: list[structlog.types.Processor] = [
- structlog.contextvars.merge_contextvars,
- structlog.stdlib.add_log_level,
- structlog.stdlib.add_logger_name,
- structlog.stdlib.PositionalArgumentsFormatter(),
- structlog.processors.TimeStamper(fmt="iso"),
- structlog.processors.StackInfoRenderer(),
- structlog.processors.UnicodeDecoder(),
- ]
+ import atr.loggers as loggers
+
+ shared_processors = loggers.shared_processors()
# Output handler: pretty console for dev (Debug and Allow Tests), JSON for non-dev (Docker, etc.)
output_handler = logging.StreamHandler(sys.stderr)
@@ -322,16 +315,9 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
renderer: structlog.types.Processor = structlog.dev.ConsoleRenderer(colors=True)
else:
renderer = structlog.processors.JSONRenderer()
- output_handler.setFormatter(
- structlog.stdlib.ProcessorFormatter(
- processors=[
- structlog.stdlib.ProcessorFormatter.remove_processors_meta,
- renderer,
- ],
- foreign_pre_chain=shared_processors,
- )
- )
# Queue-based logging for thread safety
+
output_handler.setFormatter(loggers.create_output_formatter(shared_processors, renderer))
+
log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
handlers: list[logging.Handler] = [output_handler]
if util.is_dev_environment():
@@ -347,61 +333,25 @@ def _app_setup_logging(app: base.QuartApp, config_mode: config.Mode, app_config:
force=True,
)
- structlog.configure(
- processors=[
- *shared_processors,
- structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
- ],
- wrapper_class=structlog.stdlib.BoundLogger,
- context_class=dict,
- logger_factory=structlog.stdlib.LoggerFactory(),
- cache_logger_on_first_use=True,
- )
+ loggers.configure_structlog(shared_processors)
# Audit logger - JSON to dedicated file via queue
- audit_handler = logging.FileHandler(app_config.STORAGE_AUDIT_LOG_FILE, encoding="utf-8")
- audit_handler.setFormatter(
- structlog.stdlib.ProcessorFormatter(
- processors=[
- structlog.stdlib.ProcessorFormatter.remove_processors_meta,
- structlog.processors.JSONRenderer(),
- ],
- foreign_pre_chain=shared_processors,
- )
+ audit_listener = loggers.setup_dedicated_file_logger(
+ "atr.storage.audit",
+ app_config.STORAGE_AUDIT_LOG_FILE,
+ shared_processors,
)
- audit_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
- audit_listener = logging.handlers.QueueListener(audit_queue, audit_handler)
- audit_listener.start()
app.extensions["audit_listener"] = audit_listener
- audit_logger = logging.getLogger("atr.storage.audit")
- audit_logger.setLevel(logging.INFO)
- audit_logger.handlers.clear()
- audit_logger.addHandler(logging.handlers.QueueHandler(audit_queue))
- audit_logger.propagate = False
-
# Request logs
- request_handler = logging.FileHandler(app_config.REQUEST_LOG_FILE, encoding="utf-8")
- request_handler.setFormatter(
- structlog.stdlib.ProcessorFormatter(
- processors=[
- structlog.stdlib.ProcessorFormatter.remove_processors_meta,
- structlog.processors.JSONRenderer(),
- ],
- foreign_pre_chain=shared_processors,
- )
+ request_listener = loggers.setup_dedicated_file_logger(
+ "atr.request",
+ app_config.REQUEST_LOG_FILE,
+ shared_processors,
+ queue_handler_class=log.StructlogQueueHandler,
)
- request_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1)
- request_listener = logging.handlers.QueueListener(request_queue, request_handler)
- request_listener.start()
app.extensions["request_listener"] = request_listener
- request_logger = logging.getLogger("atr.request")
- request_logger.setLevel(logging.INFO)
- request_logger.handlers.clear()
- request_logger.addHandler(log.StructlogQueueHandler(request_queue))
- request_logger.propagate = False
-
# Enable debug output for atr.* in DEBUG mode
if config_mode == config.Mode.Debug:
logging.getLogger(atr.__name__).setLevel(logging.DEBUG)
diff --git a/atr/worker.py b/atr/worker.py
index be0268c..850882f 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -92,49 +92,66 @@ def main() -> None:
log.info("Exiting worker process")
-def _setup_logging() -> None:
- import logging.handlers
+async def _execute_check_task(
+ handler: Callable[..., Awaitable[results.Results | None]],
+ task_args: list[str] | dict[str, Any],
+ task_id: int,
+ task_type: str,
+) -> results.Results | None:
+ log.debug(f"Handler {handler.__name__} expects checks.FunctionArguments, fetching full task details")
+ async with db.session() as data:
task_obj = await data.task(id=task_id).demand(ValueError(f"Task {task_id} disappeared during processing"))
- import structlog
+ # Validate required fields from the Task object itself
+ if task_obj.project_name is None:
+ raise ValueError(f"Task {task_id} is missing required project_name")
+ if task_obj.version_name is None:
+ raise ValueError(f"Task {task_id} is missing required version_name")
+ if task_obj.revision_number is None:
+ raise ValueError(f"Task {task_id} is missing required revision_number")
- os.makedirs("logs", exist_ok=True)
- # Configure logging
- shared_processors: list[structlog.types.Processor] = [
- structlog.contextvars.merge_contextvars,
- structlog.stdlib.add_log_level,
- structlog.stdlib.add_logger_name,
- structlog.stdlib.PositionalArgumentsFormatter(),
- structlog.processors.TimeStamper(fmt="iso"),
- structlog.processors.StackInfoRenderer(),
- structlog.processors.UnicodeDecoder(),
- ]
- output_handler = logging.FileHandler("logs/atr-worker.log")
- renderer = structlog.processors.JSONRenderer()
- output_handler.setFormatter(
- structlog.stdlib.ProcessorFormatter(
- processors=[
- structlog.stdlib.ProcessorFormatter.remove_processors_meta,
- renderer,
- ],
- foreign_pre_chain=shared_processors,
+ if not isinstance(task_args, dict):
+ raise TypeError(
f"Task {task_id} ({task_type}) has non-dict raw args {task_args} which should represent keyword_args"
)
- )
- logging.basicConfig(level=logging.INFO, handlers=[output_handler], force=True)
+ async def recorder_factory() -> checks.Recorder:
+ return await checks.Recorder.create(
+ checker=handler,
+ project_name=task_obj.project_name or "",
+ version_name=task_obj.version_name or "",
+ revision_number=task_obj.revision_number or "",
+ primary_rel_path=task_obj.primary_rel_path,
+ )
- structlog.configure(
- processors=[
- *shared_processors,
- structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
- ],
- wrapper_class=structlog.stdlib.BoundLogger,
- context_class=dict,
- logger_factory=structlog.stdlib.LoggerFactory(),
- cache_logger_on_first_use=True,
+ function_arguments = checks.FunctionArguments(
+ recorder=recorder_factory,
+ asf_uid=task_obj.asf_uid,
+ project_name=task_obj.project_name or "",
+ version_name=task_obj.version_name or "",
+ revision_number=task_obj.revision_number,
+ primary_rel_path=task_obj.primary_rel_path,
+ extra_args=task_args,
)
+ log.debug(f"Calling {handler.__name__} with structured arguments: {function_arguments}")
+ handler_result = await handler(function_arguments)
+ return handler_result
+
+
+def _setup_logging() -> None:
+ import logging
+
+ import atr.loggers as loggers
+
+ os.makedirs("logs", exist_ok=True)
+
+ shared_processors = loggers.shared_processors()
+ output_handler = logging.FileHandler("logs/atr-worker.log")
+
output_handler.setFormatter(loggers.create_json_formatter(shared_processors))
+ logging.basicConfig(level=logging.INFO, handlers=[output_handler], force=True)
-# Task functions
+ loggers.configure_structlog(shared_processors)
async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any], str] | None:
@@ -226,52 +243,6 @@ async def _task_process(task_id: int, task_type: str, task_args: list[str] | dic
await _task_result_process(task_id, task_results, status, error)
-async def _execute_check_task(
- handler: Callable[..., Awaitable[results.Results | None]],
- task_args: list[str] | dict[str, Any],
- task_id: int,
- task_type: str,
-) -> results.Results | None:
- log.debug(f"Handler {handler.__name__} expects checks.FunctionArguments, fetching full task details")
- async with db.session() as data:
- task_obj = await data.task(id=task_id).demand(ValueError(f"Task {task_id} disappeared during processing"))
-
- # Validate required fields from the Task object itself
- if task_obj.project_name is None:
- raise ValueError(f"Task {task_id} is missing required project_name")
- if task_obj.version_name is None:
- raise ValueError(f"Task {task_id} is missing required version_name")
- if task_obj.revision_number is None:
- raise ValueError(f"Task {task_id} is missing required revision_number")
-
- if not isinstance(task_args, dict):
- raise TypeError(
- f"Task {task_id} ({task_type}) has non-dict raw args {task_args} which should represent keyword_args"
- )
-
- async def recorder_factory() -> checks.Recorder:
- return await checks.Recorder.create(
- checker=handler,
- project_name=task_obj.project_name or "",
- version_name=task_obj.version_name or "",
- revision_number=task_obj.revision_number or "",
- primary_rel_path=task_obj.primary_rel_path,
- )
-
- function_arguments = checks.FunctionArguments(
- recorder=recorder_factory,
- asf_uid=task_obj.asf_uid,
- project_name=task_obj.project_name or "",
- version_name=task_obj.version_name or "",
- revision_number=task_obj.revision_number,
- primary_rel_path=task_obj.primary_rel_path,
- extra_args=task_args,
- )
- log.debug(f"Calling {handler.__name__} with structured arguments: {function_arguments}")
- handler_result = await handler(function_arguments)
- return handler_result
-
-
async def _task_result_process(
task_id: int, task_results: results.Results | None, status: sql.TaskStatus, error: str | None = None
) -> None:
@@ -290,9 +261,6 @@ async def _task_result_process(
task_obj.error = error
-# Worker functions
-
-
async def _worker_loop_run() -> None:
"""Main worker loop."""
processed = 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]