This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch compaction_opt
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/compaction_opt by this push:
new 326ac30567 [feature](compaction) support ordered data compaction
(#14054)
326ac30567 is described below
commit 326ac305671a4b5a6654f625f14b42fc1c1c04c3
Author: yixiutt <[email protected]>
AuthorDate: Wed Nov 9 09:10:52 2022 +0800
[feature](compaction) support ordered data compaction (#14054)
---
be/src/olap/compaction.cpp | 168 +++++++++++++++++++----
be/src/olap/compaction.h | 10 +-
be/src/olap/rowset/beta_rowset.cpp | 18 ++-
be/src/olap/rowset/beta_rowset.h | 5 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 54 +++++++-
be/src/olap/rowset/beta_rowset_writer.h | 8 +-
be/src/olap/rowset/rowset.h | 14 +-
be/src/olap/rowset/rowset_meta.h | 9 ++
be/src/olap/rowset/rowset_writer.h | 3 +
be/src/olap/rowset/segment_v2/segment_writer.cpp | 91 ++++++++----
be/src/olap/rowset/segment_v2/segment_writer.h | 11 ++
be/test/io/cache/remote_file_cache_test.cpp | 4 +-
be/test/olap/rowid_conversion_test.cpp | 2 +-
be/test/olap/rowset/segment_v2/segment_test.cpp | 37 ++---
be/test/testutil/mock_rowset.h | 3 +-
regression-test/conf/regression-conf.groovy | 2 +-
16 files changed, 350 insertions(+), 89 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 89d290c854..3bd41c3bed 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -19,6 +19,7 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/tablet.h"
@@ -34,6 +35,8 @@ Compaction::Compaction(TabletSharedPtr tablet, const
std::string& label)
: _tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
+ _input_num_segments(0),
+ _input_index_size(0),
_state(CompactionState::INITED) {
#ifndef BE_TEST
_mem_tracker = std::make_shared<MemTrackerLimiter>(
@@ -105,20 +108,76 @@ int64_t Compaction::get_avg_segment_rows() {
// todo(yixiu): add a new conf of segment size in compaction
return config::write_buffer_size / (_input_rowsets_size / (_input_row_num
+ 1) + 1);
}
+bool Compaction::is_rowset_tidy(std::string& pre_max_key, const
RowsetSharedPtr& rhs) {
+ size_t min_tidy_size = 10 * 1024 * 1024;
+ if (rhs->num_segments() == 0) {
+ return true;
+ }
+ if (rhs->is_segments_overlapping()) {
+ return false;
+ }
+ // check segment size
+ auto beta_rowset = reinterpret_cast<BetaRowset*>(rhs.get());
+ std::vector<size_t> segments_size;
+ beta_rowset->get_segments_size(&segments_size);
+ for (auto segment_size : segments_size) {
+ // if a segment is too small, the rowset still needs compaction
+ if (segment_size < min_tidy_size) {
+ return false;
+ }
+ }
-Status Compaction::do_compaction_impl(int64_t permits) {
- OlapStopWatch watch;
+ auto min_key = rhs->min_key();
+ if (min_key < pre_max_key) {
+ return false;
+ }
+ pre_max_key = rhs->max_key();
+ return true;
+}
+
+Status Compaction::do_compact_ordered_rowsets() {
+ LOG(INFO) << "start to do ordered data compaction, tablet=" <<
_tablet->full_name()
+ << ", output_version=" << _output_version;
+ build_basic_info();
+ RETURN_NOT_OK(construct_output_rowset_writer());
+ // link data to new rowset
+ auto seg_id = 0;
+ std::vector<KeyBoundsPB> segment_key_bounds;
+ for (auto rowset : _input_rowsets) {
+ RETURN_NOT_OK(rowset->link_files_to(_tablet->tablet_path(),
_output_rs_writer->rowset_id(),
+ seg_id));
+ seg_id += rowset->num_segments();
- // 1. prepare input and output parameters
- int64_t segments_num = 0;
+ std::vector<KeyBoundsPB> key_bounds;
+ rowset->get_segments_key_bounds(&key_bounds);
+ segment_key_bounds.insert(segment_key_bounds.end(),
key_bounds.begin(), key_bounds.end());
+ }
+ // build output rowset
+ RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
+ rowset_meta->set_num_rows(_input_row_num);
+ rowset_meta->set_total_disk_size(_input_rowsets_size);
+ rowset_meta->set_data_disk_size(_input_rowsets_size);
+ rowset_meta->set_index_disk_size(_input_index_size);
+ rowset_meta->set_empty(_input_row_num == 0);
+ rowset_meta->set_num_segments(_input_num_segments);
+ rowset_meta->set_segments_overlap(NONOVERLAPPING);
+ rowset_meta->set_rowset_state(VISIBLE);
+
+ rowset_meta->set_segments_key_bounds(segment_key_bounds);
+ _output_rowset = _output_rs_writer->manual_build(rowset_meta);
+ return Status::OK();
+}
+
+void Compaction::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_size += rowset->data_disk_size();
+ _input_index_size += rowset->index_disk_size();
_input_row_num += rowset->num_rows();
- segments_num += rowset->num_segments();
+ _input_num_segments += rowset->num_segments();
}
TRACE_COUNTER_INCREMENT("input_rowsets_data_size", _input_rowsets_size);
TRACE_COUNTER_INCREMENT("input_row_num", _input_row_num);
- TRACE_COUNTER_INCREMENT("input_segments_num", segments_num);
+ TRACE_COUNTER_INCREMENT("input_segments_num", _input_num_segments);
_output_version =
Version(_input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version());
@@ -126,22 +185,81 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp();
_newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
- auto use_vectorized_compaction = config::enable_vectorized_compaction;
- string merge_type = use_vectorized_compaction ? "v" : "";
- bool vertical_compaction = should_vertical_compaction();
-
- LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" <<
_tablet->full_name()
- << ", output_version=" << _output_version << ", permits: " <<
permits
- << ", is_vertical_compaction=" << vertical_compaction;
- // get cur schema if rowset schema exist, rowset schema must be newer than
tablet schema
std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
std::transform(_input_rowsets.begin(), _input_rowsets.end(),
rowset_metas.begin(),
[](const RowsetSharedPtr& rowset) { return
rowset->rowset_meta(); });
- TabletSchemaSPtr cur_tablet_schema =
+ _cur_tablet_schema =
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
+}
+
+bool Compaction::handle_ordered_data_compaction() {
+ // check delete version: if compaction type is base compaction and
+ // has a delete version, use original compaction
+ if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
+ for (auto rowset : _input_rowsets) {
+ if (_tablet->version_for_delete_predicate(rowset->version())) {
+ return false;
+ }
+ }
+ }
- RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema,
vertical_compaction));
+ // check if rowsets are tidy so we can just modify meta and do link
+ // files to handle compaction
+ auto input_size = _input_rowsets.size();
+ std::string pre_max_key;
+ for (auto i = 0; i < input_size; ++i) {
+ if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) {
+ if (i <= input_size / 2) {
+ return false;
+ } else {
+ _input_rowsets.resize(i);
+ break;
+ }
+ }
+ }
+ // most rowsets of the current compaction are non-overlapping;
+ // only handle the leading non-overlapping rowsets
+ auto st = do_compact_ordered_rowsets();
+ if (!st.ok()) {
+ return false;
+ }
+ return true;
+}
+
+Status Compaction::do_compaction_impl(int64_t permits) {
+ OlapStopWatch watch;
+
+ auto use_vectorized_compaction = config::enable_vectorized_compaction;
+ string merge_type = use_vectorized_compaction ? "v" : "";
+
+ if (handle_ordered_data_compaction()) {
+ RETURN_NOT_OK(modify_rowsets());
+ TRACE("modify rowsets finished");
+
+ int64_t now = UnixMillis();
+ if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
+ _tablet->set_last_cumu_compaction_success_time(now);
+ } else {
+ _tablet->set_last_base_compaction_success_time(now);
+ }
+ auto cumu_policy = _tablet->cumulative_compaction_policy();
+ LOG(INFO) << "succeed to do ordered data " << merge_type <<
compaction_name()
+ << ". tablet=" << _tablet->full_name() << ",
output_version=" << _output_version
+ << ", disk=" << _tablet->data_dir()->path()
+ << ", segments=" << _input_num_segments << ",
input_row_num=" << _input_row_num
+ << ", output_row_num=" << _output_rowset->num_rows()
+ << ". elapsed time=" << watch.get_elapse_second()
+ << "s. cumulative_compaction_policy="
+ << (cumu_policy == nullptr ? "quick" : cumu_policy->name());
+ return Status::OK();
+ }
+ build_basic_info();
+
+ LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" <<
_tablet->full_name()
+ << ", output_version=" << _output_version << ", permits: " <<
permits;
+ bool vertical_compaction = should_vertical_compaction();
RETURN_NOT_OK(construct_input_rowset_readers());
+ RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction));
TRACE("prepare finished");
// 2. write merged rows to output rowset
@@ -155,15 +273,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
if (use_vectorized_compaction) {
if (vertical_compaction) {
- res = Merger::vertical_merge_rowsets(_tablet, compaction_type(),
cur_tablet_schema,
+ res = Merger::vertical_merge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
_input_rs_readers,
_output_rs_writer.get(),
get_avg_segment_rows(),
&stats);
} else {
- res = Merger::vmerge_rowsets(_tablet, compaction_type(),
cur_tablet_schema,
+ res = Merger::vmerge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
_input_rs_readers,
_output_rs_writer.get(), &stats);
}
} else {
- res = Merger::merge_rowsets(_tablet, compaction_type(),
cur_tablet_schema,
+ res = Merger::merge_rowsets(_tablet, compaction_type(),
_cur_tablet_schema,
_input_rs_readers,
_output_rs_writer.get(), &stats);
}
@@ -221,7 +339,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
LOG(INFO) << "succeed to do " << merge_type << compaction_name()
<< ". tablet=" << _tablet->full_name() << ", output_version=" <<
_output_version
<< ", current_max_version=" << current_max_version
- << ", disk=" << _tablet->data_dir()->path() << ", segments=" <<
segments_num
+ << ", disk=" << _tablet->data_dir()->path() << ", segments=" <<
_input_num_segments
<< ", input_row_num=" << _input_row_num
<< ", output_row_num=" << _output_rowset->num_rows()
<< ". elapsed time=" << watch.get_elapse_second()
@@ -231,15 +349,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
return Status::OK();
}
-Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema,
bool is_vertical) {
+Status Compaction::construct_output_rowset_writer(bool is_vertical) {
if (is_vertical) {
return _tablet->create_vertical_rowset_writer(_output_version,
VISIBLE, NONOVERLAPPING,
- schema,
_oldest_write_timestamp,
+ _cur_tablet_schema,
_oldest_write_timestamp,
_newest_write_timestamp,
&_output_rs_writer);
}
- return _tablet->create_rowset_writer(_output_version, VISIBLE,
NONOVERLAPPING, schema,
- _oldest_write_timestamp,
_newest_write_timestamp,
- &_output_rs_writer);
+ return _tablet->create_rowset_writer(_output_version, VISIBLE,
NONOVERLAPPING,
+ _cur_tablet_schema,
_oldest_write_timestamp,
+ _newest_write_timestamp,
&_output_rs_writer);
}
Status Compaction::construct_input_rowset_readers() {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 7f23f94b26..f2072347b2 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -64,7 +64,7 @@ protected:
Status modify_rowsets();
void gc_output_rowset();
- Status construct_output_rowset_writer(TabletSchemaSPtr schema, bool
is_vertical);
+ Status construct_output_rowset_writer(bool is_vertical = false);
Status construct_input_rowset_readers();
Status check_version_continuity(const std::vector<RowsetSharedPtr>&
rowsets);
@@ -76,6 +76,11 @@ protected:
bool should_vertical_compaction();
int64_t get_avg_segment_rows();
+ bool handle_ordered_data_compaction();
+ Status do_compact_ordered_rowsets();
+ bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs);
+ void build_basic_info();
+
protected:
// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
@@ -86,6 +91,8 @@ protected:
std::vector<RowsetReaderSharedPtr> _input_rs_readers;
int64_t _input_rowsets_size;
int64_t _input_row_num;
+ int64_t _input_num_segments;
+ int64_t _input_index_size;
RowsetSharedPtr _output_rowset;
std::unique_ptr<RowsetWriter> _output_rs_writer;
@@ -98,6 +105,7 @@ protected:
int64_t _oldest_write_timestamp;
int64_t _newest_write_timestamp;
RowIdConversion _rowid_conversion;
+ TabletSchemaSPtr _cur_tablet_schema;
DISALLOW_COPY_AND_ASSIGN(Compaction);
};
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 0d99676ecc..9094db2740 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -96,6 +96,19 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
return Status::OK();
}
+Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) {
+ auto fs = _rowset_meta->fs();
+ if (!fs || _schema == nullptr) {
+ return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+ }
+ for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+ auto seg_path = segment_file_path(seg_id);
+ size_t file_size;
+ RETURN_IF_ERROR(fs->file_size(seg_path, &file_size));
+ segments_size->push_back(file_size);
+ }
+ return Status::OK();
+}
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>*
segments) {
auto fs = _rowset_meta->fs();
if (!fs || _schema == nullptr) {
@@ -182,14 +195,15 @@ void BetaRowset::do_close() {
// do nothing.
}
-Status BetaRowset::link_files_to(const std::string& dir, RowsetId
new_rowset_id) {
+Status BetaRowset::link_files_to(const std::string& dir, RowsetId
new_rowset_id,
+ size_t new_rowset_start_seg_id) {
DCHECK(is_local());
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (int i = 0; i < num_segments(); ++i) {
- auto dst_path = segment_file_path(dir, new_rowset_id, i);
+ auto dst_path = segment_file_path(dir, new_rowset_id, i +
new_rowset_start_seg_id);
// TODO(lingbin): use Env API? or EnvUtil?
bool dst_path_exist = false;
if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 1d4153f701..8c1035a4df 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -63,7 +63,8 @@ public:
Status remove() override;
- Status link_files_to(const std::string& dir, RowsetId new_rowset_id)
override;
+ Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
+ size_t new_rowset_start_seg_id = 0) override;
Status copy_files_to(const std::string& dir, const RowsetId&
new_rowset_id) override;
@@ -82,6 +83,8 @@ public:
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
+ Status get_segments_size(std::vector<size_t>* segments_size);
+
protected:
BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
RowsetMetaSharedPtr rowset_meta);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 77e4e3ab40..de46e05afb 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -231,6 +231,27 @@ Status BetaRowsetWriter::flush_single_memtable(const
vectorized::Block* block) {
return Status::OK();
}
+RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr&
spec_rowset_meta) {
+ if (_rowset_meta->oldest_write_timestamp() == -1) {
+ _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
+ }
+
+ if (_rowset_meta->newest_write_timestamp() == -1) {
+ _rowset_meta->set_newest_write_timestamp(UnixSeconds());
+ }
+
+ _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta);
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(_context.tablet_schema,
_context.rowset_dir,
+ _rowset_meta, &rowset);
+ if (!status.ok()) {
+ LOG(WARNING) << "rowset init failed when build new rowset, res=" <<
status;
+ return nullptr;
+ }
+ _already_built = true;
+ return rowset;
+}
+
RowsetSharedPtr BetaRowsetWriter::build() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
@@ -265,6 +286,37 @@ RowsetSharedPtr BetaRowsetWriter::build() {
return rowset;
}
+bool BetaRowsetWriter::_is_segment_overlapping() {
+ std::string last;
+ for (auto segment_encode_key : _segments_encoded_key_bounds) {
+ auto cur_min = segment_encode_key.min_key();
+ auto cur_max = segment_encode_key.max_key();
+ if (cur_min < last) {
+ return true;
+ }
+ last = cur_max;
+ }
+ return false;
+}
+
+void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
+ RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr&
spec_rowset_meta) {
+ rowset_meta->set_num_rows(spec_rowset_meta->num_rows());
+ rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size());
+ rowset_meta->set_data_disk_size(spec_rowset_meta->total_disk_size());
+ rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size());
+ // TODO write zonemap to meta
+ rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0);
+ rowset_meta->set_creation_time(time(nullptr));
+ rowset_meta->set_num_segments(spec_rowset_meta->num_segments());
+ rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap());
+ rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state());
+
+ std::vector<KeyBoundsPB> segments_key_bounds;
+ spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds);
+ rowset_meta->set_segments_key_bounds(segments_key_bounds);
+}
+
void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta>
rowset_meta) {
rowset_meta->set_num_rows(_num_rows_written);
rowset_meta->set_total_disk_size(_total_data_size);
@@ -274,7 +326,7 @@ void
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
rowset_meta->set_empty(_num_rows_written == 0);
rowset_meta->set_creation_time(time(nullptr));
rowset_meta->set_num_segments(_num_segment);
- if (_num_segment <= 1) {
+ if (!_is_segment_overlapping()) {
rowset_meta->set_segments_overlap(NONOVERLAPPING);
}
if (_is_pending) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index de6688cb00..e65a090406 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -60,6 +60,8 @@ public:
// for this segment
RowsetSharedPtr build_tmp() override;
+ RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta)
override;
+
Version version() override { return _context.version; }
int64_t num_rows() const override { return _num_rows_written; }
@@ -83,7 +85,11 @@ private:
Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>*
writer);
- void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
+
+ void _build_rowset_meta(RowsetMetaSharedPtr rowset_meta);
+ void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta,
+ const RowsetMetaSharedPtr&
spec_rowset_meta);
+ bool _is_segment_overlapping();
protected:
RowsetWriterContext _context;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 32fda0e102..312fe62b1a 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -161,6 +161,7 @@ public:
RowsetMetaPB get_rowset_pb() const { return
rowset_meta()->get_rowset_pb(); }
int64_t oldest_write_timestamp() const { return
rowset_meta()->oldest_write_timestamp(); }
int64_t newest_write_timestamp() const { return
rowset_meta()->newest_write_timestamp(); }
+ bool is_segments_overlapping() const { return
rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
// remove all files in this rowset
@@ -198,7 +199,8 @@ public:
}
// hard link all files in this rowset to `dir` to form a new rowset with
id `new_rowset_id`.
- virtual Status link_files_to(const std::string& dir, RowsetId
new_rowset_id) = 0;
+ virtual Status link_files_to(const std::string& dir, RowsetId
new_rowset_id,
+ size_t new_rowset_start_seg_id = 0) = 0;
// copy all files to `dir`
virtual Status copy_files_to(const std::string& dir, const RowsetId&
new_rowset_id) = 0;
@@ -265,6 +267,16 @@ public:
_rowset_meta->get_segments_key_bounds(segments_key_bounds);
return Status::OK();
}
+ std::string min_key() {
+ KeyBoundsPB key_bounds;
+ _rowset_meta->get_first_segment_key_bound(&key_bounds);
+ return key_bounds.min_key();
+ }
+ std::string max_key() {
+ KeyBoundsPB key_bounds;
+ _rowset_meta->get_last_segment_key_bound(&key_bounds);
+ return key_bounds.max_key();
+ }
bool check_rowset_segment();
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7154116e7f..a4a7059d39 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -320,6 +320,15 @@ public:
segments_key_bounds->push_back(key_range);
}
}
+ virtual void get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
+ DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0);
+ *key_bounds = _rowset_meta_pb.segments_key_bounds(0);
+ }
+ virtual void get_last_segment_key_bound(KeyBoundsPB* key_bounds) {
+ DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0);
+ *key_bounds =
+
_rowset_meta_pb.segments_key_bounds(_rowset_meta_pb.segments_key_bounds_size()
- 1);
+ }
void set_segments_key_bounds(const std::vector<KeyBoundsPB>&
segments_key_bounds) {
for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index f5d095a48c..531dd103c8 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -82,6 +82,9 @@ public:
// real build will be called in DeltaWriter close_wait.
virtual RowsetSharedPtr build_tmp() = 0;
+ // For ordered rowset compaction: build the rowset manually from the given meta
+ virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr&
rowset_meta) = 0;
+
virtual Version version() = 0;
virtual int64_t num_rows() const = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 971706abbf..284738e97d 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -51,11 +51,9 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer,
uint32_t segment_id,
_mem_tracker(std::make_unique<MemTracker>("SegmentWriter:Segment-" +
std::to_string(segment_id))) {
CHECK_NOTNULL(file_writer);
- if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
- _num_key_columns = _tablet_schema->num_key_columns();
- } else {
- _num_key_columns = _tablet_schema->num_short_key_columns();
- }
+ _num_key_columns = _tablet_schema->num_key_columns();
+ _num_short_key_columns = _tablet_schema->num_short_key_columns();
+ DCHECK(_num_key_columns >= _num_short_key_columns);
for (size_t cid = 0; cid < _num_key_columns; ++cid) {
const auto& column = _tablet_schema->column(cid);
_key_coders.push_back(get_key_coder(column.type()));
@@ -206,10 +204,15 @@ Status SegmentWriter::append_block(const
vectorized::Block* block, size_t row_po
// create primary indexes
for (size_t pos = 0; pos < num_rows; pos++) {
RETURN_IF_ERROR(
-
_primary_key_index_builder->add_item(_encode_keys(key_columns, pos)));
+
_primary_key_index_builder->add_item(_full_encode_keys(key_columns, pos)));
}
} else {
- // create short key indexes
+ // create short key indexes
+ // for min_max key
+ for (size_t pos = 0; pos < num_rows; pos++) {
+ set_min_max_key(_full_encode_keys(key_columns, pos));
+ }
+ key_columns.resize(_num_short_key_columns);
for (const auto pos : short_key_pos) {
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns,
pos)));
}
@@ -233,17 +236,15 @@ int64_t SegmentWriter::max_row_to_add(size_t
row_avg_size_in_bytes) {
return std::min(size_rows, count_rows);
}
-std::string SegmentWriter::_encode_keys(
+std::string SegmentWriter::_full_encode_keys(
const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos,
bool null_first) {
- if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write &&
- _tablet_schema->has_sequence_col()) {
+ assert(_key_index_size.size() == _num_key_columns);
+ if (_tablet_schema->has_sequence_col() &&
_opts.enable_unique_key_merge_on_write) {
assert(key_columns.size() == _num_key_columns + 1 &&
- _key_coders.size() == _num_key_columns + 1 &&
- _key_index_size.size() == _num_key_columns);
+ _key_coders.size() == _num_key_columns + 1);
} else {
- assert(key_columns.size() == _num_key_columns && _key_coders.size() ==
_num_key_columns &&
- _key_index_size.size() == _num_key_columns);
+ assert(key_columns.size() == _num_key_columns && _key_coders.size() ==
_num_key_columns);
}
std::string encoded_keys;
@@ -259,11 +260,31 @@ std::string SegmentWriter::_encode_keys(
continue;
}
encoded_keys.push_back(KEY_NORMAL_MARKER);
- if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
- _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
- } else {
- _key_coders[cid]->encode_ascending(field, _key_index_size[cid],
&encoded_keys);
+ _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
+ ++cid;
+ }
+ return encoded_keys;
+}
+
+std::string SegmentWriter::_encode_keys(
+ const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos,
+ bool null_first) {
+ assert(key_columns.size() == _num_short_key_columns);
+
+ std::string encoded_keys;
+ size_t cid = 0;
+ for (const auto& column : key_columns) {
+ auto field = column->get_data_at(pos);
+ if (UNLIKELY(!field)) {
+ if (null_first) {
+ encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+ } else {
+ encoded_keys.push_back(KEY_NULL_LAST_MARKER);
+ }
+ continue;
}
+ encoded_keys.push_back(KEY_NORMAL_MARKER);
+ _key_coders[cid]->encode_ascending(field, _key_index_size[cid],
&encoded_keys);
++cid;
}
return encoded_keys;
@@ -275,24 +296,25 @@ Status SegmentWriter::append_row(const RowType& row) {
auto cell = row.cell(cid);
RETURN_IF_ERROR(_column_writers[cid]->append(cell));
}
+ std::string full_encoded_key;
+ encode_key<RowType, true, true>(&full_encoded_key, row, _num_key_columns);
+ if (_tablet_schema->has_sequence_col()) {
+ full_encoded_key.push_back(KEY_NORMAL_MARKER);
+ auto cid = _tablet_schema->sequence_col_idx();
+ auto cell = row.cell(cid);
+ row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(),
&full_encoded_key);
+ }
if (_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write) {
- std::string encoded_key;
- encode_key<RowType, true, true>(&encoded_key, row, _num_key_columns);
- if (_tablet_schema->has_sequence_col()) {
- encoded_key.push_back(KEY_NORMAL_MARKER);
- auto cid = _tablet_schema->sequence_col_idx();
- auto cell = row.cell(cid);
- row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(),
&encoded_key);
- }
- RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded_key));
+
RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
} else {
// At the beginning of one block, so add a short key index entry
if ((_num_rows_written % _opts.num_rows_per_block) == 0) {
std::string encoded_key;
- encode_key(&encoded_key, row, _num_key_columns);
+ encode_key(&encoded_key, row, _num_short_key_columns);
RETURN_IF_ERROR(_short_key_index_builder->add_item(encoded_key));
}
+ set_min_max_key(full_encoded_key);
}
++_num_rows_written;
return Status::OK();
@@ -465,13 +487,22 @@ Status SegmentWriter::_write_raw_data(const
std::vector<Slice>& slices) {
}
Slice SegmentWriter::min_encoded_key() {
- return (_primary_key_index_builder == nullptr) ? Slice()
+ return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(),
_min_key.size())
:
_primary_key_index_builder->min_key();
}
Slice SegmentWriter::max_encoded_key() {
- return (_primary_key_index_builder == nullptr) ? Slice()
+ return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(),
_max_key.size())
:
_primary_key_index_builder->max_key();
}
+void SegmentWriter::set_min_max_key(const Slice& key) {
+ if (UNLIKELY(_is_first_row)) {
+ _min_key.append(key.get_data(), key.get_size());
+ _is_first_row = false;
+ }
+ _max_key.clear();
+ _max_key.append(key.get_data(), key.get_size());
+}
+
} // namespace segment_v2
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 94ff14585c..5a7342a266 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -26,6 +26,7 @@
#include "gen_cpp/segment_v2.pb.h"
#include "gutil/macros.h"
#include "olap/tablet_schema.h"
+#include "util/faststring.h"
#include "vec/core/block.h"
#include "vec/olap/olap_data_convertor.h"
@@ -109,6 +110,11 @@ private:
Status _write_raw_data(const std::vector<Slice>& slices);
std::string _encode_keys(const
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
size_t pos, bool null_first = true);
+ // for unique-key merge on write and segment min_max key
+ std::string _full_encode_keys(
+ const std::vector<vectorized::IOlapColumnDataAccessor*>&
key_columns, size_t pos,
+ bool null_first = true);
+ void set_min_max_key(const Slice& key);
void _reset_column_writers();
@@ -124,6 +130,7 @@ private:
SegmentFooterPB _footer;
size_t _num_key_columns;
+ size_t _num_short_key_columns;
std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
@@ -143,6 +150,10 @@ private:
// In vertical compaction row count is recorded when key columns group
finish
// and _num_rows_written will be updated in value column group
uint32_t _row_count = 0;
+
+ bool _is_first_row = true;
+ faststring _min_key;
+ faststring _max_key;
};
} // namespace segment_v2
diff --git a/be/test/io/cache/remote_file_cache_test.cpp
b/be/test/io/cache/remote_file_cache_test.cpp
index a88cf2fbf8..e7737443a9 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -139,8 +139,8 @@ protected:
EXPECT_TRUE(st.ok());
EXPECT_TRUE(file_writer->close().ok());
- EXPECT_EQ("", writer.min_encoded_key().to_string());
- EXPECT_EQ("", writer.max_encoded_key().to_string());
+ EXPECT_NE("", writer.min_encoded_key().to_string());
+ EXPECT_NE("", writer.max_encoded_key().to_string());
st = segment_v2::Segment::open(fs, path, "", 0, query_schema, res);
EXPECT_TRUE(st.ok());
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index 27b43fec3c..01ac4a667c 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -75,7 +75,7 @@ protected:
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
tablet_schema_pb.set_keys_type(keys_type);
- tablet_schema_pb.set_num_short_key_columns(2);
+ tablet_schema_pb.set_num_short_key_columns(1);
tablet_schema_pb.set_num_rows_per_row_block(1024);
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
tablet_schema_pb.set_next_column_unique_id(4);
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp
b/be/test/olap/rowset/segment_v2/segment_test.cpp
index ac2d8febfd..5071db4611 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -151,29 +151,22 @@ protected:
EXPECT_TRUE(st.ok());
EXPECT_TRUE(file_writer->close().ok());
// Check min/max key generation
- if (build_schema->keys_type() == UNIQUE_KEYS &&
opts.enable_unique_key_merge_on_write) {
- // Create min row
- for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
- RowCursorCell cell = row.cell(cid);
- generator(0, cid, 0 / opts.num_rows_per_block, cell);
- }
- std::string min_encoded_key;
- encode_key<RowCursor, true, true>(&min_encoded_key, row,
- build_schema->num_key_columns());
- EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string());
- // Create max row
- for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
- RowCursorCell cell = row.cell(cid);
- generator(nrows - 1, cid, (nrows - 1) /
opts.num_rows_per_block, cell);
- }
- std::string max_encoded_key;
- encode_key<RowCursor, true, true>(&max_encoded_key, row,
- build_schema->num_key_columns());
- EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string());
- } else {
- EXPECT_EQ("", writer.min_encoded_key().to_string());
- EXPECT_EQ("", writer.max_encoded_key().to_string());
+ // Create min row
+ for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
+ RowCursorCell cell = row.cell(cid);
+ generator(0, cid, 0 / opts.num_rows_per_block, cell);
+ }
+ std::string min_encoded_key;
+ encode_key<RowCursor, true, true>(&min_encoded_key, row,
build_schema->num_key_columns());
+ EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string());
+ // Create max row
+ for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
+ RowCursorCell cell = row.cell(cid);
+ generator(nrows - 1, cid, (nrows - 1) / opts.num_rows_per_block,
cell);
}
+ std::string max_encoded_key;
+ encode_key<RowCursor, true, true>(&max_encoded_key, row,
build_schema->num_key_columns());
+ EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string());
st = Segment::open(fs, path, "", 0, query_schema, res);
EXPECT_TRUE(st.ok());
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
index 17865ce3d8..234a64eeab 100644
--- a/be/test/testutil/mock_rowset.h
+++ b/be/test/testutil/mock_rowset.h
@@ -37,7 +37,8 @@ class MockRowset : public Rowset {
return Status::NotSupported("MockRowset not support this method.");
}
- virtual Status link_files_to(const std::string& dir, RowsetId
new_rowset_id) override {
+ virtual Status link_files_to(const std::string& dir, RowsetId
new_rowset_id,
+ size_t start_seg_id) override {
return Status::NotSupported("MockRowset not support this method.");
}
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index 6791f7cacd..b9c6d059fb 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -78,4 +78,4 @@ pg_14_port=5442
// See `docker/thirdparties/start-thirdparties-docker.sh`
enableHiveTest=false
hms_port=9183
-hdfs_port=8120
+hdfs_port=8120
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]