This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch colin_support_read_tree
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/colin_support_read_tree by
this push:
new 847a2d62 support read tree by python.
847a2d62 is described below
commit 847a2d627177f3f6e8313ae110be917e43caf04a
Author: ColinLee <[email protected]>
AuthorDate: Mon Nov 24 09:26:32 2025 +0800
support read tree by python.
---
cpp/src/cwrapper/tsfile_cwrapper.cc | 220 +++++++++++++++++++-----------------
cpp/src/cwrapper/tsfile_cwrapper.h | 4 +
python/tests/test_write_and_read.py | 17 ++-
python/tsfile/tsfile_cpp.pxd | 6 +
python/tsfile/tsfile_py_cpp.pxd | 2 +
python/tsfile/tsfile_py_cpp.pyx | 26 +++++
python/tsfile/tsfile_reader.pyx | 15 +++
python/tsfile/utils.py | 49 +++++---
8 files changed, 214 insertions(+), 125 deletions(-)
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index 1b09db49..ebe8107a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -72,7 +72,7 @@ int set_global_compression(uint8_t compression) {
return common::set_global_compression(compression);
}
-WriteFile write_file_new(const char *pathname, ERRNO *err_code) {
+WriteFile write_file_new(const char* pathname, ERRNO* err_code) {
int ret;
init_tsfile_config();
@@ -86,14 +86,14 @@ WriteFile write_file_new(const char *pathname, ERRNO *err_code) {
flags |= O_BINARY;
#endif
mode_t mode = 0666;
- storage::WriteFile *file = new storage::WriteFile;
+ storage::WriteFile* file = new storage::WriteFile;
ret = file->create(pathname, flags, mode);
*err_code = ret;
return file;
}
-TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema,
- ERRNO *err_code) {
+TsFileWriter tsfile_writer_new(WriteFile file, TableSchema* schema,
+ ERRNO* err_code) {
if (schema->column_num == 0) {
*err_code = common::E_INVALID_SCHEMA;
return nullptr;
@@ -121,19 +121,19 @@ TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema,
static_cast<common::ColumnCategory>(cur_schema.column_category));
}
- storage::TableSchema *table_schema =
+ storage::TableSchema* table_schema =
new storage::TableSchema(schema->table_name, column_schemas);
auto table_writer = new storage::TsFileTableWriter(
- static_cast<storage::WriteFile *>(file), table_schema);
+ static_cast<storage::WriteFile*>(file), table_schema);
delete table_schema;
*err_code = common::E_OK;
return table_writer;
}
TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file,
- TableSchema *schema,
+ TableSchema* schema,
uint64_t memory_threshold,
- ERRNO *err_code) {
+ ERRNO* err_code) {
if (schema->column_num == 0) {
*err_code = common::E_INVALID_SCHEMA;
return nullptr;
@@ -154,17 +154,17 @@ TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file,
static_cast<common::ColumnCategory>(cur_schema.column_category));
}
- storage::TableSchema *table_schema =
+ storage::TableSchema* table_schema =
new storage::TableSchema(schema->table_name, column_schemas);
- auto table_writer =
- new storage::TsFileTableWriter(static_cast<storage::WriteFile *>(file),
- table_schema, memory_threshold);
+ auto table_writer = new storage::TsFileTableWriter(
+ static_cast<storage::WriteFile*>(file), table_schema, memory_threshold);
*err_code = common::E_OK;
delete table_schema;
return table_writer;
}
-TsFileReader tsfile_reader_new(const char *pathname, ERRNO *err_code) {
+
+TsFileReader tsfile_reader_new(const char* pathname, ERRNO* err_code) {
init_tsfile_config();
auto reader = new storage::TsFileReader();
int ret = reader->open(pathname);
@@ -180,7 +180,7 @@ ERRNO tsfile_writer_close(TsFileWriter writer) {
if (writer == nullptr) {
return common::E_OK;
}
- auto *w = static_cast<storage::TsFileTableWriter *>(writer);
+ auto* w = static_cast<storage::TsFileTableWriter*>(writer);
int ret = w->flush();
if (ret != common::E_OK) {
return ret;
@@ -194,12 +194,12 @@ ERRNO tsfile_writer_close(TsFileWriter writer) {
}
ERRNO tsfile_reader_close(TsFileReader reader) {
- auto *ts_reader = static_cast<storage::TsFileReader *>(reader);
+ auto* ts_reader = static_cast<storage::TsFileReader*>(reader);
delete ts_reader;
return common::E_OK;
}
-Tablet tablet_new(char **column_name_list, TSDataType *data_types,
+Tablet tablet_new(char** column_name_list, TSDataType* data_types,
uint32_t column_num, uint32_t max_rows) {
std::vector<std::string> measurement_list;
std::vector<common::TSDataType> data_type_list;
@@ -212,20 +212,20 @@ Tablet tablet_new(char **column_name_list, TSDataType *data_types,
}
uint32_t tablet_get_cur_row_size(Tablet tablet) {
- return static_cast<storage::Tablet *>(tablet)->get_cur_row_size();
+ return static_cast<storage::Tablet*>(tablet)->get_cur_row_size();
}
ERRNO tablet_add_timestamp(Tablet tablet, uint32_t row_index,
Timestamp timestamp) {
- return static_cast<storage::Tablet *>(tablet)->add_timestamp(row_index,
- timestamp);
+ return static_cast<storage::Tablet*>(tablet)->add_timestamp(row_index,
+ timestamp);
}
#define TABLET_ADD_VALUE_BY_NAME_DEF(type) \
ERRNO tablet_add_value_by_name_##type(Tablet tablet, uint32_t row_index, \
- const char *column_name, \
+ const char* column_name, \
const type value) { \
- return static_cast<storage::Tablet *>(tablet)->add_value( \
+ return static_cast<storage::Tablet*>(tablet)->add_value( \
row_index, storage::to_lower(column_name), value); \
}
TABLET_ADD_VALUE_BY_NAME_DEF(int32_t);
@@ -235,9 +235,9 @@ TABLET_ADD_VALUE_BY_NAME_DEF(double);
TABLET_ADD_VALUE_BY_NAME_DEF(bool);
ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index,
- const char *column_name,
- const char *value) {
- return static_cast<storage::Tablet *>(tablet)->add_value(
+ const char* column_name,
+ const char* value) {
+ return static_cast<storage::Tablet*>(tablet)->add_value(
row_index, storage::to_lower(column_name), common::String(value));
}
@@ -245,14 +245,14 @@ ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index,
ERRNO tablet_add_value_by_index_##type(Tablet tablet, uint32_t row_index, \
uint32_t column_index, \
const type value) { \
- return static_cast<storage::Tablet *>(tablet)->add_value( \
+ return static_cast<storage::Tablet*>(tablet)->add_value( \
row_index, column_index, value); \
}
ERRNO tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index,
uint32_t column_index,
- const char *value) {
- return static_cast<storage::Tablet *>(tablet)->add_value(
+ const char* value) {
+ return static_cast<storage::Tablet*>(tablet)->add_value(
row_index, column_index, common::String(value));
}
@@ -263,16 +263,16 @@ TABLE_ADD_VALUE_BY_INDEX_DEF(double);
TABLE_ADD_VALUE_BY_INDEX_DEF(bool);
// TsRecord API
-TsRecord _ts_record_new(const char *device_id, Timestamp timestamp,
+TsRecord _ts_record_new(const char* device_id, Timestamp timestamp,
int timeseries_num) {
- auto *record = new storage::TsRecord(timestamp, device_id, timeseries_num);
+ auto* record = new storage::TsRecord(timestamp, device_id, timeseries_num);
return record;
}
#define INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(type) \
ERRNO _insert_data_into_ts_record_by_name_##type( \
- TsRecord data, const char *measurement_name, type value) { \
- auto *record = (storage::TsRecord *)data; \
+ TsRecord data, const char* measurement_name, type value) { \
+ auto* record = (storage::TsRecord*)data; \
storage::DataPoint point(measurement_name, value); \
if (record->points_.size() + 1 > record->points_.capacity()) \
return common::E_BUF_NOT_ENOUGH; \
@@ -302,8 +302,8 @@ return writer;
*/
ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) {
- auto *w = static_cast<storage::TsFileTableWriter *>(writer);
- auto *tbl = static_cast<storage::Tablet *>(tablet);
+ auto* w = static_cast<storage::TsFileTableWriter*>(writer);
+ auto* tbl = static_cast<storage::Tablet*>(tablet);
return w->write_table(*tbl);
}
@@ -314,12 +314,12 @@ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) {
// Query
-ResultSet tsfile_query_table(TsFileReader reader, const char *table_name,
- char **columns, uint32_t column_num,
+ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
+ char** columns, uint32_t column_num,
Timestamp start_time, Timestamp end_time,
- ERRNO *err_code) {
- auto *r = static_cast<storage::TsFileReader *>(reader);
- storage::ResultSet *table_result_set = nullptr;
+ ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
+ storage::ResultSet* table_result_set = nullptr;
std::vector<std::string> column_names;
for (uint32_t i = 0; i < column_num; i++) {
column_names.emplace_back(columns[i]);
@@ -329,8 +329,22 @@ ResultSet tsfile_query_table(TsFileReader reader, const char *table_name,
return table_result_set;
}
-bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
+ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
+ uint32_t column_num, Timestamp start_time,
+ Timestamp end_time, ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
+ storage::ResultSet* table_result_set = nullptr;
+ std::vector<std::string> column_names;
+ for (uint32_t i = 0; i < column_num; i++) {
+ column_names.emplace_back(columns[i]);
+ }
+ *err_code = r->query_table_on_tree(column_names, start_time, end_time,
+ table_result_set);
+ return table_result_set;
+}
+
+bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) {
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
bool has_next = true;
int ret = common::E_OK;
ret = r->next(has_next);
@@ -343,8 +357,8 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) {
#define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \
type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \
- const char *column_name) { \
- auto *r = static_cast<storage::TableResultSet *>(result_set); \
+ const char* column_name) { \
+ auto* r = static_cast<storage::TableResultSet*>(result_set); \
std::string column_name_(column_name); \
return r->get_value<type>(column_name_); \
}
@@ -354,13 +368,13 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int32_t);
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int64_t);
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(float);
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double);
-char *tsfile_result_set_get_value_by_name_string(ResultSet result_set,
- const char *column_name) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
+char* tsfile_result_set_get_value_by_name_string(ResultSet result_set,
+ const char* column_name) {
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
std::string column_name_(column_name);
- common::String *ret = r->get_value<common::String *>(column_name_);
+ common::String* ret = r->get_value<common::String*>(column_name_);
// Caller should free return's char* 's space.
- char *dup = (char *)malloc(ret->len_ + 1);
+ char* dup = (char*)malloc(ret->len_ + 1);
if (dup) {
memcpy(dup, ret->buf_, ret->len_);
dup[ret->len_] = '\0';
@@ -371,7 +385,7 @@ char *tsfile_result_set_get_value_by_name_string(ResultSet result_set,
#define TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(type) \
type tsfile_result_set_get_value_by_index_##type(ResultSet result_set, \
uint32_t column_index) { \
- auto *r = static_cast<storage::TableResultSet *>(result_set); \
+ auto* r = static_cast<storage::TableResultSet*>(result_set); \
return r->get_value<type>(column_index); \
}
@@ -381,12 +395,12 @@ TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(float);
TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(double);
TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(bool);
-char *tsfile_result_set_get_value_by_index_string(ResultSet result_set,
+char* tsfile_result_set_get_value_by_index_string(ResultSet result_set,
uint32_t column_index) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
- common::String *ret = r->get_value<common::String *>(column_index);
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
+ common::String* ret = r->get_value<common::String*>(column_index);
// Caller should free return's char* 's space.
- char *dup = (char *)malloc(ret->len_ + 1);
+ char* dup = (char*)malloc(ret->len_ + 1);
if (dup) {
memcpy(dup, ret->buf_, ret->len_);
dup[ret->len_] = '\0';
@@ -395,19 +409,19 @@ char *tsfile_result_set_get_value_by_index_string(ResultSet result_set,
}
bool tsfile_result_set_is_null_by_name(ResultSet result_set,
- const char *column_name) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
+ const char* column_name) {
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
return r->is_null(column_name);
}
bool tsfile_result_set_is_null_by_index(const ResultSet result_set,
const uint32_t column_index) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
return r->is_null(column_index);
}
ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) {
- auto *r = static_cast<storage::TableResultSet *>(result_set);
+ auto* r = static_cast<storage::TableResultSet*>(result_set);
if (result_set == NULL) {
return ResultSetMetaData();
}
@@ -417,8 +431,8 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) {
r->get_metadata();
meta_data.column_num = result_set_metadata->get_column_count();
meta_data.column_names =
- static_cast<char **>(malloc(meta_data.column_num * sizeof(char *)));
- meta_data.data_types = static_cast<TSDataType *>(
+ static_cast<char**>(malloc(meta_data.column_num * sizeof(char*)));
+ meta_data.data_types = static_cast<TSDataType*>(
malloc(meta_data.column_num * sizeof(TSDataType)));
for (int i = 0; i < meta_data.column_num; i++) {
meta_data.column_names[i] =
@@ -429,7 +443,7 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) {
return meta_data;
}
-char *tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set,
+char* tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set,
uint32_t column_index) {
if (column_index > (uint32_t)result_set.column_num) {
return nullptr;
@@ -482,15 +496,15 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) {
// }
TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
- const char *table_name) {
- auto *r = static_cast<storage::TsFileReader *>(reader);
+ const char* table_name) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
auto table_shcema = r->get_table_schema(table_name);
TableSchema ret_schema;
ret_schema.table_name = strdup(table_shcema->get_table_name().c_str());
int column_num = table_shcema->get_columns_num();
ret_schema.column_num = column_num;
ret_schema.column_schemas =
- static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * column_num));
+ static_cast<ColumnSchema*>(malloc(sizeof(ColumnSchema) * column_num));
for (int i = 0; i < column_num; i++) {
auto column_schema = table_shcema->get_measurement_schemas()[i];
ret_schema.column_schemas[i].column_name =
@@ -504,18 +518,18 @@ TableSchema tsfile_reader_get_table_schema(TsFileReader reader,
return ret_schema;
}
-TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader,
- uint32_t *size) {
- auto *r = static_cast<storage::TsFileReader *>(reader);
+TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader,
+ uint32_t* size) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
auto table_schemas = r->get_all_table_schemas();
size_t table_num = table_schemas.size();
- TableSchema *ret =
- static_cast<TableSchema *>(malloc(sizeof(TableSchema) * table_num));
+ TableSchema* ret =
+ static_cast<TableSchema*>(malloc(sizeof(TableSchema) * table_num));
for (size_t i = 0; i < table_schemas.size(); i++) {
ret[i].table_name = strdup(table_schemas[i]->get_table_name().c_str());
int column_num = table_schemas[i]->get_columns_num();
ret[i].column_num = column_num;
- ret[i].column_schemas = static_cast<ColumnSchema *>(
+ ret[i].column_schemas = static_cast<ColumnSchema*>(
malloc(column_num * sizeof(ColumnSchema)));
auto column_schemas = table_schemas[i]->get_measurement_schemas();
for (int j = 0; j < column_num; j++) {
@@ -533,23 +547,23 @@ TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader,
}
// delete pointer
-void _free_tsfile_ts_record(TsRecord *record) {
+void _free_tsfile_ts_record(TsRecord* record) {
if (*record != nullptr) {
- delete static_cast<storage::TsRecord *>(*record);
+ delete static_cast<storage::TsRecord*>(*record);
}
*record = nullptr;
}
-void free_tablet(Tablet *tablet) {
+void free_tablet(Tablet* tablet) {
if (*tablet != nullptr) {
- delete static_cast<storage::Tablet *>(*tablet);
+ delete static_cast<storage::Tablet*>(*tablet);
}
*tablet = nullptr;
}
-void free_tsfile_result_set(ResultSet *result_set) {
+void free_tsfile_result_set(ResultSet* result_set) {
if (*result_set != nullptr) {
- delete static_cast<storage::ResultSet *>(*result_set);
+ delete static_cast<storage::ResultSet*>(*result_set);
}
*result_set = nullptr;
}
@@ -583,15 +597,15 @@ void free_table_schema(TableSchema schema) {
}
void free_column_schema(ColumnSchema schema) { free(schema.column_name); }
-void free_write_file(WriteFile *write_file) {
- auto f = static_cast<storage::WriteFile *>(*write_file);
+void free_write_file(WriteFile* write_file) {
+ auto f = static_cast<storage::WriteFile*>(*write_file);
delete f;
*write_file = nullptr;
}
// For Python API
-TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold,
- ERRNO *err_code) {
+TsFileWriter _tsfile_writer_new(const char* pathname, uint64_t memory_threshold,
+ ERRNO* err_code) {
init_tsfile_config();
auto writer = new storage::TsFileWriter();
int flags = O_WRONLY | O_CREAT | O_TRUNC;
@@ -608,9 +622,9 @@ TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold,
return writer;
}
-Tablet _tablet_new_with_target_name(const char *device_id,
- char **column_name_list,
- TSDataType *data_types, int column_num,
+Tablet _tablet_new_with_target_name(const char* device_id,
+ char** column_name_list,
+ TSDataType* data_types, int column_num,
int max_rows) {
std::vector<std::string> measurement_list;
std::vector<common::TSDataType> data_type_list;
@@ -627,27 +641,27 @@ Tablet _tablet_new_with_target_name(const char *device_id,
}
}
-ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) {
- std::vector<storage::MeasurementSchema *> measurement_schemas;
+ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema) {
+ std::vector<storage::MeasurementSchema*> measurement_schemas;
std::vector<common::ColumnCategory> column_categories;
measurement_schemas.resize(schema->column_num);
for (int i = 0; i < schema->column_num; i++) {
- ColumnSchema *cur_schema = schema->column_schemas + i;
+ ColumnSchema* cur_schema = schema->column_schemas + i;
measurement_schemas[i] = new storage::MeasurementSchema(
cur_schema->column_name,
static_cast<common::TSDataType>(cur_schema->data_type));
column_categories.push_back(
static_cast<common::ColumnCategory>(cur_schema->column_category));
}
- auto tsfile_writer = static_cast<storage::TsFileWriter *>(writer);
+ auto tsfile_writer = static_cast<storage::TsFileWriter*>(writer);
return tsfile_writer->register_table(std::make_shared<storage::TableSchema>(
schema->table_name, measurement_schemas, column_categories));
}
ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer,
- const char *device_id,
- const TimeseriesSchema *schema) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
+ const char* device_id,
+ const TimeseriesSchema* schema) {
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
int ret = w->register_timeseries(
device_id,
@@ -660,8 +674,8 @@ ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer,
}
ERRNO _tsfile_writer_register_device(TsFileWriter writer,
- const device_schema *device_schema) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
+ const device_schema* device_schema) {
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
for (int column_id = 0; column_id < device_schema->timeseries_num;
column_id++) {
TimeseriesSchema schema = device_schema->timeseries_schema[column_id];
@@ -680,26 +694,26 @@ ERRNO _tsfile_writer_register_device(TsFileWriter writer,
}
ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
- const auto *tbl = static_cast<storage::Tablet *>(tablet);
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
+ const auto* tbl = static_cast<storage::Tablet*>(tablet);
return w->write_tablet(*tbl);
}
ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
- auto *tbl = static_cast<storage::Tablet *>(tablet);
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
+ auto* tbl = static_cast<storage::Tablet*>(tablet);
return w->write_table(*tbl);
}
ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
- const storage::TsRecord *record = static_cast<storage::TsRecord *>(data);
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
+ const storage::TsRecord* record = static_cast<storage::TsRecord*>(data);
const int ret = w->write_record(*record);
return ret;
}
ERRNO _tsfile_writer_close(TsFileWriter writer) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
int ret = w->flush();
if (ret != common::E_OK) {
return ret;
@@ -713,23 +727,23 @@ ERRNO _tsfile_writer_close(TsFileWriter writer) {
}
ERRNO _tsfile_writer_flush(TsFileWriter writer) {
- auto *w = static_cast<storage::TsFileWriter *>(writer);
+ auto* w = static_cast<storage::TsFileWriter*>(writer);
return w->flush();
}
ResultSet _tsfile_reader_query_device(TsFileReader reader,
- const char *device_name,
- char **sensor_name, uint32_t sensor_num,
+ const char* device_name,
+ char** sensor_name, uint32_t sensor_num,
Timestamp start_time, Timestamp end_time,
- ERRNO *err_code) {
- auto *r = static_cast<storage::TsFileReader *>(reader);
+ ERRNO* err_code) {
+ auto* r = static_cast<storage::TsFileReader*>(reader);
std::vector<std::string> selected_paths;
selected_paths.reserve(sensor_num);
for (uint32_t i = 0; i < sensor_num; i++) {
selected_paths.push_back(std::string(device_name) + "." +
std::string(sensor_name[i]));
}
- storage::ResultSet *qds = nullptr;
+ storage::ResultSet* qds = nullptr;
*err_code = r->query(selected_paths, start_time, end_time, qds);
return qds;
}
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index 75dc0364..f94325aa 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -428,6 +428,10 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
char** columns, uint32_t column_num,
Timestamp start_time, Timestamp end_time,
ERRNO* err_code);
+
+ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
+ uint32_t column_num, Timestamp start_time,
+ Timestamp end_time, ERRNO* err_code);
// ResultSet tsfile_reader_query_device(TsFileReader reader,
// const char* device_name,
// char** sensor_name, uint32_t sensor_num,
diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py
index e5c87ab9..da6cc5c9 100644
--- a/python/tests/test_write_and_read.py
+++ b/python/tests/test_write_and_read.py
@@ -34,17 +34,18 @@ from tsfile.exceptions import TableNotExistError, ColumnNotExistError, NotSuppor
def test_row_record_write_and_read():
try:
writer = TsFileWriter("record_write_and_read.tsfile")
- timeseries = TimeseriesSchema("level1", TSDataType.INT64)
- writer.register_timeseries("root.device1", timeseries)
+ writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64))
writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE))
- writer.register_timeseries("root.device1", TimeseriesSchema("level3", TSDataType.INT32))
+ writer.register_timeseries("root.device2", TimeseriesSchema("level1", TSDataType.INT32))
max_row_num = 1000
for i in range(max_row_num):
row = RowRecord("root.device1", i,
[Field("level1", i + 1, TSDataType.INT64),
- Field("level2", i * 1.1, TSDataType.DOUBLE),
- Field("level3", i * 2, TSDataType.INT32)])
+ Field("level2", i * 1.1, TSDataType.DOUBLE)])
+ writer.write_row_record(row)
+ row = RowRecord("root.device2", i,
+ [Field("level1", i + 1, TSDataType.INT32)])
writer.write_row_record(row)
writer.close()
@@ -56,8 +57,14 @@ def test_row_record_write_and_read():
print(result.get_value_by_index(1))
print(reader.get_active_query_result())
result.close()
+ result2 = reader.query_table_on_tree(["level1", "level2"], 20, 50)
+ print(result2.read_data_frame())
+ result2.close()
print(reader.get_active_query_result())
reader.close()
+
+
+
finally:
if os.path.exists("record_write_and_read.tsfile"):
os.remove("record_write_and_read.tsfile")
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 1b04051c..e9a67364 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -167,6 +167,12 @@ cdef extern from "./tsfile_cwrapper.h":
const char * table_name,
const char** columns, uint32_t column_num,
int64_t start_time, int64_t end_time,
ErrorCode *err_code)
+
+ ResultSet tsfile_query_table_on_tree(TsFileReader reader,
+ char** columns, uint32_t column_num,
+ int64_t start_time, int64_t end_time,
+ ErrorCode* err_code);
+
ResultSet _tsfile_reader_query_device(TsFileReader reader,
const char *device_name,
char ** sensor_name, uint32_t sensor_num,
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index ce907a79..6ed03838 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -49,6 +49,8 @@ cdef public api ErrorCode tsfile_writer_register_table_py_cpp(TsFileWriter write
cdef public api bint tsfile_result_set_is_null_by_name_c(ResultSet result_set, object name)
cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_name, object column_list,
int64_t start_time, int64_t end_time)
+cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list,
+ int64_t start_time, int64_t end_time)
cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader,
object device_name, object sensor_list, int64_t start_time,
int64_t end_time)
cdef public api object get_table_schema(TsFileReader reader, object table_name)
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index e1743039..1d7af385 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -515,6 +515,32 @@ cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_nam
free(<void*>columns)
columns = NULL
+cdef ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list,
+ int64_t start_time, int64_t end_time):
+ cdef ResultSet result
+ cdef int column_num = len(column_list)
+ cdef char** columns = <char**> malloc(sizeof(char*) * column_num)
+ cdef int i
+ cdef ErrorCode code = 0
+ if columns == NULL:
+ raise MemoryError("Failed to allocate memory for columns")
+ try:
+ for i in range(column_num):
+ columns[i] = strdup((<str>column_list[i]).encode('utf-8'))
+ if columns[i] == NULL:
+ raise MemoryError("Failed to allocate memory for column name")
+ result = tsfile_query_table_on_tree(reader, columns, column_num, start_time, end_time, &code)
+ check_error(code)
+ return result
+ finally:
+ if columns != NULL:
+ for i in range(column_num):
+ free(<void*>columns[i])
+ columns[i] = NULL
+ free(<void*>columns)
+ columns = NULL
+
+
cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object
device_name, object sensor_list, int64_t start_time,
int64_t end_time):
cdef ResultSet result
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index e8d38d7d..7e29213b 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -291,6 +291,21 @@ cdef class TsFileReaderPy:
pyresult.init_c(result, table_name)
self.activate_result_set_list.add(pyresult)
return pyresult
+
+ def query_table_on_tree(self, column_names : List[str],
+ start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy:
+ """
+ Execute a time range query on specified columns on tree structure.
+ :return: query result handler.
+ """
+ cdef ResultSet result;
+ result = tsfile_reader_query_table_on_tree_c(self.reader,
+ [column_name.lower() for column_name in column_names], start_time,
+ end_time)
+ pyresult = ResultSetPy(self, True)
+ pyresult.init_c(result, "root")
+ self.activate_result_set_list.add(pyresult)
+ return pyresult
def query_timeseries(self, device_name : str, sensor_list : List[str],
start_time : int = 0,
end_time : int = 0) -> ResultSetPy:
diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py
index 3d236606..f1fd51e7 100644
--- a/python/tsfile/utils.py
+++ b/python/tsfile/utils.py
@@ -29,28 +29,42 @@ def to_dataframe(file_path: str,
with TsFileReaderPy(file_path) as reader:
total_rows = 0
table_schema = reader.get_all_table_schemas()
- if len(table_schema) == 0:
- raise TableNotExistError("Not found any table in the TsFile.")
- if table_name is None:
- # get the first table name by default
- table_name, columns = next(iter(table_schema.items()))
+
+ # 判断是树模型还是表模型
+ is_tree_model = len(table_schema) == 0
+
+ if is_tree_model:
+ # 树模型需要明确指定列名
+ if column_names is None:
+ raise ValueError("树模型需要明确指定 column_names 参数")
else:
- if table_name not in table_schema:
- raise TableNotExistError(table_name)
- columns = table_schema[table_name]
+ # 表模型的处理逻辑
+ if table_name is None:
+ # get the first table name by default
+ table_name, columns = next(iter(table_schema.items()))
+ else:
+ if table_name not in table_schema:
+ raise TableNotExistError(table_name)
+ columns = table_schema[table_name]
- column_names_in_file = columns.get_column_names()
+ column_names_in_file = columns.get_column_names()
- if column_names is not None:
- for column in column_names:
- if column not in column_names_in_file:
- raise ColumnNotExistError(column)
- else:
- column_names = column_names_in_file
+ if column_names is not None:
+ for column in column_names:
+ if column not in column_names_in_file:
+ raise ColumnNotExistError(column)
+ else:
+ column_names = column_names_in_file
+ # 统一处理查询结果
df_list: list[pd.DataFrame] = []
-
- with reader.query_table(table_name, column_names) as result:
+
+ if is_tree_model:
+ query_result = reader.query_table_on_tree(column_names)
+ else:
+ query_result = reader.query_table(table_name, column_names)
+
+ with query_result as result:
while result.next():
if max_row_num is not None:
remaining_rows = max_row_num - total_rows
@@ -63,5 +77,6 @@ def to_dataframe(file_path: str,
else:
df = result.read_data_frame()
df_list.append(df)
+
df = pd.concat(df_list, ignore_index=True)
return df