This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch rc/2.2.0_bak in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 63440d4e4e1636311fde2d395b9b89cf3e809d80 Author: Colin Lee <[email protected]> AuthorDate: Sat Dec 13 11:33:43 2025 +0800 Fix release issue. (#662) --- cpp/examples/c_examples/demo_write.c | 6 +- cpp/src/common/allocator/my_string.h | 49 ++-- cpp/src/common/tablet.cc | 80 ++++--- cpp/src/common/tablet.h | 60 ++--- cpp/src/cwrapper/tsfile_cwrapper.cc | 21 +- cpp/src/cwrapper/tsfile_cwrapper.h | 17 +- cpp/test/cwrapper/c_release_test.cc | 18 +- cpp/test/cwrapper/cwrapper_test.cc | 6 +- python/tests/test_write_and_read.py | 449 ++++++++++++++++++++++++++++++++++- python/tsfile/constants.py | 15 +- python/tsfile/field.py | 46 ++-- python/tsfile/tablet.py | 8 +- python/tsfile/tsfile_cpp.pxd | 7 +- python/tsfile/tsfile_py_cpp.pyx | 36 ++- python/tsfile/tsfile_reader.pyx | 25 +- python/tsfile/utils.py | 5 +- 16 files changed, 668 insertions(+), 180 deletions(-) diff --git a/cpp/examples/c_examples/demo_write.c b/cpp/examples/c_examples/demo_write.c index 326cfdcf..925e3af8 100644 --- a/cpp/examples/c_examples/demo_write.c +++ b/cpp/examples/c_examples/demo_write.c @@ -75,8 +75,10 @@ ERRNO write_tsfile() { for (int row = 0; row < 5; row++) { Timestamp timestamp = row; tablet_add_timestamp(tablet, row, timestamp); - tablet_add_value_by_name_string(tablet, row, "id1", "id_field_1"); - tablet_add_value_by_name_string(tablet, row, "id2", "id_field_2"); + tablet_add_value_by_name_string_with_len( + tablet, row, "id1", "id_field_1", strlen("id_field_1")); + tablet_add_value_by_name_string_with_len( + tablet, row, "id2", "id_field_2", strlen("id_field_2")); tablet_add_value_by_name_int32_t(tablet, row, "s1", row); } diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h index f3ec60a3..279ee798 100644 --- a/cpp/src/common/allocator/my_string.h +++ b/cpp/src/common/allocator/my_string.h @@ -30,17 +30,24 @@ namespace common { // String that use PageArena struct String { - char *buf_; + char* buf_; uint32_t len_; String() : buf_(nullptr), len_(0) {} - String(char *buf, uint32_t len) : buf_(buf), len_(len) {} - String(const std::string &str, common::PageArena &pa) + String(char* buf, uint32_t len) : buf_(buf), len_(len) {} + + // NOTE: + // This constructor accepts a const char* to preserve constness for possible + // future copying. The internal buf_ must be considered read-only. + String(const char* buf, uint32_t len) + : buf_(const_cast<char*>(buf)), len_(len) {} + + String(const std::string& str, common::PageArena& pa) : buf_(nullptr), len_(0) { dup_from(str, pa); } - String(const std::string &str) { - buf_ = (char *)str.c_str(); + String(const std::string& str) { + buf_ = (char*)str.c_str(); len_ = str.size(); } FORCE_INLINE bool is_null() const { return buf_ == nullptr && len_ == 0; } @@ -48,7 +55,7 @@ struct String { len_ = 0; buf_ = nullptr; } - FORCE_INLINE int dup_from(const std::string &str, common::PageArena &pa) { + FORCE_INLINE int dup_from(const std::string& str, common::PageArena& pa) { len_ = str.size(); if (UNLIKELY(len_ == 0)) { return common::E_OK; @@ -61,11 +68,11 @@ struct String { return common::E_OK; } - FORCE_INLINE bool operator==(const String &other) const { + FORCE_INLINE bool operator==(const String& other) const { return equal_to(other); } - FORCE_INLINE int dup_from(const String &str, common::PageArena &pa) { + FORCE_INLINE int dup_from(const String& str, common::PageArena& pa) { len_ = str.len_; if (UNLIKELY(len_ == 0)) { buf_ = nullptr; @@ -79,8 +86,8 @@ struct String { return common::E_OK; } - FORCE_INLINE int dup_from(const char *str, uint32_t len, - common::PageArena &pa) { + FORCE_INLINE int dup_from(const char* str, uint32_t len, + common::PageArena& pa) { len_ = len; if (UNLIKELY(len_ == 0)) { return common::E_OK; @@ -93,8 +100,8 @@ struct String { return common::E_OK; } - FORCE_INLINE int build_from(const String &s1, const String &s2, - common::PageArena &pa) { + FORCE_INLINE int build_from(const String& s1, const String& s2, + common::PageArena& pa) { len_ = s1.len_ + s2.len_; buf_ = pa.alloc(len_); if (IS_NULL(buf_)) { @@ -104,11 +111,11 @@ struct String { memcpy(buf_ + s1.len_, s2.buf_, s2.len_); return common::E_OK; } - FORCE_INLINE void shallow_copy_from(const String &src) { + FORCE_INLINE void shallow_copy_from(const String& src) { buf_ = src.buf_; len_ = src.len_; } - FORCE_INLINE bool equal_to(const String &that) const { + FORCE_INLINE bool equal_to(const String& that) const { return (len_ == 0 && that.len_ == 0) || ((len_ == that.len_) && (0 == memcmp(buf_, that.buf_, len_))); } @@ -121,7 +128,7 @@ struct String { // } // strict less than. If @this equals to @that, return false. - FORCE_INLINE bool less_than(const String &that) const { + FORCE_INLINE bool less_than(const String& that) const { if (len_ == 0 || that.len_ == 0) { return false; } @@ -139,7 +146,7 @@ struct String { // return = 0, if this = that // return < 0, if this < that // return > 0, if this > that - FORCE_INLINE int compare(const String &that) const { + FORCE_INLINE int compare(const String& that) const { if (len_ == 0 && that.len_ == 0) { return 0; } @@ -158,19 +165,19 @@ struct String { } } - FORCE_INLINE void max(const String &that, common::PageArena &pa) { + FORCE_INLINE void max(const String& that, common::PageArena& pa) { if (compare(that) < 0) { this->dup_from(that, pa); } } - FORCE_INLINE void min(const String &that, common::PageArena &pa) { + FORCE_INLINE void min(const String& that, common::PageArena& pa) { if (compare(that) > 0) { this->dup_from(that, pa); } } - bool operator<(const String &other) const { + bool operator<(const String& other) const { if (this->is_null() && other.is_null()) { return false; } @@ -192,7 +199,7 @@ struct String { std::string to_std_string() const { return std::string(buf_, len_); } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const String &s) { + friend std::ostream& operator<<(std::ostream& os, const String& s) { os << s.len_ << "@"; for (uint32_t i = 0; i < s.len_; i++) { os << s.buf_[i]; @@ -203,7 +210,7 @@ struct String { }; struct StringLessThan { - FORCE_INLINE bool operator()(const String &s1, const String &s2) const { + FORCE_INLINE bool operator()(const String& s1, const String& s2) const { return s1.less_than(s2); } }; diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index dcb5e063..2a22d78d 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -30,7 +30,7 @@ namespace storage { int Tablet::init() { ASSERT(timestamps_ == nullptr); - timestamps_ = (int64_t *)malloc(sizeof(int64_t) * max_row_num_); + timestamps_ = (int64_t*)malloc(sizeof(int64_t) * max_row_num_); cur_row_size_ = 0; size_t schema_count = schema_vec_->size(); @@ -45,39 +45,39 @@ int Tablet::init() { } ASSERT(schema_map_.size() == schema_count); value_matrix_ = - (ValueMatrixEntry *)malloc(sizeof(ValueMatrixEntry) * schema_count); + (ValueMatrixEntry*)malloc(sizeof(ValueMatrixEntry) * schema_count); for (size_t c = 0; c < schema_count; ++c) { - const MeasurementSchema &schema = schema_vec_->at(c); + const MeasurementSchema& schema = schema_vec_->at(c); switch (schema.data_type_) { case BOOLEAN: - value_matrix_[c].bool_data = (bool *)malloc( + value_matrix_[c].bool_data = (bool*)malloc( get_data_type_size(schema.data_type_) * max_row_num_); memset(value_matrix_[c].bool_data, 0, get_data_type_size(schema.data_type_) * max_row_num_); break; case DATE: case INT32: - value_matrix_[c].int32_data = (int32_t *)malloc( + value_matrix_[c].int32_data = (int32_t*)malloc( get_data_type_size(schema.data_type_) * max_row_num_); memset(value_matrix_[c].int32_data, 0, get_data_type_size(schema.data_type_) * max_row_num_); break; case TIMESTAMP: case INT64: - value_matrix_[c].int64_data = (int64_t *)malloc( + value_matrix_[c].int64_data = (int64_t*)malloc( get_data_type_size(schema.data_type_) * max_row_num_); memset(value_matrix_[c].int64_data, 0, get_data_type_size(schema.data_type_) * max_row_num_); break; case FLOAT: - value_matrix_[c].float_data = (float *)malloc( + value_matrix_[c].float_data = (float*)malloc( get_data_type_size(schema.data_type_) * max_row_num_); memset(value_matrix_[c].float_data, 0, get_data_type_size(schema.data_type_) * max_row_num_); break; case DOUBLE: - value_matrix_[c].double_data = (double *)malloc( + value_matrix_[c].double_data = (double*)malloc( get_data_type_size(schema.data_type_) * max_row_num_); memset(value_matrix_[c].double_data, 0, get_data_type_size(schema.data_type_) * max_row_num_); @@ -86,7 +86,7 @@ int Tablet::init() { case TEXT: case STRING: { value_matrix_[c].string_data = - (common::String *)malloc(sizeof(String) * max_row_num_); + (common::String*)malloc(sizeof(String) * max_row_num_); break; } default: @@ -110,7 +110,7 @@ void Tablet::destroy() { if (value_matrix_ != nullptr) { for (size_t c = 0; c < schema_vec_->size(); c++) { - const MeasurementSchema &schema = schema_vec_->at(c); + const MeasurementSchema& schema = schema_vec_->at(c); switch (schema.data_type_) { case DATE: case INT32: @@ -163,12 +163,12 @@ int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) { return E_OK; } -void *Tablet::get_value(int row_index, uint32_t schema_index, - common::TSDataType &data_type) const { +void* Tablet::get_value(int row_index, uint32_t schema_index, + common::TSDataType& data_type) const { if (UNLIKELY(schema_index >= schema_vec_->size())) { return nullptr; } - const MeasurementSchema &schema = schema_vec_->at(schema_index); + const MeasurementSchema& schema = schema_vec_->at(schema_index); ValueMatrixEntry column_values = value_matrix_[schema_index]; data_type = schema.data_type_; @@ -177,23 +177,23 @@ void *Tablet::get_value(int row_index, uint32_t schema_index, } switch (schema.data_type_) { case BOOLEAN: { - bool *bool_values = column_values.bool_data; + bool* bool_values = column_values.bool_data; return &bool_values[row_index]; } case INT32: { - int32_t *int32_values = column_values.int32_data; + int32_t* int32_values = column_values.int32_data; return &int32_values[row_index]; } case INT64: { - int64_t *int64_values = column_values.int64_data; + int64_t* int64_values = column_values.int64_data; return &int64_values[row_index]; } case FLOAT: { - float *float_values = column_values.float_data; + float* float_values = column_values.float_data; return &float_values[row_index]; } case DOUBLE: { - double *double_values = column_values.double_data; + double* double_values = column_values.double_data; return &double_values[row_index]; } case STRING: { @@ -207,8 +207,8 @@ void *Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, - common::String val) { - value_matrix_[schema_index].string_data[row_index].dup_from(val, + common::String str) { + value_matrix_[schema_index].string_data[row_index].dup_from(str, page_arena_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -254,7 +254,7 @@ int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) { ASSERT(false); ret = common::E_OUT_OF_RANGE; } else { - const MeasurementSchema &schema = schema_vec_->at(schema_index); + const MeasurementSchema& schema = schema_vec_->at(schema_index); auto dic = GetDataTypesFromTemplateType<T>(); if (dic.find(schema.data_type_) == dic.end()) { return E_TYPE_NOT_MATCH; @@ -291,19 +291,25 @@ int Tablet::add_value(uint32_t row_index, uint32_t schema_index, if (UNLIKELY(schema_index >= schema_vec_->size())) { ASSERT(false); ret = common::E_OUT_OF_RANGE; + } else { + const MeasurementSchema& schema = schema_vec_->at(schema_index); + auto dic = GetDataTypesFromTemplateType<common::String>(); + if (dic.find(schema.data_type_) == dic.end()) { + return E_TYPE_NOT_MATCH; + } + process_val(row_index, schema_index, val); } - process_val(row_index, schema_index, val); return ret; } template <> int Tablet::add_value(uint32_t row_index, uint32_t schema_index, - const char *val) { + const char* val) { return add_value(row_index, schema_index, String(val)); } template <typename T> -int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, +int Tablet::add_value(uint32_t row_index, const std::string& measurement_name, T val) { int ret = common::E_OK; if (err_code_ != E_OK) { @@ -319,8 +325,8 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, } template <> -int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, - const char *val) { +int Tablet::add_value(uint32_t row_index, const std::string& measurement_name, + const char* val) { return add_value(row_index, measurement_name, String(val)); } @@ -336,22 +342,22 @@ template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, double val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, bool val); + const std::string& measurement_name, bool val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, + const std::string& measurement_name, int32_t val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, + const std::string& measurement_name, int64_t val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, float val); + const std::string& measurement_name, float val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, double val); + const std::string& measurement_name, double val); template int Tablet::add_value(uint32_t row_index, - const std::string &measurement_name, String val); + const std::string& measurement_name, String val); void Tablet::set_column_categories( - const std::vector<ColumnCategory> &column_categories) { + const std::vector<ColumnCategory>& column_categories) { column_categories_ = column_categories; id_column_indexes_.clear(); for (size_t i = 0; i < column_categories_.size(); i++) { @@ -363,11 +369,11 @@ void Tablet::set_column_categories( } std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const { - std::vector<std::string *> id_array; + std::vector<std::string*> id_array; id_array.push_back(new std::string(insert_target_name_)); for (auto id_column_idx : id_column_indexes_) { common::TSDataType data_type = INVALID_DATATYPE; - void *value_ptr = get_value(i, id_column_idx, data_type); + void* value_ptr = get_value(i, id_column_idx, data_type); if (value_ptr == nullptr) { id_array.push_back(nullptr); continue; @@ -375,7 +381,7 @@ std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const { common::String str; switch (data_type) { case STRING: - str = *static_cast<common::String *>(value_ptr); + str = *static_cast<common::String*>(value_ptr); if (str.buf_ == nullptr || str.len_ == 0) { id_array.push_back(new std::string()); } else { @@ -387,7 +393,7 @@ std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const { } } auto res = std::make_shared<StringArrayDeviceID>(id_array); - for (auto &id : id_array) { + for (auto& id : id_array) { if (id != nullptr) { delete id; } diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index b30fc512..04fee764 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -48,12 +48,12 @@ class TabletColIterator; class Tablet { struct ValueMatrixEntry { union { - int32_t *int32_data; - int64_t *int64_data; - float *float_data; - double *double_data; - bool *bool_data; - common::String *string_data; + int32_t* int32_data; + int64_t* int64_data; + float* float_data; + double* double_data; + bool* bool_data; + common::String* string_data; }; }; @@ -62,7 +62,7 @@ class Tablet { int err_code_ = common::E_OK; public: - Tablet(const std::string &device_id, + Tablet(const std::string& device_id, std::shared_ptr<std::vector<MeasurementSchema>> schema_vec, int max_rows = DEFAULT_MAX_ROWS) : max_row_num_(max_rows), @@ -81,9 +81,9 @@ class Tablet { err_code_ = init(); } - Tablet(const std::string &device_id, - const std::vector<std::string> *measurement_list, - const std::vector<common::TSDataType> *data_type_list, + Tablet(const std::string& device_id, + const std::vector<std::string>* measurement_list, + const std::vector<common::TSDataType>* data_type_list, int max_row_num = DEFAULT_MAX_ROWS) : max_row_num_(max_row_num), insert_target_name_(device_id), @@ -105,7 +105,7 @@ class Tablet { std::transform(measurement_list->begin(), measurement_list->end(), data_type_list->begin(), std::back_inserter(measurement_vec), - [](const std::string &name, common::TSDataType type) { + [](const std::string& name, common::TSDataType type) { return MeasurementSchema(name, type); }); schema_vec_ = @@ -113,10 +113,10 @@ class Tablet { err_code_ = init(); } - Tablet(const std::string &insert_target_name, - const std::vector<std::string> &column_names, - const std::vector<common::TSDataType> &data_types, - const std::vector<common::ColumnCategory> &column_categories, + Tablet(const std::string& insert_target_name, + const std::vector<std::string>& column_names, + const std::vector<common::TSDataType>& data_types, + const std::vector<common::ColumnCategory>& column_categories, int max_rows = DEFAULT_MAX_ROWS) : max_row_num_(max_rows), cur_row_size_(0), @@ -145,8 +145,8 @@ class Tablet { * @param max_rows The maximum number of rows that this tablet can hold. * Defaults to DEFAULT_MAX_ROWS. */ - Tablet(const std::vector<std::string> &column_names, - const std::vector<common::TSDataType> &data_types, + Tablet(const std::vector<std::string>& column_names, + const std::vector<common::TSDataType>& data_types, uint32_t max_rows = DEFAULT_MAX_ROWS) : max_row_num_(max_rows), cur_row_size_(0), @@ -164,8 +164,8 @@ class Tablet { ~Tablet() { destroy(); } - const std::string &get_table_name() const { return insert_target_name_; } - void set_table_name(const std::string &table_name) { + const std::string& get_table_name() const { return insert_target_name_; } + void set_table_name(const std::string& table_name) { insert_target_name_ = table_name; } size_t get_column_count() const { return schema_vec_->size(); } @@ -181,8 +181,8 @@ class Tablet { */ int add_timestamp(uint32_t row_index, int64_t timestamp); - void *get_value(int row_index, uint32_t schema_index, - common::TSDataType &data_type) const; + void* get_value(int row_index, uint32_t schema_index, + common::TSDataType& data_type) const; /** * @brief Template function to add a value of type T to the specified row * and column. @@ -199,7 +199,7 @@ class Tablet { int add_value(uint32_t row_index, uint32_t schema_index, T val); void set_column_categories( - const std::vector<common::ColumnCategory> &column_categories); + const std::vector<common::ColumnCategory>& column_categories); std::shared_ptr<IDeviceID> get_device_id(int i) const; /** * @brief Template function to add a value of type T to the specified row @@ -214,23 +214,23 @@ class Tablet { * @return Returns 0 on success, or a non-zero error code on failure. */ template <typename T> - int add_value(uint32_t row_index, const std::string &measurement_name, + int add_value(uint32_t row_index, const std::string& measurement_name, T val); - FORCE_INLINE const std::string &get_column_name( + FORCE_INLINE const std::string& get_column_name( uint32_t column_index) const { return schema_vec_->at(column_index).measurement_name_; } - void set_column_name(uint32_t column_index, const std::string &name) { + void set_column_name(uint32_t column_index, const std::string& name) { schema_vec_->at(column_index).measurement_name_ = name; } - const std::map<std::string, int> &get_schema_map() const { + const std::map<std::string, int>& get_schema_map() const { return schema_map_; } - void set_schema_map(const std::map<std::string, int> &schema_map) { + void set_schema_map(const std::map<std::string, int>& schema_map) { schema_map_ = schema_map; } @@ -252,9 +252,9 @@ class Tablet { std::string insert_target_name_; std::shared_ptr<std::vector<MeasurementSchema>> schema_vec_; std::map<std::string, int> schema_map_; - int64_t *timestamps_; - ValueMatrixEntry *value_matrix_; - common::BitMap *bitmaps_; + int64_t* timestamps_; + ValueMatrixEntry* value_matrix_; + common::BitMap* bitmaps_; std::vector<common::ColumnCategory> column_categories_; std::vector<int> id_column_indexes_; }; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 0b4e69dd..7c22ccd5 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -236,11 +236,14 @@ TABLET_ADD_VALUE_BY_NAME_DEF(float); TABLET_ADD_VALUE_BY_NAME_DEF(double); TABLET_ADD_VALUE_BY_NAME_DEF(bool); -ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, - const char* column_name, - const char* value) { +ERRNO tablet_add_value_by_name_string_with_len(Tablet tablet, + uint32_t row_index, + const char* column_name, + const char* value, + int value_len) { return static_cast<storage::Tablet*>(tablet)->add_value( - row_index, storage::to_lower(column_name), common::String(value)); + row_index, storage::to_lower(column_name), + common::String(value, value_len)); } #define TABLE_ADD_VALUE_BY_INDEX_DEF(type) \ @@ -251,11 +254,13 @@ ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, row_index, column_index, value); \ } -ERRNO tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, - uint32_t column_index, - const char* value) { +ERRNO tablet_add_value_by_index_string_with_len(Tablet tablet, + uint32_t row_index, + uint32_t column_index, + const char* value, + int value_len) { return static_cast<storage::Tablet*>(tablet)->add_value( - row_index, column_index, common::String(value)); + row_index, column_index, common::String(value, value_len)); } TABLE_ADD_VALUE_BY_INDEX_DEF(int32_t); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 093b413e..d9fe6bb8 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -35,6 +35,7 @@ typedef enum { TS_DATATYPE_DOUBLE = 4, TS_DATATYPE_TEXT = 5, TS_DATATYPE_VECTOR = 6, + TS_DATATYPE_TIMESTAMP = 8, TS_DATATYPE_DATE = 9, TS_DATATYPE_BLOB = 10, TS_DATATYPE_STRING = 11, @@ -337,9 +338,11 @@ TABLET_ADD_VALUE_BY_NAME(bool); * @param value [in] Null-terminated string. Ownership remains with caller. * @return ERRNO. */ -ERRNO tablet_add_value_by_name_string(Tablet tablet, uint32_t row_index, - const char* column_name, - const char* value); +ERRNO tablet_add_value_by_name_string_with_len(Tablet tablet, + uint32_t row_index, + const char* column_name, + const char* value, + int value_len); /** * @brief Adds a value to a Tablet row by column index (generic types). @@ -365,9 +368,11 @@ TABLE_ADD_VALUE_BY_INDEX(bool); * * @param value [in] Null-terminated string. Copied internally. */ -ERRNO tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, - uint32_t column_index, - const char* value); +ERRNO tablet_add_value_by_index_string_with_len(Tablet tablet, + uint32_t row_index, + uint32_t column_index, + const char* value, + int value_len); /*--------------------------TsRecord API------------------------ */ /* diff --git a/cpp/test/cwrapper/c_release_test.cc b/cpp/test/cwrapper/c_release_test.cc index 85583d52..76fdb3ea 100644 --- a/cpp/test/cwrapper/c_release_test.cc +++ b/cpp/test/cwrapper/c_release_test.cc @@ -20,6 +20,10 @@ #include <gtest/gtest.h> #include <unistd.h> #include <utils/db_utils.h> + +#include <cstring> +#include <string> + extern "C" { #include "cwrapper/errno_define_c.h" #include "cwrapper/tsfile_cwrapper.h" @@ -175,9 +179,11 @@ TEST_F(CReleaseTest, TsFileWriterWriteDataAbnormalColumn) { Tablet tablet = tablet_new(column_list, type_list, 3, 100); for (int i = 0; i < 100; i++) { tablet_add_timestamp(tablet, i, static_cast<int64_t>(i)); - tablet_add_value_by_name_string(tablet, i, "!@#$%^*()_+-=", "device1"); - tablet_add_value_by_index_string( - tablet, i, 1, std::string("sensor" + std::to_string(i)).c_str()); + tablet_add_value_by_name_string_with_len( + tablet, i, "!@#$%^*()_+-=", "device1", strlen("device1")); + tablet_add_value_by_index_string_with_len( + tablet, i, 1, std::string("sensor" + std::to_string(i)).c_str(), + std::string("sensor" + std::to_string(i)).length()); tablet_add_value_by_name_double(tablet, i, "!@#$%^*()_+-=1", i * 100.0); } ASSERT_EQ(RET_OK, tsfile_writer_write(writer, tablet)); @@ -276,7 +282,8 @@ TEST_F(CReleaseTest, TsFileWriterMultiDataType) { for (int i = 0; i < 1000; i++) { // negative timestamp included tablet_add_timestamp(tablet, i, static_cast<int64_t>(i - 10)); - tablet_add_value_by_name_string(tablet, i, "TAG", "device1"); + tablet_add_value_by_name_string_with_len(tablet, i, "TAG", "device1", + strlen("device1")); tablet_add_value_by_name_int32_t(tablet, i, "INT32", i); tablet_add_value_by_index_int64_t(tablet, i, 2, i * 100); tablet_add_value_by_index_float(tablet, i, 3, i * 100.0); @@ -369,7 +376,8 @@ TEST_F(CReleaseTest, TsFileWriterConfTest) { Tablet tablet = tablet_new(column_list, type_list, 2, 10); for (int i = 0; i < 10; i++) { tablet_add_timestamp(tablet, i, static_cast<int64_t>(i)); - tablet_add_value_by_name_string(tablet, i, "id", "device1"); + tablet_add_value_by_name_string_with_len(tablet, i, "id", "device1", + strlen("device1")); tablet_add_value_by_name_int32_t(tablet, i, "value", i); } diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 8c1fd82a..5998939a 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -20,6 +20,8 @@ #include <unistd.h> #include <utils/db_utils.h> +#include <cstring> + #include "common/row_record.h" #include "cwrapper/tsfile_cwrapper.h" #include "reader/result_set.h" @@ -224,9 +226,9 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { for (int i = 0; i < schema.column_num; i++) { switch (schema.column_schemas[i].data_type) { case TS_DATATYPE_STRING: - tablet_add_value_by_name_string( + tablet_add_value_by_name_string_with_len( tablet, l, schema.column_schemas[i].column_name, - literal); + literal, strlen(literal)); break; case TS_DATATYPE_INT64: tablet_add_value_by_name_int64_t( diff --git a/python/tests/test_write_and_read.py b/python/tests/test_write_and_read.py index 8846348f..b327e2d3 100644 --- a/python/tests/test_write_and_read.py +++ b/python/tests/test_write_and_read.py @@ -16,6 +16,8 @@ # under the License. # +from datetime import date + import numpy as np import pandas as pd import pytest @@ -44,6 +46,7 @@ def test_row_record_write_and_read(): writer.register_timeseries("root.device1", TimeseriesSchema("level5", TSDataType.TEXT)) writer.register_timeseries("root.device1", TimeseriesSchema("level6", TSDataType.BLOB)) writer.register_timeseries("root.device1", TimeseriesSchema("level7", TSDataType.DATE)) + writer.register_timeseries("root.device1", TimeseriesSchema("level8", TSDataType.TIMESTAMP)) max_row_num = 10 @@ -55,7 +58,8 @@ def test_row_record_write_and_read(): Field("level4", f"string_value_{i}", TSDataType.STRING), Field("level5", f"text_value_{i}", TSDataType.TEXT), Field("level6", f"blob_data_{i}".encode('utf-8'), TSDataType.BLOB), - Field("level7", i, TSDataType.DATE)]) + Field("level7", date(2025, 1, i % 20 + 1), TSDataType.DATE), + Field("level8", i, TSDataType.TIMESTAMP)]) writer.write_row_record(row) writer.close() @@ -63,10 +67,11 @@ def test_row_record_write_and_read(): reader = TsFileReader("record_write_and_read.tsfile") result = reader.query_timeseries( "root.device1", - ["level1", "level2", "level3", "level4", "level5", "level6", "level7"], + ["level1", "level2", "level3", "level4", "level5", "level6", "level7", "level8"], 0, 100, ) + assert len(reader.get_active_query_result()) == 1 for row_num in range(max_row_num): @@ -78,7 +83,8 @@ def test_row_record_write_and_read(): assert result.get_value_by_index(5) == f"string_value_{row_num}" assert result.get_value_by_index(6) == f"text_value_{row_num}" assert result.get_value_by_index(7) == f"blob_data_{row_num}" - assert result.get_value_by_index(8) == row_num + assert result.get_value_by_index(8) == date(2025, 1, row_num % 20 + 1) + assert result.get_value_by_index(9) == row_num assert not result.next() assert len(reader.get_active_query_result()) == 1 @@ -93,6 +99,7 @@ def test_row_record_write_and_read(): if os.path.exists("record_write_and_read.tsfile"): os.remove("record_write_and_read.tsfile") + def test_tree_query_to_dataframe_variants(): file_path = "tree_query_to_dataframe.tsfile" device_ids = [ @@ -157,7 +164,6 @@ def test_tree_query_to_dataframe_variants(): writer.close() df_all = to_dataframe(file_path, start_time=0, end_time=rows_per_device) - print(df_all) total_rows = len(device_ids) * rows_per_device assert df_all.shape[0] == total_rows for measurement in all_measurements: @@ -221,7 +227,6 @@ def test_tree_query_to_dataframe_variants(): assert isinstance(batch, pd.DataFrame) assert set(batch.columns).issuperset({"time", "level"}) iter_rows += len(batch) - print(batch) assert iter_rows == 18 iterator = to_dataframe( @@ -237,7 +242,6 @@ def test_tree_query_to_dataframe_variants(): assert isinstance(batch, pd.DataFrame) assert set(batch.columns).issuperset({"time", "level"}) iter_rows += len(batch) - print(batch) assert iter_rows == 9 with pytest.raises(ColumnNotExistError): @@ -246,6 +250,7 @@ def test_tree_query_to_dataframe_variants(): if os.path.exists(file_path): os.remove(file_path) + def test_get_all_timeseries_schemas(): file_path = "get_all_timeseries_schema.tsfile" device_ids = [ @@ -309,6 +314,7 @@ def test_get_all_timeseries_schemas(): if os.path.exists(file_path): os.remove(file_path) + def test_tablet_write_and_read(): try: if os.path.exists("tablet_write_and_read.tsfile"): @@ -354,6 +360,7 @@ def test_tablet_write_and_read(): if os.path.exists("tablet_write_and_read.tsfile"): os.remove("tablet_write_and_read.tsfile") + def test_table_writer_and_reader(): table = TableSchema("test_table", [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), @@ -409,6 +416,7 @@ def test_table_writer_and_reader(): if os.path.exists("table_write.tsfile"): os.remove("table_write.tsfile") + def test_query_result_detach_from_reader(): try: ## Prepare data @@ -439,6 +447,7 @@ def test_query_result_detach_from_reader(): if os.path.exists("query_result_detach_from_reader.tsfile"): os.remove("query_result_detach_from_reader.tsfile") + def test_lower_case_name(): if os.path.exists("lower_case_name.tsfile"): os.remove("lower_case_name.tsfile") @@ -462,6 +471,7 @@ def test_lower_case_name(): assert data_frame.shape == (100, 3) assert data_frame["value"].sum() == 5445.0 + def test_tsfile_config(): from tsfile import get_tsfile_config, set_tsfile_config @@ -514,6 +524,7 @@ def test_tsfile_config(): with pytest.raises(NotSupportedError): set_tsfile_config({"time_compress_type_": Compressor.PAA}) + def test_tsfile_to_df(): table = TableSchema("test_table", [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), @@ -548,6 +559,432 @@ def test_tsfile_to_df(): os.remove("table_write_to_df.tsfile") +def test_tree_all_datatype_query_to_dataframe_variants(): + tsfile_path = "record_write_and_read.tsfile" + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + writer = TsFileWriter(tsfile_path) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL1", TSDataType.INT64) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL2", TSDataType.DOUBLE) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL3", TSDataType.INT32) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL4", TSDataType.STRING) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL5", TSDataType.TEXT) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL6", TSDataType.BLOB) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL7", TSDataType.DATE) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL8", TSDataType.TIMESTAMP) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL9", TSDataType.BOOLEAN) + ) + writer.register_timeseries( + "root.Device1", TimeseriesSchema("LeveL10", TSDataType.FLOAT) + ) + + max_row_num = 100 + + for i in range(max_row_num): + row = RowRecord( + "root.Device1", + i - int(max_row_num / 2), + [ + Field("LeveL1", i * 1, TSDataType.INT64), + Field("LeveL2", i * 2.2, TSDataType.DOUBLE), + Field("LeveL3", i * 3, TSDataType.INT32), + Field("LeveL4", f"string_value_{i}", TSDataType.STRING), + Field("LeveL5", f"text_value_{i}", TSDataType.TEXT), + Field("LeveL6", f"blob_data_{i}".encode('utf-8'), TSDataType.BLOB), + Field("LeveL7", date(2025, 1, i % 20 + 1), TSDataType.DATE), + Field("LeveL8", i * 8, TSDataType.TIMESTAMP), + Field("LeveL9", i % 2 == 0, TSDataType.BOOLEAN), + Field("LeveL10", i * 10.1, TSDataType.FLOAT), + ], + ) + writer.write_row_record(row) + + writer.close() + + df1_1 = to_dataframe(tsfile_path) + assert df1_1.shape[0] == max_row_num + for i in range(max_row_num): + assert df1_1.iloc[i, 0] == i - int(max_row_num / 2) + assert df1_1.iloc[i, 1] == "root" + assert df1_1.iloc[i, 2] == "Device1" + + df2_1 = to_dataframe(tsfile_path, column_names=["LeveL1"]) + for i in range(max_row_num): + assert df2_1.iloc[i, 3] == np.int64(i * 1) + df2_2 = to_dataframe(tsfile_path, column_names=["LeveL2"]) + for i in range(max_row_num): + assert df2_2.iloc[i, 3] == np.float64(i * 2.2) + df2_3 = to_dataframe(tsfile_path, column_names=["LeveL3"]) + for i in range(max_row_num): + assert df2_3.iloc[i, 3] == np.int32(i * 3) + df2_4 = to_dataframe(tsfile_path, column_names=["LeveL4"]) + for i in range(max_row_num): + assert df2_4.iloc[i, 3] == f"string_value_{i}" + df2_5 = to_dataframe(tsfile_path, column_names=["LeveL5"]) + for i in range(max_row_num): + assert df2_5.iloc[i, 3] == f"text_value_{i}" + df2_6 = to_dataframe(tsfile_path, column_names=["LeveL6"]) + for i in range(max_row_num): + assert df2_6.iloc[i, 3] == f"blob_data_{i}".encode('utf-8') + df2_7 = to_dataframe(tsfile_path, column_names=["LeveL7"]) + for i in range(max_row_num): + assert df2_7.iloc[i, 3] == date(2025, 1, i % 20 + 1) + df2_8 = to_dataframe(tsfile_path, column_names=["LeveL8"]) + for i in range(max_row_num): + assert df2_8.iloc[i, 3] == np.int64(i * 8) + df2_9 = to_dataframe(tsfile_path, column_names=["LeveL9"]) + for i in range(max_row_num): + assert df2_9.iloc[i, 3] == (i % 2 == 0) + df2_10 = to_dataframe(tsfile_path, column_names=["LeveL10"]) + for i in range(max_row_num): + assert df2_10.iloc[i, 3] == np.float32(i * 10.1) + df2_11 = to_dataframe(tsfile_path, column_names=["LeveL9"]) + for i in range(max_row_num): + assert df2_11.iloc[i, 3] == (i % 2 == 0) + df2_12 = to_dataframe( + tsfile_path, + column_names=[ + "LeveL1", + "LeveL2", + "LeveL3", + "LeveL4", + "LeveL5", + "LeveL6", + "LeveL7", + "LeveL8", + "LeveL9", + "LeveL10", + ], + ) + for i in range(max_row_num): + assert df2_12.iloc[i, 3] == np.int64(i * 1) + assert df2_12.iloc[i, 4] == np.float64(i * 2.2) + assert df2_12.iloc[i, 5] == np.int32(i * 3) + assert df2_12.iloc[i, 6] == f"string_value_{i}" + assert df2_12.iloc[i, 7] == f"text_value_{i}" + assert df2_12.iloc[i, 8] == f"blob_data_{i}".encode('utf-8') + assert df2_12.iloc[i, 9] == date(2025, 1, i % 20 + 1) + assert df2_12.iloc[i, 10] == np.int64(i * 8) + assert df2_12.iloc[i, 11] == (i % 2 == 0) + assert df2_12.iloc[i, 12] == np.float32(i * 10.1) + + df3_1 = to_dataframe(tsfile_path, start_time=10) + assert df3_1.shape[0] == 40 + df3_2 = to_dataframe(tsfile_path, start_time=-10) + assert df3_2.shape[0] == 60 + df3_3 = to_dataframe(tsfile_path, end_time=5) + assert df3_3.shape[0] == 56 + df3_4 = to_dataframe(tsfile_path, end_time=-5) + assert df3_4.shape[0] == 46 + df3_5 = to_dataframe(tsfile_path, start_time=5, end_time=5) + assert df3_5.shape[0] == 1 + df3_6 = to_dataframe(tsfile_path, start_time=-5, end_time=-5) + assert df3_6.shape[0] == 1 + df3_7 = to_dataframe(tsfile_path, start_time=10, end_time=-10) + assert df3_7.shape[0] == 0 + df3_8 = to_dataframe(tsfile_path, start_time=-10, end_time=10) + assert df3_8.shape[0] == 21 + df3_8 = to_dataframe(tsfile_path, start_time=-50, end_time=50) + assert df3_8.shape[0] == max_row_num + + df4_1 = to_dataframe(tsfile_path, max_row_num=1) + assert df4_1.shape[0] == 1 + df4_2 = to_dataframe(tsfile_path, max_row_num=10) + assert df4_2.shape[0] == 10 + df4_3 = to_dataframe(tsfile_path, max_row_num=100) + assert df4_3.shape[0] == 100 + df4_4 = to_dataframe(tsfile_path, max_row_num=1000) + assert df4_4.shape[0] == 100 + df4_5 = to_dataframe(tsfile_path, max_row_num=0) + assert df4_5.shape[0] == 0 + df4_6 = to_dataframe(tsfile_path, max_row_num=-10) + assert df4_6.shape[0] == 0 + + for df5_1 in to_dataframe(tsfile_path, max_row_num=10, as_iterator=True): + assert df5_1.shape[0] == 10 + for df5_2 in to_dataframe(tsfile_path, max_row_num=-10, as_iterator=True): + assert df5_2.shape[0] == 1 + for df5_3 in to_dataframe(tsfile_path, max_row_num=1000, as_iterator=True): + assert df5_3.shape[0] == 100 + for df5_4 in to_dataframe(tsfile_path, max_row_num=3, as_iterator=True): + if df5_4.iloc[0, 0] <= 48: + assert df5_4.shape[0] == 3 + else: + assert df5_4.shape[0] == 1 + + row_num = 0 + for df6_1 in to_dataframe( + tsfile_path, + column_names=["LeveL1", "LeveL2"], + start_time=-50, + end_time=10, + max_row_num=1, + as_iterator=True, + ): + assert df6_1.shape[0] == 1 + assert df6_1.iloc[0, 0] == -50 + row_num + assert df6_1.iloc[0, 3] == row_num + assert df6_1.iloc[0, 4] == row_num * 2.2 + row_num += 1 + + df7_1 = to_dataframe(tsfile_path, table_name="test") + assert df7_1.shape[0] == max_row_num + assert df7_1.iloc[0, 0] == -int(max_row_num / 2) + + try: + to_dataframe(tsfile_path, column_names=["non_existent_column"]) + except ColumnNotExistError: + pass + + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + +def test_table_all_datatype_query_to_dataframe_variants(): + tsfile_path = "test_table.tsfile" + table = TableSchema( + "test_table", + [ + ColumnSchema("Device1", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("Device2", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("Value1", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ColumnSchema("Value2", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("Value3", TSDataType.INT64, ColumnCategory.FIELD), + ColumnSchema("Value4", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("Value5", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("Value6", TSDataType.TEXT, ColumnCategory.FIELD), + ColumnSchema("Value7", TSDataType.STRING, ColumnCategory.FIELD), + ColumnSchema("Value8", TSDataType.BLOB, ColumnCategory.FIELD), + ColumnSchema("Value9", TSDataType.TIMESTAMP, ColumnCategory.FIELD), + ColumnSchema("Value10", TSDataType.DATE, ColumnCategory.FIELD), + ], + ) + dateSet = set() + try: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + max_row_num = 100 + with TsFileTableWriter(tsfile_path, table) as writer: + tablet = Tablet( + [ + "Device1", + "Device2", + "Value1", + "Value2", + "Value3", + "Value4", + "Value5", + "Value6", + "Value7", + "Value8", + "Value9", + "Value10", + ], + [ + TSDataType.STRING, + TSDataType.STRING, + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.STRING, + TSDataType.BLOB, + TSDataType.TIMESTAMP, + TSDataType.DATE, + ], + max_row_num, + ) + for i in range(max_row_num): + tablet.add_timestamp(i, i) + tablet.add_value_by_name("Device1", i, "d1_" + str(i)) + tablet.add_value_by_name("Device2", i, "d2_" + str(i)) + tablet.add_value_by_name("Value1", i, i % 2 == 0) + tablet.add_value_by_name("Value2", i, i * 3) + tablet.add_value_by_name("Value3", i, i * 4) + tablet.add_value_by_name("Value4", i, i * 5.5) + tablet.add_value_by_name("Value5", i, i * 6.6) + tablet.add_value_by_name("Value6", i, f"string_value_{i}") + tablet.add_value_by_name("Value7", i, f"text_value_{i}") + tablet.add_value_by_name("Value8", i, f"blob_data_{i}".encode('utf-8')) + tablet.add_value_by_name("Value9", i, i * 9) + tablet.add_value_by_name("Value10", i, date(2025, 1, i % 20 + 1)) + dateSet.add(date(2025, 1, i % 20 + 1)) + writer.write_table(tablet) + + df1_1 = to_dataframe(tsfile_path) + assert df1_1.shape[0] == max_row_num + for i in range(max_row_num): + assert df1_1.iloc[i, 1] == "d1_" + str(df1_1.iloc[i, 0]) + assert df1_1.iloc[i, 2] == "d2_" + str(df1_1.iloc[i, 0]) + + df2_1 = to_dataframe(tsfile_path, column_names=["Value1"]) + for i in range(max_row_num): + assert df2_1.iloc[i, 1] == np.bool_(df2_1.iloc[i, 0] % 2 == 0) + df2_2 = to_dataframe(tsfile_path, column_names=["Value2"]) + for i in range(max_row_num): + assert df2_2.iloc[i, 1] == np.int32(df2_2.iloc[i, 0] * 3) + df2_3 = to_dataframe(tsfile_path, column_names=["Value3"]) + for i in range(max_row_num): + assert df2_3.iloc[i, 1] == np.int64(df2_3.iloc[i, 0] * 4) + df2_4 = to_dataframe(tsfile_path, column_names=["Value4"]) + for i in range(max_row_num): + assert df2_4.iloc[i, 1] == np.float32(df2_4.iloc[i, 0] * 5.5) + df2_5 = to_dataframe(tsfile_path, column_names=["Value5"]) + for i in range(max_row_num): + assert df2_5.iloc[i, 1] == np.float64(df2_5.iloc[i, 0] * 6.6) + df2_6 = to_dataframe(tsfile_path, column_names=["Value6"]) + for i in range(max_row_num): + assert df2_6.iloc[i, 1] == f"string_value_{df2_6.iloc[i, 0]}" + df2_7 = to_dataframe(tsfile_path, column_names=["Value7"]) + for i in range(max_row_num): + assert df2_7.iloc[i, 1] == f"text_value_{df2_7.iloc[i, 0]}" + df2_8 = to_dataframe(tsfile_path, column_names=["Value8"]) + for i in range(max_row_num): + assert df2_8.iloc[i, 1] == f"blob_data_{df2_8.iloc[i, 0]}".encode('utf-8') + df2_9 = to_dataframe(tsfile_path, column_names=["Value9"]) + for i in range(max_row_num): + assert df2_9.iloc[i, 1] == np.int64(df2_9.iloc[i, 0] * 9) + df2_10 = to_dataframe(tsfile_path, column_names=["Value10"]) + for i in range(max_row_num): + assert df2_10.iloc[i, 1] in dateSet + df2_11 = to_dataframe(tsfile_path, column_names=["Device1", "Value1"]) + for i in range(max_row_num): + assert df2_11.iloc[i, 1] == "d1_" + str(df2_11.iloc[i, 0]) + assert df2_11.iloc[i, 2] == np.bool_(df2_11.iloc[i, 0] % 2 == 0) + df2_12 = to_dataframe( + tsfile_path, + column_names=[ + "Device1", + "Device2", + "Value1", + "Value2", + "Value3", + "Value4", + "Value5", + "Value6", + "Value7", + "Value8", + "Value9", + "Value10", + ], + ) + for i in range(max_row_num): + assert df2_12.iloc[i, 1] == "d1_" + str(df2_12.iloc[i, 0]) + assert df2_12.iloc[i, 2] == "d2_" + str(df2_12.iloc[i, 0]) + assert df2_12.iloc[i, 3] == np.bool_(df2_12.iloc[i, 0] % 2 == 0) + assert df2_12.iloc[i, 4] == np.int32(df2_12.iloc[i, 0] * 3) + assert df2_12.iloc[i, 5] == np.int64(df2_12.iloc[i, 0] * 4) + assert df2_12.iloc[i, 6] == np.float32(df2_12.iloc[i, 0] * 5.5) + assert df2_12.iloc[i, 7] == np.float64(df2_12.iloc[i, 0] * 6.6) + assert df2_12.iloc[i, 8] == f"string_value_{df2_12.iloc[i, 0]}" + assert df2_12.iloc[i, 9] == f"text_value_{df2_12.iloc[i, 0]}" + assert df2_12.iloc[i, 10] == f"blob_data_{df2_12.iloc[i, 0]}".encode( + "utf-8" + ) + assert df2_12.iloc[i, 11] == np.int64(df2_12.iloc[i, 0] * 9) + assert df2_12.iloc[i, 12] == date(2025, 1, df2_12.iloc[i, 0] % 20 + 1) + df2_13 = to_dataframe( + tsfile_path, column_names=["Device1", "Device2", "Value1"] + ) + for i in range(max_row_num): + assert df2_13.iloc[i, 1] == "d1_" + str(df2_13.iloc[i, 0]) + assert df2_13.iloc[i, 2] == "d2_" + str(df2_13.iloc[i, 0]) + assert df2_13.iloc[i, 3] == np.bool_(df2_13.iloc[i, 0] % 2 == 0) + + df3_1 = to_dataframe(tsfile_path, table_name="test_table") + assert df3_1.shape[0] == max_row_num + assert df3_1.iloc[0, 0] == 0 + df3_2 = to_dataframe(tsfile_path, table_name="TEST_TABLE") + assert df3_2.shape[0] == max_row_num + assert df3_2.iloc[0, 0] == 0 + + df4_1 = to_dataframe(tsfile_path, start_time=10) + assert df4_1.shape[0] == 90 + df4_2 = to_dataframe(tsfile_path, start_time=-10) + assert df4_2.shape[0] == max_row_num + df4_3 = to_dataframe(tsfile_path, end_time=5) + assert df4_3.shape[0] == 6 + df4_4 = to_dataframe(tsfile_path, end_time=-5) + assert df4_4.shape[0] == 0 + df4_5 = to_dataframe(tsfile_path, start_time=5, end_time=5) + assert df4_5.shape[0] == 1 + df4_6 = to_dataframe(tsfile_path, start_time=-5, end_time=-5) + assert df4_6.shape[0] == 0 + df4_7 = to_dataframe(tsfile_path, start_time=10, end_time=-10) + assert df4_7.shape[0] == 0 + df4_8 = to_dataframe(tsfile_path, start_time=-10, end_time=10) + assert df4_8.shape[0] == 11 + df4_8 = to_dataframe(tsfile_path, start_time=-50, end_time=50) + assert df4_8.shape[0] == 51 + + df5_1 = to_dataframe(tsfile_path, max_row_num=1) + assert df5_1.shape[0] == 1 + df5_2 = to_dataframe(tsfile_path, max_row_num=50) + assert df5_2.shape[0] == 50 + df5_3 = to_dataframe(tsfile_path, max_row_num=100) + assert df5_3.shape[0] == 100 + df5_4 = to_dataframe(tsfile_path, max_row_num=1000) + assert df5_4.shape[0] == 100 + df5_5 = to_dataframe(tsfile_path, max_row_num=0) + assert df5_5.shape[0] == 0 + df5_6 = to_dataframe(tsfile_path, max_row_num=-10) + assert df5_6.shape[0] == 0 + + for df6_1 in to_dataframe(tsfile_path, max_row_num=20, as_iterator=True): + assert df6_1.shape[0] == 20 + for df6_2 in to_dataframe(tsfile_path, max_row_num=1000, as_iterator=True): + assert df6_2.shape[0] == 100 + + for df7_1 in to_dataframe( + tsfile_path, + table_name="test_table", + column_names=["Device1", "Value1"], + start_time=21, + end_time=50, + max_row_num=10, + as_iterator=True, + ): + assert df7_1.shape[0] == 10 + for i in range(30): + assert df2_11.iloc[i, 1] == "d1_" + str(df2_11.iloc[i, 0]) + assert df2_11.iloc[i, 2] == np.bool_(df2_11.iloc[i, 0] % 2 == 0) + + try: + to_dataframe(tsfile_path, table_name="non_existent_table") + except TableNotExistError as e: + assert e.args[0] == "[non_existent_table] Requested table does not exist" + + try: + to_dataframe(tsfile_path, column_names=["non_existent_column"]) + except ColumnNotExistError as e: + assert e.args[0] == "[non_existent_column] Column does not exist" + + finally: + if os.path.exists(tsfile_path): + os.remove(tsfile_path) + + import os if __name__ == "__main__": diff --git a/python/tsfile/constants.py b/python/tsfile/constants.py index 97bc3613..2f1ad982 100644 --- a/python/tsfile/constants.py +++ b/python/tsfile/constants.py @@ -15,7 +15,6 @@ # specific language governing permissions and limitations # under the License. # - from enum import unique, IntEnum @@ -35,7 +34,7 @@ class TSDataType(IntEnum): def to_py_type(self): if self == TSDataType.BOOLEAN: return bool - elif self == TSDataType.INT32 or self == TSDataType.DATE: + elif self == TSDataType.INT32: return int elif self == TSDataType.INT64: return int @@ -45,6 +44,12 @@ class TSDataType(IntEnum): return float elif self == TSDataType.TEXT or self == TSDataType.STRING: return str + elif self == TSDataType.BLOB: + return bytes + elif self == TSDataType.DATE: + return object + elif self == TSDataType.TIMESTAMP: + return int def to_pandas_dtype(self): """ @@ -63,11 +68,11 @@ class TSDataType(IntEnum): elif self == TSDataType.TEXT or self == TSDataType.STRING: return "object" elif self == TSDataType.TIMESTAMP: - return "datetime64[ns]" + return "int64" elif self == TSDataType.DATE: - return "int32" - elif self == TSDataType.BLOB: return "object" + elif self == TSDataType.BLOB: + return "bytes" else: raise ValueError(f"Unknown data type: {self}") diff --git a/python/tsfile/field.py b/python/tsfile/field.py index 26d02c5f..4f3f0798 100644 --- a/python/tsfile/field.py +++ b/python/tsfile/field.py @@ -20,7 +20,8 @@ from datetime import datetime import numpy as np from tsfile.constants import TSDataType -from tsfile.date_utils import parse_int_to_date +from tsfile.date_utils import parse_date_to_int, parse_int_to_date + class NoneDataTypeException(Exception): pass @@ -74,14 +75,17 @@ class Field(object): raise NoneDataTypeException("None Data Type Exception!") if ( - self.data_type != TSDataType.INT32 - and self.data_type != TSDataType.DATE - and self.data_type != TSDataType.INT64 - and self.data_type != TSDataType.FLOAT - and self.data_type != TSDataType.DOUBLE + self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.DATE + and self.data_type != TSDataType.INT64 + and self.data_type != TSDataType.FLOAT + and self.data_type != TSDataType.DOUBLE ): raise TypeError(f"Expected INT32/64 or DOUBLE/FLOAT data type, got {self.data_type}.") min_int32, max_int32 = np.iinfo(np.int32).min, np.iinfo(np.int32).max + if self.data_type == TSDataType.DATE: + return parse_date_to_int(self.value) + if not (min_int32 <= self.value <= max_int32): raise OverflowError( f"Value {self.value} exceeds int range [{min_int32}, {max_int32}]" @@ -95,10 +99,10 @@ class Field(object): raise NoneDataTypeException("None Data Type Exception!") if ( - self.data_type != TSDataType.INT32 - and self.data_type != TSDataType.INT64 - and self.data_type != TSDataType.FLOAT - and self.data_type != TSDataType.DOUBLE + self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.INT64 + and self.data_type != TSDataType.FLOAT + and self.data_type != TSDataType.DOUBLE ): raise TypeError(f"Expected INT32/64 or DOUBLE/FLOAT data type, got {self.data_type}.") @@ -110,8 +114,8 @@ class Field(object): if self.data_type is None: raise NoneDataTypeException("None Data Type Exception!") if ( - self.data_type != TSDataType.TIMESTAMP - and self.data_type != TSDataType.INT64 + self.data_type != TSDataType.TIMESTAMP + and self.data_type != TSDataType.INT64 ): raise TypeError(f"Expected INT64/TIMESTAMP data type, got {self.data_type}.") return np.int64(self.value) @@ -122,10 +126,10 @@ class Field(object): if self.data_type is None: raise NoneDataTypeException("None Data Type Exception!") if ( - self.data_type != TSDataType.INT32 - and self.data_type != TSDataType.INT64 - and self.data_type != TSDataType.FLOAT - and self.data_type != TSDataType.DOUBLE + self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.INT64 + and self.data_type != TSDataType.FLOAT + and self.data_type != TSDataType.DOUBLE ): raise TypeError(f"Expected INT32/64 or DOUBLE/FLOAT data type, got {self.data_type}.") min_float32, max_float32 = np.finfo(np.float32).min, np.finfo(np.float32).max @@ -142,10 +146,10 @@ class Field(object): if self.data_type is None: raise NoneDataTypeException("None Data Type Exception!") if ( - self.data_type != TSDataType.INT32 - and self.data_type != TSDataType.INT64 - and self.data_type != TSDataType.FLOAT - and self.data_type != TSDataType.DOUBLE + self.data_type != TSDataType.INT32 + and self.data_type != TSDataType.INT64 + and self.data_type != TSDataType.FLOAT + and self.data_type != TSDataType.DOUBLE ): raise TypeError(f"Expected INT32/64 or DOUBLE/FLOAT data type, got {self.data_type}.") return np.float64(self.value) @@ -167,7 +171,6 @@ class Field(object): else: raise TypeError(f"Value is not a int or datetime: {type(self.value)}") - def get_string_value(self): if self.value is None: return None @@ -222,4 +225,3 @@ class Field(object): return self.value else: raise RuntimeError("Unsupported data type:" + str(data_type)) - diff --git a/python/tsfile/tablet.py b/python/tsfile/tablet.py index 52e7389c..71ecb063 100644 --- a/python/tsfile/tablet.py +++ b/python/tsfile/tablet.py @@ -16,14 +16,12 @@ # under the License. # import math -import struct -from enum import unique, IntEnum +from datetime import date from typing import List, Union import numpy as np -from .date_utils import parse_date_to_int -from .constants import TSDataType, ColumnCategory +from .constants import TSDataType class Tablet(object): @@ -42,7 +40,7 @@ class Tablet(object): def __init__(self, column_name_list: list[str], type_list: list[TSDataType], max_row_num: int = 1024): self.timestamp_list = [None for _ in range(max_row_num)] - self.data_list: List[List[Union[int, float, bool, str, bytes, None]]] = [ + self.data_list: List[List[Union[int, float, bool, str, bytes, date, None]]] = [ [None for _ in range(max_row_num)] for _ in range(len(column_name_list)) ] self.target_name = None diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index a2f621d4..40bff4eb 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -42,6 +42,7 @@ cdef extern from "./tsfile_cwrapper.h": TS_DATATYPE_DOUBLE = 4 TS_DATATYPE_TEXT = 5 TS_DATATYPE_VECTOR = 6 + TS_DATATYPE_TIMESTAMP = 8 TS_DATATYPE_DATE = 9 TS_DATATYPE_BLOB = 10 TS_DATATYPE_STRING = 11 @@ -146,8 +147,10 @@ cdef extern from "./tsfile_cwrapper.h": ErrorCode tablet_add_value_by_index_double(Tablet tablet, uint32_t row_index, uint32_t column_index, double value); ErrorCode tablet_add_value_by_index_float(Tablet tablet, uint32_t row_index, uint32_t column_index, float value); ErrorCode tablet_add_value_by_index_bool(Tablet tablet, uint32_t row_index, uint32_t column_index, bint value); - ErrorCode tablet_add_value_by_index_string(Tablet tablet, uint32_t row_index, - uint32_t column_index, const char * value); + ErrorCode tablet_add_value_by_index_string_with_len(Tablet tablet, + uint32_t row_index, + uint32_t column_index, + const char* value, int value_len) void free_tablet(Tablet * tablet); diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 10441041..d9924d7a 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -16,7 +16,7 @@ # under the License. # #cython: language_level=3 - +from .date_utils import parse_date_to_int from .tsfile_cpp cimport * from libc.stdlib cimport free @@ -100,7 +100,8 @@ cdef dict TS_DATA_TYPE_MAP = { TSDataTypePy.DATE: TSDataType.TS_DATATYPE_DATE, TSDataTypePy.TEXT: TSDataType.TS_DATATYPE_TEXT, TSDataTypePy.STRING: TSDataType.TS_DATATYPE_STRING, - TSDataTypePy.BLOB: TSDataType.TS_DATATYPE_BLOB + TSDataTypePy.BLOB: TSDataType.TS_DATATYPE_BLOB, + TSDataTypePy.TIMESTAMP: TSDataType.TS_DATATYPE_TIMESTAMP } cdef dict TS_ENCODING_MAP = { @@ -218,7 +219,9 @@ cdef Tablet to_c_tablet(object tablet): cdef char** columns_names cdef TSDataType * column_types cdef bytes row_bytes - cdef const char *row_str + cdef char *raw_str + cdef const char* str_ptr + cdef Py_ssize_t raw_len if tablet.get_target_name() is not None: device_id_bytes = PyUnicode_AsUTF8String(tablet.get_target_name()) @@ -264,7 +267,7 @@ cdef Tablet to_c_tablet(object tablet): tablet_add_value_by_index_int32_t(ctablet, row, col, value[row]) # INT64 - elif data_type == TS_DATATYPE_INT64: + elif data_type == TS_DATATYPE_INT64 or data_type == TS_DATATYPE_TIMESTAMP: for row in range(max_row_num): if value[row] is not None: tablet_add_value_by_index_int64_t(ctablet, row, col, value[row]) @@ -280,14 +283,24 @@ cdef Tablet to_c_tablet(object tablet): if value[row] is not None: tablet_add_value_by_index_double(ctablet, row, col, value[row]) - # STRING or TEXT or BLOB - elif data_type == TS_DATATYPE_STRING or data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_BLOB: + elif data_type == TS_DATATYPE_DATE: + for row in range(max_row_num): + if value[row] is not None: + tablet_add_value_by_index_int32_t(ctablet, row, col, parse_date_to_int(value[row])) + + # STRING or TEXT + elif data_type == TS_DATATYPE_STRING or data_type == TS_DATATYPE_TEXT: for row in range(max_row_num): if value[row] is not None: py_value = value[row] - row_bytes = PyUnicode_AsUTF8String(py_value) - row_str = PyBytes_AsString(row_bytes) - tablet_add_value_by_index_string(ctablet, row, col, row_str) + str_ptr = PyUnicode_AsUTF8AndSize(py_value, &raw_len) + tablet_add_value_by_index_string_with_len(ctablet, row, col, str_ptr, raw_len) + + elif data_type == TS_DATATYPE_BLOB: + for row in range(max_row_num): + if value[row] is not None: + PyBytes_AsStringAndSize(value[row], &raw_str, &raw_len) + tablet_add_value_by_index_string_with_len(ctablet, row, col, raw_str, raw_len) return ctablet @@ -313,6 +326,9 @@ cdef TsRecord to_c_record(object row_record): elif data_type == TS_DATATYPE_INT64: _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_long_value()) + elif data_type == TS_DATATYPE_TIMESTAMP: + _insert_data_into_ts_record_by_name_int64_t(record, PyUnicode_AsUTF8(field.get_field_name()), + field.get_timestamp_value()) elif data_type == TS_DATATYPE_DOUBLE: _insert_data_into_ts_record_by_name_double(record, PyUnicode_AsUTF8(field.get_field_name()), field.get_double_value()) @@ -321,7 +337,7 @@ cdef TsRecord to_c_record(object row_record): elif data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: str_ptr = PyUnicode_AsUTF8AndSize(field.get_string_value(), &str_len) _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), str_ptr, str_len) - elif data_type == TS_DATATYPE_BLOB or data_type == TS_DATATYPE_TEXT or data_type == TS_DATATYPE_STRING: + elif data_type == TS_DATATYPE_BLOB: if PyBytes_AsStringAndSize(field.get_string_value(), &blob_ptr, &str_len) < 0: raise ValueError("blob not legal") _insert_data_into_ts_record_by_name_string_with_len(record, PyUnicode_AsUTF8(field.get_field_name()), diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 3b961639..041fe76d 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -23,9 +23,9 @@ from typing import List import pandas as pd from libc.stdint cimport INT64_MIN, INT64_MAX -from libc.stdlib cimport free from tsfile.schema import TSDataType as TSDataTypePy +from .date_utils import parse_int_to_date from .tsfile_cpp cimport * from .tsfile_py_cpp cimport * @@ -139,15 +139,6 @@ cdef class ResultSetPy: df = pd.DataFrame(data_container) data_type_dict = {col: dtype for col, dtype in zip(column_names, data_type)} df = df.astype(data_type_dict) - for col in date_columns: - try: - df[col] = pd.to_datetime( - df[col].astype(str), - format='%Y%m%d', - errors='coerce' - ).dt.normalize() - except KeyError: - raise ValueError(f"DATE column '{col}' not found in DataFrame") return df def get_value_by_index(self, index : int): @@ -159,13 +150,14 @@ cdef class ResultSetPy: self.check_result_set_invalid() # Well when we check is null, id from 0, so there index -1. if tsfile_result_set_is_null_by_index(self.result, index): - print("get value by index and check is null") return None # data type in metadata is an array, id from 0. data_type = self.metadata.get_data_type(index) - if data_type == TSDataTypePy.INT32 or data_type == TSDataTypePy.DATE: + if data_type == TSDataTypePy.INT32: return tsfile_result_set_get_value_by_index_int32_t(self.result, index) - elif data_type == TSDataTypePy.INT64: + elif data_type == TSDataTypePy.DATE: + return parse_int_to_date(tsfile_result_set_get_value_by_index_int64_t(self.result, index)) + elif data_type == TSDataTypePy.INT64 or data_type == TSDataTypePy.TIMESTAMP: return tsfile_result_set_get_value_by_index_int64_t(self.result, index) elif data_type == TSDataTypePy.FLOAT: return tsfile_result_set_get_value_by_index_float(self.result, index) @@ -292,7 +284,7 @@ cdef class TsFileReaderPy: pyresult.init_c(result, table_name) self.activate_result_set_list.add(pyresult) return pyresult - + def query_table_on_tree(self, column_names : List[str], start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy: """ @@ -300,9 +292,8 @@ cdef class TsFileReaderPy: :return: query result handler. """ cdef ResultSet result; - result = tsfile_reader_query_table_on_tree_c(self.reader, - [column_name.lower() for column_name in column_names], start_time, - end_time) + ## No need to convert column names to lowercase, as measurement names in the tree model are case-sensitive. + result = tsfile_reader_query_table_on_tree_c(self.reader, column_names, start_time, end_time) pyresult = ResultSetPy(self, True) pyresult.init_c(result, "root") self.activate_result_set_list.add(pyresult) diff --git a/python/tsfile/utils.py b/python/tsfile/utils.py index 1d6f1afc..d7cb186f 100644 --- a/python/tsfile/utils.py +++ b/python/tsfile/utils.py @@ -51,7 +51,8 @@ def to_dataframe(file_path: str, if _table_name is None: _table_name, columns = next(iter(table_schema.items())) else: - if _table_name not in table_schema: + _table_name = _table_name.lower() + if _table_name.lower() not in table_schema: raise TableNotExistError(_table_name) columns = table_schema[_table_name] @@ -59,7 +60,7 @@ def to_dataframe(file_path: str, if _column_names is not None: for column in _column_names: - if column not in column_names_in_file: + if column.lower() not in column_names_in_file: raise ColumnNotExistError(column) else: _column_names = column_names_in_file
