This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch query_v2_py in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f1532d9e1b85c07efd446547b48008b65c64f9df Author: HTHou <[email protected]> AuthorDate: Thu Mar 20 16:20:16 2025 +0800 dev more --- .../client-py/iotdb/utils/IoTDBRpcDataSet.py | 47 ++++++++++++++-------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py index 2d30d30c66d..c4e36a0f794 100644 --- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -22,6 +22,7 @@ import logging import numpy as np import pandas as pd +from datetime import date from thrift.transport import TTransport from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq, TSCloseOperationReq from iotdb.tsfile.utils.date_utils import parse_int_to_date @@ -103,6 +104,7 @@ class IoTDBRpcDataSet(object): self.__query_result = query_result if query_result is not None: self.__query_result_size = len(query_result) + self.__query_result_index = 0 self.__is_closed = False self.__empty_resultSet = False self.__rows_index = 0 @@ -148,16 +150,15 @@ class IoTDBRpcDataSet(object): if ( self.has_cached_data_frame or self.__query_result is None - or len(self.__query_result) == 0 + or len(self.__query_result) <= self.__query_result_index ): return - binary_size = len(self.__query_result) - binary_index = 0 result = {} time_column_values, column_values, null_indicators, _ = deserialize( - self.__query_result[0] + self.__query_result[self.__query_result_index] ) - self.__query_result = None + self.__query_result[self.__query_result_index] = None + self.__query_result_index += 1 time_array = np.frombuffer( time_column_values, np.dtype(np.longlong).newbyteorder(">") ) @@ -191,7 +192,7 @@ class IoTDBRpcDataSet(object): ) # BOOLEAN elif data_type == 0: - data_array = np.frombuffer(value_buffer, np.dtype("?")) + data_array = np.array(value_buffer).astype("bool") # INT32, DATE elif data_type == 1 or data_type == 9: data_array = np.frombuffer( @@ -228,11 +229,16 @@ class IoTDBRpcDataSet(object): null_indicator = null_indicators[location] - if len(data_array) < total_length: + if len(data_array) < total_length or ( + data_type == 0 and null_indicator is not None + ): tmp_array = np.full(total_length, None, dtype=object) if null_indicator is not None: indexes = [not v for v in null_indicator] - tmp_array[indexes] = data_array + if data_type == 0: + tmp_array[indexes] = data_array[indexes] + else: + tmp_array[indexes] = data_array # INT32, DATE if data_type == 1 or data_type == 9: @@ -247,7 +253,6 @@ class IoTDBRpcDataSet(object): result[i + 1] = data_array self.data_frame = pd.DataFrame(result, dtype=object) - self.__query_result = None if not self.data_frame.empty: self.has_cached_data_frame = True @@ -255,7 +260,9 @@ class IoTDBRpcDataSet(object): return self.has_cached_data_frame def _has_next_result_set(self): - if (self.__query_result is not None) and (len(self.__query_result[0]) != 0): + if (self.__query_result is not None) and ( + len(self.__query_result) > self.__query_result_index + ): return True if self.__empty_resultSet: return False @@ -269,9 +276,10 @@ class IoTDBRpcDataSet(object): result[column_name] = [] while self._has_next_result_set(): time_column_values, column_values, null_indicators, _ = deserialize( - self.__query_result[0] + self.__query_result[self.__query_result_index] ) - self.__query_result = None + self.__query_result[self.__query_result_index] = None + self.__query_result_index += 1 time_array = np.frombuffer( time_column_values, np.dtype(np.longlong).newbyteorder(">") ) @@ -310,7 +318,7 @@ class IoTDBRpcDataSet(object): ) # BOOLEAN elif data_type == 0: - data_array = np.frombuffer(value_buffer, np.dtype("?")) + data_array = np.array(value_buffer).astype("bool") # INT32 elif data_type == 1: data_array = np.frombuffer( @@ -364,13 +372,15 @@ class IoTDBRpcDataSet(object): data_array = pd.Series(data_array).apply(parse_int_to_date) else: raise RuntimeError("unsupported data type {}.".format(data_type)) - if data_array.dtype.byteorder == ">": + if data_array.dtype.byteorder == ">" and len(data_array) > 0: data_array = data_array.byteswap().view( data_array.dtype.newbyteorder("<") ) tmp_array = [] null_indicator = null_indicators[location] - if len(data_array) < total_length: + if len(data_array) < total_length or ( + data_type == 0 and null_indicator is not None + ): # BOOLEAN, INT32, INT64, TIMESTAMP if ( data_type == 0 @@ -391,11 +401,14 @@ class IoTDBRpcDataSet(object): or data_type == 10 or data_type == 9 ): - tmp_array = np.full(total_length, None, dtype=data_array.dtype) + tmp_array = np.full(total_length, None, dtype=object) if null_indicator is not None: indexes = [not v for v in null_indicator] - tmp_array[indexes] = data_array + if data_type == 0: + tmp_array[indexes] = data_array[indexes] + else: + tmp_array[indexes] = data_array if data_type == 1: tmp_array = pd.Series(tmp_array).astype("Int32")
