This is an automated email from the ASF dual-hosted git repository.

liyuheng pushed a commit to branch lyh/ainode/logger-refactor
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 59a7250e2cd4403684429afdf76f22d030fe3c42
Author: liyuheng <[email protected]>
AuthorDate: Wed Jul 30 19:17:51 2025 +0800

    seems good
---
 .../ainode/ainode/core/{ainode.py => ai_node.py}   |   0
 iotdb-core/ainode/ainode/core/constant.py          |   6 +-
 .../ainode/core/inference/inference_request.py     |   4 +
 .../core/inference/inference_request_pool.py       |  25 +++--
 iotdb-core/ainode/ainode/core/log.py               | 112 +--------------------
 .../ainode/ainode/core/logger/base_logger.py       |  90 +++++++++++++++++
 .../ainode/core/manager/inference_manager.py       |  26 ++++-
 iotdb-core/ainode/ainode/core/script.py            |   2 +-
 .../ainode/resources/conf/iotdb-ainode.properties  |   2 +-
 9 files changed, 143 insertions(+), 124 deletions(-)

diff --git a/iotdb-core/ainode/ainode/core/ainode.py 
b/iotdb-core/ainode/ainode/core/ai_node.py
similarity index 100%
rename from iotdb-core/ainode/ainode/core/ainode.py
rename to iotdb-core/ainode/ainode/core/ai_node.py
diff --git a/iotdb-core/ainode/ainode/core/constant.py 
b/iotdb-core/ainode/ainode/core/constant.py
index bd5646b3513..99aa49d1851 100644
--- a/iotdb-core/ainode/ainode/core/constant.py
+++ b/iotdb-core/ainode/ainode/core/constant.py
@@ -34,11 +34,11 @@ AINODE_SYSTEM_FILE_NAME = "system.properties"
 
 # AINode cluster configuration
 AINODE_CLUSTER_NAME = "defaultCluster"
-AINODE_TARGET_CONFIG_NODE_LIST = TEndPoint("127.0.0.1", 10710)
+AINODE_TARGET_CONFIG_NODE_LIST = TEndPoint("127.0.0.1", 12710)
 AINODE_RPC_ADDRESS = "127.0.0.1"
-AINODE_RPC_PORT = 10810
+AINODE_RPC_PORT = 12810
 AINODE_CLUSTER_INGRESS_ADDRESS = "127.0.0.1"
-AINODE_CLUSTER_INGRESS_PORT = 6667
+AINODE_CLUSTER_INGRESS_PORT = 13667
 AINODE_CLUSTER_INGRESS_USERNAME = "root"
 AINODE_CLUSTER_INGRESS_PASSWORD = "root"
 AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8"
diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request.py 
b/iotdb-core/ainode/ainode/core/inference/inference_request.py
index 2c45826fd26..714bde05bf0 100644
--- a/iotdb-core/ainode/ainode/core/inference/inference_request.py
+++ b/iotdb-core/ainode/ainode/core/inference/inference_request.py
@@ -63,11 +63,15 @@ class InferenceRequest:
             self.batch_size, max_new_tokens, device="cpu"
         )  # shape: [self.batch_size, max_new_steps]
 
+        self.logger = Logger()
+
     def mark_running(self):
         self.state = InferenceRequestState.RUNNING
+        logger.info("mark_running")
 
     def mark_finished(self):
         self.state = InferenceRequestState.FINISHED
+        logger.info("mark_finished")
 
     def is_finished(self) -> bool:
         return (
diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py 
b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
index b6c0edb7f60..7245d9c8bac 100644
--- a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
+++ b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py
@@ -26,14 +26,13 @@ import torch
 import torch.multiprocessing as mp
 from transformers import PretrainedConfig
 
+from ainode.core.logger.base_logger import BaseLogger
 from ainode.core.config import AINodeDescriptor
 from ainode.core.inference.inference_request import InferenceRequest
 from ainode.core.inference.scheduler.basic_scheduler import BasicScheduler
 from ainode.core.log import Logger
 from ainode.core.manager.model_manager import ModelManager
 
-logger = Logger()
-
 
 class InferenceRequestPool(mp.Process):
     """
@@ -62,6 +61,11 @@ class InferenceRequestPool(mp.Process):
         self.model = None
         self._model_manager = None
         self.device = None
+        self.logger = BaseLogger(f"log_inference_rank_{self.device}")
+        self.logger.info("info")
+        self.logger.warning("warning")
+        self.logger.error("error")
+        self.logger.debug("debug")
 
         self._threads = []
         self._waiting_queue = request_queue  # Requests that are waiting to be 
processed
@@ -89,7 +93,7 @@ class InferenceRequestPool(mp.Process):
         torch.cuda.reset_peak_memory_stats(self.device)
         torch.cuda.synchronize(self.device)
         memory_before_warmup = torch.cuda.memory_allocated(self.device)
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Before 
warm-up, peak memory usage: {memory_before_warmup:.2f} bytes"
         )
 
@@ -98,10 +102,10 @@ class InferenceRequestPool(mp.Process):
             self.model.generate(dummy_input, max_new_tokens=1)
         torch.cuda.synchronize(self.device)
         peak_memory_1_token = torch.cuda.max_memory_allocated(self.device)
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Baseline 
memory usage for 1 token: {peak_memory_1_token:.2f} bytes"
         )
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] 
Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} bytes"
         )
 
@@ -113,7 +117,7 @@ class InferenceRequestPool(mp.Process):
             )
             request.mark_running()
             self._running_queue.put(request)
-            logger.debug(
+            self.logger.debug(
                 
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is activated with inputs shape {request.inputs.shape}"
             )
 
@@ -123,6 +127,7 @@ class InferenceRequestPool(mp.Process):
             self._activate_requests()
 
     def _step(self):
+        self.logger.info("_step")
         requests = self._scheduler.schedule_step()
         # TODO: We need a batcher to accelerate the concurrent inference
         for request in requests:
@@ -138,14 +143,14 @@ class InferenceRequestPool(mp.Process):
             request.inference_pipeline.post_decode()
             if request.is_finished():
                 request.inference_pipeline.post_inference()
-                logger.debug(
+                self.logger.debug(
                     
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is finished"
                 )
                 # ensure the output tensor is on CPU before sending to result 
queue
                 request.output_tensor = request.output_tensor.cpu()
                 self._finished_queue.put(request)
             else:
-                logger.debug(
+                self.logger.debug(
                     
f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] 
Request is not finished, re-queueing"
                 )
                 self._waiting_queue.put(request)
@@ -178,7 +183,7 @@ class InferenceRequestPool(mp.Process):
 
     def stop(self):
         self._stop_event.set()
-        logger.info(
+        self.logger.info(
             f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Stopping 
and releasing resources."
         )
         try:
@@ -187,6 +192,6 @@ class InferenceRequestPool(mp.Process):
                 torch.cuda.empty_cache()
             gc.collect()
         except Exception as e:
-            logger.warning(
+            self.logger.warning(
                 f"[Inference][Device-{self.device}][Pool-{self.pool_id}] 
Failed to clean up: {e}"
             )
diff --git a/iotdb-core/ainode/ainode/core/log.py 
b/iotdb-core/ainode/ainode/core/log.py
index 4e5d426a20e..eddccf33191 100644
--- a/iotdb-core/ainode/ainode/core/log.py
+++ b/iotdb-core/ainode/ainode/core/log.py
@@ -16,55 +16,12 @@
 # under the License.
 #
 
-import inspect
-import logging
-import multiprocessing
-import os
-import random
-import sys
-import threading
-
-from ainode.core.constant import (
-    AINODE_LOG_DIR,
-    AINODE_LOG_FILE_LEVELS,
-    AINODE_LOG_FILE_NAMES,
-    STD_LEVEL,
-)
+from ainode.core.logger.base_logger import BaseLogger
 from ainode.core.util.decorator import singleton
 
 
-class LoggerFilter(logging.Filter):
-    def filter(self, record):
-        record.msg = f"{self.custom_log_info()}: {record.msg}"
-        return True
-
-    @staticmethod
-    def custom_log_info():
-        frame = inspect.currentframe()
-        stack_trace = inspect.getouterframes(frame)
-
-        pid = os.getpid()
-        process_name = multiprocessing.current_process().name
-
-        stack_info = ""
-        frame_info = stack_trace[7]
-        file_name = frame_info.filename
-        # if file_name is not in current working directory, find the first 
"iotdb" in the path
-        for l in range(len(file_name)):
-            i = len(file_name) - l - 1
-            if file_name[i:].startswith("iotdb/") or file_name[i:].startswith(
-                "iotdb\\"
-            ):
-                file_name = file_name[i:]
-                break
-
-        stack_info += f"{file_name}:{frame_info.lineno}-{frame_info.function}"
-
-        return f"[{pid}:{process_name}] {stack_info}"
-
-
 @singleton
-class Logger:
+class Logger(BaseLogger):
     """Logger is a singleton, it will be initialized when AINodeDescriptor is 
inited for the first time.
         You can just use Logger() to get it anywhere.
 
@@ -78,66 +35,5 @@ class Logger:
     _lock: process lock for logger. This is just a precaution, we currently do 
not have multiprocessing
     """
 
-    def __init__(self, log_dir=AINODE_LOG_DIR):
-
-        self.logger_format = logging.Formatter(
-            fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d 
%H:%M:%S"
-        )
-
-        self.logger = logging.getLogger(str(random.random()))
-        self.logger.handlers.clear()
-        self.logger.setLevel(logging.DEBUG)
-        self.console_handler = logging.StreamHandler(sys.stdout)
-        self.console_handler.setLevel(STD_LEVEL)
-        self.console_handler.setFormatter(self.logger_format)
-
-        self.logger.addHandler(self.console_handler)
-
-        if log_dir is not None:
-            file_names = AINODE_LOG_FILE_NAMES
-            file_levels = AINODE_LOG_FILE_LEVELS
-            if not os.path.exists(log_dir):
-                os.makedirs(log_dir)
-                os.chmod(log_dir, 0o777)
-            for file_name in file_names:
-                log_path = log_dir + "/" + file_name
-                if not os.path.exists(log_path):
-                    f = open(log_path, mode="w", encoding="utf-8")
-                    f.close()
-                    os.chmod(log_path, 0o777)
-            self.file_handlers = []
-            for l in range(len(file_names)):
-                self.file_handlers.append(
-                    logging.FileHandler(log_dir + "/" + file_names[l], 
mode="a")
-                )
-                self.file_handlers[l].setLevel(file_levels[l])
-                self.file_handlers[l].setFormatter(self.logger_format)
-
-            for file_handler in self.file_handlers:
-                self.logger.addHandler(file_handler)
-        else:
-            log_dir = "None"
-
-        self.logger.addFilter(LoggerFilter())
-        self._lock = threading.Lock()
-        self.info(f"Logger init successfully. Log will be written to 
{log_dir}")
-
-    def debug(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.debug(" ".join(map(str, args)))
-        self._lock.release()
-
-    def info(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.info(" ".join(map(str, args)))
-        self._lock.release()
-
-    def warning(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.warning(" ".join(map(str, args)))
-        self._lock.release()
-
-    def error(self, *args) -> None:
-        self._lock.acquire()
-        self.logger.error(" ".join(map(str, args)))
-        self._lock.release()
+    def __init__(self, sub_dir: str = "log_ainode_all"):
+        super().__init__(sub_dir=sub_dir)
diff --git a/iotdb-core/ainode/ainode/core/logger/base_logger.py 
b/iotdb-core/ainode/ainode/core/logger/base_logger.py
new file mode 100644
index 00000000000..1608967fdee
--- /dev/null
+++ b/iotdb-core/ainode/ainode/core/logger/base_logger.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import logging
+import os
+import random
+import sys
+import threading
+from logging.handlers import TimedRotatingFileHandler
+
+from ainode.core.constant import (
+    AINODE_LOG_DIR,
+    AINODE_LOG_FILE_LEVELS,
+    AINODE_LOG_FILE_NAMES,
+    STD_LEVEL,
+)
+
+
+class BaseLogger:
+
+    def __init__(
+            self,
+            sub_dir: str
+    ):
+        self.logger_format = logging.Formatter(
+            fmt="%(asctime)s %(levelname)s [%(process)d:%(processName)s] " \
+                "%(filename)s:%(funcName)s:%(lineno)d - %(message)s",
+            datefmt="%Y-%m-%d %H:%M:%S"
+        )
+
+        self._lock = threading.Lock()
+
+        self.logger = logging.getLogger(str(random.random()))
+        self.logger.handlers.clear()
+        self.logger.setLevel(logging.DEBUG)
+        self.console_handler = logging.StreamHandler(sys.stdout)
+        self.console_handler.setLevel(STD_LEVEL)
+        self.console_handler.setFormatter(self.logger_format)
+        self.logger.addHandler(self.console_handler)
+
+        target_dir = os.path.join(AINODE_LOG_DIR, sub_dir)
+        os.makedirs(target_dir, exist_ok=True)
+        os.chmod(target_dir, 0o755)
+
+        for i in range(len(AINODE_LOG_FILE_NAMES)):
+            file_name = AINODE_LOG_FILE_NAMES[i]
+            # create log file if not exist
+            file_path = os.path.join(target_dir, f"{file_name}")
+            if not os.path.exists(file_path):
+                with open(file_path, "w", encoding="utf-8"):
+                    pass
+                os.chmod(file_path, 0o644)
+            # create handler
+            file_level = AINODE_LOG_FILE_LEVELS[i]
+            fh = TimedRotatingFileHandler(
+                filename=os.path.join(target_dir, f"{file_name}"),
+                when="MIDNIGHT",
+                interval=1,
+                encoding="utf-8",
+            )
+            fh.setLevel(file_level)
+            fh.setFormatter(self.logger_format)
+            self.logger.addHandler(fh)
+
+        self.info(f"Logger init successfully. Log will be written to 
{target_dir}")
+
+    # interfaces
+    def debug(self, *msg):   self._write(self.logger.debug,   *msg)
+    def info(self, *msg):    self._write(self.logger.info,    *msg)
+    def warning(self, *msg): self._write(self.logger.warning, *msg)
+    def error(self, *msg):   self._write(self.logger.error,   *msg)
+
+    def _write(self, function, *msg):
+        with self._lock:
+            function(" ".join(map(str, msg)), stacklevel=3)
\ No newline at end of file
diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py 
b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
index 5a853ac4e72..f5682a0ae53 100644
--- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import os
 import threading
 import time
 from abc import ABC, abstractmethod
@@ -72,6 +73,28 @@ class InferenceStrategy(ABC):
 # we only get valueList currently.
 class TimerXLStrategy(InferenceStrategy):
     def infer(self, full_data, predict_length=96, **_):
+        if torch.cuda.is_available():
+            device = next(self.model.parameters()).device
+        else:
+            device = torch.device("cpu")
+        # Get possible rank
+        if torch.distributed.is_initialized():
+            global_rank = torch.distributed.get_rank()
+            world_size  = torch.distributed.get_world_size()
+        else:
+            # Not distribution, default rank=0, world_size=1
+            global_rank, world_size = 0, 1
+
+        if device.type == "cuda":
+            gpu_name = torch.cuda.get_device_name(device.index)
+            logger.info(
+                f"[rank {global_rank}/{world_size}] "
+                f"Running on GPU {device.index} ({gpu_name})"
+            )
+        else:
+            logger.info(f"[rank {global_rank}/{world_size}] Running on CPU")
+
+        logger.info("Start inference")
         data = full_data[1][0]
         if data.dtype.byteorder not in ("=", "|"):
             data = data.byteswap().newbyteorder()
@@ -79,6 +102,7 @@ class TimerXLStrategy(InferenceStrategy):
         # TODO: unify model inference input
         output = self.model.generate(seqs, max_new_tokens=predict_length, 
revin=True)
         df = pd.DataFrame(output[0])
+        logger.info("Complete inference")
         return convert_to_binary(df)
 
 
@@ -139,7 +163,7 @@ class InferenceManager:
     # DEFAULT_DEVICE = "cpu"
     DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
     DEFAULT_POOL_SIZE = (
-        0  # TODO: Remove these parameter by sampling model inference 
consumption
+        1  # TODO: Remove these parameter by sampling model inference 
consumption
     )
     WAITING_INTERVAL_IN_MS = (
         
AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()
diff --git a/iotdb-core/ainode/ainode/core/script.py 
b/iotdb-core/ainode/ainode/core/script.py
index 8cccfec5d17..a01ce691dc7 100644
--- a/iotdb-core/ainode/ainode/core/script.py
+++ b/iotdb-core/ainode/ainode/core/script.py
@@ -21,7 +21,7 @@ import sys
 
 import torch.multiprocessing as mp
 
-from ainode.core.ainode import AINode
+from ainode.core.ai_node import AINode
 from ainode.core.config import AINodeDescriptor
 from ainode.core.constant import TSStatusCode
 from ainode.core.exception import MissingConfigError
diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties 
b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
index b31079ce00d..6c8c28e3f26 100644
--- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
+++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
@@ -19,7 +19,7 @@
 
 # Used for indicate cluster name and distinguish different cluster.
 # Datatype: string
-cluster_name=defaultCluster
+cluster_name=????
 
 # ConfigNode address registered at AINode startup.
 # Allow modifications only before starting the service for the first time.

Reply via email to