This is an automated email from the ASF dual-hosted git repository. liyuheng pushed a commit to branch lyh/ainode/logger-refactor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 59a7250e2cd4403684429afdf76f22d030fe3c42 Author: liyuheng <[email protected]> AuthorDate: Wed Jul 30 19:17:51 2025 +0800 seems good --- .../ainode/ainode/core/{ainode.py => ai_node.py} | 0 iotdb-core/ainode/ainode/core/constant.py | 6 +- .../ainode/core/inference/inference_request.py | 4 + .../core/inference/inference_request_pool.py | 25 +++-- iotdb-core/ainode/ainode/core/log.py | 112 +-------------------- .../ainode/ainode/core/logger/base_logger.py | 90 +++++++++++++++++ .../ainode/core/manager/inference_manager.py | 26 ++++- iotdb-core/ainode/ainode/core/script.py | 2 +- .../ainode/resources/conf/iotdb-ainode.properties | 2 +- 9 files changed, 143 insertions(+), 124 deletions(-) diff --git a/iotdb-core/ainode/ainode/core/ainode.py b/iotdb-core/ainode/ainode/core/ai_node.py similarity index 100% rename from iotdb-core/ainode/ainode/core/ainode.py rename to iotdb-core/ainode/ainode/core/ai_node.py diff --git a/iotdb-core/ainode/ainode/core/constant.py b/iotdb-core/ainode/ainode/core/constant.py index bd5646b3513..99aa49d1851 100644 --- a/iotdb-core/ainode/ainode/core/constant.py +++ b/iotdb-core/ainode/ainode/core/constant.py @@ -34,11 +34,11 @@ AINODE_SYSTEM_FILE_NAME = "system.properties" # AINode cluster configuration AINODE_CLUSTER_NAME = "defaultCluster" -AINODE_TARGET_CONFIG_NODE_LIST = TEndPoint("127.0.0.1", 10710) +AINODE_TARGET_CONFIG_NODE_LIST = TEndPoint("127.0.0.1", 12710) AINODE_RPC_ADDRESS = "127.0.0.1" -AINODE_RPC_PORT = 10810 +AINODE_RPC_PORT = 12810 AINODE_CLUSTER_INGRESS_ADDRESS = "127.0.0.1" -AINODE_CLUSTER_INGRESS_PORT = 6667 +AINODE_CLUSTER_INGRESS_PORT = 13667 AINODE_CLUSTER_INGRESS_USERNAME = "root" AINODE_CLUSTER_INGRESS_PASSWORD = "root" AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8" diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request.py b/iotdb-core/ainode/ainode/core/inference/inference_request.py index 2c45826fd26..714bde05bf0 100644 --- a/iotdb-core/ainode/ainode/core/inference/inference_request.py +++ b/iotdb-core/ainode/ainode/core/inference/inference_request.py @@ -63,11 +63,15 @@ class InferenceRequest: self.batch_size, max_new_tokens, device="cpu" ) # shape: [self.batch_size, max_new_steps] + self.logger = Logger() + def mark_running(self): self.state = InferenceRequestState.RUNNING + logger.info("mark_running") def mark_finished(self): self.state = InferenceRequestState.FINISHED + logger.info("mark_finished") def is_finished(self) -> bool: return ( diff --git a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py index b6c0edb7f60..7245d9c8bac 100644 --- a/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py +++ b/iotdb-core/ainode/ainode/core/inference/inference_request_pool.py @@ -26,14 +26,13 @@ import torch import torch.multiprocessing as mp from transformers import PretrainedConfig +from ainode.core.logger.base_logger import BaseLogger from ainode.core.config import AINodeDescriptor from ainode.core.inference.inference_request import InferenceRequest from ainode.core.inference.scheduler.basic_scheduler import BasicScheduler from ainode.core.log import Logger from ainode.core.manager.model_manager import ModelManager -logger = Logger() - class InferenceRequestPool(mp.Process): """ @@ -62,6 +61,11 @@ class InferenceRequestPool(mp.Process): self.model = None self._model_manager = None self.device = None + self.logger = BaseLogger(f"log_inference_rank_{self.device}") + self.logger.info("info") + self.logger.warning("warning") + self.logger.error("error") + self.logger.debug("debug") self._threads = [] self._waiting_queue = request_queue # Requests that are waiting to be processed @@ -89,7 +93,7 @@ class InferenceRequestPool(mp.Process): torch.cuda.reset_peak_memory_stats(self.device) torch.cuda.synchronize(self.device) memory_before_warmup = torch.cuda.memory_allocated(self.device) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Before warm-up, peak memory usage: {memory_before_warmup:.2f} bytes" ) @@ -98,10 +102,10 @@ class InferenceRequestPool(mp.Process): self.model.generate(dummy_input, max_new_tokens=1) torch.cuda.synchronize(self.device) peak_memory_1_token = torch.cuda.max_memory_allocated(self.device) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Baseline memory usage for 1 token: {peak_memory_1_token:.2f} bytes" ) - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Differentiation : {peak_memory_1_token-memory_before_warmup:.2f} bytes" ) @@ -113,7 +117,7 @@ class InferenceRequestPool(mp.Process): ) request.mark_running() self._running_queue.put(request) - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is activated with inputs shape {request.inputs.shape}" ) @@ -123,6 +127,7 @@ class InferenceRequestPool(mp.Process): self._activate_requests() def _step(self): + self.logger.info("_step") requests = self._scheduler.schedule_step() # TODO: We need a batcher to accelerate the concurrent inference for request in requests: @@ -138,14 +143,14 @@ class InferenceRequestPool(mp.Process): request.inference_pipeline.post_decode() if request.is_finished(): request.inference_pipeline.post_inference() - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is finished" ) # ensure the output tensor is on CPU before sending to result queue request.output_tensor = request.output_tensor.cpu() self._finished_queue.put(request) else: - logger.debug( + self.logger.debug( f"[Inference][Device-{self.device}][Pool-{self.pool_id}][ID-{request.req_id}] Request is not finished, re-queueing" ) self._waiting_queue.put(request) @@ -178,7 +183,7 @@ class InferenceRequestPool(mp.Process): def stop(self): self._stop_event.set() - logger.info( + self.logger.info( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Stopping and releasing resources." ) try: @@ -187,6 +192,6 @@ class InferenceRequestPool(mp.Process): torch.cuda.empty_cache() gc.collect() except Exception as e: - logger.warning( + self.logger.warning( f"[Inference][Device-{self.device}][Pool-{self.pool_id}] Failed to clean up: {e}" ) diff --git a/iotdb-core/ainode/ainode/core/log.py b/iotdb-core/ainode/ainode/core/log.py index 4e5d426a20e..eddccf33191 100644 --- a/iotdb-core/ainode/ainode/core/log.py +++ b/iotdb-core/ainode/ainode/core/log.py @@ -16,55 +16,12 @@ # under the License. # -import inspect -import logging -import multiprocessing -import os -import random -import sys -import threading - -from ainode.core.constant import ( - AINODE_LOG_DIR, - AINODE_LOG_FILE_LEVELS, - AINODE_LOG_FILE_NAMES, - STD_LEVEL, -) +from ainode.core.logger.base_logger import BaseLogger from ainode.core.util.decorator import singleton -class LoggerFilter(logging.Filter): - def filter(self, record): - record.msg = f"{self.custom_log_info()}: {record.msg}" - return True - - @staticmethod - def custom_log_info(): - frame = inspect.currentframe() - stack_trace = inspect.getouterframes(frame) - - pid = os.getpid() - process_name = multiprocessing.current_process().name - - stack_info = "" - frame_info = stack_trace[7] - file_name = frame_info.filename - # if file_name is not in current working directory, find the first "iotdb" in the path - for l in range(len(file_name)): - i = len(file_name) - l - 1 - if file_name[i:].startswith("iotdb/") or file_name[i:].startswith( - "iotdb\\" - ): - file_name = file_name[i:] - break - - stack_info += f"{file_name}:{frame_info.lineno}-{frame_info.function}" - - return f"[{pid}:{process_name}] {stack_info}" - - @singleton -class Logger: +class Logger(BaseLogger): """Logger is a singleton, it will be initialized when AINodeDescriptor is inited for the first time. You can just use Logger() to get it anywhere. @@ -78,66 +35,5 @@ class Logger: _lock: process lock for logger. This is just a precaution, we currently do not have multiprocessing """ - def __init__(self, log_dir=AINODE_LOG_DIR): - - self.logger_format = logging.Formatter( - fmt="%(asctime)s %(levelname)s %(" "message)s", datefmt="%Y-%m-%d %H:%M:%S" - ) - - self.logger = logging.getLogger(str(random.random())) - self.logger.handlers.clear() - self.logger.setLevel(logging.DEBUG) - self.console_handler = logging.StreamHandler(sys.stdout) - self.console_handler.setLevel(STD_LEVEL) - self.console_handler.setFormatter(self.logger_format) - - self.logger.addHandler(self.console_handler) - - if log_dir is not None: - file_names = AINODE_LOG_FILE_NAMES - file_levels = AINODE_LOG_FILE_LEVELS - if not os.path.exists(log_dir): - os.makedirs(log_dir) - os.chmod(log_dir, 0o777) - for file_name in file_names: - log_path = log_dir + "/" + file_name - if not os.path.exists(log_path): - f = open(log_path, mode="w", encoding="utf-8") - f.close() - os.chmod(log_path, 0o777) - self.file_handlers = [] - for l in range(len(file_names)): - self.file_handlers.append( - logging.FileHandler(log_dir + "/" + file_names[l], mode="a") - ) - self.file_handlers[l].setLevel(file_levels[l]) - self.file_handlers[l].setFormatter(self.logger_format) - - for file_handler in self.file_handlers: - self.logger.addHandler(file_handler) - else: - log_dir = "None" - - self.logger.addFilter(LoggerFilter()) - self._lock = threading.Lock() - self.info(f"Logger init successfully. Log will be written to {log_dir}") - - def debug(self, *args) -> None: - self._lock.acquire() - self.logger.debug(" ".join(map(str, args))) - self._lock.release() - - def info(self, *args) -> None: - self._lock.acquire() - self.logger.info(" ".join(map(str, args))) - self._lock.release() - - def warning(self, *args) -> None: - self._lock.acquire() - self.logger.warning(" ".join(map(str, args))) - self._lock.release() - - def error(self, *args) -> None: - self._lock.acquire() - self.logger.error(" ".join(map(str, args))) - self._lock.release() + def __init__(self, sub_dir: str = "log_ainode_all"): + super().__init__(sub_dir=sub_dir) diff --git a/iotdb-core/ainode/ainode/core/logger/base_logger.py b/iotdb-core/ainode/ainode/core/logger/base_logger.py new file mode 100644 index 00000000000..1608967fdee --- /dev/null +++ b/iotdb-core/ainode/ainode/core/logger/base_logger.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import logging +import os +import random +import sys +import threading +from logging.handlers import TimedRotatingFileHandler + +from ainode.core.constant import ( + AINODE_LOG_DIR, + AINODE_LOG_FILE_LEVELS, + AINODE_LOG_FILE_NAMES, + STD_LEVEL, +) + + +class BaseLogger: + + def __init__( + self, + sub_dir: str + ): + self.logger_format = logging.Formatter( + fmt="%(asctime)s %(levelname)s [%(process)d:%(processName)s] " \ + "%(filename)s:%(funcName)s:%(lineno)d - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + + self._lock = threading.Lock() + + self.logger = logging.getLogger(str(random.random())) + self.logger.handlers.clear() + self.logger.setLevel(logging.DEBUG) + self.console_handler = logging.StreamHandler(sys.stdout) + self.console_handler.setLevel(STD_LEVEL) + self.console_handler.setFormatter(self.logger_format) + self.logger.addHandler(self.console_handler) + + target_dir = os.path.join(AINODE_LOG_DIR, sub_dir) + os.makedirs(target_dir, exist_ok=True) + os.chmod(target_dir, 0o755) + + for i in range(len(AINODE_LOG_FILE_NAMES)): + file_name = AINODE_LOG_FILE_NAMES[i] + # create log file if not exist + file_path = os.path.join(target_dir, f"{file_name}") + if not os.path.exists(file_path): + with open(file_path, "w", encoding="utf-8"): + pass + os.chmod(file_path, 0o644) + # create handler + file_level = AINODE_LOG_FILE_LEVELS[i] + fh = TimedRotatingFileHandler( + filename=os.path.join(target_dir, f"{file_name}"), + when="MIDNIGHT", + interval=1, + encoding="utf-8", + ) + fh.setLevel(file_level) + fh.setFormatter(self.logger_format) + self.logger.addHandler(fh) + + self.info(f"Logger init successfully. Log will be written to {target_dir}") + + # interfaces + def debug(self, *msg): self._write(self.logger.debug, *msg) + def info(self, *msg): self._write(self.logger.info, *msg) + def warning(self, *msg): self._write(self.logger.warning, *msg) + def error(self, *msg): self._write(self.logger.error, *msg) + + def _write(self, function, *msg): + with self._lock: + function(" ".join(map(str, msg)), stacklevel=3) \ No newline at end of file diff --git a/iotdb-core/ainode/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/ainode/core/manager/inference_manager.py index 5a853ac4e72..f5682a0ae53 100644 --- a/iotdb-core/ainode/ainode/core/manager/inference_manager.py +++ b/iotdb-core/ainode/ainode/core/manager/inference_manager.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # +import os import threading import time from abc import ABC, abstractmethod @@ -72,6 +73,28 @@ class InferenceStrategy(ABC): # we only get valueList currently. class TimerXLStrategy(InferenceStrategy): def infer(self, full_data, predict_length=96, **_): + if torch.cuda.is_available(): + device = next(self.model.parameters()).device + else: + device = torch.device("cpu") + # Get possible rank + if torch.distributed.is_initialized(): + global_rank = torch.distributed.get_rank() + world_size = torch.distributed.get_world_size() + else: + # Not distribution, default rank=0, world_size=1 + global_rank, world_size = 0, 1 + + if device.type == "cuda": + gpu_name = torch.cuda.get_device_name(device.index) + logger.info( + f"[rank {global_rank}/{world_size}] " + f"Running on GPU {device.index} ({gpu_name})" + ) + else: + logger.info(f"[rank {global_rank}/{world_size}] Running on CPU") + + logger.info("Start inference") data = full_data[1][0] if data.dtype.byteorder not in ("=", "|"): data = data.byteswap().newbyteorder() @@ -79,6 +102,7 @@ class TimerXLStrategy(InferenceStrategy): # TODO: unify model inference input output = self.model.generate(seqs, max_new_tokens=predict_length, revin=True) df = pd.DataFrame(output[0]) + logger.info("Complete inference") return convert_to_binary(df) @@ -139,7 +163,7 @@ class InferenceManager: # DEFAULT_DEVICE = "cpu" DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") DEFAULT_POOL_SIZE = ( - 0 # TODO: Remove these parameter by sampling model inference consumption + 1 # TODO: Remove these parameter by sampling model inference consumption ) WAITING_INTERVAL_IN_MS = ( AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms() diff --git a/iotdb-core/ainode/ainode/core/script.py b/iotdb-core/ainode/ainode/core/script.py index 8cccfec5d17..a01ce691dc7 100644 --- a/iotdb-core/ainode/ainode/core/script.py +++ b/iotdb-core/ainode/ainode/core/script.py @@ -21,7 +21,7 @@ import sys import torch.multiprocessing as mp -from ainode.core.ainode import AINode +from ainode.core.ai_node import AINode from ainode.core.config import AINodeDescriptor from ainode.core.constant import TSStatusCode from ainode.core.exception import MissingConfigError diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties index b31079ce00d..6c8c28e3f26 100644 --- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties +++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties @@ -19,7 +19,7 @@ # Used for indicate cluster name and distinguish different cluster. # Datatype: string -cluster_name=defaultCluster +cluster_name=???? # ConfigNode address registered at AINode startup. # Allow modifications only before starting the service for the first time.
