This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_python_V4_bak in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit b7aea9d8df5860f35aaec8ffa1208471211455e7 Author: colin <[email protected]> AuthorDate: Thu Feb 27 22:35:30 2025 +0800 some code. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 221 +++++++++++++++++++++-------------- cpp/src/cwrapper/tsfile_cwrapper.h | 23 +++- python/setup.py | 2 +- python/tests/test_basic.py | 2 +- python/tests/test_write.py | 22 ++-- python/tsfile/__init__.py | 2 +- python/tsfile/tablet.py | 17 ++- python/tsfile/tsfile_cpp.pxd | 100 +++++++--------- python/tsfile/tsfile_py_cpp.pxd | 4 +- python/tsfile/tsfile_py_cpp.pyx | 72 ++++++------ python/tsfile/tsfile_reader.pyx | 6 +- python/tsfile/tsfile_table_writer.py | 27 ++++- python/tsfile/tsfile_writer.pyx | 32 +++-- 13 files changed, 309 insertions(+), 221 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index dbd85b52..16ae2427 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -218,73 +218,12 @@ if (ret != common::E_OK) { return writer; } -ERRNO tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) -{ -std::vector<storage::MeasurementSchema *> measurement_schemas; -std::vector<common::ColumnCategory> column_categories; -measurement_schemas.resize(schema->column_num); -for (int i = 0; i < schema->column_num; i++) { - ColumnSchema *cur_schema = schema->column_schemas + i; - measurement_schemas[i] = new storage::MeasurementSchema( - cur_schema->column_name, - static_cast<common::TSDataType>(cur_schema->data_type)); - column_categories.push_back( - static_cast<common::ColumnCategory>(cur_schema->column_category)); -} -auto tsfile_writer = static_cast<storage::TsFileWriter *>(writer); -return -tsfile_writer->register_table(std::make_shared<storage::TableSchema>( - schema->table_name, measurement_schemas, column_categories)); -} - -ERRNO tsfile_writer_register_timeseries(TsFileWriter writer, - const char *device_id, - const TimeseriesSchema *schema) { -auto *w = static_cast<storage::TsFileWriter *>(writer); - -int ret = w->register_timeseries( - device_id, - storage::MeasurementSchema( - schema->timeseries_name, - static_cast<common::TSDataType>(schema->data_type), - static_cast<common::TSEncoding>(schema->encoding), - static_cast<common::CompressionType>(schema->compression))); -return ret; -} - -ERRNO tsfile_writer_register_device(TsFileWriter writer, - const device_schema *device_schema) { -auto *w = static_cast<storage::TsFileWriter *>(writer); -for (int column_id = 0; column_id < device_schema->timeseries_num; - column_id++) { - TimeseriesSchema schema = - device_schema->timeseries_schema[column_id]; const ERRNO ret = - w->register_timeseries( - device_schema->device_name, - storage::MeasurementSchema( - schema.timeseries_name, - static_cast<common::TSDataType>(schema.data_type), - static_cast<common::TSEncoding>(schema.encoding), - static_cast<common::CompressionType>(schema.compression))); - if (ret != common::E_OK) { - return ret; - } -} -return common::E_OK; -} - ERRNO tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { auto *w = static_cast<storage::TsFileWriter *>(writer); const storage::TsRecord *record = static_cast<storage::TsRecord *>(data); const int ret = w->write_record(*record); return ret; } - -ERRNO tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) { -auto *w = static_cast<storage::TsFileWriter *>(writer); -const auto *tbl = static_cast<storage::Tablet *>(tablet); -return w->write_tablet(*tbl); -} */ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { auto *w = static_cast<storage::TsFileTableWriter *>(writer); @@ -314,23 +253,6 @@ ResultSet tsfile_query_table(TsFileReader reader, const char *table_name, return table_result_set; } -// ResultSet tsfile_reader_query_device(TsFileReader reader, -// const char *device_name, -// char **sensor_name, uint32_t sensor_num, -// Timestamp start_time, Timestamp -// end_time) { -// auto *r = static_cast<storage::TsFileReader *>(reader); -// std::vector<std::string> selected_paths; -// selected_paths.reserve(sensor_num); -// for (uint32_t i = 0; i < sensor_num; i++) { -// selected_paths.push_back(std::string(device_name) + "." + -// std::string(sensor_name[i])); -// } -// storage::ResultSet *qds = nullptr; -// r->query(selected_paths, start_time, end_time, qds); -// return qds; -// } - bool tsfile_result_set_next(ResultSet result_set, ERRNO *err_code) { auto *r = static_cast<storage::TableResultSet *>(result_set); bool has_next = true; @@ -481,7 +403,6 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set) { // return schema; // } - TableSchema tsfile_reader_get_table_schema(TsFileReader reader, const char *table_name) { auto *r = static_cast<storage::TsFileReader *>(reader); @@ -490,14 +411,21 @@ TableSchema tsfile_reader_get_table_schema(TsFileReader reader, ret_schema.table_name = strdup(table_shcema->get_table_name().c_str()); int column_num = table_shcema->get_columns_num(); ret_schema.column_num = column_num; - ret_schema.column_schemas = static_cast<ColumnSchema*>(malloc(sizeof(ColumnSchema) * column_num)); + ret_schema.column_schemas = + static_cast<ColumnSchema *>(malloc(sizeof(ColumnSchema) * column_num)); for (int i = 0; i < column_num; i++) { auto column_schema = table_shcema->get_measurement_schemas()[i]; - ret_schema.column_schemas[i].column_name = strdup(column_schema->measurement_name_.c_str()); - ret_schema.column_schemas[i].data_type = static_cast<TSDataType>(column_schema->data_type_); - ret_schema.column_schemas[i].compression = static_cast<CompressionType>(column_schema->compression_type_); - ret_schema.column_schemas[i].encoding = static_cast<TSEncoding>(column_schema->encoding_); - ret_schema.column_schemas[i].column_category = static_cast<ColumnCategory>(table_shcema->get_column_categories()[i]); + ret_schema.column_schemas[i].column_name = + strdup(column_schema->measurement_name_.c_str()); + ret_schema.column_schemas[i].data_type = + static_cast<TSDataType>(column_schema->data_type_); + ret_schema.column_schemas[i].compression = + static_cast<CompressionType>(column_schema->compression_type_); + ret_schema.column_schemas[i].encoding = + static_cast<TSEncoding>(column_schema->encoding_); + ret_schema.column_schemas[i].column_category = + static_cast<ColumnCategory>( + table_shcema->get_column_categories()[i]); } return ret_schema; } @@ -589,6 +517,129 @@ void free_write_file(WriteFile *write_file) { *write_file = nullptr; } +// For Python API +TsFileWriter _tsfile_writer_new(const char *pathname, ERRNO *err_code) { + init_tsfile_config(); + auto writer = new storage::TsFileWriter(); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + int ret = writer->open(pathname, flags, 0644); + if (ret != common::E_OK) { + delete writer; + *err_code = ret; + return nullptr; + } + return writer; +} + +Tablet _tablet_new_with_target_name(const char *device_id, char **column_name_list, + TSDataType *data_types, + ColumnCategory *column_category, int column_num, + int max_rows) { + std::vector<std::string> measurement_list; + std::vector<common::TSDataType> data_type_list; + std::vector<common::ColumnCategory> categories; + for (int i = 0; i < column_num; i++) { + measurement_list.emplace_back(column_name_list[i]); + data_type_list.push_back( + static_cast<common::TSDataType>(*(data_types + i))); + categories.emplace_back( + static_cast<common::ColumnCategory>(*(column_category + i))); + } + auto *tablet = new storage::Tablet(device_id, measurement_list, + data_type_list, categories, max_rows); + return tablet; +} + +ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema *schema) { + std::vector<storage::MeasurementSchema *> measurement_schemas; + std::vector<common::ColumnCategory> column_categories; + measurement_schemas.resize(schema->column_num); + for (int i = 0; i < schema->column_num; i++) { + ColumnSchema *cur_schema = schema->column_schemas + i; + measurement_schemas[i] = new storage::MeasurementSchema( + cur_schema->column_name, + static_cast<common::TSDataType>(cur_schema->data_type)); + column_categories.push_back( + static_cast<common::ColumnCategory>(cur_schema->column_category)); + } + auto tsfile_writer = static_cast<storage::TsFileWriter *>(writer); + return tsfile_writer->register_table(std::make_shared<storage::TableSchema>( + schema->table_name, measurement_schemas, column_categories)); +} + +ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, + const char *device_id, + const TimeseriesSchema *schema) { + auto *w = static_cast<storage::TsFileWriter *>(writer); + + int ret = w->register_timeseries( + device_id, + storage::MeasurementSchema( + schema->timeseries_name, + static_cast<common::TSDataType>(schema->data_type), + static_cast<common::TSEncoding>(schema->encoding), + static_cast<common::CompressionType>(schema->compression))); + return ret; +} + +ERRNO _tsfile_writer_register_device(TsFileWriter writer, + const device_schema *device_schema) { + auto *w = static_cast<storage::TsFileWriter *>(writer); + for (int column_id = 0; column_id < device_schema->timeseries_num; + column_id++) { + TimeseriesSchema schema = device_schema->timeseries_schema[column_id]; + const ERRNO ret = w->register_timeseries( + device_schema->device_name, + storage::MeasurementSchema( + schema.timeseries_name, + static_cast<common::TSDataType>(schema.data_type), + static_cast<common::TSEncoding>(schema.encoding), + static_cast<common::CompressionType>(schema.compression))); + if (ret != common::E_OK) { + return ret; + } + } + return common::E_OK; +} +ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet) { + auto *w = static_cast<storage::TsFileWriter *>(writer); + const auto *tbl = static_cast<storage::Tablet *>(tablet); + return w->write_tablet(*tbl); +} + +ERRNO _tsfile_writer_close(TsFileWriter writer) { + auto *w = static_cast<storage::TsFileWriter *>(writer); + int ret = w->flush(); + if (ret != common::E_OK) { + return ret; + } + ret = w->close(); + if (ret != common::E_OK) { + return ret; + } + delete w; + return ret; +} + +ResultSet _tsfile_reader_query_device(TsFileReader reader, + const char *device_name, + char **sensor_name, uint32_t sensor_num, + Timestamp start_time, + Timestamp end_time, ERRNO *err_code) { + auto *r = static_cast<storage::TsFileReader *>(reader); + std::vector<std::string> selected_paths; + selected_paths.reserve(sensor_num); + for (uint32_t i = 0; i < sensor_num; i++) { + selected_paths.push_back(std::string(device_name) + "." + + std::string(sensor_name[i])); + } + storage::ResultSet *qds = nullptr; + *err_code = r->query(selected_paths, start_time, end_time, qds); + return qds; +} #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index af1c9c29..3bacdf2a 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -471,7 +471,7 @@ int tsfile_result_set_metadata_get_column_num(ResultSetMetaData result_set); * @note Caller should call free_table_schema to free the tableschema. */ TableSchema tsfile_reader_get_table_schema(TsFileReader reader, - const char* table_name); + const char* table_name); /** * @brief Gets all table schema in the tsfile. * @@ -492,6 +492,27 @@ void free_table_schema(TableSchema schema); void free_column_schema(ColumnSchema schema); void free_write_file(WriteFile* write_file); +// For Python API +TsFileWriter _tsfile_writer_new(const char* pathname, ERRNO* err_code); +Tablet _tablet_new_with_target_name(const char* device_id, + char** column_name_list, + TSDataType* data_types, + ColumnCategory* column_category, + int column_num, int max_rows); +ERRNO _tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema); +ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, + const char* device_id, + const TimeseriesSchema* schema); +ERRNO _tsfile_writer_register_device(TsFileWriter writer, + const DeviceSchema* device_schema); +ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); +ERRNO _tsfile_writer_close(TsFileWriter writer); +ResultSet _tsfile_reader_query_device(TsFileReader reader, + const char* device_name, + char** sensor_name, uint32_t sensor_num, + Timestamp start_time, Timestamp end_time, + ERRNO* err_code); + #ifdef __cplusplus } #endif diff --git a/python/setup.py b/python/setup.py index 84727278..7449babd 100644 --- a/python/setup.py +++ b/python/setup.py @@ -62,7 +62,7 @@ tsfile_py_include_file = os.path.join(project_dir, "tsfile", "tsfile_cwrapper.h" copy_tsfile_header(tsfile_c_include_file, tsfile_py_include_file) ## Copy shared library -tsfile_shared_source_dir = os.path.join(project_dir, "..", "cpp", "target", "build", "lib") +tsfile_shared_source_dir = os.path.join(project_dir, "..", "cpp", "build","Release", "lib") tsfile_shared_dir = os.path.join(project_dir, "tsfile") if system == "Darwin": diff --git a/python/tests/test_basic.py b/python/tests/test_basic.py index b44cea60..3f34b811 100644 --- a/python/tests/test_basic.py +++ b/python/tests/test_basic.py @@ -28,7 +28,7 @@ def test_tablet(): column_category = [ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD, ColumnCategory.TAG] tablet = Tablet("test", column_names, data_types, column_category) - assert "test" == tablet.get_device_id() + assert "test" == tablet.get_target_name() assert 4 == len(tablet.get_column_name_list()) assert TSDataType.INT32 == tablet.get_data_type_list()[0] diff --git a/python/tests/test_write.py b/python/tests/test_write.py index 1ba3aa5b..8750e3be 100644 --- a/python/tests/test_write.py +++ b/python/tests/test_write.py @@ -25,17 +25,17 @@ from tsfile import Tablet, RowRecord, Field from tsfile import TSDataType def test_row_record_write(): - try: - writer = TsFileWriter("record_write.tsfile") - timeseries = TimeseriesSchema("level1", TSDataType.INT64) - writer.register_timeseries("root.device1", timeseries) - - record = RowRecord("root.device1", 10,[Field("level1", 10, TSDataType.INT64)]) - writer.write_row_record(record) - writer.close() - finally: - if os.path.exists("record_write.tsfile"): - os.remove("record_write.tsfile") + # try: + # writer = TsFileWriter("record_write.tsfile") + # timeseries = TimeseriesSchema("level1", TSDataType.INT64) + # writer.register_timeseries("root.device1", timeseries) + # + # record = RowRecord("root.device1", 10,[Field("level1", 10, TSDataType.INT64)]) + # writer.write_row_record(record) + # writer.close() + # finally: + # if os.path.exists("record_write.tsfile"): + # os.remove("record_write.tsfile") def test_tablet_write(): try: diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 651ff496..df51bcfa 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -32,4 +32,4 @@ from .date_utils import * from .exceptions import * from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet from .tsfile_writer import TsFileWriterPy as TsFileWriter -from .tsfile_table_writer import * \ No newline at end of file +from .tsfile_table_writer import TsFileTableWriter \ No newline at end of file diff --git a/python/tsfile/tablet.py b/python/tsfile/tablet.py index 9791874d..5e28bff5 100644 --- a/python/tsfile/tablet.py +++ b/python/tsfile/tablet.py @@ -28,17 +28,16 @@ from .constants import TSDataType, ColumnCategory class Tablet(object): - def __init__(self, device_id: str, column_name_list: list[str], type_list: list[TSDataType], - category_list: list[ColumnCategory] = None, max_row_num: int = 1024): + def __init__(self, column_name_list: list[str], type_list: list[TSDataType], + max_row_num: int = 1024): self.timestamp_list = [None for _ in range(max_row_num)] self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) ] - self.device_id = device_id + self.target_name = None self.column_name_list = column_name_list self.type_list = type_list self.max_row_num = max_row_num - self.category_list = category_list self._type_ranges = { TSDataType.INT32: (-2 ** 31, 2 ** 31 - 1), @@ -54,6 +53,9 @@ class Tablet(object): if not (0 <= row_index < self.max_row_num): raise IndexError(f"Row index {row_index} out of range [0, {self.max_row_num - 1}]") + def set_table_name(self, table_name: str): + self.target_name = table_name + def get_column_name_list(self): return self.column_name_list @@ -63,8 +65,8 @@ class Tablet(object): def get_timestamp_list(self): return self.timestamp_list - def get_device_id(self): - return self.device_id + def get_target_name(self): + return self.target_name def get_value_list(self): return self.data_list @@ -72,9 +74,6 @@ class Tablet(object): def get_max_row_num(self): return self.max_row_num - def get_category_list(self): - return self.category_list - def add_column(self, column_name: str, column_type: TSDataType): self.column_name_list.append(column_name) self.type_list.append(column_type) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 03ed385e..859d8037 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -27,11 +27,11 @@ cdef extern from "./tsfile_cwrapper.h": ctypedef int64_t timestamp # reader and writer etc - ctypedef void* TsFileReader - ctypedef void* TsFileWriter - ctypedef void* Tablet - ctypedef void* TsRecord - ctypedef void* ResultSet + ctypedef void * TsFileReader + ctypedef void * TsFileWriter + ctypedef void * Tablet + ctypedef void * TsRecord + ctypedef void * ResultSet # enum types ctypedef enum TSDataType: @@ -77,99 +77,89 @@ cdef extern from "./tsfile_cwrapper.h": # struct types ctypedef struct ColumnSchema: - char* column_name + char * column_name TSDataType data_type ColumnCategory column_category ctypedef struct TableSchema: - char* table_name - ColumnSchema* column_schemas + char * table_name + ColumnSchema * column_schemas int column_num ctypedef struct TimeseriesSchema: - char* timeseries_name + char * timeseries_name TSDataType data_type TSEncoding encoding CompressionType compression ctypedef struct DeviceSchema: - char* device_name - TimeseriesSchema* timeseries_schema + char * device_name + TimeseriesSchema * timeseries_schema int timeseries_num ctypedef struct ResultSetMetaData: char** column_names - TSDataType* data_types + TSDataType * data_types int column_num - - # Function Declarations # reader:new and close - TsFileReader tsfile_reader_new(const char* pathname, ErrorCode* err_code); + TsFileReader tsfile_reader_new(const char * pathname, ErrorCode * err_code); ErrorCode tsfile_reader_close(TsFileReader reader) # writer: new and close - TsFileWriter tsfile_writer_new(const char * pathname, ErrorCode* err_code); - ErrorCode tsfile_writer_close(TsFileWriter writer) + TsFileWriter _tsfile_writer_new(const char * pathname, ErrorCode * err_code); + ErrorCode _tsfile_writer_close(TsFileWriter writer); # writer : register table, device and timeseries - ErrorCode tsfile_writer_register_table(TsFileWriter writer, TableSchema* schema); - ErrorCode tsfile_writer_register_timeseries(TsFileWriter writer, const char* device_name, - TimeseriesSchema* schema); - ErrorCode tsfile_writer_register_device(TsFileWriter writer, DeviceSchema* device_schema); + ErrorCode _tsfile_writer_register_table(TsFileWriter writer, TableSchema * schema); + ErrorCode _tsfile_writer_register_timeseries(TsFileWriter writer, + const char * device_id, + const TimeseriesSchema * schema); + ErrorCode _tsfile_writer_register_device(TsFileWriter writer, + const DeviceSchema * device_schema); # writer : write tablet data and flush - ErrorCode tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); - ErrorCode tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); - ErrorCode tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); - - ErrorCode tsfile_writer_flush_data(TsFileWriter writer); + ErrorCode _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); # tablet : new and add timestamp/value into tablet - Tablet tablet_new_with_device(const char* device_id, const char** column_name_list, TSDataType* data_types, - ColumnCategory* category, int column_num, int max_rows); - Tablet tablet_new(const char** column_names, TSDataType* data_types, int column_num); + Tablet _tablet_new_with_target_name(const char * device_id, + char** column_name_list, + TSDataType * data_types, + ColumnCategory * column_category, + int column_num, int max_rows); + + Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num); ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp); - ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index, int64_t value); - ErrorCode tablet_add_value_by_index_int32_t(Tablet tablet, uint32_t row_index, uint32_t column_index, int32_t value); + ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index, + int64_t value); + ErrorCode tablet_add_value_by_index_int32_t(Tablet tablet, uint32_t row_index, uint32_t column_index, + int32_t value); ErrorCode tablet_add_value_by_index_double(Tablet tablet, uint32_t row_index, uint32_t column_index, double value); ErrorCode tablet_add_value_by_index_float(Tablet tablet, uint32_t row_index, uint32_t column_index, float value); ErrorCode tablet_add_value_by_index_bool(Tablet tablet, uint32_t row_index, uint32_t column_index, bint value); ErrorCode tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, - uint32_t column_index, char* value); - - void free_tablet(Tablet* tablet); - - # row_record - TsRecord ts_record_new(const char * device_id, int64_t timestamp, int timeseries_num); - ErrorCode insert_data_into_ts_record_by_name_int32_t(TsRecord data, const char *measurement_name, const int32_t value); - ErrorCode insert_data_into_ts_record_by_name_int64_t(TsRecord data, const char *measurement_name, const int64_t value); - ErrorCode insert_data_into_ts_record_by_name_float(TsRecord data, const char *measurement_name, const float value); - ErrorCode insert_data_into_ts_record_by_name_double(TsRecord data, const char *measurement_name, const double value); - ErrorCode insert_data_into_ts_record_by_name_bool(TsRecord data, const char *measurement_name,const bint value); - - void free_tsfile_ts_record(TsRecord* record); + uint32_t column_index, char * value); + void free_tablet(Tablet * tablet); # resulSet : query data from tsfile reader - ResultSet tsfile_reader_query_table(TsFileReader reader, - const char* table_name, + ResultSet tsfile_query_table(TsFileReader reader, + const char * table_name, const char** columns, uint32_t column_num, - int64_t start_time, int64_t end_time); - ResultSet tsfile_reader_query_device(TsFileReader reader, - const char * device_name, - char** sensor_name, uint32_t sensor_num, - timestamp start_time, timestamp end_time); + int64_t start_time, int64_t end_time, ErrorCode *err_code) + ResultSet _tsfile_reader_query_device(TsFileReader reader, + const char *device_name, + char ** sensor_name, uint32_t sensor_num, + int64_t start_time, int64_t end_time, ErrorCode *err_code) # resultSet : get data from resultSet - bint tsfile_result_set_next(ResultSet result_set); + bint tsfile_result_set_next(ResultSet result_set, ErrorCode * err_code); bint tsfile_result_set_is_null_by_index(ResultSet result_set, uint32_t column_index); - bint tsfile_result_set_is_null_by_name(ResultSet result_set, const char* column_name); - void free_tsfile_result_set(ResultSet* result_set); - + bint tsfile_result_set_is_null_by_name(ResultSet result_set, const char * column_name); + void free_tsfile_result_set(ResultSet * result_set); int32_t tsfile_result_set_get_value_by_index_int32_t(ResultSet result_set, uint32_t column_index); int64_t tsfile_result_set_get_value_by_index_int64_t(ResultSet result_set, uint32_t column_index); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 926a56da..4730969c 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -31,13 +31,13 @@ cdef public api DeviceSchema* to_c_device_schema(object py_schema) cdef public api ColumnSchema* to_c_column_schema(object py_schema) cdef public api TableSchema* to_c_table_schema(object py_schema) cdef public api Tablet to_c_tablet(object tablet) -cdef public api TsRecord to_c_record(object row_record) +# cdef public api TsRecord to_c_record(object row_record) cdef public api void free_c_table_schema(TableSchema* c_schema) cdef public api void free_c_column_schema(ColumnSchema* c_schema) cdef public api void free_c_timeseries_schema(TimeseriesSchema* c_schema) cdef public api void free_c_device_schema(DeviceSchema* c_schema) cdef public api void free_c_tablet(Tablet tablet) -cdef public api void free_c_row_record(TsRecord record) +# cdef public api void free_c_row_record(TsRecord record) cdef public api TsFileWriter tsfile_writer_new_c(object pathname) except + cdef public api TsFileReader tsfile_reader_new_c(object pathname) except + cdef public api ErrorCode tsfile_writer_register_device_py_cpp(TsFileWriter writer, DeviceSchema *schema) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index caeec657..8d10f485 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -180,7 +180,7 @@ cdef Tablet to_c_tablet(object tablet): cdef int max_row_num cdef TSDataType data_type cdef int64_t timestamp - cdef bytes device_id_bytes = PyUnicode_AsUTF8String(tablet.get_device_id()) + cdef bytes device_id_bytes = PyUnicode_AsUTF8String(tablet.get_target_name()) cdef const char * device_id_c = device_id_bytes cdef char** columns_names cdef TSDataType* column_types @@ -203,7 +203,7 @@ cdef Tablet to_c_tablet(object tablet): max_row_num = tablet.get_max_row_num() - ctablet = tablet_new_with_device(device_id_c, columns_names, columns_types, column_category, column_num, + ctablet = _tablet_new_with_target_name(device_id_c, columns_names, columns_types, column_category, column_num, max_row_num) free(columns_types) for i in range(column_num): @@ -261,30 +261,30 @@ cdef Tablet to_c_tablet(object tablet): return ctablet -cdef TsRecord to_c_record(object row_record): - cdef int field_num = row_record.get_fields_num() - cdef int64_t timestamp = <int64_t>row_record.get_timestamp() - cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) - cdef const char* device_id = device_id_bytes - cdef TsRecord record - cdef int i - cdef TSDataType data_type - record = ts_record_new(device_id, timestamp, field_num) - for i in range(field_num): - field = row_record.get_fields()[i] - data_type = to_c_data_type(field.get_data_type()) - if data_type == TS_DATATYPE_BOOLEAN: - insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) - elif data_type == TS_DATATYPE_INT32: - insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) - elif data_type == TS_DATATYPE_INT64: - insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) - elif data_type == TS_DATATYPE_DOUBLE: - insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) - elif data_type == TS_DATATYPE_FLOAT: - insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) - - return record +# cdef TsRecord to_c_record(object row_record): +# cdef int field_num = row_record.get_fields_num() +# cdef int64_t timestamp = <int64_t>row_record.get_timestamp() +# cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) +# cdef const char* device_id = device_id_bytes +# cdef TsRecord record +# cdef int i +# cdef TSDataType data_type +# record = ts_record_new(device_id, timestamp, field_num) +# for i in range(field_num): +# field = row_record.get_fields()[i] +# data_type = to_c_data_type(field.get_data_type()) +# if data_type == TS_DATATYPE_BOOLEAN: +# insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) +# elif data_type == TS_DATATYPE_INT32: +# insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) +# elif data_type == TS_DATATYPE_INT64: +# insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) +# elif data_type == TS_DATATYPE_DOUBLE: +# insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) +# elif data_type == TS_DATATYPE_FLOAT: +# insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) +# +# return record # Free c structs' space cdef void free_c_table_schema(TableSchema* c_schema): @@ -308,8 +308,8 @@ cdef void free_c_device_schema(DeviceSchema* c_schema): cdef void free_c_tablet(Tablet tablet): free_tablet(&tablet) -cdef void free_c_row_record(TsRecord record): - free_tsfile_ts_record(&record) +# cdef void free_c_row_record(TsRecord record): +# free_tsfile_ts_record(&record) # Reader and writer new. cdef TsFileWriter tsfile_writer_new_c(object pathname) except +: @@ -317,7 +317,7 @@ cdef TsFileWriter tsfile_writer_new_c(object pathname) except +: cdef TsFileWriter writer cdef bytes encoded_path = PyUnicode_AsUTF8String(pathname) cdef const char* c_path = encoded_path - writer = tsfile_writer_new(c_path, &errno) + writer = _tsfile_writer_new(c_path, &errno) check_error(errno) return writer @@ -333,7 +333,7 @@ cdef TsFileReader tsfile_reader_new_c(object pathname) except +: # Register table and device cdef ErrorCode tsfile_writer_register_device_py_cpp(TsFileWriter writer, DeviceSchema *schema): cdef ErrorCode errno - errno = tsfile_writer_register_device(writer, schema) + errno = _tsfile_writer_register_device(writer, schema) return errno cdef ErrorCode tsfile_writer_register_timeseries_py_cpp(TsFileWriter writer, object device_name, @@ -341,12 +341,12 @@ cdef ErrorCode tsfile_writer_register_timeseries_py_cpp(TsFileWriter writer, obj cdef ErrorCode errno cdef bytes encoded_device_name = PyUnicode_AsUTF8String(device_name) cdef const char* c_device_name = encoded_device_name - errno = tsfile_writer_register_timeseries(writer, c_device_name, schema) + errno = _tsfile_writer_register_timeseries(writer, c_device_name, schema) return errno cdef ErrorCode tsfile_writer_register_table_py_cpp(TsFileWriter writer, TableSchema *schema): cdef ErrorCode errno - errno = tsfile_writer_register_table(writer, schema) + errno = _tsfile_writer_register_table(writer, schema) return errno cdef bint tsfile_result_set_is_null_by_name_c(ResultSet result_set, object name): @@ -362,6 +362,7 @@ cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_nam cdef const char* table_name_c = table_name_bytes cdef char** columns = <char**> malloc(sizeof(char*) * column_num) cdef int i + cdef ErrorCode code = 0 if columns == NULL: raise MemoryError("Failed to allocate memory for columns") try: @@ -369,7 +370,8 @@ cdef ResultSet tsfile_reader_query_table_c(TsFileReader reader, object table_nam columns[i] = strdup((<str>column_list[i]).encode('utf-8')) if columns[i] == NULL: raise MemoryError("Failed to allocate memory for column name") - result = tsfile_reader_query_table(reader, table_name_c, columns, column_num, start_time, end_time) + result = tsfile_query_table(reader, table_name_c, columns, column_num, start_time, end_time, &code) + check_error(code) return result finally: if columns != NULL: @@ -387,6 +389,7 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na cdef bytes device_name_bytes = PyUnicode_AsUTF8String(device_name) cdef const char* device_name_c = device_name_bytes cdef int i + cdef ErrorCode code = 0 if sensor_list_c == NULL: raise MemoryError("Failed to allocate memory for paths") try: @@ -394,7 +397,8 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na sensor_list_c[i] = strdup((<str>sensor_list[i]).encode('utf-8')) if sensor_list_c[i] == NULL: raise MemoryError("Failed to allocate memory for path") - result = tsfile_reader_query_device(reader, device_name_c, sensor_list_c, path_num, start_time, end_time) + result = _tsfile_reader_query_device(reader, device_name_c, sensor_list_c, path_num, start_time, end_time, &code) + check_error(code) return result finally: if sensor_list_c != NULL: diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index d3f82a62..b69b11d6 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -68,8 +68,12 @@ cdef class ResultSetPy: """ Check if the query has next rows. """ + cdef ErrorCode code = 0 self.check_result_set_invalid() - return tsfile_result_set_next(self.result) + has_next = tsfile_result_set_next(self.result, &code) + check_error(code) + return has_next + def get_result_column_info(self): return { diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index 6b794ebb..e96f0cbc 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -15,12 +15,33 @@ # specific language governing permissions and limitations # under the License. # -from tsfile import TableSchema + +from tsfile import TableSchema, Tablet, TableNotExistError from tsfile import TsFileWriter -class TsFileTableWriter(TsFileWriter): +class TsFileTableWriter(): def __init__(self, path : str, table_schema : TableSchema): self.writer = super().__init__(path) - super().register_table(table_schema) \ No newline at end of file + self.writer.register_table(table_schema) + self.exclusive_table_name_ = table_schema.get_table_name() + + def write_table(self, tablet : Tablet): + if tablet.get_target_name() == None: + tablet.set_target_name(self.exclusive_table_name_) + elif self.exclusive_table_name_ is not None and tablet.get_target_name() != self.exclusive_table_name_: + raise TableNotExistError + self.writer.write_table(tablet) + + def close(self): + self.writer.close() + + def __dealloc__(self): + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() \ No newline at end of file diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index d87656f3..80fc7c80 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -80,29 +80,29 @@ cdef class TsFileWriterPy: cdef Tablet ctablet = to_c_tablet(tablet) cdef ErrorCode errno try: - errno = tsfile_writer_write_tablet(self.writer, ctablet) + errno = _tsfile_writer_write_tablet(self.writer, ctablet) check_error(errno) finally: free_c_tablet(ctablet) - def write_row_record(self, record : RowRecord): - """ - Write a record into tsfile with tsfile writer. - :param record: timestamp and data collection - """ - cdef TsRecord record_c = to_c_record(record) - cdef ErrorCode errno - try: - errno = tsfile_writer_write_ts_record(self.writer, record_c) - check_error(errno) - finally: - free_c_row_record(record_c) + # def write_row_record(self, record : RowRecord): + # """ + # Write a record into tsfile with tsfile writer. + # :param record: timestamp and data collection + # """ + # cdef TsRecord record_c = to_c_record(record) + # cdef ErrorCode errno + # try: + # errno = tsfile_writer_write_ts_record(self.writer, record_c) + # check_error(errno) + # finally: + # free_c_row_record(record_c) def write_table(self, tablet : TabletPy): cdef Tablet ctablet = to_c_tablet(tablet) cdef ErrorCode errno try: - errno = tsfile_writer_write_table(self.writer, ctablet) + errno = _tsfile_writer_write_tablet(self.writer, ctablet) check_error(errno) finally: free_c_tablet(ctablet) @@ -114,9 +114,7 @@ cdef class TsFileWriterPy: cdef ErrorCode errno if self.writer == NULL: return - errno = tsfile_writer_flush_data(self.writer) - check_error(errno) - errno = tsfile_writer_close(self.writer) + errno = _tsfile_writer_close(self.writer) check_error(errno) self.writer = NULL
