This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3117ec02 fix flush bug and fix empty file bug (#310)
3117ec02 is described below

commit 3117ec02821c5b04a403dc2b63e76d46dddfec4c
Author: Yukim1 <[email protected]>
AuthorDate: Mon Nov 25 10:02:02 2024 +0800

    fix flush bug and fix empty file bug (#310)
    
    * fix flush bug and fix empty file bug
    
    * add ut
---
 cpp/src/common/tsfile_common.cc       |   3 -
 cpp/src/file/tsfile_io_writer.cc      |   4 +-
 cpp/src/reader/bloom_filter.cc        |   6 +-
 cpp/src/writer/tsfile_writer.cc       |  26 +++++--
 cpp/src/writer/tsfile_writer.h        |   6 +-
 cpp/src/writer/value_chunk_writer.cc  |   6 ++
 cpp/src/writer/value_chunk_writer.h   |   2 +
 cpp/test/common/tsfile_common_test.cc |   2 +-
 cpp/test/writer/tsfile_writer_test.cc | 124 +++++++++++++++++++++++++++++++++-
 9 files changed, 161 insertions(+), 18 deletions(-)

diff --git a/cpp/src/common/tsfile_common.cc b/cpp/src/common/tsfile_common.cc
index a1643439..ba2c2423 100644
--- a/cpp/src/common/tsfile_common.cc
+++ b/cpp/src/common/tsfile_common.cc
@@ -92,9 +92,6 @@ int TSMIterator::init() {
 
     // FIXME empty list
     chunk_group_meta_iter_ = chunk_group_meta_list_.begin();
-    if (chunk_group_meta_iter_ == chunk_group_meta_list_.end()) {
-        return E_NOT_EXIST;
-    }
     while (chunk_group_meta_iter_ != chunk_group_meta_list_.end()) {
         chunk_meta_iter_ =
             chunk_group_meta_iter_.get()->chunk_meta_list_.begin();
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index abd08b82..99c6ec64 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -392,8 +392,8 @@ int TsFileIOWriter::write_file_index() {
         ret = E_OK;
     }
     ASSERT(ret == E_OK);
-
-    if (IS_SUCC(ret)) {  // iter finish
+    if (IS_SUCC(ret) && cur_index_node != nullptr &&
+        cur_index_node_queue != nullptr) {  // iter finish
         ASSERT(cur_index_node != nullptr);
         ASSERT(cur_index_node_queue != nullptr);
         if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
diff --git a/cpp/src/reader/bloom_filter.cc b/cpp/src/reader/bloom_filter.cc
index d2afd664..1ff1109d 100644
--- a/cpp/src/reader/bloom_filter.cc
+++ b/cpp/src/reader/bloom_filter.cc
@@ -223,8 +223,6 @@ int BloomFilter::serialize_to(ByteStream &out) {
     uint8_t *filter_data_bytes = nullptr;
     int32_t filter_data_bytes_len = 0;
     bitset_.to_bytes(filter_data_bytes, filter_data_bytes_len);
-    ASSERT(filter_data_bytes_len > 0);
-
     if (RET_FAIL(
             SerializationUtil::write_var_uint(filter_data_bytes_len, out))) {
     } else if (RET_FAIL(
@@ -233,7 +231,9 @@ int BloomFilter::serialize_to(ByteStream &out) {
     } else if (RET_FAIL(
                    SerializationUtil::write_var_uint(hash_func_count_, out))) {
     }
-    bitset_.revert_bytes(filter_data_bytes);
+    if (filter_data_bytes_len > 0) {
+        bitset_.revert_bytes(filter_data_bytes);
+    }
     return ret;
 }
 
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index f48ddfd7..8cc0b6d4 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -747,7 +747,12 @@ int TsFileWriter::flush() {
                 return ret;
             }
         }
+        if (check_chunk_group_empty(device_iter->second,
+                                    device_iter->second->is_aligned_)) {
+            continue;
+        }
         bool is_aligned = device_iter->second->is_aligned_;
+
         if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first,
                                                          is_aligned))) {
         } else if (RET_FAIL(
@@ -759,17 +764,24 @@ int TsFileWriter::flush() {
     return ret;
 }
 
-bool TsFileWriter::check_chunk_group_empty(
-    MeasurementSchemaGroup *chunk_group) {
+bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
+                                           bool is_aligned) {
     MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
     for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
          ms_iter++) {
         MeasurementSchema *m_schema = ms_iter->second;
-        if (m_schema->chunk_writer_ != NULL &&
-            m_schema->chunk_writer_->hasData()) {
-            // first condition is to avoid first flush empty chunk group
-            // second condition is to avoid repeated flush
-            return false;
+        if (is_aligned) {
+            if (m_schema->value_chunk_writer_ != NULL &&
+                m_schema->value_chunk_writer_->hasData()) {
+                return false;
+            }
+        } else {
+            if (m_schema->chunk_writer_ != NULL &&
+                m_schema->chunk_writer_->hasData()) {
+                // first condition is to avoid first flush empty chunk group
+                // second condition is to avoid repeated flush
+                return false;
+            }
         }
     }
     return true;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 128715a4..7b43b850 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -69,6 +69,9 @@ class TsFileWriter {
     int write_tablet(const Tablet &tablet);
     int write_record_aligned(const TsRecord &record);
     int write_tablet_aligned(const Tablet &tablet);
+    std::map<std::string, MeasurementSchemaGroup *> *get_schema_group_map() {
+        return &schemas_;
+    }
     int64_t calculate_mem_size_for_all_group();
     int check_memory_size_and_may_flush_chunks();
     /*
@@ -86,7 +89,8 @@ class TsFileWriter {
    private:
     int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
                     const DataPoint &point);
-    bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group);
+    bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
+                                 bool is_aligned);
     int write_point_aligned(ValueChunkWriter *value_chunk_writer,
                             int64_t timestamp, const DataPoint &point);
     int flush_chunk_group(MeasurementSchemaGroup *chunk_group, bool is_aligned);
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index b32b2dab..4156eaff 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -168,4 +168,10 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() {
                value_page_writer_.get_statistic()->get_type());
 }
 
+bool ValueChunkWriter::hasData() {
+    return num_of_pages_ > 0 ||
+           (value_page_writer_.get_statistic() != nullptr &&
+            value_page_writer_.get_statistic()->count_ > 0);
+}
+
 }  // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 3093dfff..10f51c75 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -87,6 +87,8 @@ class ValueChunkWriter {
 
     int64_t estimate_max_series_mem_size();
 
+    bool hasData();
+
    private:
     FORCE_INLINE bool is_cur_page_full() const {
         // FIXME
diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc
index 08673458..2c9d1403 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -203,7 +203,7 @@ TEST_F(TSMIteratorTest, InitEmptyList) {
     common::PageArena arena;
     common::SimpleList<ChunkGroupMeta*> empty_list(&arena);
     TSMIterator iter(empty_list);
-    ASSERT_EQ(iter.init(), common::E_NOT_EXIST);
+    ASSERT_EQ(iter.init(), common::E_OK);
 }
 
 TEST_F(TSMIteratorTest, HasNext) {
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index d69a9ff1..6b601126 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -30,7 +30,7 @@
 #include "file/write_file.h"
 #include "reader/qds_without_timegenerator.h"
 #include "reader/tsfile_reader.h"
-
+#include "writer/chunk_writer.h"
 using namespace storage;
 using namespace common;
 
@@ -381,6 +381,128 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) {
     ASSERT_EQ(tsfile_writer_->close(), E_OK);
 }
 
+
+TEST_F(TsFileWriterTest, FlushMultipleDevice) {
+    const int device_num = 50;
+    const int measurement_num = 50;
+    const int max_rows = 100;
+    std::vector<MeasurementSchema> schema_vec[50];
+
+    for (int i = 0; i < device_num; i++) {
+        std::string device_name = "test_device" + std::to_string(i);
+        for (int j = 0; j < measurement_num; j++) {
+            std::string measure_name = "measurement" + std::to_string(j);
+            schema_vec[i].push_back(
+                MeasurementSchema(measure_name, common::TSDataType::INT64,
+                                  common::TSEncoding::PLAIN,
+                                  common::CompressionType::UNCOMPRESSED));
+            tsfile_writer_->register_timeseries(
+                device_name, measure_name, common::TSDataType::INT64,
+                common::TSEncoding::PLAIN,
+                common::CompressionType::UNCOMPRESSED);
+        }
+    }
+
+    for (int i = 0; i < device_num; i++) {
+        std::string device_name = "test_device" + std::to_string(i);
+        Tablet tablet(device_name, &schema_vec[i], max_rows);
+        tablet.init();
+        for (int j = 0; j < measurement_num; j++) {
+            for (int row = 0; row < max_rows; row++) {
+                tablet.set_timestamp(row, 16225600 + row);
+            }
+            for (int row = 0; row < max_rows; row++) {
+                tablet.set_value(row, j, static_cast<int64_t>(row));
+            }
+        }
+        ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
+        // flush after write tablet to check whether write empty chunk
+        ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+    }
+    ASSERT_EQ(tsfile_writer_->close(), E_OK);
+    
+    std::vector<storage::Path> select_list;
+        for (int i = 0; i < device_num; i++) {
+        std::string device_name = "test_device" + std::to_string(i);
+        for (int j = 0; j < measurement_num; j++) {
+            std::string measurement_name = "measurement" + std::to_string(j);
+            storage::Path path(device_name, measurement_name);
+            select_list.push_back(path);
+        }
+    }
+    storage::QueryExpression *query_expr =
+        storage::QueryExpression::create(select_list, nullptr);
+
+    storage::TsFileReader reader;
+    int ret = reader.open(file_name_);
+    ASSERT_EQ(ret, common::E_OK);
+    storage::QueryDataSet *tmp_qds = nullptr;
+
+    ret = reader.query(query_expr, tmp_qds);
+    auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
+
+    storage::RowRecord *record;
+    int64_t cur_record_num = 0;
+    do {
+        record = qds->get_next();
+        // if empty chunk is writen, the timestamp should be NULL
+        if (!record) {
+            break;
+        }
+        EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num);
+        cur_record_num++;
+    } while (true);
+    EXPECT_EQ(cur_record_num, max_rows);
+    storage::QueryExpression::destory(query_expr);
+    reader.destroy_query_data_set(qds);
+}
+
+TEST_F(TsFileWriterTest, AnalyzeTsfileForload) {
+    const int device_num = 50;
+    const int measurement_num = 50;
+    const int max_rows = 100;
+    std::vector<MeasurementSchema> schema_vec[50];
+
+    for (int i = 0; i < device_num; i++) {
+        std::string device_name = "test_device" + std::to_string(i);
+        for (int j = 0; j < measurement_num; j++) {
+            std::string measure_name = "measurement" + std::to_string(j);
+            schema_vec[i].push_back(
+                MeasurementSchema(measure_name, common::TSDataType::INT64,
+                                  common::TSEncoding::PLAIN,
+                                  common::CompressionType::UNCOMPRESSED));
+            tsfile_writer_->register_timeseries(
+                device_name, measure_name, common::TSDataType::INT64,
+                common::TSEncoding::PLAIN,
+                common::CompressionType::UNCOMPRESSED);
+        }
+    }
+
+    for (int i = 0; i < device_num; i++) {
+        std::string device_name = "test_device" + std::to_string(i);
+        Tablet tablet(device_name, &schema_vec[i], max_rows);
+        tablet.init();
+        for (int j = 0; j < measurement_num; j++) {
+            for (int row = 0; row < max_rows; row++) {
+                tablet.set_timestamp(row, 16225600 + row);
+            }
+            for (int row = 0; row < max_rows; row++) {
+                tablet.set_value(row, j, static_cast<int64_t>(row));
+            }
+        }
+        ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
+    }
+    auto schemas = tsfile_writer_->get_schema_group_map();
+    ASSERT_EQ(schemas->size(), 50);
+    for (const auto& device_iter : *schemas) {
+        for (const auto& chunk_iter : device_iter.second->measurement_schema_map_) {
+            ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr);
+            ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData());
+        }
+    }
+    ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+    ASSERT_EQ(tsfile_writer_->close(), E_OK);    
+}
 TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) {
     std::string device_path = "device1";
     std::string measurement_name = "temperature";

Reply via email to