This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 1b38b7d7a enable memory detect. (#746)
1b38b7d7a is described below
commit 1b38b7d7a908e3ad5d54b837bf39d51d15dca657
Author: Colin Lee <[email protected]>
AuthorDate: Tue Mar 24 14:21:46 2026 +0800
enable memory detect. (#746)
* enable memory detect.
* fix format.
* fix memory loss.
* fix memory loss.
* fix memory leak.
---
cpp/CMakeLists.txt | 7 ++
cpp/src/common/allocator/alloc_base.h | 87 ++++++++---------
cpp/src/common/allocator/byte_stream.h | 40 +++++---
cpp/src/common/allocator/mem_alloc.cc | 103 +++++++++++++++------
cpp/src/common/allocator/page_arena.h | 5 +-
cpp/src/common/tablet.cc | 102 ++++++++++++--------
cpp/src/common/tablet.h | 2 +-
cpp/src/common/tablet_iterator.h | 78 ----------------
cpp/src/common/tsfile_common.h | 3 +-
cpp/src/compress/gzip_compressor.h | 8 +-
cpp/src/encoding/int32_rle_decoder.h | 2 +-
cpp/src/encoding/int32_rle_encoder.h | 4 +-
cpp/src/encoding/int64_rle_decoder.h | 14 +--
cpp/src/encoding/int64_rle_encoder.h | 4 +-
cpp/src/file/tsfile_io_reader.h | 2 +-
cpp/src/file/tsfile_io_writer.h | 2 +-
cpp/src/reader/aligned_chunk_reader.h | 8 +-
cpp/src/reader/chunk_reader.h | 6 +-
cpp/src/reader/tsfile_series_scan_iterator.cc | 7 +-
cpp/src/writer/chunk_writer.h | 2 +-
cpp/src/writer/page_writer.h | 4 +-
cpp/src/writer/time_chunk_writer.h | 2 +-
cpp/src/writer/time_page_writer.h | 2 +-
cpp/src/writer/value_chunk_writer.h | 2 +-
cpp/src/writer/value_page_writer.h | 5 +-
.../writer/table_view/tsfile_writer_table_test.cc | 79 +++++++++++++++-
26 files changed, 334 insertions(+), 246 deletions(-)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 244faef2f..0774812d8 100755
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -58,6 +58,13 @@ if (${COV_ENABLED})
message("add_definitions -DCOV_ENABLED=1")
endif ()
+option(ENABLE_MEM_STAT "Enable memory status" ON)
+
+if (ENABLE_MEM_STAT)
+ add_definitions(-DENABLE_MEM_STAT)
+ message("add_definitions -DENABLE_MEM_STAT")
+endif ()
+
if (NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "Release" CACHE STRING "Choose the type of build."
FORCE)
diff --git a/cpp/src/common/allocator/alloc_base.h b/cpp/src/common/allocator/alloc_base.h
index effe267d0..facfd8081 100644
--- a/cpp/src/common/allocator/alloc_base.h
+++ b/cpp/src/common/allocator/alloc_base.h
@@ -30,46 +30,33 @@ namespace common {
enum AllocModID {
__FIRST_MOD_ID = 0,
- // if you are sure you will not consume too much memory, you can use
- // MOD_DEFAULT.
MOD_DEFAULT = 0,
- MOD_MEMTABLE = 1,
- MOD_SCHEMA = 2,
- MOD_SQL = 3,
- MOD_NET = 4,
- MOD_NET_DATA = 5,
- MOD_TVLIST_DATA = 6,
- MOD_TVLIST_OBJ = 7,
- MOD_TSBLOCK = 8,
- MOD_CONTAINER = 9,
- MOD_TSSTORE_OBJ = 10,
- MOD_FLUSH_TASK_OBJ = 11,
- MOD_PAGE_WRITER_OUTPUT_STREAM = 12,
- MOD_CW_PAGES_DATA = 13,
- MOD_CHUNK_WRITER_OBJ = 14,
- MOD_STATISTIC_OBJ = 15,
- MOD_ENCODER_OBJ = 16,
- MOD_DECODER_OBJ = 17,
- MOD_TSFILE_WRITER_META = 18,
- MOD_TSFILE_WRITE_STREAM = 19,
- MOD_TIMESERIES_INDEX_OBJ = 20,
- MOD_BLOOM_FILTER = 21,
- MOD_OPEN_FILE_OBJ = 22,
- MOD_TSFILE_READER = 23,
- MOD_CHUNK_READER = 24,
- MOD_COMPRESSOR_OBJ = 25,
- MOD_ARRAY = 26,
- MOD_HASH_TABLE = 27,
- MOD_WRITER_INDEX_NODE = 28,
- MOD_TS2DIFF_OBJ = 29,
- MOD_BITENCODE_OBJ = 30,
- MOD_DICENCODE_OBJ = 31,
- MOD_ZIGZAG_OBJ = 32,
- MOD_DEVICE_META_ITER = 33,
- MOD_DEVICE_TASK_ITER = 34,
- MOD_DEVICE_ORDER_TSBLOCK_READER = 35,
- __LAST_MOD_ID = 36, // prev + 1,
- __MAX_MOD_ID = 127, // leave 1 bit to detect header size
+ MOD_TVLIST_DATA = 1,
+ MOD_TSBLOCK = 2,
+ MOD_PAGE_WRITER_OUTPUT_STREAM = 3,
+ MOD_CW_PAGES_DATA = 4,
+ MOD_STATISTIC_OBJ = 5,
+ MOD_ENCODER_OBJ = 6,
+ MOD_DECODER_OBJ = 7,
+ MOD_TSFILE_WRITER_META = 8,
+ MOD_TSFILE_WRITE_STREAM = 9,
+ MOD_TIMESERIES_INDEX_OBJ = 10,
+ MOD_BLOOM_FILTER = 11,
+ MOD_TSFILE_READER = 12,
+ MOD_CHUNK_READER = 13,
+ MOD_COMPRESSOR_OBJ = 14,
+ MOD_ARRAY = 15,
+ MOD_HASH_TABLE = 16,
+ MOD_WRITER_INDEX_NODE = 17,
+ MOD_TS2DIFF_OBJ = 18,
+ MOD_BITENCODE_OBJ = 19,
+ MOD_DICENCODE_OBJ = 20,
+ MOD_ZIGZAG_OBJ = 21,
+ MOD_DEVICE_META_ITER = 22,
+ MOD_DEVICE_TASK_ITER = 23,
+ MOD_TABLET = 24,
+ __LAST_MOD_ID = 25,
+ __MAX_MOD_ID = 127,
};
extern const char* g_mod_names[__LAST_MOD_ID];
@@ -82,24 +69,30 @@ void* mem_realloc(void* ptr, uint32_t size);
class ModStat {
public:
ModStat() : stat_arr_(NULL) {}
+ ~ModStat() { destroy(); }
static ModStat& get_instance() {
- /*
- * This is the singleton of Mod Memory Statistic.
- * gms is short for Global Mod Statistic
- */
static ModStat gms;
+#ifdef ENABLE_MEM_STAT
+ if (UNLIKELY(gms.stat_arr_ == NULL)) {
+ gms.init();
+ }
+#endif
return gms;
}
void init();
void destroy();
INLINE void update_alloc(AllocModID mid, int32_t size) {
- // ASSERT(mid < __LAST_MOD_ID);
- // ATOMIC_FAA(get_item(mid), size);
+#ifdef ENABLE_MEM_STAT
+ ASSERT(mid < __LAST_MOD_ID);
+ ATOMIC_FAA(get_item(mid), size);
+#endif
}
void update_free(AllocModID mid, uint32_t size) {
- // ASSERT(mid < __LAST_MOD_ID);
- // ATOMIC_FAA(get_item(mid), 0 - size);
+#ifdef ENABLE_MEM_STAT
+ ASSERT(mid < __LAST_MOD_ID);
+ ATOMIC_FAA(get_item(mid), 0 - size);
+#endif
}
void print_stat();
diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h
index 4e1029ea4..6d08744c3 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -268,9 +268,8 @@ class ByteStream {
// assert(page_size >= 16); // commented out by gxh on 2023.03.09
}
- // TODO use a specific construct function to mark it as wrapped use.
// for wrap plain buffer to ByteStream
- ByteStream()
+ ByteStream(AllocModID mid = MOD_DEFAULT)
: allocator_(g_base_allocator),
head_(nullptr, false),
tail_(nullptr, false),
@@ -279,7 +278,7 @@ class ByteStream {
read_pos_(0),
marked_read_pos_(0),
page_size_(0),
- mid_(MOD_DEFAULT),
+ mid_(mid),
wrapped_page_(false, nullptr) {}
~ByteStream() { destroy(); }
@@ -669,6 +668,11 @@ class ByteStream {
uint32_t marked_read_pos_; // current reader position
uint32_t page_size_;
AllocModID mid_;
+
+ public:
+ AllocModID get_mid() const { return mid_; }
+
+ private:
Page wrapped_page_;
};
@@ -896,10 +900,10 @@ class SerializationUtil {
FORCE_INLINE static int chunk_read_all_data(ByteStream& in, ByteStream& out,
size_t chunk_size = 4096) {
- char* buffer = new char[chunk_size];
+ char* buffer = static_cast<char*>(mem_alloc(chunk_size, out.get_mid()));
+ if (buffer == nullptr) return E_OOM;
int ret = common::E_OK;
while (in.remaining_size() > 0) {
- // Adjust read size based on remaining input size
uint32_t bytes_to_read = static_cast<uint32_t>(
std::min(chunk_size,
static_cast<size_t>(in.remaining_size())));
@@ -913,7 +917,7 @@ class SerializationUtil {
break;
}
}
- delete[] buffer;
+ mem_free(buffer);
return ret;
}
@@ -1172,16 +1176,18 @@ class SerializationUtil {
str = nullptr;
return ret;
} else {
- char* tmp_buf = static_cast<char*>(malloc(len));
+ char* tmp_buf =
+ static_cast<char*>(mem_alloc(len, in.get_mid()));
+ if (tmp_buf == nullptr) return E_OOM;
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
- free(tmp_buf);
+ mem_free(tmp_buf);
return ret;
} else if (len != read_len) {
- free(tmp_buf);
+ mem_free(tmp_buf);
ret = E_BUF_NOT_ENOUGH;
} else {
str = new std::string(tmp_buf, len);
- free(tmp_buf);
+ mem_free(tmp_buf);
}
}
}
@@ -1194,7 +1200,9 @@ class SerializationUtil {
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
} else {
- char* tmp_buf = (char*)malloc(len + 1);
+ char* tmp_buf =
+ static_cast<char*>(mem_alloc(len + 1, in.get_mid()));
+ if (tmp_buf == nullptr) return E_OOM;
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
@@ -1202,7 +1210,7 @@ class SerializationUtil {
} else {
str = std::string(tmp_buf);
}
- free(tmp_buf);
+ mem_free(tmp_buf);
}
return ret;
}
@@ -1220,7 +1228,9 @@ class SerializationUtil {
if (RET_FAIL(read_i32(len, in))) {
} else {
int32_t read_len = 0;
- char* tmp_buf = static_cast<char*>(malloc(len + 1));
+ char* tmp_buf =
+ static_cast<char*>(mem_alloc(len + 1, in.get_mid()));
+ if (tmp_buf == nullptr) return E_OOM;
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
@@ -1228,7 +1238,7 @@ class SerializationUtil {
} else {
str = std::string(tmp_buf);
}
- free(tmp_buf);
+ mem_free(tmp_buf);
}
return ret;
}
@@ -1308,7 +1318,7 @@ FORCE_INLINE char* get_bytes_from_bytestream(ByteStream& bs) {
return nullptr;
}
uint32_t size = bs.total_size();
- char* ret_buf = (char*)malloc(size);
+ char* ret_buf = static_cast<char*>(mem_alloc(size, bs.get_mid()));
if (ret_buf == nullptr) {
return nullptr;
}
diff --git a/cpp/src/common/allocator/mem_alloc.cc b/cpp/src/common/allocator/mem_alloc.cc
index c79e78858..524287e75 100644
--- a/cpp/src/common/allocator/mem_alloc.cc
+++ b/cpp/src/common/allocator/mem_alloc.cc
@@ -22,6 +22,7 @@
#endif
#include <string.h>
+#include <iomanip>
#include <iostream>
#include "alloc_base.h"
@@ -34,33 +35,30 @@ namespace common {
const char* g_mod_names[__LAST_MOD_ID] = {
/* 0 */ "DEFAULT",
- /* 1 */ "MEMTABLE",
- /* 2 */ "SCHEMA",
- /* 3 */ "SQL",
- /* 4 */ "NET",
- /* 5 */ "NET_DATA",
- /* 6 */ "TVLIST_DATA",
- /* 7 */ "TVLIST_OBJ",
- /* 8 */ "TSBLOCK",
- /* 9 */ "CONTAINER",
- /* 10 */ "TSSTORE_OBJ",
- /* 11 */ "FLUSH_TASK_OBJ",
- /* 12 */ "PAGE_WRITER_OUTPUT_STREAM",
- /* 13 */ "CW_PAGES_DATA",
- /* 14 */ "CHUNK_WRITER_OBJ",
- /* 15 */ "STATISTIC_OBJ",
- /* 16 */ "ENCODER_OBJ",
- /* 17 */ "DECODER_OBJ",
- /* 18 */ "TSFILE_WRITER_META",
- /* 19 */ "TSFILE_WRITE_STREAM",
- /* 20 */ "TIMESERIES_INDEX_OBJ",
- /* 21 */ "BLOOM_FILTER",
- /* 22 */ "OPEN_FILE_OBJ",
- /* 23 */ "TSFILE_READER",
- /* 24 */ "CHUNK_READER",
- /* 25 */ "COMPRESSOR_OBJ",
- /* 26 */ "ARRAY",
- /* 27 */ "HASH_TABLE",
+ /* 1 */ "TVLIST_DATA",
+ /* 2 */ "TSBLOCK",
+ /* 3 */ "PAGE_WRITER_OUTPUT_STREAM",
+ /* 4 */ "CW_PAGES_DATA",
+ /* 5 */ "STATISTIC_OBJ",
+ /* 6 */ "ENCODER_OBJ",
+ /* 7 */ "DECODER_OBJ",
+ /* 8 */ "TSFILE_WRITER_META",
+ /* 9 */ "TSFILE_WRITE_STREAM",
+ /* 10 */ "TIMESERIES_INDEX_OBJ",
+ /* 11 */ "BLOOM_FILTER",
+ /* 12 */ "TSFILE_READER",
+ /* 13 */ "CHUNK_READER",
+ /* 14 */ "COMPRESSOR_OBJ",
+ /* 15 */ "ARRAY",
+ /* 16 */ "HASH_TABLE",
+ /* 17 */ "WRITER_INDEX_NODE",
+ /* 18 */ "TS2DIFF_OBJ",
+ /* 19 */ "BITENCODE_OBJ",
+ /* 20 */ "DICENCODE_OBJ",
+ /* 21 */ "ZIGZAG_OBJ",
+ /* 22 */ "DEVICE_META_ITER",
+ /* 23 */ "DEVICE_TASK_ITER",
+ /* 24 */ "TABLET",
};
// Most modern CPUs (e.g., x86_64, Arm) support at least 8-byte alignment,
@@ -97,6 +95,7 @@ void* mem_alloc(uint32_t size, AllocModID mid) {
auto high4b = static_cast<uint32_t>(header >> 32);
*reinterpret_cast<uint32_t*>(raw) = high4b;
*reinterpret_cast<uint32_t*>(raw + 4) = low4b;
+ ModStat::get_instance().update_alloc(mid, static_cast<int32_t>(size));
return raw + header_size;
}
@@ -164,6 +163,9 @@ void* mem_realloc(void* ptr, uint32_t size) {
}
void ModStat::init() {
+ if (stat_arr_ != NULL) {
+ return;
+ }
stat_arr_ = (int32_t*)(::malloc(ITEM_SIZE * ITEM_COUNT));
for (int8_t i = 0; i < __LAST_MOD_ID; i++) {
int32_t* item = get_item(i);
@@ -171,7 +173,52 @@ void ModStat::init() {
}
}
-void ModStat::destroy() { ::free(stat_arr_); }
+void ModStat::destroy() {
+ ::free(stat_arr_);
+ stat_arr_ = NULL;
+}
+
+void ModStat::print_stat() {
+ if (stat_arr_ == NULL) return;
+
+ struct Entry {
+ const char* name;
+ int32_t val;
+ };
+ Entry entries[__LAST_MOD_ID];
+ int count = 0;
+ int64_t total = 0;
+
+ for (int i = 0; i < __LAST_MOD_ID; i++) {
+ int32_t val = ATOMIC_FAA(get_item(i), 0);
+ total += val;
+ if (val != 0) {
+ entries[count++] = {g_mod_names[i], val};
+ }
+ }
+
+ for (int i = 0; i < count - 1; i++) {
+ for (int j = i + 1; j < count; j++) {
+ if (entries[j].val > entries[i].val) {
+ Entry tmp = entries[i];
+ entries[i] = entries[j];
+ entries[j] = tmp;
+ }
+ }
+ }
+
+ std::cout << "=== Memory Statistics ===" << std::endl;
+ for (int i = 0; i < count; i++) {
+ std::cout << " " << entries[i].name << ": " << entries[i].val
+ << " bytes" << std::endl;
+ }
+ double kb = total / 1024.0;
+ double mb = kb / 1024.0;
+ std::cout << " TOTAL: " << total << " bytes / " << std::fixed
+ << std::setprecision(2) << kb << " KB / " << mb << " MB"
+ << std::endl;
+ std::cout << "=========================" << std::endl;
+}
BaseAllocator g_base_allocator;
diff --git a/cpp/src/common/allocator/page_arena.h b/cpp/src/common/allocator/page_arena.h
index 71c11988d..9b8ce5ef6 100644
--- a/cpp/src/common/allocator/page_arena.h
+++ b/cpp/src/common/allocator/page_arena.h
@@ -31,9 +31,10 @@ namespace common {
*/
class PageArena {
public:
- explicit PageArena(BaseAllocator& base_allocator = g_base_allocator)
+ explicit PageArena(AllocModID mid = __FIRST_MOD_ID,
+ BaseAllocator& base_allocator = g_base_allocator)
: page_size_(0),
- mid_(__FIRST_MOD_ID),
+ mid_(mid),
base_allocator_(base_allocator),
dummy_head_() {}
~PageArena() { destroy(); }
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index 68bdb898d..f2d09e53b 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -21,6 +21,7 @@
#include <cstdlib>
+#include "allocator/alloc_base.h"
#include "datatype/date_converter.h"
#include "utils/errno_define.h"
@@ -30,7 +31,9 @@ namespace storage {
int Tablet::init() {
ASSERT(timestamps_ == nullptr);
- timestamps_ = (int64_t*)malloc(sizeof(int64_t) * max_row_num_);
+ timestamps_ = static_cast<int64_t*>(
+ common::mem_alloc(sizeof(int64_t) * max_row_num_, common::MOD_TABLET));
+ if (timestamps_ == nullptr) return E_OOM;
cur_row_size_ = 0;
size_t schema_count = schema_vec_->size();
@@ -39,54 +42,66 @@ int Tablet::init() {
ins_res = schema_map_.insert(
std::make_pair(to_lower(schema_vec_->at(c).measurement_name_), c));
if (!ins_res.second) {
- // maybe dup measurement_name
return E_INVALID_ARG;
}
}
ASSERT(schema_map_.size() == schema_count);
- value_matrix_ =
- (ValueMatrixEntry*)malloc(sizeof(ValueMatrixEntry) * schema_count);
+ value_matrix_ = static_cast<ValueMatrixEntry*>(common::mem_alloc(
+ sizeof(ValueMatrixEntry) * schema_count, common::MOD_TABLET));
+ if (value_matrix_ == nullptr) return E_OOM;
for (size_t c = 0; c < schema_count; ++c) {
const MeasurementSchema& schema = schema_vec_->at(c);
switch (schema.data_type_) {
- case BOOLEAN:
- value_matrix_[c].bool_data = (bool*)malloc(
- get_data_type_size(schema.data_type_) * max_row_num_);
- memset(value_matrix_[c].bool_data, 0,
- get_data_type_size(schema.data_type_) * max_row_num_);
+ case BOOLEAN: {
+ size_t sz = sizeof(bool) * max_row_num_;
+ value_matrix_[c].bool_data = static_cast<bool*>(
+ common::mem_alloc(sz, common::MOD_TABLET));
+ if (value_matrix_[c].bool_data == nullptr) return E_OOM;
+ memset(value_matrix_[c].bool_data, 0, sz);
break;
+ }
case DATE:
- case INT32:
- value_matrix_[c].int32_data = (int32_t*)malloc(
- get_data_type_size(schema.data_type_) * max_row_num_);
- memset(value_matrix_[c].int32_data, 0,
- get_data_type_size(schema.data_type_) * max_row_num_);
+ case INT32: {
+ size_t sz = sizeof(int32_t) * max_row_num_;
+ value_matrix_[c].int32_data = static_cast<int32_t*>(
+ common::mem_alloc(sz, common::MOD_TABLET));
+ if (value_matrix_[c].int32_data == nullptr) return E_OOM;
+ memset(value_matrix_[c].int32_data, 0, sz);
break;
+ }
case TIMESTAMP:
- case INT64:
- value_matrix_[c].int64_data = (int64_t*)malloc(
- get_data_type_size(schema.data_type_) * max_row_num_);
- memset(value_matrix_[c].int64_data, 0,
- get_data_type_size(schema.data_type_) * max_row_num_);
+ case INT64: {
+ size_t sz = sizeof(int64_t) * max_row_num_;
+ value_matrix_[c].int64_data = static_cast<int64_t*>(
+ common::mem_alloc(sz, common::MOD_TABLET));
+ if (value_matrix_[c].int64_data == nullptr) return E_OOM;
+ memset(value_matrix_[c].int64_data, 0, sz);
break;
- case FLOAT:
- value_matrix_[c].float_data = (float*)malloc(
- get_data_type_size(schema.data_type_) * max_row_num_);
- memset(value_matrix_[c].float_data, 0,
- get_data_type_size(schema.data_type_) * max_row_num_);
+ }
+ case FLOAT: {
+ size_t sz = sizeof(float) * max_row_num_;
+ value_matrix_[c].float_data = static_cast<float*>(
+ common::mem_alloc(sz, common::MOD_TABLET));
+ if (value_matrix_[c].float_data == nullptr) return E_OOM;
+ memset(value_matrix_[c].float_data, 0, sz);
break;
- case DOUBLE:
- value_matrix_[c].double_data = (double*)malloc(
- get_data_type_size(schema.data_type_) * max_row_num_);
- memset(value_matrix_[c].double_data, 0,
- get_data_type_size(schema.data_type_) * max_row_num_);
+ }
+ case DOUBLE: {
+ size_t sz = sizeof(double) * max_row_num_;
+ value_matrix_[c].double_data = static_cast<double*>(
+ common::mem_alloc(sz, common::MOD_TABLET));
+ if (value_matrix_[c].double_data == nullptr) return E_OOM;
+ memset(value_matrix_[c].double_data, 0, sz);
break;
+ }
case BLOB:
case TEXT:
case STRING: {
value_matrix_[c].string_data =
- (common::String*)malloc(sizeof(String) * max_row_num_);
+ static_cast<common::String*>(common::mem_alloc(
+ sizeof(String) * max_row_num_, common::MOD_TABLET));
+ if (value_matrix_[c].string_data == nullptr) return E_OOM;
break;
}
default:
@@ -95,8 +110,11 @@ int Tablet::init() {
}
}
- bitmaps_ = new BitMap[schema_count];
+ bitmaps_ = static_cast<BitMap*>(
+ common::mem_alloc(sizeof(BitMap) * schema_count, common::MOD_TABLET));
+ if (bitmaps_ == nullptr) return E_OOM;
for (size_t c = 0; c < schema_count; c++) {
+ new (&bitmaps_[c]) BitMap();
bitmaps_[c].init(max_row_num_, false);
}
return E_OK;
@@ -104,7 +122,7 @@ int Tablet::init() {
void Tablet::destroy() {
if (timestamps_ != nullptr) {
- free(timestamps_);
+ common::mem_free(timestamps_);
timestamps_ = nullptr;
}
@@ -114,36 +132,40 @@ void Tablet::destroy() {
switch (schema.data_type_) {
case DATE:
case INT32:
- free(value_matrix_[c].int32_data);
+ common::mem_free(value_matrix_[c].int32_data);
break;
case TIMESTAMP:
case INT64:
- free(value_matrix_[c].int64_data);
+ common::mem_free(value_matrix_[c].int64_data);
break;
case FLOAT:
- free(value_matrix_[c].float_data);
+ common::mem_free(value_matrix_[c].float_data);
break;
case DOUBLE:
- free(value_matrix_[c].double_data);
+ common::mem_free(value_matrix_[c].double_data);
break;
case BOOLEAN:
- free(value_matrix_[c].bool_data);
+ common::mem_free(value_matrix_[c].bool_data);
break;
case BLOB:
case TEXT:
case STRING:
- free(value_matrix_[c].string_data);
+ common::mem_free(value_matrix_[c].string_data);
break;
default:
break;
}
}
- free(value_matrix_);
+ common::mem_free(value_matrix_);
value_matrix_ = nullptr;
}
if (bitmaps_ != nullptr) {
- delete[] bitmaps_;
+ size_t schema_count = schema_vec_->size();
+ for (size_t c = 0; c < schema_count; c++) {
+ bitmaps_[c].~BitMap();
+ }
+ common::mem_free(bitmaps_);
bitmaps_ = nullptr;
}
}
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index 02691087d..e47aa5c42 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -265,7 +265,7 @@ class Tablet {
private:
template <typename T>
void process_val(uint32_t row_index, uint32_t schema_index, T val);
- common::PageArena page_arena_;
+ common::PageArena page_arena_{common::MOD_TABLET};
uint32_t max_row_num_;
uint32_t cur_row_size_;
std::string insert_target_name_;
diff --git a/cpp/src/common/tablet_iterator.h b/cpp/src/common/tablet_iterator.h
deleted file mode 100644
index 53163f7e6..000000000
--- a/cpp/src/common/tablet_iterator.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * License); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef COMMON_TABLET_H
-#define COMMON_TABLET_H
-
-namespace storage {
-
-#define GET_TYPED_NEXT(timestamp, CppType, TsType, value) \
- do { \
- if (row_iter_ >= max_rows_) { \
- return E_NO_MORE_DATA; \
- } \
- if (data_type_ != TsType) { \
- return E_DATA_TYPE_NOT_MATCH; \
- } \
- timestamp = tablet_.timestamps_[row_iter_]; \
- void* value_arr = value_matrix_[col_idx_]; \
- if (data_type_ == TsType) { \
- value = ((CppType*)value_arr) + row_iter_; \
- } \
- return E_OK; \
- } while (false)
-
-class TabletColIterator {
- public:
- TabletColIterator(const Tablet& tablet, int col_idx)
- : tablet_(tablet), col_idx_(col_idx) {
- ASSERT(col_idx <= tablet.schema_vec_->size());
- data_type = get_data_type_size(tablet.schema_vec_->at(i).data_type_);
- row_iter_ = 0;
- }
-
- const MeasurementSchema& get_measurement_schema() const {
- return schema_vec_->at(col_idx_);
- }
-
- int get_next(int64_t& timestamp, bool& value) {
- GET_TYPED_NEXT(timestamp, bool, BOOLEAN, value);
- }
- int get_next(int64_t& timestamp, int32_t& value) {
- GET_TYPED_NEXT(timestamp, int32_t, INT32, value);
- }
- int get_next(int64_t& timestamp, int64_t& value) {
- GET_TYPED_NEXT(timestamp, int64_t, INT64, value);
- }
- int get_next(int64_t& timestamp, float& value) {
- GET_TYPED_NEXT(timestamp, float, FLOAT, value);
- }
- int get_next(int64_t& timestamp, double& value) {
- GET_TYPED_NEXT(timestamp, double, DOUBLE, value);
- }
-
- private:
- const Tablet& tablet_;
- TSDataType data_type_;
- int col_idx_;
- int row_iter_;
-};
-
-} // end namespace storage
-#endif
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index ad2fa5911..c866e4995 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -565,7 +565,8 @@ class TimeseriesIndex : public ITimeseriesIndex {
*/
Statistic* statistic_;
bool statistic_from_pa_;
- common::ByteStream chunk_meta_list_serialized_buf_;
+ common::ByteStream chunk_meta_list_serialized_buf_{
+ common::MOD_TSFILE_WRITER_META};
// common::PageArena page_arena_;
common::SimpleList<ChunkMeta*>* chunk_meta_list_; // for deserialize_from
};
diff --git a/cpp/src/compress/gzip_compressor.h b/cpp/src/compress/gzip_compressor.h
index 677b72663..f2aba1310 100644
--- a/cpp/src/compress/gzip_compressor.h
+++ b/cpp/src/compress/gzip_compressor.h
@@ -46,7 +46,9 @@ class GzipCompressor {
void destroy() { end_zstream(); }
int compress(char* uncompressed_buf, uint32_t uncompressed_buf_len,
char*& compressed_buf, uint32_t& compressed_buf_len);
- void after_compress(char* compressed_buf) { ::free(compressed_buf); }
+ void after_compress(char* compressed_buf) {
+ common::mem_free(compressed_buf);
+ }
int compress_into_bytestream(char* uncompressed_buf,
uint32_t uncompressed_buf_len,
common::ByteStream& out);
@@ -69,7 +71,9 @@ class GzipDeCompressor {
void destroy() { end_zstream(); }
int uncompress(char* compressed_buf, uint32_t compressed_buf_len,
char*& uncompressed_buf, uint32_t& uncompressed_buf_len);
- void after_uncompress(char* uncompressed_buf) { ::free(uncompressed_buf); }
+ void after_uncompress(char* uncompressed_buf) {
+ common::mem_free(uncompressed_buf);
+ }
int decompress_into_bytestream(char* compressed_buf,
uint32_t compressed_buf_len,
common::ByteStream& out);
diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h
index 757a92599..aee9048a1 100644
--- a/cpp/src/encoding/int32_rle_decoder.h
+++ b/cpp/src/encoding/int32_rle_decoder.h
@@ -37,7 +37,7 @@ class Int32RleDecoder : public Decoder {
int bitpacking_num_;
bool is_length_and_bitwidth_readed_;
int current_count_;
- common::ByteStream byte_cache_;
+ common::ByteStream byte_cache_{common::MOD_DECODER_OBJ};
int32_t* current_buffer_;
Int32Packer* packer_;
uint8_t* tmp_buf_;
diff --git a/cpp/src/encoding/int32_rle_encoder.h b/cpp/src/encoding/int32_rle_encoder.h
index d9eecf5dd..9301135a4 100644
--- a/cpp/src/encoding/int32_rle_encoder.h
+++ b/cpp/src/encoding/int32_rle_encoder.h
@@ -36,7 +36,7 @@ class Int32RleEncoder : public Encoder {
int num_buffered_values_;
int bit_width_;
Int32Packer* packer_;
- common::ByteStream byte_cache_;
+ common::ByteStream byte_cache_{common::MOD_ENCODER_OBJ};
std::vector<int32_t> values_; // all data tobe encoded
int32_t buffered_values_[8]; // encode each 8 values
std::vector<unsigned char> bytes_buffer_;
@@ -146,7 +146,7 @@ class Int32RleEncoder : public Encoder {
void convert_buffer() {
// TODO: put the bytes on the stack instead on the heap
unsigned char* bytes = (unsigned char*)common::mem_alloc(
- bit_width_, common::MOD_BITENCODE_OBJ);
+ bit_width_, common::MOD_ENCODER_OBJ);
int32_t tmp_buffer[8];
for (int i = 0; i < 8; i++) {
tmp_buffer[i] = (int64_t)buffered_values_[i];
diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h
index b2f85ed1e..8010fe0f7 100644
--- a/cpp/src/encoding/int64_rle_decoder.h
+++ b/cpp/src/encoding/int64_rle_decoder.h
@@ -37,7 +37,7 @@ class Int64RleDecoder : public Decoder {
int bitpacking_num_;
bool is_length_and_bitwidth_readed_;
int current_count_;
- common::ByteStream byte_cache_;
+ common::ByteStream byte_cache_{common::MOD_DECODER_OBJ};
int64_t* current_buffer_;
Int64Packer* packer_;
uint8_t* tmp_buf_;
@@ -150,9 +150,11 @@ class Int64RleDecoder : public Decoder {
void read_bit_packing_buffer(int bit_packed_group_count,
int last_bit_packed_num) {
if (current_buffer_ != nullptr) {
- delete[] current_buffer_;
+ common::mem_free(current_buffer_);
}
- current_buffer_ = new int64_t[bit_packed_group_count * 8];
+ current_buffer_ = static_cast<int64_t*>(
+ common::mem_alloc(sizeof(int64_t) * bit_packed_group_count * 8,
+ common::MOD_DECODER_OBJ));
int bytes_to_read = bit_packed_group_count * bit_width_;
if (bytes_to_read > (int)byte_cache_.remaining_size()) {
bytes_to_read = byte_cache_.remaining_size();
@@ -199,13 +201,13 @@ class Int64RleDecoder : public Decoder {
void init_packer() { packer_ = new Int64Packer(bit_width_); }
- void destroy() { /* do nothing for BitpackEncoder */
+ void destroy() {
if (packer_) {
delete (packer_);
packer_ = nullptr;
}
if (current_buffer_) {
- delete[] current_buffer_;
+ common::mem_free(current_buffer_);
current_buffer_ = nullptr;
}
if (tmp_buf_) {
@@ -221,7 +223,7 @@ class Int64RleDecoder : public Decoder {
is_length_and_bitwidth_readed_ = false;
current_count_ = 0;
if (current_buffer_) {
- delete[] current_buffer_;
+ common::mem_free(current_buffer_);
current_buffer_ = nullptr;
}
if (packer_) {
diff --git a/cpp/src/encoding/int64_rle_encoder.h b/cpp/src/encoding/int64_rle_encoder.h
index 82fd40f38..119b36981 100644
--- a/cpp/src/encoding/int64_rle_encoder.h
+++ b/cpp/src/encoding/int64_rle_encoder.h
@@ -36,7 +36,7 @@ class Int64RleEncoder : public Encoder {
int num_buffered_values_;
int bit_width_;
Int64Packer* packer_;
- common::ByteStream byte_cache_;
+ common::ByteStream byte_cache_{common::MOD_ENCODER_OBJ};
std::vector<int64_t> values_; // all data tobe encoded
int64_t buffered_values_[8]; // encode each 8 values
std::vector<unsigned char> bytes_buffer_;
@@ -146,7 +146,7 @@ class Int64RleEncoder : public Encoder {
void convert_buffer() {
// TODO: put the bytes on the stack instead on the heap
unsigned char* bytes = (unsigned char*)common::mem_alloc(
- bit_width_, common::MOD_BITENCODE_OBJ);
+ bit_width_, common::MOD_ENCODER_OBJ);
int64_t tmp_buffer[8];
for (int i = 0; i < 8; i++) {
tmp_buffer[i] = (int64_t)buffered_values_[i];
diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h
index 19bcfea0b..2f4135e0e 100644
--- a/cpp/src/file/tsfile_io_reader.h
+++ b/cpp/src/file/tsfile_io_reader.h
@@ -45,7 +45,7 @@ class TsFileIOReader {
tsfile_meta_(&tsfile_meta_page_arena_),
tsfile_meta_ready_(false),
read_file_created_(false) {
- tsfile_meta_page_arena_.init(512, common::MOD_DEFAULT);
+ tsfile_meta_page_arena_.init(512, common::MOD_TSFILE_READER);
}
int init(const std::string& file_path);
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index 8fcc8fa55..6b12f4015 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -205,7 +205,7 @@ class TsFileIOWriter {
private:
common::PageArena meta_allocator_;
- common::ByteStream write_stream_;
+ common::ByteStream write_stream_{common::MOD_TSFILE_WRITE_STREAM};
common::ByteStream::Consumer write_stream_consumer_;
ChunkMeta* cur_chunk_meta_;
ChunkGroupMeta* cur_chunk_group_meta_;
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index aefb7bc58..12e0b9289 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -156,8 +156,8 @@ class AlignedChunkReader : public IChunkReader {
* also refer to offset within the chunk (including chunk header).
* It advanced by step of a page header or a page tv data.
*/
- common::ByteStream time_in_stream_;
- common::ByteStream value_in_stream_;
+ common::ByteStream time_in_stream_{common::MOD_CHUNK_READER};
+ common::ByteStream value_in_stream_{common::MOD_CHUNK_READER};
int32_t file_data_time_buf_size_;
int32_t file_data_value_buf_size_;
uint32_t time_chunk_visit_offset_;
@@ -170,8 +170,8 @@ class AlignedChunkReader : public IChunkReader {
Decoder* time_decoder_;
Decoder* value_decoder_;
- common::ByteStream time_in_;
- common::ByteStream value_in_;
+ common::ByteStream time_in_{common::MOD_CHUNK_READER};
+ common::ByteStream value_in_{common::MOD_CHUNK_READER};
char* time_uncompressed_buf_;
char* value_uncompressed_buf_;
std::vector<uint8_t> value_page_col_notnull_bitmap_;
diff --git a/cpp/src/reader/chunk_reader.h b/cpp/src/reader/chunk_reader.h
index 52c7f7a59..106b8648b 100644
--- a/cpp/src/reader/chunk_reader.h
+++ b/cpp/src/reader/chunk_reader.h
@@ -125,7 +125,7 @@ class ChunkReader : public IChunkReader {
* also refer to offset within the chunk (including chunk header).
* It advanced by step of a page header or a page tv data.
*/
- common::ByteStream in_stream_;
+ common::ByteStream in_stream_{common::MOD_CHUNK_READER};
int32_t file_data_buf_size_;
uint32_t chunk_visit_offset_;
@@ -135,8 +135,8 @@ class ChunkReader : public IChunkReader {
Decoder* time_decoder_;
Decoder* value_decoder_;
- common::ByteStream time_in_;
- common::ByteStream value_in_;
+ common::ByteStream time_in_{common::MOD_CHUNK_READER};
+ common::ByteStream value_in_{common::MOD_CHUNK_READER};
char* uncompressed_buf_;
};
diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc
index 5e78574e7..8130bd8ba 100644
--- a/cpp/src/reader/tsfile_series_scan_iterator.cc
+++ b/cpp/src/reader/tsfile_series_scan_iterator.cc
@@ -96,7 +96,8 @@ int TsFileSeriesScanIterator::init_chunk_reader() {
int ret = E_OK;
is_aligned_ = itimeseries_index_->get_data_type() == common::VECTOR;
if (!is_aligned_) {
- void* buf = common::mem_alloc(sizeof(ChunkReader), common::MOD_DEFAULT);
+ void* buf =
+ common::mem_alloc(sizeof(ChunkReader), common::MOD_CHUNK_READER);
chunk_reader_ = new (buf) ChunkReader;
chunk_meta_cursor_ =
itimeseries_index_->get_chunk_meta_list()->begin();
ChunkMeta* cm = chunk_meta_cursor_.get();
@@ -109,8 +110,8 @@ int TsFileSeriesScanIterator::init_chunk_reader() {
chunk_meta_cursor_++;
}
} else {
- void* buf =
- common::mem_alloc(sizeof(AlignedChunkReader), common::MOD_DEFAULT);
+ void* buf = common::mem_alloc(sizeof(AlignedChunkReader),
+ common::MOD_CHUNK_READER);
chunk_reader_ = new (buf) AlignedChunkReader;
time_chunk_meta_cursor_ =
itimeseries_index_->get_time_chunk_meta_list()->begin();
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 3032ff9a5..6eb3f5418 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -150,7 +150,7 @@ class ChunkWriter {
common::TSDataType data_type_;
PageWriter page_writer_;
Statistic* chunk_statistic_;
- common::ByteStream chunk_data_;
+ common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA};
// to save first page data
PageData first_page_data_;
diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h
index cff4b60ed..d3966d865 100644
--- a/cpp/src/writer/page_writer.h
+++ b/cpp/src/writer/page_writer.h
@@ -201,8 +201,8 @@ class PageWriter {
Encoder* time_encoder_;
Encoder* value_encoder_;
Statistic* statistic_;
- common::ByteStream time_out_stream_;
- common::ByteStream value_out_stream_;
+ common::ByteStream time_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM};
+ common::ByteStream value_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM};
PageData cur_page_data_;
Compressor* compressor_;
bool is_inited_;
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index ac3b374b0..0c6e1f18a 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -102,7 +102,7 @@ class TimeChunkWriter {
private:
TimePageWriter time_page_writer_;
Statistic* chunk_statistic_;
- common::ByteStream chunk_data_;
+ common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA};
// to save first page data
TimePageData first_page_data_;
diff --git a/cpp/src/writer/time_page_writer.h b/cpp/src/writer/time_page_writer.h
index 4c01044a6..d9dcecff1 100644
--- a/cpp/src/writer/time_page_writer.h
+++ b/cpp/src/writer/time_page_writer.h
@@ -121,7 +121,7 @@ class TimePageWriter {
common::TSDataType data_type_;
Encoder* time_encoder_;
Statistic* statistic_;
- common::ByteStream time_out_stream_;
+ common::ByteStream time_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM};
TimePageData cur_page_data_;
Compressor* compressor_;
bool is_inited_;
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 859fb57b0..4391b7540 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -148,7 +148,7 @@ class ValueChunkWriter {
common::TSDataType data_type_;
ValuePageWriter value_page_writer_;
Statistic* chunk_statistic_;
- common::ByteStream chunk_data_;
+ common::ByteStream chunk_data_{common::MOD_CW_PAGES_DATA};
// to save first page data
ValuePageData first_page_data_;
diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h
index 60d75b0b8..ec115c9da 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -205,8 +205,9 @@ class ValuePageWriter {
common::TSDataType data_type_;
Encoder* value_encoder_;
Statistic* statistic_;
- common::ByteStream col_notnull_bitmap_out_stream_;
- common::ByteStream value_out_stream_;
+ common::ByteStream col_notnull_bitmap_out_stream_{
+ common::MOD_PAGE_WRITER_OUTPUT_STREAM};
+ common::ByteStream value_out_stream_{common::MOD_PAGE_WRITER_OUTPUT_STREAM};
ValuePageData cur_page_data_;
Compressor* compressor_;
bool is_inited_;
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 1f8c80ff6..477f875e7 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -1095,4 +1095,81 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
delete[] literal;
delete[] text_literal;
delete table_schema;
-}
\ No newline at end of file
+}
+
+#ifdef ENABLE_MEM_STAT  // memory-statistics walkthrough; compiled only when ENABLE_MEM_STAT is defined
+TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {  // DISABLED_ prefix: gtest skips this unless disabled tests are explicitly requested
+ TableSchema* table_schema = gen_table_schema(0, 2, 3);  // released by the delete on the last line of the test
+ auto tsfile_table_writer =
+ std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
+
+ const int num_devices = 10;
+ const int num_timestamps = 100;
+
+ for (int flush = 0; flush < 10; flush++) {  // 10 write+flush rounds; ModStat is printed before and after each flush
+ Tablet tablet =
+ gen_tablet(table_schema, flush * num_devices * num_timestamps,  // offset grows per round — presumably keeps rounds from overlapping; confirm in gen_tablet
+ num_devices, num_timestamps);
+ ASSERT_EQ(tsfile_table_writer->write_table(tablet), E_OK);
+ std::cout << "--- After write, before flush " << flush << " ---"
+ << std::endl;
+ ModStat::get_instance().print_stat();
+ ASSERT_EQ(tsfile_table_writer->flush(), E_OK);
+
+ std::cout << "--- After flush " << flush << " ---" << std::endl;
+ ModStat::get_instance().print_stat();
+ }
+
+ ASSERT_EQ(tsfile_table_writer->close(), E_OK);
+
+ std::cout << "--- After writer close ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), E_OK);
+
+ std::cout << "--- After reader open ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ ResultSet* result_set = nullptr;  // passed to reader.query() below, then iterated
+ ASSERT_EQ(reader.query(table_schema->get_table_name(),
+ table_schema->get_measurement_names(), 0, INT64_MAX,
+ result_set),
+ E_OK);
+
+ std::cout << "--- After query init ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ int row_count = 0;
+ bool has_next = false;
+ const int total_rows = num_devices * num_timestamps * 10;  // 10 == number of write/flush rounds above
+ const int sample_interval = total_rows / 5;  // print stats at every fifth of the expected row count
+ int next_sample = sample_interval;
+ auto* table_result_set = static_cast<TableResultSet*>(result_set);  // unchecked downcast — assumes query() produced a TableResultSet
+ while (IS_SUCC(table_result_set->next(has_next)) && has_next) {  // ends when next() is not IS_SUCC or has_next comes back false
+ row_count++;
+ if (row_count == next_sample) {
+ std::cout << "--- Reading row " << row_count << "/" << total_rows
+ << " ---" << std::endl;
+ ModStat::get_instance().print_stat();
+ next_sample += sample_interval;
+ }
+ }
+ EXPECT_EQ(row_count, total_rows);  // the only data check; memory statistics are printed, never asserted
+
+ std::cout << "--- After read complete ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ reader.destroy_query_data_set(table_result_set);
+
+ std::cout << "--- After destroy result set ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ ASSERT_EQ(reader.close(), E_OK);
+
+ std::cout << "--- After reader close ---" << std::endl;
+ ModStat::get_instance().print_stat();
+
+ delete table_schema;
+}
+#endif  // ENABLE_MEM_STAT
\ No newline at end of file