This is an automated email from the ASF dual-hosted git repository.

ycycse pushed a commit to branch ycy/refactorAINode
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ycy/refactorAINode by this 
push:
     new 7cc44cf5e81 refactor ainode (has a bug that needs to be fixed)
7cc44cf5e81 is described below

commit 7cc44cf5e8112889b469837bb6d2e15d26be535f
Author: ycycse <[email protected]>
AuthorDate: Fri Sep 6 00:20:17 2024 +0800

    refactor ainode (has a bug that needs to be fixed)
---
 iotdb-core/ainode/iotdb/ainode/client.py           | 232 +++++----------
 iotdb-core/ainode/iotdb/ainode/config.py           | 244 +++++++---------
 iotdb-core/ainode/iotdb/ainode/exception.py        |  23 +-
 iotdb-core/ainode/iotdb/ainode/factory.py          | 272 ------------------
 iotdb-core/ainode/iotdb/ainode/handler.py          |  16 +-
 iotdb-core/ainode/iotdb/ainode/log.py              |  44 ++-
 iotdb-core/ainode/iotdb/ainode/manager/__init__.py |   2 +-
 .../{inference.py => manager/inference_manager.py} | 193 +++++++------
 .../ainode/iotdb/ainode/manager/model_manager.py   |  34 +--
 iotdb-core/ainode/iotdb/ainode/model/__init__.py   |   2 +-
 .../built_in_model_factory.py}                     | 313 +++++++++++++++++++--
 .../model/{model_fetcher.py => model_factory.py}   |  92 +++++-
 .../ainode/iotdb/ainode/model/model_storage.py     |  40 ++-
 iotdb-core/ainode/iotdb/ainode/parser.py           | 191 -------------
 iotdb-core/ainode/iotdb/ainode/script.py           | 181 ++++++++----
 iotdb-core/ainode/iotdb/ainode/service.py          | 102 +------
 iotdb-core/ainode/iotdb/ainode/util.py             |  79 ------
 .../ainode/{encryption.py => util/__init__.py}     |   0
 .../ainode/{encryption.py => util/decorator.py}    |   9 +
 iotdb-core/ainode/iotdb/ainode/{ => util}/serde.py |   0
 .../iotdb/ainode/{encryption.py => util/status.py} |  16 ++
 21 files changed, 908 insertions(+), 1177 deletions(-)

diff --git a/iotdb-core/ainode/iotdb/ainode/client.py 
b/iotdb-core/ainode/iotdb/ainode/client.py
index ecc5bb40f89..5168b833731 100644
--- a/iotdb-core/ainode/iotdb/ainode/client.py
+++ b/iotdb-core/ainode/iotdb/ainode/client.py
@@ -17,152 +17,70 @@
 #
 import time
 
-import pandas as pd
 from thrift.Thrift import TException
 from thrift.protocol import TCompactProtocol, TBinaryProtocol
 from thrift.transport import TSocket, TTransport
 
-from iotdb.ainode import serde
-from iotdb.ainode.config import descriptor
+from iotdb.ainode.config import AINodeDescriptor
 from iotdb.ainode.constant import TSStatusCode
-from iotdb.ainode.log import logger
-from iotdb.ainode.util import verify_success
+from iotdb.ainode.log import Logger
+from iotdb.ainode.util.status import verify_success
 from iotdb.thrift.common.ttypes import TEndPoint, TSStatus, TAINodeLocation, 
TAINodeConfiguration
 from iotdb.thrift.confignode import IConfigNodeRPCService
 from iotdb.thrift.confignode.ttypes import (TAINodeRemoveReq, TNodeVersionInfo,
                                             TAINodeRegisterReq, 
TAINodeRestartReq)
-from iotdb.thrift.datanode import IAINodeInternalRPCService
-from iotdb.thrift.datanode.ttypes import (TFetchMoreDataReq,
-                                          TFetchTimeseriesReq)
 
 
 class ClientManager(object):
     def __init__(self):
-        self.__data_node_endpoint = 
descriptor.get_config().get_mn_target_data_node()
-        self.__config_node_endpoint = 
descriptor.get_config().get_ain_target_config_node_list()
-
-    def borrow_data_node_client(self):
-        return DataNodeClient(host=self.__data_node_endpoint.ip,
-                              port=self.__data_node_endpoint.port)
+        self._config_node_endpoint = 
AINodeDescriptor().get_config().get_ain_target_config_node_list()
 
     def borrow_config_node_client(self):
-        return ConfigNodeClient(config_leader=self.__config_node_endpoint)
-
-
-class DataNodeClient(object):
-    DEFAULT_FETCH_SIZE = 10000
-    DEFAULT_TIMEOUT = 60000
-
-    def __init__(self, host, port):
-        self.__host = host
-        self.__port = port
-
-        transport = TTransport.TFramedTransport(
-            TSocket.TSocket(self.__host, self.__port)
-        )
-        if not transport.isOpen():
-            try:
-                transport.open()
-            except TTransport.TTransportException as e:
-                logger.error("TTransportException: {}".format(e))
-                raise e
-
-        if descriptor.get_config().get_ain_thrift_compression_enabled():
-            protocol = TCompactProtocol.TCompactProtocol(transport)
-        else:
-            protocol = TBinaryProtocol.TBinaryProtocol(transport)
-        self.__client = IAINodeInternalRPCService.Client(protocol)
-
-    def fetch_timeseries(self,
-                         query_body: str,
-                         fetch_size: int = DEFAULT_FETCH_SIZE,
-                         timeout: int = DEFAULT_TIMEOUT) -> pd.DataFrame:
-        req = TFetchTimeseriesReq(
-            queryBody=query_body,
-            fetchSize=fetch_size,
-            timeout=timeout
-        )
-        try:
-            resp = self.__client.fetchTimeseries(req)
-            verify_success(resp.status, "An error occurs when calling 
fetch_timeseries()")
-
-            if len(resp.tsDataset) == 0:
-                raise RuntimeError(f'No data fetched with sql: {query_body}')
-            data = serde.convert_to_df(resp.columnNameList,
-                                       resp.columnTypeList,
-                                       resp.columnNameIndexMap,
-                                       resp.tsDataset)
-            if data.empty:
-                raise RuntimeError(
-                    f'Fetched empty data with sql: {query_body}')
-        except Exception as e:
-            logger.warning(
-                f'Fail to fetch data with sql: {query_body}')
-            raise e
-        query_id = resp.queryId
-        column_name_list = resp.columnNameList
-        column_type_list = resp.columnTypeList
-        column_name_index_map = resp.columnNameIndexMap
-        has_more_data = resp.hasMoreData
-        while has_more_data:
-            req = TFetchMoreDataReq(queryId=query_id, fetchSize=fetch_size)
-            try:
-                resp = self.__client.fetchMoreData(req)
-                verify_success(resp.status, "An error occurs when calling 
fetch_more_data()")
-                data = data.append(serde.convert_to_df(column_name_list,
-                                                       column_type_list,
-                                                       column_name_index_map,
-                                                       resp.tsDataset))
-                has_more_data = resp.hasMoreData
-            except Exception as e:
-                logger.warning(
-                    f'Fail to fetch more data with query id: {query_id}')
-                raise e
-        return data
+        return ConfigNodeClient(config_leader=self._config_node_endpoint)
 
 
 class ConfigNodeClient(object):
     def __init__(self, config_leader: TEndPoint):
-        self.__config_leader = config_leader
-        self.__config_nodes = []
-        self.__cursor = 0
-        self.__transport = None
-        self.__client = None
+        self._config_leader = config_leader
+        self._config_nodes = []
+        self._cursor = 0
+        self._transport = None
+        self._client = None
 
-        self.__MSG_RECONNECTION_FAIL = "Fail to connect to any config node. 
Please check status of ConfigNodes"
-        self.__RETRY_NUM = 5
-        self.__RETRY_INTERVAL_MS = 1
+        self._MSG_RECONNECTION_FAIL = "Fail to connect to any config node. 
Please check status of ConfigNodes"
+        self._RETRY_NUM = 5
+        self._RETRY_INTERVAL_MS = 1
 
-        self.__try_to_connect()
+        self._try_to_connect()
 
-    def __try_to_connect(self) -> None:
-        if self.__config_leader is not None:
+    def _try_to_connect(self) -> None:
+        if self._config_leader is not None:
             try:
-                self.__connect(self.__config_leader)
+                self._connect(self._config_leader)
                 return
             except TException:
-                logger.warning("The current node {} may have been down, try 
next node", self.__config_leader)
-                self.__config_leader = None
+                Logger().warning("The current node {} may have been down, try 
next node", self._config_leader)
+                self._config_leader = None
 
-        if self.__transport is not None:
-            self.__transport.close()
+        if self._transport is not None:
+            self._transport.close()
 
         try_host_num = 0
-        while try_host_num < len(self.__config_nodes):
-            self.__cursor = (self.__cursor + 1) % len(self.__config_nodes)
+        while try_host_num < len(self._config_nodes):
+            self._cursor = (self._cursor + 1) % len(self._config_nodes)
 
-            try_endpoint = self.__config_nodes[self.__cursor]
+            try_endpoint = self._config_nodes[self._cursor]
             try:
-                self.__connect(try_endpoint)
+                self._connect(try_endpoint)
                 return
             except TException:
-                logger.warning("The current node {} may have been down, try 
next node", try_endpoint)
+                Logger().warning("The current node {} may have been down, try 
next node", try_endpoint)
 
             try_host_num = try_host_num + 1
 
-        raise TException(self.__MSG_RECONNECTION_FAIL)
+        raise TException(self._MSG_RECONNECTION_FAIL)
 
-    def __connect(self, target_config_node: TEndPoint) -> None:
+    def _connect(self, target_config_node: TEndPoint) -> None:
         transport = TTransport.TFramedTransport(
             TSocket.TSocket(target_config_node.ip, target_config_node.port)
         )
@@ -170,36 +88,36 @@ class ConfigNodeClient(object):
             try:
                 transport.open()
             except TTransport.TTransportException as e:
-                logger.error("TTransportException: {}".format(e))
+                Logger().error("TTransportException: {}".format(e))
                 raise e
 
-        if descriptor.get_config().get_ain_thrift_compression_enabled():
+        if 
AINodeDescriptor().get_config().get_ain_thrift_compression_enabled():
             protocol = TCompactProtocol.TCompactProtocol(transport)
         else:
             protocol = TBinaryProtocol.TBinaryProtocol(transport)
-        self.__client = IConfigNodeRPCService.Client(protocol)
+        self._client = IConfigNodeRPCService.Client(protocol)
 
-    def __wait_and_reconnect(self) -> None:
+    def _wait_and_reconnect(self) -> None:
         # wait to start the next try
-        time.sleep(self.__RETRY_INTERVAL_MS)
+        time.sleep(self._RETRY_INTERVAL_MS)
 
         try:
-            self.__try_to_connect()
+            self._try_to_connect()
         except TException:
             # can not connect to each config node
-            self.__sync_latest_config_node_list()
-            self.__try_to_connect()
+            self._sync_latest_config_node_list()
+            self._try_to_connect()
 
-    def __sync_latest_config_node_list(self) -> None:
+    def _sync_latest_config_node_list(self) -> None:
         # TODO
         pass
 
-    def __update_config_node_leader(self, status: TSStatus) -> bool:
+    def _update_config_node_leader(self, status: TSStatus) -> bool:
         if status.code == TSStatusCode.REDIRECTION_RECOMMEND.get_status_code():
             if status.redirectNode is not None:
-                self.__config_leader = status.redirectNode
+                self._config_leader = status.redirectNode
             else:
-                self.__config_leader = None
+                self._config_leader = None
             return True
         return False
 
@@ -211,20 +129,20 @@ class ConfigNodeClient(object):
             versionInfo=version_info
         )
 
-        for _ in range(0, self.__RETRY_NUM):
+        for _ in range(0, self._RETRY_NUM):
             try:
-                resp = self.__client.registerAINode(req)
-                if not self.__update_config_node_leader(resp.status):
+                resp = self._client.registerAINode(req)
+                if not self._update_config_node_leader(resp.status):
                     verify_success(resp.status, "An error occurs when calling 
node_register()")
-                    self.__config_nodes = resp.configNodeList
+                    self._config_nodes = resp.configNodeList
                     return resp.aiNodeId
             except TTransport.TException:
-                logger.warning("Failed to connect to ConfigNode {} from AINode 
when executing node_register()",
-                               self.__config_leader)
-                self.__config_leader = None
-            self.__wait_and_reconnect()
+                Logger().warning("Failed to connect to ConfigNode {} from 
AINode when executing node_register()",
+                                 self._config_leader)
+                self._config_leader = None
+            self._wait_and_reconnect()
 
-        raise TException(self.__MSG_RECONNECTION_FAIL)
+        raise TException(self._MSG_RECONNECTION_FAIL)
 
     def node_restart(self, cluster_name: str, configuration: 
TAINodeConfiguration,
                      version_info: TNodeVersionInfo) -> None:
@@ -234,52 +152,52 @@ class ConfigNodeClient(object):
             versionInfo=version_info
         )
 
-        for _ in range(0, self.__RETRY_NUM):
+        for _ in range(0, self._RETRY_NUM):
             try:
-                resp = self.__client.restartAINode(req)
-                if not self.__update_config_node_leader(resp.status):
+                resp = self._client.restartAINode(req)
+                if not self._update_config_node_leader(resp.status):
                     verify_success(resp.status, "An error occurs when calling 
node_restart()")
-                    self.__config_nodes = resp.configNodeList
+                    self._config_nodes = resp.configNodeList
                     return resp.status
             except TTransport.TException:
-                logger.warning("Failed to connect to ConfigNode {} from AINode 
when executing node_restart()",
-                               self.__config_leader)
-                self.__config_leader = None
-            self.__wait_and_reconnect()
+                Logger().warning("Failed to connect to ConfigNode {} from 
AINode when executing node_restart()",
+                                 self._config_leader)
+                self._config_leader = None
+            self._wait_and_reconnect()
 
-        raise TException(self.__MSG_RECONNECTION_FAIL)
+        raise TException(self._MSG_RECONNECTION_FAIL)
 
     def node_remove(self, location: TAINodeLocation):
         req = TAINodeRemoveReq(
             aiNodeLocation=location
         )
-        for _ in range(0, self.__RETRY_NUM):
+        for _ in range(0, self._RETRY_NUM):
             try:
-                status = self.__client.removeAINode(req)
-                if not self.__update_config_node_leader(status):
+                status = self._client.removeAINode(req)
+                if not self._update_config_node_leader(status):
                     verify_success(status, "An error occurs when calling 
node_restart()")
                     return status
             except TTransport.TException:
-                logger.warning("Failed to connect to ConfigNode {} from AINode 
when executing node_remove()",
-                               self.__config_leader)
-                self.__config_leader = None
-            self.__wait_and_reconnect()
-        raise TException(self.__MSG_RECONNECTION_FAIL)
+                Logger().warning("Failed to connect to ConfigNode {} from 
AINode when executing node_remove()",
+                                 self._config_leader)
+                self._config_leader = None
+            self._wait_and_reconnect()
+        raise TException(self._MSG_RECONNECTION_FAIL)
 
     def get_ainode_configuration(self, node_id: int) -> map:
-        for _ in range(0, self.__RETRY_NUM):
+        for _ in range(0, self._RETRY_NUM):
             try:
-                resp = self.__client.getAINodeConfiguration(node_id)
-                if not self.__update_config_node_leader(resp.status):
+                resp = self._client.getAINodeConfiguration(node_id)
+                if not self._update_config_node_leader(resp.status):
                     verify_success(resp.status, "An error occurs when calling 
get_ainode_configuration()")
                     return resp.aiNodeConfigurationMap
             except TTransport.TException:
-                logger.warning("Failed to connect to ConfigNode {} from AINode 
when executing "
-                               "get_ainode_configuration()",
-                               self.__config_leader)
-                self.__config_leader = None
-            self.__wait_and_reconnect()
-        raise TException(self.__MSG_RECONNECTION_FAIL)
+                Logger().warning("Failed to connect to ConfigNode {} from 
AINode when executing "
+                                 "get_ainode_configuration()",
+                                 self._config_leader)
+                self._config_leader = None
+            self._wait_and_reconnect()
+        raise TException(self._MSG_RECONNECTION_FAIL)
 
 
 client_manager = ClientManager()
diff --git a/iotdb-core/ainode/iotdb/ainode/config.py 
b/iotdb-core/ainode/iotdb/ainode/config.py
index 8caa5cc1f08..66f09a42835 100644
--- a/iotdb-core/ainode/iotdb/ainode/config.py
+++ b/iotdb-core/ainode/iotdb/ainode/config.py
@@ -25,252 +25,222 @@ from iotdb.ainode.constant import 
(AINODE_CONF_DIRECTORY_NAME,
                                    AINODE_CONF_GIT_FILE_NAME, 
AINODE_CONF_POM_FILE_NAME, AINODE_ROOT_DIR,
                                    AINODE_ROOT_CONF_DIRECTORY_NAME)
 from iotdb.ainode.exception import BadNodeUrlError
-from iotdb.ainode.log import logger, set_logger
-from iotdb.ainode.util import parse_endpoint_url
+from iotdb.ainode.log import Logger
+from iotdb.ainode.util.decorator import singleton
 from iotdb.thrift.common.ttypes import TEndPoint
 
 
 class AINodeConfig(object):
     def __init__(self):
         # Used for connection of DataNode/ConfigNode clients
-        self.__ain_inference_rpc_address: str = AINODE_INFERENCE_RPC_ADDRESS
-        self.__ain_inference_rpc_port: int = AINODE_INFERENCE_RPC_PORT
+        self._ain_inference_rpc_address: str = AINODE_INFERENCE_RPC_ADDRESS
+        self._ain_inference_rpc_port: int = AINODE_INFERENCE_RPC_PORT
 
         # log directory
-        self.__ain_logs_dir: str = AINODE_LOG_DIR
+        self._ain_logs_dir: str = AINODE_LOG_DIR
 
         # Directory to save models
-        self.__ain_models_dir = AINODE_MODELS_DIR
+        self._ain_models_dir = AINODE_MODELS_DIR
 
-        self.__ain_system_dir = AINODE_SYSTEM_DIR
+        self._ain_system_dir = AINODE_SYSTEM_DIR
 
         # Whether to enable compression for thrift
-        self.__ain_thrift_compression_enabled = 
AINODE_THRIFT_COMPRESSION_ENABLED
+        self._ain_thrift_compression_enabled = 
AINODE_THRIFT_COMPRESSION_ENABLED
 
         # Cache number of model storage to avoid repeated loading
-        self.__mn_model_storage_cache_size = 30
-
-        # Maximum number of training model tasks, otherwise the task is pending
-        self.__mn_task_pool_size = 1
-
-        # Maximum number of trials to be explored in a tuning task
-        self.__mn_tuning_trial_num = 20
-
-        # Concurrency of trials in a tuning task
-        self.__mn_tuning_trial_concurrency = 4
+        self._ain_model_storage_cache_size = 30
 
         # Target ConfigNode to be connected by AINode
-        self.__ain_target_config_node_list: TEndPoint = TEndPoint("127.0.0.1", 
10710)
-
-        # Target DataNode to be connected by AINode
-        self.__mn_target_data_node: TEndPoint = TEndPoint("127.0.0.1", 10780)
+        self._ain_target_config_node_list: TEndPoint = TEndPoint("127.0.0.1", 
10710)
 
         # use for node management
-        self.__ainode_id = 0
-        self.__cluster_name = AINODE_CLUSTER_NAME
+        self._ainode_id = 0
+        self._cluster_name = AINODE_CLUSTER_NAME
 
-        self.__version_info = AINODE_VERSION_INFO
-        self.__build_info = AINODE_BUILD_INFO
+        self._version_info = AINODE_VERSION_INFO
+        self._build_info = AINODE_BUILD_INFO
 
     def get_cluster_name(self) -> str:
-        return self.__cluster_name
+        return self._cluster_name
 
     def set_cluster_name(self, cluster_name: str) -> None:
-        self.__cluster_name = cluster_name
+        self._cluster_name = cluster_name
 
     def get_version_info(self) -> str:
-        return self.__version_info
+        return self._version_info
 
     def get_ainode_id(self) -> int:
-        return self.__ainode_id
+        return self._ainode_id
 
-    def set_ainode_id(self, id: int) -> None:
-        self.__ainode_id = id
+    def set_ainode_id(self, ainode_id: int) -> None:
+        self._ainode_id = ainode_id
 
     def get_build_info(self) -> str:
-        return self.__build_info
+        return self._build_info
 
     def set_build_info(self, build_info: str) -> None:
-        self.__build_info = build_info
+        self._build_info = build_info
 
     def set_version_info(self, version_info: str) -> None:
-        self.__version_info = version_info
+        self._version_info = version_info
 
     def get_ain_inference_rpc_address(self) -> str:
-        return self.__ain_inference_rpc_address
+        return self._ain_inference_rpc_address
 
     def set_ain_inference_rpc_address(self, ain_inference_rpc_address: str) -> 
None:
-        self.__ain_inference_rpc_address = ain_inference_rpc_address
+        self._ain_inference_rpc_address = ain_inference_rpc_address
 
     def get_ain_inference_rpc_port(self) -> int:
-        return self.__ain_inference_rpc_port
+        return self._ain_inference_rpc_port
 
     def set_ain_inference_rpc_port(self, ain_inference_rpc_port: int) -> None:
-        self.__ain_inference_rpc_port = ain_inference_rpc_port
+        self._ain_inference_rpc_port = ain_inference_rpc_port
 
     def get_ain_logs_dir(self) -> str:
-        return self.__ain_logs_dir
+        return self._ain_logs_dir
 
     def set_ain_logs_dir(self, ain_logs_dir: str) -> None:
-        self.__ain_logs_dir = ain_logs_dir
+        self._ain_logs_dir = ain_logs_dir
 
     def get_ain_models_dir(self) -> str:
-        return self.__ain_models_dir
+        return self._ain_models_dir
 
     def set_ain_models_dir(self, ain_models_dir: str) -> None:
-        self.__ain_models_dir = ain_models_dir
+        self._ain_models_dir = ain_models_dir
 
     def get_ain_system_dir(self) -> str:
-        return self.__ain_system_dir
+        return self._ain_system_dir
 
     def set_ain_system_dir(self, ain_system_dir: str) -> None:
-        self.__ain_system_dir = ain_system_dir
+        self._ain_system_dir = ain_system_dir
 
     def get_ain_thrift_compression_enabled(self) -> bool:
-        return self.__ain_thrift_compression_enabled
+        return self._ain_thrift_compression_enabled
 
     def set_ain_thrift_compression_enabled(self, 
ain_thrift_compression_enabled: int) -> None:
-        self.__ain_thrift_compression_enabled = ain_thrift_compression_enabled
-
-    def get_mn_model_storage_cache_size(self) -> int:
-        return self.__mn_model_storage_cache_size
-
-    def set_mn_model_storage_cache_size(self, mn_model_storage_cache_size: 
int) -> None:
-        self.__mn_model_storage_cache_size = mn_model_storage_cache_size
+        self._ain_thrift_compression_enabled = ain_thrift_compression_enabled
 
-    def get_mn_mn_task_pool_size(self) -> int:
-        return self.__mn_task_pool_size
-
-    def set_mn_task_pool_size(self, mn_task_pool_size: int) -> None:
-        self.__mn_task_pool_size = mn_task_pool_size
-
-    def get_mn_tuning_trial_num(self) -> int:
-        return self.__mn_tuning_trial_num
-
-    def set_mn_tuning_trial_num(self, mn_tuning_trial_num: int) -> None:
-        self.__mn_tuning_trial_num = mn_tuning_trial_num
-
-    def get_mn_tuning_trial_concurrency(self) -> int:
-        return self.__mn_tuning_trial_concurrency
-
-    def set_mn_tuning_trial_concurrency(self, mn_tuning_trial_concurrency: 
int) -> None:
-        self.__mn_tuning_trial_concurrency = mn_tuning_trial_concurrency
+    def get_ain_model_storage_cache_size(self) -> int:
+        return self._ain_model_storage_cache_size
 
     def get_ain_target_config_node_list(self) -> TEndPoint:
-        return self.__ain_target_config_node_list
+        return self._ain_target_config_node_list
 
     def set_ain_target_config_node_list(self, ain_target_config_node_list: 
str) -> None:
-        self.__ain_target_config_node_list = 
parse_endpoint_url(ain_target_config_node_list)
-
-    def get_mn_target_data_node(self) -> TEndPoint:
-        return self.__mn_target_data_node
-
-    def set_mn_target_data_node(self, mn_target_data_node: str) -> None:
-        self.__mn_target_data_node = parse_endpoint_url(mn_target_data_node)
+        self._ain_target_config_node_list = 
parse_endpoint_url(ain_target_config_node_list)
 
 
+@singleton
 class AINodeDescriptor(object):
-    _instance = None
-    _first_init = False
-
-    def __new__(cls, *args, **kwargs):
-        if not cls._instance:
-            cls._instance = super().__new__(cls)
-        return cls._instance
 
     def __init__(self):
-        if not self._first_init:
-            self.__config = AINodeConfig()
-            self.load_config_from_file()
-            self._first_init = True
-
-    def load_properties(self, filepath, sep='=', comment_char='#'):
-        """
-        Read the file passed as parameter as a properties file.
-        """
-        props = {}
-        with open(filepath, "rt") as f:
-            for line in f:
-                l = line.strip()
-                if l and not l.startswith(comment_char):
-                    key_value = l.split(sep)
-                    key = key_value[0].strip()
-                    value = sep.join(key_value[1:]).strip().strip('"')
-                    props[key] = value
-        return props
-
-    def load_config_from_file(self) -> None:
-        system_properties_file = 
os.path.join(self.__config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME)
+        self._config = AINodeConfig()
+        self._load_config_from_file()
+        Logger().info("AINodeDescriptor is init successfully.")
+
+    def _load_config_from_file(self) -> None:
+        system_properties_file = 
os.path.join(self._config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME)
         if os.path.exists(system_properties_file):
-            system_configs = self.load_properties(system_properties_file)
+            system_configs = load_properties(system_properties_file)
             if 'ainode_id' in system_configs:
-                self.__config.set_ainode_id(int(system_configs['ainode_id']))
+                self._config.set_ainode_id(int(system_configs['ainode_id']))
 
         git_file = os.path.join(AINODE_ROOT_DIR, 
AINODE_ROOT_CONF_DIRECTORY_NAME, AINODE_CONF_GIT_FILE_NAME)
         if os.path.exists(git_file):
-            git_configs = self.load_properties(git_file)
+            git_configs = load_properties(git_file)
             if 'git.commit.id.abbrev' in git_configs:
                 build_info = git_configs['git.commit.id.abbrev']
                 if 'git.dirty' in git_configs:
                     if git_configs['git.dirty'] == "true":
                         build_info += "-dev"
-                self.__config.set_build_info(build_info)
+                self._config.set_build_info(build_info)
 
         pom_file = os.path.join(AINODE_ROOT_DIR, 
AINODE_ROOT_CONF_DIRECTORY_NAME, AINODE_CONF_POM_FILE_NAME)
         if os.path.exists(pom_file):
-            pom_configs = self.load_properties(pom_file)
+            pom_configs = load_properties(pom_file)
             if 'version' in pom_configs:
-                self.__config.set_version_info(pom_configs['version'])
+                self._config.set_version_info(pom_configs['version'])
 
         conf_file = os.path.join(AINODE_CONF_DIRECTORY_NAME, 
AINODE_CONF_FILE_NAME)
         if not os.path.exists(conf_file):
-            logger.info("Cannot find AINode config file '{}', use default 
configuration.".format(conf_file))
+            Logger().info("Cannot find AINode config file '{}', use default 
configuration.".format(conf_file))
             return
 
-        logger.info("Start to read AINode config file '{}'".format(conf_file))
-
         # noinspection PyBroadException
         try:
-            file_configs = self.load_properties(conf_file)
+            file_configs = load_properties(conf_file)
 
             config_keys = file_configs.keys()
 
             if 'ain_inference_rpc_address' in config_keys:
-                
self.__config.set_ain_inference_rpc_address(file_configs['ain_inference_rpc_address'])
+                
self._config.set_ain_inference_rpc_address(file_configs['ain_inference_rpc_address'])
 
             if 'ain_inference_rpc_port' in config_keys:
-                
self.__config.set_ain_inference_rpc_port(int(file_configs['ain_inference_rpc_port']))
-
-            if 'ain_logs_dir' in config_keys:
-                self.__config.set_ain_logs_dir(file_configs['ain_logs_dir'])
-
-            set_logger(self.__config.get_ain_logs_dir())
+                
self._config.set_ain_inference_rpc_port(int(file_configs['ain_inference_rpc_port']))
 
             if 'ain_models_dir' in config_keys:
-                
self.__config.set_ain_models_dir(file_configs['ain_models_dir'])
+                self._config.set_ain_models_dir(file_configs['ain_models_dir'])
 
             if 'ain_system_dir' in config_keys:
-                
self.__config.set_ain_system_dir(file_configs['ain_system_dir'])
+                self._config.set_ain_system_dir(file_configs['ain_system_dir'])
 
             if 'ain_seed_config_node' in config_keys:
-                
self.__config.set_ain_target_config_node_list(file_configs['ain_seed_config_node'])
+                
self._config.set_ain_target_config_node_list(file_configs['ain_seed_config_node'])
 
             if 'cluster_name' in config_keys:
-                self.__config.set_cluster_name(file_configs['cluster_name'])
+                self._config.set_cluster_name(file_configs['cluster_name'])
 
-            # AINODE_THRIFT_COMPRESSION_ENABLED
             if 'ain_thrift_compression_enabled' in config_keys:
-                
self.__config.set_ain_thrift_compression_enabled(int(file_configs['ain_thrift_compression_enabled']))
+                
self._config.set_ain_thrift_compression_enabled(int(file_configs['ain_thrift_compression_enabled']))
+
+            if 'ain_logs_dir' in config_keys:
+                log_dir = file_configs['ain_logs_dir']
+                self._config.set_ain_logs_dir(log_dir)
+                Logger(log_dir=log_dir).info(f"Successfully load config from 
{conf_file}.")
 
         except BadNodeUrlError:
-            logger.warning("Cannot load AINode conf file, use default 
configuration.")
+            Logger().warning("Cannot load AINode conf file, use default 
configuration.")
 
         except Exception as e:
-            logger.warning("Cannot load AINode conf file caused by: {}, use 
default configuration. ".format(e))
+            Logger().warning("Cannot load AINode conf file caused by: {}, use 
default configuration. ".format(e))
 
     def get_config(self) -> AINodeConfig:
-        return self.__config
-
-
-# initialize a singleton
-descriptor = AINodeDescriptor()
+        return self._config
+
+
+def load_properties(filepath, sep='=', comment_char='#'):
+    """
+    Read the file passed as parameter as a properties file.
+    """
+    props = {}
+    with open(filepath, "rt") as f:
+        for line in f:
+            l = line.strip()
+            if l and not l.startswith(comment_char):
+                key_value = l.split(sep)
+                key = key_value[0].strip()
+                value = sep.join(key_value[1:]).strip().strip('"')
+                props[key] = value
+    return props
+
+
+def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
+    """ Parse TEndPoint from a given endpoint url.
+    Args:
+        endpoint_url: an endpoint url, format: ip:port
+    Returns:
+        TEndPoint
+    Raises:
+        BadNodeUrlError
+    """
+    split = endpoint_url.split(":")
+    if len(split) != 2:
+        raise BadNodeUrlError(endpoint_url)
+
+    ip = split[0]
+    try:
+        port = int(split[1])
+        result = TEndPoint(ip, port)
+        return result
+    except ValueError:
+        raise BadNodeUrlError(endpoint_url)
diff --git a/iotdb-core/ainode/iotdb/ainode/exception.py 
b/iotdb-core/ainode/iotdb/ainode/exception.py
index 133c9741a6c..56186ee2bef 100644
--- a/iotdb-core/ainode/iotdb/ainode/exception.py
+++ b/iotdb-core/ainode/iotdb/ainode/exception.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import re
 
 from iotdb.ainode.constant import DEFAULT_MODEL_FILE_NAME, 
DEFAULT_CONFIG_FILE_NAME
 
@@ -69,7 +70,7 @@ class UnsupportedError(_BaseError):
         self.message = "{0} is not supported in current version".format(msg)
 
 
-class InvaildUriError(_BaseError):
+class InvalidUriError(_BaseError):
     def __init__(self, uri: str):
         self.message = "Invalid uri: {}, there are no {} or {} under this 
uri.".format(uri, DEFAULT_MODEL_FILE_NAME,
                                                                                
        DEFAULT_CONFIG_FILE_NAME)
@@ -78,11 +79,10 @@ class InvaildUriError(_BaseError):
 class InvalidWindowArgumentError(_BaseError):
     def __init__(
             self,
-            window_interval: int,
-            window_step: int,
-            dataset_length: int):
-        self.message = "Invalid inference input: window_interval {0}, 
window_step {1}, dataset_length {2}".format(
-            window_interval, window_step, dataset_length)
+            window_interval,
+            window_step,
+            dataset_length):
+        self.message = f"Invalid inference input: window_interval 
{window_interval}, window_step {window_step}, dataset_length {dataset_length}"
 
 
 class InferenceModelInternalError(_BaseError):
@@ -121,3 +121,14 @@ class ListRangeException(_BaseError):
 class AttributeNotSupportError(_BaseError):
     def __init__(self, model_name: str, attribute_name: str):
         self.message = "Attribute {0} is not supported in model 
{1}".format(attribute_name, model_name)
+
+
+# This is used to extract the key message in RuntimeError instead of the 
traceback message
+def runtime_error_extractor(error_message):
+    pattern = re.compile(r"RuntimeError: (.+)")
+    match = pattern.search(error_message)
+
+    if match:
+        return match.group(1)
+    else:
+        return ""
diff --git a/iotdb-core/ainode/iotdb/ainode/factory.py 
b/iotdb-core/ainode/iotdb/ainode/factory.py
deleted file mode 100644
index 158eeb62467..00000000000
--- a/iotdb-core/ainode/iotdb/ainode/factory.py
+++ /dev/null
@@ -1,272 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-from abc import abstractmethod
-
-import numpy as np
-from sklearn.preprocessing import MinMaxScaler
-from sktime.annotation.hmm_learn import GaussianHMM, GMMHMM
-from sktime.annotation.stray import STRAY
-from sktime.forecasting.arima import ARIMA
-from sktime.forecasting.exp_smoothing import ExponentialSmoothing
-from sktime.forecasting.naive import NaiveForecaster
-from sktime.forecasting.trend import STLForecaster
-
-from iotdb.ainode.attribute import get_model_attributes, parse_attribute
-from iotdb.ainode.constant import BuiltInModelType
-from iotdb.ainode.exception import AttributeNotSupportError, 
BuiltInModelNotSupportError, InferenceModelInternalError
-
-
-class BuiltInModel(object):
-    def __init__(self, attributes):
-        self._attributes = attributes
-        self._model = None
-
-    @abstractmethod
-    def inference(self, data):
-        raise NotImplementedError
-
-
-class ArimaModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(ArimaModel, self).__init__(attributes)
-        self._model = ARIMA(
-            order=attributes['order'],
-            seasonal_order=attributes['seasonal_order'],
-            method=attributes['method'],
-            suppress_warnings=attributes['suppress_warnings'],
-            maxiter=attributes['maxiter'],
-            out_of_sample_size=attributes['out_of_sample_size'],
-            scoring=attributes['scoring'],
-            with_intercept=attributes['with_intercept'],
-            time_varying_regression=attributes['time_varying_regression'],
-            enforce_stationarity=attributes['enforce_stationarity'],
-            enforce_invertibility=attributes['enforce_invertibility'],
-            simple_differencing=attributes['simple_differencing'],
-            measurement_error=attributes['measurement_error'],
-            mle_regression=attributes['mle_regression'],
-            hamilton_representation=attributes['hamilton_representation'],
-            concentrate_scale=attributes['concentrate_scale']
-        )
-
-    def inference(self, data):
-        try:
-            predict_length = self._attributes['predict_length']
-            self._model.fit(data)
-            output = self._model.predict(fh=range(predict_length))
-            output = np.array(output, dtype=np.float64)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class ExponentialSmoothingModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(ExponentialSmoothingModel, self).__init__(attributes)
-        self._model = ExponentialSmoothing(
-            damped_trend=attributes['damped_trend'],
-            initialization_method=attributes['initialization_method'],
-            optimized=attributes['optimized'],
-            remove_bias=attributes['remove_bias'],
-            use_brute=attributes['use_brute']
-        )
-
-    def inference(self, data):
-        try:
-            predict_length = self._attributes['predict_length']
-            self._model.fit(data)
-            output = self._model.predict(fh=range(predict_length))
-            output = np.array(output, dtype=np.float64)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class NaiveForecasterModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(NaiveForecasterModel, self).__init__(attributes)
-        self._model = NaiveForecaster(
-            strategy=attributes['strategy'],
-            sp=attributes['sp']
-        )
-
-    def inference(self, data):
-        try:
-            predict_length = self._attributes['predict_length']
-            self._model.fit(data)
-            output = self._model.predict(fh=range(predict_length))
-            output = np.array(output, dtype=np.float64)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class STLForecasterModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(STLForecasterModel, self).__init__(attributes)
-        self._model = STLForecaster(
-            sp=attributes['sp'],
-            seasonal=attributes['seasonal'],
-            seasonal_deg=attributes['seasonal_deg'],
-            trend_deg=attributes['trend_deg'],
-            low_pass_deg=attributes['low_pass_deg'],
-            seasonal_jump=attributes['seasonal_jump'],
-            trend_jump=attributes['trend_jump'],
-            low_pass_jump=attributes['low_pass_jump']
-        )
-
-    def inference(self, data):
-        try:
-            predict_length = self._attributes['predict_length']
-            self._model.fit(data)
-            output = self._model.predict(fh=range(predict_length))
-            output = np.array(output, dtype=np.float64)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class GMMHMMModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(GMMHMMModel, self).__init__(attributes)
-        self._model = GMMHMM(
-            n_components=attributes['n_components'],
-            n_mix=attributes['n_mix'],
-            min_covar=attributes['min_covar'],
-            startprob_prior=attributes['startprob_prior'],
-            transmat_prior=attributes['transmat_prior'],
-            means_prior=attributes['means_prior'],
-            means_weight=attributes['means_weight'],
-            weights_prior=attributes['weights_prior'],
-            algorithm=attributes['algorithm'],
-            covariance_type=attributes['covariance_type'],
-            n_iter=attributes['n_iter'],
-            tol=attributes['tol'],
-            params=attributes['params'],
-            init_params=attributes['init_params'],
-            implementation=attributes['implementation']
-        )
-
-    def inference(self, data):
-        try:
-            self._model.fit(data)
-            output = self._model.predict(data)
-            output = np.array(output, dtype=np.int32)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class GaussianHmmModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(GaussianHmmModel, self).__init__(attributes)
-        self._model = GaussianHMM(
-            n_components=attributes['n_components'],
-            covariance_type=attributes['covariance_type'],
-            min_covar=attributes['min_covar'],
-            startprob_prior=attributes['startprob_prior'],
-            transmat_prior=attributes['transmat_prior'],
-            means_prior=attributes['means_prior'],
-            means_weight=attributes['means_weight'],
-            covars_prior=attributes['covars_prior'],
-            covars_weight=attributes['covars_weight'],
-            algorithm=attributes['algorithm'],
-            n_iter=attributes['n_iter'],
-            tol=attributes['tol'],
-            params=attributes['params'],
-            init_params=attributes['init_params'],
-            implementation=attributes['implementation']
-        )
-
-    def inference(self, data):
-        try:
-            self._model.fit(data)
-            output = self._model.predict(data)
-            output = np.array(output, dtype=np.int32)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-class STRAYModel(BuiltInModel):
-    def __init__(self, attributes):
-        super(STRAYModel, self).__init__(attributes)
-        self._model = STRAY(
-            alpha=attributes['alpha'],
-            k=attributes['k'],
-            knn_algorithm=attributes['knn_algorithm'],
-            p=attributes['p'],
-            size_threshold=attributes['size_threshold'],
-            outlier_tail=attributes['outlier_tail']
-        )
-
-    def inference(self, data):
-        try:
-            data = MinMaxScaler().fit_transform(data)
-            output = self._model.fit_transform(data)
-            # change the output to int
-            output = np.array(output, dtype=np.int32)
-            return output
-        except Exception as e:
-            raise InferenceModelInternalError(str(e))
-
-
-def create_built_in_model(model_id, inference_attributes):
-    """
-    Args:
-        model_id: the unique id of the model
-        inference_attributes: a list of attributes to be inferred, in this 
function, the attributes will include some
-            parameters of the built-in model. Some parameters are optional, 
and if the parameters are not
-            specified, the default value will be used.
-    Returns:
-        model: the built-in model from sktime
-        attributes: a dict of attributes, where the key is the attribute name, 
the value is the parsed value of the
-            attribute
-    Description:
-        the create_built_in_model function will create the built-in model from 
sktime, which does not require user
-        registration. This module will parse the inference attributes and 
create the built-in model.
-    """
-    attribute_map = get_model_attributes(model_id)
-
-    # validate the inference attributes
-    for attribute_name in inference_attributes:
-        if attribute_name not in attribute_map:
-            raise AttributeNotSupportError(model_id, attribute_name)
-
-    # parse the inference attributes, attributes is a Dict[str, Any]
-    attributes = parse_attribute(inference_attributes, attribute_map)
-
-    # build the built-in model
-    model = None
-    if model_id == BuiltInModelType.ARIMA.value:
-        model = ArimaModel(attributes)
-    elif model_id == BuiltInModelType.EXPONENTIAL_SMOOTHING.value:
-        model = ExponentialSmoothingModel(attributes)
-    elif model_id == BuiltInModelType.NAIVE_FORECASTER.value:
-        model = NaiveForecasterModel(attributes)
-    elif model_id == BuiltInModelType.STL_FORECASTER.value:
-        model = STLForecasterModel(attributes)
-    elif model_id == BuiltInModelType.GMM_HMM.value:
-        model = GMMHMMModel(attributes)
-    elif model_id == BuiltInModelType.GAUSSIAN_HMM.value:
-        model = GaussianHmmModel(attributes)
-    elif model_id == BuiltInModelType.STRAY.value:
-        model = STRAYModel(attributes)
-    else:
-        raise BuiltInModelNotSupportError(model_id)
-
-    return model, attributes
diff --git a/iotdb-core/ainode/iotdb/ainode/handler.py 
b/iotdb-core/ainode/iotdb/ainode/handler.py
index c2ff78ee3bd..db615836aa8 100644
--- a/iotdb-core/ainode/iotdb/ainode/handler.py
+++ b/iotdb-core/ainode/iotdb/ainode/handler.py
@@ -16,17 +16,12 @@
 # under the License.
 #
 
-from iotdb.ainode.constant import TSStatusCode
-from iotdb.ainode.inference import inference_with_registered_model, 
inference_with_built_in_model, inference
-from iotdb.ainode.log import logger
 from iotdb.ainode.manager.cluster_manager import ClusterManager
+from iotdb.ainode.manager.inference_manager import InferenceManager
 from iotdb.ainode.manager.model_manager import ModelManager
-from iotdb.ainode.parser import (parse_inference_request)
-from iotdb.ainode.serde import convert_to_binary
-from iotdb.ainode.util import get_status
 from iotdb.thrift.ainode import IAINodeRPCService
 from iotdb.thrift.ainode.ttypes import (TDeleteModelReq, TRegisterModelReq,
-                                        TAIHeartbeatReq, TInferenceReq, 
TInferenceResp)
+                                        TAIHeartbeatReq, TInferenceReq)
 
 
 class AINodeRPCServiceHandler(IAINodeRPCService.Iface):
@@ -34,14 +29,13 @@ class AINodeRPCServiceHandler(IAINodeRPCService.Iface):
         self._model_manager = ModelManager()
 
     def registerModel(self, req: TRegisterModelReq):
-        self._model_manager.register_model(req)
+        return self._model_manager.register_model(req)
 
     def deleteModel(self, req: TDeleteModelReq):
-        self._model_manager.delete_model(req)
+        return self._model_manager.delete_model(req)
 
     def inference(self, req: TInferenceReq):
-        return inference(req, self._model_manager)
-
+        return InferenceManager.inference(req, self._model_manager)
 
     def getAIHeartbeat(self, req: TAIHeartbeatReq):
         return ClusterManager.get_heart_beat(req)
diff --git a/iotdb-core/ainode/iotdb/ainode/log.py 
b/iotdb-core/ainode/iotdb/ainode/log.py
index 5745b6fd2ab..800196986e9 100644
--- a/iotdb-core/ainode/iotdb/ainode/log.py
+++ b/iotdb-core/ainode/iotdb/ainode/log.py
@@ -23,6 +23,7 @@ import random
 import sys
 
 from iotdb.ainode.constant import STD_LEVEL, AINODE_LOG_FILE_NAMES, 
AINODE_LOG_FILE_LEVELS
+from iotdb.ainode.util.decorator import singleton
 
 
 class LoggerFilter(logging.Filter):
@@ -53,16 +54,19 @@ class LoggerFilter(logging.Filter):
         return f"[{pid}:{process_name}] {stack_info}"
 
 
+@singleton
 class Logger:
-    """
+    """ Logger is a singleton, it will be initialized when AINodeDescriptor is 
inited for the first time.
+        You can just use Logger() to get it anywhere.
+
     Args:
         log_dir: log directory
 
-    logger_format: log format of global logger
+    logger_format: log format
     logger: global logger with custom format and level
     file_handlers: file handlers for different levels
     console_handler: console handler for stdout
-    __lock: lock for logger
+    _lock: process lock for logger. This is just a precaution; we currently do 
not use multiprocessing
     """
 
     def __init__(self, log_dir=None):
@@ -98,37 +102,31 @@ class Logger:
                 self.file_handlers[l].setLevel(file_levels[l])
                 self.file_handlers[l].setFormatter(self.logger_format)
 
-            for filehandler in self.file_handlers:
-                self.logger.addHandler(filehandler)
-
+            for file_handler in self.file_handlers:
+                self.logger.addHandler(file_handler)
+        else:
+            log_dir = "default path"
 
         self.logger.addFilter(LoggerFilter())
-        self.__lock = multiprocessing.Lock()
+        self._lock = multiprocessing.Lock()
+        self.info(f"Logger init successfully. Log will be written to 
{log_dir}")
 
     def debug(self, *args) -> None:
-        self.__lock.acquire()
+        self._lock.acquire()
         self.logger.debug(' '.join(map(str, args)))
-        self.__lock.release()
+        self._lock.release()
 
     def info(self, *args) -> None:
-        self.__lock.acquire()
+        self._lock.acquire()
         self.logger.info(' '.join(map(str, args)))
-        self.__lock.release()
+        self._lock.release()
 
     def warning(self, *args) -> None:
-        self.__lock.acquire()
+        self._lock.acquire()
         self.logger.warning(' '.join(map(str, args)))
-        self.__lock.release()
+        self._lock.release()
 
     def error(self, *args) -> None:
-        self.__lock.acquire()
+        self._lock.acquire()
         self.logger.error(' '.join(map(str, args)))
-        self.__lock.release()
-
-
-logger = Logger()
-
-
-def set_logger(ain_logs_dir):
-    global logger
-    logger = Logger(ain_logs_dir)
+        self._lock.release()
diff --git a/iotdb-core/ainode/iotdb/ainode/manager/__init__.py 
b/iotdb-core/ainode/iotdb/ainode/manager/__init__.py
index 4b8ee97fad2..2a1e720805f 100644
--- a/iotdb-core/ainode/iotdb/ainode/manager/__init__.py
+++ b/iotdb-core/ainode/iotdb/ainode/manager/__init__.py
@@ -14,4 +14,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
\ No newline at end of file
+#
diff --git a/iotdb-core/ainode/iotdb/ainode/inference.py 
b/iotdb-core/ainode/iotdb/ainode/manager/inference_manager.py
similarity index 62%
rename from iotdb-core/ainode/iotdb/ainode/inference.py
rename to iotdb-core/ainode/iotdb/ainode/manager/inference_manager.py
index a6c3f093943..74c8299f3aa 100644
--- a/iotdb-core/ainode/iotdb/ainode/inference.py
+++ b/iotdb-core/ainode/iotdb/ainode/manager/inference_manager.py
@@ -18,30 +18,83 @@
 import pandas as pd
 from torch import tensor
 
-from iotdb.ainode.constant import BuiltInModelType, TSStatusCode
-from iotdb.ainode.exception import InvalidWindowArgumentError, 
InferenceModelInternalError, \
-    BuiltInModelNotSupportError
-from iotdb.ainode.factory import create_built_in_model
-from iotdb.ainode.log import logger
+from iotdb.ainode.constant import TSStatusCode
+from iotdb.ainode.exception import InvalidWindowArgumentError, 
InferenceModelInternalError, runtime_error_extractor
+from iotdb.ainode.log import Logger
 from iotdb.ainode.manager.model_manager import ModelManager
-from iotdb.ainode.parser import runtime_error_extractor, 
parse_inference_request
-from iotdb.ainode.serde import convert_to_binary
-from iotdb.ainode.util import get_status
-from iotdb.thrift.ainode.ttypes import TInferenceResp, TInferenceReq
+from iotdb.ainode.util.serde import convert_to_binary, convert_to_df
+from iotdb.ainode.util.status import get_status
+from iotdb.thrift.ainode.ttypes import TInferenceReq, TInferenceResp
+
+
+class InferenceManager:
+    @staticmethod
+    def inference(req: TInferenceReq, model_manager: ModelManager):
+        Logger().info(f"start inference registered model {req.modelId}")
+        try:
+            model_id, full_data, window_interval, window_step, 
inference_attributes = _parse_inference_request(req)
+
+            if model_id.startswith('_'):
+                # built-in models
+                Logger().info(f"start inference built-in model {model_id}")
+                # parse the inference attributes and create the built-in model
+                model = _get_built_in_model(model_id, model_manager, 
inference_attributes)
+                inference_results = _inference_with_built_in_model(
+                    model, full_data)
+            else:
+                # user-registered models
+                model = _get_model(model_id, model_manager, 
inference_attributes)
+                inference_results = _inference_with_registered_model(
+                    model, full_data, window_interval, window_step)
+            for i in range(len(inference_results)):
+                inference_results[i] = convert_to_binary(inference_results[i])
+            return TInferenceResp(
+                get_status(
+                    TSStatusCode.SUCCESS_STATUS),
+                inference_results)
+        except Exception as e:
+            Logger().warning(e)
+            inference_results = []
+            return 
TInferenceResp(get_status(TSStatusCode.AINODE_INTERNAL_ERROR, str(e)), 
inference_results)
+
+
+def _process_data(full_data):
+    """
+    Args:
+        full_data: a tuple of (data, time_stamp, type_list, column_name_list), 
where the data is a DataFrame with shape
+            (L, C), time_stamp is a DataFrame with shape(L, 1), type_list is a 
list of data types with length C,
+            column_name_list is a list of column names with length C, where L 
is the number of data points, C is the
+            number of variables, the data and time_stamp are aligned by index
+    Returns:
+        data: a tensor with shape (1, L, C)
+        data_length: the number of data points
+    Description:
+        the process_data module will convert the input data into a tensor with 
shape (1, L, C), where L is the number of
+        data points, C is the number of variables, the data and time_stamp are 
aligned by index. The module will also
+        convert the data type of each column to the corresponding type.
+    """
+    data, time_stamp, type_list, _ = full_data
+    data_length = time_stamp.shape[0]
+    data = data.fillna(0)
+    for i in range(len(type_list)):
+        if type_list[i] == "TEXT":
+            data[data.columns[i]] = 0
+        elif type_list[i] == "BOOLEAN":
+            data[data.columns[i]] = data[data.columns[i]].astype("int")
+    data = tensor(data.values).unsqueeze(0)
+    return data, data_length
 
 
-def inference_with_registered_model(model, full_data, window_interval, 
window_step, inference_attributes):
+def _inference_with_registered_model(model, full_data, window_interval, 
window_step):
     """
     Args:
-        model_id: the unique id of the model
+        model: the user-defined model
         full_data: a tuple of (data, time_stamp, type_list, column_name_list), 
where the data is a DataFrame with shape
             (L, C), time_stamp is a DataFrame with shape(L, 1), type_list is a 
list of data types with length C,
             column_name_list is a list of column names with length C, where L 
is the number of data points, C is the
             number of variables, the data and time_stamp are aligned by index
         window_interval: the length of each sliding window
         window_step: the step between two adjacent sliding windows
-        inference_attributes: a list of attributes to be inferred. In this 
function, the attributes will include the
-            acceleration, which indicates whether the model is accelerated by 
the torch. Compile
     Returns:
         outputs: a list of output DataFrames, where each DataFrame has shape 
(H', C'), where H' is the output window
             interval, C' is the number of variables in the output DataFrame
@@ -54,7 +107,7 @@ def inference_with_registered_model(model, full_data, 
window_interval, window_st
         a list.
     """
 
-    dataset, dataset_length = process_data(full_data)
+    dataset, dataset_length = _process_data(full_data)
 
     # check the validity of window_interval and window_step, the two arguments 
must be positive integers, and the
     # window_interval should not be larger than the dataset length
@@ -89,100 +142,66 @@ def inference_with_registered_model(model, full_data, 
window_interval, window_st
 
     return outputs
 
-def inference_with_built_in_model(model, full_data, inference_attributes):
+
+def _inference_with_built_in_model(model, full_data):
     """
     Args:
-        model_id: the unique id of the model
+        model: the built-in model
         full_data: a tuple of (data, time_stamp, type_list, column_name_list), 
where the data is a DataFrame with shape
             (L, C), time_stamp is a DataFrame with shape(L, 1), type_list is a 
list of data types with length C,
             column_name_list is a list of column names with length C, where L 
is the number of data points, C is the
             number of variables, the data and time_stamp are aligned by index
-        inference_attributes: a list of attributes to be inferred, in this 
function, the attributes will include some
-            parameters of the built-in model. Some parameters are optional, 
and if the parameters are not
-            specified, the default value will be used.
     Returns:
         outputs: a list of output DataFrames, where each DataFrame has shape 
(H', C'), where H' is the output window
             interval, C' is the number of variables in the output DataFrame
     Description:
-        the inference_with_built_in_model function will inference with 
built-in model from sktime, which does not
+        the inference_with_built_in_model function performs inference with the 
built-in model, which does not
         require user registration. This module will parse the inference 
attributes and create the built-in model, then
         feed the input data into the model to get the output, the output is a 
DataFrame with shape (H', C'), where H'
         is the output window interval, C' is the number of variables in the 
output DataFrame. Then the inference module
         will concatenate all the output DataFrames into a list.
     """
-    # model_id = model_id.lower()
-    # if model_id not in BuiltInModelType.values():
-    #     raise BuiltInModelNotSupportError(model_id)
-
-    logger.info(f"start inference built-in model {model_id}")
-
-    # parse the inference attributes and create the built-in model
-    model, attributes = create_built_in_model(model_id, inference_attributes)
 
     data, _, _, _ = full_data
-
     output = model.inference(data)
-
     # output: DataFrame, shape: (H', C')
     output = pd.DataFrame(output)
     outputs = [output]
     return outputs
 
-def inference(req: TInferenceReq, model_manager: ModelManager):
-    logger.info(f"start inference registered model {req.modelId}")
-    try:
-        model_id, full_data, window_interval, window_step, 
inference_attributes = parse_inference_request(req)
-
-        if inference_attributes is None or 'acceleration' not in 
inference_attributes:
-            # if the acceleration is not specified, then the acceleration will 
be set to default value False
-            acceleration = False
-        else:
-            # if the acceleration is specified, then the acceleration will be 
set to the specified value
-            acceleration = (inference_attributes['acceleration'].lower() == 
'true')
-        model = model_manager.load_model(model_id, acceleration)
-        if model_id.startswith('_'):
-            # built-in models
-            inference_results = inference_with_built_in_model(
-                model, full_data, inference_attributes)
-        else:
-            # user-registered models
-            inference_results = inference_with_registered_model(
-                model, full_data, window_interval, window_step, 
inference_attributes)
-        for i in range(len(inference_results)):
-            inference_results[i] = convert_to_binary(inference_results[i])
-        return TInferenceResp(
-            get_status(
-                TSStatusCode.SUCCESS_STATUS),
-            inference_results)
-    except Exception as e:
-        logger.warning(e)
-        inference_results = []
-        return TInferenceResp(get_status(TSStatusCode.AINODE_INTERNAL_ERROR, 
str(e)), inference_results)
-
-
-def process_data(full_data):
-    """
-    Args:
-        full_data: a tuple of (data, time_stamp, type_list, column_name_list), 
where the data is a DataFrame with shape
-            (L, C), time_stamp is a DataFrame with shape(L, 1), type_list is a 
list of data types with length C,
-            column_name_list is a list of column names with length C, where L 
is the number of data points, C is the
-            number of variables, the data and time_stamp are aligned by index
-    Returns:
-        data: a tensor with shape (1, L, C)
-        data_length: the number of data points
-    Description:
-        the process_data module will convert the input data into a tensor with 
shape (1, L, C), where L is the number of
-        data points, C is the number of variables, the data and time_stamp are 
aligned by index. The module will also
-        convert the data type of each column to the corresponding type.
-    """
-    data, time_stamp, type_list, _ = full_data
-    data_length = time_stamp.shape[0]
-    data = data.fillna(0)
-    for i in range(len(type_list)):
-        if type_list[i] == "TEXT":
-            data[data.columns[i]] = 0
-        elif type_list[i] == "BOOLEAN":
-            data[data.columns[i]] = data[data.columns[i]].astype("int")
-    data = tensor(data.values).unsqueeze(0)
-    return data, data_length
 
+def _get_model(model_id: str, model_manager: ModelManager, 
inference_attributes: {}):
+    if inference_attributes is None or 'acceleration' not in 
inference_attributes:
+        # if the acceleration is not specified, then the acceleration will be 
set to default value False
+        acceleration = False
+    else:
+        # if the acceleration is specified, then the acceleration will be set 
to the specified value
+        acceleration = (inference_attributes['acceleration'].lower() == 'true')
+    return model_manager.load_model(model_id, acceleration)
+
+
+def _get_built_in_model(model_id: str, model_manager: ModelManager, 
inference_attributes: {}):
+    return model_manager.load_built_in_model(model_id, inference_attributes)
+
+
+def _parse_inference_request(req: TInferenceReq):
+    binary_dataset = req.dataset
+    type_list = req.typeList
+    column_name_list = req.columnNameList
+    column_name_index = req.columnNameIndexMap
+    data = convert_to_df(column_name_list, type_list, column_name_index, 
[binary_dataset])
+    time_stamp, data = data[data.columns[0:1]], data[data.columns[1:]]
+    full_data = (data, time_stamp, type_list, column_name_list)
+    inference_attributes = req.inferenceAttributes
+    if inference_attributes is None:
+        inference_attributes = {}
+
+    window_params = req.windowParams
+    if window_params is None:
+        # set default window_step to infinity and window_interval to dataset 
length
+        window_step = float('inf')
+        window_interval = data.shape[0]
+    else:
+        window_step = window_params.windowStep
+        window_interval = window_params.windowInterval
+    return req.modelId, full_data, window_interval, window_step, 
inference_attributes
diff --git a/iotdb-core/ainode/iotdb/ainode/manager/model_manager.py 
b/iotdb-core/ainode/iotdb/ainode/manager/model_manager.py
index 86234307749..c7649d43dc0 100644
--- a/iotdb-core/ainode/iotdb/ainode/manager/model_manager.py
+++ b/iotdb-core/ainode/iotdb/ainode/manager/model_manager.py
@@ -18,10 +18,11 @@
 from yaml import YAMLError
 
 from iotdb.ainode.constant import TSStatusCode, BuiltInModelType
-from iotdb.ainode.exception import InvaildUriError, BadConfigValueError, 
BuiltInModelNotSupportError
-from iotdb.ainode.log import logger
+from iotdb.ainode.exception import InvalidUriError, BadConfigValueError, 
BuiltInModelNotSupportError
+from iotdb.ainode.log import Logger
+from iotdb.ainode.model.built_in_model_factory import fetch_built_in_model
 from iotdb.ainode.model.model_storage import ModelStorage
-from iotdb.ainode.util import get_status
+from iotdb.ainode.util.status import get_status
 from iotdb.thrift.ainode.ttypes import TRegisterModelReq, TRegisterModelResp, 
TDeleteModelReq
 
 
@@ -30,20 +31,20 @@ class ModelManager:
         self.model_storage = ModelStorage()
 
     def register_model(self, req: TRegisterModelReq):
-        logger.info(f"register model {req.modelId} from {req.uri}")
+        Logger().info(f"register model {req.modelId} from {req.uri}")
         try:
             configs, attributes = 
self.model_storage.register_model(req.modelId, req.uri)
             return TRegisterModelResp(get_status(TSStatusCode.SUCCESS_STATUS), 
configs, attributes)
-        except InvaildUriError as e:
-            logger.warning(e)
+        except InvalidUriError as e:
+            Logger().warning(e)
             self.model_storage.delete_model(req.modelId)
             return 
TRegisterModelResp(get_status(TSStatusCode.INVALID_URI_ERROR, e.message))
         except BadConfigValueError as e:
-            logger.warning(e)
+            Logger().warning(e)
             self.model_storage.delete_model(req.modelId)
             return 
TRegisterModelResp(get_status(TSStatusCode.INVALID_INFERENCE_CONFIG, e.message))
         except YAMLError as e:
-            logger.warning(e)
+            Logger().warning(e)
             self.model_storage.delete_model(req.modelId)
             if hasattr(e, 'problem_mark'):
                 mark = e.problem_mark
@@ -53,25 +54,26 @@ class ModelManager:
             return TRegisterModelResp(
                 get_status(TSStatusCode.INVALID_INFERENCE_CONFIG, f"An error 
occurred while parsing the yaml file"))
         except Exception as e:
-            logger.warning(e)
+            Logger().warning(e)
             self.model_storage.delete_model(req.modelId)
             return 
TRegisterModelResp(get_status(TSStatusCode.AINODE_INTERNAL_ERROR))
 
     def delete_model(self, req: TDeleteModelReq):
-        logger.info(f"delete model {req.modelId}")
+        Logger().info(f"delete model {req.modelId}")
         try:
             self.model_storage.delete_model(req.modelId)
             return get_status(TSStatusCode.SUCCESS_STATUS)
         except Exception as e:
-            logger.warning(e)
+            Logger().warning(e)
             return get_status(TSStatusCode.AINODE_INTERNAL_ERROR, str(e))
 
-
     def load_model(self, model_id: str, acceleration: bool = False):
+        Logger().info(f"load model {model_id}")
+        return self.model_storage.load_model(model_id, acceleration)
+
+    @staticmethod
+    def load_built_in_model(model_id: str, attributes: dict):
         model_id = model_id.lower()
         if model_id not in BuiltInModelType.values():
             raise BuiltInModelNotSupportError(model_id)
-        logger.info(f"load model {model_id}")
-        return self.model_storage.load_model(model_id, acceleration)
-
-
+        return fetch_built_in_model(model_id, attributes)
diff --git a/iotdb-core/ainode/iotdb/ainode/model/__init__.py 
b/iotdb-core/ainode/iotdb/ainode/model/__init__.py
index 4b8ee97fad2..2a1e720805f 100644
--- a/iotdb-core/ainode/iotdb/ainode/model/__init__.py
+++ b/iotdb-core/ainode/iotdb/ainode/model/__init__.py
@@ -14,4 +14,4 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
\ No newline at end of file
+#
diff --git a/iotdb-core/ainode/iotdb/ainode/attribute.py 
b/iotdb-core/ainode/iotdb/ainode/model/built_in_model_factory.py
similarity index 67%
rename from iotdb-core/ainode/iotdb/ainode/attribute.py
rename to iotdb-core/ainode/iotdb/ainode/model/built_in_model_factory.py
index a91ae436cd1..875c4cf74a0 100644
--- a/iotdb-core/ainode/iotdb/ainode/attribute.py
+++ b/iotdb-core/ainode/iotdb/ainode/model/built_in_model_factory.py
@@ -18,9 +18,86 @@
 from abc import abstractmethod
 from typing import List, Dict
 
+import numpy as np
+from sklearn.preprocessing import MinMaxScaler
+from sktime.annotation.hmm_learn import GaussianHMM, GMMHMM
+from sktime.annotation.stray import STRAY
+from sktime.forecasting.arima import ARIMA
+from sktime.forecasting.exp_smoothing import ExponentialSmoothing
+from sktime.forecasting.naive import NaiveForecaster
+from sktime.forecasting.trend import STLForecaster
+
 from iotdb.ainode.constant import AttributeName, BuiltInModelType
+from iotdb.ainode.exception import InferenceModelInternalError, 
AttributeNotSupportError
 from iotdb.ainode.exception import WrongAttributeTypeError, 
NumericalRangeException, StringRangeException, \
     ListRangeException, BuiltInModelNotSupportError
+from iotdb.ainode.log import Logger
+
+
+def get_model_attributes(model_id: str):
+    if model_id == BuiltInModelType.ARIMA.value:
+        attribute_map = arima_attribute_map
+    elif model_id == BuiltInModelType.NAIVE_FORECASTER.value:
+        attribute_map = naive_forecaster_attribute_map
+    elif model_id == BuiltInModelType.EXPONENTIAL_SMOOTHING.value:
+        attribute_map = exponential_smoothing_attribute_map
+    elif model_id == BuiltInModelType.STL_FORECASTER.value:
+        attribute_map = stl_forecaster_attribute_map
+    elif model_id == BuiltInModelType.GMM_HMM.value:
+        attribute_map = gmmhmm_attribute_map
+    elif model_id == BuiltInModelType.GAUSSIAN_HMM.value:
+        attribute_map = gaussian_hmm_attribute_map
+    elif model_id == BuiltInModelType.STRAY.value:
+        attribute_map = stray_attribute_map
+    else:
+        raise BuiltInModelNotSupportError(model_id)
+    return attribute_map
+
+
+def fetch_built_in_model(model_id, inference_attributes):
+    """
+    Args:
+        model_id: the unique id of the model
+        inference_attributes: a list of attributes to be inferred, in this 
function, the attributes will include some
+            parameters of the built-in model. Some parameters are optional, 
and if the parameters are not
+            specified, the default value will be used.
+    Returns:
+        model: the built-in model
+        attributes: a dict of attributes, where the key is the attribute name, 
the value is the parsed value of the
+            attribute
+    Description:
+        the fetch_built_in_model function will create the built-in model, 
which does not require user
+        registration. This module will parse the inference attributes and 
create the built-in model.
+    """
+    attribute_map = get_model_attributes(model_id)
+
+    # validate the inference attributes
+    for attribute_name in inference_attributes:
+        if attribute_name not in attribute_map:
+            raise AttributeNotSupportError(model_id, attribute_name)
+
+    # parse the inference attributes, attributes is a Dict[str, Any]
+    attributes = parse_attribute(inference_attributes, attribute_map)
+
+    # build the built-in model
+    if model_id == BuiltInModelType.ARIMA.value:
+        model = ArimaModel(attributes)
+    elif model_id == BuiltInModelType.EXPONENTIAL_SMOOTHING.value:
+        model = ExponentialSmoothingModel(attributes)
+    elif model_id == BuiltInModelType.NAIVE_FORECASTER.value:
+        model = NaiveForecasterModel(attributes)
+    elif model_id == BuiltInModelType.STL_FORECASTER.value:
+        model = STLForecasterModel(attributes)
+    elif model_id == BuiltInModelType.GMM_HMM.value:
+        model = GMMHMMModel(attributes)
+    elif model_id == BuiltInModelType.GAUSSIAN_HMM.value:
+        model = GaussianHmmModel(attributes)
+    elif model_id == BuiltInModelType.STRAY.value:
+        model = STRAYModel(attributes)
+    else:
+        raise BuiltInModelNotSupportError(model_id)
+
+    return model, attributes
 
 
 class Attribute(object):
@@ -29,7 +106,7 @@ class Attribute(object):
         Args:
             name: the name of the attribute
         """
-        self.name = name
+        self._name = name
 
     @abstractmethod
     def get_default_value(self):
@@ -67,7 +144,7 @@ class IntAttribute(Attribute):
         try:
             int_value = int(string_value)
         except:
-            raise WrongAttributeTypeError(self.name, "int")
+            raise WrongAttributeTypeError(self._name, "int")
         return int_value
 
 
@@ -94,7 +171,7 @@ class FloatAttribute(Attribute):
         try:
             float_value = float(string_value)
         except:
-            raise WrongAttributeTypeError(self.name, "float")
+            raise WrongAttributeTypeError(self._name, "float")
         return float_value
 
 
@@ -135,7 +212,7 @@ class BooleanAttribute(Attribute):
         elif string_value.lower() == "false":
             return False
         else:
-            raise WrongAttributeTypeError(self.name, "bool")
+            raise WrongAttributeTypeError(self._name, "bool")
 
 
 class ListAttribute(Attribute):
@@ -163,14 +240,14 @@ class ListAttribute(Attribute):
         try:
             list_value = eval(string_value)
         except:
-            raise WrongAttributeTypeError(self.name, "list")
+            raise WrongAttributeTypeError(self._name, "list")
         if not isinstance(list_value, list):
-            raise WrongAttributeTypeError(self.name, "list")
+            raise WrongAttributeTypeError(self._name, "list")
         for i in range(len(list_value)):
             try:
                 list_value[i] = self.__value_type(list_value[i])
             except:
-                raise ListRangeException(self.name, list_value, 
self.__type_to_str[self.__value_type])
+                raise ListRangeException(self._name, list_value, 
self.__type_to_str[self.__value_type])
         return list_value
 
 
@@ -199,15 +276,15 @@ class TupleAttribute(Attribute):
         try:
             tuple_value = eval(string_value)
         except:
-            raise WrongAttributeTypeError(self.name, "tuple")
+            raise WrongAttributeTypeError(self._name, "tuple")
         if not isinstance(tuple_value, tuple):
-            raise WrongAttributeTypeError(self.name, "tuple")
+            raise WrongAttributeTypeError(self._name, "tuple")
         list_value = list(tuple_value)
         for i in range(len(list_value)):
             try:
                 list_value[i] = self.__value_type(list_value[i])
             except:
-                raise ListRangeException(self.name, list_value, 
self.__type_to_str[self.__value_type])
+                raise ListRangeException(self._name, list_value, 
self.__type_to_str[self.__value_type])
         tuple_value = tuple(list_value)
         return tuple_value
 
@@ -234,8 +311,9 @@ def parse_attribute(input_attributes: Dict[str, str], 
attribute_map: Dict[str, A
         else:
             try:
                 attributes[attribute_name] = 
attribute_map[attribute_name].get_default_value()
-            except:
-                print("attribute_name: ", attribute_name)
+            except NotImplementedError as e:
+                Logger().error(f"attribute {attribute_name} is not 
implemented.")
+                raise e
     return attributes
 
 
@@ -517,6 +595,7 @@ gaussian_hmm_attribute_map = {
         value_choices=["log", "scaling"],
     )
 }
+
 # GMMHMM
 gmmhmm_attribute_map = {
     AttributeName.N_COMPONENTS.value: IntAttribute(
@@ -649,21 +728,195 @@ stray_attribute_map = {
 }
 
 
-def get_model_attributes(model_id: str):
-    if model_id == BuiltInModelType.ARIMA.value:
-        attribute_map = arima_attribute_map
-    elif model_id == BuiltInModelType.NAIVE_FORECASTER.value:
-        attribute_map = naive_forecaster_attribute_map
-    elif model_id == BuiltInModelType.EXPONENTIAL_SMOOTHING.value:
-        attribute_map = exponential_smoothing_attribute_map
-    elif model_id == BuiltInModelType.STL_FORECASTER.value:
-        attribute_map = stl_forecaster_attribute_map
-    elif model_id == BuiltInModelType.GMM_HMM.value:
-        attribute_map = gmmhmm_attribute_map
-    elif model_id == BuiltInModelType.GAUSSIAN_HMM.value:
-        attribute_map = gaussian_hmm_attribute_map
-    elif model_id == BuiltInModelType.STRAY.value:
-        attribute_map = stray_attribute_map
-    else:
-        raise BuiltInModelNotSupportError(model_id)
-    return attribute_map
+class BuiltInModel(object):
+    def __init__(self, attributes):
+        self._attributes = attributes
+        self._model = None
+
+    @abstractmethod
+    def inference(self, data):
+        raise NotImplementedError
+
+
+class ArimaModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(ArimaModel, self).__init__(attributes)
+        self._model = ARIMA(
+            order=attributes['order'],
+            seasonal_order=attributes['seasonal_order'],
+            method=attributes['method'],
+            suppress_warnings=attributes['suppress_warnings'],
+            maxiter=attributes['maxiter'],
+            out_of_sample_size=attributes['out_of_sample_size'],
+            scoring=attributes['scoring'],
+            with_intercept=attributes['with_intercept'],
+            time_varying_regression=attributes['time_varying_regression'],
+            enforce_stationarity=attributes['enforce_stationarity'],
+            enforce_invertibility=attributes['enforce_invertibility'],
+            simple_differencing=attributes['simple_differencing'],
+            measurement_error=attributes['measurement_error'],
+            mle_regression=attributes['mle_regression'],
+            hamilton_representation=attributes['hamilton_representation'],
+            concentrate_scale=attributes['concentrate_scale']
+        )
+
+    def inference(self, data):
+        try:
+            predict_length = self._attributes['predict_length']
+            self._model.fit(data)
+            output = self._model.predict(fh=range(predict_length))
+            output = np.array(output, dtype=np.float64)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class ExponentialSmoothingModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(ExponentialSmoothingModel, self).__init__(attributes)
+        self._model = ExponentialSmoothing(
+            damped_trend=attributes['damped_trend'],
+            initialization_method=attributes['initialization_method'],
+            optimized=attributes['optimized'],
+            remove_bias=attributes['remove_bias'],
+            use_brute=attributes['use_brute']
+        )
+
+    def inference(self, data):
+        try:
+            predict_length = self._attributes['predict_length']
+            self._model.fit(data)
+            output = self._model.predict(fh=range(predict_length))
+            output = np.array(output, dtype=np.float64)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class NaiveForecasterModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(NaiveForecasterModel, self).__init__(attributes)
+        self._model = NaiveForecaster(
+            strategy=attributes['strategy'],
+            sp=attributes['sp']
+        )
+
+    def inference(self, data):
+        try:
+            predict_length = self._attributes['predict_length']
+            self._model.fit(data)
+            output = self._model.predict(fh=range(predict_length))
+            output = np.array(output, dtype=np.float64)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class STLForecasterModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(STLForecasterModel, self).__init__(attributes)
+        self._model = STLForecaster(
+            sp=attributes['sp'],
+            seasonal=attributes['seasonal'],
+            seasonal_deg=attributes['seasonal_deg'],
+            trend_deg=attributes['trend_deg'],
+            low_pass_deg=attributes['low_pass_deg'],
+            seasonal_jump=attributes['seasonal_jump'],
+            trend_jump=attributes['trend_jump'],
+            low_pass_jump=attributes['low_pass_jump']
+        )
+
+    def inference(self, data):
+        try:
+            predict_length = self._attributes['predict_length']
+            self._model.fit(data)
+            output = self._model.predict(fh=range(predict_length))
+            output = np.array(output, dtype=np.float64)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class GMMHMMModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(GMMHMMModel, self).__init__(attributes)
+        self._model = GMMHMM(
+            n_components=attributes['n_components'],
+            n_mix=attributes['n_mix'],
+            min_covar=attributes['min_covar'],
+            startprob_prior=attributes['startprob_prior'],
+            transmat_prior=attributes['transmat_prior'],
+            means_prior=attributes['means_prior'],
+            means_weight=attributes['means_weight'],
+            weights_prior=attributes['weights_prior'],
+            algorithm=attributes['algorithm'],
+            covariance_type=attributes['covariance_type'],
+            n_iter=attributes['n_iter'],
+            tol=attributes['tol'],
+            params=attributes['params'],
+            init_params=attributes['init_params'],
+            implementation=attributes['implementation']
+        )
+
+    def inference(self, data):
+        try:
+            self._model.fit(data)
+            output = self._model.predict(data)
+            output = np.array(output, dtype=np.int32)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class GaussianHmmModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(GaussianHmmModel, self).__init__(attributes)
+        self._model = GaussianHMM(
+            n_components=attributes['n_components'],
+            covariance_type=attributes['covariance_type'],
+            min_covar=attributes['min_covar'],
+            startprob_prior=attributes['startprob_prior'],
+            transmat_prior=attributes['transmat_prior'],
+            means_prior=attributes['means_prior'],
+            means_weight=attributes['means_weight'],
+            covars_prior=attributes['covars_prior'],
+            covars_weight=attributes['covars_weight'],
+            algorithm=attributes['algorithm'],
+            n_iter=attributes['n_iter'],
+            tol=attributes['tol'],
+            params=attributes['params'],
+            init_params=attributes['init_params'],
+            implementation=attributes['implementation']
+        )
+
+    def inference(self, data):
+        try:
+            self._model.fit(data)
+            output = self._model.predict(data)
+            output = np.array(output, dtype=np.int32)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
+
+
+class STRAYModel(BuiltInModel):
+    def __init__(self, attributes):
+        super(STRAYModel, self).__init__(attributes)
+        self._model = STRAY(
+            alpha=attributes['alpha'],
+            k=attributes['k'],
+            knn_algorithm=attributes['knn_algorithm'],
+            p=attributes['p'],
+            size_threshold=attributes['size_threshold'],
+            outlier_tail=attributes['outlier_tail']
+        )
+
+    def inference(self, data):
+        try:
+            data = MinMaxScaler().fit_transform(data)
+            output = self._model.fit_transform(data)
+            # change the output to int
+            output = np.array(output, dtype=np.int32)
+            return output
+        except Exception as e:
+            raise InferenceModelInternalError(str(e))
diff --git a/iotdb-core/ainode/iotdb/ainode/model/model_fetcher.py 
b/iotdb-core/ainode/iotdb/ainode/model/model_factory.py
similarity index 52%
rename from iotdb-core/ainode/iotdb/ainode/model/model_fetcher.py
rename to iotdb-core/ainode/iotdb/ainode/model/model_factory.py
index c0443a0f088..bc5ce27fb3b 100644
--- a/iotdb-core/ainode/iotdb/ainode/model/model_fetcher.py
+++ b/iotdb-core/ainode/iotdb/ainode/model/model_factory.py
@@ -25,9 +25,10 @@ from requests.adapters import HTTPAdapter
 
 from iotdb.ainode.constant import DEFAULT_RECONNECT_TIMES, 
DEFAULT_RECONNECT_TIMEOUT, DEFAULT_CHUNK_SIZE, \
     DEFAULT_CONFIG_FILE_NAME, DEFAULT_MODEL_FILE_NAME
-from iotdb.ainode.exception import InvaildUriError
-from iotdb.ainode.log import logger
-from iotdb.ainode.parser import parse_inference_config
+from iotdb.ainode.exception import InvalidUriError, BadConfigValueError
+from iotdb.ainode.log import Logger
+from iotdb.ainode.util.serde import get_data_type_byte_from_str
+from iotdb.thrift.ainode.ttypes import TConfigs
 
HTTP_PREFIX = "http://"
HTTPS_PREFIX = "https://"
@@ -66,7 +67,7 @@ def _download_file(url: str, storage_path: str) -> None:
     Returns:
         None
     """
-    logger.debug(f"download file from {url} to {storage_path}")
+    Logger().debug(f"download file from {url} to {storage_path}")
 
     session = Session()
     adapter = HTTPAdapter(max_retries=DEFAULT_RECONNECT_TIMES)
@@ -81,7 +82,7 @@ def _download_file(url: str, storage_path: str) -> None:
             if chunk:
                 file.write(chunk)
 
-    logger.debug(f"download file from {url} to {storage_path} success")
+    Logger().debug(f"download file from {url} to {storage_path} success")
 
 
 def _register_model_from_network(uri: str, model_storage_path: str,
@@ -107,7 +108,7 @@ def _register_model_from_network(uri: str, 
model_storage_path: str,
     # read and parse config dict from config.yaml
     with open(config_storage_path, 'r', encoding='utf-8') as file:
         config_dict = yaml.safe_load(file)
-    configs, attributes = parse_inference_config(config_dict)
+    configs, attributes = _parse_inference_config(config_dict)
 
     # if config.yaml is correct, download model file
     _download_file(target_model_path, model_storage_path)
@@ -138,26 +139,93 @@ def _register_model_from_local(uri: str, 
model_storage_path: str,
     attributes = None
     if exist_model_file and exist_config_file:
         # copy config.yaml
-        logger.debug(f"copy file from {target_config_path} to 
{config_storage_path}")
+        Logger().debug(f"copy file from {target_config_path} to 
{config_storage_path}")
         shutil.copy(target_config_path, config_storage_path)
-        logger.debug(f"copy file from {target_config_path} to 
{config_storage_path} success")
+        Logger().debug(f"copy file from {target_config_path} to 
{config_storage_path} success")
 
         # read and parse config dict from config.yaml
         with open(config_storage_path, 'r', encoding='utf-8') as file:
             config_dict = yaml.safe_load(file)
-        configs, attributes = parse_inference_config(config_dict)
+        configs, attributes = _parse_inference_config(config_dict)
 
         # if config.yaml is correct, copy model file
-        logger.debug(f"copy file from {target_model_path} to 
{model_storage_path}")
+        Logger().debug(f"copy file from {target_model_path} to 
{model_storage_path}")
         shutil.copy(target_model_path, model_storage_path)
-        logger.debug(f"copy file from {target_model_path} to 
{model_storage_path} success")
+        Logger().debug(f"copy file from {target_model_path} to 
{model_storage_path} success")
 
     elif not exist_model_file or not exist_config_file:
-        raise InvaildUriError(uri)
+        raise InvalidUriError(uri)
 
     return configs, attributes
 
 
+def _parse_inference_config(config_dict):
+    """
+    Args:
+        config_dict: dict
+            - configs: dict
+                - input_shape (list<i32>): input shape of the model and needs 
to be two-dimensional array like [96, 2]
+                - output_shape (list<i32>): output shape of the model and 
needs to be two-dimensional array like [96, 2]
+                - input_type (list<str>): input type of the model and each 
element needs to be in ['bool', 'int32', 'int64', 'float32', 'float64', 
'text'], default float32
+                - output_type (list<str>): output type of the model and each 
element needs to be in ['bool', 'int32', 'int64', 'float32', 'float64', 
'text'], default float32
+            - attributes: dict
+    Returns:
+        configs: TConfigs
+        attributes: str
+    """
+    configs = config_dict['configs']
+
+    # check if input_shape and output_shape are two-dimensional array
+    if not (isinstance(configs['input_shape'], list) and 
len(configs['input_shape']) == 2):
+        raise BadConfigValueError('input_shape', configs['input_shape'],
+                                  'input_shape should be a two-dimensional 
array.')
+    if not (isinstance(configs['output_shape'], list) and 
len(configs['output_shape']) == 2):
+        raise BadConfigValueError('output_shape', configs['output_shape'],
+                                  'output_shape should be a two-dimensional 
array.')
+
+    # check if input_shape and output_shape are positive integer
+    input_shape_is_positive_number = isinstance(configs['input_shape'][0], 
int) and isinstance(
+        configs['input_shape'][1], int) and configs['input_shape'][0] > 0 and 
configs['input_shape'][1] > 0
+    if not input_shape_is_positive_number:
+        raise BadConfigValueError('input_shape', configs['input_shape'],
+                                  'element in input_shape should be positive 
integer.')
+
+    output_shape_is_positive_number = isinstance(configs['output_shape'][0], 
int) and isinstance(
+        configs['output_shape'][1], int) and configs['output_shape'][0] > 0 
and configs['output_shape'][1] > 0
+    if not output_shape_is_positive_number:
+        raise BadConfigValueError('output_shape', configs['output_shape'],
+                                  'element in output_shape should be positive 
integer.')
+
+    # check if input_type and output_type are one-dimensional array with right 
length
+    if 'input_type' in configs and not (
+            isinstance(configs['input_type'], list) and 
len(configs['input_type']) == configs['input_shape'][1]):
+        raise BadConfigValueError('input_type', configs['input_type'],
+                                  'input_type should be a one-dimensional 
array and length of it should be equal to input_shape[1].')
+
+    if 'output_type' in configs and not (
+            isinstance(configs['output_type'], list) and 
len(configs['output_type']) == configs['output_shape'][1]):
+        raise BadConfigValueError('output_type', configs['output_type'],
+                                  'output_type should be a one-dimensional 
array and length of it should be equal to output_shape[1].')
+
+    # parse input_type and output_type to byte
+    if 'input_type' in configs:
+        input_type = [get_data_type_byte_from_str(x) for x in 
configs['input_type']]
+    else:
+        input_type = [get_data_type_byte_from_str('float32')] * 
configs['input_shape'][1]
+
+    if 'output_type' in configs:
+        output_type = [get_data_type_byte_from_str(x) for x in 
configs['output_type']]
+    else:
+        output_type = [get_data_type_byte_from_str('float32')] * 
configs['output_shape'][1]
+
+    # parse attributes
+    attributes = ""
+    if 'attributes' in config_dict:
+        attributes = str(config_dict['attributes'])
+
+    return TConfigs(configs['input_shape'], configs['output_shape'], 
input_type, output_type), attributes
+
+
 def fetch_model_by_uri(uri: str, model_storage_path: str, config_storage_path: 
str):
     is_network_path, uri = _parse_uri(uri)
 
diff --git a/iotdb-core/ainode/iotdb/ainode/model/model_storage.py 
b/iotdb-core/ainode/iotdb/ainode/model/model_storage.py
index a5eb48a9f0e..e2a535f942b 100644
--- a/iotdb-core/ainode/iotdb/ainode/model/model_storage.py
+++ b/iotdb-core/ainode/iotdb/ainode/model/model_storage.py
@@ -27,13 +27,12 @@ import torch._dynamo
 import torch.nn as nn
 from pylru import lrucache
 
-from iotdb.ainode.config import descriptor
+from iotdb.ainode.config import AINodeDescriptor
 from iotdb.ainode.constant import (OptionsKey, DEFAULT_MODEL_FILE_NAME,
-                                   DEFAULT_CONFIG_FILE_NAME)
+                                   DEFAULT_CONFIG_FILE_NAME, ModelInputName)
 from iotdb.ainode.exception import ModelNotExistError
-from iotdb.ainode.log import logger
-from iotdb.ainode.model.model_fetcher import fetch_model_by_uri
-from iotdb.ainode.util import pack_input_dict
+from iotdb.ainode.log import Logger
+from iotdb.ainode.model.model_factory import fetch_model_by_uri
 
 
 class ModelStorage(object):
@@ -47,15 +46,15 @@ class ModelStorage(object):
 
     def __init__(self):
         if not self._first_init:
-            self.__model_dir = os.path.join(os.getcwd(), 
descriptor.get_config().get_ain_models_dir())
+            self.__model_dir = os.path.join(os.getcwd(), 
AINodeDescriptor().get_config().get_ain_models_dir())
             if not os.path.exists(self.__model_dir):
                 try:
                     os.makedirs(self.__model_dir)
                 except PermissionError as e:
-                    logger.error(e)
+                    Logger().error(e)
                     raise e
             self.lock = threading.RLock()
-            self.__model_cache = 
lrucache(descriptor.get_config().get_mn_model_storage_cache_size())
+            self.__model_cache = 
lrucache(AINodeDescriptor().get_config().get_ain_model_storage_cache_size())
             self._first_init = True
 
     def register_model(self, model_id: str, uri: str):
@@ -75,7 +74,7 @@ class ModelStorage(object):
                 os.makedirs(storage_path)
         model_storage_path = os.path.join(storage_path, 
DEFAULT_MODEL_FILE_NAME)
         config_storage_path = os.path.join(storage_path, 
DEFAULT_CONFIG_FILE_NAME)
-        fetch_model_by_uri(uri, model_storage_path, config_storage_path)
+        return fetch_model_by_uri(uri, model_storage_path, config_storage_path)
 
     def save_model(self,
                    model: nn.Module,
@@ -90,7 +89,7 @@ class ModelStorage(object):
 
         # Note: model config for time series should contain 'input_len' and 
'input_vars'
         sample_input = (
-            pack_input_dict(
+            _pack_input_dict(
                 torch.randn(1, model_config[OptionsKey.INPUT_LENGTH.name()], 
model_config[OptionsKey.INPUT_VARS.name()])
             )
         )
@@ -124,7 +123,7 @@ class ModelStorage(object):
                     try:
                         model = torch.compile(model)
                     except Exception as e:
-                        logger.warning(f"acceleration failed, fallback to 
normal mode: {str(e)}")
+                        Logger().warning(f"acceleration failed, fallback to 
normal mode: {str(e)}")
                 self.__model_cache[model_path] = model
                 return model
 
@@ -145,3 +144,22 @@ class ModelStorage(object):
     def _remove_from_cache(self, file_path: str) -> None:
         if file_path in self.__model_cache:
             del self.__model_cache[file_path]
+
+
+def _pack_input_dict(batch_x: torch.Tensor,
+                     batch_x_mark: torch.Tensor = None,
+                     dec_inp: torch.Tensor = None,
+                     batch_y_mark: torch.Tensor = None):
+    """
+    pack up inputs as a dict to adapt for different models
+    """
+    input_dict = {}
+    if batch_x is not None:
+        input_dict[ModelInputName.DATA_X.value] = batch_x
+    if batch_x_mark is not None:
+        input_dict[ModelInputName.TIME_STAMP_X] = batch_x_mark
+    if dec_inp is not None:
+        input_dict[ModelInputName.DEC_INP] = dec_inp
+    if batch_y_mark is not None:
+        input_dict[ModelInputName.TIME_STAMP_Y.value] = batch_y_mark
+    return input_dict
diff --git a/iotdb-core/ainode/iotdb/ainode/parser.py 
b/iotdb-core/ainode/iotdb/ainode/parser.py
deleted file mode 100644
index f3f8002333e..00000000000
--- a/iotdb-core/ainode/iotdb/ainode/parser.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-import argparse
-import re
-from abc import abstractmethod
-from typing import Dict
-
-from iotdb.ainode.constant import OptionsKey, TaskType, ForecastModelType
-from iotdb.ainode.exception import (MissingOptionError, RedundantOptionError,
-                                    UnsupportedError, BadConfigValueError)
-from iotdb.ainode.serde import convert_to_df, get_data_type_byte_from_str
-from iotdb.thrift.ainode.ttypes import TInferenceReq, TConfigs
-
-
-class TaskOptions(object):
-    def __init__(self, options: Dict):
-        self._raw_options = options
-
-        if OptionsKey.MODEL_TYPE.name() not in self._raw_options:
-            raise MissingOptionError(OptionsKey.MODEL_TYPE.name())
-        model_name = self._raw_options.pop(OptionsKey.MODEL_TYPE.name())
-        self.model_type = getattr(ForecastModelType, model_name.upper(), None)
-        if not self.model_type:
-            raise UnsupportedError(f"model_type {model_name}")
-
-        # training with auto-tuning as default
-        self.auto_tuning = 
str2bool(self._raw_options.pop(OptionsKey.AUTO_TUNING.name(), "false"))
-
-    @abstractmethod
-    def get_task_type(self) -> TaskType:
-        raise NotImplementedError("Subclasses must implement the validate() 
method.")
-
-    def _check_redundant_options(self) -> None:
-        if len(self._raw_options):
-            raise RedundantOptionError(str(self._raw_options))
-
-
-class ForecastTaskOptions(TaskOptions):
-    def __init__(self, options: Dict):
-        super().__init__(options)
-        self.input_length = 
self._raw_options.pop(OptionsKey.INPUT_LENGTH.name(), 96)
-        self.predict_length = 
self._raw_options.pop(OptionsKey.PREDICT_LENGTH.name(), 96)
-        self.predict_index_list = 
self._raw_options.pop(OptionsKey.PREDICT_INDEX_LIST.name(), None)
-        self.input_type_list = 
self._raw_options.pop(OptionsKey.INPUT_TYPE_LIST.name(), None)
-        super()._check_redundant_options()
-
-    def get_task_type(self) -> TaskType:
-        return TaskType.FORECAST
-
-
-def parse_task_type(options: Dict) -> TaskType:
-    if OptionsKey.TASK_TYPE.name() not in options:
-        raise MissingOptionError(OptionsKey.TASK_TYPE.name())
-    task_name = options.pop(OptionsKey.TASK_TYPE.name())
-    task_type = getattr(TaskType, task_name.upper(), None)
-    if not task_type:
-        raise UnsupportedError(f"task_type {task_name}")
-    return task_type
-
-
-def parse_task_options(options) -> TaskOptions:
-    task_type = parse_task_type(options)
-    if task_type == TaskType.FORECAST:
-        return ForecastTaskOptions(options)
-    else:
-        raise UnsupportedError(f"task type {task_type}")
-
-
-def parse_inference_config(config_dict):
-    """
-    Args:
-        config_dict: dict
-            - configs: dict
-                - input_shape (list<i32>): input shape of the model and needs 
to be two-dimensional array like [96, 2]
-                - output_shape (list<i32>): output shape of the model and 
needs to be two-dimensional array like [96, 2]
-                - input_type (list<str>): input type of the model and each 
element needs to be in ['bool', 'int32', 'int64', 'float32', 'float64', 
'text'], default float64
-                - output_type (list<str>): output type of the model and each 
element needs to be in ['bool', 'int32', 'int64', 'float32', 'float64', 
'text'], default float64
-            - attributes: dict
-    Returns:
-        configs: TConfigs
-        attributes: str
-    """
-    configs = config_dict['configs']
-
-    # check if input_shape and output_shape are two-dimensional array
-    if not (isinstance(configs['input_shape'], list) and 
len(configs['input_shape']) == 2):
-        raise BadConfigValueError('input_shape', configs['input_shape'],
-                                  'input_shape should be a two-dimensional 
array.')
-    if not (isinstance(configs['output_shape'], list) and 
len(configs['output_shape']) == 2):
-        raise BadConfigValueError('output_shape', configs['output_shape'],
-                                  'output_shape should be a two-dimensional 
array.')
-
-    # check if input_shape and output_shape are positive integer
-    input_shape_is_positive_number = isinstance(configs['input_shape'][0], 
int) and isinstance(
-        configs['input_shape'][1], int) and configs['input_shape'][0] > 0 and 
configs['input_shape'][1] > 0
-    if not input_shape_is_positive_number:
-        raise BadConfigValueError('input_shape', configs['input_shape'],
-                                  'element in input_shape should be positive 
integer.')
-
-    output_shape_is_positive_number = isinstance(configs['output_shape'][0], 
int) and isinstance(
-        configs['output_shape'][1], int) and configs['output_shape'][0] > 0 
and configs['output_shape'][1] > 0
-    if not output_shape_is_positive_number:
-        raise BadConfigValueError('output_shape', configs['output_shape'],
-                                  'element in output_shape should be positive 
integer.')
-
-    # check if input_type and output_type are one-dimensional array with right 
length
-    if 'input_type' in configs and not (
-            isinstance(configs['input_type'], list) and 
len(configs['input_type']) == configs['input_shape'][1]):
-        raise BadConfigValueError('input_type', configs['input_type'],
-                                  'input_type should be a one-dimensional 
array and length of it should be equal to input_shape[1].')
-
-    if 'output_type' in configs and not (
-            isinstance(configs['output_type'], list) and 
len(configs['output_type']) == configs['output_shape'][1]):
-        raise BadConfigValueError('output_type', configs['output_type'],
-                                  'output_type should be a one-dimensional 
array and length of it should be equal to output_shape[1].')
-
-    # parse input_type and output_type to byte
-    if 'input_type' in configs:
-        input_type = [get_data_type_byte_from_str(x) for x in 
configs['input_type']]
-    else:
-        input_type = [get_data_type_byte_from_str('float32')] * 
configs['input_shape'][1]
-
-    if 'output_type' in configs:
-        output_type = [get_data_type_byte_from_str(x) for x in 
configs['output_type']]
-    else:
-        output_type = [get_data_type_byte_from_str('float32')] * 
configs['output_shape'][1]
-
-    # parse attributes
-    attributes = ""
-    if 'attributes' in config_dict:
-        attributes = str(config_dict['attributes'])
-
-    return TConfigs(configs['input_shape'], configs['output_shape'], 
input_type, output_type), attributes
-
-
-def parse_inference_request(req: TInferenceReq):
-    binary_dataset = req.dataset
-    type_list = req.typeList
-    column_name_list = req.columnNameList
-    column_name_index = req.columnNameIndexMap
-    data = convert_to_df(column_name_list, type_list, column_name_index, 
[binary_dataset])
-    time_stamp, data = data[data.columns[0:1]], data[data.columns[1:]]
-    full_data = (data, time_stamp, type_list, column_name_list)
-    inference_attributes = req.inferenceAttributes
-    if inference_attributes is None:
-        inference_attributes = {}
-
-    window_params = req.windowParams
-    if window_params is None:
-        # set default window_step to infinity and window_interval to dataset 
length
-        window_step = float('inf')
-        window_interval = data.shape[0]
-    else:
-        window_step = window_params.windowStep
-        window_interval = window_params.windowInterval
-    return req.modelId, full_data, window_interval, window_step, 
inference_attributes
-
-
-def str2bool(value):
-    if value.lower() in ('yes', 'true', 't', 'y', '1'):
-        return True
-    elif value.lower() in ('no', 'false', 'f', 'n', '0'):
-        return False
-    else:
-        raise argparse.ArgumentTypeError('Boolean value expected.')
-
-
-# This is used to extract the key message in RuntimeError instead of the 
traceback message
-def runtime_error_extractor(error_message):
-    pattern = re.compile(r"RuntimeError: (.+)")
-    match = pattern.search(error_message)
-
-    if match:
-        return match.group(1)
-    else:
-        return ""
diff --git a/iotdb-core/ainode/iotdb/ainode/script.py 
b/iotdb-core/ainode/iotdb/ainode/script.py
index 4ed6f9209a9..4e11e922e8b 100644
--- a/iotdb-core/ainode/iotdb/ainode/script.py
+++ b/iotdb-core/ainode/iotdb/ainode/script.py
@@ -18,73 +18,158 @@
 import os
 import shutil
 import sys
+from datetime import datetime
+
+import psutil
 
 from iotdb.ainode.client import client_manager
-from iotdb.ainode.config import descriptor
-from iotdb.ainode.constant import TSStatusCode
+from iotdb.ainode.config import AINodeDescriptor
+from iotdb.ainode.constant import TSStatusCode, AINODE_SYSTEM_FILE_NAME
 from iotdb.ainode.exception import MissingConfigError
-from iotdb.ainode.log import logger
-from iotdb.ainode.service import AINode
-from iotdb.thrift.common.ttypes import TAINodeLocation, TEndPoint
+from iotdb.ainode.log import Logger
+from iotdb.ainode.service import RPCService
+from iotdb.thrift.common.ttypes import TAINodeLocation, TEndPoint, 
TAINodeConfiguration, TNodeResource
+from iotdb.thrift.confignode.ttypes import TNodeVersionInfo
 
-server: AINode = None
-POINT_COLON = ":"
 
-def main():
-    global server
-    arguments = sys.argv
-    if len(arguments) == 1:
-        logger.info("Command line argument must be specified.")
-        return
-    command = arguments[1]
-    if command == 'start':
+def _generate_configuration() -> TAINodeConfiguration:
+    location = TAINodeLocation(AINodeDescriptor().get_config().get_ainode_id(),
+                               
TEndPoint(AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
+                                         
AINodeDescriptor().get_config().get_ain_inference_rpc_port()))
+    resource = TNodeResource(
+        int(psutil.cpu_count()),
+        int(psutil.virtual_memory()[0])
+    )
+
+    return TAINodeConfiguration(location, resource)
+
+
+def _generate_version_info() -> TNodeVersionInfo:
+    return TNodeVersionInfo(AINodeDescriptor().get_config().get_version_info(),
+                            AINodeDescriptor().get_config().get_build_info())
+
+
+def _check_path_permission():
+    system_path = AINodeDescriptor().get_config().get_ain_system_dir()
+    if not os.path.exists(system_path):
+        try:
+            os.makedirs(system_path)
+            os.chmod(system_path, 0o777)
+        except PermissionError as e:
+            Logger().error(e)
+            raise e
+
+
+def start_ainode():
+    _check_path_permission()
+    system_properties_file = 
os.path.join(AINodeDescriptor().get_config().get_ain_system_dir(), 
AINODE_SYSTEM_FILE_NAME)
+    if not os.path.exists(system_properties_file):
+        # If the system.properties file does not exist, the AINode will 
register to ConfigNode.
         try:
-            server = AINode()
-            server.start()
+            Logger().info('IoTDB-AINode is registering to ConfigNode...')
+            ainode_id = 
client_manager.borrow_config_node_client().node_register(
+                AINodeDescriptor().get_config().get_cluster_name(),
+                _generate_configuration(),
+                _generate_version_info())
+            AINodeDescriptor().get_config().set_ainode_id(ainode_id)
+            system_properties = {
+                'ainode_id': ainode_id,
+                'cluster_name': 
AINodeDescriptor().get_config().get_cluster_name(),
+                'iotdb_version': 
AINodeDescriptor().get_config().get_version_info(),
+                'commit_id': AINodeDescriptor().get_config().get_build_info(),
+                'ain_rpc_address': 
AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
+                'ain_rpc_port': 
AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
+                'config_node_list': 
AINodeDescriptor().get_config().get_ain_target_config_node_list(),
+            }
+            with open(system_properties_file, 'w') as f:
+                f.write('#' + str(datetime.now()) + '\n')
+                for key, value in system_properties.items():
+                    f.write(key + '=' + str(value) + '\n')
+
         except Exception as e:
-            logger.error("Start AINode failed, because of: {}".format(e))
-            sys.exit(1)
-    elif command == 'remove':
+            Logger().error('IoTDB-AINode failed to register to ConfigNode: 
{}'.format(e))
+            raise e
+    else:
+        # If the system.properties file does exist, the AINode will just 
restart.
         try:
-            logger.info("Removing AINode...")
-            # Delete the current node
-            if len(arguments) == 2:
-                target_ainode_id = descriptor.get_config().get_ainode_id()
-                target_rpc_address = 
descriptor.get_config().get_ain_inference_rpc_address()
-                target_rpc_port = 
descriptor.get_config().get_ain_inference_rpc_port()
+            Logger().info('IoTDB-AINode is restarting...')
+            client_manager.borrow_config_node_client().node_restart(
+                AINodeDescriptor().get_config().get_cluster_name(),
+                _generate_configuration(),
+                _generate_version_info())
 
-            # Delete the node with a given id
-            elif len(arguments) == 3:
-                target_ainode_id = arguments[2]
+        except Exception as e:
+            Logger().error('IoTDB-AINode failed to restart: {}'.format(e))
+            raise e
 
-                # ainode id
-                ainode_configuration_map = 
client_manager.borrow_config_node_client().get_ainode_configuration(target_ainode_id)
+    rpc_service = RPCService()
+    rpc_service.start()
+    rpc_service.join(1)
+    if rpc_service.exit_code != 0:
+        return
+
+    Logger().info('IoTDB-AINode has successfully started.')
+
+
+def remove_ainode(arguments):
+    # Delete the current node
+    if len(arguments) == 2:
+        target_ainode_id = AINodeDescriptor().get_config().get_ainode_id()
+        target_rpc_address = 
AINodeDescriptor().get_config().get_ain_inference_rpc_address()
+        target_rpc_port = 
AINodeDescriptor().get_config().get_ain_inference_rpc_port()
+
+    # Delete the node with a given id
+    elif len(arguments) == 3:
+        target_ainode_id = int(arguments[2])
+        ainode_configuration_map = 
client_manager.borrow_config_node_client().get_ainode_configuration(
+            target_ainode_id)
 
-                end_point = 
ainode_configuration_map[target_ainode_id].location.internalEndPoint
-                target_rpc_address = end_point.ip
-                target_rpc_port = end_point.port
+        end_point = 
ainode_configuration_map[target_ainode_id].location.internalEndPoint
+        target_rpc_address = end_point.ip
+        target_rpc_port = end_point.port
 
-                if not end_point:
-                    raise MissingConfigError("NodeId: {} not found in cluster 
".format(target_ainode_id))
+        if not end_point:
+            raise MissingConfigError("NodeId: {} not found in cluster 
".format(target_ainode_id))
 
-                logger.info('Got target AINode id: 
{}'.format(target_ainode_id))
+        Logger().info('Got target AINode id: {}'.format(target_ainode_id))
 
-            else:
-                raise MissingConfigError("Invalid command")
+    else:
+        raise MissingConfigError("Invalid command")
+
+    location = TAINodeLocation(target_ainode_id, TEndPoint(target_rpc_address, 
target_rpc_port))
+    status = client_manager.borrow_config_node_client().node_remove(location)
 
-            location = TAINodeLocation(target_ainode_id, 
TEndPoint(target_rpc_address, target_rpc_port))
-            status = 
client_manager.borrow_config_node_client().node_remove(location)
+    if status.code == TSStatusCode.SUCCESS_STATUS.get_status_code():
+        Logger().info('IoTDB-AINode has successfully removed.')
+        if 
os.path.exists(AINodeDescriptor().get_config().get_ain_models_dir()):
+            shutil.rmtree(AINodeDescriptor().get_config().get_ain_models_dir())
 
-            if status.code == TSStatusCode.SUCCESS_STATUS.get_status_code():
-                logger.info('IoTDB-AINode has successfully removed.')
-                if 
os.path.exists(descriptor.get_config().get_ain_models_dir()):
-                    shutil.rmtree(descriptor.get_config().get_ain_models_dir())
 
+def main():
+    arguments = sys.argv
+    # load config
+    AINodeDescriptor()
+    if len(arguments) == 1:
+        Logger().info("Command line argument must be specified.")
+        return
+    command = arguments[1]
+    if command == 'start':
+        try:
+            Logger().info('IoTDB-AINode is starting...')
+            start_ainode()
         except Exception as e:
-            logger.error("Remove AINode failed, because of: {}".format(e))
+            Logger().error("Start AINode failed, because of: {}".format(e))
+            sys.exit(1)
+    elif command == 'remove':
+        try:
+            Logger().info("Removing AINode...")
+            remove_ainode(arguments)
+        except Exception as e:
+            Logger().error("Remove AINode failed, because of: {}".format(e))
             sys.exit(1)
     else:
-        logger.warning("Unknown argument: {}.".format(command))
+        Logger().warning("Unknown argument: {}.".format(command))
+
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
diff --git a/iotdb-core/ainode/iotdb/ainode/service.py 
b/iotdb-core/ainode/iotdb/ainode/service.py
index b6bd73bde9b..be3edafcc80 100644
--- a/iotdb-core/ainode/iotdb/ainode/service.py
+++ b/iotdb-core/ainode/iotdb/ainode/service.py
@@ -15,24 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import os
-import sys
 import threading
-from datetime import datetime
 
-import psutil
 from thrift.protocol import TCompactProtocol, TBinaryProtocol
 from thrift.server import TServer
 from thrift.transport import TSocket, TTransport
 
-from iotdb.ainode.client import client_manager
-from iotdb.ainode.config import descriptor
-from iotdb.ainode.constant import AINODE_SYSTEM_FILE_NAME
+from iotdb.ainode.config import AINodeDescriptor
 from iotdb.ainode.handler import AINodeRPCServiceHandler
-from iotdb.ainode.log import logger
+from iotdb.ainode.log import Logger
 from iotdb.thrift.ainode import IAINodeRPCService
-from iotdb.thrift.common.ttypes import TAINodeConfiguration, TAINodeLocation, 
TEndPoint, TNodeResource
-from iotdb.thrift.confignode.ttypes import TNodeVersionInfo
 
 
 class RPCService(threading.Thread):
@@ -40,10 +32,10 @@ class RPCService(threading.Thread):
         self.exit_code = 0
         super().__init__()
         processor = 
IAINodeRPCService.Processor(handler=AINodeRPCServiceHandler())
-        transport = 
TSocket.TServerSocket(host=descriptor.get_config().get_ain_inference_rpc_address(),
-                                          
port=descriptor.get_config().get_ain_inference_rpc_port())
+        transport = 
TSocket.TServerSocket(host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
+                                          
port=AINodeDescriptor().get_config().get_ain_inference_rpc_port())
         transport_factory = TTransport.TFramedTransportFactory()
-        if descriptor.get_config().get_ain_thrift_compression_enabled():
+        if 
AINodeDescriptor().get_config().get_ain_thrift_compression_enabled():
             protocol_factory = TCompactProtocol.TCompactProtocolFactory()
         else:
             protocol_factory = TBinaryProtocol.TBinaryProtocolFactory()
@@ -51,89 +43,9 @@ class RPCService(threading.Thread):
         self.__pool_server = TServer.TThreadPoolServer(processor, transport, 
transport_factory, protocol_factory)
 
     def run(self) -> None:
-        logger.info("The RPC service thread begin to run...")
+        Logger().info("The RPC service thread begin to run...")
         try:
             self.__pool_server.serve()
         except Exception as e:
             self.exit_code = 1
-            logger.error(e)
-
-
-class AINode(object):
-    def __init__(self):
-        self.__rpc_service = RPCService()
-
-    def start(self) -> None:
-        logger.info('IoTDB-AINode is starting...')
-        system_path = descriptor.get_config().get_ain_system_dir()
-        system_properties_file = 
os.path.join(descriptor.get_config().get_ain_system_dir(), 
AINODE_SYSTEM_FILE_NAME)
-        if not os.path.exists(system_path):
-            try:
-                os.makedirs(system_path)
-                os.chmod(system_path, 0o777)
-            except PermissionError as e:
-                logger.error(e)
-                raise e
-
-        if not os.path.exists(system_properties_file):
-            # If the system.properties file does not exist, the AINode will 
register to ConfigNode.
-            try:
-                logger.info('IoTDB-AINode is registering to ConfigNode...')
-                ainode_id = 
client_manager.borrow_config_node_client().node_register(
-                    descriptor.get_config().get_cluster_name(),
-                    self._generate_configuration(),
-                    self._generate_version_info())
-                descriptor.get_config().set_ainode_id(ainode_id)
-                system_properties = {
-                    'ainode_id': ainode_id,
-                    'cluster_name': descriptor.get_config().get_cluster_name(),
-                    'iotdb_version': 
descriptor.get_config().get_version_info(),
-                    'commit_id': descriptor.get_config().get_build_info(),
-                    'ain_rpc_address': 
descriptor.get_config().get_ain_inference_rpc_address(),
-                    'ain_rpc_port': 
descriptor.get_config().get_ain_inference_rpc_port(),
-                    'config_node_list': 
descriptor.get_config().get_ain_target_config_node_list(),
-                }
-                with open(system_properties_file, 'w') as f:
-                    f.write('#' + str(datetime.now()) + '\n')
-                    for key, value in system_properties.items():
-                        f.write(key + '=' + str(value) + '\n')
-
-            except Exception as e:
-                logger.error('IoTDB-AINode failed to register to ConfigNode: 
{}'.format(e))
-                sys.exit(1)
-        else:
-            # If the system.properties file does exist, the AINode will just 
restart.
-            try:
-                logger.info('IoTDB-AINode is restarting...')
-                client_manager.borrow_config_node_client().node_restart(
-                    descriptor.get_config().get_cluster_name(),
-                    self._generate_configuration(),
-                    self._generate_version_info())
-
-            except Exception as e:
-                logger.error('IoTDB-AINode failed to restart: {}'.format(e))
-                sys.exit(1)
-
-        self.__rpc_service.start()
-        self.__rpc_service.join(1)
-        if self.__rpc_service.exit_code != 0:
-            return
-
-        logger.info('IoTDB-AINode has successfully started.')
-
-    @staticmethod
-    def _generate_configuration() -> TAINodeConfiguration:
-        location = TAINodeLocation(descriptor.get_config().get_ainode_id(),
-                                   
TEndPoint(descriptor.get_config().get_ain_inference_rpc_address(),
-                                             
descriptor.get_config().get_ain_inference_rpc_port()))
-        resource = TNodeResource(
-            int(psutil.cpu_count()),
-            int(psutil.virtual_memory()[0])
-        )
-
-        return TAINodeConfiguration(location, resource)
-
-    @staticmethod
-    def _generate_version_info() -> TNodeVersionInfo:
-        return TNodeVersionInfo(descriptor.get_config().get_version_info(),
-                                descriptor.get_config().get_build_info())
+            Logger().error(e)
diff --git a/iotdb-core/ainode/iotdb/ainode/util.py 
b/iotdb-core/ainode/iotdb/ainode/util.py
deleted file mode 100644
index af605bb30c2..00000000000
--- a/iotdb-core/ainode/iotdb/ainode/util.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import torch
-
-from iotdb.ainode.constant import TSStatusCode, ModelInputName
-from iotdb.ainode.exception import BadNodeUrlError
-from iotdb.ainode.log import logger
-from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
-
-
-def parse_endpoint_url(endpoint_url: str) -> TEndPoint:
-    """ Parse TEndPoint from a given endpoint url.
-    Args:
-        endpoint_url: an endpoint url, format: ip:port
-    Returns:
-        TEndPoint
-    Raises:
-        BadNodeUrlError
-    """
-    split = endpoint_url.split(":")
-    if len(split) != 2:
-        logger.warning("Illegal endpoint url format: {}".format(endpoint_url))
-        raise BadNodeUrlError(endpoint_url)
-
-    ip = split[0]
-    try:
-        port = int(split[1])
-        result = TEndPoint(ip, port)
-        return result
-    except ValueError as e:
-        logger.warning("Illegal endpoint url format: {} 
({})".format(endpoint_url, e))
-        raise BadNodeUrlError(endpoint_url)
-
-
-def get_status(status_code: TSStatusCode, message: str = None) -> TSStatus:
-    status = TSStatus(status_code.get_status_code())
-    status.message = message
-    return status
-
-
-def verify_success(status: TSStatus, err_msg: str) -> None:
-    if status.code != TSStatusCode.SUCCESS_STATUS.get_status_code():
-        logger.warning(err_msg + ", error status is ", status)
-        raise RuntimeError(str(status.code) + ": " + status.message)
-
-
-def pack_input_dict(batch_x: torch.Tensor,
-                    batch_x_mark: torch.Tensor = None,
-                    dec_inp: torch.Tensor = None,
-                    batch_y_mark: torch.Tensor = None):
-    """
-    pack up inputs as a dict to adapt for different models
-    """
-    input_dict = {}
-    if batch_x is not None:
-        input_dict[ModelInputName.DATA_X.value] = batch_x
-    if batch_x_mark is not None:
-        input_dict[ModelInputName.TIME_STAMP_X] = batch_x_mark
-    if dec_inp is not None:
-        input_dict[ModelInputName.DEC_INP] = dec_inp
-    if batch_y_mark is not None:
-        input_dict[ModelInputName.TIME_STAMP_Y.value] = batch_y_mark
-    return input_dict
diff --git a/iotdb-core/ainode/iotdb/ainode/encryption.py 
b/iotdb-core/ainode/iotdb/ainode/util/__init__.py
similarity index 100%
copy from iotdb-core/ainode/iotdb/ainode/encryption.py
copy to iotdb-core/ainode/iotdb/ainode/util/__init__.py
diff --git a/iotdb-core/ainode/iotdb/ainode/encryption.py 
b/iotdb-core/ainode/iotdb/ainode/util/decorator.py
similarity index 78%
copy from iotdb-core/ainode/iotdb/ainode/encryption.py
copy to iotdb-core/ainode/iotdb/ainode/util/decorator.py
index 2a1e720805f..33b9f4835ac 100644
--- a/iotdb-core/ainode/iotdb/ainode/encryption.py
+++ b/iotdb-core/ainode/iotdb/ainode/util/decorator.py
@@ -15,3 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+def singleton(cls):
+    instances = {}
+
+    def get_instance(*args, **kwargs):
+        if cls not in instances:
+            instances[cls] = cls(*args, **kwargs)
+        return instances[cls]
+
+    return get_instance
diff --git a/iotdb-core/ainode/iotdb/ainode/serde.py 
b/iotdb-core/ainode/iotdb/ainode/util/serde.py
similarity index 100%
rename from iotdb-core/ainode/iotdb/ainode/serde.py
rename to iotdb-core/ainode/iotdb/ainode/util/serde.py
diff --git a/iotdb-core/ainode/iotdb/ainode/encryption.py 
b/iotdb-core/ainode/iotdb/ainode/util/status.py
similarity index 57%
rename from iotdb-core/ainode/iotdb/ainode/encryption.py
rename to iotdb-core/ainode/iotdb/ainode/util/status.py
index 2a1e720805f..1bcbef7a806 100644
--- a/iotdb-core/ainode/iotdb/ainode/encryption.py
+++ b/iotdb-core/ainode/iotdb/ainode/util/status.py
@@ -15,3 +15,19 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+
+from iotdb.ainode.constant import TSStatusCode
+from iotdb.ainode.log import Logger
+from iotdb.thrift.common.ttypes import TSStatus
+
+
+def get_status(status_code: TSStatusCode, message: str = None) -> TSStatus:
+    status = TSStatus(status_code.get_status_code())
+    status.message = message
+    return status
+
+
+def verify_success(status: TSStatus, err_msg: str) -> None:
+    if status.code != TSStatusCode.SUCCESS_STATUS.get_status_code():
+        Logger().warning(err_msg + ", error status is ", status)
+        raise RuntimeError(str(status.code) + ": " + status.message)

Reply via email to