This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch fix_only_timestamp
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit ea6a1a4ccab86c856552b9cb1ce8ffb69c1d2c6d
Author: ColinLee <[email protected]>
AuthorDate: Fri Jun 27 11:37:00 2025 +0800

    Fixes the issue where data with only timestamps could not be inserted or queried.
---
 cpp/src/common/tsblock/tsblock.h                   | 11 +++-
 cpp/src/common/tsblock/tuple_desc.h                |  4 ++
 cpp/src/reader/aligned_chunk_reader.cc             | 61 +++++++++++++----
 .../reader/block/single_device_tsblock_reader.cc   | 35 +++++++---
 cpp/src/reader/table_result_set.cc                 |  3 +-
 cpp/src/writer/time_chunk_writer.cc                |  6 ++
 cpp/src/writer/time_chunk_writer.h                 |  2 +
 cpp/src/writer/tsfile_writer.cc                    |  8 ++-
 .../writer/table_view/tsfile_writer_table_test.cc  | 76 ++++++++++++++++++++++
 9 files changed, 184 insertions(+), 22 deletions(-)

diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 4316e8f6..3e1cbe61 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -258,6 +258,13 @@ class RowIterator {
         }
     }
 
+    FORCE_INLINE void next(size_t ind) {
+        ASSERT(row_id_ < tsblock_->row_count_);
+        tsblock_->vectors_[ind]->update_offset();
+    }
+
+    FORCE_INLINE void update_row_id() { row_id_++; }
+
     FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len,
                             bool *__restrict null) {
         ASSERT(column_index < column_count_);
@@ -287,8 +294,10 @@ class ColIterator {
     FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
 
     FORCE_INLINE void next() {
+        if (!vec_->is_null(row_id_)) {
+            vec_->update_offset();
+        }
         ++row_id_;
-        vec_->update_offset();
     }
 
     FORCE_INLINE bool has_null() { return vec_->has_null(); }
diff --git a/cpp/src/common/tsblock/tuple_desc.h b/cpp/src/common/tsblock/tuple_desc.h
index 98bd341d..fea0252e 100644
--- a/cpp/src/common/tsblock/tuple_desc.h
+++ b/cpp/src/common/tsblock/tuple_desc.h
@@ -71,6 +71,10 @@ class TupleDesc {
         return column_list_[index].data_type_;
     }
 
+    FORCE_INLINE common::ColumnCategory get_column_category(uint32_t index) {
+        return column_list_[index].column_category_;
+    }
+
     FORCE_INLINE std::string get_column_name(uint32_t index) {
         return column_list_[index].column_name_;
     }
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 230661f3..d629adac 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap(
     int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
     int read_size =
         (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
-    if (file_data_buf_size < read_size || (may_shrink && read_size < 
file_data_buf_size / 10)) {
+    if (file_data_buf_size < read_size ||
+        (may_shrink && read_size < file_data_buf_size / 10)) {
         file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
         if (IS_NULL(file_data_buf)) {
             return E_OOM;
@@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() {
     uint32_t time_compressed_buf_size = 0;
     uint32_t time_uncompressed_buf_size = 0;
 
-
     // Step 2: do uncompress
     if (IS_SUCC(ret)) {
         time_compressed_buf =
@@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
         uint32_t mask = 1 << 7;                                                
\
         int64_t time = 0;                                                      
\
         CppType value;                                                         
\
-        while ((time_decoder_->has_remaining() || time_in.has_remaining())     
\
-                && (value_decoder_->has_remaining() ||                         
\
-                value_in.has_remaining())){                                    
\
+        while (                                                                
\
+            (time_decoder_->has_remaining() || time_in.has_remaining()) &&     
\
+            (value_decoder_->has_remaining() || value_in.has_remaining())) {   
\
             cur_value_index++;                                                 
\
             if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &        
\
                   0xFF) &                                                      
\
@@ -530,8 +530,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
                 if (ret != E_OK) {                                             
\
                     break;                                                     
\
                 }                                                              
\
-                ret = value_decoder_->read_##ReadType(value,                   
\
-                value_in);                                                     
\
+                ret = value_decoder_->read_##ReadType(value, value_in);        
\
                 if (ret != E_OK) {                                             
\
                     break;                                                     
\
                 }                                                              
\
@@ -539,7 +538,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
             }                                                                  
\
             if (UNLIKELY(!row_appender.add_row())) {                           
\
                 ret = E_OVERFLOW;                                              
\
-                cur_value_index--;                                            \
+                cur_value_index--;                                             
\
                 break;                                                         
\
             } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {   
\
             } else if (RET_FAIL(value_decoder_->read_##ReadType(value,         
\
@@ -549,7 +548,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
                 continue;                                                      
\
             } else {                                                           
\
                 /*std::cout << "decoder: time=" << time << ", value=" << value 
\
-                 * << std::endl;*/                                            \
+                 * << std::endl;*/                                             
\
                 row_appender.append(0, (char *)&time, sizeof(time));           
\
                 row_appender.append(1, (char *)&value, sizeof(value));         
\
             }                                                                  
\
@@ -596,6 +595,8 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
     ByteStream &time_in, ByteStream &value_in, TsBlock *ret_tsblock,
     Filter *filter) {
     int ret = E_OK;
+    uint32_t mask = 1 << 7;
+    int64_t time = 0;
     RowAppender row_appender(ret_tsblock);
     switch (value_chunk_header_.data_type_) {
         case common::BOOLEAN:
@@ -615,8 +616,46 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype(
                                          row_appender);
             break;
         case common::DOUBLE:
-            DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, value_in_,
-                                         row_appender);
+            // DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, 
value_in_,
+            //                              row_appender);
+            double value;
+            while (
+                (time_decoder_->has_remaining() || time_in.has_remaining()) &&
+                (value_decoder_->has_remaining() || value_in.has_remaining())) 
{
+                cur_value_index++;
+                if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &
+                      0xFF) &
+                     (mask >> (cur_value_index % 8))) == 0) {
+                    ret = time_decoder_->read_int64(time, time_in);
+                    if (ret != E_OK) {
+                        break;
+                    }
+                    if (UNLIKELY(!row_appender.add_row())) {
+                        ret = E_OVERFLOW;
+                        break;
+                    }
+                    row_appender.append(0, (char *)&time, sizeof(time));
+                    row_appender.append_null(1);
+                    continue;
+                }
+                if (UNLIKELY(!row_appender.add_row())) {
+                    ret = E_OVERFLOW;
+                    cur_value_index--;
+                    break;
+                } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) 
{
+                } else if (RET_FAIL(
+                               value_decoder_->read_double(value, value_in))) {
+                } else if (filter != nullptr && !filter->satisfy(time, value)) 
{
+                    row_appender.backoff_add_row();
+                    continue;
+                } else {
+                    /*std::cout << "decoder: time=" << time << ", value=" <<
+                     * value
+                     * << std::endl;*/
+                    row_appender.append(0, (char *)&time, sizeof(time));
+                    row_appender.append(1, (char *)&value, sizeof(value));
+                }
+            }
             break;
         default:
             ret = E_NOT_SUPPORT;
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 507597c0..9b5f9bac 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -19,6 +19,8 @@
 
 #include "single_device_tsblock_reader.h"
 
+#include <cwrapper/tsfile_cwrapper.h>
+
 namespace storage {
 
 SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
@@ -29,8 +31,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
       field_filter_(field_filter),
       block_size_(block_size),
       tuple_desc_(),
-      tsfile_io_reader_(tsfile_io_reader) {
-}
+      tsfile_io_reader_(tsfile_io_reader) {}
 
 int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
                                     uint32_t block_size, Filter* time_filter,
@@ -63,9 +64,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
             ->get_measurement_columns()
             .size());
     if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(
-        device_query_task->get_device_id(),
-        device_query_task->get_column_mapping()->get_measurement_columns(),
-        time_series_indexs, pa_))) {
+            device_query_task->get_device_id(),
+            device_query_task->get_column_mapping()->get_measurement_columns(),
+            time_series_indexs, pa_))) {
         return ret;
     }
     for (const auto& time_series_index : time_series_indexs) {
@@ -171,6 +172,20 @@ int SingleDeviceTsBlockReader::fill_measurements(
                 break;
             }
         }
+
+        uint32_t row_count =
+            col_appenders_[time_column_index_]->get_col_row_count();
+        for (auto& col_appender : col_appenders_) {
+            if (tuple_desc_.get_column_category(
+                    col_appender->get_column_index()) !=
+                common::ColumnCategory::FIELD) {
+                continue;
+            }
+            while (col_appender->get_col_row_count() < row_count) {
+                col_appender->add_row();
+                col_appender->append_null();
+            }
+        }
     }
     return ret;
 }
@@ -366,8 +381,8 @@ int SingleMeasurementColumnContext::get_current_value(char*& value,
     if (value_iter_->end()) {
         return common::E_NO_MORE_DATA;
     }
-    value = value_iter_->read(&len);
-    assert(value != nullptr);
+    bool is_null = false;
+    value = value_iter_->read(&len, &is_null);
     return common::E_OK;
 }
 
@@ -392,7 +407,11 @@ void SingleMeasurementColumnContext::fill_into(
     }
     for (int32_t pos : pos_in_result_) {
         col_appenders[pos + 1]->add_row();
-        col_appenders[pos + 1]->append(val, len);
+        if (val == nullptr) {
+            col_appenders[pos + 1]->append_null();
+        } else {
+            col_appenders[pos + 1]->append(val, len);
+        }
     }
 }
 
diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc
index 396913e9..01c3b239 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) {
             if (!null) {
                 
row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i),
                     value, len, pa_);
+                row_iterator_->next(i);
             }
         }
-        row_iterator_->next();
+        row_iterator_->update_row_id();
     }
     return ret;
 }
diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc
index 892c0d1c..47a684d6 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -191,4 +191,10 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() {
                time_page_writer_.get_statistic()->get_type());
 }
 
+bool TimeChunkWriter::hasData() {
+    return num_of_pages_ > 0 ||
+           (time_page_writer_.get_statistic() != nullptr &&
+            time_page_writer_.get_statistic()->count_ > 0);
+}
+
 }  // end namespace storage
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index e03b264c..02c15a5b 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -68,6 +68,8 @@ class TimeChunkWriter {
 
     int64_t estimate_max_series_mem_size();
 
+    bool hasData();
+
    private:
     FORCE_INLINE bool is_cur_page_full() const {
         // FIXME
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index e9a1162a..73e4543e 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) {
             if (col_index == -1) {
                 return E_COLUMN_NOT_EXIST;
             }
-            if (table_schema->get_data_types()[col_index] != 
tablet.schema_vec_->at(i).data_type_) {
+            if (table_schema->get_data_types()[col_index] !=
+                tablet.schema_vec_->at(i).data_type_) {
                 return E_TYPE_NOT_MATCH;
             }
             const common::ColumnCategory column_category =
@@ -1055,6 +1056,11 @@ int TsFileWriter::flush() {
 
 bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
                                            bool is_aligned) {
+    if (chunk_group->is_aligned_ &&
+        chunk_group->time_chunk_writer_ != nullptr &&
+        chunk_group->time_chunk_writer_->hasData()) {
+        return false;
+    }
     MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
     for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
          ms_iter++) {
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index b1ef896a..67f02741 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -690,3 +690,79 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
     ASSERT_EQ(reader.close(), common::E_OK);
     delete table_schema;
 }
+
+
+TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
+    std::vector<MeasurementSchema*> measurement_schemas;
+    std::vector<ColumnCategory> column_categories;
+    for (int i = 0; i < 3; i++) {
+        measurement_schemas.emplace_back(new MeasurementSchema(
+            "id" + std::to_string(i), TSDataType::STRING));
+        column_categories.emplace_back(ColumnCategory::TAG);
+    }
+    measurement_schemas.emplace_back(new MeasurementSchema("value", DOUBLE));
+    measurement_schemas.emplace_back(new MeasurementSchema("value1", DOUBLE));
+    column_categories.emplace_back(ColumnCategory::FIELD);
+    column_categories.emplace_back(ColumnCategory::FIELD);
+    auto table_schema =
+        new TableSchema("testTable", measurement_schemas, column_categories);
+    auto tsfile_table_writer =
+        std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
+    int time = 0;
+    Tablet tablet = Tablet(table_schema->get_measurement_names(),
+                           table_schema->get_data_types(), 100);
+
+    for (int i = 0; i < 100; i++) {
+        tablet.add_timestamp(i, static_cast<int64_t>(time++));
+        tablet.add_value(i, 0, "tag1");
+        tablet.add_value(i, 1, "tag2");
+        if (i % 3 == 0) {
+            // all device has no data
+            tablet.add_value(i, 2, "tag_null");
+        } else {
+            tablet.add_value(i, 2, "tag3");
+            tablet.add_value(i, 3, 100.0f);
+            if (i % 5 == 0) {
+                tablet.add_value(i, 4, 100.0f);
+            }
+        }
+    }
+    tsfile_table_writer->write_table(tablet);
+    tsfile_table_writer->flush();
+    tsfile_table_writer->close();
+
+    delete table_schema;
+
+    auto reader = TsFileReader();
+    reader.open(write_file_.get_file_path());
+    ResultSet* ret = nullptr;
+    int ret_value =
+        reader.query("testTable", {"id0", "id1", "id2", "value", "value1"}, 0, 
100, ret);
+    ASSERT_EQ(common::E_OK, ret_value);
+
+    auto table_result_set = (TableResultSet*)ret;
+    bool has_next = false;
+    int cur_line = 0;
+    auto schema = table_result_set->get_metadata();
+    while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+        int64_t timestamp = table_result_set->get_value<int64_t>(1);
+        ASSERT_EQ(common::String("tag1") , 
*table_result_set->get_value<common::String*>(2));
+        ASSERT_EQ(common::String("tag2") , 
*table_result_set->get_value<common::String*>(3));
+        if (timestamp % 3 == 0) {
+            ASSERT_EQ(common::String("tag_null") , 
*table_result_set->get_value<common::String*>(4));
+            ASSERT_TRUE(table_result_set->is_null(5));
+            ASSERT_TRUE(table_result_set->is_null(6));
+        } else {
+            ASSERT_EQ(common::String("tag3") , 
*table_result_set->get_value<common::String*>(4));
+            ASSERT_EQ(100.0f, table_result_set->get_value<double>(5));
+            if (timestamp % 5 == 0) {
+                ASSERT_EQ(100.0f , table_result_set->get_value<double>(6));
+            } else {
+                ASSERT_TRUE(table_result_set->is_null(6));
+            }
+
+        }
+    }
+    reader.destroy_query_data_set(table_result_set);
+    ASSERT_EQ(reader.close(), common::E_OK);
+}

Reply via email to