This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f7d7f68aff7 [AINode] Sync codes for ainode (#17139)
f7d7f68aff7 is described below

commit f7d7f68aff7fc64672142a3275b4259c231df154
Author: Leo <[email protected]>
AuthorDate: Fri Feb 6 08:57:59 2026 +0800

    [AINode] Sync codes for ainode (#17139)
    
    Co-authored-by: Liu Zhengyun <[email protected]>
---
 iotdb-core/ainode/iotdb/ainode/core/config.py      |  15 ---
 iotdb-core/ainode/iotdb/ainode/core/constant.py    |   1 -
 .../core/inference/inference_request_pool.py       |   4 +-
 .../core/inference/pipeline/basic_pipeline.py      |  74 +++++++++++--
 .../ainode/iotdb/ainode/core/ingress/iotdb.py      |   8 --
 .../iotdb/ainode/core/manager/inference_manager.py | 116 ++++++++++++---------
 .../ainode/resources/conf/iotdb-ainode.properties  |   4 -
 .../thrift-ainode/src/main/thrift/ainode.thrift    |   3 +-
 8 files changed, 135 insertions(+), 90 deletions(-)

diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py b/iotdb-core/ainode/iotdb/ainode/core/config.py
index b14efa3bedf..4995dda7bf3 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/config.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/config.py
@@ -23,7 +23,6 @@ from iotdb.ainode.core.constant import (
     AINODE_CLUSTER_INGRESS_ADDRESS,
     AINODE_CLUSTER_INGRESS_PASSWORD,
     AINODE_CLUSTER_INGRESS_PORT,
-    AINODE_CLUSTER_INGRESS_TIME_ZONE,
     AINODE_CLUSTER_INGRESS_USERNAME,
     AINODE_CLUSTER_NAME,
     AINODE_CONF_DIRECTORY_NAME,
@@ -69,7 +68,6 @@ class AINodeConfig(object):
         self._ain_cluster_ingress_port = AINODE_CLUSTER_INGRESS_PORT
         self._ain_cluster_ingress_username = AINODE_CLUSTER_INGRESS_USERNAME
         self._ain_cluster_ingress_password = AINODE_CLUSTER_INGRESS_PASSWORD
-        self._ain_cluster_ingress_time_zone = AINODE_CLUSTER_INGRESS_TIME_ZONE
 
         # Inference configuration
         self._ain_inference_batch_interval_in_ms: int = (
@@ -287,14 +285,6 @@ class AINodeConfig(object):
     ) -> None:
         self._ain_cluster_ingress_password = ain_cluster_ingress_password
 
-    def get_ain_cluster_ingress_time_zone(self) -> str:
-        return self._ain_cluster_ingress_time_zone
-
-    def set_ain_cluster_ingress_time_zone(
-        self, ain_cluster_ingress_time_zone: str
-    ) -> None:
-        self._ain_cluster_ingress_time_zone = ain_cluster_ingress_time_zone
-
 
 @singleton
 class AINodeDescriptor(object):
@@ -432,11 +422,6 @@ class AINodeDescriptor(object):
                     file_configs["ain_cluster_ingress_password"]
                 )
 
-            if "ain_cluster_ingress_time_zone" in config_keys:
-                self._config.set_ain_cluster_ingress_time_zone(
-                    file_configs["ain_cluster_ingress_time_zone"]
-                )
-
         except BadNodeUrlException:
             logger.warning("Cannot load AINode conf file, use default 
configuration.")
 
diff --git a/iotdb-core/ainode/iotdb/ainode/core/constant.py b/iotdb-core/ainode/iotdb/ainode/core/constant.py
index b0019722630..4a2ee543d1f 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/constant.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/constant.py
@@ -39,7 +39,6 @@ AINODE_CLUSTER_INGRESS_ADDRESS = "127.0.0.1"
 AINODE_CLUSTER_INGRESS_PORT = 6667
 AINODE_CLUSTER_INGRESS_USERNAME = "root"
 AINODE_CLUSTER_INGRESS_PASSWORD = "root"
-AINODE_CLUSTER_INGRESS_TIME_ZONE = "UTC+8"
 
 # RPC config
 AINODE_THRIFT_COMPRESSION_ENABLED = False
diff --git a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
index dcfa4528fce..8121d4fecd8 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/inference/inference_request_pool.py
@@ -127,7 +127,9 @@ class InferenceRequestPool(mp.Process):
             for i in range(batch_inputs.size(0)):
                 batch_input_list.append({"targets": batch_inputs[i]})
             batch_inputs = self._inference_pipeline.preprocess(
-                batch_input_list, output_length=requests[0].output_length
+                batch_input_list,
+                output_length=requests[0].output_length,
+                auto_adapt=True,
             )
             if isinstance(self._inference_pipeline, ForecastPipeline):
                 batch_output = self._inference_pipeline.forecast(
diff --git a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
index ece395bf697..5d0026522a1 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/inference/pipeline/basic_pipeline.py
@@ -19,13 +19,16 @@
 from abc import ABC, abstractmethod
 
 import torch
+from torch.nn import functional as F
 
 from iotdb.ainode.core.exception import InferenceModelInternalException
+from iotdb.ainode.core.log import Logger
 from iotdb.ainode.core.manager.device_manager import DeviceManager
 from iotdb.ainode.core.model.model_info import ModelInfo
 from iotdb.ainode.core.model.model_loader import load_model
 
 BACKEND = DeviceManager()
+logger = Logger()
 
 
 class BasicPipeline(ABC):
@@ -70,6 +73,7 @@ class ForecastPipeline(BasicPipeline):
 
             infer_kwargs (dict, optional): Additional keyword arguments for inference, such as:
                - `output_length`(int): Used to check validation of 'future_covariates' if provided.
+                - `auto_adapt`(bool): Whether to automatically adapt the covariates.
 
         Raises:
             ValueError: If the input format is incorrect (e.g., missing keys, invalid tensor shapes).
@@ -80,6 +84,7 @@ class ForecastPipeline(BasicPipeline):
 
         if isinstance(inputs, list):
             output_length = infer_kwargs.get("output_length", 96)
+            auto_adapt = infer_kwargs.get("auto_adapt", True)
             for idx, input_dict in enumerate(inputs):
                 # Check if the dictionary contains the expected keys
                 if not isinstance(input_dict, dict):
@@ -121,10 +126,30 @@ class ForecastPipeline(BasicPipeline):
                         raise ValueError(
                             f"Each value in 'past_covariates' must be 
torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
                         )
-                    if cov_value.ndim != 1 or cov_value.shape[0] != input_length:
+                    if cov_value.ndim != 1:
                         raise ValueError(
-                            f"Each covariate in 'past_covariates' must have shape ({input_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
+                            f"Individual `past_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
                         )
+                    # If any past_covariate's length is not equal to input_length, process it accordingly.
+                    if cov_value.shape[0] != input_length:
+                        if auto_adapt:
+                            if cov_value.shape[0] > input_length:
+                                logger.warning(
+                                    f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {input_length}), which will be truncated from the beginning."
+                                )
+                                past_covariates[cov_key] = cov_value[-input_length:]
+                            else:
+                                logger.warning(
+                                    f"Past covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {input_length}), which will be padded with zeros at the beginning."
+                                )
+                                pad_size = input_length - cov_value.shape[0]
+                                past_covariates[cov_key] = F.pad(
+                                    cov_value, (pad_size, 0)
+                                )
+                        else:
+                            raise ValueError(
+                                f"Individual `past_covariates` must be 1-d with length equal to the length of `target` (= {input_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}."
+                            )
 
                 # Check 'future_covariates' if it exists (optional)
                 future_covariates = input_dict.get("future_covariates", {})
@@ -134,19 +159,52 @@ class ForecastPipeline(BasicPipeline):
                     )
                # If future_covariates exists, check if they are a subset of past_covariates
                 if future_covariates:
-                    for cov_key, cov_value in future_covariates.items():
+                    for cov_key, cov_value in list(future_covariates.items()):
+                        # If any future_covariate is not found in past_covariates, ignore it or raise an error.
                         if cov_key not in past_covariates:
-                            raise ValueError(
-                                f"Key '{cov_key}' in 'future_covariates' is not in 'past_covariates' at index {idx}."
-                            )
+                            if auto_adapt:
+                                future_covariates.pop(cov_key)
+                                logger.warning(
+                                    f"Future covariate {cov_key} not found in past_covariates {list(past_covariates.keys())}, which will be ignored when executing forecasting."
+                                )
+                                if not future_covariates:
+                                    input_dict.pop("future_covariates")
+                                continue
+                            else:
+                                raise ValueError(
+                                    f"Expected keys in `future_covariates` to be a subset of `past_covariates` {list(past_covariates.keys())}, "
+                                    f"but found {cov_key} in element at index {idx}."
+                                )
                         if not isinstance(cov_value, torch.Tensor):
                             raise ValueError(
                                 f"Each value in 'future_covariates' must be 
torch.Tensor, but got {type(cov_value)} for key '{cov_key}' at index {idx}."
                             )
-                        if cov_value.ndim != 1 or cov_value.shape[0] != output_length:
+                        if cov_value.ndim != 1:
                             raise ValueError(
-                                f"Each covariate in 'future_covariates' must have shape ({output_length},), but got shape {cov_value.shape} for key '{cov_key}' at index {idx}."
+                                f"Individual `future_covariates` must be 1-d, found: {cov_key} with {cov_value.ndim} dimensions in element at index {idx}."
                             )
+                        # If any future_covariate's length is not equal to output_length, process it accordingly.
+                        if cov_value.shape[0] != output_length:
+                            if auto_adapt:
+                                if cov_value.shape[0] > output_length:
+                                    logger.warning(
+                                        f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (> {output_length}), which will be truncated from the end."
+                                    )
+                                    future_covariates[cov_key] = cov_value[
+                                        :output_length
+                                    ]
+                                else:
+                                    logger.warning(
+                                        f"Future covariate {cov_key} at index {idx} has length {cov_value.shape[0]} (< {output_length}), which will be padded with zeros at the end."
+                                    )
+                                    pad_size = output_length - cov_value.shape[0]
+                                    future_covariates[cov_key] = F.pad(
+                                        cov_value, (0, pad_size)
+                                    )
+                            else:
+                                raise ValueError(
+                                    f"Individual `future_covariates` must be 1-d with length equal to `output_length` (= {output_length}), found: {cov_key} with shape {tuple(cov_value.shape)} in element at index {idx}."
+                                )
         else:
             raise ValueError(
                 f"The inputs must be a list of dictionaries, but got 
{type(inputs)}."
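
For reference, the auto-adaptation added above is plain slicing plus
torch.nn.functional.pad. A minimal standalone sketch of the four cases
(tensor lengths are illustrative, not taken from the commit):

    import torch
    from torch.nn import functional as F

    input_length, output_length = 96, 24

    # Past covariate longer than the history window: keep the most recent values.
    past = torch.arange(100.0)
    past = past[-input_length:]                    # truncated from the beginning

    # Past covariate shorter than the history window: zero-pad at the beginning.
    short_past = torch.ones(80)
    short_past = F.pad(short_past, (input_length - 80, 0))   # pad=(left, right)

    # Future covariates are adapted at the opposite end.
    future = torch.ones(30)
    future = future[:output_length]                # truncated from the end
    short_future = torch.ones(10)
    short_future = F.pad(short_future, (0, output_length - 10))  # padded at the end

    assert past.shape[0] == short_past.shape[0] == input_length
    assert future.shape[0] == short_future.shape[0] == output_length
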
diff --git a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
index 13c56ca9d2d..be1dd9bf1c2 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/ingress/iotdb.py
@@ -69,9 +69,6 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset):
         password: str = AINodeDescriptor()
         .get_config()
         .get_ain_cluster_ingress_password(),
-        time_zone: str = AINodeDescriptor()
-        .get_config()
-        .get_ain_cluster_ingress_time_zone(),
         use_rate: float = 1.0,
         offset_rate: float = 0.0,
     ):
@@ -90,7 +87,6 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset):
             node_urls=[f"{ip}:{port}"],
             user=username,
             password=password,
-            zone_id=time_zone,
             use_ssl=AINodeDescriptor()
             .get_config()
             .get_ain_cluster_ingress_ssl_enabled(),
@@ -258,9 +254,6 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset):
         password: str = AINodeDescriptor()
         .get_config()
         .get_ain_cluster_ingress_password(),
-        time_zone: str = AINodeDescriptor()
-        .get_config()
-        .get_ain_cluster_ingress_time_zone(),
         use_rate: float = 1.0,
         offset_rate: float = 0.0,
     ):
@@ -272,7 +265,6 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset):
             node_urls=[f"{ip}:{port}"],
             username=username,
             password=password,
-            time_zone=time_zone,
             use_ssl=AINodeDescriptor()
             .get_config()
             .get_ain_cluster_ingress_ssl_enabled(),
diff --git a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
index 2ad25ad0529..07ca8a63bce 100644
--- a/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
+++ b/iotdb-core/ainode/iotdb/ainode/core/manager/inference_manager.py
@@ -175,6 +175,66 @@ class InferenceManager:
             with self._result_wrapper_lock:
                 del self._result_wrapper_map[req_id]
 
+    def _do_inference_and_construct_resp(
+        self,
+        model_id: str,
+        model_inputs_list: list[dict[str, torch.Tensor | dict[str, torch.Tensor]]],
+        output_length: int,
+        inference_attrs: dict,
+        **kwargs,
+    ) -> list[bytes]:
+        auto_adapt = kwargs.get("auto_adapt", True)
+        if (
+            output_length
+            > AINodeDescriptor().get_config().get_ain_inference_max_output_length()
+        ):
+            raise NumericalRangeException(
+                "output_length",
+                output_length,
+                1,
+                AINodeDescriptor().get_config().get_ain_inference_max_output_length(),
+            )
+
+        if self._pool_controller.has_running_pools(model_id):
+            infer_req = InferenceRequest(
+                req_id=generate_req_id(),
+                model_id=model_id,
+                inputs=torch.stack(
+                    [data["targets"] for data in model_inputs_list], dim=0
+                ),
+                output_length=output_length,
+            )
+            outputs = self._process_request(infer_req)
+        else:
+            model_info = self._model_manager.get_model_info(model_id)
+            inference_pipeline = load_pipeline(
+                model_info, device=self._backend.torch_device("cpu")
+            )
+            inputs = inference_pipeline.preprocess(
+                model_inputs_list,
+                output_length=output_length,
+                auto_adapt=auto_adapt,
+            )
+            if isinstance(inference_pipeline, ForecastPipeline):
+                outputs = inference_pipeline.forecast(
+                    inputs, output_length=output_length, **inference_attrs
+                )
+            elif isinstance(inference_pipeline, ClassificationPipeline):
+                outputs = inference_pipeline.classify(inputs)
+            elif isinstance(inference_pipeline, ChatPipeline):
+                outputs = inference_pipeline.chat(inputs)
+            else:
+                outputs = None
+                logger.error("[Inference] Unsupported pipeline type.")
+            outputs = inference_pipeline.postprocess(outputs)
+
+        # convert tensor into tsblock for the output in each batch
+        resp_list = []
+        for batch_idx, output in enumerate(outputs):
+            resp = convert_tensor_to_tsblock(output)
+            resp_list.append(resp)
+        return resp_list
+
     def _run(
         self,
         req,
@@ -191,65 +251,17 @@ class InferenceManager:
             inference_attrs = extract_attrs(req)
             output_length = int(inference_attrs.pop("output_length", 96))
 
-            # model_inputs_list: Each element is a dict, which contains the following keys:
-            #   `targets`: The input tensor for the target variable(s), whose shape is [target_count, input_length].
             model_inputs_list: list[
                 dict[str, torch.Tensor | dict[str, torch.Tensor]]
             ] = [{"targets": inputs[0]}]
 
-            if (
-                output_length
-                > AINodeDescriptor().get_config().get_ain_inference_max_output_length()
-            ):
-                raise NumericalRangeException(
-                    "output_length",
-                    output_length,
-                    1,
-                    AINodeDescriptor()
-                    .get_config()
-                    .get_ain_inference_max_output_length(),
-                )
-
-            if self._pool_controller.has_running_pools(model_id):
-                infer_req = InferenceRequest(
-                    req_id=generate_req_id(),
-                    model_id=model_id,
-                    inputs=torch.stack(
-                        [data["targets"] for data in model_inputs_list], dim=0
-                    ),
-                    output_length=output_length,
-                )
-                outputs = self._process_request(infer_req)
-            else:
-                model_info = self._model_manager.get_model_info(model_id)
-                inference_pipeline = load_pipeline(
-                    model_info, device=self._backend.torch_device("cpu")
-                )
-                inputs = inference_pipeline.preprocess(
-                    model_inputs_list, output_length=output_length
-                )
-                if isinstance(inference_pipeline, ForecastPipeline):
-                    outputs = inference_pipeline.forecast(
-                        inputs, output_length=output_length, **inference_attrs
-                    )
-                elif isinstance(inference_pipeline, ClassificationPipeline):
-                    outputs = inference_pipeline.classify(inputs)
-                elif isinstance(inference_pipeline, ChatPipeline):
-                    outputs = inference_pipeline.chat(inputs)
-                else:
-                    outputs = None
-                    logger.error("[Inference] Unsupported pipeline type.")
-                outputs = inference_pipeline.postprocess(outputs)
-
-            # convert tensor into tsblock for the output in each batch
-            output_list = []
-            for batch_idx, output in enumerate(outputs):
-                output = convert_tensor_to_tsblock(output)
-                output_list.append(output)
+            resp_list = self._do_inference_and_construct_resp(
+                model_id, model_inputs_list, output_length, inference_attrs
+            )
 
             return resp_cls(
                 get_status(TSStatusCode.SUCCESS_STATUS),
-                [output_list[0]] if single_batch else output_list,
+                [resp_list[0]] if single_batch else resp_list,
             )
 
         except Exception as e:
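
The helper extracted above batches requests exactly as the old inline code
did, by stacking per-request target tensors. A minimal sketch of that
stacking step (shapes are illustrative):

    import torch

    # Each element mirrors model_inputs_list above: a dict whose "targets"
    # tensor has shape [target_count, input_length].
    model_inputs_list = [
        {"targets": torch.randn(1, 96)},
        {"targets": torch.randn(1, 96)},
    ]

    # The same stacking used in _do_inference_and_construct_resp:
    batch = torch.stack([d["targets"] for d in model_inputs_list], dim=0)
    print(batch.shape)  # torch.Size([2, 1, 96]) -> [batch, target_count, input_length]
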
diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
index fc569b27807..5b653d678a6 100644
--- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
+++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
@@ -52,10 +52,6 @@ ain_cluster_ingress_username=root
 # Datatype: String
 ain_cluster_ingress_password=root
 
-# The time zone of the IoTDB cluster.
-# Datatype: String
-ain_cluster_ingress_time_zone=UTC+8
-
 # The device space allocated for inference
 # Datatype: Float
 ain_inference_memory_usage_ratio=0.2
diff --git a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
index 68347b89203..0416f3c69cb 100644
--- a/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
+++ b/iotdb-protocol/thrift-ainode/src/main/thrift/ainode.thrift
@@ -87,7 +87,8 @@ struct TForecastReq {
   3: required i32 outputLength
   4: optional string historyCovs
   5: optional string futureCovs
-  6: optional map<string, string> options
+  6: optional bool autoAdapt
+  7: optional map<string, string> options
 }
 
 struct TForecastResp {

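On the protocol side, a client opts in through the new autoAdapt field. A
hypothetical sketch using the thrift-generated Python bindings (the import
path is an assumption, and fields 1-2 of TForecastReq are outside this hunk,
so they are omitted here):

    # Hypothetical: the generated-module path is an assumption, not the
    # actual package layout shipped with IoTDB.
    from ainode.ttypes import TForecastReq

    req = TForecastReq(
        outputLength=96,  # field 3 (required)
        autoAdapt=True,   # new field 6: let the server pad/truncate covariates
    )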