This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch rel/0.10 in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 239c7dcb28cd9ce2d74ea0e6ecf270d8fcbde629 Author: Zekun Li <[email protected]> AuthorDate: Mon Jul 20 14:16:45 2020 +0800 python session client ver-0.10.0 --- client-py/src/Session.py | 460 +++++++++++++++++++++++++++++++++ client-py/src/SessionExample.py | 105 ++++++++ client-py/src/SessionUT.py | 107 ++++++++ client-py/src/utils/Field.py | 176 +++++++++++++ client-py/src/utils/IoTDBConstants.py | 52 ++++ client-py/src/utils/IoTDBRpcDataSet.py | 219 ++++++++++++++++ client-py/src/utils/RowRecord.py | 55 ++++ client-py/src/utils/SessionDataSet.py | 101 ++++++++ client-py/src/utils/Tablet.py | 133 ++++++++++ client-py/src/utils/__init__.py | 18 ++ 10 files changed, 1426 insertions(+) diff --git a/client-py/src/Session.py b/client-py/src/Session.py new file mode 100644 index 0000000..c43f274 --- /dev/null +++ b/client-py/src/Session.py @@ -0,0 +1,460 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#

import sys
import struct

# if you run compile.sh, you can use the following code:
sys.path.append("../target")

# if you use maven to compile the thrift api, just use the following code:
# sys.path.append("../../thrift/target/generated-sources-python")

sys.path.append("./utils")

from IoTDBConstants import *
from SessionDataSet import SessionDataSet

from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport

from iotdb.rpc.TSIService import Client, TSCreateTimeseriesReq, TSInsertRecordReq, TSInsertTabletReq, \
    TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet, TSFetchResultsReq, TSCloseOperationReq, \
    TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq


class Session(object):
    """
    Client session that talks to the server through the thrift-generated TSIService client.

    Call open() before issuing any request and close() when finished. Most methods print the
    status message returned by the server instead of returning it.
    """
    DEFAULT_FETCH_SIZE = 10000
    DEFAULT_USER = 'root'
    DEFAULT_PASSWORD = 'root'

    def __init__(self, host, port, user=DEFAULT_USER, password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE):
        """
        remember the connection settings; nothing is connected until open() is called
        :param host: server address, handed to TSocket
        :param port: server port, handed to TSocket
        :param user: String, user name sent when the session is opened
        :param password: String, password sent when the session is opened
        :param fetch_size: Integer, fetch size sent along with every query statement
        """
        self.__host = host
        self.__port = port
        self.__user = user
        self.__password = password
        self.__fetch_size = fetch_size
        self.__is_close = True
        self.__transport = None
        self.__client = None
        self.protocol_version = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2
        self.__session_id = None
        self.__statement_id = None
        self.__zone_id = None

    def open(self, enable_rpc_compression):
        """
        connect to the server, open a session, request a statement id and synchronise the time zone;
        does nothing when the session is already open
        :param enable_rpc_compression: Boolean, use TCompactProtocol when True, TBinaryProtocol otherwise
        :raises Exception: when the connection, the session handshake or the time zone exchange fails;
                           the transport is closed again and the session stays closed
        """
        if not self.__is_close:
            return
        self.__transport = TTransport.TBufferedTransport(TSocket.TSocket(self.__host, self.__port))

        if not self.__transport.isOpen():
            try:
                self.__transport.open()
            except TTransport.TTransportException as e:
                print('TTransportException: ', e)
                # Nothing below can work without a transport: fail here with the real cause
                # instead of carrying on and failing later with an unrelated error.
                raise

        if enable_rpc_compression:
            self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport))
        else:
            self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport))

        open_req = TSOpenSessionReq(client_protocol=self.protocol_version,
                                    username=self.__user,
                                    password=self.__password)

        try:
            open_resp = self.__client.openSession(open_req)

            if self.protocol_version != open_resp.serverProtocolVersion:
                print("Protocol differ, Client version is {}, but Server version is {}".format(
                    self.protocol_version, open_resp.serverProtocolVersion))
                # version is less than 0.10
                if open_resp.serverProtocolVersion == 0:
                    raise TTransport.TException(message="Protocol not supported.")

            self.__session_id = open_resp.sessionId
            self.__statement_id = self.__client.requestStatementId(self.__session_id)

            # A zone chosen before (re)opening is pushed to the server, otherwise the server's zone is cached.
            if self.__zone_id is not None:
                self.set_time_zone(self.__zone_id)
            else:
                self.__zone_id = self.get_time_zone()
        except Exception as e:
            self.__transport.close()
            print("session closed because: ", e)
            # Re-raise so the caller never ends up with a session that is marked open but unusable.
            raise

        self.__is_close = False

    def close(self):
        """
        close the session at the server and then the transport; does nothing when the session is not open
        """
        if self.__is_close:
            return
        req = TSCloseSessionReq(self.__session_id)
        try:
            self.__client.closeSession(req)
        except TTransport.TException as e:
            print("Error occurs when closing session at server. Maybe server is down. Error message: ", e)
        finally:
            # The local side is released even when the server could not be reached.
            self.__is_close = True
            if self.__transport is not None:
                self.__transport.close()

    def set_storage_group(self, group_name):
        """
        set one storage group
        :param group_name: String, storage group name (starts from root)
        """
        status = self.__client.setStorageGroup(self.__session_id, group_name)
        print("setting storage group {} message: {}".format(group_name, status.message))

    def delete_storage_group(self, storage_group):
        """
        delete one storage group.
        :param storage_group: String, path of the target storage group.
        """
        groups = [storage_group]
        self.delete_storage_groups(groups)

    def delete_storage_groups(self, storage_group_lst):
        """
        delete multiple storage groups.
        :param storage_group_lst: List, paths of the target storage groups.
        """
        status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
        print("delete storage group(s) {} message: {}".format(storage_group_lst, status.message))

    def create_time_series(self, ts_path, data_type, encoding, compressor):
        """
        create single time series
        :param ts_path: String, complete time series path (starts from root)
        :param data_type: TSDataType, data type for this time series
        :param encoding: TSEncoding, encoding for this time series
        :param compressor: Compressor, compressing type for this time series
        """
        # The request carries the raw enum values, not the enum members.
        data_type = data_type.value
        encoding = encoding.value
        compressor = compressor.value
        request = TSCreateTimeseriesReq(self.__session_id, ts_path, data_type, encoding, compressor)
        status = self.__client.createTimeseries(request)
        print("creating time series {} message: {}".format(ts_path, status.message))

    def create_multi_time_series(self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst):
        """
        create multiple time series
        :param ts_path_lst: List of String, complete time series paths (starts from root)
        :param data_type_lst: List of TSDataType, data types for time series
        :param encoding_lst: List of TSEncoding, encodings for time series
        :param compressor_lst: List of Compressor, compressing types for time series
        """
        data_type_lst = [data_type.value for data_type in data_type_lst]
        encoding_lst = [encoding.value for encoding in encoding_lst]
        compressor_lst = [compressor.value for compressor in compressor_lst]

        request = TSCreateMultiTimeseriesReq(self.__session_id, ts_path_lst, data_type_lst,
                                             encoding_lst, compressor_lst)
        resp = self.__client.createMultiTimeseries(request)
        # Only the first entry of the returned status list is reported.
        print("creating multiple time series {} message: {}".format(ts_path_lst, resp.statusList[0].message))

    def delete_time_series(self, paths_list):
        """
        delete multiple time series, including data and schema
        :param paths_list: List of time series path, which should be complete (starts from root)
        """
        status = self.__client.deleteTimeseries(self.__session_id, paths_list)
        print("deleting multiple time series {} message: {}".format(paths_list, status.message))

    def check_time_series_exists(self, path):
        """
        check whether a specific time series exists
        :param path: String, complete path of time series for checking
        :return Boolean value indicates whether it exists.
        """
        data_set = self.execute_query_statement("SHOW TIMESERIES {}".format(path))
        try:
            return data_set.has_next()
        finally:
            # Release the query handle even when fetching the result fails.
            data_set.close_operation_handle()

    def delete_data(self, paths_list, timestamp):
        """
        delete all data <= time in multiple time series
        :param paths_list: time series list that the data in.
        :param timestamp: data with time stamp less than or equal to time will be deleted.
        """
        request = TSDeleteDataReq(self.__session_id, paths_list, timestamp)
        try:
            status = self.__client.deleteData(request)
            print("delete data from {}, message: {}".format(paths_list, status.message))
        except TTransport.TException as e:
            print("data deletion fails because: ", e)

    def insert_str_record(self, device_id, timestamp, measurements, string_values):
        """
        special case for inserting one row of String (TEXT) value
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param string_values: List of String, one TEXT value per sensor
        :raises ValueError: if measurements and string_values differ in length
        """
        data_types = [TSDataType.TEXT.value for _ in string_values]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, string_values)
        status = self.__client.insertRecord(request)
        print("insert one record to device {} message: {}".format(device_id, status.message))

    def insert_record(self, device_id, timestamp, measurements, data_types, values):
        """
        insert one row of record into database, if you want improve your performance, please use insertTablet method
            for example a record at time=10086 with three measurements is:
                timestamp,     m1,    m2,     m3
                    10086,  125.3,  True,  text1
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param data_types: List of TSDataType, indicate the data type for each sensor
        :param values: List, values to be inserted, for each sensor
        :raises ValueError: if the three lists differ in length or a data type is not supported
        """
        data_types = [data_type.value for data_type in data_types]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
        status = self.__client.insertRecord(request)
        print("insert one record to device {} message: {}".format(device_id, status.message))

    def insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        insert multiple rows of data, records are independent to each other, in other words, there's no relationship
        between those records
        :param device_ids: List of String, time series paths for device
        :param times: List of Integer, timestamps for records
        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
        :param values_lst: 2-D List, values to be inserted, for each device
        :raises ValueError: if the lists differ in length or a data type is not supported
        """
        type_values_lst = [[data_type.value for data_type in types] for types in types_lst]
        request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
        resp = self.__client.insertRecords(request)
        print("insert multiple records to devices {} message: {}".format(device_ids, resp.statusList[0].message))

    def test_insert_record(self, device_id, timestamp, measurements, data_types, values):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param data_types: List of TSDataType, indicate the data type for each sensor
        :param values: List, values to be inserted, for each sensor
        :raises ValueError: if the three lists differ in length or a data type is not supported
        """
        data_types = [data_type.value for data_type in data_types]
        request = self.gen_insert_record_req(device_id, timestamp, measurements, data_types, values)
        status = self.__client.testInsertRecord(request)
        print("testing! insert one record to device {} message: {}".format(device_id, status.message))

    def test_insert_records(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param device_ids: List of String, time series paths for device
        :param times: List of Integer, timestamps for records
        :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device
        :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device
        :param values_lst: 2-D List, values to be inserted, for each device
        :raises ValueError: if the lists differ in length or a data type is not supported
        """
        type_values_lst = [[data_type.value for data_type in types] for types in types_lst]
        request = self.gen_insert_records_req(device_ids, times, measurements_lst, type_values_lst, values_lst)
        status = self.__client.testInsertRecords(request)
        print("testing! insert multiple records, message: {}".format(status.message))

    def gen_insert_record_req(self, device_id, timestamp, measurements, data_types, values):
        """
        build the thrift request for one record
        :param device_id: String, time series path for device
        :param timestamp: Integer, indicate the timestamp of the row of data
        :param measurements: List of String, sensor names
        :param data_types: List of data type codes (TSDataType values, not enum members)
        :param values: List, values to be inserted, for each sensor
        :return: TSInsertRecordReq
        :raises ValueError: if the three lists differ in length or a data type is not supported
        """
        if (len(values) != len(data_types)) or (len(values) != len(measurements)):
            # Fail before anything is sent; a missing request would only produce an obscure error later.
            raise ValueError("measurements, data types and values must have the same length, got {}, {} and {}"
                             .format(len(measurements), len(data_types), len(values)))
        values_in_bytes = Session.value_to_bytes(data_types, values)
        return TSInsertRecordReq(self.__session_id, device_id, measurements, values_in_bytes, timestamp)

    def gen_insert_records_req(self, device_ids, times, measurements_lst, types_lst, values_lst):
        """
        build the thrift request for several independent records
        :param device_ids: List of String, time series paths for device
        :param times: List of Integer, timestamps for records
        :param measurements_lst: 2-D List of String, sensor names of every record
        :param types_lst: 2-D List of data type codes (TSDataType values, not enum members)
        :param values_lst: 2-D List, values of every record
        :return: TSInsertRecordsReq
        :raises ValueError: if the lists differ in length or a data type is not supported
        """
        if (len(device_ids) != len(measurements_lst)) or (len(times) != len(types_lst)) or \
                (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
            raise ValueError("deviceIds, times, measurementsList and valuesList's size should be equal")

        value_lst = []
        for values, data_types, measurements in zip(values_lst, types_lst, measurements_lst):
            if (len(values) != len(data_types)) or (len(values) != len(measurements)):
                raise ValueError("measurements, data types and values of every record must have the same length")
            value_lst.append(Session.value_to_bytes(data_types, values))

        return TSInsertRecordsReq(self.__session_id, device_ids, measurements_lst, value_lst, times)

    def insert_tablet(self, tablet):
        """
        insert one tablet, in a tablet, for each timestamp, the number of measurements is same
            for example three records in the same device can form a tablet:
                timestamps,     m1,    m2,     m3
                         1,  125.3,  True,  text1
                         2,  111.6, False,  text2
                         3,  688.6,  True,  text3
        Notice: The tablet should not have empty cell
                The tablet itself is sorted (see docs of Tablet.py)
        :param tablet: a tablet specified above
        """
        resp = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
        print("insert one tablet to device {} message: {}".format(tablet.get_device_id(), resp.statusList[0].message))

    def insert_tablets(self, tablet_lst):
        """
        insert multiple tablets, tablets are independent to each other
        :param tablet_lst: List of tablets
        """
        resp = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
        print("insert multiple tablets, message: {}".format(resp.statusList[0].message))

    def test_insert_tablet(self, tablet):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param tablet: a tablet of data
        """
        status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
        print("testing! insert one tablet to device {} message: {}".format(tablet.get_device_id(), status.message))

    def test_insert_tablets(self, tablet_list):
        """
        this method NOT insert data into database and the server just return after accept the request, this method
        should be used to test other time cost in client
        :param tablet_list: List of tablets
        """
        status = self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list))
        print("testing! insert multiple tablets, message: {}".format(status.message))

    def gen_insert_tablet_req(self, tablet):
        """
        build the thrift request for one tablet from the tablet's own accessors
        :param tablet: a tablet of data
        :return: TSInsertTabletReq
        """
        data_type_values = [data_type.value for data_type in tablet.get_data_types()]
        return TSInsertTabletReq(self.__session_id, tablet.get_device_id(), tablet.get_measurements(),
                                 tablet.get_binary_values(), tablet.get_binary_timestamps(),
                                 data_type_values, tablet.get_row_number())

    def gen_insert_tablets_req(self, tablet_lst):
        """
        build the thrift request for several tablets; every per-tablet attribute is sent as a parallel list
        :param tablet_lst: List of tablets
        :return: TSInsertTabletsReq
        """
        device_id_lst = []
        measurements_lst = []
        values_lst = []
        timestamps_lst = []
        type_lst = []
        size_lst = []
        for tablet in tablet_lst:
            data_type_values = [data_type.value for data_type in tablet.get_data_types()]
            device_id_lst.append(tablet.get_device_id())
            measurements_lst.append(tablet.get_measurements())
            values_lst.append(tablet.get_binary_values())
            timestamps_lst.append(tablet.get_binary_timestamps())
            type_lst.append(data_type_values)
            size_lst.append(tablet.get_row_number())
        return TSInsertTabletsReq(self.__session_id, device_id_lst, measurements_lst,
                                  values_lst, timestamps_lst, type_lst, size_lst)

    def execute_query_statement(self, sql):
        """
        execute query sql statement and returns SessionDataSet
        :param sql: String, query sql statement
        :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py)
        """
        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id, self.__fetch_size)
        resp = self.__client.executeQueryStatement(request)
        return SessionDataSet(sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, resp.queryId,
                              self.__client, self.__session_id, resp.queryDataSet, resp.ignoreTimeStamp)

    def execute_non_query_statement(self, sql):
        """
        execute non-query sql statement
        :param sql: String, non-query sql statement
        """
        request = TSExecuteStatementReq(self.__session_id, sql, self.__statement_id)
        try:
            resp = self.__client.executeUpdateStatement(request)
            status = resp.status
            print("execute non-query statement {} message: {}".format(sql, status.message))
        except TTransport.TException as e:
            print("execution of non-query statement fails because: ", e)

    @staticmethod
    def value_to_bytes(data_types, values):
        """
        pack the values of one record into a single big-endian byte string: every value is preceded by its
        data type code as a 2-byte integer, TEXT values are UTF-8 encoded and additionally preceded by their
        byte length as a 4-byte integer
        :param data_types: List of data type codes (TSDataType values, not enum members)
        :param values: List, the values to pack, in the same order as data_types
        :return: bytes
        :raises ValueError: if a data type code is not supported
        """
        # struct format character of every fixed-width type, keyed by its type code
        fixed_width_formats = {
            TSDataType.BOOLEAN.value: "?",
            TSDataType.INT32.value: "i",
            TSDataType.INT64.value: "q",
            TSDataType.FLOAT.value: "f",
            TSDataType.DOUBLE.value: "d",
        }
        text_type = TSDataType.TEXT.value

        format_str_list = [">"]
        values_tobe_packed = []
        for data_type, value in zip(data_types, values):
            if data_type in fixed_width_formats:
                format_str_list.append("h" + fixed_width_formats[data_type])
                values_tobe_packed.extend((data_type, value))
            elif data_type == text_type:
                value_bytes = bytes(value, 'utf-8')
                # type code, byte length, then exactly that many bytes
                format_str_list.append("hi{}s".format(len(value_bytes)))
                values_tobe_packed.extend((data_type, len(value_bytes), value_bytes))
            else:
                raise ValueError("Unsupported data type:" + str(data_type))
        format_str = ''.join(format_str_list)
        return struct.pack(format_str, *values_tobe_packed)

    def get_time_zone(self):
        """
        return the session time zone; the locally cached value is used when there is one, otherwise the
        server is asked
        :raises Exception: if the server cannot be asked for its time zone
        """
        if self.__zone_id is not None:
            return self.__zone_id
        try:
            resp = self.__client.getTimeZone(self.__session_id)
        except TTransport.TException as e:
            print("Could not get time zone because: ", e)
            raise Exception("Could not get time zone") from e
        return resp.timeZone

    def set_time_zone(self, zone_id):
        """
        set the time zone of this session at the server and cache it locally
        :param zone_id: time zone id, forwarded as given in a TSSetTimeZoneReq
        :raises Exception: if the request to the server fails
        """
        request = TSSetTimeZoneReq(self.__session_id, zone_id)
        try:
            status = self.__client.setTimeZone(request)
            print("setting time zone_id as {}, message: {}".format(zone_id, status.message))
        except TTransport.TException as e:
            print("Could not set time zone because: ", e)
            raise Exception("Could not set time zone") from e
        self.__zone_id = zone_id


# ---------------------------------------------------------------------------
# diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
# new file mode 100644
# index 0000000..2aac6e9
# --- /dev/null
# +++ b/client-py/src/SessionExample.py
# @@ -0,0 +1,105 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Walk-through of the Session API: storage groups, time series, records, tablets and SQL statements.
# It talks to the server configured below while it runs.

import sys
sys.path.append("./utils")
from IoTDBConstants import *
from Tablet import Tablet
from Session import Session

# creating session connection.
ip = "127.0.0.1"
port_ = 6667  # integer port, the same form SessionUT.py passes to Session
username_ = 'root'
password_ = 'root'
session = Session(ip, port_, username_, password_)
session.open(False)

# Everything after open() runs inside try/finally so the session is closed even when a step fails.
try:
    # set and delete storage groups
    session.set_storage_group("root.sg_test_01")
    session.set_storage_group("root.sg_test_02")
    session.set_storage_group("root.sg_test_03")
    session.set_storage_group("root.sg_test_04")
    session.delete_storage_group("root.sg_test_02")
    session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])

    # setting time series.
    session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY)
    session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY)
    session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY)

    # setting multiple time series once.
    ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06",
                    "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"]
    data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
                      TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
    encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
    compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
    session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_)

    # delete time series
    session.delete_time_series(["root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08",
                                "root.sg_test_01.d_01.s_09"])

    # checking time series
    print("s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
    print("s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"))

    # insert one record into the database.
    measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
    values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
    data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
                   TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
    session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)

    # insert multiple records into database
    measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
                          ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
    values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
                    [True, 77, 88, 1.25, 8.125, "test_records02"]]
    data_type_list_ = [data_types_, data_types_]
    device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
    session.insert_records(device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_)

    # insert one tablet into the database.
    # NOTE (original author): Non-ASCII text will cause error since bytes can only hold 0-128 nums.
    # The packing happens in Tablet.py, which is not shown here - TODO confirm the exact limitation there.
    values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
               [True, 100, 11111, 1.25, 101.0, "test02"],
               [False, 100, 1, 188.1, 688.25, "test03"],
               [True, 0, 0, 0, 6.25, "test04"]]
    timestamps_ = [4, 5, 6, 7]
    tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_)
    session.insert_tablet(tablet_)

    # insert multiple tablets into database
    tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11])
    tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15])
    session.insert_tablets([tablet_01, tablet_02])

    # execute non-query sql statement
    session.execute_non_query_statement("insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")

    # execute sql query statement
    session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_01")
    try:
        session_data_set.set_fetch_size(1024)
        while session_data_set.has_next():
            print(session_data_set.next())
    finally:
        # release the query handle even when reading the result fails
        session_data_set.close_operation_handle()
finally:
    # close session connection.
    session.close()

print("All executions done!!")


# ---------------------------------------------------------------------------
# diff --git a/client-py/src/SessionUT.py b/client-py/src/SessionUT.py
# new file mode 100644
# index 0000000..fc79cfe
# --- /dev/null
# +++ b/client-py/src/SessionUT.py
# @@ -0,0 +1,107 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import unittest
import sys
import struct

sys.path.append("./utils")

from Session import Session
from Tablet import Tablet
from IoTDBConstants import *
from SessionDataSet import SessionDataSet

from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport


class MyTestCase(unittest.TestCase):
    """Tests for Session that run against the server at 127.0.0.1:6667 (see setUp)."""

    def setUp(self) -> None:
        # every test gets its own freshly opened session
        self.session = Session("127.0.0.1", 6667, "root", "root")
        self.session.open(False)

    def tearDown(self) -> None:
        self.session.close()

    def test_insert_by_str(self):
        """Insert TEXT records through the API and through SQL, then read them back."""
        self.session.set_storage_group("root.sg1")

        self.__createTimeSeries(self.session)
        self.insertByStr(self.session)

        # sql test
        self.insert_via_sql(self.session)
        self.query3()

    def test_insert_by_blank_str_infer_type(self):
        """Insert one string record and compare the result of "show timeseries" with the expected path."""
        device_id = "root.sg1.d1"
        measurements = ["s1"]
        values = ["1.0"]
        self.session.insert_str_record(device_id, 1, measurements, values)

        expected = "root.sg1.d1.s1 "

        self.assertFalse(self.session.check_time_series_exists("root.sg1.d1.s1 "))
        data_set = self.session.execute_query_statement("show timeseries")
        try:
            i = 0
            # has_next() is the spelling used by every other data set loop in this client
            while data_set.has_next():
                # NOTE(review): `expected` is a str, so expected[i] is a single character, and
                # get_fields() is iterated like a plain sequence in query3(), so `.get(0)` may not
                # exist - both look like leftovers of a port from another client; confirm against
                # RowRecord.py before relying on this assertion.
                self.assertEqual(expected[i], str(data_set.next().get_fields().get(0)))
                i += 1
        finally:
            # release the query handle even when an assertion fails
            data_set.close_operation_handle()

    @staticmethod
    def __createTimeSeries(session):
        """Create the six TEXT series root.sg1.{d1,d2}.{s1,s2,s3}."""
        session.create_time_series("root.sg1.d1.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d1.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d1.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s1", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s2", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)
        session.create_time_series("root.sg1.d2.s3", TSDataType.TEXT, TSEncoding.PLAIN, Compressor.SNAPPY)

    @staticmethod
    def insertByStr(session):
        """Insert the row ("1", "2", "3") into root.sg1.d1 at timestamps 0..99."""
        device_id = "root.sg1.d1"
        measurements = ["s1", "s2", "s3"]

        for time in range(100):
            values = ["1", "2", "3"]
            session.insert_str_record(device_id, time, measurements, values)

    @staticmethod
    def insert_via_sql(session):
        """Insert one more row, at timestamp 100, with a SQL statement."""
        session.execute_non_query_statement("insert into root.sg1.d1(timestamp, s1, s2, s3) values(100, 1, 2, 3)")

    def query3(self):
        """Read root.sg1.d1 back: every row must hold "1", "2", "3" and there must be 101 rows."""
        session_data_set = self.session.execute_query_statement("select * from root.sg1.d1")
        try:
            session_data_set.set_fetch_size(1024)
            count = 0
            while session_data_set.has_next():
                index = 1
                count += 1
                for f in session_data_set.next().get_fields():
                    self.assertEqual(str(index), f.get_string_value())
                    index += 1

            # 100 rows from insertByStr plus the one from insert_via_sql
            self.assertEqual(101, count)
        finally:
            # release the query handle even when an assertion fails
            session_data_set.close_operation_handle()


if __name__ == '__main__':
    unittest.main()


# ---------------------------------------------------------------------------
# diff --git a/client-py/src/utils/Field.py b/client-py/src/utils/Field.py
# new file mode 100644
# index 0000000..7b5fea5
# --- /dev/null
# +++ b/client-py/src/utils/Field.py
# @@ -0,0 +1,176 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.
class Field(object):
    """One typed cell of a result row.

    A field carries a ``TSDataType`` tag and one storage slot per primitive
    type.  A field whose data type is ``None`` stands for a null cell: its
    typed getters raise, ``get_string_value`` returns ``"None"`` and
    ``get_object_value`` returns ``None``.
    """

    def __init__(self, data_type):
        """
        :param data_type: TSDataType, or None for a null field
        """
        self.__data_type = data_type
        self.__bool_value = None
        self.__int_value = None
        self.__long_value = None
        self.__float_value = None
        self.__double_value = None
        self.__binary_value = None

    def _accessors(self, data_type):
        """Return the ``(getter, setter)`` bound methods that handle ``data_type``.

        Single home for the type dispatch that ``copy``, ``get_string_value``,
        ``get_object_value`` and ``get_field`` all need.

        :param data_type: TSDataType
        :raises Exception: if ``data_type`` is not one of the six supported types
        """
        if data_type == TSDataType.BOOLEAN:
            return self.get_bool_value, self.set_bool_value
        if data_type == TSDataType.INT32:
            return self.get_int_value, self.set_int_value
        if data_type == TSDataType.INT64:
            return self.get_long_value, self.set_long_value
        if data_type == TSDataType.FLOAT:
            return self.get_float_value, self.set_float_value
        if data_type == TSDataType.DOUBLE:
            return self.get_double_value, self.set_double_value
        if data_type == TSDataType.TEXT:
            return self.get_binary_value, self.set_binary_value
        raise Exception("unsupported data type {}".format(data_type))

    def _require_non_null(self):
        """Raise if this field is a null field (data type is None)."""
        if self.__data_type is None:
            raise Exception("Null Field Exception!")

    @staticmethod
    def copy(field):
        """Return a new Field with the same data type and typed value as ``field``.

        :param field: Field to duplicate; a null field yields another null field
        :raises Exception: if the field's data type is unsupported
        """
        output = Field(field.get_data_type())
        if output.get_data_type() is not None:
            getter, _ = field._accessors(output.get_data_type())
            _, setter = output._accessors(output.get_data_type())
            setter(getter())
        return output

    def get_data_type(self):
        return self.__data_type

    def is_null(self):
        return self.__data_type is None

    def set_bool_value(self, value):
        self.__bool_value = value

    def get_bool_value(self):
        self._require_non_null()
        return self.__bool_value

    def set_int_value(self, value):
        self.__int_value = value

    def get_int_value(self):
        self._require_non_null()
        return self.__int_value

    def set_long_value(self, value):
        self.__long_value = value

    def get_long_value(self):
        self._require_non_null()
        return self.__long_value

    def set_float_value(self, value):
        self.__float_value = value

    def get_float_value(self):
        self._require_non_null()
        return self.__float_value

    def set_double_value(self, value):
        self.__double_value = value

    def get_double_value(self):
        self._require_non_null()
        return self.__double_value

    def set_binary_value(self, value):
        self.__binary_value = value

    def get_binary_value(self):
        self._require_non_null()
        return self.__binary_value

    def get_string_value(self):
        """Return the value of this field rendered as text.

        :return: ``"None"`` for a null field; the UTF-8 decoded text for a TEXT
                 field holding bytes; ``str(value)`` otherwise
        :raises Exception: if the field's data type is unsupported
        """
        if self.__data_type is None:
            return "None"
        getter, _ = self._accessors(self.__data_type)
        value = getter()
        # TEXT values arrive as raw bytes from the server, but a field built by
        # hand may hold a str (or nothing yet); only decode real byte strings.
        if self.__data_type == TSDataType.TEXT and isinstance(value, (bytes, bytearray)):
            return value.decode('utf-8')
        return str(value)

    def __str__(self):
        return self.get_string_value()

    def get_object_value(self, data_type):
        """Return the raw value stored in the slot for ``data_type``.

        :param data_type: TSDataType selecting which slot to read
        :return: the stored value, or None when this field is a null field
        :raises Exception: if ``data_type`` is unsupported
        """
        if self.__data_type is None:
            return None
        getter, _ = self._accessors(data_type)
        return getter()

    @staticmethod
    def get_field(value, data_type):
        """Build a Field of ``data_type`` holding ``value``.

        :param value: field value corresponding to the data type
        :param data_type: TSDataType
        :return: the new Field, or None when ``value`` is None
        :raises Exception: if ``data_type`` is unsupported
        """
        if value is None:
            return None
        field = Field(data_type)
        _, setter = field._accessors(data_type)
        setter(value)
        return field
# Enumerations from client-py/src/utils/IoTDBConstants.py


@unique
class TSDataType(Enum):
    """Value types of a time series column."""
    BOOLEAN = 0
    INT32 = 1
    INT64 = 2
    FLOAT = 3
    DOUBLE = 4
    TEXT = 5


@unique
class TSEncoding(Enum):
    """Encoding identifiers passed when creating a time series."""
    PLAIN = 0
    PLAIN_DICTIONARY = 1
    RLE = 2
    DIFF = 3
    TS_2DIFF = 4
    BITMAP = 5
    GORILLA = 6
    REGULAR = 7


@unique
class Compressor(Enum):
    """Compressor identifiers passed when creating a time series."""
    UNCOMPRESSED = 0
    SNAPPY = 1
    GZIP = 2
    LZO = 3
    SDT = 4
    PAA = 5
    PLA = 6


# Result-set cursor from client-py/src/utils/IoTDBRpcDataSet.py


class IoTDBRpcDataSet(object):
    """Row cursor over the binary buffers of a query result.

    The query data set handed in (and every batch fetched later) exposes three
    attributes that are consumed destructively, one row at a time:
    ``time`` (8 bytes per row), ``bitmapList`` (one byte per 8 rows and column,
    most significant bit first, a set bit meaning "value present") and
    ``valueList`` (the packed non-null values of each column).
    """
    TIMESTAMP_STR = "Time"
    # VALUE_IS_NULL = "The value got by %s (column name) is NULL."
    START_INDEX = 2
    FLAG = 0x80

    # Byte width of every fixed-size type; TEXT is length-prefixed instead.
    _FIXED_WIDTHS = {
        TSDataType.BOOLEAN: 1,
        TSDataType.INT32: 4,
        TSDataType.INT64: 8,
        TSDataType.FLOAT: 4,
        TSDataType.DOUBLE: 8,
    }

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, ignore_timestamp, query_id,
                 client, session_id, query_data_set, fetch_size):
        """
        :param sql: statement text, sent again with every fetch request
        :param column_name_list: result column names (without the Time column)
        :param column_type_list: TSDataType member names, parallel to column_name_list
        :param column_name_index: dict mapping column name -> position in the value
                                  buffers, or None to number distinct names in order
        :param ignore_timestamp: when true no "Time" column is exposed
        :param query_id: id used for fetch and close requests
        :param client: RPC client used for fetch and close requests
        :param session_id: id of the owning session
        :param query_data_set: first batch of results, may be None
        :param fetch_size: number of rows requested per fetch
        """
        self.__session_id = session_id
        self.__ignore_timestamp = ignore_timestamp
        self.__sql = sql
        self.__query_id = query_id
        self.__client = client
        self.__fetch_size = fetch_size
        self.__column_size = len(column_name_list)

        self.__column_name_list = []
        self.__column_type_list = []
        self.__column_ordinal_dict = {}
        if not ignore_timestamp:
            self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
            self.__column_type_list.append(TSDataType.INT64)
            self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1

        if column_name_index is not None:
            # The server dictates where each distinct column lives in the buffers.
            self.__column_type_deduplicated_list = [None for _ in range(len(column_name_index))]
            for i, name in enumerate(column_name_list):
                data_type = TSDataType[column_type_list[i]]
                self.__column_name_list.append(name)
                self.__column_type_list.append(data_type)
                if name not in self.__column_ordinal_dict:
                    index = column_name_index[name]
                    self.__column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
                    self.__column_type_deduplicated_list[index] = data_type
        else:
            # No explicit mapping: distinct names are numbered in order of appearance.
            index = IoTDBRpcDataSet.START_INDEX
            self.__column_type_deduplicated_list = []
            for i, name in enumerate(column_name_list):
                data_type = TSDataType[column_type_list[i]]
                self.__column_name_list.append(name)
                self.__column_type_list.append(data_type)
                if name not in self.__column_ordinal_dict:
                    self.__column_ordinal_dict[name] = index
                    index += 1
                    self.__column_type_deduplicated_list.append(data_type)

        self.__time_bytes = bytes(0)
        self.__current_bitmap = [bytes(0) for _ in range(len(self.__column_type_deduplicated_list))]
        self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))]
        self.__query_data_set = query_data_set
        self.__is_closed = False
        self.__empty_resultSet = False
        self.__has_cached_record = False
        self.__rows_index = 0

    def close(self):
        """Release the server-side operation handle; later calls are no-ops.

        :raises Exception: if the close request fails at the transport level
                           (the original transport error is chained as the cause)
        """
        if self.__is_closed:
            return
        if self.__client is not None:
            try:
                status = self.__client.closeOperation(TSCloseOperationReq(self.__session_id, self.__query_id))
                print("close session {}, message: {}".format(self.__session_id, status.message))
            except TTransport.TException as e:
                print("close session {} failed because: ".format(self.__session_id), e)
                raise Exception("failed to close operation handle of query {}".format(self.__query_id)) from e

        self.__is_closed = True
        self.__client = None

    def next(self):
        """Advance to the next row, fetching another batch when needed.

        :return: True if a row was loaded, False when the result is exhausted
        """
        if self.has_cached_result():
            self.construct_one_row()
            return True
        if self.__empty_resultSet:
            return False
        if self.fetch_results():
            self.construct_one_row()
            return True
        return False

    def has_cached_result(self):
        return (self.__query_data_set is not None) and (len(self.__query_data_set.time) != 0)

    def construct_one_row(self):
        """Consume one row from the current batch into the time/value slots.

        Values of null cells are left untouched, so ``get_values()`` may hold a
        stale entry for them; consult ``is_null_by_index``/``is_null_by_name``.
        """
        data_set = self.__query_data_set
        # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read.
        self.__time_bytes = data_set.time[:8]
        data_set.time = data_set.time[8:]
        for i in range(len(data_set.bitmapList)):
            bitmap_buffer = data_set.bitmapList[i]

            # another 8 new rows, should move the bitmap buffer position to next byte
            if self.__rows_index % 8 == 0:
                self.__current_bitmap[i] = bitmap_buffer[0]
                data_set.bitmapList[i] = bitmap_buffer[1:]
            if self.is_null(i, self.__rows_index):
                continue

            value_buffer = data_set.valueList[i]
            data_type = self.__column_type_deduplicated_list[i]
            if data_type == TSDataType.TEXT:
                # TEXT is a 4-byte big-endian length followed by that many bytes.
                length = int.from_bytes(value_buffer[:4], byteorder="big", signed=False)
                start, end = 4, 4 + length
            elif data_type in self._FIXED_WIDTHS:
                start, end = 0, self._FIXED_WIDTHS[data_type]
            else:
                print("unsupported data type {}.".format(data_type))
                # could raise exception here
                continue
            self.__value[i] = value_buffer[start:end]
            data_set.valueList[i] = value_buffer[end:]
        self.__rows_index += 1
        self.__has_cached_record = True

    def fetch_results(self):
        """Request the next batch from the server.

        :return: True if the server returned another batch; False if the result
                 is exhausted or the request failed at the transport level
        """
        self.__rows_index = 0
        request = TSFetchResultsReq(self.__session_id, self.__sql, self.__fetch_size, self.__query_id, True)
        try:
            resp = self.__client.fetchResults(request)
            if not resp.hasResultSet:
                self.__empty_resultSet = True
            else:
                self.__query_data_set = resp.queryDataSet
            return resp.hasResultSet
        except TTransport.TException as e:
            print("Cannot fetch result from server, because of network connection: ", e)
            # Best effort: a transport failure ends the iteration instead of raising.
            return False

    def is_null(self, index, row_num):
        """Tell whether buffer column ``index`` is null in row ``row_num`` of the batch."""
        bitmap = self.__current_bitmap[index]
        shift = row_num % 8
        return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0

    def is_null_by_index(self, column_index):
        """Tell whether the 1-based result column is null in the current row."""
        index = self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        # NOTE(review): the code nevertheless answers True ("is null") for the
        # time column; kept as-is because callers may rely on it -- confirm intent.
        if index < 0:
            return True
        return self.is_null(index, self.__rows_index - 1)

    def is_null_by_name(self, column_name):
        """Tell whether the named result column is null in the current row."""
        index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
        # time column will never be None
        # NOTE(review): the code nevertheless answers True ("is null") for the
        # time column; kept as-is because callers may rely on it -- confirm intent.
        if index < 0:
            return True
        return self.is_null(index, self.__rows_index - 1)

    def find_column_name_by_index(self, column_index):
        """Map a 1-based column index to its column name.

        :raises Exception: if ``column_index`` is outside ``1..len(column names)``
        """
        if column_index <= 0:
            raise Exception("Column index should start from 1")
        column_count = len(self.__column_name_list)
        if column_index > column_count:
            # Report the bound that was actually checked (it includes the Time column).
            raise Exception("column index {} out of range {}".format(column_index, column_count))
        return self.__column_name_list[column_index - 1]

    def get_fetch_size(self):
        return self.__fetch_size

    def set_fetch_size(self, fetch_size):
        self.__fetch_size = fetch_size

    def get_column_names(self):
        return self.__column_name_list

    def get_column_types(self):
        return self.__column_type_list

    def get_column_size(self):
        return self.__column_size

    def get_ignore_timestamp(self):
        return self.__ignore_timestamp

    def get_column_ordinal_dict(self):
        return self.__column_ordinal_dict

    def get_column_type_deduplicated_list(self):
        return self.__column_type_deduplicated_list

    def get_values(self):
        return self.__value

    def get_time_bytes(self):
        return self.__time_bytes

    def get_has_cached_record(self):
        return self.__has_cached_record
class RowRecord(object):
    """A timestamp together with the ordered fields of one result row."""

    def __init__(self, timestamp, field_list=None):
        """
        :param timestamp: timestamp of the row
        :param field_list: list of Field objects; a fresh empty list is used
                           when omitted so the record can be filled with add_field
        """
        self.__timestamp = timestamp
        self.__field_list = [] if field_list is None else field_list

    def add_field(self, value, data_type=None):
        """Append a field to the row.

        Python has no overloading, so the two historical forms are served by
        one method:

        * ``add_field(field)`` appends an already built field unchanged;
        * ``add_field(value, data_type)`` wraps ``value`` via ``Field.get_field``.

        :param value: a ready-made Field, or a raw value when data_type is given
        :param data_type: TSDataType of ``value``, or None if ``value`` is a Field
        """
        if data_type is None:
            self.__field_list.append(value)
        else:
            self.__field_list.append(Field.get_field(value, data_type))

    def __str__(self):
        # timestamp followed by every field, each preceded by two tabs
        parts = [str(self.__timestamp)]
        parts.extend("\t\t" + str(field) for field in self.__field_list)
        return "".join(parts)

    def get_timestamp(self):
        return self.__timestamp

    def set_timestamp(self, timestamp):
        self.__timestamp = timestamp

    def get_fields(self):
        return self.__field_list

    def set_fields(self, field_list):
        self.__field_list = field_list

    def set_field(self, index, field):
        self.__field_list[index] = field
class SessionDataSet(object):
    """Iterator-style view of a query result that yields RowRecord objects.

    Typical use::

        while data_set.has_next():
            row = data_set.next()
        data_set.close_operation_handle()
    """

    def __init__(self, sql, column_name_list, column_type_list, column_name_index, query_id, client, session_id,
                 query_data_set, ignore_timestamp, fetch_size=1024):
        """
        All arguments except ``fetch_size`` are forwarded unchanged to
        IoTDBRpcDataSet.

        :param fetch_size: initial number of rows requested per fetch
                           (defaults to 1024, the previously hard-coded value)
        """
        self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list, column_type_list, column_name_index,
                                                  ignore_timestamp, query_id, client, session_id, query_data_set,
                                                  fetch_size)

    def get_fetch_size(self):
        return self.iotdb_rpc_data_set.get_fetch_size()

    def set_fetch_size(self, fetch_size):
        self.iotdb_rpc_data_set.set_fetch_size(fetch_size)

    def get_column_names(self):
        return self.iotdb_rpc_data_set.get_column_names()

    def get_column_types(self):
        return self.iotdb_rpc_data_set.get_column_types()

    def has_next(self):
        """Advance the underlying cursor by one row.

        Note that this moves the cursor: calling it twice without ``next()`` in
        between skips a row.

        :return: True if a row was loaded, False when the result is exhausted
        """
        return self.iotdb_rpc_data_set.next()

    def next(self):
        """Return the current row, loading one first if none is pending.

        :return: RowRecord, or None when the result is exhausted
        """
        if not self.iotdb_rpc_data_set.get_has_cached_record():
            if not self.has_next():
                return None
        # The pending-row flag is a name-mangled private attribute of
        # IoTDBRpcDataSet.  Assigning plain ``has_cached_record`` would merely
        # create an unrelated attribute and leave the flag set forever, making
        # repeated next() calls return the same row.
        self.iotdb_rpc_data_set._IoTDBRpcDataSet__has_cached_record = False
        return self.construct_row_record_from_value_array()

    @staticmethod
    def _decode_field(data_type, value_bytes):
        """Build a Field of ``data_type`` from its big-endian wire bytes."""
        field = Field(data_type)
        if data_type == TSDataType.BOOLEAN:
            field.set_bool_value(struct.unpack(">?", value_bytes)[0])
        elif data_type == TSDataType.INT32:
            field.set_int_value(struct.unpack(">i", value_bytes)[0])
        elif data_type == TSDataType.INT64:
            field.set_long_value(struct.unpack(">q", value_bytes)[0])
        elif data_type == TSDataType.FLOAT:
            field.set_float_value(struct.unpack(">f", value_bytes)[0])
        elif data_type == TSDataType.DOUBLE:
            field.set_double_value(struct.unpack(">d", value_bytes)[0])
        elif data_type == TSDataType.TEXT:
            # TEXT stays as raw bytes; Field decodes it on demand.
            field.set_binary_value(value_bytes)
        else:
            print("unsupported data type {}.".format(data_type))
            # could raise exception here
        return field

    def construct_row_record_from_value_array(self):
        """Decode the row currently held by the cursor into a RowRecord.

        Null cells become ``Field(None)``.
        """
        data_set = self.iotdb_rpc_data_set
        column_names = data_set.get_column_names()
        ordinals = data_set.get_column_ordinal_dict()
        # When the Time column is present it occupies position 0 of the name
        # list and ordinal 1, so value columns are shifted by one.
        shift = 0 if data_set.get_ignore_timestamp() else 1

        out_fields = []
        for i in range(data_set.get_column_size()):
            column_name = column_names[i + shift]
            data_set_column_index = i + shift + 1
            location = ordinals[column_name] - IoTDBRpcDataSet.START_INDEX

            if not data_set.is_null_by_index(data_set_column_index):
                value_bytes = data_set.get_values()[location]
                data_type = data_set.get_column_type_deduplicated_list()[location]
                field = self._decode_field(data_type, value_bytes)
            else:
                field = Field(None)
            out_fields.append(field)

        return RowRecord(struct.unpack(">q", data_set.get_time_bytes())[0], out_fields)

    def close_operation_handle(self):
        self.iotdb_rpc_data_set.close()
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from IoTDBConstants import * +import struct + + +class Tablet(object): + + def __init__(self, device_id, measurements, data_types, values, timestamps): + """ + creating a tablet for insertion + for example, considering device: root.sg1.d1 + timestamps, m1, m2, m3 + 1, 125.3, True, text1 + 2, 111.6, False, text2 + 3, 688.6, True, text3 + Notice: The tablet should not have empty cell + The tablet will be sorted at the initialization by timestamps + + :param device_id: String, IoTDB time series path to device layer (without sensor). + :param measurements: List, sensors. + :param data_types: TSDataType List, specify value types for sensors. + :param values: 2-D List, the values of each row should be the outer list element. + :param timestamps: List. + """ + if len(timestamps) != len(values): + print("Input error! len(timestamps) does not equal to len(values)!") + # could raise an error here. 
+ + if not Tablet.check_sorted(timestamps): + sorted_zipped = sorted(zip(timestamps, values)) + result = zip(*sorted_zipped) + self.__timestamps, self.__values = [list(x) for x in result] + else: + self.__values = values + self.__timestamps = timestamps + + self.__device_id = device_id + self.__measurements = measurements + self.__data_types = data_types + self.__row_number = len(timestamps) + self.__column_number = len(measurements) + + @staticmethod + def check_sorted(timestamps): + for i in range(1, len(timestamps)): + if timestamps[i] < timestamps[i - 1]: + return False + return True + + def get_measurements(self): + return self.__measurements + + def get_data_types(self): + return self.__data_types + + def get_row_number(self): + return self.__row_number + + def get_device_id(self): + return self.__device_id + + def get_binary_timestamps(self): + format_str_list = [">"] + values_tobe_packed = [] + for timestamp in self.__timestamps: + format_str_list.append("q") + values_tobe_packed.append(timestamp) + + format_str = ''.join(format_str_list) + return struct.pack(format_str, *values_tobe_packed) + + def get_binary_values(self): + format_str_list = [">"] + values_tobe_packed = [] + for i in range(self.__column_number): + if self.__data_types[i] == TSDataType.BOOLEAN: + format_str_list.append(str(self.__row_number)) + format_str_list.append("?") + for j in range(self.__row_number): + values_tobe_packed.append(self.__values[j][i]) + elif self.__data_types[i] == TSDataType.INT32: + format_str_list.append(str(self.__row_number)) + format_str_list.append("i") + for j in range(self.__row_number): + values_tobe_packed.append(self.__values[j][i]) + elif self.__data_types[i] == TSDataType.INT64: + format_str_list.append(str(self.__row_number)) + format_str_list.append("q") + for j in range(self.__row_number): + values_tobe_packed.append(self.__values[j][i]) + elif self.__data_types[i] == TSDataType.FLOAT: + format_str_list.append(str(self.__row_number)) + 
format_str_list.append("f") + for j in range(self.__row_number): + values_tobe_packed.append(self.__values[j][i]) + elif self.__data_types[i] == TSDataType.DOUBLE: + format_str_list.append(str(self.__row_number)) + format_str_list.append("d") + for j in range(self.__row_number): + values_tobe_packed.append(self.__values[j][i]) + elif self.__data_types[i] == TSDataType.TEXT: + for j in range(self.__row_number): + value_bytes = bytes(self.__values[j][i], 'utf-8') + format_str_list.append("i") + format_str_list.append(str(len(value_bytes))) + format_str_list.append("s") + values_tobe_packed.append(len(value_bytes)) + values_tobe_packed.append(value_bytes) + else: + print("Unsupported data type:" + str(self.__data_types[i])) + # could raise an error here. + return + + format_str = ''.join(format_str_list) + return struct.pack(format_str, *values_tobe_packed) + diff --git a/client-py/src/utils/__init__.py b/client-py/src/utils/__init__.py new file mode 100644 index 0000000..a4797b6 --- /dev/null +++ b/client-py/src/utils/__init__.py @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +
