This is an automated email from the ASF dual-hosted git repository. jfeinauer pushed a commit to branch feature/port-pr-2986-to-0-11 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe2e8cde9ced624c54264d57ea86deeb1b9b4e14 Author: Julian Feinauer <[email protected]> AuthorDate: Sun Apr 4 12:03:51 2021 +0200 [CLIENT-PY] Fixed readme. [CLIENT-PY] Improved Release Documentation. [CLIENT-PY] Fixed print statements with logging output. Added Exceptions instead of empty returns. [CLIENT-PY] Applied flake8 and black for reformatting. [CLIENT-PY] Added release script to do a pypi release. [CLIENT-PY] Improved Release Documentation. [CLIENT-PY] Fixed Rat. [CLIENT-PY] Fixed migration. --- client-py/README.md | 26 +++- client-py/iotdb/Session.py | 185 +++++++------------------ client-py/iotdb/utils/IoTDBRpcDataSet.py | 22 +-- client-py/iotdb/utils/RowRecord.py | 1 - client-py/iotdb/utils/SessionDataSet.py | 7 +- client-py/iotdb/utils/Tablet.py | 9 +- client-py/readme.md | 0 client-py/{requirements_dev.txt => release.sh} | 25 +++- client-py/requirements_dev.txt | 5 +- client-py/setup.py | 2 +- pom.xml | 4 + 11 files changed, 121 insertions(+), 165 deletions(-) diff --git a/client-py/README.md b/client-py/README.md index c2e03cb..77d0a01 100644 --- a/client-py/README.md +++ b/client-py/README.md @@ -32,10 +32,10 @@ [](https://iotdb.apache.org/) -Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for -data management and analysis, deployable on the edge and the cloud. Due to its light-weight -architecture, high performance and rich feature set together with its deep integration with -Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, +Apache IoTDB (Database for Internet of Things) is an IoT native database with high performance for +data management and analysis, deployable on the edge and the cloud. Due to its light-weight +architecture, high performance and rich feature set together with its deep integration with +Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of massive data storage, high-speed data ingestion and complex data analysis in the IoT industrial fields. @@ -190,4 +190,20 @@ Both can be run by `black .` or `flake8 .` respectively. To do a release just ensure that you have the right set of generated thrift files. Then run linting and auto-formatting. Then, ensure that all tests work (via `pytest .`). -Then you are good to go to do a release! \ No newline at end of file +Then you are good to go to do a release! + +### Preparing your environment + +First, install all necessary dev dependencies via `pip install -r requirements_dev.txt`. + +### Doing the Release + +There is a convenient script `release.sh` to do all steps for a release. +Namely, these are + +* Remove all transient directories from last release (if exists) +* (Re-)generate all generated sources via mvn +* Run Linting (flake8) +* Run Tests via pytest +* Build +* Release to pypi diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py index e7a3618..7b28e91 100644 --- a/client-py/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. # - +import logging import struct import time @@ -53,6 +53,8 @@ from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZone # from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq from .utils.IoTDBConstants import TSDataType +logger = logging.getLogger("IoTDB") + class Session(object): SUCCESS_CODE = 200 @@ -94,7 +96,7 @@ class Session(object): try: self.__transport.open() except TTransport.TTransportException as e: - print("TTransportException: ", e) + logger.exception("TTransportException!", exc_info=e) if enable_rpc_compression: self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport)) @@ -112,7 +114,7 @@ class Session(object): open_resp = self.__client.openSession(open_req) if self.protocol_version != open_resp.serverProtocolVersion: - print( + logger.exception( "Protocol differ, Client version is {}, but Server version is {}".format( self.protocol_version, open_resp.serverProtocolVersion ) @@ -126,7 +128,7 @@ class Session(object): except Exception as e: self.__transport.close() - print("session closed because: ", e) + logger.exception("session closed because: ", exc_info=e) if self.__zone_id is not None: self.set_time_zone(self.__zone_id) @@ -135,9 +137,6 @@ class Session(object): self.__is_close = False - def is_open(self): - return not self.__is_close - def close(self): if self.__is_close: return @@ -145,9 +144,9 @@ class Session(object): try: self.__client.closeSession(req) except TTransport.TException as e: - print( + logger.exception( "Error occurs when closing session at server. Maybe server is down. Error message: ", - e, + exc_info=e, ) finally: self.__is_close = True @@ -160,7 +159,9 @@ class Session(object): :param group_name: String, storage group name (starts from root) """ status = self.__client.setStorageGroup(self.__session_id, group_name) - print("setting storage group {} message: {}".format(group_name, status.message)) + logger.debug( + "setting storage group {} message: {}".format(group_name, status.message) + ) return Session.verify_success(status) @@ -178,7 +179,7 @@ class Session(object): :param storage_group_lst: List, paths of the target storage groups. """ status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) - print( + logger.debug( "delete storage group(s) {} message: {}".format( storage_group_lst, status.message ) @@ -201,7 +202,9 @@ class Session(object): self.__session_id, ts_path, data_type, encoding, compressor ) status = self.__client.createTimeseries(request) - print("creating time series {} message: {}".format(ts_path, status.message)) + logger.debug( + "creating time series {} message: {}".format(ts_path, status.message) + ) return Session.verify_success(status) @@ -223,7 +226,7 @@ class Session(object): self.__session_id, ts_path_lst, data_type_lst, encoding_lst, compressor_lst ) status = self.__client.createMultiTimeseries(request) - print( + logger.debug( "creating multiple time series {} message: {}".format( ts_path_lst, status.message ) @@ -237,7 +240,7 @@ class Session(object): :param paths_list: List of time series path, which should be complete (starts from root) """ status = self.__client.deleteTimeseries(self.__session_id, paths_list) - print( + logger.debug( "deleting multiple time series {} message: {}".format( paths_list, status.message ) @@ -265,9 +268,11 @@ class Session(object): request = TSDeleteDataReq(self.__session_id, paths_list, timestamp) try: status = self.__client.deleteData(request) - print("delete data from {}, message: {}".format(paths_list, status.message)) + logger.debug( + "delete data from {}, message: {}".format(paths_list, status.message) + ) except TTransport.TException as e: - print("data deletion fails because: ", e) + logger.exception("data deletion fails because: ", e) def insert_str_record(self, device_id, timestamp, measurements, string_values): """ special case for inserting one row of String (TEXT) value """ @@ -280,7 +285,7 @@ class Session(object): device_id, timestamp, measurements, data_types, string_values ) status = self.__client.insertStringRecord(request) - print( + logger.debug( "insert one record to device {} message: {}".format( device_id, status.message ) @@ -305,7 +310,7 @@ class Session(object): device_id, timestamp, measurements, data_types, values ) status = self.__client.insertRecord(request) - print( + logger.debug( "insert one record to device {} message: {}".format( device_id, status.message ) @@ -333,7 +338,7 @@ class Session(object): device_ids, times, measurements_lst, type_values_lst, values_lst ) status = self.__client.insertRecords(request) - print( + logger.debug( "insert multiple records to devices {} message: {}".format( device_ids, status.message ) @@ -358,7 +363,7 @@ class Session(object): device_id, timestamp, measurements, data_types, values ) status = self.__client.testInsertRecord(request) - print( + logger.debug( "testing! insert one record to device {} message: {}".format( device_id, status.message ) @@ -386,7 +391,9 @@ class Session(object): device_ids, times, measurements_lst, type_values_lst, values_lst ) status = self.__client.testInsertRecords(request) - print("testing! insert multiple records, message: {}".format(status.message)) + logger.debug( + "testing! insert multiple records, message: {}".format(status.message) + ) return Session.verify_success(status) @@ -394,9 +401,9 @@ class Session(object): self, device_id, timestamp, measurements, data_types, values ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print("length of data types does not equal to length of values!") - # could raise an error here. - return + raise RuntimeError( + "length of data types does not equal to length of values!" + ) values_in_bytes = Session.value_to_bytes(data_types, values) return TSInsertRecordReq( self.__session_id, device_id, measurements, values_in_bytes, timestamp @@ -406,9 +413,9 @@ class Session(object): self, device_id, timestamp, measurements, data_types, values ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print("length of data types does not equal to length of values!") - # could raise an error here. - return + raise RuntimeError( + "length of data types does not equal to length of values!" + ) return TSInsertStringRecordReq( self.__session_id, device_id, measurements, values, timestamp ) @@ -422,22 +429,18 @@ class Session(object): or (len(device_ids) != len(times)) or (len(times) != len(values_lst)) ): - print( + raise RuntimeError( "deviceIds, times, measurementsList and valuesList's size should be equal" ) - # could raise an error here. - return value_lst = [] for values, data_types, measurements in zip( values_lst, types_lst, measurements_lst ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print( + raise RuntimeError( "deviceIds, times, measurementsList and valuesList's size should be equal" ) - # could raise an error here. - return values_in_bytes = Session.value_to_bytes(data_types, values) value_lst.append(values_in_bytes) @@ -458,7 +461,7 @@ class Session(object): :param tablet: a tablet specified above """ status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet)) - print( + logger.debug( "insert one tablet to device {} message: {}".format( tablet.get_device_id(), status.message ) @@ -472,93 +475,10 @@ class Session(object): :param tablet_lst: List of tablets """ status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst)) - print("insert multiple tablets, message: {}".format(status.message)) - - return Session.verify_success(status) - - def insert_records_of_one_device( - self, device_id, times_list, measurements_list, types_list, values_list - ): - # sort by timestamp - sorted_zipped = sorted( - zip(times_list, measurements_list, types_list, values_list) - ) - result = zip(*sorted_zipped) - times_list, measurements_list, types_list, values_list = [ - list(x) for x in result - ] - - return self.insert_records_of_one_device_sorted( - device_id, times_list, measurements_list, types_list, values_list - ) - - def insert_records_of_one_device_sorted( - self, device_id, times_list, measurements_list, types_list, values_list - ): - """ - Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc - executeBatch, we pack some insert request in batch and send them to server. If you want improve - your performance, please see insertTablet method - - :param device_id: device id - :param times_list: timestamps list - :param measurements_list: measurements list - :param types_list: types list - :param values_list: values list - :param have_sorted: have these list been sorted by timestamp - """ - # check parameter - size = len(times_list) - if ( - size != len(measurements_list) - or size != len(types_list) - or size != len(values_list) - ): - print( - "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" - ) - return - - # check sorted - if not Session.check_sorted(times_list): - print("insert records of one device error: timestamp not sorted") - return - - request = self.gen_insert_records_of_one_device_request( - device_id, times_list, measurements_list, values_list, types_list - ) - - # send request - status = self.__client.insertRecordsOfOneDevice(request) - print("insert records of one device, message: {}".format(status.message)) + logger.debug("insert multiple tablets, message: {}".format(status.message)) return Session.verify_success(status) - def gen_insert_records_of_one_device_request( - self, device_id, times_list, measurements_list, values_list, types_list - ): - binary_value_list = [] - for values, data_types, measurements in zip( - values_list, types_list, measurements_list - ): - data_types = [data_type.value for data_type in data_types] - if (len(values) != len(data_types)) or (len(values) != len(measurements)): - print( - "insert records of one device error: deviceIds, times, measurementsList and valuesList's size should be equal" - ) - # could raise an error here. - return - values_in_bytes = Session.value_to_bytes(data_types, values) - binary_value_list.append(values_in_bytes) - - return TSInsertRecordsOfOneDeviceReq( - self.__session_id, - device_id, - measurements_list, - binary_value_list, - times_list, - ) - def test_insert_tablet(self, tablet): """ this method NOT insert data into database and the server just return after accept the request, this method @@ -566,7 +486,7 @@ class Session(object): :param tablet: a tablet of data """ status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet)) - print( + logger.debug( "testing! insert one tablet to device {} message: {}".format( tablet.get_device_id(), status.message ) @@ -583,7 +503,9 @@ class Session(object): status = self.__client.testInsertTablets( self.gen_insert_tablets_req(tablet_list) ) - print("testing! insert multiple tablets, message: {}".format(status.message)) + logger.debug( + "testing! insert multiple tablets, message: {}".format(status.message) + ) return Session.verify_success(status) @@ -626,14 +548,14 @@ class Session(object): size_lst, ) - def execute_query_statement(self, sql, timeout=0): + def execute_query_statement(self, sql): """ execute query sql statement and returns SessionDataSet :param sql: String, query sql statement :return: SessionDataSet, contains query results and relevant info (see SessionDataSet.py) """ request = TSExecuteStatementReq( - self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout + self.__session_id, sql, self.__statement_id, self.__fetch_size ) resp = self.__client.executeQueryStatement(request) return SessionDataSet( @@ -657,13 +579,12 @@ class Session(object): try: resp = self.__client.executeUpdateStatement(request) status = resp.status - print( + logger.debug( "execute non-query statement {} message: {}".format(sql, status.message) ) return Session.verify_success(status) except TTransport.TException as e: - print("execution of non-query statement fails because: ", e) - return -1 + raise RuntimeError("execution of non-query statement fails because: ", e) @staticmethod def value_to_bytes(data_types, values): @@ -705,9 +626,7 @@ class Session(object): values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) else: - print("Unsupported data type:" + str(data_type)) - # could raise an error here. - return + raise RuntimeError("Unsupported data type:" + str(data_type)) format_str = "".join(format_str_list) return struct.pack(format_str, *values_tobe_packed) @@ -717,22 +636,20 @@ class Session(object): try: resp = self.__client.getTimeZone(self.__session_id) except TTransport.TException as e: - print("Could not get time zone because: ", e) - raise Exception + raise RuntimeError("Could not get time zone because: ", e) return resp.timeZone def set_time_zone(self, zone_id): request = TSSetTimeZoneReq(self.__session_id, zone_id) try: status = self.__client.setTimeZone(request) - print( + logger.debug( "setting time zone_id as {}, message: {}".format( zone_id, status.message ) ) except TTransport.TException as e: - print("Could not set time zone because: ", e) - raise Exception + raise RuntimeError("Could not set time zone because: ", e) self.__zone_id = zone_id @staticmethod @@ -751,5 +668,5 @@ class Session(object): if status.code == Session.SUCCESS_CODE: return 0 - print("error status is", status) + logger.debug("error status is", status) return -1 diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py index 6920245..500d438 100644 --- a/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -17,10 +17,14 @@ # # for package +import logging + from thrift.transport import TTransport from iotdb.thrift.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq from iotdb.utils.IoTDBConstants import TSDataType +logger = logging.getLogger("IoTDB") + class IoTDBRpcDataSet(object): TIMESTAMP_STR = "Time" @@ -48,7 +52,6 @@ class IoTDBRpcDataSet(object): self.__client = client self.__fetch_size = fetch_size self.__column_size = len(column_name_list) - self.__default_time_out = 1000 self.__column_name_list = [] self.__column_type_list = [] @@ -107,14 +110,15 @@ class IoTDBRpcDataSet(object): status = self.__client.closeOperation( TSCloseOperationReq(self.__session_id, self.__query_id) ) - print( + logger.debug( "close session {}, message: {}".format( self.__session_id, status.message ) ) except TTransport.TException as e: - print("close session {} failed because: ".format(self.__session_id), e) - raise Exception + raise RuntimeError( + "close session {} failed because: ".format(self.__session_id), e + ) self.__is_closed = True self.__client = None @@ -173,8 +177,7 @@ class IoTDBRpcDataSet(object): self.__value[i] = value_buffer[4 : 4 + length] self.__query_data_set.valueList[i] = value_buffer[4 + length :] else: - print("unsupported data type {}.".format(data_type)) - # could raise exception here + raise RuntimeError("unsupported data type {}.".format(data_type)) self.__rows_index += 1 self.__has_cached_record = True @@ -185,8 +188,7 @@ class IoTDBRpcDataSet(object): self.__sql, self.__fetch_size, self.__query_id, - True, - self.__default_time_out, + True ) try: resp = self.__client.fetchResults(request) @@ -196,7 +198,9 @@ class IoTDBRpcDataSet(object): self.__query_data_set = resp.queryDataSet return resp.hasResultSet except TTransport.TException as e: - print("Cannot fetch result from server, because of network connection: ", e) + raise RuntimeError( + "Cannot fetch result from server, because of network connection: ", e + ) def is_null(self, index, row_num): bitmap = self.__current_bitmap[index] diff --git a/client-py/iotdb/utils/RowRecord.py b/client-py/iotdb/utils/RowRecord.py index aa51c71..16a88f1 100644 --- a/client-py/iotdb/utils/RowRecord.py +++ b/client-py/iotdb/utils/RowRecord.py @@ -17,7 +17,6 @@ # # for package -from .IoTDBConstants import TSDataType from .Field import Field # for debug diff --git a/client-py/iotdb/utils/SessionDataSet.py b/client-py/iotdb/utils/SessionDataSet.py index 8437010..f0f7266 100644 --- a/client-py/iotdb/utils/SessionDataSet.py +++ b/client-py/iotdb/utils/SessionDataSet.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. # - +import logging import struct from iotdb.utils.Field import Field @@ -27,6 +27,8 @@ from iotdb.utils.RowRecord import RowRecord import pandas as pd +logger = logging.getLogger("IoTDB") + class SessionDataSet(object): def __init__( @@ -114,8 +116,7 @@ class SessionDataSet(object): elif data_type == TSDataType.TEXT: field.set_binary_value(value_bytes) else: - print("unsupported data type {}.".format(data_type)) - # could raise exception here + raise RuntimeError("unsupported data type {}.".format(data_type)) else: field = Field(None) out_fields.append(field) diff --git a/client-py/iotdb/utils/Tablet.py b/client-py/iotdb/utils/Tablet.py index 444f4e8..667adcb 100644 --- a/client-py/iotdb/utils/Tablet.py +++ b/client-py/iotdb/utils/Tablet.py @@ -40,8 +40,9 @@ class Tablet(object): :param timestamps: List. """ if len(timestamps) != len(values): - print("Input error! len(timestamps) does not equal to len(values)!") - # could raise an error here. + raise RuntimeError( + "Input error! len(timestamps) does not equal to len(values)!" + ) if not Tablet.check_sorted(timestamps): sorted_zipped = sorted(zip(timestamps, values)) @@ -124,9 +125,7 @@ class Tablet(object): values_tobe_packed.append(len(value_bytes)) values_tobe_packed.append(value_bytes) else: - print("Unsupported data type:" + str(self.__data_types[i])) - # could raise an error here. - return + raise RuntimeError("Unsupported data type:" + str(self.__data_types[i])) format_str = "".join(format_str_list) return struct.pack(format_str, *values_tobe_packed) diff --git a/client-py/readme.md b/client-py/readme.md deleted file mode 100644 index e69de29..0000000 diff --git a/client-py/requirements_dev.txt b/client-py/release.sh old mode 100644 new mode 100755 similarity index 69% copy from client-py/requirements_dev.txt copy to client-py/release.sh index 23e35d4..94a56a6 --- a/client-py/requirements_dev.txt +++ b/client-py/release.sh @@ -1,3 +1,6 @@ +#!/usr/bin/env bash +# +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,9 +19,19 @@ # under the License. # --r requirements.txt -# Pytest to run tests -pytest==6.2.2 -thrift==0.13.0 -flake8==3.9.0 -black==20.8b1 \ No newline at end of file +rm -Rf build +rm -Rf dist +rm -Rf iotdb_session.egg_info + +# (Re-)build generated code +(cd ..; mvn clean generate-sources -pl client-py -am) + +# Run Linting +flake8 + +# Run unit tests +pytest . + +# See https://packaging.python.org/tutorials/packaging-projects/ +python setup.py sdist bdist_wheel +twine upload --repository pypi dist/* \ No newline at end of file diff --git a/client-py/requirements_dev.txt b/client-py/requirements_dev.txt index 23e35d4..b7ad33c 100644 --- a/client-py/requirements_dev.txt +++ b/client-py/requirements_dev.txt @@ -21,4 +21,7 @@ pytest==6.2.2 thrift==0.13.0 flake8==3.9.0 -black==20.8b1 \ No newline at end of file +black==20.8b1 +# For releases +twine==3.4.1 +wheel==0.36.2 \ No newline at end of file diff --git a/client-py/setup.py b/client-py/setup.py index d752254..44e3ec6 100644 --- a/client-py/setup.py +++ b/client-py/setup.py @@ -30,7 +30,7 @@ except FileNotFoundError: print(long_description) setuptools.setup( - name="apache-iotdb", # Replace with your own username + name="apache-iotdb", # Replace with your own username version="0.11.3", author=" Apache Software Foundation", author_email="[email protected]", diff --git a/pom.xml b/pom.xml index 616e99c..51a8fd0 100644 --- a/pom.xml +++ b/pom.xml @@ -572,6 +572,10 @@ <exclude>LICENSE-binary</exclude> <!-- json does not support comments--> <exclude>**/*.json</exclude> + <!-- python --> + <exclude>.pytest_cache/**</exclude> + <exclude>venv/**</exclude> + <exclude>apache_iotdb.egg-info/**</exclude> </excludes> </configuration> </plugin>
