This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_refine_write in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 1477ac7d18b7f152fbad23d73aed7a50e8f2be47 Author: colin <[email protected]> AuthorDate: Fri Apr 18 00:38:08 2025 +0800 add batch api for tablet. --- cpp/src/common/allocator/my_string.h | 13 ++++ cpp/src/common/container/bit_map.h | 4 ++ cpp/src/common/tablet.cc | 20 ++++++- cpp/src/common/tablet.h | 113 ++++++++++++++++++++++++++--------- cpp/test/common/tablet_test.cc | 33 ++++++++++ 5 files changed, 153 insertions(+), 30 deletions(-) diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h index 9f5d8a5a..a236bb6c 100644 --- a/cpp/src/common/allocator/my_string.h +++ b/cpp/src/common/allocator/my_string.h @@ -60,6 +60,19 @@ struct String { return common::E_OK; } + FORCE_INLINE int dup_from(const char* str, common::PageArena &pa) { + len_ = (str == nullptr) ? 0 : strlen(str); + if (UNLIKELY(len_ == 0)) { + return common::E_OK; + } + buf_ = pa.alloc(len_); + if (IS_NULL(buf_)) { + return common::E_OOM; + } + memcpy(buf_, str, len_); + return common::E_OK; + } + FORCE_INLINE bool operator==(const String &other) const { return equal_to(other); } diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h index 9d036744..c2f8f4e5 100644 --- a/cpp/src/common/container/bit_map.h +++ b/cpp/src/common/container/bit_map.h @@ -55,6 +55,10 @@ class BitMap { *start_addr = (*start_addr) & (~bit_mask); } + FORCE_INLINE void set_zero() { + memset(bitmap_, 0x00, size_); + } + FORCE_INLINE bool test(uint32_t index) { uint32_t offset = index >> 3; ASSERT(offset < size_); diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index ac4a2708..86e57599 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -21,8 +21,6 @@ #include <cstdlib> -#include "utils/errno_define.h" - using namespace common; namespace storage { @@ -285,6 +283,20 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, return add_value(row_index, measurement_name, String(val)); } +template <> +int
Tablet::set_batch_data(uint32_t col_index, char **data) { + if (col_index >= schema_vec_->size() || data == nullptr) { + return common::E_INVALID_ARG; + } + int ret = common::E_OK; + for (uint32_t i = 0; i < max_row_num_ && ret == common::E_OK; i++) { + ret = value_matrix_[col_index].string_data[i].dup_from(data[i], + page_arena_); + } + if (ret == common::E_OK) bitmaps_[col_index].set_zero(); + return ret; +} + template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, bool val); template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, @@ -323,6 +335,10 @@ void Tablet::set_column_categories( } } +void Tablet::set_null_value(uint32_t col_index, uint32_t row_index) { + bitmaps_[col_index].set(row_index); +} + std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const { std::vector<std::string> id_array; id_array.push_back(insert_target_name_); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index e69d477e..01926789 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -29,6 +29,7 @@ #include "common/db_common.h" #include "device_id.h" #include "schema.h" +#include "utils/errno_define.h" namespace storage { @@ -38,10 +39,12 @@ class TabletRowIterator; class TabletColIterator; /** - * @brief Represents a collection of data rows with associated metadata for insertion into a table. + * @brief Represents a collection of data rows with associated metadata for + * insertion into a table. * - * This class is used to manage and organize data that will be inserted into a specific target table. - * It handles the storage of timestamps and values, along with their associated metadata such as column names and types. + * This class is used to manage and organize data that will be inserted into a + * specific target table. It handles the storage of timestamps and values, along + * with their associated metadata such as column names and types.
*/ class Tablet { struct ValueMatrixEntry { @@ -105,7 +108,8 @@ class Tablet { [](const std::string &name, common::TSDataType type) { return MeasurementSchema(name, type); }); - schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(measurement_vec); + schema_vec_ = + std::make_shared<std::vector<MeasurementSchema>>(measurement_vec); init(); } @@ -123,7 +127,8 @@ class Tablet { schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } set_column_categories(column_categories); @@ -133,24 +138,26 @@ class Tablet { /** * @brief Constructs a Tablet object with the given parameters. * - * @param column_names A vector containing the names of the columns in the tablet. - * Each name corresponds to a column in the target table. + * @param column_names A vector containing the names of the columns in the + * tablet. Each name corresponds to a column in the target table. * @param data_types A vector containing the data types of each column. * These must match the schema of the target table. - * @param max_rows The maximum number of rows that this tablet can hold. Defaults to DEFAULT_MAX_ROWS. + * @param max_rows The maximum number of rows that this tablet can hold. + * Defaults to DEFAULT_MAX_ROWS. 
*/ Tablet(const std::vector<std::string> &column_names, - const std::vector<common::TSDataType> &data_types, - uint32_t max_rows = DEFAULT_MAX_ROWS) - : max_row_num_(max_rows), - cur_row_size_(0), - timestamps_(nullptr), - value_matrix_(nullptr), - bitmaps_(nullptr) { + const std::vector<common::TSDataType> &data_types, + uint32_t max_rows = DEFAULT_MAX_ROWS) + : max_row_num_(max_rows), + cur_row_size_(0), + timestamps_(nullptr), + value_matrix_(nullptr), + bitmaps_(nullptr) { schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } init(); @@ -158,9 +165,7 @@ class Tablet { ~Tablet() { destroy(); } - const std::string& get_table_name() const{ - return insert_target_name_; - } + const std::string &get_table_name() const { return insert_target_name_; } void set_table_name(const std::string &table_name) { insert_target_name_ = table_name; } @@ -170,8 +175,8 @@ class Tablet { /** * @brief Adds a timestamp to the specified row. * - * @param row_index The index of the row to which the timestamp will be added. - * Must be less than the maximum number of rows. + * @param row_index The index of the row to which the timestamp will be + * added. Must be less than the maximum number of rows. * @param timestamp The timestamp value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -180,12 +185,14 @@ class Tablet { void *get_value(int row_index, uint32_t schema_index, common::TSDataType &data_type) const; /** - * @brief Template function to add a value of type T to the specified row and column. + * @brief Template function to add a value of type T to the specified row + * and column. * * @tparam T The type of the value to add. 
* @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param schema_index The index of the column schema corresponding to the value being added. + * @param schema_index The index of the column schema corresponding to the + * value being added. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -196,13 +203,14 @@ class Tablet { const std::vector<common::ColumnCategory> &column_categories); std::shared_ptr<IDeviceID> get_device_id(int i) const; /** - * @brief Template function to add a value of type T to the specified row and column by name. + * @brief Template function to add a value of type T to the specified row + * and column by name. * * @tparam T The type of the value to add. * @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param measurement_name The name of the column to which the value will be added. - * Must match one of the column names provided during construction. + * @param measurement_name The name of the column to which the value will be + * added. Must match one of the column names provided during construction. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. 
*/ @@ -210,7 +218,8 @@ class Tablet { int add_value(uint32_t row_index, const std::string &measurement_name, T val); - FORCE_INLINE const std::string &get_column_name(uint32_t column_index) const { + FORCE_INLINE const std::string &get_column_name( + uint32_t column_index) const { return schema_vec_->at(column_index).measurement_name_; } @@ -218,7 +227,7 @@ class Tablet { schema_vec_->at(column_index).measurement_name_ = name; } - const std::map<std::string, int>& get_schema_map() const { + const std::map<std::string, int> &get_schema_map() const { return schema_map_; } @@ -226,6 +235,54 @@ class Tablet { schema_map_ = schema_map; } + template <typename T> + int set_batch_data(uint32_t col_index, T *data) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_ARG; + } + auto schema = schema_vec_->at(col_index); + switch (schema.data_type_) { + case common::BOOLEAN: + memcpy(value_matrix_[col_index].bool_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT32: + memcpy(value_matrix_[col_index].int32_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT64: + memcpy(value_matrix_[col_index].int64_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::FLOAT: + memcpy(value_matrix_[col_index].float_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::DOUBLE: + memcpy(value_matrix_[col_index].double_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + default:; + } + bitmaps_[col_index].set_zero(); + return common::E_OK; + } + + int set_batch_data(uint32_t col_index, char **data) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_SCHEMA; + } + + for (int i = 0; i < max_row_num_; i++) { + value_matrix_[col_index].string_data[i].dup_from(data[i], + page_arena_); + } + bitmaps_[col_index].set_zero(); + return common::E_OK; + } + + void 
set_null_value(uint32_t col_index, uint32_t row_index); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc index 71863f0c..fe8a5495 100644 --- a/cpp/test/common/tablet_test.cc +++ b/cpp/test/common/tablet_test.cc @@ -61,4 +61,37 @@ TEST(TabletTest, LargeQuantities) { EXPECT_EQ(tablet.get_column_count(), schema_vec.size()); } +TEST(TabletTest, TabletBatchReadWrite) { + std::vector<std::string> column_names = { + "id1", "id2", "id3", "id4","id5","id6" + }; + std::vector<common::TSDataType> datatypes = { + common::TSDataType::BOOLEAN, common::TSDataType::INT32, + common::TSDataType::INT64, common::TSDataType::FLOAT, + common::TSDataType::DOUBLE, common::TSDataType::STRING + }; + Tablet tablet(column_names, datatypes, 100); + bool bool_vec[100] = {false}; + bool_vec[10] = true; + + common::TSDataType datatype; + ASSERT_EQ(common::E_OK, tablet.set_batch_data(0, bool_vec)); + ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype))); + ASSERT_EQ(common::TSDataType::BOOLEAN, datatype); + int32_t i32_vec[100] = {0}; + i32_vec[99] = 123; + ASSERT_EQ(common::E_OK, tablet.set_batch_data(1, i32_vec)); + ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype))); + ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype))); + std::vector<std::string> vals; + for (int i = 0; i < 100; i++) vals.push_back("val" + std::to_string(i)); + std::vector<char *> str; + for (auto &v : vals) str.push_back(&v[0]); + ASSERT_EQ(common::E_OK, tablet.set_batch_data(5, str.data())); + ASSERT_EQ(common::String("val10"), *(common::String*)tablet.get_value(10, 5, datatype)); + + tablet.set_null_value(5, 20); + ASSERT_EQ(nullptr, tablet.get_value(20, 5, datatype)); +} + } // namespace storage \ No newline at end of file
