This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6a5f8330 feature: finish multiple flush for c++_tsfile (#110)
6a5f8330 is described below

commit 6a5f83303adc313239546ba1848ed49e18afe691
Author: RkGrit <[email protected]>
AuthorDate: Tue Jul 2 17:01:44 2024 +0800

    feature: finish multiple flush for c++_tsfile (#110)
    
    * feature: finish multiple flush for c++_tsfile
    
    * rewrite some code and add some comments
    
    * add multi-flush for write_tablet()
---
 cpp/src/common/config/config.h        |   2 +
 cpp/src/common/global.cc              |   2 +
 cpp/src/common/tsfile_common.h        |   9 +++
 cpp/src/encoding/bitpack_encoder.h    |  43 ++++++++------
 cpp/src/encoding/dictionary_encoder.h |   6 ++
 cpp/src/encoding/encoder.h            |   8 +++
 cpp/src/encoding/gorilla_encoder.h    |   6 +-
 cpp/src/encoding/plain_encoder.h      |   8 +--
 cpp/src/encoding/ts2diff_encoder.h    |   5 ++
 cpp/src/file/tsfile_io_writer.cc      |   2 -
 cpp/src/file/tsfile_io_writer.h       |   4 +-
 cpp/src/writer/chunk_writer.cc        |   9 +++
 cpp/src/writer/chunk_writer.h         |   2 +
 cpp/src/writer/page_writer.h          |  12 ++++
 cpp/src/writer/tsfile_writer.cc       | 104 ++++++++++++++++++++++++++++------
 cpp/src/writer/tsfile_writer.h        |   8 ++-
 16 files changed, 183 insertions(+), 47 deletions(-)

diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index 679a69b0..d2589229 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -49,6 +49,8 @@ typedef struct ConfigValue {
     const char *tsfile_prefix_path_;
     TSEncoding time_encoding_type_;
     uint32_t memtable_flusher_poll_interval_seconds_;
+    int32_t chunk_group_size_threshold_;
+    int32_t record_count_for_next_mem_check_;
 } ConfigValue;
 
 extern void init_config_value();
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index b54d4499..dfbbfc81 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -52,6 +52,8 @@ void init_config_value() {
     g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024;  // 128 k
     g_config_value_.max_degree_of_index_node_ = 256;
     g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05;
+    g_config_value_.record_count_for_next_mem_check_ = 100;
+    g_config_value_.chunk_group_size_threshold_ = 128 * 1024 * 1024;
     // g_config_value_.tsfile_prefix_path_ = "./data";
     g_config_value_.tsfile_prefix_path_ = "";
     // g_config_value_.time_encoding_type_ = TS_2DIFF;
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index a1920e25..601d5886 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -86,6 +86,15 @@ struct PageHeader {
         }
         return ret;
     }
+
+    /** max page header size without statistics. */
+    static int estimat_max_page_header_size_without_statistics() {
+        // uncompressedSize, compressedSize
+        // because we use unsigned varInt to encode these two integer, each
+        // unsigned varInt will cost at most 5 bytes
+        return 2 * (4 + 1);
+    }
+
 #ifndef NDEBUG
     friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) {
         os << "{uncompressed_size_=" << h.uncompressed_size_
diff --git a/cpp/src/encoding/bitpack_encoder.h 
b/cpp/src/encoding/bitpack_encoder.h
index 85a385bf..708e00a7 100644
--- a/cpp/src/encoding/bitpack_encoder.h
+++ b/cpp/src/encoding/bitpack_encoder.h
@@ -58,13 +58,12 @@ class BitPackEncoder {
         packer_ = nullptr;
     }
 
-    void destroy() { /* do nothing for BitPackEncoder */
-        delete (packer_);
-    }
+    void destroy() { delete (packer_); }
 
     void reset() {
         num_buffered_values_ = 0;
         bitpacked_group_count_ = 0;
+        bit_width_ = 0;
         bytes_buffer_.clear();
         byte_cache_.reset();
         values_.clear();
@@ -74,11 +73,13 @@ class BitPackEncoder {
 
     FORCE_INLINE void encode(int64_t value, common::ByteStream &out) {
         values_.push_back(value);
+        int current_bit_width = 32 - number_of_leading_zeros(value);
+        if (current_bit_width > bit_width_) {
+            bit_width_ = current_bit_width;
+        }
     }
 
     void encode_flush(common::ByteStream &out) {
-        // we get bit width after receiving all data
-        bit_width_ = get_int_max_bit_width(values_);
         ASSERT(packer_ == nullptr);
         packer_ = new IntPacker(bit_width_);
         common::SerializationUtil::write_i8(bit_width_, byte_cache_);
@@ -121,19 +122,6 @@ class BitPackEncoder {
         common::mem_free(bytes);
     }
 
-    int get_int_max_bit_width(std::vector<int64_t> values) {
-        // TODO: Optimization - find the maximum value first, and then calcuate
-        // the bit width
-        int max = 1;
-        for (size_t i = 0; i < values.size(); i++) {
-            int bitWidth = 64 - number_of_leading_zeros(values[i]);
-            if (bitWidth > max) {
-                max = bitWidth;
-            }
-        }
-        return max;
-    }
-
     void flush(common::ByteStream &out) {
         int last_bitpacked_num = num_buffered_values_;
         if (num_buffered_values_ > 0) {
@@ -167,6 +155,25 @@ class BitPackEncoder {
         bytes_buffer_.clear();
         bitpacked_group_count_ = 0;
     }
+
+    int get_max_byte_size() {
+        if (values_.empty()) {
+            return 0;
+        }
+        int totalValues = values_.size();
+        int fullGroups = totalValues / 8;
+        int remainingValues = totalValues % 8;
+        int bytesPerGroup = (bit_width_ * 8 + 7) / 8;
+        int maxSize = 0;
+        maxSize += fullGroups * bytesPerGroup;
+        if (remainingValues > 0) {
+            maxSize += bytesPerGroup;
+        }
+
+        // Add additional bytes, because each bitpack group has a header of 1 
byte and a tail of 1 byte.
+        maxSize += fullGroups * (1 + 1) + (remainingValues > 0 ? (1 + 1) : 0);
+        return maxSize;
+    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/encoding/dictionary_encoder.h 
b/cpp/src/encoding/dictionary_encoder.h
index 73c6a6f5..9ea3f6f0 100644
--- a/cpp/src/encoding/dictionary_encoder.h
+++ b/cpp/src/encoding/dictionary_encoder.h
@@ -95,6 +95,12 @@ class DictionaryEncoder {
     void write_encoded_data(common::ByteStream &out) {
         values_encoder_.encode_flush(out);
     }
+
+    int get_max_byte_size() 
+    {
+        // 4 bytes for storing dictionary size
+        return 4 + map_size_ + values_encoder_.get_max_byte_size();
+    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/encoding/encoder.h b/cpp/src/encoding/encoder.h
index 99e5146a..9b52cfd4 100644
--- a/cpp/src/encoding/encoder.h
+++ b/cpp/src/encoding/encoder.h
@@ -38,6 +38,14 @@ class Encoder {
     virtual int encode(float value, common::ByteStream &out_stream) = 0;
     virtual int encode(double value, common::ByteStream &out_stream) = 0;
     virtual int flush(common::ByteStream &out_stream) = 0;
+
+    /**
+     * The maximal possible memory size occupied by current Encoder. This
+     * statistic value doesn't involve OutputStream.
+     *
+     * @return the maximal size of possible memory occupied by current encoder
+     */
+    virtual int get_max_byte_size() = 0;
 };
 
 }  // end namespace storage
diff --git a/cpp/src/encoding/gorilla_encoder.h 
b/cpp/src/encoding/gorilla_encoder.h
index 2ae580cd..47cedf18 100644
--- a/cpp/src/encoding/gorilla_encoder.h
+++ b/cpp/src/encoding/gorilla_encoder.h
@@ -119,7 +119,7 @@ class GorillaEncoder : public Encoder {
         }
     }
 
-    int get_one_item_max_size();
+    int get_max_byte_size();
     void write_first(T value, common::ByteStream &out);
     void write_existing_leading(T xor_value, common::ByteStream &out);
     void write_new_leading(T xor_value, int leading_zeros, int trailing_zeros,
@@ -145,11 +145,11 @@ class GorillaEncoder : public Encoder {
 };
 
 template <>
-FORCE_INLINE int GorillaEncoder<int32_t>::get_one_item_max_size() {
+FORCE_INLINE int GorillaEncoder<int32_t>::get_max_byte_size() {
     return INT32_ONE_ITEM_MAX_SIZE;
 }
 template <>
-FORCE_INLINE int GorillaEncoder<int64_t>::get_one_item_max_size() {
+FORCE_INLINE int GorillaEncoder<int64_t>::get_max_byte_size() {
     return INT64_ONE_ITEM_MAX_SIZE;
 }
 
diff --git a/cpp/src/encoding/plain_encoder.h b/cpp/src/encoding/plain_encoder.h
index 556aea97..ab1e06f6 100644
--- a/cpp/src/encoding/plain_encoder.h
+++ b/cpp/src/encoding/plain_encoder.h
@@ -28,10 +28,8 @@ class PlainEncoder : public Encoder {
    public:
     PlainEncoder() {}
     ~PlainEncoder() { destroy(); }
-    void destroy() { /* do nothing for PlainEncoder */
-    }
-    void reset() { /* do thing for PlainEncoder */
-    }
+    void destroy() { /* do nothing for PlainEncoder */ }
+    void reset() { /* do nothing for PlainEncoder */ }
 
     FORCE_INLINE int encode(bool value, common::ByteStream &out_stream) {
         return common::SerializationUtil::write_i8(value ? 1 : 0, out_stream);
@@ -57,6 +55,8 @@ class PlainEncoder : public Encoder {
         // do nothing for PlainEncoder
         return common::E_OK;
     }
+
+    int get_max_byte_size() { return 0; }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/encoding/ts2diff_encoder.h 
b/cpp/src/encoding/ts2diff_encoder.h
index 081c7fa7..67d2c83d 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -112,6 +112,11 @@ class TS2DIFFEncoder : public Encoder {
 
     int flush(common::ByteStream &out_stream);
 
+    int get_max_byte_size() {
+        // The meaning of 24 is: index(4)+width(4)+minDeltaBase(8)+firstValue(8)
+        return 24 + write_index_ * 8;
+    }
+
    public:
     int block_size_;
     T *delta_arr_;
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index ac1e1aa1..3880116e 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -168,8 +168,6 @@ int TsFileIOWriter::flush_chunk(ByteStream &chunk_data) {
     } else if (RET_FAIL(flush_stream_to_file())) {
         // log_err("flush stream error, ret=%d", ret);
     }
-
-    chunk_data.destroy();
     return ret;
 }
 
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index db594a12..092c543f 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -97,8 +97,8 @@ class TsFileIOWriter {
     int end_flush_chunk_group();
     int end_file();
 
-    FORCE_INLINE std::vector<TimeseriesTimeIndexEntry>
-        &get_ts_time_index_vector() {
+    FORCE_INLINE std::vector<TimeseriesTimeIndexEntry> &
+    get_ts_time_index_vector() {
         return ts_time_index_vector_;
     }
     FORCE_INLINE std::string get_file_path() { return file_->get_file_path(); }
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 51c78af5..a283fb96 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -63,6 +63,7 @@ void ChunkWriter::destroy() {
     }
     chunk_data_.destroy();
     chunk_header_.reset();
+    num_of_pages_ = 0;
 }
 
 int ChunkWriter::seal_cur_page(bool end_chunk) {
@@ -151,4 +152,12 @@ int ChunkWriter::end_encode_chunk() {
     return ret;
 }
 
+
+int64_t ChunkWriter::estimate_max_series_mem_size(){
+  return chunk_data_.total_size()
+        + page_writer_.estimate_max_mem_size()
+        + PageHeader::estimat_max_page_header_size_without_statistics()
+        + get_typed_statistic_sizeof(page_writer_.get_statistic()->get_type());
+}
+
 }  // end namespace storage
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index c181ce98..54d51f54 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -93,6 +93,8 @@ class ChunkWriter {
         return false;
     }
 
+    int64_t estimate_max_series_mem_size();
+
    private:
     FORCE_INLINE bool is_cur_page_full() const {
         // FIXME
diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h
index 98b25179..6800c7ea 100644
--- a/cpp/src/writer/page_writer.h
+++ b/cpp/src/writer/page_writer.h
@@ -134,6 +134,18 @@ class PageWriter {
     FORCE_INLINE uint32_t get_page_memory_size() const {
         return time_out_stream_.total_size() + value_out_stream_.total_size();
     }
+    /**
+     * calculate max possible memory size it occupies, including time
+     * outputStream and value outputStream, because size outputStream is never
+     * used until flushing.
+     *
+     * @return allocated size in time, value and outputStream
+     */
+    FORCE_INLINE uint32_t estimate_max_mem_size() const {
+        return time_out_stream_.total_size() + value_out_stream_.total_size() +
+               time_encoder_->get_max_byte_size() +
+               value_encoder_->get_max_byte_size();
+    }
     int write_to_chunk(common::ByteStream &pages_data, bool write_header,
                        bool write_statistic, bool write_data_to_chunk_data);
     FORCE_INLINE common::ByteStream &get_time_data() {
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 37fe6098..7d5db9f0 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -60,10 +60,10 @@ TsFileWriter::TsFileWriter()
     : write_file_(nullptr),
       io_writer_(nullptr),
       schemas_(),
-      start_file_done_(false),
-      write_file_created_(false) {
-    // do nothing.
-}
+      record_count_since_last_flush_(0),
+      record_count_for_next_mem_check_(
+          g_config_value_.record_count_for_next_mem_check_),
+      write_file_created_(false) {}
 
 TsFileWriter::~TsFileWriter() { destroy(); }
 
@@ -94,6 +94,7 @@ void TsFileWriter::destroy() {
         delete dev_iter->second;
     }
     schemas_.clear();
+    record_count_since_last_flush_ = 0;
 }
 
 int TsFileWriter::init(WriteFile *write_file) {
@@ -124,6 +125,12 @@ int TsFileWriter::open(const std::string &file_path, int 
flags, mode_t mode) {
     if (RET_FAIL(write_file_->create(file_path, flags, mode))) {
     } else {
         io_writer_->init(write_file_);
+        if (RET_FAIL(io_writer_->start_file())) {
+            return ret;
+        }
+#if DEBUG_SE
+        std::cout << "finish writing magic code" << std::endl;
+#endif
     }
     return ret;
 }
@@ -202,9 +209,7 @@ struct MeasurementNamesFromTablet {
 template <typename MeasurementNamesGetter>
 int TsFileWriter::do_check_schema(const std::string &device_name,
                                   MeasurementNamesGetter &measurement_names,
-                                  SimpleVector<ChunkWriter *> &chunk_writers)
-// std::vector<ChunkWriter*> &chunk_writers)
-{
+                                  SimpleVector<ChunkWriter *> &chunk_writers) {
     int ret = E_OK;
     DeviceSchemaIter dev_it = schemas_.find(device_name);
     MeasurementSchemaGroup *device_schema = NULL;
@@ -220,8 +225,8 @@ int TsFileWriter::do_check_schema(const std::string 
&device_name,
         if (UNLIKELY(ms_iter == msm.end())) {
             chunk_writers.push_back(NULL);
         } else {
-            // Here we may check data_type against ms_iter. But in Java
-            // libtsfile, no check here.
+            // In Java we will check data_type. But in C++, no check here.
+            // Because checks are performed at the chunk layer and page layer
             MeasurementSchema *ms = ms_iter->second;
             if (IS_NULL(ms->chunk_writer_)) {
                 ms->chunk_writer_ = new ChunkWriter;
@@ -242,6 +247,43 @@ int TsFileWriter::do_check_schema(const std::string 
&device_name,
     return ret;
 }
 
+int64_t TsFileWriter::calculate_mem_size_for_all_group() {
+    int64_t mem_total_size = 0;
+    DeviceSchemaIter device_iter;
+    for (device_iter = schemas_.begin(); device_iter != schemas_.end();
+         device_iter++) {
+        MeasurementSchemaGroup *chunk_group = device_iter->second;
+        MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
+        for (MeasurementSchemaMapIter ms_iter = map.begin();
+             ms_iter != map.end(); ms_iter++) {
+            MeasurementSchema *m_schema = ms_iter->second;
+            ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
+            if (chunk_writer != NULL) {
+                mem_total_size += chunk_writer->estimate_max_series_mem_size();
+            }
+        }
+    }
+    return mem_total_size;
+}
+
+/**
+ * check occupied memory size, if it exceeds the chunkGroupSize threshold, 
flush
+ * them to given OutputStream.
+ */
+int TsFileWriter::check_memory_size_and_may_flush_chunks() {
+    int ret = E_OK;
+    if (record_count_since_last_flush_ >= record_count_for_next_mem_check_) {
+        int64_t mem_size = calculate_mem_size_for_all_group();
+        record_count_for_next_mem_check_ =
+            record_count_since_last_flush_ *
+            common::g_config_value_.chunk_group_size_threshold_ / mem_size;
+        if (mem_size > common::g_config_value_.chunk_group_size_threshold_) {
+            ret = flush();
+        }
+    }
+    return ret;
+}
+
 int TsFileWriter::write_record(const TsRecord &record) {
     int ret = E_OK;
     // std::vector<ChunkWriter*> chunk_writers;
@@ -261,6 +303,9 @@ int TsFileWriter::write_record(const TsRecord &record) {
         // ignore point writer failure
         write_point(chunk_writer, record.timestamp_, record.points_[c]);
     }
+
+    record_count_since_last_flush_++;
+    ret = check_memory_size_and_may_flush_chunks();
     return ret;
 }
 
@@ -303,6 +348,9 @@ int TsFileWriter::write_tablet(const Tablet &tablet) {
         // ignore writer failure
         write_column(chunk_writer, tablet, c);
     }
+
+    record_count_since_last_flush_ += tablet.max_rows_;
+    ret = check_memory_size_and_may_flush_chunks();
     return ret;
 }
 
@@ -381,27 +429,44 @@ int TsFileWriter::write_typed_column(ChunkWriter 
*chunk_writer,
 // TODO make sure ret is meaningful to SDK user
 int TsFileWriter::flush() {
     int ret = E_OK;
-    if (!start_file_done_) {
-        if (RET_FAIL(io_writer_->start_file())) {
-            return ret;
-        }
-        start_file_done_ = true;
-    }
-    std::cout << "finish writing magic code" << std::endl;
 
     /* since @schemas_ used std::map which is rbtree underlying,
        so map itself is ordered by device name. */
     std::map<std::string, MeasurementSchemaGroup *>::iterator device_iter;
     for (device_iter = schemas_.begin(); device_iter != schemas_.end();
-         device_iter++) {  // cppcheck-suppress postfixOperator
+         device_iter++) {
+        if (check_chunk_group_empty(device_iter->second)) {
+            continue;
+        }
+
         if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first))) 
{
+            return ret;
         } else if (RET_FAIL(flush_chunk_group(device_iter->second))) {
+            return ret;
         } else if (RET_FAIL(io_writer_->end_flush_chunk_group())) {
+            return ret;
         }
     }
+    record_count_since_last_flush_ = 0;
     return ret;
 }
 
+bool TsFileWriter::check_chunk_group_empty(
+    MeasurementSchemaGroup *chunk_group) {
+    MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
+    for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
+         ms_iter++) {
+        MeasurementSchema *m_schema = ms_iter->second;
+        if (m_schema->chunk_writer_ != NULL &&
+            m_schema->chunk_writer_->num_of_pages() > 0) {
+            // first condition is to avoid first flush empty chunk group
+            // second condition is to avoid repeated flush
+            return false;
+        }
+    }
+    return true;
+}
+
 int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) {
     int ret = E_OK;
     MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
@@ -415,10 +480,15 @@ int 
TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) {
                        m_schema->measurement_name_, m_schema->data_type_,
                        m_schema->encoding_, m_schema->compression_type_,
                        chunk_writer->num_of_pages()))) {
+            return ret;
         } else if (RET_FAIL(io_writer_->flush_chunk(
                        chunk_writer->get_chunk_data()))) {
+            return ret;
         } else if (RET_FAIL(io_writer_->end_flush_chunk(
                        chunk_writer->get_chunk_statistic()))) {
+            return ret;
+        } else {
+            chunk_writer->destroy();
         }
     }
     return ret;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index f7d304b2..099c6467 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -59,6 +59,8 @@ class TsFileWriter {
                             common::CompressionType compression_type);
     int write_record(const TsRecord &record);
     int write_tablet(const Tablet &tablet);
+    int64_t calculate_mem_size_for_all_group();
+    int check_memory_size_and_may_flush_chunks();
 
     /*
      * Flush buffer to disk file, but do not writer file index part.
@@ -75,6 +77,7 @@ class TsFileWriter {
    private:
     int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
                     const DataPoint &point);
+    bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group);
     int flush_chunk_group(MeasurementSchemaGroup *chunk_group);
 
     int write_typed_column(storage::ChunkWriter *chunk_writer,
@@ -112,7 +115,10 @@ class TsFileWriter {
     storage::TsFileIOWriter *io_writer_;
     // device_name -> MeasurementSchemaGroup
     std::map<std::string, MeasurementSchemaGroup *> schemas_;
-    bool start_file_done_;
+    // record count since last flush
+    int64_t record_count_since_last_flush_;
+    // record count for next memory check
+    int64_t record_count_for_next_mem_check_;
     bool write_file_created_;
 };
 

Reply via email to