This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 89a31913 Fix py read error (#433)
89a31913 is described below
commit 89a3191340605e1e33eabafdfacb09d9a9ee4478
Author: Colin Lee <[email protected]>
AuthorDate: Mon Mar 10 15:44:09 2025 +0800
Fix py read error (#433)
1. Fix data loss when querying a table in which not all devices have data.
2. Add support for reading table schemas from Python.
3. Add support for reading query results as a DataFrame from Python.
---
cpp/src/common/tsblock/tsblock.h | 14 ++----
.../reader/block/device_ordered_tsblock_reader.cc | 18 +++++--
.../reader/block/single_device_tsblock_reader.cc | 27 ++++++++--
.../reader/block/single_device_tsblock_reader.h | 6 ++-
cpp/src/reader/table_query_executor.h | 2 +-
cpp/src/reader/table_result_set.cc | 14 ++++--
.../writer/table_view/tsfile_writer_table_test.cc | 21 +++++---
cpp/test/writer/tsfile_writer_test.cc | 8 ++-
python/tests/test_write_and_read.py | 48 +++++++++++++-----
python/tsfile/schema.py | 17 ++++++-
python/tsfile/tsfile_cpp.pxd | 27 ++++++----
python/tsfile/tsfile_py_cpp.pxd | 6 ++-
python/tsfile/tsfile_py_cpp.pyx | 38 ++++++++++++++-
python/tsfile/tsfile_reader.pyx | 57 +++++++++++++++++-----
14 files changed, 228 insertions(+), 75 deletions(-)
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 9bc865b0..47f110cc 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -50,10 +50,11 @@ class TsBlock {
max_row_count_(max_row_count),
tuple_desc_(tupledesc) {}
- virtual ~TsBlock() {
+ ~TsBlock() {
int size = vectors_.size();
for (int i = 0; i < size; ++i) {
delete vectors_[i];
+ vectors_[i] = nullptr;
}
}
@@ -227,16 +228,7 @@ class RowIterator {
column_count_ = tsblock_->tuple_desc_->get_column_count();
}
- ~RowIterator() {
- /*
- * if use RowIterator and ColIterator at the same time,
- * need to reset the offset after one is used,
- * otherwise it will cause the offset to be wrong
- */
- for (uint32_t i = 0; i < column_count_; ++i) {
- tsblock_->vectors_[i]->reset_offset();
- }
- }
+ ~RowIterator() {}
FORCE_INLINE bool end() { return row_id_ >= tsblock_->row_count_; }
diff --git a/cpp/src/reader/block/device_ordered_tsblock_reader.cc
b/cpp/src/reader/block/device_ordered_tsblock_reader.cc
index 191ed607..7469cc8b 100644
--- a/cpp/src/reader/block/device_ordered_tsblock_reader.cc
+++ b/cpp/src/reader/block/device_ordered_tsblock_reader.cc
@@ -33,7 +33,7 @@ int DeviceOrderedTsBlockReader::has_next(bool &has_next) {
while (device_task_iterator_->has_next()) {
DeviceQueryTask *task = nullptr;
if (IS_FAIL(device_task_iterator_->next(task))) {
- has_next = false;
+ return ret;
}
if (current_reader_) {
delete current_reader_;
@@ -43,15 +43,25 @@ int DeviceOrderedTsBlockReader::has_next(bool &has_next) {
task, block_size_, metadata_querier_, tsfile_io_reader_,
time_filter_,
field_filter_);
if (current_reader_ == nullptr) {
- has_next = false;
return common::E_OOM;
}
+
if (RET_FAIL(current_reader_->has_next(has_next))) {
return ret;
- } else {
- return common::E_OK;
+ }
+ // If current device has data, just return.
+ if (has_next) {
+ return ret;
+ }
+ // If current device does not have data, get next device.
+
+ // Free current device reader.
+ if (current_reader_) {
+ delete current_reader_;
+ current_reader_ = nullptr;
}
}
+ has_next = false;
return ret;
}
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc
b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 99c8e3ed..ec8059f9 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -74,6 +74,13 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask*
device_query_task,
construct_column_context(time_series_index, time_filter);
}
+ // There is no data in this single device tsblock reader.
+ if (field_column_contexts_.empty()) {
+ delete current_block_;
+ current_block_ = nullptr;
+ return common::E_OK;
+ }
+
for (const auto& id_column :
device_query_task->get_column_mapping()->get_id_columns()) {
const auto& column_pos_in_result =
@@ -233,6 +240,10 @@ void SingleDeviceTsBlockReader::close() {
if (device_query_task_) {
device_query_task_->~DeviceQueryTask();
}
+ if (current_block_) {
+ delete current_block_;
+ current_block_ = nullptr;
+ }
}
int SingleDeviceTsBlockReader::construct_column_context(
@@ -258,6 +269,7 @@ int SingleDeviceTsBlockReader::construct_column_context(
device_query_task_->get_column_mapping()->get_column_pos(
time_series_index->get_measurement_name().to_std_string()),
pa_))) {
+ delete column_context;
return ret;
}
field_column_contexts_.insert(std::make_pair(
@@ -266,11 +278,15 @@ int SingleDeviceTsBlockReader::construct_column_context(
} else {
SingleMeasurementColumnContext* column_context =
new SingleMeasurementColumnContext(tsfile_io_reader_);
- column_context->init(
- device_query_task_, time_series_index, time_filter,
- device_query_task_->get_column_mapping()->get_column_pos(
- time_series_index->get_measurement_name().to_std_string()),
- pa_);
+ if (RET_FAIL(column_context->init(
+ device_query_task_, time_series_index, time_filter,
+ device_query_task_->get_column_mapping()->get_column_pos(
+ time_series_index->get_measurement_name().to_std_string()),
+ pa_))) {
+ delete column_context;
+ return ret;
+ }
+
field_column_contexts_.insert(std::make_pair(
time_series_index->get_measurement_name().to_std_string(),
column_context));
@@ -370,4 +386,5 @@ void SingleMeasurementColumnContext::fill_into(
col_appenders[pos + 1]->append(val, len);
}
}
+
} // namespace storage
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.h
b/cpp/src/reader/block/single_device_tsblock_reader.h
index d2949cd4..a94aed31 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.h
+++ b/cpp/src/reader/block/single_device_tsblock_reader.h
@@ -112,8 +112,11 @@ class SingleMeasurementColumnContext final : public
MeasurementColumnContext {
delete value_iter_;
value_iter_ = nullptr;
}
- ssi_->revert_tsblock();
+ if (ssi_) {
+ ssi_->revert_tsblock();
+ }
tsfile_io_reader_->revert_ssi(ssi_);
+ ssi_ = nullptr;
}
void fill_into(std::vector<common::ColAppender*>& col_appenders) override;
@@ -128,6 +131,7 @@ class SingleMeasurementColumnContext final : public
MeasurementColumnContext {
int get_current_value(char*& value, uint32_t& len) override;
int move_iter() override;
+
private:
std::string column_name_;
std::vector<int32_t> pos_in_result_;
diff --git a/cpp/src/reader/table_query_executor.h
b/cpp/src/reader/table_query_executor.h
index e9a3c513..32d522c0 100644
--- a/cpp/src/reader/table_query_executor.h
+++ b/cpp/src/reader/table_query_executor.h
@@ -50,7 +50,7 @@ class TableQueryExecutor {
tsfile_io_reader_->init(read_file);
meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_);
table_query_ordering_ = TableQueryOrdering::DEVICE;
- block_size_ = 10240;
+ block_size_ = 1024;
}
~TableQueryExecutor() {
if (meta_data_querier_ != nullptr) {
diff --git a/cpp/src/reader/table_result_set.cc
b/cpp/src/reader/table_result_set.cc
index a6c8f963..8eb5d9dd 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -39,9 +39,16 @@ int TableResultSet::next(bool& has_next) {
while (row_iterator_ == nullptr || !row_iterator_->has_next()) {
if (RET_FAIL(tsblock_reader_->has_next(has_next))) {
return ret;
- } else if (!has_next) {
+ }
+
+ if (!has_next) {
+ if (row_iterator_) {
+ delete row_iterator_;
+ row_iterator_ = nullptr;
+ }
break;
}
+
if (RET_FAIL(tsblock_reader_->next(tsblock_))) {
break;
}
@@ -49,6 +56,7 @@ int TableResultSet::next(bool& has_next) {
delete row_iterator_;
row_iterator_ = nullptr;
}
+
row_iterator_ = new common::RowIterator(tsblock_);
}
if (row_iterator_ == nullptr || !row_iterator_->has_next()) {
@@ -100,10 +108,6 @@ void TableResultSet::close() {
delete row_iterator_;
row_iterator_ = nullptr;
}
- if (tsblock_) {
- delete tsblock_;
- tsblock_ = nullptr;
- }
}
} // namespace storage
\ No newline at end of file
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index a616b7fa..bc152571 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -182,7 +182,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
+TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(2);
@@ -210,19 +210,26 @@ TEST_F(TsFileWriterTableTest,
DISABLED_WriteAndReadSimple) {
TsFileReader reader = TsFileReader();
reader.open(write_file_.get_file_path());
ResultSet* ret = nullptr;
- int ret_value =
- reader.query("test_table", {"device", "value"}, 10, 50, ret);
+ int ret_value = reader.query("test_table", {"device", "value"}, 0, 50,
ret);
ASSERT_EQ(common::E_OK, ret_value);
+ ASSERT_EQ(ret_value, 0);
auto* table_result_set = (TableResultSet*)ret;
bool has_next = false;
- // There may be error in AlignedChunkReader::read_from_file_and_rewrap.
- // read_from_file_and_rewrap::read_file_->read 308 may get E_FILE_READ_ERR.
+ int cur_line = 0;
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
- std::cout << table_result_set->get_value<double>("value");
- std::cout << table_result_set->get_value<common::String*>("device");
+ cur_line++;
+ int64_t timestamp = table_result_set->get_value<int64_t>("time");
+ ASSERT_EQ(table_result_set->get_value<common::String*>("device")
+ ->to_std_string(),
+ "device" + to_string(timestamp));
+ ASSERT_EQ(table_result_set->get_value<double>("value"),
+ timestamp * 1.1);
}
+ ASSERT_EQ(cur_line, 51);
+ table_result_set->close();
reader.destroy_query_data_set(table_result_set);
+
reader.close();
delete table_schema;
}
\ No newline at end of file
diff --git a/cpp/test/writer/tsfile_writer_test.cc
b/cpp/test/writer/tsfile_writer_test.cc
index b17a126c..bd4a4793 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -50,7 +50,6 @@ class TsFileWriterTest : public ::testing::Test {
ASSERT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK);
}
void TearDown() override {
- tsfile_writer_->close();
delete tsfile_writer_;
int ret = remove(file_name_.c_str());
ASSERT_EQ(0, ret);
@@ -111,7 +110,9 @@ class TsFileWriterTest : public ::testing::Test {
}
};
-TEST_F(TsFileWriterTest, InitWithNullWriteFile) {
+class TsFileWriterTestSimple : public ::testing::Test{};
+
+TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
TsFileWriter writer;
ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG);
}
@@ -226,6 +227,9 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) {
storage::MeasurementSchema(measurement_name, data_type,
encoding, compression_type)),
E_OK);
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
}
TEST_F(TsFileWriterTest, WriteMultipleRecords) {
diff --git a/python/tests/test_write_and_read.py
b/python/tests/test_write_and_read.py
index ff86db7f..30013057 100644
--- a/python/tests/test_write_and_read.py
+++ b/python/tests/test_write_and_read.py
@@ -16,16 +16,16 @@
# under the License.
#
-import pytest
-
import os
-from tsfile import TsFileWriter, TsFileReader, ColumnCategory
-from tsfile import TimeseriesSchema, DeviceSchema, ResultSetMetaData
+import pytest
+
from tsfile import ColumnSchema, TableSchema
from tsfile import TSDataType
from tsfile import Tablet, RowRecord, Field
+from tsfile import TimeseriesSchema
from tsfile import TsFileTableWriter
+from tsfile import TsFileWriter, TsFileReader, ColumnCategory
def test_row_record_write_and_read():
@@ -103,7 +103,7 @@ def test_tablet_write_and_read():
os.remove("tablet_write_and_read.tsfile")
-def test_table_writer():
+def test_table_writer_and_reader():
table = TableSchema("test_table",
[ColumnSchema("device", TSDataType.STRING,
ColumnCategory.TAG),
ColumnSchema("value", TSDataType.DOUBLE,
ColumnCategory.FIELD)])
@@ -117,14 +117,36 @@ def test_table_writer():
tablet.add_value_by_index(1, i, i * 100.0)
writer.write_table(tablet)
- # with TsFileReader("table_write.tsfile") as reader:
- # pass
- # with reader.query_table("test_table", ["device", "value"],
- # 10, 50) as result:
- # while result.next():
- # print(result.get_value_by_name("device"))
- # print(result.get_value_by_name("value"))
-
+ with TsFileReader("table_write.tsfile") as reader:
+ with reader.query_table("test_table", ["device", "value"],
+ 0, 10) as result:
+ cur_line = 0
+ while result.next():
+ cur_time = result.get_value_by_name("time")
+ assert result.get_value_by_name("device") == "device" +
str(cur_time)
+ assert result.get_value_by_name("value") == cur_time *
100.0
+ cur_line = cur_line + 1
+ assert cur_line == 11
+ with reader.query_table("test_table", ["device", "value"],
+ 0, 100) as result:
+ line_num = 0
+ print("dataframe")
+ while result.next():
+ data_frame = result.read_data_frame(max_row_num=30)
+ if 100 - line_num >= 30:
+ assert data_frame.shape == (30, 3)
+ else:
+ assert data_frame.shape == (100 - line_num, 3)
+ line_num += len(data_frame)
+
+ schemas = reader.get_all_table_schemas()
+ assert len(schemas) == 1
+ assert schemas["test_table"] is not None
+ tableSchema = schemas["test_table"]
+ assert tableSchema.get_table_name() == "test_table"
+ print(tableSchema)
+ assert tableSchema.__repr__() == ("TableSchema(test_table,
[ColumnSchema(device,"
+ " STRING, TAG),
ColumnSchema(value, DOUBLE, FIELD)])")
finally:
if os.path.exists("table_write.tsfile"):
os.remove("table_write.tsfile")
diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py
index 19f7a02e..65189b70 100644
--- a/python/tsfile/schema.py
+++ b/python/tsfile/schema.py
@@ -78,6 +78,9 @@ class ColumnSchema:
self.data_type = data_type
self.category = category
+ def __repr__(self) -> str:
+ return f"ColumnSchema({self.column_name}, {self.data_type.name},
{self.category.name})"
+
def get_column_name(self):
return self.column_name
@@ -103,6 +106,9 @@ class TableSchema:
def get_columns(self):
return self.columns
+ def __repr__(self) -> str:
+ return f"TableSchema({self.table_name}, {self.columns})"
+
class ResultSetMetaData:
"""Metadata container for query result sets (columns, types, table
name)."""
@@ -127,8 +133,15 @@ class ResultSetMetaData:
raise OverflowError
return self.column_list[column_index - 1]
- def get_column_name_index(self, column_name: str) -> int:
- return self.column_list.index(self.table_name + "." + column_name) + 1
+ def get_column_name_index(self, column_name: str, is_tree: bool = False)
-> int:
+ """
+ For Tree model, column is full path, column_name means sensor_name.
+ For Table model, column is just column name.
+ """
+ if is_tree:
+ return self.column_list.index(self.table_name + "." + column_name)
+ 1
+ else:
+ return self.column_list.index(column_name) + 1
def get_column_num(self):
return len(self.column_list)
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 9ebe8eee..b4f9ccf8 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -147,25 +147,33 @@ cdef extern from "./tsfile_cwrapper.h":
# row_record
TsRecord _ts_record_new(const char * device_id, int64_t timestamp, int
timeseries_num);
- ErrorCode _insert_data_into_ts_record_by_name_int32_t(TsRecord data, const
char *measurement_name, const int32_t value);
- ErrorCode _insert_data_into_ts_record_by_name_int64_t(TsRecord data, const
char *measurement_name, const int64_t value);
+ ErrorCode _insert_data_into_ts_record_by_name_int32_t(TsRecord data, const
char *measurement_name,
+ const int32_t value);
+ ErrorCode _insert_data_into_ts_record_by_name_int64_t(TsRecord data, const
char *measurement_name,
+ const int64_t value);
ErrorCode _insert_data_into_ts_record_by_name_float(TsRecord data, const
char *measurement_name, const float value);
- ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const
char *measurement_name, const double value);
- ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const
char *measurement_name,const bint value);
-
- void _free_tsfile_ts_record(TsRecord* record);
+ ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const
char *measurement_name,
+ const double value);
+ ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const
char *measurement_name, const bint value);
+ void _free_tsfile_ts_record(TsRecord * record);
# resulSet : query data from tsfile reader
ResultSet tsfile_query_table(TsFileReader reader,
- const char * table_name,
- const char** columns, uint32_t
column_num,
- int64_t start_time, int64_t end_time,
ErrorCode *err_code)
+ const char * table_name,
+ const char** columns, uint32_t column_num,
+ int64_t start_time, int64_t end_time,
ErrorCode *err_code)
ResultSet _tsfile_reader_query_device(TsFileReader reader,
const char *device_name,
char ** sensor_name, uint32_t
sensor_num,
int64_t start_time, int64_t
end_time, ErrorCode *err_code)
+ TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
+ const char * table_name);
+
+ TableSchema * tsfile_reader_get_all_table_schemas(TsFileReader reader,
+ uint32_t * size);
+
# resultSet : get data from resultSet
bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code);
bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t
column_index);
@@ -178,5 +186,6 @@ cdef extern from "./tsfile_cwrapper.h":
float tsfile_result_set_get_value_by_index_float(ResultSet result_set,
uint32_t column_index);
double tsfile_result_set_get_value_by_index_double(ResultSet result_set,
uint32_t column_index);
char * tsfile_result_set_get_value_by_index_string(ResultSet result_set,
uint32_t column_index);
+
ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set);
void free_result_set_meta_data(ResultSetMetaData result_set_meta_data);
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 926a56da..be1d5290 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -22,6 +22,8 @@ from .tsfile_cpp cimport *
cdef public api inline void check_error(int errcode, const char* context=NULL)
except *
cdef public api object from_c_result_set_meta_data(ResultSetMetaData schema)
+cdef public api object from_c_column_schema(ColumnSchema schema)
+cdef public api object from_c_table_schema(TableSchema schema)
cdef public api TSDataType to_c_data_type(object data_type)
cdef public api TSEncoding to_c_encoding_type(object encoding_type)
cdef public api CompressionType to_c_compression_type(object compression_type)
@@ -48,4 +50,6 @@ cdef public api bint
tsfile_result_set_is_null_by_name_c(ResultSet result_set, o
cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader,
object table_name, object column_list,
int64_t start_time, int64_t
end_time)
cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader,
object device_name, object sensor_list, int64_t start_time,
- int64_t end_time)
\ No newline at end of file
+ int64_t end_time)
+cdef public api object get_table_schema(TsFileReader reader, object table_name)
+cdef public api object get_all_table_schema(TsFileReader reader)
\ No newline at end of file
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 206850c1..649c0755 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -31,6 +31,7 @@ from tsfile.exceptions import ERROR_MAPPING
from tsfile.schema import ResultSetMetaData as ResultSetMetaDataPy
from tsfile.schema import TSDataType as TSDataTypePy, TSEncoding as
TSEncodingPy
from tsfile.schema import Compressor as CompressorPy, ColumnCategory as
CategoryPy
+from tsfile.schema import TableSchema as TableSchemaPy, ColumnSchema as
ColumnSchemaPy
# check exception and set py exception object
cdef inline void check_error(int errcode, const char* context=NULL) except *:
@@ -57,6 +58,23 @@ cdef object from_c_result_set_meta_data(ResultSetMetaData
schema):
result = ResultSetMetaDataPy(column_list, data_types)
return result
+cdef object from_c_column_schema(ColumnSchema schema):
+ column_name = schema.column_name.decode('utf-8')
+ data_type = TSDataTypePy(schema.data_type)
+ category_type = CategoryPy(schema.column_category)
+ return ColumnSchemaPy(column_name, data_type, category_type)
+
+cdef object from_c_table_schema(TableSchema schema):
+ cdef int i
+ table_name = schema.table_name.decode('utf-8')
+ columns = []
+ for i in range(schema.column_num):
+ columns.append(from_c_column_schema(schema.column_schemas[i]))
+ free_c_table_schema(&schema)
+ return TableSchemaPy(table_name, columns)
+
+
+
# Convert from python to c struct
cdef dict TS_DATA_TYPE_MAP = {
TSDataTypePy.BOOLEAN: TSDataType.TS_DATATYPE_BOOLEAN,
@@ -141,7 +159,6 @@ cdef TimeseriesSchema* to_c_timeseries_schema(object
py_schema):
return c_schema
-
cdef DeviceSchema* to_c_device_schema(object py_schema):
cdef DeviceSchema* c_schema
c_schema = <DeviceSchema *> malloc(sizeof(DeviceSchema))
@@ -409,3 +426,22 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader
reader, object device_na
free(<void*>sensor_list_c)
sensor_list_c = NULL
+cdef object get_table_schema(TsFileReader reader, object table_name):
+ cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
+ cdef const char* table_name_c = table_name_bytes
+ cdef TableSchema schema = tsfile_reader_get_table_schema(reader,
table_name_c)
+ return from_c_table_schema(schema)
+
+cdef object get_all_table_schema(TsFileReader reader):
+ cdef uint32_t table_num = 0
+ cdef TableSchema* schemas
+ cdef int i
+
+ table_schemas = {}
+ schemas = tsfile_reader_get_all_table_schemas(reader, &table_num)
+ for i in range(table_num):
+ schema_py = from_c_table_schema(schemas[i])
+ table_schemas.update([(schema_py.get_table_name(), schema_py)])
+ free(schemas)
+ return table_schemas
+
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 8a1661d4..e168c400 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -49,11 +49,13 @@ cdef class ResultSetPy:
cdef object valid
# The reader
cdef object tsfile_reader
+ cdef object is_tree
- def __init__(self, tsfile_reader : TsFileReaderPy):
+ def __init__(self, tsfile_reader : TsFileReaderPy, is_tree: bint = False):
self.metadata = None
self.valid = True
self.tsfile_reader = weakref.ref(tsfile_reader)
+ self.is_tree = is_tree
cdef init_c(self, ResultSet result, object device_name):
"""
@@ -90,7 +92,7 @@ cdef class ResultSetPy:
)
}
- def read_next_data_frame(self, max_row_num : int = 1024):
+ def read_data_frame(self, max_row_num : int = 1024):
"""
:param max_row_num: default row num: 1024
:return: a dataframe contains data from query result.
@@ -102,23 +104,40 @@ cdef class ResultSetPy:
date_columns = [
column_names[i]
for i in range(column_num)
- if self.metadata.get_data_type(i) == TSDataTypePy.DATE
+ if self.metadata.get_data_type(i + 1) == TSDataTypePy.DATE
]
- data_type = [self.metadata.get_data_type(i).to_pandas_dtype() for i in
range(column_num)]
+ data_type = [self.metadata.get_data_type(i + 1).to_pandas_dtype() for
i in range(column_num)]
data_container = {
column_name: [] for column_name in column_names
}
cur_line = 0
- while self.next() and cur_line < max_row_num:
- row_data = (
- self.get_value_by_index(i)
- for i in range(column_num)
- )
+
+ # User may call result_set.next() before or not, so we just get
current data.
+ # if there is no data in result set, we just get a None list.
+ row_data = [
+ self.get_value_by_index(i + 1)
+ for i in range(column_num)
+ ]
+
+ if not all(value is None for value in row_data):
for column_name, value in zip(column_names, row_data):
data_container[column_name].append(value)
+ cur_line += 1
+
+ while cur_line < max_row_num:
+ if self.next():
+ row_data = (
+ self.get_value_by_index(i + 1)
+ for i in range(column_num)
+ )
+ for column_name, value in zip(column_names, row_data):
+ data_container[column_name].append(value)
+ cur_line += 1
+ else:
+ break
df = pd.DataFrame(data_container)
data_type_dict = {col: dtype for col, dtype in zip(column_names,
data_type)}
@@ -171,10 +190,9 @@ cdef class ResultSetPy:
"""
self.check_result_set_invalid()
if tsfile_result_set_is_null_by_name_c(self.result, column_name):
- print("Get None")
return None
# get index in metadata, metadata ind from 0.
- ind = self.metadata.get_column_name_index(column_name)
+ ind = self.metadata.get_column_name_index(column_name, self.is_tree)
return self.get_value_by_index(ind)
def get_metadata(self):
@@ -200,7 +218,7 @@ cdef class ResultSetPy:
Checks whether the field with the specified column name in the result
set is null.
"""
self.check_result_set_invalid()
- ind = self.metadata.get_column_name_index(name)
+ ind = self.metadata.get_column_name_index(name, self.is_tree)
return self.is_null_by_index(ind + 1)
def check_result_set_invalid(self):
@@ -282,7 +300,7 @@ cdef class TsFileReaderPy:
"""
cdef ResultSet result;
result = tsfile_reader_query_paths_c(self.reader, device_name,
sensor_list, start_time, end_time)
- pyresult = ResultSetPy(self)
+ pyresult = ResultSetPy(self, True)
pyresult.init_c(result, device_name)
self.activate_result_set_list.add(pyresult)
return pyresult
@@ -295,6 +313,19 @@ cdef class TsFileReaderPy:
"""
self.activate_result_set_list.discard(result_set)
+ def get_table_schema(self, table_name : str):
+ """
+ Get table's schema with specify table name.
+ """
+ return get_table_schema(self.reader, table_name)
+
+ def get_all_table_schemas(self):
+ """
+ Get all tables schemas
+ """
+ return get_all_table_schema(self.reader)
+
+
def close(self):
"""
Close TsFile Reader, if reader has result sets, invalid them.