This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_demos_multi_device
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/fix_demos_multi_device by this push:
     new cab76820 reuse chunk/page writers after flush
cab76820 is described below

commit cab76820a8e8eee727f760bb6dded69056edee3c
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Mar 7 15:14:46 2025 +0800

    reuse chunk/page writers after flush
---
 cpp/src/common/schema.h                                | 13 +++++++++++++
 cpp/src/common/tsfile_common.h                         |  4 ----
 cpp/src/writer/chunk_writer.cc                         | 15 ++++++++++++++-
 cpp/src/writer/chunk_writer.h                          |  1 +
 cpp/src/writer/page_writer.cc                          | 12 +++++++++---
 cpp/src/writer/time_chunk_writer.cc                    | 16 +++++++++++++++-
 cpp/src/writer/time_chunk_writer.h                     |  1 +
 cpp/src/writer/time_page_writer.cc                     | 12 ++++++++++--
 cpp/src/writer/tsfile_writer.cc                        | 11 ++++-------
 cpp/src/writer/value_chunk_writer.cc                   | 16 +++++++++++++++-
 cpp/src/writer/value_chunk_writer.h                    |  1 +
 cpp/src/writer/value_page_writer.cc                    | 12 ++++++++++--
 cpp/test/common/tsfile_common_test.cc                  |  8 ++++----
 cpp/test/reader/table_view/tsfile_reader_table_test.cc | 12 ++++++------
 cpp/test/writer/table_view/tsfile_writer_table_test.cc |  1 +
 cpp/test/writer/tsfile_writer_test.cc                  |  1 +
 16 files changed, 105 insertions(+), 31 deletions(-)

diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 45d76de9..e0e2b3b8 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -20,6 +20,8 @@
 #ifndef COMMON_SCHEMA_H
 #define COMMON_SCHEMA_H
 
+#include <writer/chunk_writer.h>
+
 #include <algorithm>
 #include <map>  // use unordered_map instead
 #include <memory>
@@ -75,6 +77,17 @@ struct MeasurementSchema {
           chunk_writer_(nullptr),
           value_chunk_writer_(nullptr) {}
 
+    ~MeasurementSchema() {
+        if (chunk_writer_ != nullptr) {
+            delete chunk_writer_;
+            chunk_writer_ = nullptr;
+        }
+        if (value_chunk_writer_ != nullptr) {
+            delete value_chunk_writer_;
+            value_chunk_writer_ = nullptr;
+        }
+    }
+
     int serialize_to(common::ByteStream &out) {
         int ret = common::E_OK;
         if (RET_FAIL(
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index df88966c..8a77f1fe 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -124,11 +124,7 @@ struct ChunkHeader {
           chunk_type_(0) {}
 
     void reset() {
-        measurement_name_.clear();
         data_size_ = 0;
-        data_type_ = common::INVALID_DATATYPE;
-        compression_type_ = common::INVALID_COMPRESSION;
-        encoding_type_ = common::INVALID_ENCODING;
         num_of_pages_ = 0;
         serialized_size_ = 0;
         chunk_type_ = 0;
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 888692fb..73618db7 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -69,6 +69,19 @@ void ChunkWriter::destroy() {
     num_of_pages_ = 0;
 }
 
+void ChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
 int ChunkWriter::seal_cur_page(bool end_chunk) {
     int ret = E_OK;
     if (RET_FAIL(chunk_statistic_->merge_with(page_writer_.get_statistic()))) {
@@ -80,7 +93,7 @@ int ChunkWriter::seal_cur_page(bool end_chunk) {
             ret = page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
                                               /*stat*/ false, /*data*/ true);
             page_writer_.destroy_page_data();
-            page_writer_.destroy();
+            page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 6d80353f..7add7ebd 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -62,6 +62,7 @@ class ChunkWriter {
     int init(const std::string &measurement_name, common::TSDataType data_type,
              common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     FORCE_INLINE int write(int64_t timestamp, bool value) {
diff --git a/cpp/src/writer/page_writer.cc b/cpp/src/writer/page_writer.cc
index ec8b2856..019004a0 100644
--- a/cpp/src/writer/page_writer.cc
+++ b/cpp/src/writer/page_writer.cc
@@ -115,9 +115,15 @@ int PageWriter::init(TSDataType data_type, TSEncoding encoding,
  * free out_stream memory, reset statistic_,
  */
 void PageWriter::reset() {
-    time_encoder_->reset();
-    value_encoder_->reset();
-    statistic_->reset();
+    if (time_encoder_ != nullptr) {
+        time_encoder_->reset();
+    }
+    if (value_encoder_ != nullptr) {
+        value_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     time_out_stream_.reset();
     value_out_stream_.reset();
 }
diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc
index b65b856b..f5b7b240 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -51,6 +51,20 @@ int TimeChunkWriter::init(const std::string &measurement_name,
     return ret;
 }
 
+void TimeChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    time_page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
+
 void TimeChunkWriter::destroy() {
     if (num_of_pages_ == 1) {
         free_first_writer_data();
@@ -82,7 +96,7 @@ int TimeChunkWriter::seal_cur_page(bool end_chunk) {
                 time_page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
                                                  /*stat*/ false, /*data*/ 
true);
             time_page_writer_.destroy_page_data();
-            time_page_writer_.destroy();
+            time_page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index d97a8aa9..8fcd9bd6 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -45,6 +45,7 @@ class TimeChunkWriter {
     int init(const common::ColumnSchema &col_schema);
     int init(const std::string &measurement_name, common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     storage::ChunkHeader get_chunk_header() const { return chunk_header_; }
diff --git a/cpp/src/writer/time_page_writer.cc b/cpp/src/writer/time_page_writer.cc
index 49fe0fca..2ac75315 100644
--- a/cpp/src/writer/time_page_writer.cc
+++ b/cpp/src/writer/time_page_writer.cc
@@ -96,8 +96,12 @@ int TimePageWriter::init(TSEncoding encoding, CompressionType compression) {
 }
 
 void TimePageWriter::reset() {
-    time_encoder_->reset();
-    statistic_->reset();
+    if (time_encoder_ != nullptr) {
+        time_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     time_out_stream_.reset();
 }
 
@@ -110,6 +114,10 @@ void TimePageWriter::destroy() {
         EncoderFactory::free(time_encoder_);
         StatisticFactory::free(statistic_);
         CompressorFactory::free(compressor_);
+
+        time_encoder_ = nullptr;
+        statistic_ = nullptr;
+        compressor_ = nullptr;
     }
 }
 
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 8d7f050c..f289f875 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -727,7 +727,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
                                                value_chunk_writers))) {
                 return ret;
             }
-            for (uint32_t i = start_idx; i < end_idx; i++) {
+            for (int i = start_idx; i < end_idx; i++) {
                 time_chunk_writer->write(tablet.timestamps_[i]);
             }
             uint32_t field_col_count = 0;
@@ -850,7 +850,6 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
     int64_t *timestamps = tablet.timestamps_;
     Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
     BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
-    uint32_t row_count = tablet.max_row_num_;
 
     if (data_type == common::BOOLEAN) {
         ret = write_typed_column(value_chunk_writer, timestamps,
@@ -1047,9 +1046,7 @@ bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
     } else if (RET_FAIL(io_writer->end_flush_chunk(                            
\
                    writer->get_chunk_statistic()))) {                          
\
     } else {                                                                   
\
-        writer->destroy();                                                     
\
-        delete writer;                                                         
\
-        writer = nullptr;                                                      
\
+        writer->reset();                                                       
\
     }
 
 int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
@@ -1069,13 +1066,13 @@ int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
     for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
          ms_iter++) {
         MeasurementSchema *m_schema = ms_iter->second;
-        if (!chunk_group->is_aligned_) {
+        if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
             ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
             FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
                         m_schema->data_type_, m_schema->encoding_,
                         m_schema->compression_type_,
                         chunk_writer->num_of_pages())
-        } else {
+        } else if (m_schema->value_chunk_writer_ != nullptr) {
             ValueChunkWriter *&value_chunk_writer =
                 m_schema->value_chunk_writer_;
             FLUSH_CHUNK(value_chunk_writer, io_writer_,
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index 6c23cdad..e29f2565 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -52,6 +52,20 @@ int ValueChunkWriter::init(const std::string &measurement_name,
     return ret;
 }
 
+void ValueChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    value_page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
+
 void ValueChunkWriter::destroy() {
     if (num_of_pages_ == 1) {
         free_first_writer_data();
@@ -83,7 +97,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
                 chunk_data_, /*header*/ true,
                 /*stat*/ false, /*data*/ true);
             value_page_writer_.destroy_page_data();
-            value_page_writer_.destroy();
+            value_page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 52581a34..a3e34239 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -62,6 +62,7 @@ class ValueChunkWriter {
     int init(const std::string &measurement_name, common::TSDataType data_type,
              common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
diff --git a/cpp/src/writer/value_page_writer.cc b/cpp/src/writer/value_page_writer.cc
index 76d820f0..b95307f7 100644
--- a/cpp/src/writer/value_page_writer.cc
+++ b/cpp/src/writer/value_page_writer.cc
@@ -111,8 +111,12 @@ int ValuePageWriter::init(TSDataType data_type, TSEncoding encoding,
 }
 
 void ValuePageWriter::reset() {
-    value_encoder_->reset();
-    statistic_->reset();
+    if (value_encoder_ != nullptr) {
+        value_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     col_notnull_bitmap_out_stream_.reset();
     value_out_stream_.reset();
 }
@@ -126,6 +130,10 @@ void ValuePageWriter::destroy() {
         EncoderFactory::free(value_encoder_);
         StatisticFactory::free(statistic_);
         CompressorFactory::free(compressor_);
+
+        value_encoder_ = nullptr;
+        statistic_ = nullptr;
+        compressor_ = nullptr;
     }
 }
 
diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc
index be8c2ce5..e5eebd49 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -64,11 +64,11 @@ TEST(ChunkHeaderTest, Reset) {
   header.chunk_type_ = 1;
 
   header.reset();
-  EXPECT_EQ(header.measurement_name_, "");
+  EXPECT_EQ(header.measurement_name_, "test");
   EXPECT_EQ(header.data_size_, 0);
-  EXPECT_EQ(header.data_type_, common::INVALID_DATATYPE);
-  EXPECT_EQ(header.compression_type_, common::INVALID_COMPRESSION);
-  EXPECT_EQ(header.encoding_type_, common::INVALID_ENCODING);
+  EXPECT_EQ(header.data_type_, common::INT32);
+  EXPECT_EQ(header.compression_type_, common::SNAPPY);
+  EXPECT_EQ(header.encoding_type_, common::PLAIN);
   EXPECT_EQ(header.num_of_pages_, 0);
   EXPECT_EQ(header.serialized_size_, 0);
   EXPECT_EQ(header.chunk_type_, 0);
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index a98a00f3..33e9efa8 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -157,7 +157,7 @@ class TsFileTableReaderTest : public ::testing::Test {
         std::strcpy(literal, "device_id");
         String literal_str(literal, std::strlen("device_id"));
         bool has_next = false;
-        int64_t timestamp = 0;
+        int64_t row_num = 0;
         while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
             auto column_schemas = table_schema->get_measurement_schemas();
             for (const auto& column_schema : column_schemas) {
@@ -165,7 +165,7 @@ class TsFileTableReaderTest : public ::testing::Test {
                     case TSDataType::INT64:
                         ASSERT_EQ(table_result_set->get_value<int64_t>(
                                       column_schema->measurement_name_),
-                                  0);
+                                  (row_num / points_per_device) % device_num);
                         break;
                     case TSDataType::STRING:
                         ASSERT_EQ(table_result_set
@@ -185,12 +185,12 @@ class TsFileTableReaderTest : public ::testing::Test {
                     0);
             }
             for (int i = 7; i <= 11; i++) {
-                ASSERT_EQ(table_result_set->get_value<int64_t>(i), 0);
+                ASSERT_EQ(table_result_set->get_value<int64_t>(i),  (row_num / 
points_per_device) % device_num);
             }
-            ASSERT_EQ(table_result_set->get_value<int64_t>(1), timestamp);
-            timestamp++;
+            ASSERT_EQ(table_result_set->get_value<int64_t>(1), row_num % 
points_per_device);
+            row_num++;
         }
-        ASSERT_EQ(timestamp, points_per_device);
+        ASSERT_EQ(row_num, points_per_device * device_num);
         reader.destroy_query_data_set(table_result_set);
         delete[] literal;
         ASSERT_EQ(reader.close(), common::E_OK);
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 02b28b0f..a616b7fa 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -212,6 +212,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
     ResultSet* ret = nullptr;
     int ret_value =
         reader.query("test_table", {"device", "value"}, 10, 50, ret);
+    ASSERT_EQ(common::E_OK, ret_value);
 
     auto* table_result_set = (TableResultSet*)ret;
     bool has_next = false;
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index 3f99971f..dcd96db3 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -439,6 +439,7 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) {
     storage::ResultSet *tmp_qds = nullptr;
 
     ret = reader.query(query_expr, tmp_qds);
+    ASSERT_EQ(ret, common::E_OK);
     auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
 
     storage::RowRecord *record;

Reply via email to