This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch colin_support_read_tree
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 53dd3a925aca9fec65d16de066c737be39b1fcbc Author: ColinLee <[email protected]> AuthorDate: Mon Nov 24 09:26:32 2025 +0800 support read tree by python. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 220 +++++++++++++++++++----------------- cpp/src/cwrapper/tsfile_cwrapper.h | 4 + python/tests/test_write_and_read.py | 17 ++- python/tsfile/tsfile_cpp.pxd | 6 + python/tsfile/tsfile_py_cpp.pxd | 2 + python/tsfile/tsfile_py_cpp.pyx | 26 +++++ python/tsfile/tsfile_reader.pyx | 15 +++ python/tsfile/utils.py | 49 +++++--- 8 files changed, 214 insertions(+), 125 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 1b09db49..ebe8107a 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -72,7 +72,7 @@ int set_global_compression(uint8_t compression) { return common::set_global_compression(compression); } -WriteFile write_file_new(const char *pathname, ERRNO *err_code) { +WriteFile write_file_new(const char* pathname, ERRNO* err_code) { int ret; init_tsfile_config(); @@ -86,14 +86,14 @@ WriteFile write_file_new(const char *pathname, ERRNO *err_code) { flags |= O_BINARY; #endif mode_t mode = 0666; - storage::WriteFile *file = new storage::WriteFile; + storage::WriteFile* file = new storage::WriteFile; ret = file->create(pathname, flags, mode); *err_code = ret; return file; } -TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema, - ERRNO *err_code) { +TsFileWriter tsfile_writer_new(WriteFile file, TableSchema* schema, + ERRNO* err_code) { if (schema->column_num == 0) { *err_code = common::E_INVALID_SCHEMA; return nullptr; @@ -121,19 +121,19 @@ TsFileWriter tsfile_writer_new(WriteFile file, TableSchema *schema, static_cast<common::ColumnCategory>(cur_schema.column_category)); } - storage::TableSchema *table_schema = + storage::TableSchema* table_schema = new storage::TableSchema(schema->table_name, column_schemas); auto table_writer = new storage::TsFileTableWriter( - 
static_cast<storage::WriteFile *>(file), table_schema); + static_cast<storage::WriteFile*>(file), table_schema); delete table_schema; *err_code = common::E_OK; return table_writer; } TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file, - TableSchema *schema, + TableSchema* schema, uint64_t memory_threshold, - ERRNO *err_code) { + ERRNO* err_code) { if (schema->column_num == 0) { *err_code = common::E_INVALID_SCHEMA; return nullptr; @@ -154,17 +154,17 @@ TsFileWriter tsfile_writer_new_with_memory_threshold(WriteFile file, static_cast<common::ColumnCategory>(cur_schema.column_category)); } - storage::TableSchema *table_schema = + storage::TableSchema* table_schema = new storage::TableSchema(schema->table_name, column_schemas); - auto table_writer = - new storage::TsFileTableWriter(static_cast<storage::WriteFile *>(file), - table_schema, memory_threshold); + auto table_writer = new storage::TsFileTableWriter( + static_cast<storage::WriteFile*>(file), table_schema, memory_threshold); *err_code = common::E_OK; delete table_schema; return table_writer; } -TsFileReader tsfile_reader_new(const char *pathname, ERRNO *err_code) { + +TsFileReader tsfile_reader_new(const char* pathname, ERRNO* err_code) { init_tsfile_config(); auto reader = new storage::TsFileReader(); int ret = reader->open(pathname); @@ -180,7 +180,7 @@ ERRNO tsfile_writer_close(TsFileWriter writer) { if (writer == nullptr) { return common::E_OK; } - auto *w = static_cast<storage::TsFileTableWriter *>(writer); + auto* w = static_cast<storage::TsFileTableWriter*>(writer); int ret = w->flush(); if (ret != common::E_OK) { return ret; @@ -194,12 +194,12 @@ ERRNO tsfile_writer_close(TsFileWriter writer) { } ERRNO tsfile_reader_close(TsFileReader reader) { - auto *ts_reader = static_cast<storage::TsFileReader *>(reader); + auto* ts_reader = static_cast<storage::TsFileReader*>(reader); delete ts_reader; return common::E_OK; } -Tablet tablet_new(char **column_name_list, TSDataType *data_types, 
+Tablet tablet_new(char** column_name_list, TSDataType* data_types, uint32_t column_num, uint32_t max_rows) { std::vector<std::string> measurement_list; std::vector<common::TSDataType> data_type_list; @@ -212,20 +212,20 @@ Tablet tablet_new(char **column_name_list, TSDataType *data_types, } uint32_t tablet_get_cur_row_size(Tablet tablet) { - return static_cast<storage::Tablet *>(tablet)->get_cur_row_size(); + return static_cast<storage::Tablet*>(tablet)->get_cur_row_size(); } ERRNO tablet_add_timestamp(Tablet tablet, uint32_t row_index, Timestamp timestamp) { - return static_cast<storage::Tablet *>(tablet)->add_timestamp(row_index, - timestamp); + return static_cast<storage::Tablet*>(tablet)->add_timestamp(row_index, + timestamp); } #define TABLET_ADD_VALUE_BY_NAME_DEF(type) \ ERRNO tablet_add_value_by_name_##type(Tablet tablet, uint32_t row_index, \ - const char *column_name, \ + const char* column_name, \ const type value) { \ - return static_cast<storage::Tablet *>(tablet)->add_value( \ + return static_cast<storage::Tablet*>(tablet)->add_value( \ row_index, storage::to_lower(column_name), value); \ } TABLET_ADD_VALUE_BY_NAME_DEF(int32_t); @@ -235,9 +235,9 @@ TABLET_ADD_VALUE_BY_NAME_DEF(double); TABLET_ADD_VALUE_BY_NAME_DEF(bool); ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, - const char *column_name, - const char *value) { - return static_cast<storage::Tablet *>(tablet)->add_value( + const char* column_name, + const char* value) { + return static_cast<storage::Tablet*>(tablet)->add_value( row_index, storage::to_lower(column_name), common::String(value)); } @@ -245,14 +245,14 @@ ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, ERRNO tablet_add_value_by_index_##type(Tablet tablet, uint32_t row_index, \ uint32_t column_index, \ const type value) { \ - return static_cast<storage::Tablet *>(tablet)->add_value( \ + return static_cast<storage::Tablet*>(tablet)->add_value( \ row_index, column_index, value); \ } ERRNO 
tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, uint32_t column_index, - const char *value) { - return static_cast<storage::Tablet *>(tablet)->add_value( + const char* value) { + return static_cast<storage::Tablet*>(tablet)->add_value( row_index, column_index, common::String(value)); } @@ -263,16 +263,16 @@ TABLE_ADD_VALUE_BY_INDEX_DEF(double); TABLE_ADD_VALUE_BY_INDEX_DEF(bool); // TsRecord API -TsRecord _ts_record_new(const char *device_id, Timestamp timestamp, +TsRecord _ts_record_new(const char* device_id, Timestamp timestamp, int timeseries_num) { - auto *record = new storage::TsRecord(timestamp, device_id, timeseries_num); + auto* record = new storage::TsRecord(timestamp, device_id, timeseries_num); return record; } #define INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(type) \ ERRNO _insert_data_into_ts_record_by_name_##type( \ - TsRecord data, const char *measurement_name, type value) { \ - auto *record = (storage::TsRecord *)data; \ + TsRecord data, const char* measurement_name, type value) { \ + auto* record = (storage::TsRecord*)data; \ storage::DataPoint point(measurement_name, value); \ if (record->points_.size() + 1 > record->points_.capacity()) \ return common::E_BUF_NOT_ENOUGH; \ @@ -302,8 +302,8 @@ return writer; */ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast<storage::TsFileTableWriter *>(writer); - auto *tbl = static_cast<storage::Tablet *>(tablet); + auto* w = static_cast<storage::TsFileTableWriter*>(writer); + auto* tbl = static_cast<storage::Tablet*>(tablet); return w->write_table(*tbl); } @@ -314,12 +314,12 @@ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { // Query -ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, - char **columns, uint32_t column_num, +ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, - ERRNO *err_code) { - auto *r = 
static_cast<storage::TsFileReader *>(reader); - storage::ResultSet *table_result_set = nullptr; + ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + storage::ResultSet* table_result_set = nullptr; std::vector<std::string> column_names; for (uint32_t i = 0; i < column_num; i++) { column_names.emplace_back(columns[i]); @@ -329,8 +329,22 @@ ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, return table_result_set; } -bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { - auto *r = static_cast<storage::TableResultSet *>(result_set); +ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, + uint32_t column_num, Timestamp start_time, + Timestamp end_time, ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector<std::string> column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query_table_on_tree(column_names, start_time, end_time, + table_result_set); + return table_result_set; +} + +bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { + auto* r = static_cast<storage::TableResultSet*>(result_set); bool has_next = true; int ret = common::E_OK; ret = r->next(has_next); @@ -343,8 +357,8 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ - const char *column_name) { \ - auto *r = static_cast<storage::TableResultSet *>(result_set); \ + const char* column_name) { \ + auto* r = static_cast<storage::TableResultSet*>(result_set); \ std::string column_name_(column_name); \ return r->get_value<type>(column_name_); \ } @@ -354,13 +368,13 @@ TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int32_t); TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(int64_t); TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(float); 
TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(double); -char *tsfile_result_set_get_value_by_name_string(ResultSet result_set, - const char *column_name) { - auto *r = static_cast<storage::TableResultSet *>(result_set); +char* tsfile_result_set_get_value_by_name_string(ResultSet result_set, + const char* column_name) { + auto* r = static_cast<storage::TableResultSet*>(result_set); std::string column_name_(column_name); - common::String *ret = r->get_value<common::String *>(column_name_); + common::String* ret = r->get_value<common::String*>(column_name_); // Caller should free return's char* 's space. - char *dup = (char *)malloc(ret->len_ + 1); + char* dup = (char*)malloc(ret->len_ + 1); if (dup) { memcpy(dup, ret->buf_, ret->len_); dup[ret->len_] = '\0'; @@ -371,7 +385,7 @@ char *tsfile_result_set_get_value_by_name_string(ResultSet result_set, #define TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(type) \ type tsfile_result_set_get_value_by_index_##type(ResultSet result_set, \ uint32_t column_index) { \ - auto *r = static_cast<storage::TableResultSet *>(result_set); \ + auto* r = static_cast<storage::TableResultSet*>(result_set); \ return r->get_value<type>(column_index); \ } @@ -381,12 +395,12 @@ TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(float); TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(double); TSFILE_RESULT_SET_GET_VALUE_BY_INDEX_DEF(bool); -char *tsfile_result_set_get_value_by_index_string(ResultSet result_set, +char* tsfile_result_set_get_value_by_index_string(ResultSet result_set, uint32_t column_index) { - auto *r = static_cast<storage::TableResultSet *>(result_set); - common::String *ret = r->get_value<common::String *>(column_index); + auto* r = static_cast<storage::TableResultSet*>(result_set); + common::String* ret = r->get_value<common::String*>(column_index); // Caller should free return's char* 's space. 
- char *dup = (char *)malloc(ret->len_ + 1); + char* dup = (char*)malloc(ret->len_ + 1); if (dup) { memcpy(dup, ret->buf_, ret->len_); dup[ret->len_] = '\0'; @@ -395,19 +409,19 @@ char *tsfile_result_set_get_value_by_index_string(ResultSet result_set, } bool tsfile_result_set_is_null_by_name(ResultSet result_set, - const char *column_name) { - auto *r = static_cast<storage::TableResultSet *>(result_set); + const char* column_name) { + auto* r = static_cast<storage::TableResultSet*>(result_set); return r->is_null(column_name); } bool tsfile_result_set_is_null_by_index(const ResultSet result_set, const uint32_t column_index) { - auto *r = static_cast<storage::TableResultSet *>(result_set); + auto* r = static_cast<storage::TableResultSet*>(result_set); return r->is_null(column_index); } ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { - auto *r = static_cast<storage::TableResultSet *>(result_set); + auto* r = static_cast<storage::TableResultSet*>(result_set); if (result_set == NULL) { return ResultSetMetaData(); } @@ -417,8 +431,8 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { r->get_metadata(); meta_data.column_num = result_set_metadata->get_column_count(); meta_data.column_names = - static_cast<char **>(malloc(meta_data.column_num * sizeof(char *))); - meta_data.data_types = static_cast<TSDataType *>( + static_cast<char**>(malloc(meta_data.column_num * sizeof(char*))); + meta_data.data_types = static_cast<TSDataType*>( malloc(meta_data.column_num * sizeof(TSDataType))); for (int i = 0; i < meta_data.column_num; i++) { meta_data.column_names[i] = @@ -429,7 +443,7 @@ ResultSetMetaData tsfile_result_set_get_metadata(ResultSet result_set) { return meta_data; } -char *tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set, +char* tsfile_result_set_metadata_get_column_name(ResultSetMetaData result_set, uint32_t column_index) { if (column_index > (uint32_t)result_set.column_num) { return nullptr; @@ 
-482,15 +496,15 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) { // } TableSchema tsfile_reader_get_table_schema(TsFileReader reader, - const char *table_name) { - auto *r = static_cast<storage::TsFileReader *>(reader); + const char* table_name) { + auto* r = static_cast<storage::TsFileReader*>(reader); auto table_shcema = r->get_table_schema(table_name); TableSchema ret_schema; ret_schema.table_name = strdup(table_shcema->get_table_name().c_str()); int column_num = table_shcema->get_columns_num(); ret_schema.column_num = column_num; ret_schema.column_schemas = - static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * column_num)); + static_cast<ColumnSchema*>(malloc(sizeof(ColumnSchema) * column_num)); for (int i = 0; i < column_num; i++) { auto column_schema = table_shcema->get_measurement_schemas()[i]; ret_schema.column_schemas[i].column_name = @@ -504,18 +518,18 @@ TableSchema tsfile_reader_get_table_schema(TsFileReader reader, return ret_schema; } -TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader, - uint32_t *size) { - auto *r = static_cast<storage::TsFileReader *>(reader); +TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader, + uint32_t* size) { + auto* r = static_cast<storage::TsFileReader*>(reader); auto table_schemas = r->get_all_table_schemas(); size_t table_num = table_schemas.size(); - TableSchema *ret = - static_cast<TableSchema *>(malloc(sizeof(TableSchema) * table_num)); + TableSchema* ret = + static_cast<TableSchema*>(malloc(sizeof(TableSchema) * table_num)); for (size_t i = 0; i < table_schemas.size(); i++) { ret[i].table_name = strdup(table_schemas[i]->get_table_name().c_str()); int column_num = table_schemas[i]->get_columns_num(); ret[i].column_num = column_num; - ret[i].column_schemas = static_cast<ColumnSchema *>( + ret[i].column_schemas = static_cast<ColumnSchema*>( malloc(column_num * sizeof(ColumnSchema))); auto column_schemas = 
table_schemas[i]->get_measurement_schemas(); for (int j = 0; j < column_num; j++) { @@ -533,23 +547,23 @@ TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader, } // delete pointer -void _free_tsfile_ts_record(TsRecord *record) { +void _free_tsfile_ts_record(TsRecord* record) { if (*record != nullptr) { - delete static_cast<storage::TsRecord *>(*record); + delete static_cast<storage::TsRecord*>(*record); } *record = nullptr; } -void free_tablet(Tablet *tablet) { +void free_tablet(Tablet* tablet) { if (*tablet != nullptr) { - delete static_cast<storage::Tablet *>(*tablet); + delete static_cast<storage::Tablet*>(*tablet); } *tablet = nullptr; } -void free_tsfile_result_set(ResultSet *result_set) { +void free_tsfile_result_set(ResultSet* result_set) { if (*result_set != nullptr) { - delete static_cast<storage::ResultSet *>(*result_set); + delete static_cast<storage::ResultSet*>(*result_set); } *result_set = nullptr; } @@ -583,15 +597,15 @@ void free_table_schema(TableSchema schema) { } void free_column_schema(ColumnSchema schema) { free(schema.column_name); } -void free_write_file(WriteFile *write_file) { - auto f = static_cast<storage::WriteFile *>(*write_file); +void free_write_file(WriteFile* write_file) { + auto f = static_cast<storage::WriteFile*>(*write_file); delete f; *write_file = nullptr; } // For Python API -TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold, - ERRNO *err_code) { +TsFileWriter _tsfile_writer_new(const char* pathname, uint64_t memory_threshold, + ERRNO* err_code) { init_tsfile_config(); auto writer = new storage::TsFileWriter(); int flags = O_WRONLY | O_CREAT | O_TRUNC; @@ -608,9 +622,9 @@ TsFileWriter _tsfile_writer_new(const char *pathname, uint64_t memory_threshold, return writer; } -Tablet _tablet_new_with_target_name(const char *device_id, - char **column_name_list, - TSDataType *data_types, int column_num, +Tablet _tablet_new_with_target_name(const char* device_id, + char** column_name_list, 
+ TSDataType* data_types, int column_num, int max_rows) { std::vector<std::string> measurement_list; std::vector<common::TSDataType> data_type_list; @@ -627,27 +641,27 @@ Tablet _tablet_new_with_target_name(const char *device_id, } } -ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) { - std::vector<storage::MeasurementSchema *> measurement_schemas; +ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema) { + std::vector<storage::MeasurementSchema*> measurement_schemas; std::vector<common::ColumnCategory> column_categories; measurement_schemas.resize(schema->column_num); for (int i = 0; i < schema->column_num; i++) { - ColumnSchema *cur_schema = schema->column_schemas + i; + ColumnSchema* cur_schema = schema->column_schemas + i; measurement_schemas[i] = new storage::MeasurementSchema( cur_schema->column_name, static_cast<common::TSDataType>(cur_schema->data_type)); column_categories.push_back( static_cast<common::ColumnCategory>(cur_schema->column_category)); } - auto tsfile_writer = static_cast<storage::TsFileWriter *>(writer); + auto tsfile_writer = static_cast<storage::TsFileWriter*>(writer); return tsfile_writer->register_table(std::make_shared<storage::TableSchema>( schema->table_name, measurement_schemas, column_categories)); } ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, - const char *device_id, - const TimeseriesSchema *schema) { - auto *w = static_cast<storage::TsFileWriter *>(writer); + const char* device_id, + const TimeseriesSchema* schema) { + auto* w = static_cast<storage::TsFileWriter*>(writer); int ret = w->register_timeseries( device_id, @@ -660,8 +674,8 @@ ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, } ERRNO _tsfile_writer_register_device(TsFileWriter writer, - const device_schema *device_schema) { - auto *w = static_cast<storage::TsFileWriter *>(writer); + const device_schema* device_schema) { + auto* w = static_cast<storage::TsFileWriter*>(writer); for (int 
column_id = 0; column_id < device_schema->timeseries_num; column_id++) { TimeseriesSchema schema = device_schema->timeseries_schema[column_id]; @@ -680,26 +694,26 @@ ERRNO _tsfile_writer_register_device(TsFileWriter writer, } ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast<storage::TsFileWriter *>(writer); - const auto *tbl = static_cast<storage::Tablet *>(tablet); + auto* w = static_cast<storage::TsFileWriter*>(writer); + const auto* tbl = static_cast<storage::Tablet*>(tablet); return w->write_tablet(*tbl); } ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { - auto *w = static_cast<storage::TsFileWriter *>(writer); - auto *tbl = static_cast<storage::Tablet *>(tablet); + auto* w = static_cast<storage::TsFileWriter*>(writer); + auto* tbl = static_cast<storage::Tablet*>(tablet); return w->write_table(*tbl); } ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { - auto *w = static_cast<storage::TsFileWriter *>(writer); - const storage::TsRecord *record = static_cast<storage::TsRecord *>(data); + auto* w = static_cast<storage::TsFileWriter*>(writer); + const storage::TsRecord* record = static_cast<storage::TsRecord*>(data); const int ret = w->write_record(*record); return ret; } ERRNO _tsfile_writer_close(TsFileWriter writer) { - auto *w = static_cast<storage::TsFileWriter *>(writer); + auto* w = static_cast<storage::TsFileWriter*>(writer); int ret = w->flush(); if (ret != common::E_OK) { return ret; @@ -713,23 +727,23 @@ ERRNO _tsfile_writer_close(TsFileWriter writer) { } ERRNO _tsfile_writer_flush(TsFileWriter writer) { - auto *w = static_cast<storage::TsFileWriter *>(writer); + auto* w = static_cast<storage::TsFileWriter*>(writer); return w->flush(); } ResultSet _tsfile_reader_query_device(TsFileReader reader, - const char *device_name, - char **sensor_name, uint32_t sensor_num, + const char* device_name, + char** sensor_name, uint32_t sensor_num, Timestamp start_time, 
Timestamp end_time, - ERRNO *err_code) { - auto *r = static_cast<storage::TsFileReader *>(reader); + ERRNO* err_code) { + auto* r = static_cast<storage::TsFileReader*>(reader); std::vector<std::string> selected_paths; selected_paths.reserve(sensor_num); for (uint32_t i = 0; i < sensor_num; i++) { selected_paths.push_back(std::string(device_name) + "." + std::string(sensor_name[i])); } - storage::ResultSet *qds = nullptr; + storage::ResultSet* qds = nullptr; *err_code = r->query(selected_paths, start_time, end_time, qds); return qds; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 75dc0364..f94325aa 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -428,6 +428,10 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); + +ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, + uint32_t column_num, Timestamp start_time, + Timestamp end_time, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, // char** sensor_name, uint32_t sensor_num, diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index e5c87ab9..da6cc5c9 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -34,17 +34,18 @@ from tsfile.exceptions import TableNotExistError, ColumnNotExistError, NotSuppor def test_row_record_write_and_read(): try: writer = TsFileWriter("record_write_and_read.tsfile") - timeseries = TimeseriesSchema("level1", TSDataType.INT64) - writer.register_timeseries("root.device1", timeseries) + writer.register_timeseries("root.device1", TimeseriesSchema("level1", TSDataType.INT64)) writer.register_timeseries("root.device1", TimeseriesSchema("level2", TSDataType.DOUBLE)) - writer.register_timeseries("root.device1", TimeseriesSchema("level3", 
TSDataType.INT32)) + writer.register_timeseries("root.device2", TimeseriesSchema("level1", TSDataType.INT32)) max_row_num = 1000 for i in range(max_row_num): row = RowRecord("root.device1", i, [Field("level1", i + 1, TSDataType.INT64), - Field("level2", i * 1.1, TSDataType.DOUBLE), - Field("level3", i * 2, TSDataType.INT32)]) + Field("level2", i * 1.1, TSDataType.DOUBLE)]) + writer.write_row_record(row) + row = RowRecord("root.device2", i, + [Field("level1", i + 1, TSDataType.INT32)]) writer.write_row_record(row) writer.close() @@ -56,8 +57,14 @@ def test_row_record_write_and_read(): print(result.get_value_by_index(1)) print(reader.get_active_query_result()) result.close() + result2 = reader.query_table_on_tree(["level1", "level2"], 20, 50) + print(result2.read_data_frame()) + result2.close() print(reader.get_active_query_result()) reader.close() + + + finally: if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 1b04051c..e9a67364 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -167,6 +167,12 @@ cdef extern from "./tsfile_cwrapper.h": const char * table_name, const char** columns, uint32_t column_num, int64_t start_time, int64_t end_time, ErrorCode *err_code) + + ResultSet tsfile_query_table_on_tree(TsFileReader reader, + char** columns, uint32_t column_num, + int64_t start_time, int64_t end_time, + ErrorCode* err_code); + ResultSet _tsfile_reader_query_device(TsFileReader reader, const char *device_name, char ** sensor_name, uint32_t sensor_num, diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index ce907a79..6ed03838 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -49,6 +49,8 @@ cdef public api ErrorCode tsfile_writer_register_table_py_cpp(TsFileWriter write cdef public api bint tsfile_result_set_is_null_by_name_c(ResultSet result_set, object 
name) cdef public api ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_name, object column_list, int64_t start_time, int64_t end_time) +cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list, + int64_t start_time, int64_t end_time) cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time) cdef public api object get_table_schema(TsFileReader reader, object table_name) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index e1743039..1d7af385 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -515,6 +515,32 @@ cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_nam free(<void*>columns) columns = NULL +cdef ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reader, object column_list, + int64_t start_time, int64_t end_time): + cdef ResultSet result + cdef int column_num = len(column_list) + cdef char** columns = <char**> malloc(sizeof(char*) * column_num) + cdef int i + cdef ErrorCode code = 0 + if columns == NULL: + raise MemoryError("Failed to allocate memory for columns") + try: + for i in range(column_num): + columns[i] = strdup((<str>column_list[i]).encode('utf-8')) + if columns[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_query_table_on_tree(reader, columns, column_num, start_time, end_time, &code) + check_error(code) + return result + finally: + if columns != NULL: + for i in range(column_num): + free(<void*>columns[i]) + columns[i] = NULL + free(<void*>columns) + columns = NULL + + cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time): cdef ResultSet result diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index e8d38d7d..7e29213b 100644 
--- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -291,6 +291,21 @@ cdef class TsFileReaderPy: pyresult.init_c(result, table_name) self.activate_result_set_list.add(pyresult) return pyresult + + def query_table_on_tree(self, column_names : List[str], + start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: + """ + Execute a time range query on specified columns on tree structure. + :return: query result handler. + """ + cdef ResultSet result; + result = tsfile_reader_query_table_on_tree_c(self.reader, + [column_name.lower() for column_name in column_names], start_time, + end_time) + pyresult = ResultSetPy(self, True) + pyresult.init_c(result, "root") + self.activate_result_set_list.add(pyresult) + return pyresult def query_timeseries(self, device_name : str, sensor_list : List[str], start_time : int = 0, end_time : int = 0) -> ResultSetPy: diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index 3d236606..f1fd51e7 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -29,28 +29,42 @@ def to_dataframe(file_path: str, with TsFileReaderPy(file_path) as reader: total_rows = 0 table_schema = reader.get_all_table_schemas() - if len(table_schema) == 0: - raise TableNotExistError("Not found any table in the TsFile.") - if table_name is None: - # get the first table name by default - table_name, columns = next(iter(table_schema.items())) + + # 判断是树模型还是表模型 + is_tree_model = len(table_schema) == 0 + + if is_tree_model: + # 树模型需要明确指定列名 + if column_names is None: + raise ValueError("树模型需要明确指定 column_names 参数") else: - if table_name not in table_schema: - raise TableNotExistError(table_name) - columns = table_schema[table_name] + # 表模型的处理逻辑 + if table_name is None: + # get the first table name by default + table_name, columns = next(iter(table_schema.items())) + else: + if table_name not in table_schema: + raise TableNotExistError(table_name) + columns = table_schema[table_name] - column_names_in_file = 
columns.get_column_names() + column_names_in_file = columns.get_column_names() - if column_names is not None: - for column in column_names: - if column not in column_names_in_file: - raise ColumnNotExistError(column) - else: - column_names = column_names_in_file + if column_names is not None: + for column in column_names: + if column not in column_names_in_file: + raise ColumnNotExistError(column) + else: + column_names = column_names_in_file + # 统一处理查询结果 df_list: list[pd.DataFrame] = [] - - with reader.query_table(table_name, column_names) as result: + + if is_tree_model: + query_result = reader.query_table_on_tree(column_names) + else: + query_result = reader.query_table(table_name, column_names) + + with query_result as result: while result.next(): if max_row_num is not None: remaining_rows = max_row_num - total_rows @@ -63,5 +77,6 @@ def to_dataframe(file_path: str, else: df = result.read_data_frame() df_list.append(df) + df = pd.concat(df_list, ignore_index=True) return df
