This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 103c473b96 [Bug](pipeline) fix pipeline shared scan + topn
optimization (#21940)
103c473b96 is described below
commit 103c473b967477b0748a9e7afa9f781f65d9a5eb
Author: Gabriel <[email protected]>
AuthorDate: Tue Jul 25 12:48:27 2023 +0800
[Bug](pipeline) fix pipeline shared scan + topn optimization (#21940)
---
be/src/olap/merger.cpp | 26 +++--
be/src/olap/reader.cpp | 4 +-
be/src/olap/reader.h | 17 ++--
be/src/olap/rowset/beta_rowset_reader.cpp | 10 +-
be/src/olap/rowset/beta_rowset_reader.h | 6 +-
be/src/olap/rowset/rowset_reader.h | 19 +++-
be/src/olap/schema_change.cpp | 17 ++--
be/src/olap/tablet.cpp | 10 +-
be/src/olap/tablet.h | 4 +-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 148 +++++++++++++++-------------
be/src/vec/exec/scan/new_olap_scanner.cpp | 53 +++++-----
be/src/vec/exec/scan/new_olap_scanner.h | 6 +-
be/src/vec/olap/block_reader.cpp | 57 +++++++----
be/src/vec/olap/block_reader.h | 2 +-
be/src/vec/olap/vcollect_iterator.cpp | 25 ++---
be/src/vec/olap/vcollect_iterator.h | 9 +-
be/src/vec/olap/vertical_block_reader.cpp | 18 ++--
be/test/olap/tablet_test.cpp | 8 +-
18 files changed, 240 insertions(+), 199 deletions(-)
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 44fd3d510e..ee1bffa9e2 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -60,7 +60,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
TabletReader::ReaderParams reader_params;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- reader_params.rs_readers = src_rowset_readers;
+ reader_params.rs_splits.reserve(src_rowset_readers.size());
+ for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
+ reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+ }
reader_params.version = dst_rowset_writer->version();
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
@@ -92,10 +95,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
- for (auto& rs_reader : reader_params.rs_readers) {
-
RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows));
-
stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(),
- segment_num_rows);
+ for (auto& rs_split : reader_params.rs_splits) {
+
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
+ stats_output->rowid_conversion->init_segment_map(
+ rs_split.rs_reader->rowset()->rowset_id(),
segment_num_rows);
}
}
@@ -194,7 +197,10 @@ Status Merger::vertical_compact_one_group(
reader_params.is_key_column_group = is_key;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- reader_params.rs_readers = src_rowset_readers;
+ reader_params.rs_splits.reserve(src_rowset_readers.size());
+ for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
+ reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+ }
reader_params.version = dst_rowset_writer->version();
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
@@ -225,10 +231,10 @@ Status Merger::vertical_compact_one_group(
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
- for (auto& rs_reader : reader_params.rs_readers) {
-
RETURN_IF_ERROR(rs_reader->get_segment_num_rows(&segment_num_rows));
-
stats_output->rowid_conversion->init_segment_map(rs_reader->rowset()->rowset_id(),
- segment_num_rows);
+ for (auto& rs_split : reader_params.rs_splits) {
+
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
+ stats_output->rowid_conversion->init_segment_map(
+ rs_split.rs_reader->rowset()->rowset_id(),
segment_num_rows);
}
}
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 468c6289d5..1a5516f273 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -153,7 +153,7 @@ bool TabletReader::_optimize_for_single_rowset(
}
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
- if (read_params.rs_readers.empty()) {
+ if (read_params.rs_splits.empty()) {
return Status::InternalError("fail to acquire data sources. tablet={}",
_tablet->full_name());
}
@@ -636,7 +636,7 @@ Status TabletReader::init_reader_params_and_create_block(
for (auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
- reader_params->rs_readers.push_back(std::move(rs_reader));
+ reader_params->rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
}
std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 231ff23d6e..249c199781 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -93,6 +93,16 @@ public:
// Params for Reader,
// mainly include tablet, data version and fetch range.
struct ReaderParams {
+ bool has_single_version() const {
+ return (rs_splits.size() == 1 &&
+ rs_splits[0].rs_reader->rowset()->start_version() == 0 &&
+
!rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) ||
+ (rs_splits.size() == 2 &&
+
rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 &&
+ rs_splits[1].rs_reader->rowset()->start_version() == 2 &&
+
!rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping());
+ }
+
TabletSharedPtr tablet;
TabletSchemaSPtr tablet_schema;
ReaderType reader_type = ReaderType::READER_QUERY;
@@ -118,12 +128,7 @@ public:
// For unique key table with merge-on-write
DeleteBitmap* delete_bitmap {nullptr};
-
- std::vector<RowsetReaderSharedPtr> rs_readers;
- // if rs_readers_segment_offsets is not empty, means we only scan
- // [pair.first, pair.second) segment in rs_reader, only effective in
dup key
- // and pipeline
- std::vector<std::pair<int, int>> rs_readers_segment_offsets;
+ std::vector<RowSetSplits> rs_splits;
// return_columns is init from query schema
std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 7ba0260801..bdb0e1fca6 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -76,8 +76,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile*
profile) {
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext*
read_context,
std::vector<RowwiseIteratorUPtr>* out_iters,
- const std::pair<int, int>&
segment_offset,
- bool use_cache) {
+ const RowSetSplits& rs_splits,
bool use_cache) {
RETURN_IF_ERROR(_rowset->load());
_context = read_context;
// The segment iterator is created with its own statistics,
@@ -218,7 +217,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// create iterator for each segment
auto& segments = _segment_cache_handle.get_segments();
- auto [seg_start, seg_end] = segment_offset;
+ auto [seg_start, seg_end] = rs_splits.segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segments.size();
@@ -241,12 +240,11 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
return Status::OK();
}
-Status BetaRowsetReader::init(RowsetReaderContext* read_context,
- const std::pair<int, int>& segment_offset) {
+Status BetaRowsetReader::init(RowsetReaderContext* read_context, const
RowSetSplits& rs_splits) {
_context = read_context;
_context->rowset_id = _rowset->rowset_id();
std::vector<RowwiseIteratorUPtr> iterators;
- RETURN_IF_ERROR(get_segment_iterators(_context, &iterators,
segment_offset));
+ RETURN_IF_ERROR(get_segment_iterators(_context, &iterators, rs_splits));
// merge or union segment iterator
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index 83bc94ea0b..74ef9c96a6 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -46,13 +46,11 @@ public:
~BetaRowsetReader() override { _rowset->release(); }
- Status init(RowsetReaderContext* read_context,
- const std::pair<int, int>& segment_offset) override;
+ Status init(RowsetReaderContext* read_context, const RowSetSplits&
rs_splits) override;
Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIteratorUPtr>* out_iters,
- const std::pair<int, int>& segment_offset,
- bool use_cache = false) override;
+ const RowSetSplits& rs_splits, bool use_cache
= false) override;
void reset_read_options() override;
Status next_block(vectorized::Block* block) override;
Status next_block_view(vectorized::BlockView* block_view) override;
diff --git a/be/src/olap/rowset/rowset_reader.h
b/be/src/olap/rowset/rowset_reader.h
index dadfd0e05c..cbe005b9bd 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -37,17 +37,28 @@ class Block;
class RowsetReader;
using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
+struct RowSetSplits {
+ RowsetReaderSharedPtr rs_reader;
+
+ // if segment_offsets is not empty, means we only scan
+ // [pair.first, pair.second) segment in rs_reader, only effective in dup
key
+ // and pipeline
+ std::pair<int, int> segment_offsets;
+
+ RowSetSplits(RowsetReaderSharedPtr rs_reader_)
+ : rs_reader(rs_reader_), segment_offsets({0, 0}) {}
+ RowSetSplits() = default;
+};
+
class RowsetReader {
public:
virtual ~RowsetReader() = default;
- // reader init
- virtual Status init(RowsetReaderContext* read_context,
- const std::pair<int, int>& segment_offset = {0, 0}) =
0;
+ virtual Status init(RowsetReaderContext* read_context, const RowSetSplits&
rs_splits = {}) = 0;
virtual Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIteratorUPtr>*
out_iters,
- const std::pair<int, int>&
segment_offset = {0, 0},
+ const RowSetSplits& rs_splits = {},
bool use_cache = false) = 0;
virtual void reset_read_options() = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index aff67f68bb..f2f69f241d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -722,7 +722,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
// reader_context is stack variables, it's lifetime should keep the same
// with rs_readers
RowsetReaderContext reader_context;
- std::vector<RowsetReaderSharedPtr> rs_readers;
+ std::vector<RowSetSplits> rs_splits;
// delete handlers for new tablet
DeleteHandler delete_handler;
std::vector<ColumnId> return_columns;
@@ -814,11 +814,11 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
// acquire data sources correspond to history versions
- base_tablet->capture_rs_readers(versions_to_be_changed,
&rs_readers);
- if (rs_readers.empty()) {
+ base_tablet->capture_rs_readers(versions_to_be_changed,
&rs_splits);
+ if (rs_splits.empty()) {
res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
"fail to acquire all data sources. version_num={},
data_source_num={}",
- versions_to_be_changed.size(), rs_readers.size());
+ versions_to_be_changed.size(), rs_splits.size());
break;
}
auto& all_del_preds = base_tablet->delete_predicates();
@@ -846,8 +846,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
reader_context.delete_bitmap =
&base_tablet->tablet_meta()->delete_bitmap();
reader_context.version = Version(0, end_version);
- for (auto& rs_reader : rs_readers) {
- res = rs_reader->init(&reader_context);
+ for (auto& rs_split : rs_splits) {
+ res = rs_split.rs_reader->init(&reader_context);
if (!res) {
LOG(WARNING) << "failed to init rowset reader: " <<
base_tablet->full_name();
break;
@@ -865,7 +865,10 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
DescriptorTbl::create(&sc_params.pool, request.desc_tbl,
&sc_params.desc_tbl);
sc_params.base_tablet = base_tablet;
sc_params.new_tablet = new_tablet;
- sc_params.ref_rowset_readers = rs_readers;
+ sc_params.ref_rowset_readers.reserve(rs_splits.size());
+ for (RowSetSplits& split : rs_splits) {
+ sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
+ }
sc_params.delete_handler = &delete_handler;
sc_params.base_tablet_schema = base_tablet_schema;
sc_params.be_exec_version = request.be_exec_version;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 3f2c1829a5..9348f78119 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -924,16 +924,16 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const
std::vector<Version>&
}
Status Tablet::capture_rs_readers(const Version& spec_version,
- std::vector<RowsetReaderSharedPtr>*
rs_readers) const {
+ std::vector<RowSetSplits>* rs_splits) const {
std::vector<Version> version_path;
RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path));
- RETURN_IF_ERROR(capture_rs_readers(version_path, rs_readers));
+ RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits));
return Status::OK();
}
Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
- std::vector<RowsetReaderSharedPtr>*
rs_readers) const {
- DCHECK(rs_readers != nullptr && rs_readers->empty());
+ std::vector<RowSetSplits>* rs_splits) const {
+ DCHECK(rs_splits != nullptr && rs_splits->empty());
for (auto version : version_path) {
auto it = _rs_version_map.find(version);
if (it == _rs_version_map.end()) {
@@ -954,7 +954,7 @@ Status Tablet::capture_rs_readers(const
std::vector<Version>& version_path,
return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
"failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
}
- rs_readers->push_back(std::move(rs_reader));
+ rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
}
return Status::OK();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1688dcf2a6..3efd2c89ce 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -184,10 +184,10 @@ public:
Status capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>* rowsets)
const;
Status capture_rs_readers(const Version& spec_version,
- std::vector<RowsetReaderSharedPtr>* rs_readers)
const;
+ std::vector<RowSetSplits>* rs_splits) const;
Status capture_rs_readers(const std::vector<Version>& version_path,
- std::vector<RowsetReaderSharedPtr>* rs_readers)
const;
+ std::vector<RowSetSplits>* rs_splits) const;
const std::vector<RowsetMetaSharedPtr> delete_predicates() {
return _tablet_meta->delete_predicates();
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index cfd9e329f0..f12533431f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -454,9 +454,9 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
bool is_duplicate_key = false;
- int segment_count = 0;
- std::vector<std::vector<RowsetReaderSharedPtr>>
rowset_readers_vector(_scan_ranges.size());
- std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());
+ size_t segment_count = 0;
+ std::vector<std::vector<RowSetSplits>>
rowset_splits_vector(_scan_ranges.size());
+ std::vector<std::vector<size_t>> tablet_rs_seg_count(_scan_ranges.size());
// Split tablet segment by scanner, only use in pipeline in duplicate key
// 1. if tablet count lower than scanner thread num, count segment num of
all tablet ready for scan
@@ -484,7 +484,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
// to prevent this case: when there are lots of olap scanners to
run for example 10000
// the rowsets maybe compacted when the last olap scanner starts
Status acquire_reader_st =
- tablet->capture_rs_readers({0, version},
&rowset_readers_vector[i]);
+ tablet->capture_rs_readers({0, version},
&rowset_splits_vector[i]);
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
std::stringstream ss;
@@ -494,31 +494,30 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
return Status::InternalError(ss.str());
}
- for (const auto& rowset_reader : rowset_readers_vector[i]) {
- auto num_segments = rowset_reader->rowset()->num_segments();
+ for (const auto& rowset_splits : rowset_splits_vector[i]) {
+ auto num_segments =
rowset_splits.rs_reader->rowset()->num_segments();
tablet_rs_seg_count[i].emplace_back(num_segments);
segment_count += num_segments;
}
}
}
- auto build_new_scanner = [&](const TPaloScanRange& scan_range,
- const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<RowsetReaderSharedPtr>&
rs_readers,
- const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets) {
- std::shared_ptr<NewOlapScanner> scanner =
NewOlapScanner::create_shared(
- _state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation, scan_range,
- key_ranges, rs_readers, rs_reader_seg_offsets,
_scanner_profile.get());
-
- RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
- scanner->set_compound_filters(_compound_filters);
- scanners->push_back(scanner);
- return Status::OK();
- };
if (is_duplicate_key) {
- // 2. Split by segment count, each scanner need scan avg segment count
- auto avg_segment_count =
- std::max(segment_count /
config::doris_scanner_thread_pool_thread_num, 1);
+ auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+ const std::vector<OlapScanRange*>&
key_ranges,
+ const std::vector<RowSetSplits>&
rs_splits) {
+ std::shared_ptr<NewOlapScanner> scanner =
NewOlapScanner::create_shared(
+ _state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation, scan_range,
+ key_ranges, rs_splits, _scanner_profile.get());
+
+ RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
+ scanner->set_compound_filters(_compound_filters);
+ scanners->push_back(scanner);
+ return Status::OK();
+ };
+ // 2. Split segment evenly to each scanner (e.g. each scanner need to
scan `avg_segment_count_per_scanner` segments)
+ const auto avg_segment_count_by_scanner =
+ std::max(segment_count /
config::doris_scanner_thread_pool_thread_num, (size_t)1);
for (int i = 0; i < _scan_ranges.size(); ++i) {
auto& scan_range = _scan_ranges[i];
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&_cond_ranges;
@@ -528,69 +527,84 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
scanner_ranges[j] = (*ranges)[j].get();
}
+ // Segment count in current rowset vector
const auto& rs_seg_count = tablet_rs_seg_count[i];
- int rs_seg_count_index = 0;
- int rs_seg_start_scan = 0;
- int scanner_seg_occupy = 0;
- std::vector<RowsetReaderSharedPtr> rs_readers;
- std::vector<std::pair<int, int>> rs_reader_seg_offsets;
- while (rs_seg_count_index < rs_seg_count.size()) {
+ size_t rowset_idx = 0;
+ size_t segment_idx_to_scan = 0;
+ size_t num_segments_assigned = 0;
+
+ std::vector<RowSetSplits> rs_splits;
+
+ while (rowset_idx < rs_seg_count.size()) {
// do not generator range of segment (0, 0)
- if (rs_seg_count[rs_seg_count_index] == 0) {
- rs_seg_start_scan = 0;
- rs_seg_count_index++;
+ if (rs_seg_count[rowset_idx] == 0) {
+ segment_idx_to_scan = 0;
+ rowset_idx++;
continue;
}
- auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] -
rs_seg_start_scan;
-
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
-
- if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count)
{
- auto need_add_seg_nums = avg_segment_count -
scanner_seg_occupy;
- rs_reader_seg_offsets.emplace_back(
- rs_seg_start_scan,
- rs_seg_start_scan + need_add_seg_nums); // only
scan need_add_seg_nums
- RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_readers,
- rs_reader_seg_offsets));
-
- rs_seg_start_scan += need_add_seg_nums;
- scanner_seg_occupy = 0;
- rs_readers.clear();
- rs_reader_seg_offsets.clear();
- } else if (scanner_seg_occupy + max_add_seg_nums ==
avg_segment_count) {
- rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
-
rs_seg_count[rs_seg_count_index]);
- RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_readers,
- rs_reader_seg_offsets));
-
- rs_seg_start_scan = 0;
- scanner_seg_occupy = 0;
- rs_readers.clear();
- rs_reader_seg_offsets.clear();
- rs_seg_count_index++;
+ const auto max_add_seg_nums = rs_seg_count[rowset_idx] -
segment_idx_to_scan;
+ rs_splits.emplace_back(RowSetSplits());
+ rs_splits.back().rs_reader =
rowset_splits_vector[i][rowset_idx].rs_reader->clone();
+
+ // if segments assigned to current scanner are already more
than the average count,
+ // this scanner will just scan segments equal to the average
count
+ if (num_segments_assigned + max_add_seg_nums >
avg_segment_count_by_scanner) {
+ auto need_add_seg_nums = avg_segment_count_by_scanner -
num_segments_assigned;
+ rs_splits.back().segment_offsets = {
+ segment_idx_to_scan,
+ segment_idx_to_scan + need_add_seg_nums}; // only
scan need_add_seg_nums
+
+ RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_splits));
+
+ segment_idx_to_scan += need_add_seg_nums;
+ num_segments_assigned = 0;
+ rs_splits.clear();
+ } else if (num_segments_assigned + max_add_seg_nums ==
+ avg_segment_count_by_scanner) {
+ rs_splits.back().segment_offsets = {segment_idx_to_scan,
+
rs_seg_count[rowset_idx]};
+ RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_splits));
+
+ segment_idx_to_scan = 0;
+ num_segments_assigned = 0;
+ rs_splits.clear();
+ rowset_idx++;
} else {
- rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
-
rs_seg_count[rs_seg_count_index]);
+ rs_splits.back().segment_offsets = {segment_idx_to_scan,
+
rs_seg_count[rowset_idx]};
- rs_seg_start_scan = 0;
- scanner_seg_occupy += max_add_seg_nums;
- rs_seg_count_index++;
+ segment_idx_to_scan = 0;
+ num_segments_assigned += max_add_seg_nums;
+ rowset_idx++;
}
}
#ifndef NDEBUG
- for (const auto& offset : rs_reader_seg_offsets) {
- DCHECK_NE(offset.first, offset.second);
+ for (const auto& rs_reader_with_segments : rs_splits) {
+ DCHECK_NE(rs_reader_with_segments.segment_offsets.first,
+ rs_reader_with_segments.segment_offsets.second);
}
#endif
// dispose some segment tail
- if (!rs_readers.empty()) {
- build_new_scanner(*scan_range, scanner_ranges, rs_readers,
rs_reader_seg_offsets);
+ if (!rs_splits.empty()) {
+ build_new_scanner(*scan_range, scanner_ranges, rs_splits);
}
}
} else {
+ auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+ const std::vector<OlapScanRange*>&
key_ranges) {
+ std::shared_ptr<NewOlapScanner> scanner =
NewOlapScanner::create_shared(
+ _state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation, scan_range,
+ key_ranges, _scanner_profile.get());
+
+ RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
+ scanner->set_compound_filters(_compound_filters);
+ scanners->push_back(scanner);
+ return Status::OK();
+ };
for (auto& scan_range : _scan_ranges) {
auto tablet_id = scan_range->tablet_id;
auto [tablet, status] =
@@ -619,7 +633,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
++j, ++i) {
scanner_ranges.push_back((*ranges)[i].get());
}
- RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges,
{}, {}));
+ RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges));
}
}
}
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index b945d42f6c..decf12ee91 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -63,17 +63,26 @@ namespace doris::vectorized {
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent,
int64_t limit,
bool aggregation, const TPaloScanRange&
scan_range,
const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<RowsetReaderSharedPtr>&
rs_readers,
- const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets,
RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_aggregation(aggregation),
_version(-1),
_scan_range(scan_range),
_key_ranges(key_ranges) {
- DCHECK(rs_readers.size() == rs_reader_seg_offsets.size());
- _tablet_reader_params.rs_readers = rs_readers;
- _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
+ _tablet_schema = std::make_shared<TabletSchema>();
+ _is_init = false;
+}
+
+NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent,
int64_t limit,
+ bool aggregation, const TPaloScanRange&
scan_range,
+ const std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<RowSetSplits>& rs_splits,
RuntimeProfile* profile)
+ : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
+ _aggregation(aggregation),
+ _version(-1),
+ _scan_range(scan_range),
+ _key_ranges(key_ranges) {
+ _tablet_reader_params.rs_splits = rs_splits;
_tablet_schema = std::make_shared<TabletSchema>();
_is_init = false;
}
@@ -167,7 +176,7 @@ Status NewOlapScanner::init() {
{
std::shared_lock rdlock(_tablet->get_header_lock());
- if (_tablet_reader_params.rs_readers.empty()) {
+ if (_tablet_reader_params.rs_splits.empty()) {
const RowsetSharedPtr rowset =
_tablet->rowset_with_max_version();
if (rowset == nullptr) {
std::stringstream ss;
@@ -181,7 +190,7 @@ Status NewOlapScanner::init() {
// the rowsets maybe compacted when the last olap scanner
starts
Version rd_version(0, _version);
Status acquire_reader_st =
- _tablet->capture_rs_readers(rd_version,
&_tablet_reader_params.rs_readers);
+ _tablet->capture_rs_readers(rd_version,
&_tablet_reader_params.rs_splits);
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
std::stringstream ss;
@@ -237,20 +246,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
const FilterPredicates& filter_predicates,
const std::vector<FunctionFilter>& function_filters) {
// if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty
- bool single_version =
- (_tablet_reader_params.rs_readers.size() == 1 &&
- _tablet_reader_params.rs_readers[0]->rowset()->start_version() ==
0 &&
- !_tablet_reader_params.rs_readers[0]
- ->rowset()
- ->rowset_meta()
- ->is_segments_overlapping()) ||
- (_tablet_reader_params.rs_readers.size() == 2 &&
-
_tablet_reader_params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 &&
- _tablet_reader_params.rs_readers[1]->rowset()->start_version() ==
2 &&
- !_tablet_reader_params.rs_readers[1]
- ->rowset()
- ->rowset_meta()
- ->is_segments_overlapping());
+ const bool single_version = _tablet_reader_params.has_single_version();
auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);
if (_state->skip_storage_engine_merge()) {
_tablet_reader_params.direct_mode = true;
@@ -408,12 +404,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
// by rowset->update_delayed_expired_timestamp().This could expand the
lifespan of Rowset
if (_tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
constexpr static int delayed_s = 60;
- for (auto rs_reader : _tablet_reader_params.rs_readers) {
+ for (auto rs_reader : _tablet_reader_params.rs_splits) {
uint64_t delayed_expired_timestamp =
UnixSeconds() +
_tablet_reader_params.runtime_state->execution_timeout() +
delayed_s;
-
rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
- StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
+ rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
+ delayed_expired_timestamp);
+
StorageEngine::instance()->add_quering_rowset(rs_reader.rs_reader->rowset());
}
}
@@ -452,10 +449,10 @@ Status NewOlapScanner::_init_return_columns() {
doris::TabletStorageType NewOlapScanner::get_storage_type() {
int local_reader = 0;
- for (const auto& reader : _tablet_reader_params.rs_readers) {
- local_reader += reader->rowset()->is_local();
+ for (const auto& reader : _tablet_reader_params.rs_splits) {
+ local_reader += reader.rs_reader->rowset()->is_local();
}
- int total_reader = _tablet_reader_params.rs_readers.size();
+ int total_reader = _tablet_reader_params.rs_splits.size();
if (local_reader == total_reader) {
return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
@@ -491,7 +488,7 @@ Status NewOlapScanner::close(RuntimeState* state) {
// readers will be release when runtime state deconstructed but
// deconstructor in reader references runtime state
// so that it will core
- _tablet_reader_params.rs_readers.clear();
+ _tablet_reader_params.rs_splits.clear();
_tablet_reader.reset();
RETURN_IF_ERROR(VScanner::close(state));
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index f44e55ee37..a411056097 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -55,10 +55,12 @@ class NewOlapScanner : public VScanner {
public:
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t
limit, bool aggregation,
const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
- const std::vector<RowsetReaderSharedPtr>& rs_readers,
- const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets,
RuntimeProfile* profile);
+ NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t
limit, bool aggregation,
+ const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<RowSetSplits>& rs_splits, RuntimeProfile*
profile);
+
Status init() override;
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index bf513d78b9..b37459974c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -60,28 +60,51 @@ BlockReader::~BlockReader() {
}
}
-bool BlockReader::_rowsets_overlapping(const
std::vector<RowsetReaderSharedPtr>& rs_readers) {
+bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) {
std::string cur_max_key;
- for (const auto& rs_reader : rs_readers) {
+ const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
+ for (const auto& rs_split : rs_splits) {
// version 0-1 of every tablet is empty, just skip this rowset
- if (rs_reader->rowset()->version().second == 1) {
+ if (rs_split.rs_reader->rowset()->version().second == 1) {
continue;
}
- if (rs_reader->rowset()->num_rows() == 0) {
+ if (rs_split.rs_reader->rowset()->num_rows() == 0) {
continue;
}
- if (rs_reader->rowset()->is_segments_overlapping()) {
+ if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
return true;
}
std::string min_key;
- bool has_min_key = rs_reader->rowset()->min_key(&min_key);
+ bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key);
if (!has_min_key) {
return true;
}
if (min_key <= cur_max_key) {
return true;
}
- CHECK(rs_reader->rowset()->max_key(&cur_max_key));
+ CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key));
+ }
+
+ for (const auto& rs_reader : rs_splits) {
+ // version 0-1 of every tablet is empty, just skip this rowset
+ if (rs_reader.rs_reader->rowset()->version().second == 1) {
+ continue;
+ }
+ if (rs_reader.rs_reader->rowset()->num_rows() == 0) {
+ continue;
+ }
+ if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) {
+ return true;
+ }
+ std::string min_key;
+ bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key);
+ if (!has_min_key) {
+ return true;
+ }
+ if (min_key <= cur_max_key) {
+ return true;
+ }
+ CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key));
}
return false;
}
@@ -96,34 +119,28 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params) {
return res;
}
// check if rowsets are noneoverlapping
- _is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers);
+ _is_rowsets_overlapping = _rowsets_overlapping(read_params);
_vcollect_iter.init(this, _is_rowsets_overlapping,
read_params.read_orderby_key,
- read_params.read_orderby_key_reverse,
- read_params.rs_readers_segment_offsets);
+ read_params.read_orderby_key_reverse);
_reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
- DCHECK(read_params.rs_readers_segment_offsets.empty() ||
- read_params.rs_readers_segment_offsets.size() ==
read_params.rs_readers.size());
- bool is_empty = read_params.rs_readers_segment_offsets.empty();
- for (int i = 0; i < read_params.rs_readers.size(); ++i) {
- auto& rs_reader = read_params.rs_readers[i];
+ for (int i = 0; i < read_params.rs_splits.size(); ++i) {
+ auto& rs_split = read_params.rs_splits[i];
// _vcollect_iter.topn_next() will init rs_reader by itself
if (!_vcollect_iter.use_topn_next()) {
- RETURN_IF_ERROR(rs_reader->init(
- &_reader_context,
- is_empty ? std::pair {0, 0} :
read_params.rs_readers_segment_offsets[i]));
+ RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context,
rs_split));
}
- Status res = _vcollect_iter.add_child(rs_reader);
+ Status res = _vcollect_iter.add_child(rs_split);
if (!res.ok() && !res.is<END_OF_FILE>()) {
LOG(WARNING) << "failed to add child to iterator, err=" << res;
return res;
}
if (res.ok()) {
- valid_rs_readers.push_back(rs_reader);
+ valid_rs_readers.push_back(rs_split.rs_reader);
}
}
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index bf9e20def4..0fe188419e 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -90,7 +90,7 @@ private:
bool _get_next_row_same();
- bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>&
rs_readers);
+ bool _rowsets_overlapping(const ReaderParams& read_params);
VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index bca58c34f8..78fad44fb7 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -65,8 +65,7 @@ VCollectIterator::~VCollectIterator() {
}
void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping,
bool force_merge,
- bool is_reverse,
- std::vector<std::pair<int, int>>
rs_readers_segment_offsets) {
+ bool is_reverse) {
_reader = reader;
// when aggregate is enabled or key_type is DUP_KEYS, we don't merge
@@ -92,21 +91,18 @@ void VCollectIterator::init(TabletReader* reader, bool
ori_data_overlapping, boo
(_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_reader->_tablet->enable_unique_key_merge_on_write()))) {
_topn_limit = _reader->_reader_context.read_orderby_key_limit;
- // When we use scanner pooling + query with topn_with_limit, we need
it because we initialize our rs_reader
- // in out method but not upstream user. At time we init readers, we
will need to use it.
- _rs_readers_segment_offsets = rs_readers_segment_offsets;
} else {
_topn_limit = 0;
}
}
-Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
+Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
if (use_topn_next()) {
- _rs_readers.push_back(rs_reader);
+ _rs_splits.push_back(rs_splits);
return Status::OK();
}
- std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader,
_reader));
+ std::unique_ptr<LevelIterator> child(new
Level0Iterator(rs_splits.rs_reader, _reader));
_children.push_back(child.release());
return Status::OK();
}
@@ -288,23 +284,20 @@ Status VCollectIterator::_topn_next(Block* block) {
row_pos_comparator);
if (_is_reverse) {
- std::reverse(_rs_readers.begin(), _rs_readers.end());
+ std::reverse(_rs_splits.begin(), _rs_splits.end());
}
- bool segment_empty = _rs_readers_segment_offsets.empty();
- for (size_t i = 0; i < _rs_readers.size(); i++) {
- const auto& rs_reader = _rs_readers[i];
+ for (size_t i = 0; i < _rs_splits.size(); i++) {
+ const auto& rs_split = _rs_splits[i];
// init will prune segment by _reader_context.conditions and
_reader_context.runtime_conditions
- RETURN_IF_ERROR(
- rs_reader->init(&_reader->_reader_context,
- segment_empty ? std::pair {0, 0} :
_rs_readers_segment_offsets[i]));
+ RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader->_reader_context,
rs_split));
// read _topn_limit rows from this rs
size_t read_rows = 0;
bool eof = false;
while (read_rows < _topn_limit && !eof) {
block->clear_column_data();
- auto status = rs_reader->next_block(block);
+ auto status = rs_split.rs_reader->next_block(block);
if (!status.ok()) {
if (status.is<END_OF_FILE>()) {
eof = true;
diff --git a/be/src/vec/olap/vcollect_iterator.h
b/be/src/vec/olap/vcollect_iterator.h
index 449e1e05d2..7faa47ac2b 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -53,10 +53,9 @@ public:
// Hold reader point to get reader params
~VCollectIterator();
- void init(TabletReader* reader, bool ori_data_overlapping, bool
force_merge, bool is_reverse,
- std::vector<std::pair<int, int>> rs_readers_segment_offsets);
+ void init(TabletReader* reader, bool ori_data_overlapping, bool
force_merge, bool is_reverse);
- Status add_child(RowsetReaderSharedPtr rs_reader);
+ Status add_child(const RowSetSplits& rs_splits);
Status build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
// Get top row of the heap, nullptr if reach end.
@@ -330,9 +329,7 @@ private:
// for topn next
size_t _topn_limit = 0;
bool _topn_eof = false;
- // when we use scanner pooling + query with topn_with_limit, we use it.
- std::vector<RowsetReaderSharedPtr> _rs_readers;
- std::vector<std::pair<int, int>> _rs_readers_segment_offsets;
+ std::vector<RowSetSplits> _rs_splits;
// Hold reader point to access read params, such as fetch conditions.
TabletReader* _reader = nullptr;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 09757226c3..3023f28408 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -63,23 +63,23 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
return res;
}
_reader_context.is_vertical_compaction = true;
- for (auto& rs_reader : read_params.rs_readers) {
+ for (auto& rs_split : read_params.rs_splits) {
// segment iterator will be inited here
// In vertical compaction, every group will load segment so we should
cache
// segment to avoid tot many s3 head request
- RETURN_IF_ERROR(
- rs_reader->get_segment_iterators(&_reader_context,
segment_iters, {0, 0}, true));
+
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_iterators(&_reader_context,
segment_iters,
+ {}, true));
// if segments overlapping, all segment iterator should be inited in
// heap merge iterator. If segments are none overlapping, only first
segment of this
// rowset will be inited and push to heap, other segment will be
inited later when current
// segment reached it's end.
// Use this iterator_init_flag so we can load few segments in
HeapMergeIterator to save memory
- if (rs_reader->rowset()->is_segments_overlapping()) {
- for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+ if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
+ for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments();
++i) {
iterator_init_flag->push_back(true);
}
} else {
- for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
+ for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments();
++i) {
if (i == 0) {
iterator_init_flag->push_back(true);
continue;
@@ -87,10 +87,10 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
iterator_init_flag->push_back(false);
}
}
- for (int i = 0; i < rs_reader->rowset()->num_segments(); ++i) {
- rowset_ids->push_back(rs_reader->rowset()->rowset_id());
+ for (int i = 0; i < rs_split.rs_reader->rowset()->num_segments(); ++i)
{
+ rowset_ids->push_back(rs_split.rs_reader->rowset()->rowset_id());
}
- rs_reader->reset_read_options();
+ rs_split.rs_reader->reset_read_options();
}
return Status::OK();
}
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 3ae55f9743..838db9b0d6 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -278,13 +278,13 @@ TEST_F(TestTablet, pad_rowset) {
_tablet->init();
Version version(5, 5);
- std::vector<RowsetReaderSharedPtr> readers;
- ASSERT_FALSE(_tablet->capture_rs_readers(version, &readers).ok());
- readers.clear();
+ std::vector<RowSetSplits> splits;
+ ASSERT_FALSE(_tablet->capture_rs_readers(version, &splits).ok());
+ splits.clear();
PadRowsetAction action(nullptr, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN);
action._pad_rowset(_tablet, version);
- ASSERT_TRUE(_tablet->capture_rs_readers(version, &readers).ok());
+ ASSERT_TRUE(_tablet->capture_rs_readers(version, &splits).ok());
}
TEST_F(TestTablet, cooldown_policy) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]