This is an automated email from the ASF dual-hosted git repository. liyuheng pushed a commit to branch lyh/ainode/logger-refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 084e71b0730eeb1f1a9ba9ca80a8cf2ddfd2d54a Author: liyuheng <[email protected]> AuthorDate: Thu Jul 31 23:01:45 2025 +0800 almost work --- .../ainode/ainode/core/{log.py => base_logger.py} | 114 +++++++++------------ .../core/inference/inference_request_pool.py | 25 +++-- iotdb-core/ainode/ainode/core/log.py | 68 +----------- .../ainode/core/manager/inference_manager.py | 2 +- 4 files changed, 68 insertions(+), 141 deletions(-) diff --git a/iotdb-core/ainode/ainode/core/log.py b/iotdb-core/ainode/ainode/core/base_logger.py similarity index 53% copy from iotdb-core/ainode/ainode/core/log.py copy to iotdb-core/ainode/ainode/core/base_logger.py index 4e5d426a20e..2e05d5dc749 100644 --- a/iotdb-core/ainode/ainode/core/log.py +++ b/iotdb-core/ainode/ainode/core/base_logger.py @@ -23,6 +23,7 @@ import os import random import sys import threading +from logging.handlers import TimedRotatingFileHandler from ainode.core.constant import ( AINODE_LOG_DIR, @@ -30,7 +31,6 @@ from ainode.core.constant import ( AINODE_LOG_FILE_NAMES, STD_LEVEL, ) -from ainode.core.util.decorator import singleton class LoggerFilter(logging.Filter): @@ -53,7 +53,7 @@ class LoggerFilter(logging.Filter): for l in range(len(file_name)): i = len(file_name) - l - 1 if file_name[i:].startswith("iotdb/") or file_name[i:].startswith( - "iotdb\\" + "iotdb\\" ): file_name = file_name[i:] break @@ -63,81 +63,63 @@ class LoggerFilter(logging.Filter): return f"[{pid}:{process_name}] {stack_info}" -@singleton -class Logger: - """Logger is a singleton, it will be initialized when AINodeDescriptor is inited for the first time. - You can just use Logger() to get it anywhere. - - Args: - log_dir: log directory - - logger_format: log format - logger: global logger with custom format and level - file_handlers: file handlers for different levels - console_handler: console handler for stdout - _lock: process lock for logger. This is just a precaution, we currently do not have multiprocessing - """ - - def __init__(self, log_dir=AINODE_LOG_DIR): +class BaseLogger: + def __init__( + self, + sub_dir: str, + name: str | None = None, + std_level: int = logging.INFO, + ): self.logger_format = logging.Formatter( fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d %H:%M:%S" ) + self._lock = threading.Lock() + self.logger = logging.getLogger(str(random.random())) self.logger.handlers.clear() self.logger.setLevel(logging.DEBUG) self.console_handler = logging.StreamHandler(sys.stdout) self.console_handler.setLevel(STD_LEVEL) self.console_handler.setFormatter(self.logger_format) - self.logger.addHandler(self.console_handler) - if log_dir is not None: - file_names = AINODE_LOG_FILE_NAMES - file_levels = AINODE_LOG_FILE_LEVELS - if not os.path.exists(log_dir): - os.makedirs(log_dir) - os.chmod(log_dir, 0o777) - for file_name in file_names: - log_path = log_dir + "/" + file_name - if not os.path.exists(log_path): - f = open(log_path, mode="w", encoding="utf-8") - f.close() - os.chmod(log_path, 0o777) - self.file_handlers = [] - for l in range(len(file_names)): - self.file_handlers.append( - logging.FileHandler(log_dir + "/" + file_names[l], mode="a") - ) - self.file_handlers[l].setLevel(file_levels[l]) - self.file_handlers[l].setFormatter(self.logger_format) - - for file_handler in self.file_handlers: - self.logger.addHandler(file_handler) - else: - log_dir = "None" - + # 2. 文件(按等级拆分,每天 0 点轮转) + target_dir = os.path.join(AINODE_LOG_DIR, sub_dir) + os.makedirs(target_dir, exist_ok=True) + os.chmod(target_dir, 0o700) # TODO 权限多少? + + for i in range(len(AINODE_LOG_FILE_NAMES)): + file_name = AINODE_LOG_FILE_NAMES[i] + # create log file if not exist + file_path = os.path.join(target_dir, f"{file_name}") + if not os.path.exists(file_path): + with open(file_path, "w", encoding="utf-8"): + pass + os.chmod(file_path, 0o777) + # create handler + file_level = AINODE_LOG_FILE_LEVELS[i] + fh = TimedRotatingFileHandler( + filename=os.path.join(target_dir, f"{file_name}"), + when="MIDNIGHT", + interval=1, + encoding="utf-8", + ) + fh.setLevel(file_level) + fh.setFormatter(self.logger_format) + self.logger.addHandler(fh) + + # 3. 统一 Filter self.logger.addFilter(LoggerFilter()) - self._lock = threading.Lock() - self.info(f"Logger init successfully. Log will be written to {log_dir}") - - def debug(self, *args) -> None: - self._lock.acquire() - self.logger.debug(" ".join(map(str, args))) - self._lock.release() - - def info(self, *args) -> None: - self._lock.acquire() - self.logger.info(" ".join(map(str, args))) - self._lock.release() - - def warning(self, *args) -> None: - self._lock.acquire() - self.logger.warning(" ".join(map(str, args))) - self._lock.release() - - def error(self, *args) -> None: - self._lock.acquire() - self.logger.error(" ".join(map(str, args))) - self._lock.release() + self.info(f"Logger init successfully. Log will be written to {target_dir}") + + # interfaces + def debug(self, *msg): self._write(self.logger.debug, *msg) + def info(self, *msg): self._write(self.logger.info, *msg) + def warning(self, *msg): self._write(self.logger.warning, *msg) + def error(self, *msg): self._write(self.logger.error, *msg) + + def _write(self, fn, *msg): + with self._lock: + fn(" ".join(map(str, msg))) \ No newline at end of file diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py index b6c0edb7f60..99c942a9f5f 100644 --- a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py +++ b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py @@ -26,14 +26,13 @@ import torch import torch.multiprocessing as mp from transformers import PretrainedConfig +from ainode.core.base_logger import BaseLogger from ainode.core.config import AINodeDescriptor from ainode.core.inference.inference_request import InferenceRequest from ainode.core.inference.scheduler.basic_scheduler import BasicScheduler from ainode.core.log import Logger from ainode.core.manager.model_manager import ModelManager -logger = Logger() - class InferenceRequestPool(mp.Process): """ @@ -62,6 +61,11 @@ class InferenceRequestPool(mp.Process): self.model = None self._model_manager = None self.device = None + self.logger = BaseLogger(f"log_inference_rank_{self.device}") + self.logger.info("info") + self.logger.warning("warning") + self.logger.error("error") + self.logger.debug("debug") self._threads = [] self._waiting_queue = request_queue # Requests that are waiting to be processed @@ -89,7 +93,7 @@ class InferenceRequestPool(mp.Process): torch.cuda.reset_peak_memory_stats(self.device) torch.cuda.synchronize(self.device) memory_before_warmup = torch.cuda.memory_allocated(self.device) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Before warm-up, peak memory usage: {memory_before_warmup:.2f} bytes" ) @@ -98,10 +102,10 @@ class InferenceRequestPool(mp.Process): self.model.generate(dummy_input, max_new_tokens=1) torch.cuda.synchronize(self.device) peak_memory_1_token = torch.cuda.max_memory_allocated(self.device) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Baseline memory usage for 1 token: {peak_memory_1_token:.2f} bytes" ) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} bytes" ) @@ -113,7 +117,7 @@ class InferenceRequestPool(mp.Process): ) request.mark_running() self._running_queue.put(request) - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is activated with inputs shape {request.inputs.shape}" ) @@ -123,6 +127,7 @@ class InferenceRequestPool(mp.Process): self._activate_requests() def _step(self): + self.logger.info("_step") requests = self._scheduler.schedule_step() # TODO: We need a batcher to accelerate the concurrent inference for request in requests: @@ -138,14 +143,14 @@ class InferenceRequestPool(mp.Process): request.inference_pipeline.post_decode() if request.is_finished(): request.inference_pipeline.post_inference() - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished" ) # ensure the output tensor is on CPU before sending to result queue request.output_tensor = request.output_tensor.cpu() self._finished_queue.put(request) else: - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing" ) self._waiting_queue.put(request) @@ -178,7 +183,7 @@ class InferenceRequestPool(mp.Process): def stop(self): self._stop_event.set() - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Stopping and releasing resources." ) try: @@ -187,6 +192,6 @@ class InferenceRequestPool(mp.Process): torch.cuda.empty_cache() gc.collect() except Exception as e: - logger.warning( + self.logger.warning( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Failed to clean up: {e}" ) diff --git a/iotdb-core/ainode/ainode/core/log.py b/iotdb-core/ainode/ainode/core/log.py index 4e5d426a20e..b225dd11b4c 100644 --- a/iotdb-core/ainode/ainode/core/log.py +++ b/iotdb-core/ainode/ainode/core/log.py @@ -24,6 +24,7 @@ import random import sys import threading +from ainode.core.base_logger import BaseLogger from ainode.core.constant import ( AINODE_LOG_DIR, AINODE_LOG_FILE_LEVELS, @@ -64,7 +65,7 @@ class LoggerFilter(logging.Filter): @singleton -class Logger: +class Logger(BaseLogger): """Logger is a singleton, it will be initialized when AINodeDescriptor is inited for the first time. You can just use Logger() to get it anywhere. @@ -78,66 +79,5 @@ class Logger: _lock: process lock for logger. This is just a precaution, we currently do not have multiprocessing """ - def __init__(self, log_dir=AINODE_LOG_DIR): - - self.logger_format = logging.Formatter( - fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d %H:%M:%S" - ) - - self.logger = logging.getLogger(str(random.random())) - self.logger.handlers.clear() - self.logger.setLevel(logging.DEBUG) - self.console_handler = logging.StreamHandler(sys.stdout) - self.console_handler.setLevel(STD_LEVEL) - self.console_handler.setFormatter(self.logger_format) - - self.logger.addHandler(self.console_handler) - - if log_dir is not None: - file_names = AINODE_LOG_FILE_NAMES - file_levels = AINODE_LOG_FILE_LEVELS - if not os.path.exists(log_dir): - os.makedirs(log_dir) - os.chmod(log_dir, 0o777) - for file_name in file_names: - log_path = log_dir + "/" + file_name - if not os.path.exists(log_path): - f = open(log_path, mode="w", encoding="utf-8") - f.close() - os.chmod(log_path, 0o777) - self.file_handlers = [] - for l in range(len(file_names)): - self.file_handlers.append( - logging.FileHandler(log_dir + "/" + file_names[l], mode="a") - ) - self.file_handlers[l].setLevel(file_levels[l]) - self.file_handlers[l].setFormatter(self.logger_format) - - for file_handler in self.file_handlers: - self.logger.addHandler(file_handler) - else: - log_dir = "None" - - self.logger.addFilter(LoggerFilter()) - self._lock = threading.Lock() - self.info(f"Logger init successfully. Log will be written to {log_dir}") - - def debug(self, *args) -> None: - self._lock.acquire() - self.logger.debug(" ".join(map(str, args))) - self._lock.release() - - def info(self, *args) -> None: - self._lock.acquire() - self.logger.info(" ".join(map(str, args))) - self._lock.release() - - def warning(self, *args) -> None: - self._lock.acquire() - self.logger.warning(" ".join(map(str, args))) - self._lock.release() - - def error(self, *args) -> None: - self._lock.acquire() - self.logger.error(" ".join(map(str, args))) - self._lock.release() + def __init__(self, sub_dir: str = "log_ainode_all"): + super().__init__(sub_dir=sub_dir) diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/ainode/core/manager/inference_manager.py index d331ea919eb..f5682a0ae53 100644 --- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py +++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py @@ -163,7 +163,7 @@ class InferenceManager: # DEFAULT_DEVICE = "cpu" DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") DEFAULT_POOL_SIZE = ( - 0 # TODO: Remove these parameter by sampling model inference consumption + 1 # TODO: Remove these parameter by sampling model inference consumption ) WAITING_INTERVAL_IN_MS = ( AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()
