CRZbulabula commented on code in PR #16131:
URL: https://github.com/apache/iotdb/pull/16131#discussion_r2264257828


##########
iotdb-core/ainode/ainode/core/inference/request_manager.py:
##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading
+import time
+from ainode.core.config import AINodeDescriptor
+from ainode.core.inference.inference_request import (
+    InferenceRequest,
+    InferenceRequestProxy,
+)
+from ainode.core.log import Logger
+import torch.multiprocessing as mp
+from ainode.core.inference.pool_scheduler import PoolScheduler
+from ainode.core.inference.pool_manager import PoolManager
+
+logger = Logger()
+
+
+class RequestManager:
+    """
+    Manages the lifecycle of inference requests and their associated resources.
+    """
+
+    WAITING_INTERVAL_IN_MS = (
+        
AINodeDescriptor().get_config().get_ain_inference_batch_interval_in_ms()
+    )  # How often to check for requests in the result queue
+
+    def __init__(self):
+        self._result_queue = mp.Queue()
+        self._result_wrapper_map = {}
+        self._result_wrapper_lock = threading.RLock()
+
+        self._stop_event = mp.Event()
+        self._result_handler_thread = threading.Thread(
+            target=self._handle_results, daemon=True
+        )
+        self._result_handler_thread.start()
+        self._pool_manager = PoolManager()
+        self._pool_scheduler = PoolScheduler(self._pool_manager, 
self._result_queue)
+
+    def _handle_results(self):
+        while not self._stop_event.is_set():
+            if self._result_queue.empty():
+                time.sleep(self.WAITING_INTERVAL_IN_MS / 1000)
+                continue
+            infer_req: InferenceRequest = self._result_queue.get()
+            self._pool_manager.remove_request(infer_req.model_id, 
infer_req.req_id)
+            with self._result_wrapper_lock:
+                self._result_wrapper_map[infer_req.req_id].set_result(
+                    infer_req.get_final_output()
+                )
+
+    def process_request(self, req):
+        infer_proxy = InferenceRequestProxy(req.req_id)
+        with self._result_wrapper_lock:
+            self._result_wrapper_map[req.req_id] = infer_proxy
+        # lazy initialization for first request
+        model_id = req.model_id
+        if model_id not in self._pool_manager.get_request_pool_map():
+            self._pool_scheduler._first_req_init(model_id)

Review Comment:
   Fetching the whole pool map just to check for existence is not a good 
implementation. I suggest adding a new interface to the `pool_manager` that 
checks whether the specified pools exist. Then, if the pools do not exist yet, 
every request enters the synchronized function and waits until the first pool 
is initialized; otherwise, every request can be dispatched immediately.



##########
iotdb-core/ainode/ainode/core/inference/request_manager.py:
##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading
+import time
+from ainode.core.config import AINodeDescriptor
+from ainode.core.inference.inference_request import (
+    InferenceRequest,
+    InferenceRequestProxy,
+)
+from ainode.core.log import Logger
+import torch.multiprocessing as mp
+from ainode.core.inference.pool_scheduler import PoolScheduler
+from ainode.core.inference.pool_manager import PoolManager
+
+logger = Logger()
+
+
+class RequestManager:

Review Comment:
   The term `manager` corresponds to the highest level of abstraction under the 
`RPCService`; please use a different name to distinguish between the different 
levels.



##########
iotdb-core/ainode/ainode/core/inference/pool_manager.py:
##########
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from collections import defaultdict
+import threading
+from ainode.core.inference.inference_request import InferenceRequest
+from ainode.core.log import Logger
+from ainode.core.exception import (
+    InferenceModelInternalError,
+)
+from typing import Dict, Optional
+import torch.multiprocessing as mp
+from ainode.core.inference.inference_request_pool import InferenceRequestPool, 
PoolState
+import torch
+from ainode.core.inference.inference_request_pool_group import PoolGroup
+
+logger = Logger()
+
+
+class PoolManager:
+    """
+    A manager for handling inference request pools.
+    It handles the registration of pools, adding and removing requests,
+    and managing the state of each pool.
+    """
+
+    DEFAULT_DEVICE = torch.device("cpu")
+    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
+
+    def __init__(self):
+        # structure: {model_id: {pool_id: set(req_id)}}
+        self.pool_to_reqs: Dict[str, Dict[int, set[str]]] = defaultdict(
+            lambda: defaultdict(set)
+        )
+        # structure: {req_id: pool_id}
+        self.req_to_pool: Dict[str, int] = {}
+        # structure: {model_id: {pool_id: PoolState}}
+        self.pool_states: Dict[str, Dict[int, PoolState]] = defaultdict(dict)
+        # structure: {model_id: {pool_id: threading.RLock}}
+        self.pool_locks: Dict[str, Dict[int, threading.RLock]] = defaultdict(
+            lambda: defaultdict(threading.RLock)
+        )
+        # structure: {model_id: {pool_id: PoolGroup}}
+        self._request_pool_map: Dict[str, PoolGroup] = {}
+
+    def dispatch_request(self, model_id, req: InferenceRequest):
+        pool_idx = self._get_optimal_pool(model_id)
+        if pool_idx is None:
+            raise InferenceModelInternalError("No available pool for model")
+        self.add_request(pool_idx, req)
+        logger.debug(
+            
f"[Inference][Device-{self.DEFAULT_DEVICE}][Pool-{pool_idx}][ID-{req.req_id}] 
Request is queued for inference"
+        )
+
+    def _get_optimal_pool(self, model_id) -> int:
+        loads = []
+        for pool_idx in self.get_pool_ids(model_id):
+            load_count = self.get_load(model_id, pool_idx)
+            loads.append((pool_idx, load_count))
+        min_idx = min(loads, key=lambda x: x[1])[0]
+        return min_idx

Review Comment:
   At our current stage, I suggest we just employ intuitive scheduling 
algorithms, e.g., hash, random, or round robin, because probing the load of 
pools requires either transmission between processes or maintaining the 
`pool_to_reqs` and `req_to_pool` maps.



##########
iotdb-core/ainode/ainode/core/inference/pool_manager.py:
##########
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from collections import defaultdict
+import threading
+from ainode.core.inference.inference_request import InferenceRequest
+from ainode.core.log import Logger
+from ainode.core.exception import (
+    InferenceModelInternalError,
+)
+from typing import Dict, Optional
+import torch.multiprocessing as mp
+from ainode.core.inference.inference_request_pool import InferenceRequestPool, 
PoolState
+import torch
+from ainode.core.inference.inference_request_pool_group import PoolGroup
+
+logger = Logger()
+
+
+class PoolManager:
+    """
+    A manager for handling inference request pools.
+    It handles the registration of pools, adding and removing requests,
+    and managing the state of each pool.
+    """
+
+    DEFAULT_DEVICE = torch.device("cpu")
+    # DEFAULT_DEVICE = torch.device("cuda" if torch.cuda.is_available() else 
"cpu")
+
+    def __init__(self):
+        # structure: {model_id: {pool_id: set(req_id)}}
+        self.pool_to_reqs: Dict[str, Dict[int, set[str]]] = defaultdict(
+            lambda: defaultdict(set)
+        )
+        # structure: {req_id: pool_id}
+        self.req_to_pool: Dict[str, int] = {}
+        # structure: {model_id: {pool_id: PoolState}}
+        self.pool_states: Dict[str, Dict[int, PoolState]] = defaultdict(dict)
+        # structure: {model_id: {pool_id: threading.RLock}}
+        self.pool_locks: Dict[str, Dict[int, threading.RLock]] = defaultdict(
+            lambda: defaultdict(threading.RLock)
+        )

Review Comment:
   We should further distinguish between read locks and write locks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to