This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new d2a906ec Fixes the issue where data with only timestamps could not be inserted (#525)
d2a906ec is described below

commit d2a906ec88f8fe557bf0f6cba742e03348992d1d
Author: Colin Lee <[email protected]>
AuthorDate: Fri Jun 27 17:01:09 2025 +0800

    Fixes the issue where data with only timestamps could not be inserted (#525)
    
    * Fixes the issue where data with only timestamps could not be inserted or queried.
    
    * fix issue.
    
    * Fix formatting.
---
 cpp/src/common/tsblock/tsblock.h                   |  14 ++-
 cpp/src/common/tsblock/tuple_desc.h                |   5 +
 cpp/src/reader/aligned_chunk_reader.cc             |  21 ++--
 .../reader/block/single_device_tsblock_reader.cc   |  34 +++++--
 cpp/src/reader/table_result_set.cc                 |   3 +-
 cpp/src/writer/time_chunk_writer.cc                |   5 +
 cpp/src/writer/time_chunk_writer.h                 |   4 +
 cpp/src/writer/tsfile_writer.cc                    |   8 +-
 .../writer/table_view/tsfile_writer_table_test.cc  | 109 ++++++++++++++++++---
 9 files changed, 166 insertions(+), 37 deletions(-)

diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 4316e8f6..a0e94391 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -211,8 +211,7 @@ class ColAppender {
         }
         return E_OK;
     }
-    FORCE_INLINE int fill(const char *value, uint32_t len,
-                           uint32_t end_index) {
+    FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) 
{
         while (column_row_count_ < end_index) {
             if (!add_row()) {
                 return E_INVALID_ARG;
@@ -258,6 +257,13 @@ class RowIterator {
         }
     }
 
+    FORCE_INLINE void next(size_t ind) const {
+        ASSERT(row_id_ < tsblock_->row_count_);
+        tsblock_->vectors_[ind]->update_offset();
+    }
+
+    FORCE_INLINE void update_row_id() { row_id_++; }
+
     FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len,
                             bool *__restrict null) {
         ASSERT(column_index < column_count_);
@@ -287,8 +293,10 @@ class ColIterator {
     FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
 
     FORCE_INLINE void next() {
+        if (!vec_->is_null(row_id_)) {
+            vec_->update_offset();
+        }
         ++row_id_;
-        vec_->update_offset();
     }
 
     FORCE_INLINE bool has_null() { return vec_->has_null(); }
diff --git a/cpp/src/common/tsblock/tuple_desc.h 
b/cpp/src/common/tsblock/tuple_desc.h
index 98bd341d..85ba1309 100644
--- a/cpp/src/common/tsblock/tuple_desc.h
+++ b/cpp/src/common/tsblock/tuple_desc.h
@@ -71,6 +71,11 @@ class TupleDesc {
         return column_list_[index].data_type_;
     }
 
+    FORCE_INLINE common::ColumnCategory get_column_category(
+        const uint32_t index) const {
+        return column_list_[index].column_category_;
+    }
+
     FORCE_INLINE std::string get_column_name(uint32_t index) {
         return column_list_[index].column_name_;
     }
diff --git a/cpp/src/reader/aligned_chunk_reader.cc 
b/cpp/src/reader/aligned_chunk_reader.cc
index 230661f3..5e1bbe43 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap(
     int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
     int read_size =
         (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
-    if (file_data_buf_size < read_size || (may_shrink && read_size < 
file_data_buf_size / 10)) {
+    if (file_data_buf_size < read_size ||
+        (may_shrink && read_size < file_data_buf_size / 10)) {
         file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
         if (IS_NULL(file_data_buf)) {
             return E_OOM;
@@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() {
     uint32_t time_compressed_buf_size = 0;
     uint32_t time_uncompressed_buf_size = 0;
 
-
     // Step 2: do uncompress
     if (IS_SUCC(ret)) {
         time_compressed_buf =
@@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
         uint32_t mask = 1 << 7;                                                
\
         int64_t time = 0;                                                      
\
         CppType value;                                                         
\
-        while ((time_decoder_->has_remaining() || time_in.has_remaining())     
\
-                && (value_decoder_->has_remaining() ||                         
\
-                value_in.has_remaining())){                                    
\
+        while (                                                                
\
+            (time_decoder_->has_remaining() || time_in.has_remaining()) &&     
\
+            (value_decoder_->has_remaining() || value_in.has_remaining())) {   
\
             cur_value_index++;                                                 
\
             if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &        
\
                   0xFF) &                                                      
\
@@ -530,16 +530,17 @@ int 
AlignedChunkReader::decode_time_value_buf_into_tsblock(
                 if (ret != E_OK) {                                             
\
                     break;                                                     
\
                 }                                                              
\
-                ret = value_decoder_->read_##ReadType(value,                   
\
-                value_in);                                                     
\
-                if (ret != E_OK) {                                             
\
+                if (UNLIKELY(!row_appender.add_row())) {                       
\
+                    ret = E_OVERFLOW;                                          
\
                     break;                                                     
\
                 }                                                              
\
+                row_appender.append(0, (char *)&time, sizeof(time));           
\
+                row_appender.append_null(1);                                   
\
                 continue;                                                      
\
             }                                                                  
\
             if (UNLIKELY(!row_appender.add_row())) {                           
\
                 ret = E_OVERFLOW;                                              
\
-                cur_value_index--;                                            \
+                cur_value_index--;                                             
\
                 break;                                                         
\
             } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {   
\
             } else if (RET_FAIL(value_decoder_->read_##ReadType(value,         
\
@@ -549,7 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
                 continue;                                                      
\
             } else {                                                           
\
                 /*std::cout << "decoder: time=" << time << ", value=" << value 
\
-                 * << std::endl;*/                                            \
+                 * << std::endl;*/                                             
\
                 row_appender.append(0, (char *)&time, sizeof(time));           
\
                 row_appender.append(1, (char *)&value, sizeof(value));         
\
             }                                                                  
\
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc 
b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 507597c0..1df563cd 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -29,8 +29,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
       field_filter_(field_filter),
       block_size_(block_size),
       tuple_desc_(),
-      tsfile_io_reader_(tsfile_io_reader) {
-}
+      tsfile_io_reader_(tsfile_io_reader) {}
 
 int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
                                     uint32_t block_size, Filter* time_filter,
@@ -63,9 +62,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* 
device_query_task,
             ->get_measurement_columns()
             .size());
     if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(
-        device_query_task->get_device_id(),
-        device_query_task->get_column_mapping()->get_measurement_columns(),
-        time_series_indexs, pa_))) {
+            device_query_task->get_device_id(),
+            device_query_task->get_column_mapping()->get_measurement_columns(),
+            time_series_indexs, pa_))) {
         return ret;
     }
     for (const auto& time_series_index : time_series_indexs) {
@@ -171,6 +170,21 @@ int SingleDeviceTsBlockReader::fill_measurements(
                 break;
             }
         }
+
+        // Align all columns, filling with nulls where data is missing.
+        uint32_t row_count =
+            col_appenders_[time_column_index_]->get_col_row_count();
+        for (auto& col_appender : col_appenders_) {
+            if (tuple_desc_.get_column_category(
+                    col_appender->get_column_index()) !=
+                common::ColumnCategory::FIELD) {
+                continue;
+            }
+            while (col_appender->get_col_row_count() < row_count) {
+                col_appender->add_row();
+                col_appender->append_null();
+            }
+        }
     }
     return ret;
 }
@@ -366,8 +380,8 @@ int 
SingleMeasurementColumnContext::get_current_value(char*& value,
     if (value_iter_->end()) {
         return common::E_NO_MORE_DATA;
     }
-    value = value_iter_->read(&len);
-    assert(value != nullptr);
+    bool is_null = false;
+    value = value_iter_->read(&len, &is_null);
     return common::E_OK;
 }
 
@@ -392,7 +406,11 @@ void SingleMeasurementColumnContext::fill_into(
     }
     for (int32_t pos : pos_in_result_) {
         col_appenders[pos + 1]->add_row();
-        col_appenders[pos + 1]->append(val, len);
+        if (val == nullptr) {
+            col_appenders[pos + 1]->append_null();
+        } else {
+            col_appenders[pos + 1]->append(val, len);
+        }
     }
 }
 
diff --git a/cpp/src/reader/table_result_set.cc 
b/cpp/src/reader/table_result_set.cc
index 396913e9..01c3b239 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) {
             if (!null) {
                 
row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i),
                     value, len, pa_);
+                row_iterator_->next(i);
             }
         }
-        row_iterator_->next();
+        row_iterator_->update_row_id();
     }
     return ret;
 }
diff --git a/cpp/src/writer/time_chunk_writer.cc 
b/cpp/src/writer/time_chunk_writer.cc
index 892c0d1c..81fafc5a 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -191,4 +191,9 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() {
                time_page_writer_.get_statistic()->get_type());
 }
 
+bool TimeChunkWriter::hasData() {
+    return num_of_pages_ > 0 || (time_page_writer_.get_statistic() != nullptr 
&&
+                                 time_page_writer_.get_statistic()->count_ > 
0);
+}
+
 }  // end namespace storage
diff --git a/cpp/src/writer/time_chunk_writer.h 
b/cpp/src/writer/time_chunk_writer.h
index e03b264c..aff8e2af 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -28,6 +28,8 @@
 
 namespace storage {
 
+// TODO: TimeChunkWriter, ValueChunkWriter, ChunkWriter can be further
+// abstracted.
 class TimeChunkWriter {
    public:
     static const int32_t PAGES_DATA_PAGE_SIZE = 1024;
@@ -68,6 +70,8 @@ class TimeChunkWriter {
 
     int64_t estimate_max_series_mem_size();
 
+    bool hasData();
+
    private:
     FORCE_INLINE bool is_cur_page_full() const {
         // FIXME
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index e9a1162a..73e4543e 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet 
&tablet) {
             if (col_index == -1) {
                 return E_COLUMN_NOT_EXIST;
             }
-            if (table_schema->get_data_types()[col_index] != 
tablet.schema_vec_->at(i).data_type_) {
+            if (table_schema->get_data_types()[col_index] !=
+                tablet.schema_vec_->at(i).data_type_) {
                 return E_TYPE_NOT_MATCH;
             }
             const common::ColumnCategory column_category =
@@ -1055,6 +1056,11 @@ int TsFileWriter::flush() {
 
 bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
                                            bool is_aligned) {
+    if (chunk_group->is_aligned_ &&
+        chunk_group->time_chunk_writer_ != nullptr &&
+        chunk_group->time_chunk_writer_->hasData()) {
+        return false;
+    }
     MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
     for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
          ms_iter++) {
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc 
b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index b1ef896a..d2f6c1c2 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -32,7 +32,7 @@ using namespace storage;
 using namespace common;
 
 class TsFileWriterTableTest : public ::testing::Test {
-protected:
+   protected:
     void SetUp() override {
         libtsfile_init();
         file_name_ = std::string("tsfile_writer_table_test_") +
@@ -49,7 +49,7 @@ protected:
     std::string file_name_;
     WriteFile write_file_;
 
-public:
+   public:
     static std::string generate_random_string(int length) {
         std::random_device rd;
         std::mt19937 gen(rd());
@@ -101,7 +101,8 @@ public:
         for (int i = 0; i < device_num; i++) {
             PageArena pa;
             pa.init(512, MOD_DEFAULT);
-            std::string device_str = std::string("device_id_") + 
std::to_string(i);
+            std::string device_str =
+                std::string("device_id_") + std::to_string(i);
             String literal_str(device_str, pa);
             for (int l = 0; l < num_timestamp_per_device; l++) {
                 int row_index = i * num_timestamp_per_device + l;
@@ -450,7 +451,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
         cur_line++;
         int64_t timestamp = table_result_set->get_value<int64_t>("time");
         ASSERT_EQ(table_result_set->get_value<common::String*>("device")
-                  ->to_std_string(),
+                      ->to_std_string(),
                   "device" + std::to_string(timestamp));
         ASSERT_EQ(table_result_set->get_value<double>("VaLue"),
                   timestamp * 1.1);
@@ -497,7 +498,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
 
     ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->write_table(tablet));
     ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->register_table(
-                  std::make_shared<TableSchema>(*table_schema)));
+                                 
std::make_shared<TableSchema>(*table_schema)));
     delete table_schema;
 }
 
@@ -637,8 +638,8 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
 TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
     common::config_set_max_degree_of_index_node(5);
     auto table_schema = gen_table_schema(0, 1, 100);
-    auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
-        &write_file_, table_schema);
+    auto tsfile_table_writer_ =
+        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
     int num_row_per_device = 10;
     auto tablet = gen_tablet(table_schema, 0, 100, num_row_per_device);
     ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
@@ -651,23 +652,23 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
 
     ResultSet* tmp_result_set = nullptr;
     ret = reader.query(table_schema->get_table_name(),
-                       table_schema->get_measurement_names(), 0,
-                       INT32_MAX, tmp_result_set);
+                       table_schema->get_measurement_names(), 0, INT32_MAX,
+                       tmp_result_set);
     auto* table_result_set = (TableResultSet*)tmp_result_set;
     bool has_next = false;
     int64_t row_num = 0;
     auto result_set_meta = table_result_set->get_metadata();
     ASSERT_EQ(result_set_meta->get_column_count(),
-              table_schema->get_columns_num() + 1); // +1: time column
+              table_schema->get_columns_num() + 1);  // +1: time column
     while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
         auto column_schemas = table_schema->get_measurement_schemas();
-        std::string tag_col_val; // "device_id_[num]"
+        std::string tag_col_val;  // "device_id_[num]"
         std::string tag_col_val_prefix = "device_id_";
         for (const auto& column_schema : column_schemas) {
             switch (column_schema->data_type_) {
                 case TSDataType::INT64:
                     if (!table_result_set->is_null(
-                        column_schema->measurement_name_)) {
+                            column_schema->measurement_name_)) {
                         std::string num = tag_col_val.substr(
                             tag_col_val_prefix.length(),
                             tag_col_val.length() - 
tag_col_val_prefix.length());
@@ -677,8 +678,10 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
                     }
                     break;
                 case TSDataType::STRING:
-                    tag_col_val = table_result_set->get_value<common::String*>(
-                        column_schema->measurement_name_)->to_std_string();
+                    tag_col_val = table_result_set
+                                      ->get_value<common::String*>(
+                                          column_schema->measurement_name_)
+                                      ->to_std_string();
                 default:
                     break;
             }
@@ -690,3 +693,81 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
     ASSERT_EQ(reader.close(), common::E_OK);
     delete table_schema;
 }
+
+TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
+    std::vector<MeasurementSchema*> measurement_schemas;
+    std::vector<ColumnCategory> column_categories;
+    for (int i = 0; i < 3; i++) {
+        measurement_schemas.emplace_back(new MeasurementSchema(
+            "id" + std::to_string(i), TSDataType::STRING));
+        column_categories.emplace_back(ColumnCategory::TAG);
+    }
+    measurement_schemas.emplace_back(new MeasurementSchema("value", DOUBLE));
+    measurement_schemas.emplace_back(new MeasurementSchema("value1", INT32));
+    column_categories.emplace_back(ColumnCategory::FIELD);
+    column_categories.emplace_back(ColumnCategory::FIELD);
+    auto table_schema =
+        new TableSchema("testTable", measurement_schemas, column_categories);
+    auto tsfile_table_writer =
+        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
+    int time = 0;
+    Tablet tablet = Tablet(table_schema->get_measurement_names(),
+                           table_schema->get_data_types(), 100);
+
+    for (int i = 0; i < 100; i++) {
+        tablet.add_timestamp(i, static_cast<int64_t>(time++));
+        tablet.add_value(i, 0, "tag1");
+        tablet.add_value(i, 1, "tag2");
+        if (i % 3 == 0) {
+            // all device has no data
+            tablet.add_value(i, 2, "tag_null");
+        } else {
+            tablet.add_value(i, 2, "tag3");
+            tablet.add_value(i, 3, 100.0f);
+            if (i % 5 == 0) {
+                tablet.add_value(i, 4, 100);
+            }
+        }
+    }
+    tsfile_table_writer->write_table(tablet);
+    tsfile_table_writer->flush();
+    tsfile_table_writer->close();
+
+    delete table_schema;
+
+    auto reader = TsFileReader();
+    reader.open(write_file_.get_file_path());
+    ResultSet* ret = nullptr;
+    int ret_value = reader.query(
+        "testTable", {"id0", "id1", "id2", "value", "value1"}, 0, 100, ret);
+    ASSERT_EQ(common::E_OK, ret_value);
+
+    auto table_result_set = (TableResultSet*)ret;
+    bool has_next = false;
+    int cur_line = 0;
+    auto schema = table_result_set->get_metadata();
+    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+        int64_t timestamp = table_result_set->get_value<int64_t>(1);
+        ASSERT_EQ(common::String("tag1"),
+                  *table_result_set->get_value<common::String*>(2));
+        ASSERT_EQ(common::String("tag2"),
+                  *table_result_set->get_value<common::String*>(3));
+        if (timestamp % 3 == 0) {
+            ASSERT_EQ(common::String("tag_null"),
+                      *table_result_set->get_value<common::String*>(4));
+            ASSERT_TRUE(table_result_set->is_null(5));
+            ASSERT_TRUE(table_result_set->is_null(6));
+        } else {
+            ASSERT_EQ(common::String("tag3"),
+                      *table_result_set->get_value<common::String*>(4));
+            ASSERT_EQ(100.0f, table_result_set->get_value<double>(5));
+            if (timestamp % 5 == 0) {
+                ASSERT_EQ(100, table_result_set->get_value<int32_t>(6));
+            } else {
+                ASSERT_TRUE(table_result_set->is_null(6));
+            }
+        }
+    }
+    reader.destroy_query_data_set(table_result_set);
+    ASSERT_EQ(reader.close(), common::E_OK);
+}

Reply via email to