This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a1799e5506 [improve](point query) reuse rowset from lookup_row_key to
eliminate tablet lock (#16770)
a1799e5506 is described below
commit a1799e5506d7d5f07b12f860d61c490c57f24cf2
Author: lihangyu <[email protected]>
AuthorDate: Mon Feb 20 18:38:11 2023 +0800
[improve](point query) reuse rowset from lookup_row_key to eliminate tablet
lock (#16770)
Reuse the rowset for two reasons:
1. Eliminate the tablet lock for performance: if another thread holds the
lock for too long, point query latency can suffer.
2. The rowset should be acquired during the lookup procedure.
---
be/src/olap/tablet.cpp | 14 ++++++----
be/src/olap/tablet.h | 7 ++---
be/src/service/point_query_executor.cpp | 46 +++++++++++++++++++--------------
be/src/service/point_query_executor.h | 24 ++++++++++++++---
4 files changed, 60 insertions(+), 31 deletions(-)
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 01087a2a01..8cbafa7f6e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2223,11 +2223,10 @@ TabletSchemaSPtr
Tablet::get_max_version_schema(std::lock_guard<std::shared_mute
}
Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
row_location,
- const TupleDescriptor* desc, vectorized::Block*
block,
- bool write_to_cache) {
+ RowsetSharedPtr input_rowset, const
TupleDescriptor* desc,
+ vectorized::Block* block, bool write_to_cache) {
// read row data
- BetaRowsetSharedPtr rowset =
-
std::static_pointer_cast<BetaRowset>(get_rowset(row_location.rowset_id));
+ BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(input_rowset);
if (!rowset) {
return Status::NotFound(
fmt::format("rowset {} not found",
row_location.rowset_id.to_string()));
@@ -2286,7 +2285,8 @@ Status Tablet::lookup_row_data(const Slice& encoded_key,
const RowLocation& row_
}
Status Tablet::lookup_row_key(const Slice& encoded_key, const
RowsetIdUnorderedSet* rowset_ids,
- RowLocation* row_location, uint32_t version) {
+ RowLocation* row_location, uint32_t version,
+ RowsetSharedPtr* rowset) {
std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
size_t seq_col_length = 0;
if (_schema->has_sequence_col()) {
@@ -2335,6 +2335,10 @@ Status Tablet::lookup_row_key(const Slice& encoded_key,
const RowsetIdUnorderedS
break;
}
*row_location = loc;
+ if (rowset) {
+ // return it's rowset
+ *rowset = rs.first;
+ }
// find it and return
return s;
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 34fe9c32fe..82f42dccdd 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -341,12 +341,13 @@ public:
// NOTE: the method only works in unique key model with primary key index,
you will got a
// not supported error in other data model.
Status lookup_row_key(const Slice& encoded_key, const
RowsetIdUnorderedSet* rowset_ids,
- RowLocation* row_location, uint32_t version);
+ RowLocation* row_location, uint32_t version,
+ RowsetSharedPtr* rowset = nullptr);
// Lookup a row with TupleDescriptor and fill Block
Status lookup_row_data(const Slice& encoded_key, const RowLocation&
row_location,
- const TupleDescriptor* desc, vectorized::Block*
block,
- bool write_to_cache = false);
+ RowsetSharedPtr rowset, const TupleDescriptor* desc,
+ vectorized::Block* block, bool write_to_cache =
false);
// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index 9c70e08061..e33a7a186b 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -195,7 +195,7 @@ std::string PointQueryExecutor::print_profile() {
"",
total_us, init_us, init_key_us, lookup_key_us, lookup_data_us,
output_data_us,
_hit_lookup_cache, _binary_row_format,
_reusable->output_exprs().size(),
- _primary_keys.size(), _row_cache_hits);
+ _row_read_ctxs.size(), _row_cache_hits);
}
Status PointQueryExecutor::_init_keys(const PTabletKeyLookupRequest* request) {
@@ -209,43 +209,49 @@ Status PointQueryExecutor::_init_keys(const
PTabletKeyLookupRequest* request) {
olap_tuples[i].add_value(key_col);
}
}
- _primary_keys.resize(olap_tuples.size());
+ _row_read_ctxs.resize(olap_tuples.size());
// get row cursor and encode keys
for (size_t i = 0; i < olap_tuples.size(); ++i) {
RowCursor cursor;
RETURN_IF_ERROR(cursor.init_scan_key(_tablet->tablet_schema(),
olap_tuples[i].values()));
RETURN_IF_ERROR(cursor.from_tuple(olap_tuples[i]));
- encode_key_with_padding<RowCursor, true, true>(
- &_primary_keys[i], cursor,
_tablet->tablet_schema()->num_key_columns(), true);
+ encode_key_with_padding<RowCursor, true,
true>(&_row_read_ctxs[i]._primary_key, cursor,
+
_tablet->tablet_schema()->num_key_columns(),
+ true);
}
return Status::OK();
}
Status PointQueryExecutor::_lookup_row_key() {
SCOPED_TIMER(&_profile_metrics.lookup_key_ns);
- _row_locations.resize(_primary_keys.size());
- _cached_row_data.resize(_primary_keys.size());
// 2. lookup row location
Status st;
- for (size_t i = 0; i < _primary_keys.size(); ++i) {
+ for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
RowLocation location;
if (!config::disable_storage_row_cache) {
RowCache::CacheHandle cache_handle;
- auto hit_cache =
RowCache::instance()->lookup({_tablet->tablet_id(), _primary_keys[i]},
- &cache_handle);
+ auto hit_cache = RowCache::instance()->lookup(
+ {_tablet->tablet_id(), _row_read_ctxs[i]._primary_key},
&cache_handle);
if (hit_cache) {
- _cached_row_data[i] = std::move(cache_handle);
+ _row_read_ctxs[i]._cached_row_data = std::move(cache_handle);
++_row_cache_hits;
continue;
}
}
- st = (_tablet->lookup_row_key(_primary_keys[i], nullptr, &location,
- INT32_MAX /*rethink?*/));
+ // Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this
ptr
+ auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
+ st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, nullptr,
&location,
+ INT32_MAX /*rethink?*/,
rowset_ptr.get()));
if (st.is_not_found()) {
continue;
}
RETURN_IF_ERROR(st);
- _row_locations[i] = location;
+ _row_read_ctxs[i]._row_location = location;
+ // acquire and wrap this rowset
+ (*rowset_ptr)->acquire();
+ VLOG_DEBUG << "aquire rowset " << (*rowset_ptr)->unique_id();
+ _row_read_ctxs[i]._rowset_ptr = std::unique_ptr<RowsetSharedPtr,
decltype(&release_rowset)>(
+ rowset_ptr.release(), &release_rowset);
}
return Status::OK();
}
@@ -253,19 +259,19 @@ Status PointQueryExecutor::_lookup_row_key() {
Status PointQueryExecutor::_lookup_row_data() {
// 3. get values
SCOPED_TIMER(&_profile_metrics.lookup_data_ns);
- for (size_t i = 0; i < _row_locations.size(); ++i) {
- if (_cached_row_data[i].valid()) {
+ for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
+ if (_row_read_ctxs[i]._cached_row_data.valid()) {
vectorized::JsonbSerializeUtil::jsonb_to_block(
- *_reusable->tuple_desc(), _cached_row_data[i].data().data,
- _cached_row_data[i].data().size, *_result_block);
+ *_reusable->tuple_desc(),
_row_read_ctxs[i]._cached_row_data.data().data,
+ _row_read_ctxs[i]._cached_row_data.data().size,
*_result_block);
continue;
}
- if (!_row_locations[i].has_value()) {
+ if (!_row_read_ctxs[i]._row_location.has_value()) {
continue;
}
RETURN_IF_ERROR(_tablet->lookup_row_data(
- _primary_keys[i], _row_locations[i].value(),
_reusable->tuple_desc(),
- _result_block.get(),
+ _row_read_ctxs[i]._primary_key,
_row_read_ctxs[i]._row_location.value(),
+ *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(),
_result_block.get(),
!config::disable_storage_row_cache /*whether write row
cache*/));
}
return Status::OK();
diff --git a/be/src/service/point_query_executor.h
b/be/src/service/point_query_executor.h
index 1169817019..9dd5a36822 100644
--- a/be/src/service/point_query_executor.h
+++ b/be/src/service/point_query_executor.h
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "gutil/int128.h"
+#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -253,11 +254,28 @@ private:
Status _output_data();
+ static void release_rowset(RowsetSharedPtr* r) { // deleter used by RowReadContext::_rowset_ptr
+ if (r && *r) { // skip release() when the holder is null or holds no rowset
+ VLOG_DEBUG << "release rowset " << (*r)->unique_id();
+ (*r)->release(); // pairs with the acquire() call made in _lookup_row_key()
+ }
+ delete r; // the RowsetSharedPtr holder is heap-allocated by the caller; free it in every case
+ }
+
+ // Per-row read state; _init_keys() sizes the vector to one context per key tuple
+ struct RowReadContext {
+ RowReadContext() : _rowset_ptr(nullptr, &release_rowset) {} // starts with no rowset held
+ std::string _primary_key; // encoded key, filled by _init_keys()
+ RowCache::CacheHandle _cached_row_data; // set when the row cache lookup hits
+ std::optional<RowLocation> _row_location; // set when lookup_row_key() finds the row
+ // rowset is acquired during key lookup and
+ // released by release_rowset once this pointer is destroyed or reset
+ std::unique_ptr<RowsetSharedPtr, decltype(&release_rowset)>
_rowset_ptr;
+ };
+
PTabletKeyLookupResponse* _response;
TabletSharedPtr _tablet;
- std::vector<std::string> _primary_keys;
- std::vector<RowCache::CacheHandle> _cached_row_data;
- std::vector<std::optional<RowLocation>> _row_locations;
+ std::vector<RowReadContext> _row_read_ctxs;
std::shared_ptr<Reusable> _reusable;
std::unique_ptr<vectorized::Block> _result_block;
Metrics _profile_metrics;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]