This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 3117ec02 fix flush bug and fix empty file bug (#310)
3117ec02 is described below
commit 3117ec02821c5b04a403dc2b63e76d46dddfec4c
Author: Yukim1 <[email protected]>
AuthorDate: Mon Nov 25 10:02:02 2024 +0800
fix flush bug and fix empty file bug (#310)
* fix flush bug and fix empty file bug
* add ut
---
cpp/src/common/tsfile_common.cc | 3 -
cpp/src/file/tsfile_io_writer.cc | 4 +-
cpp/src/reader/bloom_filter.cc | 6 +-
cpp/src/writer/tsfile_writer.cc | 26 +++++--
cpp/src/writer/tsfile_writer.h | 6 +-
cpp/src/writer/value_chunk_writer.cc | 6 ++
cpp/src/writer/value_chunk_writer.h | 2 +
cpp/test/common/tsfile_common_test.cc | 2 +-
cpp/test/writer/tsfile_writer_test.cc | 124 +++++++++++++++++++++++++++++++++-
9 files changed, 161 insertions(+), 18 deletions(-)
diff --git a/cpp/src/common/tsfile_common.cc b/cpp/src/common/tsfile_common.cc
index a1643439..ba2c2423 100644
--- a/cpp/src/common/tsfile_common.cc
+++ b/cpp/src/common/tsfile_common.cc
@@ -92,9 +92,6 @@ int TSMIterator::init() {
// FIXME empty list
chunk_group_meta_iter_ = chunk_group_meta_list_.begin();
- if (chunk_group_meta_iter_ == chunk_group_meta_list_.end()) {
- return E_NOT_EXIST;
- }
while (chunk_group_meta_iter_ != chunk_group_meta_list_.end()) {
chunk_meta_iter_ =
chunk_group_meta_iter_.get()->chunk_meta_list_.begin();
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index abd08b82..99c6ec64 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -392,8 +392,8 @@ int TsFileIOWriter::write_file_index() {
ret = E_OK;
}
ASSERT(ret == E_OK);
-
- if (IS_SUCC(ret)) { // iter finish
+ if (IS_SUCC(ret) && cur_index_node != nullptr &&
+ cur_index_node_queue != nullptr) { // iter finish
ASSERT(cur_index_node != nullptr);
ASSERT(cur_index_node_queue != nullptr);
if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
diff --git a/cpp/src/reader/bloom_filter.cc b/cpp/src/reader/bloom_filter.cc
index d2afd664..1ff1109d 100644
--- a/cpp/src/reader/bloom_filter.cc
+++ b/cpp/src/reader/bloom_filter.cc
@@ -223,8 +223,6 @@ int BloomFilter::serialize_to(ByteStream &out) {
uint8_t *filter_data_bytes = nullptr;
int32_t filter_data_bytes_len = 0;
bitset_.to_bytes(filter_data_bytes, filter_data_bytes_len);
- ASSERT(filter_data_bytes_len > 0);
-
if (RET_FAIL(
SerializationUtil::write_var_uint(filter_data_bytes_len, out))) {
} else if (RET_FAIL(
@@ -233,7 +231,9 @@ int BloomFilter::serialize_to(ByteStream &out) {
} else if (RET_FAIL(
SerializationUtil::write_var_uint(hash_func_count_, out))) {
}
- bitset_.revert_bytes(filter_data_bytes);
+ if (filter_data_bytes_len > 0) {
+ bitset_.revert_bytes(filter_data_bytes);
+ }
return ret;
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index f48ddfd7..8cc0b6d4 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -747,7 +747,12 @@ int TsFileWriter::flush() {
return ret;
}
}
+ if (check_chunk_group_empty(device_iter->second,
+ device_iter->second->is_aligned_)) {
+ continue;
+ }
bool is_aligned = device_iter->second->is_aligned_;
+
if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first,
is_aligned))) {
} else if (RET_FAIL(
@@ -759,17 +764,24 @@ int TsFileWriter::flush() {
return ret;
}
-bool TsFileWriter::check_chunk_group_empty(
- MeasurementSchemaGroup *chunk_group) {
+bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
+ bool is_aligned) {
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
- if (m_schema->chunk_writer_ != NULL &&
- m_schema->chunk_writer_->hasData()) {
- // first condition is to avoid first flush empty chunk group
- // second condition is to avoid repeated flush
- return false;
+ if (is_aligned) {
+ if (m_schema->value_chunk_writer_ != NULL &&
+ m_schema->value_chunk_writer_->hasData()) {
+ return false;
+ }
+ } else {
+ if (m_schema->chunk_writer_ != NULL &&
+ m_schema->chunk_writer_->hasData()) {
+ // first condition is to avoid first flush empty chunk group
+ // second condition is to avoid repeated flush
+ return false;
+ }
}
}
return true;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 128715a4..7b43b850 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -69,6 +69,9 @@ class TsFileWriter {
int write_tablet(const Tablet &tablet);
int write_record_aligned(const TsRecord &record);
int write_tablet_aligned(const Tablet &tablet);
+ std::map<std::string, MeasurementSchemaGroup *> *get_schema_group_map() {
+ return &schemas_;
+ }
int64_t calculate_mem_size_for_all_group();
int check_memory_size_and_may_flush_chunks();
/*
@@ -86,7 +89,8 @@ class TsFileWriter {
private:
int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
const DataPoint &point);
- bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group);
+ bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
+ bool is_aligned);
int write_point_aligned(ValueChunkWriter *value_chunk_writer,
int64_t timestamp, const DataPoint &point);
int flush_chunk_group(MeasurementSchemaGroup *chunk_group, bool is_aligned);
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index b32b2dab..4156eaff 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -168,4 +168,10 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() {
value_page_writer_.get_statistic()->get_type());
}
+bool ValueChunkWriter::hasData() {
+ return num_of_pages_ > 0 ||
+ (value_page_writer_.get_statistic() != nullptr &&
+ value_page_writer_.get_statistic()->count_ > 0);
+}
+
} // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 3093dfff..10f51c75 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -87,6 +87,8 @@ class ValueChunkWriter {
int64_t estimate_max_series_mem_size();
+ bool hasData();
+
private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc
index 08673458..2c9d1403 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -203,7 +203,7 @@ TEST_F(TSMIteratorTest, InitEmptyList) {
common::PageArena arena;
common::SimpleList<ChunkGroupMeta*> empty_list(&arena);
TSMIterator iter(empty_list);
- ASSERT_EQ(iter.init(), common::E_NOT_EXIST);
+ ASSERT_EQ(iter.init(), common::E_OK);
}
TEST_F(TSMIteratorTest, HasNext) {
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index d69a9ff1..6b601126 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -30,7 +30,7 @@
#include "file/write_file.h"
#include "reader/qds_without_timegenerator.h"
#include "reader/tsfile_reader.h"
-
+#include "writer/chunk_writer.h"
using namespace storage;
using namespace common;
@@ -381,6 +381,128 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) {
ASSERT_EQ(tsfile_writer_->close(), E_OK);
}
+
+TEST_F(TsFileWriterTest, FlushMultipleDevice) {
+ const int device_num = 50;
+ const int measurement_num = 50;
+ const int max_rows = 100;
+ std::vector<MeasurementSchema> schema_vec[50];
+
+ for (int i = 0; i < device_num; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ for (int j = 0; j < measurement_num; j++) {
+ std::string measure_name = "measurement" + std::to_string(j);
+ schema_vec[i].push_back(
+ MeasurementSchema(measure_name, common::TSDataType::INT64,
+ common::TSEncoding::PLAIN,
+ common::CompressionType::UNCOMPRESSED));
+ tsfile_writer_->register_timeseries(
+ device_name, measure_name, common::TSDataType::INT64,
+ common::TSEncoding::PLAIN,
+ common::CompressionType::UNCOMPRESSED);
+ }
+ }
+
+ for (int i = 0; i < device_num; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ Tablet tablet(device_name, &schema_vec[i], max_rows);
+ tablet.init();
+ for (int j = 0; j < measurement_num; j++) {
+ for (int row = 0; row < max_rows; row++) {
+ tablet.set_timestamp(row, 16225600 + row);
+ }
+ for (int row = 0; row < max_rows; row++) {
+ tablet.set_value(row, j, static_cast<int64_t>(row));
+ }
+ }
+ ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
+ // flush after write tablet to check whether write empty chunk
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ }
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::vector<storage::Path> select_list;
+ for (int i = 0; i < device_num; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ for (int j = 0; j < measurement_num; j++) {
+ std::string measurement_name = "measurement" + std::to_string(j);
+ storage::Path path(device_name, measurement_name);
+ select_list.push_back(path);
+ }
+ }
+ storage::QueryExpression *query_expr =
+ storage::QueryExpression::create(select_list, nullptr);
+
+ storage::TsFileReader reader;
+ int ret = reader.open(file_name_);
+ ASSERT_EQ(ret, common::E_OK);
+ storage::QueryDataSet *tmp_qds = nullptr;
+
+ ret = reader.query(query_expr, tmp_qds);
+ auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
+
+ storage::RowRecord *record;
+ int64_t cur_record_num = 0;
+ do {
+ record = qds->get_next();
+ // if empty chunk is written, the timestamp should be NULL
+ if (!record) {
+ break;
+ }
+ EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num);
+ cur_record_num++;
+ } while (true);
+ EXPECT_EQ(cur_record_num, max_rows);
+ storage::QueryExpression::destory(query_expr);
+ reader.destroy_query_data_set(qds);
+}
+
+TEST_F(TsFileWriterTest, AnalyzeTsfileForload) {
+ const int device_num = 50;
+ const int measurement_num = 50;
+ const int max_rows = 100;
+ std::vector<MeasurementSchema> schema_vec[50];
+
+ for (int i = 0; i < device_num; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ for (int j = 0; j < measurement_num; j++) {
+ std::string measure_name = "measurement" + std::to_string(j);
+ schema_vec[i].push_back(
+ MeasurementSchema(measure_name, common::TSDataType::INT64,
+ common::TSEncoding::PLAIN,
+ common::CompressionType::UNCOMPRESSED));
+ tsfile_writer_->register_timeseries(
+ device_name, measure_name, common::TSDataType::INT64,
+ common::TSEncoding::PLAIN,
+ common::CompressionType::UNCOMPRESSED);
+ }
+ }
+
+ for (int i = 0; i < device_num; i++) {
+ std::string device_name = "test_device" + std::to_string(i);
+ Tablet tablet(device_name, &schema_vec[i], max_rows);
+ tablet.init();
+ for (int j = 0; j < measurement_num; j++) {
+ for (int row = 0; row < max_rows; row++) {
+ tablet.set_timestamp(row, 16225600 + row);
+ }
+ for (int row = 0; row < max_rows; row++) {
+ tablet.set_value(row, j, static_cast<int64_t>(row));
+ }
+ }
+ ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK);
+ }
+ auto schemas = tsfile_writer_->get_schema_group_map();
+ ASSERT_EQ(schemas->size(), 50);
+ for (const auto& device_iter : *schemas) {
+ for (const auto& chunk_iter : device_iter.second->measurement_schema_map_) {
+ ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr);
+ ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData());
+ }
+ }
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+}
TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) {
std::string device_path = "device1";
std::string measurement_name = "temperature";