This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 589d11547c [feature](load) add segment bytes limit in segcompaction (#22526) (#22638)
589d11547c is described below
commit 589d11547c88d164956551ca5768929bc37f288c
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 11 17:47:52 2023 +0800
[feature](load) add segment bytes limit in segcompaction (#22526) (#22638)
---
be/src/common/config.cpp | 21 +++--
be/src/common/config.h | 21 +++--
be/src/olap/olap_server.cpp | 4 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 104 ++++++++++-----------
be/src/olap/rowset/beta_rowset_writer.h | 5 +-
be/test/olap/segcompaction_test.cpp | 26 +++---
docs/en/docs/admin-manual/config/be-config.md | 32 ++++++-
docs/en/docs/advanced/best-practice/compaction.md | 2 +-
docs/zh-CN/docs/admin-manual/config/be-config.md | 40 +++++++-
.../docs/advanced/best-practice/compaction.md | 2 +-
10 files changed, 165 insertions(+), 92 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 8e33a42e4e..6666ee9ef0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -911,14 +911,23 @@ DEFINE_Bool(hide_webserver_config_page, "false");
DEFINE_Bool(enable_segcompaction, "true");
-// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
-DEFINE_Int32(segcompaction_threshold_segment_num, "10");
+// Max number of segments allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_batch_size, "10");
-// The segment whose row number above the threshold will be compacted during segcompaction
-DEFINE_Int32(segcompaction_small_threshold, "1048576");
+// Max row count allowed in a single source segment, bigger segments will be skipped.
+DEFINE_Int32(segcompaction_candidate_max_rows, "1048576");
-// This config can be set to limit thread number in segcompaction thread pool.
-DEFINE_mInt32(segcompaction_max_threads, "10");
+// Max file size allowed in a single source segment, bigger segments will be skipped.
+DEFINE_Int64(segcompaction_candidate_max_bytes, "104857600");
+
+// Max total row count allowed in a single segcompaction task.
+DEFINE_Int32(segcompaction_task_max_rows, "1572864");
+
+// Max total file size allowed in a single segcompaction task.
+DEFINE_Int64(segcompaction_task_max_bytes, "157286400");
+
+// Global segcompaction thread pool size.
+DEFINE_mInt32(segcompaction_num_threads, "5");
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7316870cd6..4f4c703308 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -953,14 +953,23 @@ DECLARE_Bool(hide_webserver_config_page);
DECLARE_Bool(enable_segcompaction);
-// Trigger segcompaction if the num of segments in a rowset exceeds this threshold.
-DECLARE_Int32(segcompaction_threshold_segment_num);
+// Max number of segments allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_batch_size);
-// The segment whose row number above the threshold will be compacted during segcompaction
-DECLARE_Int32(segcompaction_small_threshold);
+// Max row count allowed in a single source segment, bigger segments will be skipped.
+DECLARE_Int32(segcompaction_candidate_max_rows);
-// This config can be set to limit thread number in segcompaction thread pool.
-DECLARE_mInt32(segcompaction_max_threads);
+// Max file size allowed in a single source segment, bigger segments will be skipped.
+DECLARE_Int64(segcompaction_candidate_max_bytes);
+
+// Max total row count allowed in a single segcompaction task.
+DECLARE_Int32(segcompaction_task_max_rows);
+
+// Max total file size allowed in a single segcompaction task.
+DECLARE_Int64(segcompaction_task_max_bytes);
+
+// Global segcompaction thread pool size.
+DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 166e86ce1b..65abb5f2ac 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -133,8 +133,8 @@ Status StorageEngine::start_bg_threads() {
if (config::enable_segcompaction) {
ThreadPoolBuilder("SegCompactionTaskThreadPool")
- .set_min_threads(config::segcompaction_max_threads)
- .set_max_threads(config::segcompaction_max_threads)
+ .set_min_threads(config::segcompaction_num_threads)
+ .set_max_threads(config::segcompaction_num_threads)
.build(&_seg_compaction_thread_pool);
}
ThreadPoolBuilder("ColdDataCompactionTaskThreadPool")
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 85791cd01f..5c66182381 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -34,6 +34,7 @@
#include "common/logging.h"
#include "gutil/integral_types.h"
#include "gutil/strings/substitute.h"
+#include "io/fs/file_reader.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
@@ -142,27 +143,22 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
return _add_block(block, &_segment_writer);
}
-Status BetaRowsetWriter::_load_noncompacted_segments(
-        std::vector<segment_v2::SegmentSharedPtr>* segments, size_t num) {
+Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
+                                                    int32_t segment_id) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>(
-                "BetaRowsetWriter::_load_noncompacted_segments _rowset_meta->fs get failed");
-    }
-    for (int seg_id = _segcompacted_point; seg_id < num; ++seg_id) {
-        auto seg_path =
-                BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, seg_id);
-        std::shared_ptr<segment_v2::Segment> segment;
-        auto type = config::enable_file_cache ? config::file_cache_type : "";
-        io::FileReaderOptions reader_options(io::cache_type_from_string(type),
-                                             io::SegmentCachePathPolicy());
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(),
-                                           _context.tablet_schema, reader_options, &segment);
-        if (!s.ok()) {
-            LOG(WARNING) << "failed to open segment. " << seg_path << ":" << s.to_string();
-            return s;
-        }
-        segments->push_back(std::move(segment));
+                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
+    }
+    auto path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
+    auto type = config::enable_file_cache ? config::file_cache_type : "";
+    io::FileReaderOptions reader_options(io::cache_type_from_string(type),
+                                         io::SegmentCachePathPolicy());
+    auto s = segment_v2::Segment::open(fs, path, segment_id, rowset_id(), _context.tablet_schema,
+                                       reader_options, &segment);
+    if (!s.ok()) {
+        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
+        return s;
     }
     return Status::OK();
 }
@@ -172,43 +168,46 @@ Status BetaRowsetWriter::_load_noncompacted_segments(
  * 2. if the consecutive smalls end up with a big, compact the smalls, except
  *    single small
  * 3. if the consecutive smalls end up with small, compact the smalls if the
- *    length is beyond (config::segcompaction_threshold_segment_num / 2)
+ *    length is beyond (config::segcompaction_batch_size / 2)
  */
 Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
-        SegCompactionCandidatesSharedPtr segments) {
-    std::vector<segment_v2::SegmentSharedPtr> all_segments;
-    // subtract one to skip last (maybe active) segment
-    RETURN_IF_ERROR(_load_noncompacted_segments(&all_segments, _num_segment - 1));
-
-    if (VLOG_DEBUG_IS_ON) {
-        vlog_buffer.clear();
-        for (auto& segment : all_segments) {
-            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
-        }
-        VLOG_DEBUG << "all noncompacted segments num:" << all_segments.size()
-                   << " list of segments:" << fmt::to_string(vlog_buffer);
-    }
-
-    bool is_terminated_by_big = false;
-    bool let_big_terminate = false;
-    size_t small_threshold = config::segcompaction_small_threshold;
-    for (int64_t i = 0; i < all_segments.size(); ++i) {
-        segment_v2::SegmentSharedPtr seg = all_segments[i];
-        if (seg->num_rows() > small_threshold) {
-            if (let_big_terminate) {
-                is_terminated_by_big = true;
-                break;
-            } else {
+        SegCompactionCandidatesSharedPtr& segments) {
+    segments = std::make_shared<SegCompactionCandidates>();
+    // skip last (maybe active) segment
+    int32_t last_segment = _num_segment - 1;
+    size_t task_bytes = 0;
+    uint32_t task_rows = 0;
+    int32_t segid;
+    for (segid = _segcompacted_point;
+         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
+        segment_v2::SegmentSharedPtr segment;
+        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
+        const auto segment_rows = segment->num_rows();
+        const auto segment_bytes = segment->file_reader()->size();
+        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
+                                segment_bytes > config::segcompaction_candidate_max_bytes;
+        if (is_large_segment) {
+            if (segid == _segcompacted_point) {
+                // skip large segments at the front
                 RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+                continue;
+            } else {
+                // stop because we need consecutive segments
+                break;
             }
-        } else {
-            let_big_terminate = true; // break if find a big after small
-            segments->push_back(seg);
         }
+        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
+                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
+        if (is_task_full) {
+            break;
+        }
+        segments->push_back(segment);
+        task_rows += segment->num_rows();
+        task_bytes += segment->file_reader()->size();
     }
     size_t s = segments->size();
-    if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) {
-        // start with big segments and end with small, better to do it in next
+    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
+        // we didn't collect enough segments, better to do it in next
         // round to compact more at once
         segments->clear();
         return Status::OK();
@@ -343,9 +342,8 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     if (_segcompaction_status.load() != OK) {
         status = Status::Error<SEGCOMPACTION_FAILED>(
                 "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state");
-    } else if ((_num_segment - _segcompacted_point) >=
-               config::segcompaction_threshold_segment_num) {
-        SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
+    } else if ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
+        SegCompactionCandidatesSharedPtr segments;
         status = _find_longest_consecutive_small_segment(segments);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
@@ -381,9 +379,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
     // currently we only rename remaining segments to reduce wait time
     // so that transaction can be committed ASAP
     VLOG_DEBUG << "segcompaction last few segments";
-    SegCompactionCandidates segments;
-    RETURN_IF_ERROR(_load_noncompacted_segments(&segments, _num_segment));
-    for (int i = 0; i < segments.size(); ++i) {
+    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
         RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
     }
     return Status::OK();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 29c67218c1..af4dc2ca50 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -144,9 +144,8 @@ private:
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
Status _segcompaction_rename_last_segments();
-    Status _load_noncompacted_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
-                                       size_t num);
-    Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr segments);
+    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
+    Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
bool _check_and_set_is_doing_segcompaction();
diff --git a/be/test/olap/segcompaction_test.cpp
b/be/test/olap/segcompaction_test.cpp
index 6a4476f423..2a894b9197 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -231,9 +231,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
RowsetSharedPtr rowset;
const int num_segments = 15;
const uint32_t rows_per_segment = 4096;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 10;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 10;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -340,8 +340,8 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
create_tablet_schema(tablet_schema, DUP_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -484,8 +484,8 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_OoOoO) {
create_tablet_schema(tablet_schema, DUP_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- config::segcompaction_threshold_segment_num = 5;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ config::segcompaction_batch_size = 5;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -607,9 +607,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadUniqueTableSmall) {
create_tablet_schema(tablet_schema, UNIQUE_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 3;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 3;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
@@ -841,9 +841,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadAggTableSmall) {
create_tablet_schema(tablet_schema, AGG_KEYS);
RowsetSharedPtr rowset;
- config::segcompaction_small_threshold = 6000; // set threshold above
- // rows_per_segment
- config::segcompaction_threshold_segment_num = 3;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 3;
std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
diff --git a/docs/en/docs/admin-manual/config/be-config.md
b/docs/en/docs/admin-manual/config/be-config.md
index 9399d35371..601577f252 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -622,18 +622,42 @@ BaseCompaction:546859:
* Description: Enable to use segment compaction during loading to avoid -238
error
* Default value: true
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
* Type: int32
-* Description: Trigger segcompaction if the num of segments in a rowset exceeds this threshold
+* Description: Max number of segments allowed in a single segcompaction task.
* Default value: 10
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
* Type: int32
-* Description: The segment whose row number above the threshold will be compacted during segcompaction
+* Description: Max row count allowed in a single source segment, bigger segments will be skipped.
* Default value: 1048576
+#### `segcompaction_candidate_max_bytes`
+
+* Type: int64
+* Description: Max file size allowed in a single source segment, bigger segments will be skipped.
+* Default value: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* Type: int32
+* Description: Max total row count allowed in a single segcompaction task.
+* Default value: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* Type: int64
+* Description: Max total file size allowed in a single segcompaction task.
+* Default value: 157286400
+
+#### `segcompaction_num_threads`
+
+* Type: int32
+* Description: Global segcompaction thread pool size.
+* Default value: 5
+
#### `disable_compaction_trace_log`
* Type: bool
diff --git a/docs/en/docs/advanced/best-practice/compaction.md
b/docs/en/docs/advanced/best-practice/compaction.md
index 5963a1bdfb..5168b38dab 100644
--- a/docs/en/docs/advanced/best-practice/compaction.md
+++ b/docs/en/docs/advanced/best-practice/compaction.md
@@ -59,7 +59,7 @@ The following features are provided by segment compaction:
BE configuration:
- `enable_segcompaction=true` turn it on.
-- `segcompaction_threshold_segment_num` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction.
+- `segcompaction_batch_size` is used to configure the interval for merging. The default value 10 means that every 10 segment files will trigger a segment compaction. It is recommended to set between 10 - 30. The larger value will increase the memory usage of segment compaction.
Situations where segment compaction is recommended:
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md
b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 8e41243e52..291c711470 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -636,18 +636,54 @@ BaseCompaction:546859:
* 描述:在导入时进行 segment compaction 来减少 segment 数量, 以避免出现写入时的 -238 错误
* 默认值:true
-#### `segcompaction_threshold_segment_num`
+#### `segcompaction_batch_size`
* 类型:int32
* 描述:当 segment 数量超过此阈值时触发 segment compaction
* 默认值:10
-#### `segcompaction_small_threshold`
+#### `segcompaction_candidate_max_rows`
* 类型:int32
* 描述:当 segment 的行数超过此大小时则会在 segment compaction 时被 compact,否则跳过
* 默认值:1048576
+#### `segcompaction_batch_size`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中的最大原始 segment 数量。
+* 默认值: 10
+
+#### `segcompaction_candidate_max_rows`
+
+* 类型: int32
+* 描述: segment compaction 任务中允许的单个原始 segment 行数,过大的 segment 将被跳过。
+* 默认值: 1048576
+
+#### `segcompaction_candidate_max_bytes`
+
+* 类型: int64
+* 描述: segment compaction 任务中允许的单个原始 segment 大小(字节),过大的 segment 将被跳过。
+* 默认值: 104857600
+
+#### `segcompaction_task_max_rows`
+
+* 类型: int32
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总行数。
+* 默认值: 1572864
+
+#### `segcompaction_task_max_bytes`
+
+* 类型: int64
+* 描述: 单个 segment compaction 任务中允许的原始 segment 总大小(字节)。
+* 默认值: 157286400
+
+#### `segcompaction_num_threads`
+
+* 类型: int32
+* 描述: segment compaction 线程池大小。
+* 默认值: 5
+
#### `disable_compaction_trace_log`
* 类型: bool
diff --git a/docs/zh-CN/docs/advanced/best-practice/compaction.md
b/docs/zh-CN/docs/advanced/best-practice/compaction.md
index 342397c740..5b8562db50 100644
--- a/docs/zh-CN/docs/advanced/best-practice/compaction.md
+++ b/docs/zh-CN/docs/advanced/best-practice/compaction.md
@@ -57,7 +57,7 @@ Segment compaction 有以下特点:
开启和配置方法(BE 配置):
- `enable_segcompaction = true` 可以使能该功能
-- `segcompaction_threshold_segment_num` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
+- `segcompaction_batch_size` 用于配置合并的间隔。默认 10 表示每生成 10 个 segment 文件将会进行一次 segment compaction。一般设置为 10 - 30,过大的值会增加 segment compaction 的内存用量。
如有以下场景或问题,建议开启此功能:
- 导入大量数据触发 OLAP_ERR_TOO_MANY_SEGMENTS (errcode -238) 错误导致导入失败。此时建议打开 segment
compaction 的功能,在导入过程中对 segment 进行合并控制最终的数量。
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]