Copilot commented on code in PR #16311:
URL: https://github.com/apache/iotdb/pull/16311#discussion_r2336245615
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
A controller for handling inference request pools.
"""
- DEFAULT_DEVICE = torch.device("cpu")
- # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else
"cpu")
-
def __init__(self, result_queue: mp.Queue):
- # structure: {model_id: PoolGroup}
- self._request_pool_map: Dict[str, PoolGroup] = {}
+ # structure: {model_id: {device_id: PoolGroup}}
+ self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+ self._new_pool_id = AtomicInt()
self._result_queue = result_queue
self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+ self._stop_event = threading.Event()
+
+ # for pool instances control
+ self._task_queue = queue.Queue()
+ self._pool_control_worker_thread = threading.Thread(
+ target=self._worker_loop, daemon=True
+ )
+ self._pool_control_worker_thread.start()
+ self._executor = BatchExecutor(
+ thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+ )
+ # =============== Pool Management ===============
@synchronized(threading.Lock())
def first_req_init(self, model_id: str):
- if not self.has_request_pools(model_id):
- actions = self._pool_scheduler.schedule(model_id)
+ """
+ Initialize the pools when the first request for the given model_id
arrives.
+ """
+ if not self.has_request_pools(model_id, device.index):
+ # TODO: choose a device based on some strategy
+ device = self.DEFAULT_DEVICE
Review Comment:
The code checks `device.index` but then assigns `self.DEFAULT_DEVICE` to
`device`, which suggests `device` is undefined at the check. This will cause a
NameError.
```suggestion
device = self.DEFAULT_DEVICE
if not self.has_request_pools(model_id, device.index):
# TODO: choose a device based on some strategy
```
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
A controller for handling inference request pools.
"""
- DEFAULT_DEVICE = torch.device("cpu")
- # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else
"cpu")
-
def __init__(self, result_queue: mp.Queue):
- # structure: {model_id: PoolGroup}
- self._request_pool_map: Dict[str, PoolGroup] = {}
+ # structure: {model_id: {device_id: PoolGroup}}
+ self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+ self._new_pool_id = AtomicInt()
self._result_queue = result_queue
self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+ self._stop_event = threading.Event()
+
+ # for pool instances control
+ self._task_queue = queue.Queue()
+ self._pool_control_worker_thread = threading.Thread(
+ target=self._worker_loop, daemon=True
+ )
+ self._pool_control_worker_thread.start()
+ self._executor = BatchExecutor(
+ thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+ )
+ # =============== Pool Management ===============
@synchronized(threading.Lock())
def first_req_init(self, model_id: str):
- if not self.has_request_pools(model_id):
- actions = self._pool_scheduler.schedule(model_id)
+ """
+ Initialize the pools when the first request for the given model_id
arrives.
+ """
+ if not self.has_request_pools(model_id, device.index):
+ # TODO: choose a device based on some strategy
+ device = self.DEFAULT_DEVICE
+ actions = self._pool_scheduler.schedule(model_id, device)
Review Comment:
The `schedule` method is being called with 2 parameters, but based on the
abstract class definition, it should only take `model_id` as a parameter.
```suggestion
actions = self._pool_scheduler.schedule(model_id)
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java:
##########
@@ -325,7 +325,6 @@ private TSExecuteStatementResp executeStatementInternal(
ExecutionResult result;
if (clientSession.getSqlDialect() == IClientSession.SqlDialect.TREE) {
Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
Review Comment:
[nitpick] Removing the blank line between variable declaration and
conditional check reduces code readability. Consider maintaining the blank line
for better visual separation.
```suggestion
Statement s = StatementGenerator.createStatement(statement,
clientSession.getZoneId());
```
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_scheduler/basic_pool_scheduler.py:
##########
@@ -30,20 +30,82 @@
ScaleActionType,
)
from iotdb.ainode.core.log import Logger
-from iotdb.ainode.core.manager.utils import estimate_pool_size
+from iotdb.ainode.core.manager.utils import (
+ INFERENCE_EXTRA_MEMORY_RATIO,
+ INFERENCE_MEMORY_USAGE_RATIO,
+ MODEL_MEM_USAGE_MAP,
+ estimate_pool_size,
+ evaluate_system_resources,
+)
+from iotdb.ainode.core.model.model_info import BUILT_IN_LTSM_MAP
+from iotdb.ainode.core.util.gpu_mapping import
convert_device_id_to_torch_device
logger = Logger()
-class BasicPoolScheduler(AbstractPoolScheduler):
+def _estimate_shared_pool_size_by_total_mem(
+ device: torch.device,
+ existing_model_ids: List[str],
+ new_model_id: Optional[str] = None,
+) -> Dict[str, int]:
"""
- A basic scheduler to init the request pools.
+ Estimate pool counts for (existing_model_ids + new_model_id) by equally
+ splitting the device's TOTAL memory among models.
+
+ Returns:
+ mapping {model_id: pool_num}
"""
+ # Extract unique model IDs
+ all_models = existing_model_ids + (
+ [new_model_id] if new_model_id is not None else []
+ )
+
+ # Seize memory usage for each model
+ mem_usages: Dict[str, float] = {}
+ for model_id in all_models:
+ model_info = BUILT_IN_LTSM_MAP.get(model_id)
+ model_type = model_info.model_type
+ mem_usages[model_id] = (
+ MODEL_MEM_USAGE_MAP[model_type] * INFERENCE_EXTRA_MEMORY_RATIO
+ )
+
+ # Evaluate system resources and get TOTAL memory
+ system_res = evaluate_system_resources(device)
+ # TODO: Its better to consider free memory, but we need to track the
memory usage of existing pools
+ total_mem = system_res.get("total_mem")
- DEFAULT_DEVICE = torch.device("cpu")
- # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else
"cpu")
+ usable_mem = total_mem * INFERENCE_MEMORY_USAGE_RATIO
+ if usable_mem <= 0:
+ logger.error(
+ f"[Inference][Device-{device}] No usable memory on device.
total={total_mem / 1024 ** 2:.2f} MB, usable={usable_mem / 1024 ** 2:.2f} MB"
+ )
- def __init__(self, request_pool_map: Dict[str, PoolGroup]):
+ # Each model gets an equal share of the TOTAL memory
+ num_models = len(all_models)
+ per_model_share = usable_mem / num_models # TODO: Implement more
strategies later
+
+ # Calculate pool allocation for each model
+ allocation: Dict[str, int] = {}
+ for model_id in all_models:
+ pool_num = int(per_model_share // mem_usages[model_id])
+ if pool_num <= 0:
+ logger.warning(
+ f"[Inference][Device-{device}] Not enough TOTAL memory to
guarantee at least 1 pool for model {model_id}, not pool will be scheduled for
this model. "
Review Comment:
There's a typo: 'not pool will be scheduled' should be 'no pool will be
scheduled'.
```suggestion
f"[Inference][Device-{device}] Not enough TOTAL memory to
guarantee at least 1 pool for model {model_id}, no pool will be scheduled for
this model. "
```
##########
iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py:
##########
@@ -242,10 +260,8 @@ def _run(
predict_length,
)
- if model_id in self.ACCELERATE_MODEL_ID and "cuda" in str(
- self.DEFAULT_DEVICE
- ):
- # TODO: Logic in this branch shall handle all LTSM inferences
+ if self._pool_controller.has_request_pools(model_id=model_id):
Review Comment:
The method `has_request_pools` is called here with only `model_id`, but
elsewhere in this PR it is invoked as `has_request_pools(model_id,
device.index)`, which suggests it now also requires a `device_id` argument;
calling it with a single argument would raise a TypeError for the missing
parameter. (Note that passing `model_id` by keyword is itself valid Python
for an ordinary positional-or-keyword parameter.)
```suggestion
if self._pool_controller.has_request_pools(model_id):
```
##########
iotdb-core/ainode/iotdb/ainode/core/inference/pool_controller.py:
##########
@@ -45,35 +55,57 @@ class PoolController:
A controller for handling inference request pools.
"""
- DEFAULT_DEVICE = torch.device("cpu")
- # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else
"cpu")
-
def __init__(self, result_queue: mp.Queue):
- # structure: {model_id: PoolGroup}
- self._request_pool_map: Dict[str, PoolGroup] = {}
+ # structure: {model_id: {device_id: PoolGroup}}
+ self._request_pool_map: Dict[str, Dict[str, PoolGroup]] = {}
+ self._new_pool_id = AtomicInt()
self._result_queue = result_queue
self._pool_scheduler = BasicPoolScheduler(self._request_pool_map)
+ self._stop_event = threading.Event()
+
+ # for pool instances control
+ self._task_queue = queue.Queue()
+ self._pool_control_worker_thread = threading.Thread(
+ target=self._worker_loop, daemon=True
+ )
+ self._pool_control_worker_thread.start()
+ self._executor = BatchExecutor(
+ thread_name_prefix=ThreadName.INFERENCE_POOL_CONTROLLER.value
+ )
+ # =============== Pool Management ===============
@synchronized(threading.Lock())
def first_req_init(self, model_id: str):
- if not self.has_request_pools(model_id):
- actions = self._pool_scheduler.schedule(model_id)
+ """
+ Initialize the pools when the first request for the given model_id
arrives.
+ """
+ if not self.has_request_pools(model_id, device.index):
+ # TODO: choose a device based on some strategy
+ device = self.DEFAULT_DEVICE
+ actions = self._pool_scheduler.schedule(model_id, device)
for action in actions:
if action.action == ScaleActionType.SCALE_UP:
# initialize the first pool
- self._first_pool_init(action.model_id)
+ self._first_pool_init(action.model_id, str(device))
# start a background thread to expand pools
expand_thread = threading.Thread(
- target=self._expand_pools,
- args=(action.model_id, 1, action.amount - 1),
+ target=self._expand_pools_on_device,
+ args=(action.model_id, str(device), action.amount - 1),
daemon=True,
)
expand_thread.start()
elif action.action == ScaleActionType.SCALE_DOWN:
# TODO: implement scale down logic
pass
- def _first_pool_init(self, model_id: str):
+ def _first_pool_init(self, model_id: str, device_str: str):
+ """
+ Initialize the first pool for the given model_id.
+ Ensure the pool is ready before returning.
+ """
+ device = torch.device(device_str)
+ device_id = device.index
Review Comment:
The code tries to access `device.index`, but for CPU devices
`torch.device('cpu').index` is `None` (the attribute exists but carries no
device ordinal), so `device_id` would be `None` rather than a usable key for
the `{device_id: PoolGroup}` map.
```suggestion
device_id = device.index if device.type == 'cuda' else 'cpu'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]