This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new c852f8d5 Fix demos & multi device & multi flush (#431)
c852f8d5 is described below
commit c852f8d51e1f1299059a7cd9d19029bf2d56715a
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Mar 7 16:20:23 2025 +0800
Fix demos & multi device & multi flush (#431)
* fix demos & fix insertion with multiple devices
* reuse chunk/page writers after flush
---
cpp/examples/c_examples/demo_read.c | 24 ++++-----
cpp/examples/c_examples/demo_write.c | 1 +
cpp/examples/cpp_examples/demo_read.cpp | 4 +-
cpp/src/common/schema.h | 13 +++++
cpp/src/common/tsfile_common.h | 4 --
cpp/src/writer/chunk_writer.cc | 15 +++++-
cpp/src/writer/chunk_writer.h | 1 +
cpp/src/writer/page_writer.cc | 12 +++--
cpp/src/writer/time_chunk_writer.cc | 16 +++++-
cpp/src/writer/time_chunk_writer.h | 1 +
cpp/src/writer/time_page_writer.cc | 12 ++++-
cpp/src/writer/tsfile_writer.cc | 62 ++++++++--------------
cpp/src/writer/tsfile_writer.h | 16 +++---
cpp/src/writer/value_chunk_writer.cc | 16 +++++-
cpp/src/writer/value_chunk_writer.h | 1 +
cpp/src/writer/value_page_writer.cc | 12 ++++-
cpp/test/common/tsfile_common_test.cc | 8 +--
.../reader/table_view/tsfile_reader_table_test.cc | 23 +++++---
.../writer/table_view/tsfile_writer_table_test.cc | 1 +
cpp/test/writer/tsfile_writer_test.cc | 1 +
20 files changed, 155 insertions(+), 88 deletions(-)
diff --git a/cpp/examples/c_examples/demo_read.c
b/cpp/examples/c_examples/demo_read.c
index 8e4e7acf..d3c17a12 100644
--- a/cpp/examples/c_examples/demo_read.c
+++ b/cpp/examples/c_examples/demo_read.c
@@ -43,11 +43,11 @@ ERRNO read_tsfile() {
// Get query result metadata: column name and datatype
ResultSetMetaData metadata = tsfile_result_set_get_metadata(ret);
- int column_num = metadata.column_num;
+ int column_num = tsfile_result_set_metadata_get_column_num(metadata);
- for (int i = 0; i < column_num; i++) {
- printf("column:%s, datatype:%d\n", metadata.column_names[i],
- metadata.data_types[i]);
+ for (int i = 1; i <= column_num; i++) {
+ printf("column:%s, datatype:%d\n",
tsfile_result_set_metadata_get_column_name(metadata, i),
+ tsfile_result_set_metadata_get_data_type(metadata, i));
}
// Get data by column name or index.
@@ -55,37 +55,37 @@ ERRNO read_tsfile() {
// Timestamp at column 1 and column index begin from 1.
Timestamp timestamp =
tsfile_result_set_get_value_by_index_int64_t(ret, 1);
- printf("%ld ", timestamp);
+ printf("%ld\n", timestamp);
for (int i = 1; i <= column_num; i++) {
if (tsfile_result_set_is_null_by_index(ret, i)) {
printf(" null ");
} else {
- switch (metadata.data_types[i]) {
+ switch (tsfile_result_set_metadata_get_data_type(metadata, i))
{
case TS_DATATYPE_BOOLEAN:
- printf("%d", tsfile_result_set_get_value_by_index_bool(
+ printf("%d\n",
tsfile_result_set_get_value_by_index_bool(
ret, i));
break;
case TS_DATATYPE_INT32:
- printf("%d",
+ printf("%d\n",
tsfile_result_set_get_value_by_index_int32_t(ret,
i));
break;
case TS_DATATYPE_INT64:
- printf("%ld",
+ printf("%ld\n",
tsfile_result_set_get_value_by_index_int64_t(ret,
i));
break;
case TS_DATATYPE_FLOAT:
- printf("%f",
tsfile_result_set_get_value_by_index_float(
+ printf("%f\n",
tsfile_result_set_get_value_by_index_float(
ret, i));
break;
case TS_DATATYPE_DOUBLE:
- printf("%lf",
+ printf("%lf\n",
tsfile_result_set_get_value_by_index_double(ret,
i));
break;
case TS_DATATYPE_STRING:
- printf("%s",
+ printf("%s\n",
tsfile_result_set_get_value_by_index_string(ret,
i));
break;
diff --git a/cpp/examples/c_examples/demo_write.c
b/cpp/examples/c_examples/demo_write.c
index fffe3f18..ebfc99df 100644
--- a/cpp/examples/c_examples/demo_write.c
+++ b/cpp/examples/c_examples/demo_write.c
@@ -54,6 +54,7 @@ ERRNO write_tsfile() {
.encoding = TS_ENCODING_PLAIN,
.column_category = FIELD};
+ remove("test_c.tsfile");
// Create a file with specify path to write tsfile.
WriteFile file = write_file_new("test_c.tsfile", &code);
HANDLE_ERROR(code);
diff --git a/cpp/examples/cpp_examples/demo_read.cpp
b/cpp/examples/cpp_examples/demo_read.cpp
index 4951e8ca..667fbfa1 100644
--- a/cpp/examples/cpp_examples/demo_read.cpp
+++ b/cpp/examples/cpp_examples/demo_read.cpp
@@ -54,7 +54,7 @@ int demo_read() {
for (int i = 1; i <= column_num; i++) {
std::cout << "column name: " << metadata->get_column_name(i)
<< std::endl;
- std::cout << "column type: " << metadata->get_column_type(i)
+ std::cout << "column type: " <<
std::to_string(metadata->get_column_type(i))
<< std::endl;
}
@@ -84,7 +84,7 @@ int demo_read() {
std::cout << ret->get_value<double>(i) << std::endl;
break;
case common::STRING:
- std::cout << ret->get_value<common::String*>(i)
+ std::cout << *(ret->get_value<common::String*>(i))
<< std::endl;
break;
default:;
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 45d76de9..e0e2b3b8 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -20,6 +20,8 @@
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
+#include <writer/chunk_writer.h>
+
#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
@@ -75,6 +77,17 @@ struct MeasurementSchema {
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}
+ ~MeasurementSchema() {
+ if (chunk_writer_ != nullptr) {
+ delete chunk_writer_;
+ chunk_writer_ = nullptr;
+ }
+ if (value_chunk_writer_ != nullptr) {
+ delete value_chunk_writer_;
+ value_chunk_writer_ = nullptr;
+ }
+ }
+
int serialize_to(common::ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index df88966c..8a77f1fe 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -124,11 +124,7 @@ struct ChunkHeader {
chunk_type_(0) {}
void reset() {
- measurement_name_.clear();
data_size_ = 0;
- data_type_ = common::INVALID_DATATYPE;
- compression_type_ = common::INVALID_COMPRESSION;
- encoding_type_ = common::INVALID_ENCODING;
num_of_pages_ = 0;
serialized_size_ = 0;
chunk_type_ = 0;
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 888692fb..73618db7 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -69,6 +69,19 @@ void ChunkWriter::destroy() {
num_of_pages_ = 0;
}
+void ChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
int ChunkWriter::seal_cur_page(bool end_chunk) {
int ret = E_OK;
if (RET_FAIL(chunk_statistic_->merge_with(page_writer_.get_statistic()))) {
@@ -80,7 +93,7 @@ int ChunkWriter::seal_cur_page(bool end_chunk) {
ret = page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/ true);
page_writer_.destroy_page_data();
- page_writer_.destroy();
+ page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 6d80353f..7add7ebd 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -62,6 +62,7 @@ class ChunkWriter {
int init(const std::string &measurement_name, common::TSDataType data_type,
common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value) {
diff --git a/cpp/src/writer/page_writer.cc b/cpp/src/writer/page_writer.cc
index ec8b2856..019004a0 100644
--- a/cpp/src/writer/page_writer.cc
+++ b/cpp/src/writer/page_writer.cc
@@ -115,9 +115,15 @@ int PageWriter::init(TSDataType data_type, TSEncoding
encoding,
* free out_stream memory, reset statistic_,
*/
void PageWriter::reset() {
- time_encoder_->reset();
- value_encoder_->reset();
- statistic_->reset();
+ if (time_encoder_ != nullptr) {
+ time_encoder_->reset();
+ }
+ if (value_encoder_ != nullptr) {
+ value_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
time_out_stream_.reset();
value_out_stream_.reset();
}
diff --git a/cpp/src/writer/time_chunk_writer.cc
b/cpp/src/writer/time_chunk_writer.cc
index b65b856b..f5b7b240 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -51,6 +51,20 @@ int TimeChunkWriter::init(const std::string
&measurement_name,
return ret;
}
+void TimeChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ time_page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
+
void TimeChunkWriter::destroy() {
if (num_of_pages_ == 1) {
free_first_writer_data();
@@ -82,7 +96,7 @@ int TimeChunkWriter::seal_cur_page(bool end_chunk) {
time_page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/
true);
time_page_writer_.destroy_page_data();
- time_page_writer_.destroy();
+ time_page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/time_chunk_writer.h
b/cpp/src/writer/time_chunk_writer.h
index d97a8aa9..8fcd9bd6 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -45,6 +45,7 @@ class TimeChunkWriter {
int init(const common::ColumnSchema &col_schema);
int init(const std::string &measurement_name, common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
storage::ChunkHeader get_chunk_header() const { return chunk_header_; }
diff --git a/cpp/src/writer/time_page_writer.cc
b/cpp/src/writer/time_page_writer.cc
index 49fe0fca..2ac75315 100644
--- a/cpp/src/writer/time_page_writer.cc
+++ b/cpp/src/writer/time_page_writer.cc
@@ -96,8 +96,12 @@ int TimePageWriter::init(TSEncoding encoding,
CompressionType compression) {
}
void TimePageWriter::reset() {
- time_encoder_->reset();
- statistic_->reset();
+ if (time_encoder_ != nullptr) {
+ time_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
time_out_stream_.reset();
}
@@ -110,6 +114,10 @@ void TimePageWriter::destroy() {
EncoderFactory::free(time_encoder_);
StatisticFactory::free(statistic_);
CompressorFactory::free(compressor_);
+
+ time_encoder_ = nullptr;
+ statistic_ = nullptr;
+ compressor_ = nullptr;
}
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 88e2927d..f289f875 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -671,7 +671,8 @@ int TsFileWriter::write_tablet_aligned(const Tablet
&tablet) {
if (IS_NULL(value_chunk_writer)) {
continue;
}
- value_write_column(value_chunk_writer, tablet, c);
+ value_write_column(value_chunk_writer, tablet, c, 0,
+ tablet.get_cur_row_size());
}
return ret;
}
@@ -716,7 +717,8 @@ int TsFileWriter::write_table(Tablet &tablet) {
int start_idx = 0;
for (auto &device_id_end_index_pair : device_id_end_index_pairs) {
auto device_id = device_id_end_index_pair.first;
- if (device_id_end_index_pair.second == 0) continue;
+ int end_idx = device_id_end_index_pair.second;
+ if (end_idx == 0) continue;
if (table_aligned_) {
SimpleVector<ValueChunkWriter *> value_chunk_writers;
TimeChunkWriter *time_chunk_writer = nullptr;
@@ -725,7 +727,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
value_chunk_writers))) {
return ret;
}
- for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
+ for (int i = start_idx; i < end_idx; i++) {
time_chunk_writer->write(tablet.timestamps_[i]);
}
uint32_t field_col_count = 0;
@@ -737,10 +739,12 @@ int TsFileWriter::write_table(Tablet &tablet) {
if (IS_NULL(value_chunk_writer)) {
continue;
}
- value_write_column(value_chunk_writer, tablet, i);
+ value_write_column(value_chunk_writer, tablet, i,
start_idx,
+ end_idx);
field_col_count++;
}
}
+ start_idx = end_idx;
} else {
MeasurementNamesFromTablet mnames_getter(tablet);
SimpleVector<ChunkWriter *> chunk_writers;
@@ -846,28 +850,27 @@ int TsFileWriter::value_write_column(ValueChunkWriter
*value_chunk_writer,
int64_t *timestamps = tablet.timestamps_;
Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
- uint32_t row_count = tablet.max_row_num_;
if (data_type == common::BOOLEAN) {
ret = write_typed_column(value_chunk_writer, timestamps,
(bool *)col_values.bool_data,
- col_notnull_bitmap, row_count);
+ col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::INT32) {
ret = write_typed_column(value_chunk_writer, timestamps,
(int32_t *)col_values.int32_data,
- col_notnull_bitmap, row_count);
+ col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::INT64) {
ret = write_typed_column(value_chunk_writer, timestamps,
(int64_t *)col_values.int64_data,
- col_notnull_bitmap, row_count);
+ col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::FLOAT) {
ret = write_typed_column(value_chunk_writer, timestamps,
(float *)col_values.float_data,
- col_notnull_bitmap, row_count);
+ col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::DOUBLE) {
ret = write_typed_column(value_chunk_writer, timestamps,
(double *)col_values.double_data,
- col_notnull_bitmap, row_count);
+ col_notnull_bitmap, start_idx, end_idx);
} else {
return E_NOT_SUPPORT;
}
@@ -888,7 +891,7 @@ int TsFileWriter::value_write_column(ValueChunkWriter
*value_chunk_writer,
#define DO_VALUE_WRITE_TYPED_COLUMN() \
do { \
int ret = E_OK; \
- for (uint32_t r = 0; r < row_count; r++) { \
+ for (uint32_t r = start_idx; r < end_idx; r++) { \
if (LIKELY(col_notnull_bitmap.test(r))) { \
ret = value_chunk_writer->write(timestamps[r], col_values[r], \
true); \
@@ -946,35 +949,35 @@ int TsFileWriter::write_typed_column(ChunkWriter
*chunk_writer,
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, bool *col_values,
BitMap &col_notnull_bitmap,
- uint32_t row_count) {
+ uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int32_t *col_values,
BitMap &col_notnull_bitmap,
- uint32_t row_count) {
+ uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int64_t *col_values,
BitMap &col_notnull_bitmap,
- uint32_t row_count) {
+ uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, float *col_values,
BitMap &col_notnull_bitmap,
- uint32_t row_count) {
+ uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, double *col_values,
BitMap &col_notnull_bitmap,
- uint32_t row_count) {
+ uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
@@ -993,25 +996,6 @@ int TsFileWriter::flush() {
DeviceSchemasMapIter device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
device_iter++) { // cppcheck-suppress postfixOperator
- if (device_iter->second->is_aligned_) {
- SimpleVector<ValueChunkWriter *> value_chunk_writers;
- TimeChunkWriter *time_chunk_writer;
- MeasurementSchemaMapNamesGetter mnames_getter(
- device_iter->second->measurement_schema_map_);
- if (RET_FAIL(do_check_schema_aligned(
- device_iter->first, mnames_getter, time_chunk_writer,
- value_chunk_writers))) {
- return ret;
- }
- } else {
- SimpleVector<ChunkWriter *> chunk_writers;
- MeasurementSchemaMapNamesGetter mnames_getter(
- device_iter->second->measurement_schema_map_);
- if (RET_FAIL(do_check_schema(device_iter->first, mnames_getter,
- chunk_writers))) {
- return ret;
- }
- }
if (check_chunk_group_empty(device_iter->second,
device_iter->second->is_aligned_)) {
continue;
@@ -1062,9 +1046,7 @@ bool
TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
} else if (RET_FAIL(io_writer->end_flush_chunk(
\
writer->get_chunk_statistic()))) {
\
} else {
\
- writer->destroy();
\
- delete writer;
\
- writer = nullptr;
\
+ writer->reset();
\
}
int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
@@ -1084,13 +1066,13 @@ int
TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
- if (!chunk_group->is_aligned_) {
+ if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
m_schema->data_type_, m_schema->encoding_,
m_schema->compression_type_,
chunk_writer->num_of_pages())
- } else {
+ } else if (m_schema->value_chunk_writer_ != nullptr) {
ValueChunkWriter *&value_chunk_writer =
m_schema->value_chunk_writer_;
FLUSH_CHUNK(value_chunk_writer, io_writer_,
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index bfaf70f0..1d2368fe 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -182,36 +182,36 @@ class TsFileWriter {
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, bool *col_values,
common::BitMap &col_notnull_bitmap,
- uint32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, double *col_values,
common::BitMap &col_notnull_bitmap,
- uint32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, common::String *col_values,
common::BitMap &col_notnull_bitmap,
- int32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, float *col_values,
common::BitMap &col_notnull_bitmap,
- uint32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int32_t *col_values,
common::BitMap &col_notnull_bitmap,
- uint32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int64_t *col_values,
common::BitMap &col_notnull_bitmap,
- uint32_t row_count);
+ uint32_t start_idx, uint32_t end_idx);
int value_write_column(ValueChunkWriter *value_chunk_writer,
const Tablet &tablet, int col_idx,
- uint32_t start_idx = 0,
- uint32_t end_idx = UINT32_MAX);
+ uint32_t start_idx,
+ uint32_t end_idx);
};
} // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.cc
b/cpp/src/writer/value_chunk_writer.cc
index 6c23cdad..e29f2565 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -52,6 +52,20 @@ int ValueChunkWriter::init(const std::string
&measurement_name,
return ret;
}
+void ValueChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ value_page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
+
void ValueChunkWriter::destroy() {
if (num_of_pages_ == 1) {
free_first_writer_data();
@@ -83,7 +97,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/ true);
value_page_writer_.destroy_page_data();
- value_page_writer_.destroy();
+ value_page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/value_chunk_writer.h
b/cpp/src/writer/value_chunk_writer.h
index 52581a34..a3e34239 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -62,6 +62,7 @@ class ValueChunkWriter {
int init(const std::string &measurement_name, common::TSDataType data_type,
common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
diff --git a/cpp/src/writer/value_page_writer.cc
b/cpp/src/writer/value_page_writer.cc
index 76d820f0..b95307f7 100644
--- a/cpp/src/writer/value_page_writer.cc
+++ b/cpp/src/writer/value_page_writer.cc
@@ -111,8 +111,12 @@ int ValuePageWriter::init(TSDataType data_type, TSEncoding
encoding,
}
void ValuePageWriter::reset() {
- value_encoder_->reset();
- statistic_->reset();
+ if (value_encoder_ != nullptr) {
+ value_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
col_notnull_bitmap_out_stream_.reset();
value_out_stream_.reset();
}
@@ -126,6 +130,10 @@ void ValuePageWriter::destroy() {
EncoderFactory::free(value_encoder_);
StatisticFactory::free(statistic_);
CompressorFactory::free(compressor_);
+
+ value_encoder_ = nullptr;
+ statistic_ = nullptr;
+ compressor_ = nullptr;
}
}
diff --git a/cpp/test/common/tsfile_common_test.cc
b/cpp/test/common/tsfile_common_test.cc
index be8c2ce5..e5eebd49 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -64,11 +64,11 @@ TEST(ChunkHeaderTest, Reset) {
header.chunk_type_ = 1;
header.reset();
- EXPECT_EQ(header.measurement_name_, "");
+ EXPECT_EQ(header.measurement_name_, "test");
EXPECT_EQ(header.data_size_, 0);
- EXPECT_EQ(header.data_type_, common::INVALID_DATATYPE);
- EXPECT_EQ(header.compression_type_, common::INVALID_COMPRESSION);
- EXPECT_EQ(header.encoding_type_, common::INVALID_ENCODING);
+ EXPECT_EQ(header.data_type_, common::INT32);
+ EXPECT_EQ(header.compression_type_, common::SNAPPY);
+ EXPECT_EQ(header.encoding_type_, common::PLAIN);
EXPECT_EQ(header.num_of_pages_, 0);
EXPECT_EQ(header.serialized_size_, 0);
EXPECT_EQ(header.chunk_type_, 0);
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index b1d7c257..33e9efa8 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -135,12 +135,12 @@ class TsFileTableReaderTest : public ::testing::Test {
return tablet;
}
- void test_table_model_query(uint32_t points_per_device = 10) {
+ void test_table_model_query(uint32_t points_per_device = 10, uint32_t
device_num = 1) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
- auto tablet = gen_tablet(table_schema, 0, 1, points_per_device);
+ auto tablet = gen_tablet(table_schema, 0, device_num,
points_per_device);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
@@ -157,7 +157,7 @@ class TsFileTableReaderTest : public ::testing::Test {
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
bool has_next = false;
- int64_t timestamp = 0;
+ int64_t row_num = 0;
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
auto column_schemas = table_schema->get_measurement_schemas();
for (const auto& column_schema : column_schemas) {
@@ -165,7 +165,7 @@ class TsFileTableReaderTest : public ::testing::Test {
case TSDataType::INT64:
ASSERT_EQ(table_result_set->get_value<int64_t>(
column_schema->measurement_name_),
- 0);
+ (row_num / points_per_device) % device_num);
break;
case TSDataType::STRING:
ASSERT_EQ(table_result_set
@@ -185,12 +185,12 @@ class TsFileTableReaderTest : public ::testing::Test {
0);
}
for (int i = 7; i <= 11; i++) {
- ASSERT_EQ(table_result_set->get_value<int64_t>(i), 0);
+ ASSERT_EQ(table_result_set->get_value<int64_t>(i), (row_num /
points_per_device) % device_num);
}
- ASSERT_EQ(table_result_set->get_value<int64_t>(1), timestamp);
- timestamp++;
+ ASSERT_EQ(table_result_set->get_value<int64_t>(1), row_num %
points_per_device);
+ row_num++;
}
- ASSERT_EQ(timestamp, points_per_device);
+ ASSERT_EQ(row_num, points_per_device * device_num);
reader.destroy_query_data_set(table_result_set);
delete[] literal;
ASSERT_EQ(reader.close(), common::E_OK);
@@ -221,6 +221,13 @@ TEST_F(TsFileTableReaderTest,
TableModelQueryMultiLargePage) {
g_config_value_.page_writer_max_point_num_ = prev_config;
}
+TEST_F(TsFileTableReaderTest, TableModelQueryMultiDevices) {
+ int prev_config = g_config_value_.page_writer_max_point_num_;
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ test_table_model_query(g_config_value_.page_writer_max_point_num_, 10);
+ g_config_value_.page_writer_max_point_num_ = prev_config;
+}
+
TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 02b28b0f..a616b7fa 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -212,6 +212,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
ResultSet* ret = nullptr;
int ret_value =
reader.query("test_table", {"device", "value"}, 10, 50, ret);
+ ASSERT_EQ(common::E_OK, ret_value);
auto* table_result_set = (TableResultSet*)ret;
bool has_next = false;
diff --git a/cpp/test/writer/tsfile_writer_test.cc
b/cpp/test/writer/tsfile_writer_test.cc
index 3f99971f..dcd96db3 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -439,6 +439,7 @@ TEST_F(TsFileWriterTest,
WriteMultipleTabletsAlignedMultiFlush) {
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
+ ASSERT_EQ(ret, common::E_OK);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;