This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3dee2e51b92 [fix](cluster key) fix cluster key too many segment after
compaction (#44927)
3dee2e51b92 is described below
commit 3dee2e51b9221a5529ce051c065084df46f6c91e
Author: meiyi <[email protected]>
AuthorDate: Wed Dec 4 11:36:24 2024 +0800
[fix](cluster key) fix cluster key too many segment after compaction
(#44927)
---
be/src/common/config.cpp | 3 +++
be/src/common/config.h | 3 +++
be/src/olap/rowset/segment_v2/segment_writer.cpp | 31 +++++++++++-----------
be/src/olap/rowset/segment_v2/segment_writer.h | 4 +++
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 6 ++---
...est_compaction_with_multi_append_columns.groovy | 16 +++++++----
6 files changed, 40 insertions(+), 23 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 63989a76261..b3e7d0bce5e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1166,6 +1166,9 @@ DEFINE_mBool(enable_missing_rows_correctness_check,
"false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction. Default 50MB.
+DEFINE_mInt64(mow_primary_key_index_max_size_in_memory, "52428800");
// When the version is not continuous for MOW table in publish phase and the
gap between
// current txn's publishing version and the max version of the tablet exceeds
this value,
// don't print warning log
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29e55e64063..59fc61e8cb3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1236,6 +1236,9 @@ DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
+// When the size of primary keys in memory exceeds this value, finish current segment
+// and create a new segment, used in compaction.
+DECLARE_mInt64(mow_primary_key_index_max_size_in_memory);
// When the version is not continuous for MOW table in publish phase and the
gap between
// current txn's publishing version and the max version of the tablet exceeds
this value,
// don't print warning log
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index fc22c3570e5..c6c9664be4b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1016,6 +1016,18 @@ Status SegmentWriter::finalize_columns_index(uint64_t*
index_size) {
*index_size = _file_writer->bytes_appended() - index_start;
if (_has_key) {
if (_is_mow_with_cluster_key()) {
+ // 1. sort primary keys
+ std::sort(_primary_keys.begin(), _primary_keys.end());
+ // 2. write primary keys index
+ std::string last_key;
+ for (const auto& key : _primary_keys) {
+ DCHECK(key.compare(last_key) > 0)
+                    << "found duplicate key or key is not sorted! current key: " << key
+ << ", last key: " << last_key;
+ RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
+ last_key = key;
+ }
+
RETURN_IF_ERROR(_write_short_key_index());
*index_size = _file_writer->bytes_appended() - index_start;
RETURN_IF_ERROR(_write_primary_key_index());
@@ -1236,27 +1248,16 @@ Status SegmentWriter::_generate_primary_key_index(
last_key = std::move(key);
}
} else { // mow table with cluster key
- // 1. generate primary keys in memory
- std::vector<std::string> primary_keys;
+ // generate primary keys in memory
for (uint32_t pos = 0; pos < num_rows; pos++) {
std::string key = _full_encode_keys(primary_key_coders,
primary_key_columns, pos);
_maybe_invalid_row_cache(key);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
- _encode_rowid(pos, &key);
- primary_keys.emplace_back(std::move(key));
- }
- // 2. sort primary keys
- std::sort(primary_keys.begin(), primary_keys.end());
- // 3. write primary keys index
- std::string last_key;
- for (const auto& key : primary_keys) {
- DCHECK(key.compare(last_key) > 0)
-                    << "found duplicate key or key is not sorted! current key: " << key
- << ", last key: " << last_key;
- RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
- last_key = key;
+ _encode_rowid(pos + _num_rows_written, &key);
+ _primary_keys_size += key.size();
+ _primary_keys.emplace_back(std::move(key));
}
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 9a8af131087..a1b7491a669 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -155,6 +155,8 @@ public:
return Status::OK();
}
+ uint64_t primary_keys_size() const { return _primary_keys_size; }
+
private:
DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
Status _create_column_writer(uint32_t cid, const TabletColumn& column,
@@ -260,6 +262,8 @@ private:
std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
// contains auto generated columns, should be nullptr if no variants's
subcolumns
TabletSchemaSPtr _flush_schema = nullptr;
+ std::vector<std::string> _primary_keys;
+ uint64_t _primary_keys_size = 0;
};
} // namespace segment_v2
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index ee9bfd97745..f493f21ac97 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -72,10 +72,9 @@ Status VerticalBetaRowsetWriter<T>::add_columns(const
vectorized::Block* block,
_cur_writer_idx = 0;
RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block,
0, num_rows));
} else if (is_key) {
- // TODO for cluster key, always create new segment writer because the
primary keys are
- // sorted in SegmentWriter::_generate_primary_key_index, will cause
too many segments
if (_segment_writers[_cur_writer_idx]->num_rows_written() >
max_rows_per_segment ||
- has_cluster_key) {
+ (has_cluster_key &&
_segment_writers[_cur_writer_idx]->primary_keys_size() >
+
config::mow_primary_key_index_max_size_in_memory)) {
// segment is full, need flush columns and create new segment
writer
RETURN_IF_ERROR(_flush_columns(_segment_writers[_cur_writer_idx].get(), true));
@@ -181,6 +180,7 @@ Status VerticalBetaRowsetWriter<T>::_create_segment_writer(
writer_options.enable_unique_key_merge_on_write =
context.enable_unique_key_merge_on_write;
writer_options.rowset_ctx = &context;
writer_options.max_rows_per_segment = context.max_rows_per_segment;
+ // TODO if support VerticalSegmentWriter, also need to handle cluster key
primary key index
*writer = std::make_unique<segment_v2::SegmentWriter>(
segment_file_writer.get(), seg_id, context.tablet_schema,
context.tablet,
context.data_dir, writer_options,
inverted_index_file_writer.get());
diff --git
a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
index 8403b17cce5..acac719b8c5 100644
---
a/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
+++
b/regression-test/suites/unique_with_mow_c_p0/test_compaction_with_multi_append_columns.groovy
@@ -128,11 +128,17 @@ suite("test_compaction_with_multi_append_columns", "p0") {
assertEquals("success", compactJson.status.toLowerCase())
}
- (code, out, err) =
be_show_tablet_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
- logger.info("Show tablet status: code=" + code + ", out=" + out + ",
err=" + err)
- assertEquals(code, 0)
- def json = parseJson(out.trim())
- logger.info("tablet rowset: " + json)
+ for (int i = 0; i < 10; i++) {
+ (code, out, err) =
be_show_tablet_status(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("loop " + i + ", Show tablet status: code=" + code +
", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def json = parseJson(out.trim())
+ logger.info("tablet rowsets: " + json)
+ if (json.rowsets.size() <= 5) {
+ break
+ }
+ sleep(2000)
+ }
}
checkNoDuplicatedKeys(tableName)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]