This is an automated email from the ASF dual-hosted git repository.

liyuheng pushed a commit to branch lyh/ainode/logger-refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 084e71b0730eeb1f1a9ba9ca80a8cf2ddfd2d54a
Author: liyuheng <[email protected]>
AuthorDate: Thu Jul 31 23:01:45 2025 +0800

    almost work
---
 .../ainode/ainode/core/{log.py => base_logger.py}  | 114 +++++++++------------
 .../core/inference/inference_request_pool.py       |  25 +++--
 iotdb-core/ainode/ainode/core/log.py               |  68 +-----------
 .../ainode/core/manager/inference_manager.py       |   2 +-
 4 files changed, 68 insertions(+), 141 deletions(-)

diff --git a/iotdb-core/ainode/ainode/core/log.py 
b/iotdb-core/ainode/ainode/core/base_logger.py
similarity index 53%
copy from iotdb-core/ainode/ainode/core/log.py
copy to iotdb-core/ainode/ainode/core/base_logger.py
index 4e5d426a20e..2e05d5dc749 100644
--- a/iotdb-core/ainode/ainode/core/log.py
+++ b/iotdb-core/ainode/ainode/core/base_logger.py
@@ -23,6 +23,7 @@ import os
 import random
 import sys
 import threading
+from logging.handlers import TimedRotatingFileHandler
 
 from ainode.core.constant import (
     AINODE_LOG_DIR,
@@ -30,7 +31,6 @@ from ainode.core.constant import (
     AINODE_LOG_FILE_NAMES,
     STD_LEVEL,
 )
-from ainode.core.util.decorator import singleton
 
 
 class LoggerFilter(logging.Filter):
@@ -53,7 +53,7 @@ class LoggerFilter(logging.Filter):
         for l in range(len(file_name)):
             i = len(file_name) - l - 1
             if file_name[i:].startswith("iotdb/") or file_name[i:].startswith(
-                "iotdb\\"
+                    "iotdb\\"
             ):
                 file_name = file_name[i:]
                 break
@@ -63,81 +63,63 @@ class LoggerFilter(logging.Filter):
         return f"[{pid}:{process_name}] {stack_info}"
 
 
-@singleton
-class Logger:
-    """Logger is a singleton, it will be initialized when AINodeDescriptor is 
inited for the first time.
-        You can just use Logger() to get it anywhere.
-
-    Args:
-        log_dir: log directory
-
-    logger_format: log format
-    logger: global logger with custom format and level
-    file_handlers: file handlers for different levels
-    console_handler: console handler for stdout
-    _lock: process lock for logger. This is just a precaution, we currently do 
not have multiprocessing
-    """
-
-    def __init__(self, log_dir=AINODE_LOG_DIR):
+class BaseLogger:
 
+    def __init__(
+            self,
+            sub_dir: str,
+            name: str | None = None,
+            std_level: int = logging.INFO,
+    ):
         self.logger_format = logging.Formatter(
             fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d 
%H:%M:%S"
         )
 
+        self._lock = threading.Lock()
+
         self.logger = logging.getLogger(str(random.random()))
         self.logger.handlers.clear()
         self.logger.setLevel(logging.DEBUG)
         self.console_handler = logging.StreamHandler(sys.stdout)
         self.console_handler.setLevel(STD_LEVEL)
         self.console_handler.setFormatter(self.logger_format)
-
         self.logger.addHandler(self.console_handler)
 
-        if log_dir is not None:
-            file_names = AINODE_LOG_FILE_NAMES
-            file_levels = AINODE_LOG_FILE_LEVELS
-            if not os.path.exists(log_dir):
-                os.makedirs(log_dir)
-                os.chmod(log_dir, 0o777)
-            for file_name in file_names:
-                log_path = log_dir + "/" + file_name
-                if not os.path.exists(log_path):
-                    f = open(log_path, mode="w", encoding="utf-8")
-                    f.close()
-                    os.chmod(log_path, 0o777)
-            self.file_handlers = []
-            for l in range(len(file_names)):
-                self.file_handlers.append(
-                    logging.FileHandler(log_dir + "/" + file_names[l], 
mode="a")
-                )
-                self.file_handlers[l].setLevel(file_levels[l])
-                self.file_handlers[l].setFormatter(self.logger_format)
-
-            for file_handler in self.file_handlers:
-                self.logger.addHandler(file_handler)
-        else:
-            log_dir = "None"
-
+        # 2. File handlers: one per log level, rotated daily at midnight
+        target_dir = os.path.join(AINODE_LOG_DIR, sub_dir)
+        os.makedirs(target_dir, exist_ok=True)
+        os.chmod(target_dir, 0o700) # TODO: decide the appropriate permission bits
+
+        for i in range(len(AINODE_LOG_FILE_NAMES)):
+            file_name = AINODE_LOG_FILE_NAMES[i]
+            # create log file if not exist
+            file_path = os.path.join(target_dir, f"{file_name}")
+            if not os.path.exists(file_path):
+                with open(file_path, "w", encoding="utf-8"):
+                    pass
+                os.chmod(file_path, 0o777)
+            # create handler
+            file_level = AINODE_LOG_FILE_LEVELS[i]
+            fh = TimedRotatingFileHandler(
+                filename=os.path.join(target_dir, f"{file_name}"),
+                when="MIDNIGHT",
+                interval=1,
+                encoding="utf-8",
+            )
+            fh.setLevel(file_level)
+            fh.setFormatter(self.logger_format)
+            self.logger.addHandler(fh)
+
+        # 3. Shared filter applied to the whole logger
         self.logger.addFilter(LoggerFilter())
-        self._lock = threading.Lock()
-        self.info(f"Logger init successfully. Log will be written to 
{log_dir}")
-
-    def debug(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.debug(" ".join(map(str, args)))
-        self._lock.release()
-
-    def info(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.info(" ".join(map(str, args)))
-        self._lock.release()
-
-    def warning(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.warning(" ".join(map(str, args)))
-        self._lock.release()
-
-    def error(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.error(" ".join(map(str, args)))
-        self._lock.release()
+        self.info(f"Logger init successfully. Log will be written to 
{target_dir}")
+
+    # interfaces
+    def debug(self, *msg):   self._write(self.logger.debug,   *msg)
+    def info(self, *msg):    self._write(self.logger.info,    *msg)
+    def warning(self, *msg): self._write(self.logger.warning, *msg)
+    def error(self, *msg):   self._write(self.logger.error,   *msg)
+
+    def _write(self, fn, *msg):
+        with self._lock:
+            fn(" ".join(map(str, msg)))
\ No newline at end of file
diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py 
b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
index b6c0edb7f60..99c942a9f5f 100644
--- a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
+++ b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
@@ -26,14 +26,13 @@ import torch
 import torch.multiprocessing as mp
 from transformers import PretrainedConfig
 
+from ainode.core.base_logger import BaseLogger
 from ainode.core.config import AINodeDescriptor
 from ainode.core.inference.inference_request import InferenceRequest
 from ainode.core.inference.scheduler.basic_scheduler import BasicScheduler
 from ainode.core.log import Logger
 from ainode.core.manager.model_manager import ModelManager
 
-logger = Logger()
-
 
 class InferenceRequestPool(mp.Process):
     """
@@ -62,6 +61,11 @@ class InferenceRequestPool(mp.Process):
         self.model = None
         self._model_manager = None
         self.device = None
+        self.logger = BaseLogger(f"log_inference_rank_{self.device}")
+        self.logger.info("info")
+        self.logger.warning("warning")
+        self.logger.error("error")
+        self.logger.debug("debug")
 
         self._threads = []
         self._waiting_queue = request_queue  # Requests that are waiting to be 
processed
@@ -89,7 +93,7 @@ class InferenceRequestPool(mp.Process):
         torch.cuda.reset_peak_memory_stats(self.device)
         torch.cuda.synchronize(self.device)
         memory_before_warmup = torch.cuda.memory_allocated(self.device)
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Before 
warm-up, peak memory usage: {memory_before_warmup:.2f} bytes"
         )
 
@@ -98,10 +102,10 @@ class InferenceRequestPool(mp.Process):
             self.model.generate(dummy_input, max_new_tokens=1)
         torch.cuda.synchronize(self.device)
         peak_memory_1_token = torch.cuda.max_memory_allocated(self.device)
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Baseline 
memory usage for 1 token: {peak_memory_1_token:.2f} bytes"
         )
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] 
Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} bytes"
         )
 
@@ -113,7 +117,7 @@ class InferenceRequestPool(mp.Process):
             )
             request.mark_running()
             self._running_queue.put(request)
-            logger.debug(
+            self.logger.debug(
                 
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is activated with inputs shape {request.inputs.shape}"
             )
 
@@ -123,6 +127,7 @@ class InferenceRequestPool(mp.Process):
             self._activate_requests()
 
     def _step(self):
+        self.logger.info("_step")
         requests = self._scheduler.schedule_step()
         # TODO: We need a batcher to accelerate the concurrent inference
         for request in requests:
@@ -138,14 +143,14 @@ class InferenceRequestPool(mp.Process):
             request.inference_pipeline.post_decode()
             if request.is_finished():
                 request.inference_pipeline.post_inference()
-                logger.debug(
+                self.logger.debug(
                     
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is finished"
                 )
                 # ensure the output tensor is on CPU before sending to result 
queue
                 request.output_tensor = request.output_tensor.cpu()
                 self._finished_queue.put(request)
             else:
-                logger.debug(
+                self.logger.debug(
                     
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is not finished, re-queueing"
                 )
                 self._waiting_queue.put(request)
@@ -178,7 +183,7 @@ class InferenceRequestPool(mp.Process):
 
     def stop(self):
         self._stop_event.set()
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Stopping 
and releasing resources."
         )
         try:
@@ -187,6 +192,6 @@ class InferenceRequestPool(mp.Process):
                 torch.cuda.empty_cache()
             gc.collect()
         except Exception as e:
-            logger.warning(
+            self.logger.warning(
                 f"[Inference][Device-{self.device}][Pool-{self.pool_id}] 
Failed to clean up: {e}"
             )
diff --git a/iotdb-core/ainode/ainode/core/log.py 
b/iotdb-core/ainode/ainode/core/log.py
index 4e5d426a20e..b225dd11b4c 100644
--- a/iotdb-core/ainode/ainode/core/log.py
+++ b/iotdb-core/ainode/ainode/core/log.py
@@ -24,6 +24,7 @@ import random
 import sys
 import threading
 
+from ainode.core.base_logger import BaseLogger
 from ainode.core.constant import (
     AINODE_LOG_DIR,
     AINODE_LOG_FILE_LEVELS,
@@ -64,7 +65,7 @@ class LoggerFilter(logging.Filter):
 
 
 @singleton
-class Logger:
+class Logger(BaseLogger):
     """Logger is a singleton, it will be initialized when AINodeDescriptor is 
inited for the first time.
         You can just use Logger() to get it anywhere.
 
@@ -78,66 +79,5 @@ class Logger:
     _lock: process lock for logger. This is just a precaution, we currently do 
not have multiprocessing
     """
 
-    def __init__(self, log_dir=AINODE_LOG_DIR):
-
-        self.logger_format = logging.Formatter(
-            fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d 
%H:%M:%S"
-        )
-
-        self.logger = logging.getLogger(str(random.random()))
-        self.logger.handlers.clear()
-        self.logger.setLevel(logging.DEBUG)
-        self.console_handler = logging.StreamHandler(sys.stdout)
-        self.console_handler.setLevel(STD_LEVEL)
-        self.console_handler.setFormatter(self.logger_format)
-
-        self.logger.addHandler(self.console_handler)
-
-        if log_dir is not None:
-            file_names = AINODE_LOG_FILE_NAMES
-            file_levels = AINODE_LOG_FILE_LEVELS
-            if not os.path.exists(log_dir):
-                os.makedirs(log_dir)
-                os.chmod(log_dir, 0o777)
-            for file_name in file_names:
-                log_path = log_dir + "/" + file_name
-                if not os.path.exists(log_path):
-                    f = open(log_path, mode="w", encoding="utf-8")
-                    f.close()
-                    os.chmod(log_path, 0o777)
-            self.file_handlers = []
-            for l in range(len(file_names)):
-                self.file_handlers.append(
-                    logging.FileHandler(log_dir + "/" + file_names[l], 
mode="a")
-                )
-                self.file_handlers[l].setLevel(file_levels[l])
-                self.file_handlers[l].setFormatter(self.logger_format)
-
-            for file_handler in self.file_handlers:
-                self.logger.addHandler(file_handler)
-        else:
-            log_dir = "None"
-
-        self.logger.addFilter(LoggerFilter())
-        self._lock = threading.Lock()
-        self.info(f"Logger init successfully. Log will be written to 
{log_dir}")
-
-    def debug(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.debug(" ".join(map(str, args)))
-        self._lock.release()
-
-    def info(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.info(" ".join(map(str, args)))
-        self._lock.release()
-
-    def warning(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.warning(" ".join(map(str, args)))
-        self._lock.release()
-
-    def error(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.error(" ".join(map(str, args)))
-        self._lock.release()
+    def __init__(self, sub_dir: str = "log_ainode_all"):
+        super().__init__(sub_dir=sub_dir)
diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py 
b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
index d331ea919eb..f5682a0ae53 100644
--- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
@@ -163,7 +163,7 @@ class InferenceManager:
     # DEFAULT_DEVICE = "cpu"
     DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
     DEFAULT_POOL_SIZE = (
-        0  # TODO: Remove these parameter by sampling model inference 
consumption
+        1  # TODO: Remove these parameter by sampling model inference 
consumption
     )
     WAITING_INTERVAL_IN_MS = (
         
AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()

Reply via email to