This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new c818006 [CLIENT-PY] [To 0.11]Fixed print statements with logging
output. Added Exceptions instead of empty returns (#2969)
c818006 is described below
commit c8180064e26f919dbfe1003c1eedca52283eb7f4
Author: Julian <[email protected]>
AuthorDate: Mon Apr 5 03:05:30 2021 +0200
[CLIENT-PY] [To 0.11]Fixed print statements with logging output. Added
Exceptions instead of empty returns (#2969)
* [IOTDB-1273]Feature/restrucutre python module as well as supporting
pandas dataframe (#2922)
* [CLIENT-PY] Fixed readme.
[CLIENT-PY] Improved Release Documentation.
[CLIENT-PY] Fixed print statements with logging output. Added Exceptions
instead of empty returns.
[CLIENT-PY] Applied flake8 and black for reformatting.
[CLIENT-PY] Added release script to do a pypi release.
[CLIENT-PY] Improved Release Documentation.
---
.gitignore | 3 +
client-py/{src/iotdb/__init__.py => .flake8} | 17 +-
client-py/.gitignore | 5 +
client-py/README.md | 209 +++++++++++
client-py/SessionExample.py | 177 +++++++++
client-py/SessionTest.py | 243 ++++++++++++
client-py/iotdb/IoTDBContainer.py | 51 +++
client-py/{src => }/iotdb/Session.py | 414 +++++++++++++++------
client-py/{src => }/iotdb/__init__.py | 1 -
client-py/{src => }/iotdb/utils/Field.py | 10 +-
client-py/{src => }/iotdb/utils/IoTDBConstants.py | 0
client-py/{src => }/iotdb/utils/IoTDBRpcDataSet.py | 103 +++--
client-py/{src => }/iotdb/utils/RowRecord.py | 6 +-
client-py/{src => }/iotdb/utils/SessionDataSet.py | 112 +++++-
client-py/{src => }/iotdb/utils/Tablet.py | 25 +-
client-py/{src => }/iotdb/utils/__init__.py | 1 -
client-py/pom.xml | 77 ++--
client-py/pypi/README.md | 73 ----
.../utils/IoTDBConstants.py => pyproject.toml} | 61 ++-
client-py/readme.md | 71 ----
client-py/{src/iotdb/__init__.py => release.sh} | 19 +
.../{src/iotdb/__init__.py => requirements.txt} | 4 +
.../iotdb/__init__.py => requirements_dev.txt} | 9 +
client-py/{pypi => }/setup.py | 18 +-
client-py/src/SessionExample.py | 105 ------
client-py/{src/iotdb => tests}/__init__.py | 1 -
.../IoTDBConstants.py => tests/test_dataframe.py} | 59 ++-
pom.xml | 4 +
28 files changed, 1320 insertions(+), 558 deletions(-)
diff --git a/.gitignore b/.gitignore
index d10c08b..182c9e7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -94,3 +94,6 @@ node_identifier
*.cmake
Makefile
**/CMakeFiles/
+
+# Exclude copied license
+/client-py/LICENSE
diff --git a/client-py/src/iotdb/__init__.py b/client-py/.flake8
similarity index 72%
copy from client-py/src/iotdb/__init__.py
copy to client-py/.flake8
index a4797b6..bd7e7bd 100644
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/.flake8
@@ -15,4 +15,19 @@
# specific language governing permissions and limitations
# under the License.
#
-
+[flake8]
+ignore =
+ E203,
+ W503
+max-line-length=200
+exclude =
+ .git,
+ test/*,
+ iotdb/thrift/**/*
+extend-exclude =
+ dist,
+ build,
+ venv
+show-source = True
+statistics = True
+format =
%(path)s:%(row)d,%(col)d:%(code)s:%(text)s:https://lintlyci.github.io/Flake8Rules/rules/%(code)s.html
diff --git a/client-py/.gitignore b/client-py/.gitignore
new file mode 100644
index 0000000..0778bd5
--- /dev/null
+++ b/client-py/.gitignore
@@ -0,0 +1,5 @@
+/iotdb/thrift/
+# generated by Pypi
+/build/
+/dist/
+/apache_iotdb.egg-info
diff --git a/client-py/README.md b/client-py/README.md
new file mode 100644
index 0000000..77d0a01
--- /dev/null
+++ b/client-py/README.md
@@ -0,0 +1,209 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Apache IoTDB
+
+[](https://www.travis-ci.org/apache/iotdb)
+[](https://codecov.io/gh/thulab/iotdb)
+[](https://github.com/apache/iotdb/releases)
+[](https://www.apache.org/licenses/LICENSE-2.0.html)
+
+
+
+
+[](https://iotdb.apache.org/)
+
+
+Apache IoTDB (Database for Internet of Things) is an IoT native database with
high performance for
+data management and analysis, deployable on the edge and the cloud. Due to its
light-weight
+architecture, high performance and rich feature set together with its deep
integration with
+Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of
massive data storage,
+high-speed data ingestion and complex data analysis in the IoT industrial
fields.
+
+
+# Apache IoTDB Python Client API
+
+Using the package, you can write data to IoTDB, read data from IoTDB and
maintain the schema of IoTDB.
+
+## Requirements
+
+You have to install thrift (>=0.13) before using the package.
+
+## How to use (Example)
+
+First, download the package: `pip3 install apache-iotdb`
+
+You can get an example of using the package to read and write data here:
[Example](https://github.com/apache/iotdb/blob/rel/0.11/client-py/SessionExample.py)
+
+(you need to add `import iotdb` at the top of the file)
+
+Or:
+
+```python
+
+from iotdb.Session import Session
+
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = 'root'
+password_ = 'root'
+session = Session(ip, port_, username_, password_)
+session.open(False)
+zone = session.get_time_zone()
+session.close()
+
+```
+
+## IoTDB Testcontainer
+
+The Test Support is based on the lib `testcontainers`
(https://testcontainers-python.readthedocs.io/en/latest/index.html) which you
need to install in your project if you want to use the feature.
+
+To start (and stop) an IoTDB Database in a Docker container simply do:
+```
+class MyTestCase(unittest.TestCase):
+
+ def test_something(self):
+ with IoTDBContainer() as c:
+ session = Session('localhost', c.get_exposed_port(6667), 'root',
'root')
+ session.open(False)
+ result = session.execute_query_statement("SHOW TIMESERIES")
+ print(result)
+ session.close()
+```
+
+By default it will load the image `apache/iotdb:latest`. If you want a
specific version, just pass it, e.g. `IoTDBContainer("apache/iotdb:0.10.0")`
to get version `0.10.0` running.
+
+## Pandas Support
+
+To easily transform a query result to a [Pandas
Dataframe](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
+the SessionDataSet has a method `.todf()` which consumes the dataset and
transforms it to a pandas dataframe.
+
+Example:
+
+```python
+
+from iotdb.Session import Session
+
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = 'root'
+password_ = 'root'
+session = Session(ip, port_, username_, password_)
+session.open(False)
+result = session.execute_query_statement("SELECT * FROM root.*")
+
+# Transform to Pandas Dataset
+df = result.todf()
+
+session.close()
+
+# Now you can work with the dataframe
+df = ...
+```
+
+## Developers
+
+### Introduction
+
+This is an example of how to connect to IoTDB with python, using the thrift
rpc interfaces. Things
+are almost the same on Windows or Linux, but pay attention to differences
such as the path separator.
+
+### Prerequisites
+
+python3.7 or later is preferred.
+
+You have to install Thrift (0.11.0 or later) to compile our thrift file into
python code. Below is the official
+installation tutorial; after following it, you should have a thrift executable.
+
+```
+http://thrift.apache.org/docs/install/
+```
+
+Before starting you need to install `requirements_dev.txt` in your python
environment, e.g. by calling
+```
+pip install -r requirements_dev.txt
+```
+
+### Compile the thrift library and Debug
+
+In the root of IoTDB's source code folder, run `mvn clean generate-sources
-pl client-py -am`.
+
+This will automatically delete and repopulate the folder `iotdb/thrift` with
the generated thrift files.
+This folder is ignored by git and should **never be pushed to git!**
+
+**Notice** Do not upload `iotdb/thrift` to the git repo.
+
+
+### Session Client & Example
+
+We packed up the Thrift interface in `client-py/iotdb/Session.py` (similar
to its Java counterpart), and also provide
+an example file `client-py/SessionExample.py` showing how to use the session
module. Please read it carefully.
+
+
+Or, another simple example:
+
+```python
+from iotdb.Session import Session
+
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = 'root'
+password_ = 'root'
+session = Session(ip, port_, username_, password_)
+session.open(False)
+zone = session.get_time_zone()
+session.close()
+```
+
+### Tests
+
+Please add your custom tests in `tests` folder.
+To run all defined tests just type `pytest .` in the root folder.
+
+**Notice** Some tests need docker to be started on your system as a test
instance is started in a docker container using
[testcontainers](https://testcontainers-python.readthedocs.io/en/latest/index.html).
+
+### Further Tools
+
+[black](https://pypi.org/project/black/) and
[flake8](https://pypi.org/project/flake8/) are installed for autoformatting and
linting.
+Both can be run by `black .` or `flake8 .` respectively.
+
+## Releasing
+
+To do a release just ensure that you have the right set of generated thrift
files.
+Then run linting and auto-formatting.
+Then, ensure that all tests work (via `pytest .`).
+Then you are good to go to do a release!
+
+### Preparing your environment
+
+First, install all necessary dev dependencies via `pip install -r
requirements_dev.txt`.
+
+### Doing the Release
+
+There is a convenient script `release.sh` to do all steps for a release.
+Namely, these are
+
+* Remove all transient directories from the last release (if they exist)
+* (Re-)generate all generated sources via mvn
+* Run Linting (flake8)
+* Run Tests via pytest
+* Build
+* Release to pypi
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
new file mode 100644
index 0000000..bf56555
--- /dev/null
+++ b/client-py/SessionExample.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024,
zone_id="UTC+8")
+session.open(False)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+session.delete_storage_group("root.sg_test_02")
+session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
+
+# setting time series.
+session.create_time_series(
+ "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+session.create_time_series(
+ "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+session.create_time_series(
+ "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+
+# setting multiple time series once.
+ts_path_lst_ = [
+ "root.sg_test_01.d_01.s_04",
+ "root.sg_test_01.d_01.s_05",
+ "root.sg_test_01.d_01.s_06",
+ "root.sg_test_01.d_01.s_07",
+ "root.sg_test_01.d_01.s_08",
+ "root.sg_test_01.d_01.s_09",
+]
+data_type_lst_ = [
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_multi_time_series(
+ ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+session.delete_time_series(
+ [
+ "root.sg_test_01.d_01.s_07",
+ "root.sg_test_01.d_01.s_08",
+ "root.sg_test_01.d_01.s_09",
+ ]
+)
+
+# checking time series
+print(
+ "s_07 expecting False, checking result: ",
+ session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
+)
+print(
+ "s_03 expecting True, checking result: ",
+ session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
+)
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_,
values_)
+
+# insert multiple records into database
+measurements_list_ = [
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+session.insert_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+)
+
+# insert one tablet into the database.
+values_ = [
+ [False, 10, 11, 1.1, 10011.1, "test01"],
+ [True, 100, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, 188.1, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, "test04"],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+)
+session.insert_tablet(tablet_)
+
+# insert multiple tablets into database
+tablet_01 = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14,
15]
+)
+session.insert_tablets([tablet_01, tablet_02])
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+session.insert_records_of_one_device(
+ "root.sg_test_01.d_01", time_list, measurements_list, data_types_list,
values_list
+)
+
+# execute non-query sql statement
+session.execute_non_query_statement(
+ "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+)
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from
root.sg_test_01.d_01")
+session_data_set.set_fetch_size(1024)
+while session_data_set.has_next():
+ print(session_data_set.next())
+session_data_set.close_operation_handle()
+
+# close session connection.
+session.close()
+
+print("All executions done!!")
diff --git a/client-py/SessionTest.py b/client-py/SessionTest.py
new file mode 100644
index 0000000..96ff00d
--- /dev/null
+++ b/client-py/SessionTest.py
@@ -0,0 +1,243 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
+from iotdb.utils.Tablet import Tablet
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail(message):
+ global failed_count
+ global final_flag
+ print("*********")
+ print(message)
+ print("*********")
+ final_flag = False
+ failed_count += 1
+
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+session = Session(ip, port_, username_, password_, fetch_size=1024,
zone_id="UTC+8")
+session.open(False)
+
+if not session.is_open():
+ print("can't open session")
+ exit(1)
+
+# set and delete storage groups
+session.set_storage_group("root.sg_test_01")
+session.set_storage_group("root.sg_test_02")
+session.set_storage_group("root.sg_test_03")
+session.set_storage_group("root.sg_test_04")
+
+if session.delete_storage_group("root.sg_test_02") < 0:
+ test_fail("delete storage group failed")
+
+if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0:
+ test_fail("delete storage groups failed")
+
+# setting time series.
+session.create_time_series(
+ "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+session.create_time_series(
+ "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+session.create_time_series(
+ "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN,
Compressor.SNAPPY
+)
+
+# setting multiple time series once.
+ts_path_lst_ = [
+ "root.sg_test_01.d_01.s_04",
+ "root.sg_test_01.d_01.s_05",
+ "root.sg_test_01.d_01.s_06",
+ "root.sg_test_01.d_01.s_07",
+ "root.sg_test_01.d_01.s_08",
+ "root.sg_test_01.d_01.s_09",
+]
+data_type_lst_ = [
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
+compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
+session.create_multi_time_series(
+ ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
+)
+
+# delete time series
+if (
+ session.delete_time_series(
+ [
+ "root.sg_test_01.d_01.s_07",
+ "root.sg_test_01.d_01.s_08",
+ "root.sg_test_01.d_01.s_09",
+ ]
+ )
+ < 0
+):
+ test_fail("delete time series failed")
+
+# checking time series
+# s_07 expecting False
+if session.check_time_series_exists("root.sg_test_01.d_01.s_07"):
+ test_fail("root.sg_test_01.d_01.s_07 shouldn't exist")
+
+# s_03 expecting True
+if not session.check_time_series_exists("root.sg_test_01.d_01.s_03"):
+ test_fail("root.sg_test_01.d_01.s_03 should exist")
+
+# insert one record into the database.
+measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
+data_types_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+]
+if (
+ session.insert_record(
+ "root.sg_test_01.d_01", 1, measurements_, data_types_, values_
+ )
+ < 0
+):
+ test_fail("insert record failed")
+
+# insert multiple records into database
+measurements_list_ = [
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+]
+values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [True, 77, 88, 1.25, 8.125, "test_records02"],
+]
+data_type_list_ = [data_types_, data_types_]
+device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
+if (
+ session.insert_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
+ )
+ < 0
+):
+ test_fail("insert records failed")
+
+# insert one tablet into the database.
+values_ = [
+ [False, 10, 11, 1.1, 10011.1, "test01"],
+ [True, 100, 11111, 1.25, 101.0, "test02"],
+ [False, 100, 1, 188.1, 688.25, "test03"],
+ [True, 0, 0, 0, 6.25, "test04"],
+] # Non-ASCII text will cause error since bytes can only hold 0-128 nums.
+timestamps_ = [4, 5, 6, 7]
+tablet_ = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
+)
+if session.insert_tablet(tablet_) < 0:
+ test_fail("insert tablet failed")
+
+# insert multiple tablets into database
+tablet_01 = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11]
+)
+tablet_02 = Tablet(
+ "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14,
15]
+)
+if session.insert_tablets([tablet_01, tablet_02]) < 0:
+ test_fail("insert tablets failed")
+
+# insert records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+data_types_list = [
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64],
+]
+values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
+
+if (
+ session.insert_records_of_one_device(
+ "root.sg_test_01.d_01",
+ time_list,
+ measurements_list,
+ data_types_list,
+ values_list,
+ )
+ < 0
+):
+ test_fail("insert records of one device failed")
+
+# execute non-query sql statement
+if (
+ session.execute_non_query_statement(
+ "insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
+ )
+ < 0
+):
+ test_fail(
+ "execute 'insert into root.sg_test_01.d_01(timestamp, s_02) values(16,
188)' failed"
+ )
+
+# execute sql query statement
+session_data_set = session.execute_query_statement("select * from
root.sg_test_01.d_01")
+session_data_set.set_fetch_size(1024)
+expect_count = 16
+actual_count = 0
+while session_data_set.has_next():
+ actual_count += 1
+session_data_set.close_operation_handle()
+
+if actual_count != expect_count:
+ test_fail(
+ "query count mismatch: expect count: "
+ + str(expect_count)
+ + " actual count: "
+ + str(actual_count)
+ )
+
+# close session connection.
+session.close()
+
+if final_flag:
+ print("All executions done!!")
+else:
+ print("Some test failed, please have a check")
+ print("failed count: ", failed_count)
+ exit(1)
diff --git a/client-py/iotdb/IoTDBContainer.py
b/client-py/iotdb/IoTDBContainer.py
new file mode 100644
index 0000000..9a01887
--- /dev/null
+++ b/client-py/iotdb/IoTDBContainer.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from os import environ
+
+from testcontainers.core.container import DockerContainer
+from testcontainers.core.waiting_utils import wait_container_is_ready
+
+from iotdb.Session import Session
+
+
+class IoTDBContainer(DockerContainer):
+ IOTDB_USER = environ.get("IOTDB_USER", "root")
+ IOTDB_PASSWORD = environ.get("IOTDB_PASSWORD", "root")
+
+ def _configure(self):
+ pass
+
+ @wait_container_is_ready()
+ def _connect(self):
+ session = Session(
+ self.get_container_host_ip(), self.get_exposed_port(6667), "root",
"root"
+ )
+ session.open(False)
+ session.close()
+
+ def __init__(self, image="apache/iotdb:latest", **kwargs):
+ super(IoTDBContainer, self).__init__(image)
+ self.port_to_expose = 6667
+ self.with_exposed_ports(self.port_to_expose)
+
+ def start(self):
+ self._configure()
+ super().start()
+ self._connect()
+ return self
diff --git a/client-py/src/iotdb/Session.py b/client-py/iotdb/Session.py
similarity index 59%
rename from client-py/src/iotdb/Session.py
rename to client-py/iotdb/Session.py
index a9c9f07..7b28e91 100644
--- a/client-py/src/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -15,19 +15,29 @@
# specific language governing permissions and limitations
# under the License.
#
-
+import logging
import struct
-import sys
import time
-from .utils.SessionDataSet import SessionDataSet
-from .utils.IoTDBConstants import *
+from iotdb.utils.SessionDataSet import SessionDataSet
from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import TSocket, TTransport
-from .thrift.rpc.TSIService import Client, TSCreateTimeseriesReq,
TSInsertRecordReq, TSInsertStringRecordReq, TSInsertTabletReq, \
- TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq,
TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq
+from .thrift.rpc.TSIService import (
+ Client,
+ TSCreateTimeseriesReq,
+ TSInsertRecordReq,
+ TSInsertStringRecordReq,
+ TSInsertTabletReq,
+ TSExecuteStatementReq,
+ TSOpenSessionReq,
+ TSCreateMultiTimeseriesReq,
+ TSCloseSessionReq,
+ TSInsertTabletsReq,
+ TSInsertRecordsReq,
+ TSInsertRecordsOfOneDeviceReq,
+)
from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion,
TSSetTimeZoneReq
# for debug
@@ -41,15 +51,27 @@ from .thrift.rpc.ttypes import TSDeleteDataReq,
TSProtocolVersion, TSSetTimeZone
# TSExecuteStatementReq, TSOpenSessionReq, TSQueryDataSet,
TSFetchResultsReq, TSCloseOperationReq, \
# TSCreateMultiTimeseriesReq, TSCloseSessionReq, TSInsertTabletsReq,
TSInsertRecordsReq
# from iotdb.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion,
TSSetTimeZoneReq
+from .utils.IoTDBConstants import TSDataType
+
+logger = logging.getLogger("IoTDB")
class Session(object):
+ SUCCESS_CODE = 200
DEFAULT_FETCH_SIZE = 10000
- DEFAULT_USER = 'root'
- DEFAULT_PASSWORD = 'root'
- DEFAULT_ZONE_ID = time.strftime('%z')
-
- def __init__(self, host, port, user=DEFAULT_USER,
password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID):
+ DEFAULT_USER = "root"
+ DEFAULT_PASSWORD = "root"
+ DEFAULT_ZONE_ID = time.strftime("%z")
+
+ def __init__(
+ self,
+ host,
+ port,
+ user=DEFAULT_USER,
+ password=DEFAULT_PASSWORD,
+ fetch_size=DEFAULT_FETCH_SIZE,
+ zone_id=DEFAULT_ZONE_ID,
+ ):
self.__host = host
self.__port = port
self.__user = user
@@ -66,30 +88,37 @@ class Session(object):
def open(self, enable_rpc_compression):
if not self.__is_close:
return
- self.__transport =
TTransport.TFramedTransport(TSocket.TSocket(self.__host, self.__port))
+ self.__transport = TTransport.TFramedTransport(
+ TSocket.TSocket(self.__host, self.__port)
+ )
if not self.__transport.isOpen():
try:
self.__transport.open()
except TTransport.TTransportException as e:
- print('TTransportException: ', e)
+ logger.exception("TTransportException!", exc_info=e)
if enable_rpc_compression:
self.__client =
Client(TCompactProtocol.TCompactProtocol(self.__transport))
else:
self.__client =
Client(TBinaryProtocol.TBinaryProtocol(self.__transport))
- open_req = TSOpenSessionReq(client_protocol=self.protocol_version,
- username=self.__user,
- password=self.__password,
- zoneId=self.__zone_id)
+ open_req = TSOpenSessionReq(
+ client_protocol=self.protocol_version,
+ username=self.__user,
+ password=self.__password,
+ zoneId=self.__zone_id,
+ )
try:
open_resp = self.__client.openSession(open_req)
if self.protocol_version != open_resp.serverProtocolVersion:
- print("Protocol differ, Client version is {}, but Server
version is {}".format(
- self.protocol_version, open_resp.serverProtocolVersion))
+ logger.exception(
+ "Protocol differ, Client version is {}, but Server version
is {}".format(
+ self.protocol_version, open_resp.serverProtocolVersion
+ )
+ )
# version is less than 0.10
if open_resp.serverProtocolVersion == 0:
raise TTransport.TException(message="Protocol not
supported.")
@@ -99,7 +128,7 @@ class Session(object):
except Exception as e:
self.__transport.close()
- print("session closed because: ", e)
+ logger.exception("session closed because: ", exc_info=e)
if self.__zone_id is not None:
self.set_time_zone(self.__zone_id)
@@ -115,7 +144,10 @@ class Session(object):
try:
self.__client.closeSession(req)
except TTransport.TException as e:
- print("Error occurs when closing session at server. Maybe server
is down. Error message: ", e)
+ logger.exception(
+ "Error occurs when closing session at server. Maybe server is
down. Error message: ",
+ exc_info=e,
+ )
finally:
self.__is_close = True
if self.__transport is not None:
@@ -127,7 +159,11 @@ class Session(object):
:param group_name: String, storage group name (starts from root)
"""
status = self.__client.setStorageGroup(self.__session_id, group_name)
- print("setting storage group {} message: {}".format(group_name,
status.message))
+ logger.debug(
+ "setting storage group {} message: {}".format(group_name,
status.message)
+ )
+
+ return Session.verify_success(status)
def delete_storage_group(self, storage_group):
"""
@@ -135,7 +171,7 @@ class Session(object):
:param storage_group: String, path of the target storage group.
"""
groups = [storage_group]
- self.delete_storage_groups(groups)
+ return self.delete_storage_groups(groups)
def delete_storage_groups(self, storage_group_lst):
"""
@@ -143,7 +179,13 @@ class Session(object):
:param storage_group_lst: List, paths of the target storage groups.
"""
status = self.__client.deleteStorageGroups(self.__session_id,
storage_group_lst)
- print("delete storage group(s) {} message:
{}".format(storage_group_lst, status.message))
+ logger.debug(
+ "delete storage group(s) {} message: {}".format(
+ storage_group_lst, status.message
+ )
+ )
+
+ return Session.verify_success(status)
def create_time_series(self, ts_path, data_type, encoding, compressor):
"""
@@ -156,11 +198,19 @@ class Session(object):
data_type = data_type.value
encoding = encoding.value
compressor = compressor.value
- request = TSCreateTimeseriesReq(self.__session_id, ts_path, data_type,
encoding, compressor)
+ request = TSCreateTimeseriesReq(
+ self.__session_id, ts_path, data_type, encoding, compressor
+ )
status = self.__client.createTimeseries(request)
- print("creating time series {} message: {}".format(ts_path,
status.message))
+ logger.debug(
+ "creating time series {} message: {}".format(ts_path,
status.message)
+ )
+
+ return Session.verify_success(status)
- def create_multi_time_series(self, ts_path_lst, data_type_lst,
encoding_lst, compressor_lst):
+ def create_multi_time_series(
+ self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst
+ ):
"""
create multiple time series
:param ts_path_lst: List of String, complete time series paths (starts
from root)
@@ -172,10 +222,17 @@ class Session(object):
encoding_lst = [encoding.value for encoding in encoding_lst]
compressor_lst = [compressor.value for compressor in compressor_lst]
- request = TSCreateMultiTimeseriesReq(self.__session_id, ts_path_lst,
data_type_lst,
- encoding_lst, compressor_lst)
+ request = TSCreateMultiTimeseriesReq(
+ self.__session_id, ts_path_lst, data_type_lst, encoding_lst,
compressor_lst
+ )
status = self.__client.createMultiTimeseries(request)
- print("creating multiple time series {} message:
{}".format(ts_path_lst, status.message))
+ logger.debug(
+ "creating multiple time series {} message: {}".format(
+ ts_path_lst, status.message
+ )
+ )
+
+ return Session.verify_success(status)
def delete_time_series(self, paths_list):
"""
@@ -183,7 +240,13 @@ class Session(object):
:param paths_list: List of time series path, which should be complete
(starts from root)
"""
status = self.__client.deleteTimeseries(self.__session_id, paths_list)
- print("deleting multiple time series {} message:
{}".format(paths_list, status.message))
+ logger.debug(
+ "deleting multiple time series {} message: {}".format(
+ paths_list, status.message
+ )
+ )
+
+ return Session.verify_success(status)
def check_time_series_exists(self, path):
"""
@@ -205,16 +268,30 @@ class Session(object):
request = TSDeleteDataReq(self.__session_id, paths_list, timestamp)
try:
status = self.__client.deleteData(request)
- print("delete data from {}, message: {}".format(paths_list,
status.message))
+ logger.debug(
+ "delete data from {}, message: {}".format(paths_list,
status.message)
+ )
except TTransport.TException as e:
- print("data deletion fails because: ", e)
+ logger.exception("data deletion fails because: ", e)
def insert_str_record(self, device_id, timestamp, measurements,
string_values):
""" special case for inserting one row of String (TEXT) value """
+ if type(string_values) == str:
+ string_values = [string_values]
+ if type(measurements) == str:
+ measurements = [measurements]
data_types = [TSDataType.TEXT.value for _ in string_values]
- request = self.gen_insert_str_record_req(device_id, timestamp,
measurements, data_types, string_values)
+ request = self.gen_insert_str_record_req(
+ device_id, timestamp, measurements, data_types, string_values
+ )
status = self.__client.insertStringRecord(request)
- print("insert one record to device {} message: {}".format(device_id,
status.message))
+ logger.debug(
+ "insert one record to device {} message: {}".format(
+ device_id, status.message
+ )
+ )
+
+ return Session.verify_success(status)
def insert_record(self, device_id, timestamp, measurements, data_types,
values):
"""
@@ -229,11 +306,21 @@ class Session(object):
:param values: List, values to be inserted, for each sensor
"""
data_types = [data_type.value for data_type in data_types]
- request = self.gen_insert_record_req(device_id, timestamp,
measurements, data_types, values)
+ request = self.gen_insert_record_req(
+ device_id, timestamp, measurements, data_types, values
+ )
status = self.__client.insertRecord(request)
- print("insert one record to device {} message: {}".format(device_id,
status.message))
+ logger.debug(
+ "insert one record to device {} message: {}".format(
+ device_id, status.message
+ )
+ )
+
+ return Session.verify_success(status)
- def insert_records(self, device_ids, times, measurements_lst, types_lst,
values_lst):
+ def insert_records(
+ self, device_ids, times, measurements_lst, types_lst, values_lst
+ ):
"""
insert multiple rows of data, records are independent to each other,
in other words, there's no relationship
between those records
@@ -247,11 +334,21 @@ class Session(object):
for types in types_lst:
data_types = [data_type.value for data_type in types]
type_values_lst.append(data_types)
- request = self.gen_insert_records_req(device_ids, times,
measurements_lst, type_values_lst, values_lst)
+ request = self.gen_insert_records_req(
+ device_ids, times, measurements_lst, type_values_lst, values_lst
+ )
status = self.__client.insertRecords(request)
- print("insert multiple records to devices {} message:
{}".format(device_ids, status.message))
+ logger.debug(
+ "insert multiple records to devices {} message: {}".format(
+ device_ids, status.message
+ )
+ )
- def test_insert_record(self, device_id, timestamp, measurements,
data_types, values):
+ return Session.verify_success(status)
+
+ def test_insert_record(
+ self, device_id, timestamp, measurements, data_types, values
+ ):
"""
this method NOT insert data into database and the server just return
after accept the request, this method
should be used to test other time cost in client
@@ -262,11 +359,21 @@ class Session(object):
:param values: List, values to be inserted, for each sensor
"""
data_types = [data_type.value for data_type in data_types]
- request = self.gen_insert_record_req(device_id, timestamp,
measurements, data_types, values)
+ request = self.gen_insert_record_req(
+ device_id, timestamp, measurements, data_types, values
+ )
status = self.__client.testInsertRecord(request)
- print("testing! insert one record to device {} message:
{}".format(device_id, status.message))
+ logger.debug(
+ "testing! insert one record to device {} message: {}".format(
+ device_id, status.message
+ )
+ )
+
+ return Session.verify_success(status)
- def test_insert_records(self, device_ids, times, measurements_lst,
types_lst, values_lst):
+ def test_insert_records(
+ self, device_ids, times, measurements_lst, types_lst, values_lst
+ ):
"""
this method NOT insert data into database and the server just return
after accept the request, this method
should be used to test other time cost in client
@@ -280,43 +387,66 @@ class Session(object):
for types in types_lst:
data_types = [data_type.value for data_type in types]
type_values_lst.append(data_types)
- request = self.gen_insert_records_req(device_ids, times,
measurements_lst, type_values_lst, values_lst)
+ request = self.gen_insert_records_req(
+ device_ids, times, measurements_lst, type_values_lst, values_lst
+ )
status = self.__client.testInsertRecords(request)
- print("testing! insert multiple records, message:
{}".format(status.message))
+ logger.debug(
+ "testing! insert multiple records, message:
{}".format(status.message)
+ )
- def gen_insert_record_req(self, device_id, timestamp, measurements,
data_types, values):
+ return Session.verify_success(status)
+
+ def gen_insert_record_req(
+ self, device_id, timestamp, measurements, data_types, values
+ ):
if (len(values) != len(data_types)) or (len(values) !=
len(measurements)):
- print("length of data types does not equal to length of values!")
- # could raise an error here.
- return
+ raise RuntimeError(
+ "length of data types does not equal to length of values!"
+ )
values_in_bytes = Session.value_to_bytes(data_types, values)
- return TSInsertRecordReq(self.__session_id, device_id, measurements,
values_in_bytes, timestamp)
-
- def gen_insert_str_record_req(self, device_id, timestamp, measurements,
data_types, values):
- if (len(values) != len(data_types)) or (len(values) !=
len(measurements)):
- print("length of data types does not equal to length of
values!")
- # could raise an error here.
- return
- values_in_bytes = Session.value_to_bytes(data_types, values)
- return TSInsertStringRecordReq(self.__session_id, device_id,
measurements, values_in_bytes, timestamp)
+ return TSInsertRecordReq(
+ self.__session_id, device_id, measurements, values_in_bytes,
timestamp
+ )
- def gen_insert_records_req(self, device_ids, times, measurements_lst,
types_lst, values_lst):
- if (len(device_ids) != len(measurements_lst)) or (len(times) !=
len(types_lst)) or \
- (len(device_ids) != len(times)) or (len(times) != len(values_lst)):
- print("deviceIds, times, measurementsList and valuesList's size
should be equal")
- # could raise an error here.
- return
+ def gen_insert_str_record_req(
+ self, device_id, timestamp, measurements, data_types, values
+ ):
+ if (len(values) != len(data_types)) or (len(values) !=
len(measurements)):
+ raise RuntimeError(
+ "length of data types does not equal to length of values!"
+ )
+ return TSInsertStringRecordReq(
+ self.__session_id, device_id, measurements, values, timestamp
+ )
+
+ def gen_insert_records_req(
+ self, device_ids, times, measurements_lst, types_lst, values_lst
+ ):
+ if (
+ (len(device_ids) != len(measurements_lst))
+ or (len(times) != len(types_lst))
+ or (len(device_ids) != len(times))
+ or (len(times) != len(values_lst))
+ ):
+ raise RuntimeError(
+ "deviceIds, times, measurementsList and valuesList's size
should be equal"
+ )
value_lst = []
- for values, data_types, measurements in zip(values_lst, types_lst,
measurements_lst):
+ for values, data_types, measurements in zip(
+ values_lst, types_lst, measurements_lst
+ ):
if (len(values) != len(data_types)) or (len(values) !=
len(measurements)):
- print("deviceIds, times, measurementsList and valuesList's
size should be equal")
- # could raise an error here.
- return
+ raise RuntimeError(
+ "deviceIds, times, measurementsList and valuesList's size
should be equal"
+ )
values_in_bytes = Session.value_to_bytes(data_types, values)
value_lst.append(values_in_bytes)
- return TSInsertRecordsReq(self.__session_id, device_ids,
measurements_lst, value_lst, times)
+ return TSInsertRecordsReq(
+ self.__session_id, device_ids, measurements_lst, value_lst, times
+ )
def insert_tablet(self, tablet):
"""
@@ -331,7 +461,13 @@ class Session(object):
:param tablet: a tablet specified above
"""
status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet))
- print("insert one tablet to device {} message:
{}".format(tablet.get_device_id(), status.message))
+ logger.debug(
+ "insert one tablet to device {} message: {}".format(
+ tablet.get_device_id(), status.message
+ )
+ )
+
+ return Session.verify_success(status)
def insert_tablets(self, tablet_lst):
"""
@@ -339,7 +475,9 @@ class Session(object):
:param tablet_lst: List of tablets
"""
status =
self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst))
- print("insert multiple tablets, message: {}".format(status.message))
+ logger.debug("insert multiple tablets, message:
{}".format(status.message))
+
+ return Session.verify_success(status)
def test_insert_tablet(self, tablet):
"""
@@ -348,7 +486,13 @@ class Session(object):
:param tablet: a tablet of data
"""
status =
self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet))
- print("testing! insert one tablet to device {} message:
{}".format(tablet.get_device_id(), status.message))
+ logger.debug(
+ "testing! insert one tablet to device {} message: {}".format(
+ tablet.get_device_id(), status.message
+ )
+ )
+
+ return Session.verify_success(status)
def test_insert_tablets(self, tablet_list):
"""
@@ -356,14 +500,26 @@ class Session(object):
should be used to test other time cost in client
:param tablet_list: List of tablets
"""
- status =
self.__client.testInsertTablets(self.gen_insert_tablets_req(tablet_list))
- print("testing! insert multiple tablets, message:
{}".format(status.message))
+ status = self.__client.testInsertTablets(
+ self.gen_insert_tablets_req(tablet_list)
+ )
+ logger.debug(
+ "testing! insert multiple tablets, message:
{}".format(status.message)
+ )
+
+ return Session.verify_success(status)
def gen_insert_tablet_req(self, tablet):
data_type_values = [data_type.value for data_type in
tablet.get_data_types()]
- return TSInsertTabletReq(self.__session_id, tablet.get_device_id(),
tablet.get_measurements(),
- tablet.get_binary_values(),
tablet.get_binary_timestamps(),
- data_type_values, tablet.get_row_number())
+ return TSInsertTabletReq(
+ self.__session_id,
+ tablet.get_device_id(),
+ tablet.get_measurements(),
+ tablet.get_binary_values(),
+ tablet.get_binary_timestamps(),
+ data_type_values,
+ tablet.get_row_number(),
+ )
def gen_insert_tablets_req(self, tablet_lst):
device_id_lst = []
@@ -373,15 +529,24 @@ class Session(object):
type_lst = []
size_lst = []
for tablet in tablet_lst:
- data_type_values = [data_type.value for data_type in
tablet.get_data_types()]
+ data_type_values = [
+ data_type.value for data_type in tablet.get_data_types()
+ ]
device_id_lst.append(tablet.get_device_id())
measurements_lst.append(tablet.get_measurements())
values_lst.append(tablet.get_binary_values())
timestamps_lst.append(tablet.get_binary_timestamps())
type_lst.append(data_type_values)
size_lst.append(tablet.get_row_number())
- return TSInsertTabletsReq(self.__session_id, device_id_lst,
measurements_lst,
- values_lst, timestamps_lst, type_lst,
size_lst)
+ return TSInsertTabletsReq(
+ self.__session_id,
+ device_id_lst,
+ measurements_lst,
+ values_lst,
+ timestamps_lst,
+ type_lst,
+ size_lst,
+ )
def execute_query_statement(self, sql):
"""
@@ -389,10 +554,21 @@ class Session(object):
:param sql: String, query sql statement
:return: SessionDataSet, contains query results and relevant info (see
SessionDataSet.py)
"""
- request = TSExecuteStatementReq(self.__session_id, sql,
self.__statement_id, self.__fetch_size)
+ request = TSExecuteStatementReq(
+ self.__session_id, sql, self.__statement_id, self.__fetch_size
+ )
resp = self.__client.executeQueryStatement(request)
- return SessionDataSet(sql, resp.columns, resp.dataTypeList,
resp.columnNameIndexMap, resp.queryId,
- self.__client, self.__session_id,
resp.queryDataSet, resp.ignoreTimeStamp)
+ return SessionDataSet(
+ sql,
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
def execute_non_query_statement(self, sql):
"""
@@ -403,9 +579,12 @@ class Session(object):
try:
resp = self.__client.executeUpdateStatement(request)
status = resp.status
- print("execute non-query statement {} message: {}".format(sql,
status.message))
+ logger.debug(
+ "execute non-query statement {} message: {}".format(sql,
status.message)
+ )
+ return Session.verify_success(status)
except TTransport.TException as e:
- print("execution of non-query statement fails because: ", e)
+ raise RuntimeError("execution of non-query statement fails
because: ", e)
@staticmethod
def value_to_bytes(data_types, values):
@@ -413,44 +592,42 @@ class Session(object):
values_tobe_packed = []
for data_type, value in zip(data_types, values):
if data_type == TSDataType.BOOLEAN.value:
- format_str_list.append("h")
+ format_str_list.append("c")
format_str_list.append("?")
- values_tobe_packed.append(TSDataType.BOOLEAN.value)
+ values_tobe_packed.append(bytes([TSDataType.BOOLEAN.value]))
values_tobe_packed.append(value)
elif data_type == TSDataType.INT32.value:
- format_str_list.append("h")
+ format_str_list.append("c")
format_str_list.append("i")
- values_tobe_packed.append(TSDataType.INT32.value)
+ values_tobe_packed.append(bytes([TSDataType.INT32.value]))
values_tobe_packed.append(value)
elif data_type == TSDataType.INT64.value:
- format_str_list.append("h")
+ format_str_list.append("c")
format_str_list.append("q")
- values_tobe_packed.append(TSDataType.INT64.value)
+ values_tobe_packed.append(bytes([TSDataType.INT64.value]))
values_tobe_packed.append(value)
elif data_type == TSDataType.FLOAT.value:
- format_str_list.append("h")
+ format_str_list.append("c")
format_str_list.append("f")
- values_tobe_packed.append(TSDataType.FLOAT.value)
+ values_tobe_packed.append(bytes([TSDataType.FLOAT.value]))
values_tobe_packed.append(value)
elif data_type == TSDataType.DOUBLE.value:
- format_str_list.append("h")
+ format_str_list.append("c")
format_str_list.append("d")
- values_tobe_packed.append(TSDataType.DOUBLE.value)
+ values_tobe_packed.append(bytes([TSDataType.DOUBLE.value]))
values_tobe_packed.append(value)
elif data_type == TSDataType.TEXT.value:
- value_bytes = bytes(value, 'utf-8')
- format_str_list.append("h")
+ value_bytes = bytes(value, "utf-8")
+ format_str_list.append("c")
format_str_list.append("i")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
- values_tobe_packed.append(TSDataType.TEXT.value)
+ values_tobe_packed.append(bytes([TSDataType.TEXT.value]))
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
else:
- print("Unsupported data type:" + str(data_type))
- # could raise an error here.
- return
- format_str = ''.join(format_str_list)
+ raise RuntimeError("Unsupported data type:" + str(data_type))
+ format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
def get_time_zone(self):
@@ -459,16 +636,37 @@ class Session(object):
try:
resp = self.__client.getTimeZone(self.__session_id)
except TTransport.TException as e:
- print("Could not get time zone because: ", e)
- raise Exception
+ raise RuntimeError("Could not get time zone because: ", e)
return resp.timeZone
def set_time_zone(self, zone_id):
request = TSSetTimeZoneReq(self.__session_id, zone_id)
try:
status = self.__client.setTimeZone(request)
- print("setting time zone_id as {}, message: {}".format(zone_id,
status.message))
+ logger.debug(
+ "setting time zone_id as {}, message: {}".format(
+ zone_id, status.message
+ )
+ )
except TTransport.TException as e:
- print("Could not set time zone because: ", e)
- raise Exception
+ raise RuntimeError("Could not set time zone because: ", e)
self.__zone_id = zone_id
+
+ @staticmethod
+ def check_sorted(timestamps):
+ for i in range(1, len(timestamps)):
+ if timestamps[i] < timestamps[i - 1]:
+ return False
+ return True
+
+ @staticmethod
+ def verify_success(status):
+ """
+ verify success of operation
+ :param status: execution result status
+ """
+ if status.code == Session.SUCCESS_CODE:
+ return 0
+
+ logger.debug("error status is", status)
+ return -1
diff --git a/client-py/src/iotdb/__init__.py b/client-py/iotdb/__init__.py
similarity index 99%
copy from client-py/src/iotdb/__init__.py
copy to client-py/iotdb/__init__.py
index a4797b6..2a1e720 100644
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/iotdb/__init__.py
@@ -15,4 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
diff --git a/client-py/src/iotdb/utils/Field.py b/client-py/iotdb/utils/Field.py
similarity index 96%
rename from client-py/src/iotdb/utils/Field.py
rename to client-py/iotdb/utils/Field.py
index 55d1e33..0756b1c 100644
--- a/client-py/src/iotdb/utils/Field.py
+++ b/client-py/iotdb/utils/Field.py
@@ -19,11 +19,8 @@
# for package
from .IoTDBConstants import TSDataType
-# for debug
-# from IoTDBConstants import TSDataType
class Field(object):
-
def __init__(self, data_type):
"""
:param data_type: TSDataType
@@ -53,7 +50,9 @@ class Field(object):
elif output.get_data_type() == TSDataType.TEXT:
output.set_binary_value(field.get_binary_value())
else:
- raise Exception("unsupported data type
{}".format(output.get_data_type()))
+ raise Exception(
+ "unsupported data type {}".format(output.get_data_type())
+ )
return output
def get_data_type(self):
@@ -124,7 +123,7 @@ class Field(object):
elif self.__data_type == TSDataType.DOUBLE:
return str(self.__double_value)
elif self.__data_type == TSDataType.TEXT:
- return self.__binary_value.decode('utf-8')
+ return self.__binary_value.decode("utf-8")
else:
raise Exception("unsupported data type
{}".format(self.__data_type))
@@ -176,4 +175,3 @@ class Field(object):
else:
raise Exception("unsupported data type {}".format(data_type))
return field
-
diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py
b/client-py/iotdb/utils/IoTDBConstants.py
similarity index 100%
copy from client-py/src/iotdb/utils/IoTDBConstants.py
copy to client-py/iotdb/utils/IoTDBConstants.py
diff --git a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
b/client-py/iotdb/utils/IoTDBRpcDataSet.py
similarity index 75%
rename from client-py/src/iotdb/utils/IoTDBRpcDataSet.py
rename to client-py/iotdb/utils/IoTDBRpcDataSet.py
index 21c2f0e..500d438 100644
--- a/client-py/src/iotdb/utils/IoTDBRpcDataSet.py
+++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -17,18 +17,13 @@
#
# for package
-from .IoTDBConstants import *
-
-# for debug
-# from IoTDBConstants import *
-
-import sys
-from os.path import dirname, abspath
-path = dirname(dirname(abspath(__file__)))
-sys.path.append(path)
+import logging
from thrift.transport import TTransport
from iotdb.thrift.rpc.TSIService import TSFetchResultsReq, TSCloseOperationReq
+from iotdb.utils.IoTDBConstants import TSDataType
+
+logger = logging.getLogger("IoTDB")
class IoTDBRpcDataSet(object):
@@ -37,8 +32,19 @@ class IoTDBRpcDataSet(object):
START_INDEX = 2
FLAG = 0x80
- def __init__(self, sql, column_name_list, column_type_list,
column_name_index, ignore_timestamp, query_id,
- client, session_id, query_data_set, fetch_size):
+ def __init__(
+ self,
+ sql,
+ column_name_list,
+ column_type_list,
+ column_name_index,
+ ignore_timestamp,
+ query_id,
+ client,
+ session_id,
+ query_data_set,
+ fetch_size,
+ ):
self.__session_id = session_id
self.__ignore_timestamp = ignore_timestamp
self.__sql = sql
@@ -56,15 +62,21 @@ class IoTDBRpcDataSet(object):
self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1
if column_name_index is not None:
- self.__column_type_deduplicated_list = [None for _ in
range(len(column_name_index))]
+ self.__column_type_deduplicated_list = [
+ None for _ in range(len(column_name_index))
+ ]
for i in range(len(column_name_list)):
name = column_name_list[i]
self.__column_name_list.append(name)
self.__column_type_list.append(TSDataType[column_type_list[i]])
if name not in self.__column_ordinal_dict:
index = column_name_index[name]
- self.__column_ordinal_dict[name] = index +
IoTDBRpcDataSet.START_INDEX
- self.__column_type_deduplicated_list[index] =
TSDataType[column_type_list[i]]
+ self.__column_ordinal_dict[name] = (
+ index + IoTDBRpcDataSet.START_INDEX
+ )
+ self.__column_type_deduplicated_list[index] = TSDataType[
+ column_type_list[i]
+ ]
else:
index = IoTDBRpcDataSet.START_INDEX
self.__column_type_deduplicated_list = []
@@ -75,10 +87,14 @@ class IoTDBRpcDataSet(object):
if name not in self.__column_ordinal_dict:
self.__column_ordinal_dict[name] = index
index += 1
-
self.__column_type_deduplicated_list.append(TSDataType[column_type_list[i]])
+ self.__column_type_deduplicated_list.append(
+ TSDataType[column_type_list[i]]
+ )
self.__time_bytes = bytes(0)
- self.__current_bitmap = [bytes(0) for _ in
range(len(self.__column_type_deduplicated_list))]
+ self.__current_bitmap = [
+ bytes(0) for _ in range(len(self.__column_type_deduplicated_list))
+ ]
self.__value = [None for _ in
range(len(self.__column_type_deduplicated_list))]
self.__query_data_set = query_data_set
self.__is_closed = False
@@ -91,11 +107,18 @@ class IoTDBRpcDataSet(object):
return
if self.__client is not None:
try:
- status =
self.__client.closeOperation(TSCloseOperationReq(self.__session_id,
self.__query_id))
- print("close session {}, message:
{}".format(self.__session_id, status.message))
+ status = self.__client.closeOperation(
+ TSCloseOperationReq(self.__session_id, self.__query_id)
+ )
+ logger.debug(
+ "close session {}, message: {}".format(
+ self.__session_id, status.message
+ )
+ )
except TTransport.TException as e:
- print("close session {} failed because:
".format(self.__session_id), e)
- raise Exception
+ raise RuntimeError(
+ "close session {} failed because:
".format(self.__session_id), e
+ )
self.__is_closed = True
self.__client = None
@@ -112,7 +135,9 @@ class IoTDBRpcDataSet(object):
return False
def has_cached_result(self):
- return (self.__query_data_set is not None) and
(len(self.__query_data_set.time) != 0)
+ return (self.__query_data_set is not None) and (
+ len(self.__query_data_set.time) != 0
+ )
def construct_one_row(self):
# simulating buffer, read 8 bytes from data set and discard first 8
bytes which have been read.
@@ -146,18 +171,25 @@ class IoTDBRpcDataSet(object):
self.__value[i] = value_buffer[:8]
self.__query_data_set.valueList[i] = value_buffer[8:]
elif data_type == TSDataType.TEXT:
- length = int.from_bytes(value_buffer[:4], byteorder="big",
signed=False)
- self.__value[i] = value_buffer[4: 4 + length]
- self.__query_data_set.valueList[i] = value_buffer[4 +
length:]
+ length = int.from_bytes(
+ value_buffer[:4], byteorder="big", signed=False
+ )
+ self.__value[i] = value_buffer[4 : 4 + length]
+ self.__query_data_set.valueList[i] = value_buffer[4 +
length :]
else:
- print("unsupported data type {}.".format(data_type))
- # could raise exception here
+ raise RuntimeError("unsupported data type
{}.".format(data_type))
self.__rows_index += 1
self.__has_cached_record = True
def fetch_results(self):
self.__rows_index = 0
- request = TSFetchResultsReq(self.__session_id, self.__sql,
self.__fetch_size, self.__query_id, True)
+ request = TSFetchResultsReq(
+ self.__session_id,
+ self.__sql,
+ self.__fetch_size,
+ self.__query_id,
+ True
+ )
try:
resp = self.__client.fetchResults(request)
if not resp.hasResultSet:
@@ -166,15 +198,20 @@ class IoTDBRpcDataSet(object):
self.__query_data_set = resp.queryDataSet
return resp.hasResultSet
except TTransport.TException as e:
- print("Cannot fetch result from server, because of network
connection: ", e)
+ raise RuntimeError(
+ "Cannot fetch result from server, because of network
connection: ", e
+ )
def is_null(self, index, row_num):
bitmap = self.__current_bitmap[index]
shift = row_num % 8
- return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xff)) == 0
+ return ((IoTDBRpcDataSet.FLAG >> shift) & (bitmap & 0xFF)) == 0
def is_null_by_index(self, column_index):
- index =
self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] -
IoTDBRpcDataSet.START_INDEX
+ index = (
+
self.__column_ordinal_dict[self.find_column_name_by_index(column_index)]
+ - IoTDBRpcDataSet.START_INDEX
+ )
# time column will never be None
if index < 0:
return True
@@ -191,7 +228,11 @@ class IoTDBRpcDataSet(object):
if column_index <= 0:
raise Exception("Column index should start from 1")
if column_index > len(self.__column_name_list):
- raise Exception("column index {} out of range
{}".format(column_index, self.__column_size))
+ raise Exception(
+ "column index {} out of range {}".format(
+ column_index, self.__column_size
+ )
+ )
return self.__column_name_list[column_index - 1]
def get_fetch_size(self):
diff --git a/client-py/src/iotdb/utils/RowRecord.py
b/client-py/iotdb/utils/RowRecord.py
similarity index 93%
rename from client-py/src/iotdb/utils/RowRecord.py
rename to client-py/iotdb/utils/RowRecord.py
index 46d810e..16a88f1 100644
--- a/client-py/src/iotdb/utils/RowRecord.py
+++ b/client-py/iotdb/utils/RowRecord.py
@@ -17,22 +17,18 @@
#
# for package
-from .IoTDBConstants import TSDataType
from .Field import Field
# for debug
# from IoTDBConstants import TSDataType
# from Field import Field
-class RowRecord(object):
+class RowRecord(object):
def __init__(self, timestamp, field_list=None):
self.__timestamp = timestamp
self.__field_list = field_list
- def add_field(self, field):
- self.__field_list.append(field)
-
def add_field(self, value, data_type):
self.__field_list.append(Field.get_field(value, data_type))
diff --git a/client-py/src/iotdb/utils/SessionDataSet.py
b/client-py/iotdb/utils/SessionDataSet.py
similarity index 54%
rename from client-py/src/iotdb/utils/SessionDataSet.py
rename to client-py/iotdb/utils/SessionDataSet.py
index 91b1989..f0f7266 100644
--- a/client-py/src/iotdb/utils/SessionDataSet.py
+++ b/client-py/iotdb/utils/SessionDataSet.py
@@ -15,28 +15,46 @@
# specific language governing permissions and limitations
# under the License.
#
+import logging
+import struct
+
+from iotdb.utils.Field import Field
# for package
-from .IoTDBConstants import TSDataType
-from .IoTDBRpcDataSet import IoTDBRpcDataSet
-from .Field import Field
-from .RowRecord import RowRecord
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.IoTDBRpcDataSet import IoTDBRpcDataSet
+from iotdb.utils.RowRecord import RowRecord
-# for debug
-# from IoTDBConstants import TSDataType
-# from IoTDBRpcDataSet import IoTDBRpcDataSet
-# from Field import Field
-# from RowRecord import RowRecord
+import pandas as pd
-import struct
+logger = logging.getLogger("IoTDB")
class SessionDataSet(object):
-
- def __init__(self, sql, column_name_list, column_type_list,
column_name_index, query_id, client, session_id,
- query_data_set, ignore_timestamp):
- self.iotdb_rpc_data_set = IoTDBRpcDataSet(sql, column_name_list,
column_type_list, column_name_index,
- ignore_timestamp, query_id,
client, session_id, query_data_set, 1024)
+ def __init__(
+ self,
+ sql,
+ column_name_list,
+ column_type_list,
+ column_name_index,
+ query_id,
+ client,
+ session_id,
+ query_data_set,
+ ignore_timestamp,
+ ):
+ self.iotdb_rpc_data_set = IoTDBRpcDataSet(
+ sql,
+ column_name_list,
+ column_type_list,
+ column_name_index,
+ ignore_timestamp,
+ query_id,
+ client,
+ session_id,
+ query_data_set,
+ 1024,
+ )
def get_fetch_size(self):
return self.iotdb_rpc_data_set.get_fetch_size()
@@ -69,11 +87,16 @@ class SessionDataSet(object):
index -= 1
data_set_column_index -= 1
column_name = self.iotdb_rpc_data_set.get_column_names()[index]
- location =
self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name] -
IoTDBRpcDataSet.START_INDEX
+ location = (
+ self.iotdb_rpc_data_set.get_column_ordinal_dict()[column_name]
+ - IoTDBRpcDataSet.START_INDEX
+ )
if not
self.iotdb_rpc_data_set.is_null_by_index(data_set_column_index):
value_bytes = self.iotdb_rpc_data_set.get_values()[location]
- data_type =
self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[location]
+ data_type =
self.iotdb_rpc_data_set.get_column_type_deduplicated_list()[
+ location
+ ]
field = Field(data_type)
if data_type == TSDataType.BOOLEAN:
value = struct.unpack(">?", value_bytes)[0]
@@ -93,17 +116,66 @@ class SessionDataSet(object):
elif data_type == TSDataType.TEXT:
field.set_binary_value(value_bytes)
else:
- print("unsupported data type {}.".format(data_type))
- # could raise exception here
+ raise RuntimeError("unsupported data type
{}.".format(data_type))
else:
field = Field(None)
out_fields.append(field)
- return RowRecord(struct.unpack(">q",
self.iotdb_rpc_data_set.get_time_bytes())[0], out_fields)
+ return RowRecord(
+ struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0],
out_fields
+ )
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
+ def todf(self):
+ return resultset_to_pandas(self)
+
+
+def resultset_to_pandas(result_set: SessionDataSet) -> pd.DataFrame:
+ """
+ Transforms a SessionDataSet from IoTDB to a Pandas Data Frame
+ Each Field from IoTDB is a column in Pandas
+ :param result_set:
+ :return:
+ """
+ # get column names and fields
+ column_names = result_set.get_column_names()
+
+ value_dict = {}
+
+ for i in range(len(column_names)):
+ value_dict[column_names[i]] = []
+
+ while result_set.has_next():
+ record = result_set.next()
+
+ value_dict["Time"].append(record.get_timestamp())
+
+ for col in range(len(record.get_fields())):
+ field: Field = record.get_fields()[col]
+
+ value_dict[column_names[col + 1]].append(get_typed_point(field))
+
+ return pd.DataFrame(value_dict)
+
+def get_typed_point(field: Field, none_value=None):
+ choices = {
+ # In Case of Boolean, cast to 0 / 1
+ TSDataType.BOOLEAN: lambda field: 1 if field.get_bool_value() else 0,
+ TSDataType.TEXT: lambda field: field.get_string_value(),
+ TSDataType.FLOAT: lambda field: field.get_float_value(),
+ TSDataType.INT32: lambda field: field.get_int_value(),
+ TSDataType.DOUBLE: lambda field: field.get_double_value(),
+ TSDataType.INT64: lambda field: field.get_long_value(),
+ }
+ result_next_type: TSDataType = field.get_data_type()
+ if result_next_type in choices.keys():
+ return choices.get(result_next_type)(field)
+ elif result_next_type is None:
+ return none_value
+ else:
+ raise Exception(f"Unknown DataType {result_next_type}!")
diff --git a/client-py/src/iotdb/utils/Tablet.py
b/client-py/iotdb/utils/Tablet.py
similarity index 90%
rename from client-py/src/iotdb/utils/Tablet.py
rename to client-py/iotdb/utils/Tablet.py
index cdb0c21..667adcb 100644
--- a/client-py/src/iotdb/utils/Tablet.py
+++ b/client-py/iotdb/utils/Tablet.py
@@ -16,17 +16,12 @@
# under the License.
#
-# for package
-from .IoTDBConstants import *
-
-# for debug
-# from IoTDBConstants import *
-
import struct
+from iotdb.utils.IoTDBConstants import TSDataType
-class Tablet(object):
+class Tablet(object):
def __init__(self, device_id, measurements, data_types, values,
timestamps):
"""
creating a tablet for insertion
@@ -45,8 +40,9 @@ class Tablet(object):
:param timestamps: List.
"""
if len(timestamps) != len(values):
- print("Input error! len(timestamps) does not equal to
len(values)!")
- # could raise an error here.
+ raise RuntimeError(
+ "Input error! len(timestamps) does not equal to len(values)!"
+ )
if not Tablet.check_sorted(timestamps):
sorted_zipped = sorted(zip(timestamps, values))
@@ -88,7 +84,7 @@ class Tablet(object):
format_str_list.append("q")
values_tobe_packed.append(timestamp)
- format_str = ''.join(format_str_list)
+ format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
def get_binary_values(self):
@@ -122,17 +118,14 @@ class Tablet(object):
values_tobe_packed.append(self.__values[j][i])
elif self.__data_types[i] == TSDataType.TEXT:
for j in range(self.__row_number):
- value_bytes = bytes(self.__values[j][i], 'utf-8')
+ value_bytes = bytes(self.__values[j][i], "utf-8")
format_str_list.append("i")
format_str_list.append(str(len(value_bytes)))
format_str_list.append("s")
values_tobe_packed.append(len(value_bytes))
values_tobe_packed.append(value_bytes)
else:
- print("Unsupported data type:" + str(self.__data_types[i]))
- # could raise an error here.
- return
+ raise RuntimeError("Unsupported data type:" +
str(self.__data_types[i]))
- format_str = ''.join(format_str_list)
+ format_str = "".join(format_str_list)
return struct.pack(format_str, *values_tobe_packed)
-
diff --git a/client-py/src/iotdb/utils/__init__.py
b/client-py/iotdb/utils/__init__.py
similarity index 99%
rename from client-py/src/iotdb/utils/__init__.py
rename to client-py/iotdb/utils/__init__.py
index a4797b6..2a1e720 100644
--- a/client-py/src/iotdb/utils/__init__.py
+++ b/client-py/iotdb/utils/__init__.py
@@ -15,4 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
diff --git a/client-py/pom.xml b/client-py/pom.xml
index 9df0aea..c4278fb 100644
--- a/client-py/pom.xml
+++ b/client-py/pom.xml
@@ -39,7 +39,37 @@
</dependencies>
<build>
<plugins>
- <!-- for pypi distribution -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ <!-- clean thrift folder -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>iotdb</directory>
+ <includes>
+ <include>thrift/</include>
+ </includes>
+ <followSymlinks>false</followSymlinks>
+ </fileset>
+ <fileset>
+ <directory>./</directory>
+ <includes>
+ <include>LICENSE</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <!-- fill thrift folder -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
@@ -57,16 +87,17 @@
</goals>
<configuration>
<encoding>utf-8</encoding>
-
<outputDirectory>${project.build.directory}/pypi/</outputDirectory>
+
<outputDirectory>${basedir}/iotdb/thrift/</outputDirectory>
<resources>
<resource>
-
<directory>${basedir}/../thrift/target/generated-sources-python</directory>
+
<directory>${basedir}/../thrift/target/generated-sources-python/iotdb/thrift/</directory>
</resource>
</resources>
</configuration>
</execution>
+ <!-- Copy License -->
<execution>
- <id>copy-license-resources</id>
+ <id>copy-pypi-file-resources</id>
<!-- here the phase you need -->
<phase>generate-sources</phase>
<goals>
@@ -74,10 +105,10 @@
</goals>
<configuration>
<encoding>utf-8</encoding>
-
<outputDirectory>${project.build.directory}/pypi</outputDirectory>
+ <outputDirectory>${basedir}/</outputDirectory>
<resources>
<resource>
- <directory>${basedir}/../</directory>
+ <directory>${basedir}/..</directory>
<includes>
<include>LICENSE</include>
</includes>
@@ -85,40 +116,6 @@
</resources>
</configuration>
</execution>
- <execution>
- <id>copy-python-file-resources</id>
- <!-- here the phase you need -->
- <phase>generate-sources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <encoding>utf-8</encoding>
-
<outputDirectory>${project.build.directory}/pypi/</outputDirectory>
- <resources>
- <resource>
- <directory>${basedir}/src/</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
- <execution>
- <id>copy-pypi-file-resources</id>
- <!-- here the phase you need -->
- <phase>generate-sources</phase>
- <goals>
- <goal>copy-resources</goal>
- </goals>
- <configuration>
- <encoding>utf-8</encoding>
-
<outputDirectory>${project.build.directory}/pypi</outputDirectory>
- <resources>
- <resource>
- <directory>${basedir}/pypi</directory>
- </resource>
- </resources>
- </configuration>
- </execution>
</executions>
</plugin>
</plugins>
diff --git a/client-py/pypi/README.md b/client-py/pypi/README.md
deleted file mode 100644
index dc7182c..0000000
--- a/client-py/pypi/README.md
+++ /dev/null
@@ -1,73 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-# Apache IoTDB
-
-[](https://www.travis-ci.org/apache/iotdb)
-[](https://codecov.io/gh/thulab/iotdb)
-[](https://github.com/apache/iotdb/releases)
-[](https://www.apache.org/licenses/LICENSE-2.0.html)
-
-
-
-
-[](https://iotdb.apache.org/)
-
-
-Apache IoTDB (Database for Internet of Things) is an IoT native database with
high performance for
-data management and analysis, deployable on the edge and the cloud. Due to its
light-weight
-architecture, high performance and rich feature set together with its deep
integration with
-Apache Hadoop, Spark and Flink, Apache IoTDB can meet the requirements of
massive data storage,
-high-speed data ingestion and complex data analysis in the IoT industrial
fields.
-
-
-# Apache IoTDB Python Client API
-
-Using the package, you can write data to IoTDB, read data from IoTDB and
maintain the schema of IoTDB.
-
-## Requirements
-
-You have to install thrift (>=0.13) before using the package.
-
-## How to use (Example)
-
-First, download the package: `pip3 install apache-iotdb`
-
-You can get an example of using the package to read and write data at here:
[Example](https://github.com/apache/iotdb/blob/rel/0.11/client-py/src/SessionExample.py)
-
-(you need to add `import iotdb` in the head of the file)
-
-Or:
-
-```python
-
-from iotdb.Session import Session
-
-ip = "127.0.0.1"
-port_ = "6667"
-username_ = 'root'
-password_ = 'root'
-session = Session(ip, port_, username_, password_)
-session.open(False)
-zone = session.get_time_zone()
-session.close()
-
-```
\ No newline at end of file
diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py
b/client-py/pyproject.toml
similarity index 61%
copy from client-py/src/iotdb/utils/IoTDBConstants.py
copy to client-py/pyproject.toml
index f053af7..389b729 100644
--- a/client-py/src/iotdb/utils/IoTDBConstants.py
+++ b/client-py/pyproject.toml
@@ -16,39 +16,30 @@
# under the License.
#
-from enum import Enum, unique
+[tool.black]
+line-length = 88
+target-version = ['py37']
+include = '\.pyi?$'
+exclude = '''
-
-@unique
-class TSDataType(Enum):
- BOOLEAN = 0
- INT32 = 1
- INT64 = 2
- FLOAT = 3
- DOUBLE = 4
- TEXT = 5
-
-
-@unique
-class TSEncoding(Enum):
- PLAIN = 0
- PLAIN_DICTIONARY = 1
- RLE = 2
- DIFF = 3
- TS_2DIFF = 4
- BITMAP = 5
- GORILLA_V1 = 6
- REGULAR = 7
- GORILLA = 8
-
-
-@unique
-class Compressor(Enum):
- UNCOMPRESSED = 0
- SNAPPY = 1
- GZIP = 2
- LZO = 3
- SDT = 4
- PAA = 5
- PLA = 6
- LZ4 = 7
+(
+ /(
+ \.eggs # exclude a few common directories in the
+ | \.git # root of the project
+ | \.hg
+ | \.mypy_cache
+ | \.tox
+ | \.venv
+ | venv
+ | _build
+ | buck-out
+ | build
+ | dist
+ | migrations
+ | test
+ | iotdb/thrift
+ )/
+ | foo.py # also separately exclude a file named foo.py in
+ # the root of the project
+)
+'''
\ No newline at end of file
diff --git a/client-py/readme.md b/client-py/readme.md
deleted file mode 100644
index 8bedf08..0000000
--- a/client-py/readme.md
+++ /dev/null
@@ -1,71 +0,0 @@
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
--->
-
-# Python Client
-## Introduction
-This is an example of how to connect to IoTDB with python, using the thrift
rpc interfaces. Things
-are almost the same on Windows or Linux, but pay attention to the difference
like path separator.
-
-## Prerequisites
-python3.7 or later is preferred.
-
-You have to install Thrift (0.11.1 or later) to compile our thrift file into
python code. Below is the official
-tutorial of installation, eventually, you should have a thrift executable.
-```
-http://thrift.apache.org/docs/install/
-```
-
-## Compile the thrift library and Debug
-
-In the root of IoTDB's source code folder, run `mvn generate-sources -pl
client-py -am`.
-
-Then a complete project will be generated at `client-py/target/pypi` folder.
-But !BE CAUTIOUS!
-All your modifications in `client-py/target/pypi` must be copied manually to
`client-py/src/` folder.
-Otherwise once you run `mvn clean`, you will lose all your effort.
-
-Or, you can also copy `client-py/target/pypi/iotdb/thrift` folder to
`client-py/src/thrift`, then the
-`src` folder will become also a complete python project.
-But !BE CAUTIOUS!
-Do not upload `client-py/src/thrift` to the git repo.
-
-
-## Session Client & Example
-We packed up the Thrift interface in `client-py/src/iotdb/Session.py` (similar
with its Java counterpart), also provided
-an example file `client-py/src/SessionExample.py` of how to use the session
module. please read it carefully.
-
-
-Or, another simple example:
-
-```$python
-
-from iotdb.Session import Session
-
-ip = "127.0.0.1"
-port_ = "6667"
-username_ = 'root'
-password_ = 'root'
-session = Session(ip, port_, username_, password_)
-session.open(False)
-zone = session.get_time_zone()
-session.close()
-
-```
\ No newline at end of file
diff --git a/client-py/src/iotdb/__init__.py b/client-py/release.sh
old mode 100644
new mode 100755
similarity index 69%
copy from client-py/src/iotdb/__init__.py
copy to client-py/release.sh
index a4797b6..94a56a6
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/release.sh
@@ -1,3 +1,6 @@
+#!/usr/bin/env bash
+#
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,3 +19,19 @@
# under the License.
#
+# Abort on the first failing step so that a failed build, lint or test run
+# is never followed by an upload.
+set -e
+
+rm -Rf build
+rm -Rf dist
+rm -Rf iotdb_session.egg_info
+# Metadata directory setuptools derives from the "apache-iotdb" distribution name
+rm -Rf apache_iotdb.egg-info
+
+# (Re-)build generated code
+(cd .. && mvn clean generate-sources -pl client-py -am)
+
+# Run Linting
+flake8
+
+# Run unit tests
+pytest .
+
+# See https://packaging.python.org/tutorials/packaging-projects/
+python setup.py sdist bdist_wheel
+twine upload --repository pypi dist/*
\ No newline at end of file
diff --git a/client-py/src/iotdb/__init__.py b/client-py/requirements.txt
similarity index 92%
copy from client-py/src/iotdb/__init__.py
copy to client-py/requirements.txt
index a4797b6..55839d9 100644
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/requirements.txt
@@ -16,3 +16,7 @@
# under the License.
#
+# Pandas Export
+pandas==1.2.3
+# Testcontainer
+testcontainers==3.3.0
\ No newline at end of file
diff --git a/client-py/src/iotdb/__init__.py b/client-py/requirements_dev.txt
similarity index 84%
copy from client-py/src/iotdb/__init__.py
copy to client-py/requirements_dev.txt
index a4797b6..b7ad33c 100644
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/requirements_dev.txt
@@ -16,3 +16,12 @@
# under the License.
#
+-r requirements.txt
+# Pytest to run tests
+pytest==6.2.2
+thrift==0.13.0
+flake8==3.9.0
+black==20.8b1
+# For releases
+twine==3.4.1
+wheel==0.36.2
\ No newline at end of file
diff --git a/client-py/pypi/setup.py b/client-py/setup.py
similarity index 82%
rename from client-py/pypi/setup.py
rename to client-py/setup.py
index a353043..44e3ec6 100644
--- a/client-py/pypi/setup.py
+++ b/client-py/setup.py
@@ -21,16 +21,16 @@ import io
try:
- with io.open('README.md', encoding='utf-8') as f:
+ with io.open("README.md", encoding="utf-8") as f:
long_description = f.read()
except FileNotFoundError:
- long_description = ''
+ long_description = ""
print(long_description)
setuptools.setup(
- name="apache-iotdb", # Replace with your own username
+ name="apache-iotdb", # Replace with your own username
version="0.11.3",
author=" Apache Software Foundation",
author_email="[email protected]",
@@ -40,8 +40,10 @@ setuptools.setup(
url="https://github.com/apache/iotdb",
packages=setuptools.find_packages(),
install_requires=[
- 'thrift>=0.13.0',
- ],
+ "thrift>=0.13.0",
+ "pandas>=1.0.0,<1.99.99",
+ "testcontainers>=2.0.0",
+ ],
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: Apache Software License",
@@ -49,7 +51,7 @@ setuptools.setup(
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
- python_requires='>=3.7',
- license='Apache License, Version 2.0',
- website='https://iotdb.apache.org',
+ python_requires=">=3.7",
+ license="Apache License, Version 2.0",
+ website="https://iotdb.apache.org",
)
diff --git a/client-py/src/SessionExample.py b/client-py/src/SessionExample.py
deleted file mode 100644
index 697d075..0000000
--- a/client-py/src/SessionExample.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Uncomment the following line to use apache-iotdb module installed by pip3
-
-from iotdb.Session import Session
-from iotdb.utils.Tablet import Tablet
-from iotdb.utils.IoTDBConstants import *
-
-# creating session connection.
-ip = "127.0.0.1"
-port_ = "6667"
-username_ = 'root'
-password_ = 'root'
-session = Session(ip, port_, username_, password_, fetch_size=1024,
zone_id='UTC+8')
-session.open(False)
-
-# set and delete storage groups
-session.set_storage_group("root.sg_test_01")
-session.set_storage_group("root.sg_test_02")
-session.set_storage_group("root.sg_test_03")
-session.set_storage_group("root.sg_test_04")
-session.delete_storage_group("root.sg_test_02")
-session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
-
-# setting time series.
-session.create_time_series("root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN,
TSEncoding.PLAIN, Compressor.SNAPPY)
-session.create_time_series("root.sg_test_01.d_01.s_02", TSDataType.INT32,
TSEncoding.PLAIN, Compressor.SNAPPY)
-session.create_time_series("root.sg_test_01.d_01.s_03", TSDataType.INT64,
TSEncoding.PLAIN, Compressor.SNAPPY)
-
-# setting multiple time series once.
-ts_path_lst_ = ["root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
- "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09"]
-data_type_lst_ = [TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT,
- TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
-encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
-compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
-session.create_multi_time_series(ts_path_lst_, data_type_lst_, encoding_lst_,
compressor_lst_)
-
-# delete time series
-session.delete_time_series(["root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09"])
-
-# checking time series
-print("s_07 expecting False, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_07"))
-print("s_03 expecting True, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_03"))
-
-# insert one record into the database.
-measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
-values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
-data_types_ = [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64,
- TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT]
-session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_,
values_)
-
-# insert multiple records into database
-measurements_list_ = [["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
- ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]]
-values_list_ = [[False, 22, 33, 4.4, 55.1, "test_records01"],
- [True, 77, 88, 1.25, 8.125, "test_records02"]]
-data_type_list_ = [data_types_, data_types_]
-device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
-session.insert_records(device_ids_, [2, 3], measurements_list_,
data_type_list_, values_list_)
-
-# insert one tablet into the database.
-values_ = [[False, 10, 11, 1.1, 10011.1, "test01"],
- [True, 100, 11111, 1.25, 101.0, "test02"],
- [False, 100, 1, 188.1, 688.25, "test03"],
- [True, 0, 0, 0, 6.25, "test04"]] # Non-ASCII text will cause error
since bytes can only hold 0-128 nums.
-timestamps_ = [4, 5, 6, 7]
-tablet_ = Tablet("root.sg_test_01.d_01", measurements_, data_types_, values_,
timestamps_)
-session.insert_tablet(tablet_)
-
-# insert multiple tablets into database
-tablet_01 = Tablet("root.sg_test_01.d_01", measurements_, data_types_,
values_, [8, 9, 10, 11])
-tablet_02 = Tablet("root.sg_test_01.d_01", measurements_, data_types_,
values_, [12, 13, 14, 15])
-session.insert_tablets([tablet_01, tablet_02])
-
-# execute non-query sql statement
-session.execute_non_query_statement("insert into
root.sg_test_01.d_01(timestamp, s_02) values(16, 188);")
-
-# execute sql query statement
-session_data_set = session.execute_query_statement("select * from
root.sg_test_01.d_01")
-session_data_set.set_fetch_size(1024)
-while session_data_set.has_next():
- print(session_data_set.next())
-session_data_set.close_operation_handle()
-
-# close session connection.
-session.close()
-
-print("All executions done!!")
diff --git a/client-py/src/iotdb/__init__.py b/client-py/tests/__init__.py
similarity index 99%
rename from client-py/src/iotdb/__init__.py
rename to client-py/tests/__init__.py
index a4797b6..2a1e720 100644
--- a/client-py/src/iotdb/__init__.py
+++ b/client-py/tests/__init__.py
@@ -15,4 +15,3 @@
# specific language governing permissions and limitations
# under the License.
#
-
diff --git a/client-py/src/iotdb/utils/IoTDBConstants.py
b/client-py/tests/test_dataframe.py
similarity index 52%
rename from client-py/src/iotdb/utils/IoTDBConstants.py
rename to client-py/tests/test_dataframe.py
index f053af7..30f38e3 100644
--- a/client-py/src/iotdb/utils/IoTDBConstants.py
+++ b/client-py/tests/test_dataframe.py
@@ -16,39 +16,26 @@
# under the License.
#
-from enum import Enum, unique
-
-
-@unique
-class TSDataType(Enum):
- BOOLEAN = 0
- INT32 = 1
- INT64 = 2
- FLOAT = 3
- DOUBLE = 4
- TEXT = 5
-
-
-@unique
-class TSEncoding(Enum):
- PLAIN = 0
- PLAIN_DICTIONARY = 1
- RLE = 2
- DIFF = 3
- TS_2DIFF = 4
- BITMAP = 5
- GORILLA_V1 = 6
- REGULAR = 7
- GORILLA = 8
-
-
-@unique
-class Compressor(Enum):
- UNCOMPRESSED = 0
- SNAPPY = 1
- GZIP = 2
- LZO = 3
- SDT = 4
- PAA = 5
- PLA = 6
- LZ4 = 7
+from iotdb.Session import Session
+from iotdb.IoTDBContainer import IoTDBContainer
+
+from numpy.testing import assert_array_equal
+
+
+def test_simple_query():
+    """Write one point, read it back with todf() and compare the DataFrame."""
+    with IoTDBContainer("apache/iotdb:0.11.2") as db:
+        db: IoTDBContainer
+        session = Session(db.get_container_host_ip(), db.get_exposed_port(6667))
+        session.open(False)
+
+        try:
+            # Write data
+            session.insert_str_record("root.device", 123, "pressure", "15.0")
+
+            # Read
+            session_data_set = session.execute_query_statement("SELECT * FROM root.*")
+            df = session_data_set.todf()
+        finally:
+            # Close the session even when the write or the query raises.
+            session.close()
+
+        assert list(df.columns) == ["Time", "root.device.pressure"]
+        assert_array_equal(df.values, [[123.0, 15.0]])
diff --git a/pom.xml b/pom.xml
index 616e99c..51a8fd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -572,6 +572,10 @@
<exclude>LICENSE-binary</exclude>
<!-- json does not support comments-->
<exclude>**/*.json</exclude>
+ <!-- python -->
+ <exclude>.pytest_cache/**</exclude>
+ <exclude>venv/**</exclude>
+ <exclude>apache_iotdb.egg-info/**</exclude>
</excludes>
</configuration>
</plugin>