This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_fix_type_mismatch in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 2c40c6a027ecd568f90a886fdd791f70becde8e8 Author: ColinLee <[email protected]> AuthorDate: Mon Apr 21 11:33:20 2025 +0800 fix datatype mismatch between table schema and tablet. --- cpp/src/writer/tsfile_writer.cc | 67 ++++++++++++-------- .../writer/table_view/tsfile_writer_table_test.cc | 71 +++++++++++++++++++--- cpp/test/writer/tsfile_writer_test.cc | 50 ++++++++++++++- 3 files changed, 150 insertions(+), 38 deletions(-) diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index f289f875..78ade923 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -30,7 +30,6 @@ using namespace common; namespace storage { - int libtsfile_init() { static bool g_s_is_inited = false; if (g_s_is_inited) { @@ -671,8 +670,10 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) { if (IS_NULL(value_chunk_writer)) { continue; } - value_write_column(value_chunk_writer, tablet, c, 0, - tablet.get_cur_row_size()); + if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0, + tablet.get_cur_row_size()))) { + return ret; + } } return ret; } @@ -693,7 +694,9 @@ int TsFileWriter::write_tablet(const Tablet &tablet) { continue; } // ignore writer failure - write_column(chunk_writer, tablet, c); + if (RET_FAIL(write_column(chunk_writer, tablet, c))) { + return ret; + } } record_count_since_last_flush_ += tablet.max_row_num_; @@ -739,8 +742,11 @@ int TsFileWriter::write_table(Tablet &tablet) { if (IS_NULL(value_chunk_writer)) { continue; } - value_write_column(value_chunk_writer, tablet, i, start_idx, - end_idx); + + if (RET_FAIL(value_write_column(value_chunk_writer, tablet, + i, start_idx, end_idx))) { + return ret; + } field_col_count++; } } @@ -758,8 +764,10 @@ int TsFileWriter::write_table(Tablet &tablet) { if (IS_NULL(chunk_writer)) { continue; } - write_column(chunk_writer, tablet, c, start_idx, - device_id_end_index_pair.second); + if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx, + device_id_end_index_pair.second))) { + return ret; + } } start_idx = device_id_end_index_pair.second; } @@ -877,32 +885,39 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer, return ret; } -#define DO_WRITE_TYPED_COLUMN() \ - do { \ - int ret = E_OK; \ - for (uint32_t r = start_idx; r < end_idx; r++) { \ - if (LIKELY(!col_notnull_bitmap.test(r))) { \ - ret = chunk_writer->write(timestamps[r], col_values[r]); \ - } \ - } \ - return ret; \ - } while (false) - -#define DO_VALUE_WRITE_TYPED_COLUMN() \ +#define DO_WRITE_TYPED_COLUMN() \ do { \ int ret = E_OK; \ for (uint32_t r = start_idx; r < end_idx; r++) { \ - if (LIKELY(col_notnull_bitmap.test(r))) { \ - ret = value_chunk_writer->write(timestamps[r], col_values[r], \ - true); \ - } else { \ - ret = value_chunk_writer->write(timestamps[r], col_values[r], \ - false); \ + if (LIKELY(!col_notnull_bitmap.test(r))) { \ + if (RET_FAIL( \ + chunk_writer->write(timestamps[r], col_values[r]))) { \ + return ret; \ + } \ } \ } \ return ret; \ } while (false) +#define DO_VALUE_WRITE_TYPED_COLUMN() \ + do { \ + int ret = E_OK; \ + for (uint32_t r = start_idx; r < end_idx; r++) { \ + if (LIKELY(col_notnull_bitmap.test(r))) { \ + if (RET_FAIL(value_chunk_writer->write( \ + timestamps[r], col_values[r], true))) { \ + return ret; \ + } \ + } else { \ + if (RET_FAIL(value_chunk_writer->write( \ + timestamps[r], col_values[r], false))) { \ + return ret; \ + } \ + } \ + } \ + return ret; \ + } while (false) + int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer, int64_t *timestamps, bool *col_values, BitMap &col_notnull_bitmap, diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 43574dc7..031a3039 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -45,9 +45,7 @@ class TsFileWriterTableTest : public ::testing::Test { mode_t mode = 0666; write_file_.create(file_name_, flags, mode); } - void TearDown() override { - remove(file_name_.c_str()); - } + void TearDown() override { remove(file_name_.c_str()); } std::string file_name_; WriteFile write_file_; @@ -93,9 +91,11 @@ class TsFileWriterTableTest : public ::testing::Test { } static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, - int device_num, int num_timestamp_per_device = 10) { + int device_num, + int num_timestamp_per_device = 10) { storage::Tablet tablet(table_schema->get_measurement_names(), - table_schema->get_data_types(), device_num * num_timestamp_per_device); + table_schema->get_data_types(), + device_num * num_timestamp_per_device); char* literal = new char[std::strlen("device_id") + 1]; std::strcpy(literal, "device_id"); @@ -141,8 +141,8 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) { TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { auto table_schema = gen_table_schema(0); - auto tsfile_table_writer_ = - std::make_shared<TsFileTableWriter>(&write_file_, table_schema, 2 * 1024); + auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( + &write_file_, table_schema, 2 * 1024); for (int i = 0; i < 100; i++) { auto tablet = gen_tablet(table_schema, i * 10000, 1, 10000); ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); @@ -194,13 +194,64 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { auto table_schema = gen_table_schema(0); - auto tsfile_table_writer_ = - std::make_shared<TsFileTableWriter>(&write_file_, table_schema, 256 * 1024 * 1024); - ASSERT_EQ(common::g_config_value_.chunk_group_size_threshold_, 256 * 1024 * 1024); + auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( + &write_file_, table_schema, 256 * 1024 * 1024); + ASSERT_EQ(common::g_config_value_.chunk_group_size_threshold_, + 256 * 1024 * 1024); tsfile_table_writer_->close(); delete table_schema; } +TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { + auto table_schema = gen_table_schema(0); + auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( + &write_file_, table_schema, 256 * 1024 * 1024); + int device_num = 3; + int num_timestamp_per_device = 10; + int offset = 0; + auto datatypes = table_schema->get_data_types(); + + datatypes[6] = TSDataType::INT32; + storage::Tablet tablet(table_schema->get_measurement_names(), datatypes, + device_num * num_timestamp_per_device); + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String literal_str(literal, std::strlen("device_id")); + for (int i = 0; i < device_num; i++) { + for (int l = 0; l < num_timestamp_per_device; l++) { + int row_index = i * num_timestamp_per_device + l; + tablet.add_timestamp(row_index, offset + l); + auto column_schemas = table_schema->get_measurement_schemas(); + for (int idx = 0; idx < column_schemas.size(); idx++) { + switch (datatypes[idx]) { + case TSDataType::INT64: + tablet.add_value(row_index, + column_schemas[idx]->measurement_name_, + static_cast<int64_t>(i)); + break; + case TSDataType::INT32: + tablet.add_value(row_index, + column_schemas[idx]->measurement_name_, + static_cast<int32_t>(i)); + break; + case TSDataType::STRING: + tablet.add_value(row_index, + column_schemas[idx]->measurement_name_, + literal_str); + break; + default: + break; + } + } + } + } + delete[] literal; + + ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_table_writer_->write_table(tablet)); + tsfile_table_writer_->close(); +} + TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index bd4a4793..6f58f0cb 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -110,7 +110,7 @@ class TsFileWriterTest : public ::testing::Test { } }; -class TsFileWriterTestSimple : public ::testing::Test{}; +class TsFileWriterTestSimple : public ::testing::Test {}; TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) { TsFileWriter writer; @@ -229,7 +229,6 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) { E_OK); ASSERT_EQ(tsfile_writer_->flush(), E_OK); ASSERT_EQ(tsfile_writer_->close(), E_OK); - } TEST_F(TsFileWriterTest, WriteMultipleRecords) { @@ -911,4 +910,51 @@ TEST_F(TsFileWriterTest, WriteAlignedPartialData) { cur_row++; } while (true); reader.destroy_query_data_set(qds); +} + +TEST_F(TsFileWriterTest, WriteTabletDataTypeMismatch) { + for (int i = 0; i < 2; i++) { + std::string device_name = "test_device" + std::to_string(i); + for (int j = 0; j < 3; j++) { + std::string measure_name = "measurement" + std::to_string(j); + tsfile_writer_->register_timeseries( + device_name, storage::MeasurementSchema( + measure_name, common::TSDataType::INT32, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); + } + } + + std::vector<TSDataType> measurement_types{ + TSDataType::INT32, TSDataType::INT64, TSDataType::INT32}; + std::vector<std::string> measurement_names{"measurement0", "measurement1", + "measurement2"}; + + Tablet tablet("test_device0", &measurement_names, &measurement_types); + for (int row = 0; row < 100; row++) { + tablet.add_timestamp(row, row); + for (int col = 0; col < 3; col++) { + switch (measurement_types[col]) { + case TSDataType::INT32: + tablet.add_value(row, col, static_cast<int32_t>(row)); + break; + case TSDataType::INT64: + tablet.add_value(row, col, static_cast<int64_t>(row)); + break; + default:; + } + } + } + ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet(tablet)); + std::vector<MeasurementSchema *> measurement_schemas; + for (int i = 0; i < 3; i++) { + measurement_schemas.push_back(new MeasurementSchema( + "measurement" + std::to_string(i), TSDataType::INT32)); + } + + tsfile_writer_->register_aligned_timeseries("device3", measurement_schemas); + tablet.set_table_name("device3"); + ASSERT_EQ(E_TYPE_NOT_MATCH, tsfile_writer_->write_tablet_aligned(tablet)); + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); } \ No newline at end of file
