This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch colin_python_V4_bak in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit fa9869bc5b2d47a8ce6491fbed7562d2a840804c Author: ColinLee <[email protected]> AuthorDate: Fri Feb 28 06:50:45 2025 +0800 fix some issues. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 43 +++++++++++++++-------------- cpp/src/cwrapper/tsfile_cwrapper.h | 18 +++++++++++-- python/tests/test_write.py | 23 ++++++++-------- python/tests/test_write_and_read.py | 13 ++++----- python/tsfile/schema.py | 8 +++--- python/tsfile/tsfile_cpp.pxd | 12 +++++++++ python/tsfile/tsfile_py_cpp.pxd | 4 +-- python/tsfile/tsfile_py_cpp.pyx | 52 ++++++++++++++++++------------------ python/tsfile/tsfile_reader.pyx | 14 ++++++---- python/tsfile/tsfile_table_writer.py | 4 +-- python/tsfile/tsfile_writer.pyx | 24 ++++++++--------- 11 files changed, 122 insertions(+), 93 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 143bf051..fbacab7f 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -179,31 +179,30 @@ TABLE_ADD_VALUE_BY_INDEX_DEF(float); TABLE_ADD_VALUE_BY_INDEX_DEF(double); TABLE_ADD_VALUE_BY_INDEX_DEF(bool); -/* // TsRecord API -TsRecord ts_record_new(const char *device_id, Timestamp timestamp, - int timeseries_num) { -auto *record = new storage::TsRecord(timestamp, device_id, -timeseries_num); return record; +TsRecord _ts_record_new(const char *device_id, Timestamp timestamp, + int timeseries_num) { + auto *record = new storage::TsRecord(timestamp, device_id, timeseries_num); + return record; } #define INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(type) \ -ERRNO insert_data_into_ts_record_by_name_##type( \ - TsRecord data, const char *measurement_name, type value) { \ - auto *record = (storage::TsRecord *)data; \ - storage::DataPoint point(measurement_name, value); \ - if (record->points_.size() + 1 > record->points_.capacity()) \ - return common::E_BUF_NOT_ENOUGH; \ - record->points_.push_back(point); \ - return common::E_OK; \ -} + ERRNO _insert_data_into_ts_record_by_name_##type( \ + 
TsRecord data, const char *measurement_name, type value) { \ + auto *record = (storage::TsRecord *)data; \ + storage::DataPoint point(measurement_name, value); \ + if (record->points_.size() + 1 > record->points_.capacity()) \ + return common::E_BUF_NOT_ENOUGH; \ + record->points_.push_back(point); \ + return common::E_OK; \ + } INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int32_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME_DEF(double); - +/* TsFileWriter tsfile_writer_new_with_conf(const char *pathname, const mode_t flag, ERRNO *err_code, TsFileConf *conf) { @@ -218,12 +217,6 @@ if (ret != common::E_OK) { return writer; } -ERRNO tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { -auto *w = static_cast<storage::TsFileWriter *>(writer); -const storage::TsRecord *record = static_cast<storage::TsRecord *>(data); -const int ret = w->write_record(*record); -return ret; -} */ ERRNO tsfile_writer_write(TsFileWriter writer, Tablet tablet) { auto *w = static_cast<storage::TsFileTableWriter *>(writer); @@ -463,7 +456,7 @@ TableSchema *tsfile_reader_get_all_table_schemas(TsFileReader reader, } // delete pointer -void free_tsfile_ts_record(TsRecord *record) { +void _free_tsfile_ts_record(TsRecord *record) { if (*record != nullptr) { delete static_cast<storage::TsRecord *>(*record); } @@ -616,6 +609,12 @@ ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { auto *tbl = static_cast<storage::Tablet *>(tablet); return w->write_table(*tbl); } +ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { + auto *w = static_cast<storage::TsFileWriter *>(writer); + const storage::TsRecord *record = static_cast<storage::TsRecord *>(data); + const int ret = w->write_record(*record); + return ret; +} ERRNO _tsfile_writer_close(TsFileWriter writer) { auto *w = static_cast<storage::TsFileWriter *>(writer); diff 
--git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 509d5368..4bece8e4 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -290,7 +290,7 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); -/* +*/ /*--------------------------TsFile Writer Register------------------------ */ /* @@ -482,7 +482,6 @@ TableSchema* tsfile_reader_get_all_table_schemas(TsFileReader reader, uint32_t* size); // Close and free resource. -void free_tsfile_ts_record(TsRecord* record); void free_tablet(Tablet* tablet); void free_tsfile_result_set(ResultSet* result_set); void free_result_set_meta_data(ResultSetMetaData result_set_meta_data); @@ -504,14 +503,29 @@ ERRNO _tsfile_writer_register_timeseries(TsFileWriter writer, const TimeseriesSchema* schema); ERRNO _tsfile_writer_register_device(TsFileWriter writer, const DeviceSchema* device_schema); +TsRecord _ts_record_new(const char* device_id, Timestamp timestamp, + int timeseries_num); + +#define INSERT_DATA_INTO_TS_RECORD_BY_NAME(type) \ + ERRNO _insert_data_into_ts_record_by_name_##type( \ + TsRecord data, const char* measurement_name, type value); + +INSERT_DATA_INTO_TS_RECORD_BY_NAME(int32_t); +INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t); +INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); +INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); +INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); + ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); +ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); ERRNO _tsfile_writer_close(TsFileWriter writer); ResultSet _tsfile_reader_query_device(TsFileReader reader, const char* device_name, char** sensor_name, uint32_t sensor_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); +void 
_free_tsfile_ts_record(TsRecord* record); #ifdef __cplusplus } diff --git a/python/tests/test_write.py b/python/tests/test_write.py index f96e5400..0f5cd423 100644 --- a/python/tests/test_write.py +++ b/python/tests/test_write.py @@ -25,18 +25,17 @@ from tsfile import Tablet, RowRecord, Field from tsfile import TSDataType def test_row_record_write(): - pass - # try: - # writer = TsFileWriter("record_write.tsfile") - # timeseries = TimeseriesSchema("level1", TSDataType.INT64) - # writer.register_timeseries("root.device1", timeseries) - # - # record = RowRecord("root.device1", 10,[Field("level1", 10, TSDataType.INT64)]) - # writer.write_row_record(record) - # writer.close() - # finally: - # if os.path.exists("record_write.tsfile"): - # os.remove("record_write.tsfile") + try: + writer = TsFileWriter("record_write.tsfile") + timeseries = TimeseriesSchema("level1", TSDataType.INT64) + writer.register_timeseries("root.device1", timeseries) + + record = RowRecord("root.device1", 10,[Field("level1", 10, TSDataType.INT64)]) + writer.write_row_record(record) + writer.close() + finally: + if os.path.exists("record_write.tsfile"): + os.remove("record_write.tsfile") def test_tablet_write(): try: diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index 9d23fb1c..1423fb31 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -49,7 +49,8 @@ def test_row_record_write_and_read(): result = reader.query_timeseries("root.device1", ["level1","level2"], 10, 100) i = 10 while result.next(): - assert result.get_value_by_index(0) == i + assert result.get_value_by_index(1) == i + assert result.get_value_by_name("level1") == i assert result.get_value_by_name("level2") == i * 1.1 i = i + 1 print(reader.get_active_query_result()) @@ -86,8 +87,9 @@ def test_tablet_write_and_read(): result = reader.query_timeseries("root.device1", ["level0"], 0, 1000000) row_num = 0 while result.next(): - # assert 
result.is_null_by_index(1) == False - # assert result.get_value_by_name("level0") == row_num + assert result.is_null_by_index(1) == False + assert result.get_value_by_index(1) == row_num + assert result.get_value_by_name("level0") == row_num row_num = row_num + 1 assert row_num == max_row_num @@ -106,9 +108,8 @@ def test_table_writer(): ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]) try: with TsFileTableWriter("table_write.tsfile", table) as writer: - tablet = Tablet("test_table", ["device", "value"], - [TSDataType.STRING, TSDataType.DOUBLE], - [ColumnCategory.TAG, ColumnCategory.FIELD], 100) + tablet = Tablet( ["device", "value"], + [TSDataType.STRING, TSDataType.DOUBLE], 100) for i in range(100): tablet.add_timestamp(i, i) tablet.add_value_by_name("device", i, "device" + str(i)) diff --git a/python/tsfile/schema.py b/python/tsfile/schema.py index 775e58f6..f8b7e8bc 100644 --- a/python/tsfile/schema.py +++ b/python/tsfile/schema.py @@ -98,14 +98,14 @@ class TableSchema: class ResultSetMetaData: column_list = None data_types = None - device_id = None + table_name = None def __init__(self, column_list: List[str], data_types: List[TSDataType]): self.column_list = column_list self.data_types = data_types - def set_device_name(self, device_id: str): - self.device_id = device_id + def set_table_name(self, table_name: str): + self.table_name = table_name def get_data_type(self, column_index: int) -> TSDataType: return self.data_types[column_index] @@ -114,7 +114,7 @@ class ResultSetMetaData: return self.column_list[column_index] def get_column_name_index(self, column_name: str) -> int: - return self.column_list.index(self.device_id + "." + column_name) + return self.column_list.index(self.table_name + "." 
+ column_name) def get_column_num(self): return len(self.column_list) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 518f3a71..b5fb1a53 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -123,6 +123,7 @@ cdef extern from "./tsfile_cwrapper.h": # writer : write tablet data and flush ErrorCode _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); ErrorCode _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); + ErrorCode _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); # tablet : new and add timestamp/value into tablet Tablet _tablet_new_with_target_name(const char * device_id, char** column_name_list, @@ -144,6 +145,17 @@ cdef extern from "./tsfile_cwrapper.h": void free_tablet(Tablet * tablet); + # row_record + TsRecord _ts_record_new(const char * device_id, int64_t timestamp, int timeseries_num); + ErrorCode _insert_data_into_ts_record_by_name_int32_t(TsRecord data, const char *measurement_name, const int32_t value); + ErrorCode _insert_data_into_ts_record_by_name_int64_t(TsRecord data, const char *measurement_name, const int64_t value); + ErrorCode _insert_data_into_ts_record_by_name_float(TsRecord data, const char *measurement_name, const float value); + ErrorCode _insert_data_into_ts_record_by_name_double(TsRecord data, const char *measurement_name, const double value); + ErrorCode _insert_data_into_ts_record_by_name_bool(TsRecord data, const char *measurement_name,const bint value); + + void _free_tsfile_ts_record(TsRecord* record); + + # resulSet : query data from tsfile reader ResultSet tsfile_query_table(TsFileReader reader, const char * table_name, diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 4730969c..926a56da 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -31,13 +31,13 @@ cdef public api DeviceSchema* to_c_device_schema(object py_schema) cdef public api ColumnSchema* 
to_c_column_schema(object py_schema) cdef public api TableSchema* to_c_table_schema(object py_schema) cdef public api Tablet to_c_tablet(object tablet) -# cdef public api TsRecord to_c_record(object row_record) +cdef public api TsRecord to_c_record(object row_record) cdef public api void free_c_table_schema(TableSchema* c_schema) cdef public api void free_c_column_schema(ColumnSchema* c_schema) cdef public api void free_c_timeseries_schema(TimeseriesSchema* c_schema) cdef public api void free_c_device_schema(DeviceSchema* c_schema) cdef public api void free_c_tablet(Tablet tablet) -# cdef public api void free_c_row_record(TsRecord record) +cdef public api void free_c_row_record(TsRecord record) cdef public api TsFileWriter tsfile_writer_new_c(object pathname) except + cdef public api TsFileReader tsfile_reader_new_c(object pathname) except + cdef public api ErrorCode tsfile_writer_register_device_py_cpp(TsFileWriter writer, DeviceSchema *schema) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index f4a53e66..78334c2f 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -258,30 +258,30 @@ cdef Tablet to_c_tablet(object tablet): return ctablet -# cdef TsRecord to_c_record(object row_record): -# cdef int field_num = row_record.get_fields_num() -# cdef int64_t timestamp = <int64_t>row_record.get_timestamp() -# cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) -# cdef const char* device_id = device_id_bytes -# cdef TsRecord record -# cdef int i -# cdef TSDataType data_type -# record = ts_record_new(device_id, timestamp, field_num) -# for i in range(field_num): -# field = row_record.get_fields()[i] -# data_type = to_c_data_type(field.get_data_type()) -# if data_type == TS_DATATYPE_BOOLEAN: -# insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) -# elif data_type == TS_DATATYPE_INT32: -# 
insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) -# elif data_type == TS_DATATYPE_INT64: -# insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) -# elif data_type == TS_DATATYPE_DOUBLE: -# insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) -# elif data_type == TS_DATATYPE_FLOAT: -# insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) -# -# return record +cdef TsRecord to_c_record(object row_record): + cdef int field_num = row_record.get_fields_num() + cdef int64_t timestamp = <int64_t>row_record.get_timestamp() + cdef bytes device_id_bytes = PyUnicode_AsUTF8String(row_record.get_device_id()) + cdef const char* device_id = device_id_bytes + cdef TsRecord record + cdef int i + cdef TSDataType data_type + record = _ts_record_new(device_id, timestamp, field_num) + for i in range(field_num): + field = row_record.get_fields()[i] + data_type = to_c_data_type(field.get_data_type()) + if data_type == TS_DATATYPE_BOOLEAN: + _insert_data_into_ts_record_by_name_bool(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_bool_value()) + elif data_type == TS_DATATYPE_INT32: + _insert_data_into_ts_record_by_name_int32_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_int_value()) + elif data_type == TS_DATATYPE_INT64: + _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) + elif data_type == TS_DATATYPE_DOUBLE: + _insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) + elif data_type == TS_DATATYPE_FLOAT: + _insert_data_into_ts_record_by_name_float(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_float_value()) + + return record # Free c structs' space cdef void 
free_c_table_schema(TableSchema* c_schema): @@ -305,8 +305,8 @@ cdef void free_c_device_schema(DeviceSchema* c_schema): cdef void free_c_tablet(Tablet tablet): free_tablet(&tablet) -# cdef void free_c_row_record(TsRecord record): -# free_tsfile_ts_record(&record) +cdef void free_c_row_record(TsRecord record): + _free_tsfile_ts_record(&record) # Reader and writer new. cdef TsFileWriter tsfile_writer_new_c(object pathname) except +: diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index d683aaf9..67b686fb 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -61,7 +61,7 @@ cdef class ResultSetPy: self.result = result metadata_c = tsfile_result_set_get_metadata(self.result) self.metadata = from_c_result_set_meta_data(metadata_c) - self.metadata.set_device_name(device_name) + self.metadata.set_table_name(device_name) free_result_set_meta_data(metadata_c) def next(self): @@ -116,9 +116,10 @@ cdef class ResultSetPy: Get value by index from query result set. """ self.check_result_set_invalid() - if tsfile_result_set_is_null_by_index(self.result, index): + if tsfile_result_set_is_null_by_index(self.result, index - 1): + print("None there") return None - data_type = self.metadata.get_data_type(index) + data_type = self.metadata.get_data_type(index - 1) if data_type == TSDataTypePy.INT32: return tsfile_result_set_get_value_by_index_int32_t(self.result, index) elif data_type == TSDataTypePy.INT64: @@ -136,6 +137,7 @@ cdef class ResultSetPy: """ self.check_result_set_invalid() if tsfile_result_set_is_null_by_name_c(self.result, column_name): + print("Get None") return None ind = self.metadata.get_column_name_index(column_name) return self.get_value_by_index(ind + 1) @@ -146,13 +148,15 @@ cdef class ResultSetPy: This method queries the underlying result set to determine if the value at the given column index position represents a null value. + + Index start from 1. 
""" self.check_result_set_invalid() - if index > len(self.metadata.column_list) or index < 1: + if index > (len(self.metadata.column_list) + 1) or index < 1: raise IndexError( f"Column index {index} out of range (column count: {len(self.metadata.column_list)})" ) - return tsfile_result_set_is_null_by_index(self.result, index) + return tsfile_result_set_is_null_by_index(self.result, index - 1) def is_null_by_name(self, name : str): """ diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py index e96f0cbc..3dff5443 100644 --- a/python/tsfile/tsfile_table_writer.py +++ b/python/tsfile/tsfile_table_writer.py @@ -23,13 +23,13 @@ from tsfile import TsFileWriter class TsFileTableWriter(): def __init__(self, path : str, table_schema : TableSchema): - self.writer = super().__init__(path) + self.writer = TsFileWriter(path) self.writer.register_table(table_schema) self.exclusive_table_name_ = table_schema.get_table_name() def write_table(self, tablet : Tablet): if tablet.get_target_name() == None: - tablet.set_target_name(self.exclusive_table_name_) + tablet.set_table_name(self.exclusive_table_name_) elif self.exclusive_table_name_ is not None and tablet.get_target_name() != self.exclusive_table_name_: raise TableNotExistError self.writer.write_table(tablet) diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx index c788bcf2..ddeec048 100644 --- a/python/tsfile/tsfile_writer.pyx +++ b/python/tsfile/tsfile_writer.pyx @@ -85,18 +85,18 @@ cdef class TsFileWriterPy: finally: free_c_tablet(ctablet) - # def write_row_record(self, record : RowRecord): - # """ - # Write a record into tsfile with tsfile writer. 
- # :param record: timestamp and data collection - # """ - # cdef TsRecord record_c = to_c_record(record) - # cdef ErrorCode errno - # try: - # errno = tsfile_writer_write_ts_record(self.writer, record_c) - # check_error(errno) - # finally: - # free_c_row_record(record_c) + def write_row_record(self, record : RowRecord): + """ + Write a record into tsfile with tsfile writer. + :param record: timestamp and data collection + """ + cdef TsRecord record_c = to_c_record(record) + cdef ErrorCode errno + try: + errno = _tsfile_writer_write_ts_record(self.writer, record_c) + check_error(errno) + finally: + free_c_row_record(record_c) def write_table(self, tablet : TabletPy): cdef Tablet ctablet = to_c_tablet(tablet)
