This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch fix_demos_multi_device
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/fix_demos_multi_device by this push:
new cab76820 reuse chunk/page writers after flush
cab76820 is described below
commit cab76820a8e8eee727f760bb6dded69056edee3c
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Mar 7 15:14:46 2025 +0800
reuse chunk/page writers after flush
---
cpp/src/common/schema.h | 13 +++++++++++++
cpp/src/common/tsfile_common.h | 4 ----
cpp/src/writer/chunk_writer.cc | 15 ++++++++++++++-
cpp/src/writer/chunk_writer.h | 1 +
cpp/src/writer/page_writer.cc | 12 +++++++++---
cpp/src/writer/time_chunk_writer.cc | 16 +++++++++++++++-
cpp/src/writer/time_chunk_writer.h | 1 +
cpp/src/writer/time_page_writer.cc | 12 ++++++++++--
cpp/src/writer/tsfile_writer.cc | 11 ++++-------
cpp/src/writer/value_chunk_writer.cc | 16 +++++++++++++++-
cpp/src/writer/value_chunk_writer.h | 1 +
cpp/src/writer/value_page_writer.cc | 12 ++++++++++--
cpp/test/common/tsfile_common_test.cc | 8 ++++----
cpp/test/reader/table_view/tsfile_reader_table_test.cc | 12 ++++++------
cpp/test/writer/table_view/tsfile_writer_table_test.cc | 1 +
cpp/test/writer/tsfile_writer_test.cc | 1 +
16 files changed, 105 insertions(+), 31 deletions(-)
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 45d76de9..e0e2b3b8 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -20,6 +20,8 @@
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
+#include <writer/chunk_writer.h>
+
#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
@@ -75,6 +77,17 @@ struct MeasurementSchema {
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}
+ ~MeasurementSchema() {
+ if (chunk_writer_ != nullptr) {
+ delete chunk_writer_;
+ chunk_writer_ = nullptr;
+ }
+ if (value_chunk_writer_ != nullptr) {
+ delete value_chunk_writer_;
+ value_chunk_writer_ = nullptr;
+ }
+ }
+
int serialize_to(common::ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index df88966c..8a77f1fe 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -124,11 +124,7 @@ struct ChunkHeader {
chunk_type_(0) {}
void reset() {
- measurement_name_.clear();
data_size_ = 0;
- data_type_ = common::INVALID_DATATYPE;
- compression_type_ = common::INVALID_COMPRESSION;
- encoding_type_ = common::INVALID_ENCODING;
num_of_pages_ = 0;
serialized_size_ = 0;
chunk_type_ = 0;
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 888692fb..73618db7 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -69,6 +69,19 @@ void ChunkWriter::destroy() {
num_of_pages_ = 0;
}
+void ChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
int ChunkWriter::seal_cur_page(bool end_chunk) {
int ret = E_OK;
if (RET_FAIL(chunk_statistic_->merge_with(page_writer_.get_statistic()))) {
@@ -80,7 +93,7 @@ int ChunkWriter::seal_cur_page(bool end_chunk) {
ret = page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/ true);
page_writer_.destroy_page_data();
- page_writer_.destroy();
+ page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 6d80353f..7add7ebd 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -62,6 +62,7 @@ class ChunkWriter {
int init(const std::string &measurement_name, common::TSDataType data_type,
common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value) {
diff --git a/cpp/src/writer/page_writer.cc b/cpp/src/writer/page_writer.cc
index ec8b2856..019004a0 100644
--- a/cpp/src/writer/page_writer.cc
+++ b/cpp/src/writer/page_writer.cc
@@ -115,9 +115,15 @@ int PageWriter::init(TSDataType data_type, TSEncoding encoding,
* free out_stream memory, reset statistic_,
*/
void PageWriter::reset() {
- time_encoder_->reset();
- value_encoder_->reset();
- statistic_->reset();
+ if (time_encoder_ != nullptr) {
+ time_encoder_->reset();
+ }
+ if (value_encoder_ != nullptr) {
+ value_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
time_out_stream_.reset();
value_out_stream_.reset();
}
diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc
index b65b856b..f5b7b240 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -51,6 +51,20 @@ int TimeChunkWriter::init(const std::string &measurement_name,
return ret;
}
+void TimeChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ time_page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
+
void TimeChunkWriter::destroy() {
if (num_of_pages_ == 1) {
free_first_writer_data();
@@ -82,7 +96,7 @@ int TimeChunkWriter::seal_cur_page(bool end_chunk) {
time_page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/
true);
time_page_writer_.destroy_page_data();
- time_page_writer_.destroy();
+ time_page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index d97a8aa9..8fcd9bd6 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -45,6 +45,7 @@ class TimeChunkWriter {
int init(const common::ColumnSchema &col_schema);
int init(const std::string &measurement_name, common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
storage::ChunkHeader get_chunk_header() const { return chunk_header_; }
diff --git a/cpp/src/writer/time_page_writer.cc b/cpp/src/writer/time_page_writer.cc
index 49fe0fca..2ac75315 100644
--- a/cpp/src/writer/time_page_writer.cc
+++ b/cpp/src/writer/time_page_writer.cc
@@ -96,8 +96,12 @@ int TimePageWriter::init(TSEncoding encoding, CompressionType compression) {
}
void TimePageWriter::reset() {
- time_encoder_->reset();
- statistic_->reset();
+ if (time_encoder_ != nullptr) {
+ time_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
time_out_stream_.reset();
}
@@ -110,6 +114,10 @@ void TimePageWriter::destroy() {
EncoderFactory::free(time_encoder_);
StatisticFactory::free(statistic_);
CompressorFactory::free(compressor_);
+
+ time_encoder_ = nullptr;
+ statistic_ = nullptr;
+ compressor_ = nullptr;
}
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 8d7f050c..f289f875 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -727,7 +727,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
value_chunk_writers))) {
return ret;
}
- for (uint32_t i = start_idx; i < end_idx; i++) {
+ for (int i = start_idx; i < end_idx; i++) {
time_chunk_writer->write(tablet.timestamps_[i]);
}
uint32_t field_col_count = 0;
@@ -850,7 +850,6 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps = tablet.timestamps_;
Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
- uint32_t row_count = tablet.max_row_num_;
if (data_type == common::BOOLEAN) {
ret = write_typed_column(value_chunk_writer, timestamps,
@@ -1047,9 +1046,7 @@ bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
} else if (RET_FAIL(io_writer->end_flush_chunk( \
writer->get_chunk_statistic()))) { \
} else { \
- writer->destroy(); \
- delete writer; \
- writer = nullptr; \
+ writer->reset(); \
}
int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
@@ -1069,13 +1066,13 @@ int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
- if (!chunk_group->is_aligned_) {
+ if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
m_schema->data_type_, m_schema->encoding_,
m_schema->compression_type_,
chunk_writer->num_of_pages())
- } else {
+ } else if (m_schema->value_chunk_writer_ != nullptr) {
ValueChunkWriter *&value_chunk_writer =
m_schema->value_chunk_writer_;
FLUSH_CHUNK(value_chunk_writer, io_writer_,
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index 6c23cdad..e29f2565 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -52,6 +52,20 @@ int ValueChunkWriter::init(const std::string &measurement_name,
return ret;
}
+void ValueChunkWriter::reset() {
+ if (chunk_statistic_ != nullptr) {
+ chunk_statistic_->reset();
+ }
+ if (first_page_statistic_ != nullptr) {
+ first_page_statistic_->reset();
+ }
+ value_page_writer_.reset();
+ chunk_header_.reset();
+ chunk_data_.reset();
+ num_of_pages_ = 0;
+}
+
+
void ValueChunkWriter::destroy() {
if (num_of_pages_ == 1) {
free_first_writer_data();
@@ -83,7 +97,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
chunk_data_, /*header*/ true,
/*stat*/ false, /*data*/ true);
value_page_writer_.destroy_page_data();
- value_page_writer_.destroy();
+ value_page_writer_.reset();
} else {
/*
* if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 52581a34..a3e34239 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -62,6 +62,7 @@ class ValueChunkWriter {
int init(const std::string &measurement_name, common::TSDataType data_type,
common::TSEncoding encoding,
common::CompressionType compression_type);
+ void reset();
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
diff --git a/cpp/src/writer/value_page_writer.cc b/cpp/src/writer/value_page_writer.cc
index 76d820f0..b95307f7 100644
--- a/cpp/src/writer/value_page_writer.cc
+++ b/cpp/src/writer/value_page_writer.cc
@@ -111,8 +111,12 @@ int ValuePageWriter::init(TSDataType data_type, TSEncoding encoding,
}
void ValuePageWriter::reset() {
- value_encoder_->reset();
- statistic_->reset();
+ if (value_encoder_ != nullptr) {
+ value_encoder_->reset();
+ }
+ if (statistic_ != nullptr) {
+ statistic_->reset();
+ }
col_notnull_bitmap_out_stream_.reset();
value_out_stream_.reset();
}
@@ -126,6 +130,10 @@ void ValuePageWriter::destroy() {
EncoderFactory::free(value_encoder_);
StatisticFactory::free(statistic_);
CompressorFactory::free(compressor_);
+
+ value_encoder_ = nullptr;
+ statistic_ = nullptr;
+ compressor_ = nullptr;
}
}
diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc
index be8c2ce5..e5eebd49 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -64,11 +64,11 @@ TEST(ChunkHeaderTest, Reset) {
header.chunk_type_ = 1;
header.reset();
- EXPECT_EQ(header.measurement_name_, "");
+ EXPECT_EQ(header.measurement_name_, "test");
EXPECT_EQ(header.data_size_, 0);
- EXPECT_EQ(header.data_type_, common::INVALID_DATATYPE);
- EXPECT_EQ(header.compression_type_, common::INVALID_COMPRESSION);
- EXPECT_EQ(header.encoding_type_, common::INVALID_ENCODING);
+ EXPECT_EQ(header.data_type_, common::INT32);
+ EXPECT_EQ(header.compression_type_, common::SNAPPY);
+ EXPECT_EQ(header.encoding_type_, common::PLAIN);
EXPECT_EQ(header.num_of_pages_, 0);
EXPECT_EQ(header.serialized_size_, 0);
EXPECT_EQ(header.chunk_type_, 0);
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index a98a00f3..33e9efa8 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -157,7 +157,7 @@ class TsFileTableReaderTest : public ::testing::Test {
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
bool has_next = false;
- int64_t timestamp = 0;
+ int64_t row_num = 0;
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
auto column_schemas = table_schema->get_measurement_schemas();
for (const auto& column_schema : column_schemas) {
@@ -165,7 +165,7 @@ class TsFileTableReaderTest : public ::testing::Test {
case TSDataType::INT64:
ASSERT_EQ(table_result_set->get_value<int64_t>(
column_schema->measurement_name_),
- 0);
+ (row_num / points_per_device) % device_num);
break;
case TSDataType::STRING:
ASSERT_EQ(table_result_set
@@ -185,12 +185,12 @@ class TsFileTableReaderTest : public ::testing::Test {
0);
}
for (int i = 7; i <= 11; i++) {
- ASSERT_EQ(table_result_set->get_value<int64_t>(i), 0);
+ ASSERT_EQ(table_result_set->get_value<int64_t>(i), (row_num / points_per_device) % device_num);
}
- ASSERT_EQ(table_result_set->get_value<int64_t>(1), timestamp);
- timestamp++;
+ ASSERT_EQ(table_result_set->get_value<int64_t>(1), row_num % points_per_device);
+ row_num++;
}
- ASSERT_EQ(timestamp, points_per_device);
+ ASSERT_EQ(row_num, points_per_device * device_num);
reader.destroy_query_data_set(table_result_set);
delete[] literal;
ASSERT_EQ(reader.close(), common::E_OK);
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 02b28b0f..a616b7fa 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -212,6 +212,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
ResultSet* ret = nullptr;
int ret_value =
reader.query("test_table", {"device", "value"}, 10, 50, ret);
+ ASSERT_EQ(common::E_OK, ret_value);
auto* table_result_set = (TableResultSet*)ret;
bool has_next = false;
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index 3f99971f..dcd96db3 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -439,6 +439,7 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) {
storage::ResultSet *tmp_qds = nullptr;
ret = reader.query(query_expr, tmp_qds);
+ ASSERT_EQ(ret, common::E_OK);
auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
storage::RowRecord *record;