Copilot commented on code in PR #16311:
URL: https://github.com/apache/iotdb/pull/16311#discussion_r2336245615


##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
     A controller for handling inference request pools.
     """
 
-    DEFAULT_DEVICE = torch.device("cpu")
-    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
-
     def __init__(self, result_queue: mp.Queue):
-        # structure: {model_id: PoolGroup}
-        self._request_pool_map: Dict[str, PoolGroup] = {}
+        # structure: {model_id: {device_id: PoolGroup}}
+        self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+        self._new_pool_id = AtomicInt()
         self._result_queue = result_queue
         self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+        self._stop_event = threading.Event()
+
+        # for pool instances control
+        self._task_queue = queue.Queue()
+        self._pool_control_worker_thread = threading.Thread(
+            target=self._worker_loop, daemon=True
+        )
+        self._pool_control_worker_thread.start()
+        self._executor = BatchExecutor(
+            thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+        )
 
+    # =============== Pool Management ===============
     @synchronized(threading.Lock())
     def first_req_init(self, model_id: str):
-        if not self.has_request_pools(model_id):
-            actions = self._pool_scheduler.schedule(model_id)
+        """
+        Initialize the pools when the first request for the given model_id 
arrives.
+        """
+        if not self.has_request_pools(model_id, device.index):
+            # TODO: choose a device based on some strategy
+            device = self.DEFAULT_DEVICE

Review Comment:
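   The condition reads `device.index` before `device` is assigned: the 
assignment `device = self.DEFAULT_DEVICE` only happens inside the `if` body. 
Because `device` is a local variable that is still unbound at the check, this 
will raise an `UnboundLocalError` (a subclass of `NameError`).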
   ```suggestion
           device = self.DEFAULT_DEVICE
           if not self.has_request_pools(model_id, device.index):
               # TODO: choose a device based on some strategy
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
     A controller for handling inference request pools.
     """
 
-    DEFAULT_DEVICE = torch.device("cpu")
-    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
-
     def __init__(self, result_queue: mp.Queue):
-        # structure: {model_id: PoolGroup}
-        self._request_pool_map: Dict[str, PoolGroup] = {}
+        # structure: {model_id: {device_id: PoolGroup}}
+        self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+        self._new_pool_id = AtomicInt()
         self._result_queue = result_queue
         self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+        self._stop_event = threading.Event()
+
+        # for pool instances control
+        self._task_queue = queue.Queue()
+        self._pool_control_worker_thread = threading.Thread(
+            target=self._worker_loop, daemon=True
+        )
+        self._pool_control_worker_thread.start()
+        self._executor = BatchExecutor(
+            thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+        )
 
+    # =============== Pool Management ===============
     @synchronized(threading.Lock())
     def first_req_init(self, model_id: str):
-        if not self.has_request_pools(model_id):
-            actions = self._pool_scheduler.schedule(model_id)
+        """
+        Initialize the pools when the first request for the given model_id 
arrives.
+        """
+        if not self.has_request_pools(model_id, device.index):
+            # TODO: choose a device based on some strategy
+            device = self.DEFAULT_DEVICE
+            actions = self._pool_scheduler.schedule(model_id, device)

Review Comment:
   The `schedule` method is being called with 2 parameters, but based on the 
abstract class definition, it should only take `model_id` as a parameter.
   ```suggestion
               actions = self._pool_scheduler.schedule(model_id)
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -325,7 +325,6 @@ private TSExecuteStatementResp executeStatementInternal(
       ExecutionResult result;
       if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) {
         Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());

Review Comment:
   [nitpick] Removing the blank line between variable declaration and 
conditional check reduces code readability. Consider maintaining the blank line 
for better visual separation.
   ```suggestion
           Statement s = StatementGenerator.createStatement(statement, 
clientSession.getZoneId());
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_scheduler/basic_pool_scheduler.py:
##########
@@ -30,20 +30,82 @@
     ScaleActionType,
 )
 from iotdb.ainode.core.log import Logger
-from iotdb.ainode.core.manager.utils import estimate_pool_size
+from iotdb.ainode.core.manager.utils import (
+    INFERENCE_EXTRA_MEMORY_RATIO,
+    INFERENCE_MEMORY_USAGE_RATIO,
+    MODEL_MEM_USAGE_MAP,
+    estimate_pool_size,
+    evaluate_system_resources,
+)
+from iotdb.ainode.core.model.model_info import BUILT_IN_LTSM_MAP
+from iotdb.ainode.core.util.gpu_mapping import 
convert_device_id_to_torch_device
 
 logger = Logger()
 
 
-class BasicPoolScheduler(AbstractPoolScheduler):
+def _estimate_shared_pool_size_by_total_mem(
+    device: torch.device,
+    existing_model_ids: List[str],
+    new_model_id: Optional[str] = None,
+) -> Dict[str, int]:
     """
-    A basic scheduler to init the request pools.
+    Estimate pool counts for (existing_model_ids + new_model_id) by equally
+    splitting the device's TOTAL memory among models.
+
+    Returns:
+        mapping {model_id: pool_num}
     """
+    # Extract unique model IDs
+    all_models = existing_model_ids + (
+        [new_model_id] if new_model_id is not None else []
+    )
+
+    # Seize memory usage for each model
+    mem_usages: Dict[str, float] = {}
+    for model_id in all_models:
+        model_info = BUILT_IN_LTSM_MAP.get(model_id)
+        model_type = model_info.model_type
+        mem_usages[model_id] = (
+            MODEL_MEM_USAGE_MAP[model_type] * INFERENCE_EXTRA_MEMORY_RATIO
+        )
+
+    # Evaluate system resources and get TOTAL memory
+    system_res = evaluate_system_resources(device)
+    # TODO: Its better to consider free memory, but we need to track the 
memory usage of existing pools
+    total_mem = system_res.get("total_mem")
 
-    DEFAULT_DEVICE = torch.device("cpu")
-    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
+    usable_mem = total_mem * INFERENCE_MEMORY_USAGE_RATIO
+    if usable_mem <= 0:
+        logger.error(
+            f"[Inference][Device-{device}] No usable memory on device. 
total={total_mem / 1024 ** 2:.2f} MB, usable={usable_mem / 1024 ** 2:.2f} MB"
+        )
 
-    def __init__(self, request_pool_map: Dict[str, PoolGroup]):
+    # Each model gets an equal share of the TOTAL memory
+    num_models = len(all_models)
+    per_model_share = usable_mem / num_models  # TODO: Implement more 
strategies later
+
+    # Calculate pool allocation for each model
+    allocation: Dict[str, int] = {}
+    for model_id in all_models:
+        pool_num = int(per_model_share // mem_usages[model_id])
+        if pool_num <= 0:
+            logger.warning(
+                f"[Inference][Device-{device}] Not enough TOTAL memory to 
guarantee at least 1 pool for model {model_id}, not pool will be scheduled for 
this model. "

Review Comment:
   There's a typo: 'not pool will be scheduled' should be 'no pool will be 
scheduled'.
   ```suggestion
                   f"[Inference][Device-{device}] Not enough TOTAL memory to 
guarantee at least 1 pool for model {model_id}, no pool will be scheduled for 
this model. "
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py:
##########
@@ -242,10 +260,8 @@ def _run(
                     predict_length,
                 )
 
-            if model_id in self.ACCELERATE_MODEL_ID and "cuda" in str(
-                self.DEFAULT_DEVICE
-            ):
-                # TODO: Logic in this branch shall handle all LTSM inferences
+            if self._pool_controller.has_request_pools(model_id=model_id):

Review Comment:
   The method `has_request_pools` is called with a keyword argument 
`model_id=model_id`, but the method definition expects positional arguments. 
This will cause a TypeError.
   ```suggestion
               if self._pool_controller.has_request_pools(model_id):
   ```



##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
     A controller for handling inference request pools.
     """
 
-    DEFAULT_DEVICE = torch.device("cpu")
-    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
-
     def __init__(self, result_queue: mp.Queue):
-        # structure: {model_id: PoolGroup}
-        self._request_pool_map: Dict[str, PoolGroup] = {}
+        # structure: {model_id: {device_id: PoolGroup}}
+        self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+        self._new_pool_id = AtomicInt()
         self._result_queue = result_queue
         self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+        self._stop_event = threading.Event()
+
+        # for pool instances control
+        self._task_queue = queue.Queue()
+        self._pool_control_worker_thread = threading.Thread(
+            target=self._worker_loop, daemon=True
+        )
+        self._pool_control_worker_thread.start()
+        self._executor = BatchExecutor(
+            thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+        )
 
+    # =============== Pool Management ===============
     @synchronized(threading.Lock())
     def first_req_init(self, model_id: str):
-        if not self.has_request_pools(model_id):
-            actions = self._pool_scheduler.schedule(model_id)
+        """
+        Initialize the pools when the first request for the given model_id 
arrives.
+        """
+        if not self.has_request_pools(model_id, device.index):
+            # TODO: choose a device based on some strategy
+            device = self.DEFAULT_DEVICE
+            actions = self._pool_scheduler.schedule(model_id, device)
             for action in actions:
                 if action.action == ScaleActionType.SCALE_UP:
                     # initialize the first pool
-                    self._first_pool_init(action.model_id)
+                    self._first_pool_init(action.model_id, str(device))
                     # start a background thread to expand pools
                     expand_thread = threading.Thread(
-                        target=self._expand_pools,
-                        args=(action.model_id, 1, action.amount - 1),
+                        target=self._expand_pools_on_device,
+                        args=(action.model_id, str(device), action.amount - 1),
                         daemon=True,
                     )
                     expand_thread.start()
                 elif action.action == ScaleActionType.SCALE_DOWN:
                     # TODO: implement scale down logic
                     pass
 
-    def _first_pool_init(self, model_id: str):
+    def _first_pool_init(self, model_id: str, device_str: str):
+        """
+        Initialize the first pool for the given model_id.
+        Ensure the pool is ready before returning.
+        """
+        device = torch.device(device_str)
+        device_id = device.index

Review Comment:
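   The code uses `device.index` as the device id, but for CPU devices 
`torch.device('cpu').index` is `None` (the attribute exists; it is simply 
unset when no ordinal is given). This does not raise an error, but `device_id` 
will be `None` for CPU devices instead of a usable key.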
   ```suggestion
           device_id = device.index if device.type == 'cuda' else 'cpu'
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to