This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c29582bd57 [pipeline](split by segment)support segment split by
scanner (#17738)
c29582bd57 is described below
commit c29582bd57ba50d5a510df3d25e95bfb93cc2fbf
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 16 15:25:52 2023 +0800
[pipeline](split by segment)support segment split by scanner (#17738)
* support segment split by scanner
* change code by cr
---
be/src/common/status.h | 2 -
be/src/olap/reader.h | 5 +
be/src/olap/rowset/beta_rowset_reader.cpp | 21 ++-
be/src/olap/rowset/beta_rowset_reader.h | 6 +-
be/src/olap/rowset/rowset_reader.h | 6 +-
be/src/olap/tablet_manager.cpp | 27 ++-
be/src/olap/tablet_manager.h | 3 +
be/src/vec/exec/scan/new_olap_scan_node.cpp | 203 ++++++++++++++++-----
be/src/vec/exec/scan/new_olap_scanner.cpp | 61 ++++---
be/src/vec/exec/scan/new_olap_scanner.h | 4 +-
be/src/vec/exec/scan/pip_scanner_context.h | 2 -
be/src/vec/olap/block_reader.cpp | 11 +-
be/src/vec/olap/vertical_block_reader.cpp | 3 +-
.../java/org/apache/doris/plugin/AuditEvent.java | 2 +-
14 files changed, 256 insertions(+), 100 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 254c25e528..772eaa2230 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -404,8 +404,6 @@ public:
bool ok() const { return _code == ErrorCode::OK; }
- bool is_blocked_by_sc() const { return _code ==
ErrorCode::PIP_WAIT_FOR_SC; }
-
bool is_io_error() const {
return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH ==
_code ||
ErrorCode::CHECKSUM_ERROR == _code ||
ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 17f7e79286..813369fca9 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -101,6 +101,11 @@ public:
DeleteBitmap* delete_bitmap {nullptr};
std::vector<RowsetReaderSharedPtr> rs_readers;
+ // If rs_readers_segment_offsets is not empty, each rs_reader scans only the
+ // segments in its [pair.first, pair.second) range. Only effective for
+ // duplicate-key tables in pipeline mode.
+ std::vector<std::pair<int, int>> rs_readers_segment_offsets;
+
// return_columns is init from query schema
std::vector<uint32_t> return_columns;
// output_columns only contain columns in OrderByExprs and outputExprs
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0f4410a45f..0a4e7ccdb4 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -42,6 +42,10 @@ void BetaRowsetReader::reset_read_options() {
_read_options.key_ranges.clear();
}
+RowsetReaderSharedPtr BetaRowsetReader::clone() {
+ return RowsetReaderSharedPtr(new BetaRowsetReader(_rowset));
+}
+
bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
if (_iterator != nullptr) {
return _iterator->update_profile(profile);
@@ -51,6 +55,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile*
profile) {
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext*
read_context,
std::vector<RowwiseIteratorUPtr>* out_iters,
+ const std::pair<int, int>&
segment_offset,
bool use_cache) {
RETURN_NOT_OK(_rowset->load());
_context = read_context;
@@ -174,7 +179,15 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
should_use_cache));
// create iterator for each segment
- for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
+ auto& segments = _segment_cache_handle.get_segments();
+ auto [seg_start, seg_end] = segment_offset;
+ if (seg_start == seg_end) {
+ seg_start = 0;
+ seg_end = segments.size();
+ }
+
+ for (int i = seg_start; i < seg_end; i++) {
+ auto& seg_ptr = segments[i];
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter);
if (!s.ok()) {
@@ -191,13 +204,15 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}
out_iters->push_back(std::move(iter));
}
+
return Status::OK();
}
-Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
+Status BetaRowsetReader::init(RowsetReaderContext* read_context,
+ const std::pair<int, int>& segment_offset) {
_context = read_context;
std::vector<RowwiseIteratorUPtr> iterators;
- RETURN_NOT_OK(get_segment_iterators(_context, &iterators));
+ RETURN_NOT_OK(get_segment_iterators(_context, &iterators, segment_offset));
// merge or union segment iterator
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index eba2c2885c..45321c6302 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -31,10 +31,12 @@ public:
~BetaRowsetReader() override { _rowset->release(); }
- Status init(RowsetReaderContext* read_context) override;
+ Status init(RowsetReaderContext* read_context,
+ const std::pair<int, int>& segment_offset) override;
Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIteratorUPtr>* out_iters,
+ const std::pair<int, int>& segment_offset,
bool use_cache = false) override;
void reset_read_options() override;
Status next_block(vectorized::Block* block) override;
@@ -66,6 +68,8 @@ public:
bool update_profile(RuntimeProfile* profile) override;
+ RowsetReaderSharedPtr clone() override;
+
private:
bool _should_push_down_value_predicates() const;
diff --git a/be/src/olap/rowset/rowset_reader.h
b/be/src/olap/rowset/rowset_reader.h
index 795f841b0a..3d6b2c6fad 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -41,10 +41,12 @@ public:
virtual ~RowsetReader() = default;
// reader init
- virtual Status init(RowsetReaderContext* read_context) = 0;
+ virtual Status init(RowsetReaderContext* read_context,
+ const std::pair<int, int>& segment_offset = {0, 0}) =
0;
virtual Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIteratorUPtr>*
out_iters,
+ const std::pair<int, int>&
segment_offset = {0, 0},
bool use_cache = false) = 0;
virtual void reset_read_options() = 0;
@@ -73,6 +75,8 @@ public:
}
virtual bool update_profile(RuntimeProfile* profile) = 0;
+
+ virtual RowsetReaderSharedPtr clone() = 0;
};
} // namespace doris
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f7e1ca25a5..5bc7019fda 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -18,36 +18,23 @@
#include "olap/tablet_manager.h"
#include <gen_cpp/Types_types.h>
-#include <rapidjson/document.h>
#include <re2/re2.h>
-#include <thrift/protocol/TDebugProtocol.h>
#include <algorithm>
-#include <boost/algorithm/string.hpp>
-#include <boost/algorithm/string/classification.hpp>
-#include <boost/algorithm/string/split.hpp>
#include <cstdint>
#include <cstdio>
-#include <cstdlib>
#include <filesystem>
-#include "common/compiler_util.h"
#include "env/env.h"
-#include "env/env_util.h"
#include "gutil/strings/strcat.h"
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
#include "olap/olap_common.h"
#include "olap/push_handler.h"
-#include "olap/reader.h"
-#include "olap/rowset/rowset_id_generator.h"
-#include "olap/schema_change.h"
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
-#include "olap/utils.h"
-#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/stringbuffer.h"
#include "runtime/thread_context.h"
@@ -56,7 +43,6 @@
#include "util/file_utils.h"
#include "util/histogram.h"
#include "util/path_util.h"
-#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
#include "util/time.h"
#include "util/trace.h"
@@ -534,6 +520,19 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId
tablet_id, bool include_dele
return _get_tablet_unlocked(tablet_id, include_deleted, err);
}
+std::pair<TabletSharedPtr, Status>
TabletManager::get_tablet_and_status(TTabletId tablet_id,
+ bool
include_deleted) {
+ std::string err;
+ auto tablet = get_tablet(tablet_id, include_deleted, &err);
+ if (tablet == nullptr) {
+ auto err_str = fmt::format("failed to get tablet: {}, reason: {}",
tablet_id, err);
+ LOG(WARNING) << err_str;
+ return {tablet, Status::InternalError(err_str)};
+ }
+
+ return {tablet, Status::OK()};
+}
+
TabletSharedPtr TabletManager::_get_tablet_unlocked(TTabletId tablet_id, bool
include_deleted,
string* err) {
TabletSharedPtr tablet;
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 1b7e9cf89b..dca1b635ad 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -75,6 +75,9 @@ public:
TabletSharedPtr get_tablet(TTabletId tablet_id, bool include_deleted =
false,
std::string* err = nullptr);
+ std::pair<TabletSharedPtr, Status> get_tablet_and_status(TTabletId
tablet_id,
+ bool
include_deleted = false);
+
TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
bool include_deleted = false, std::string* err
= nullptr);
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 55babf3066..51e13beb3c 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -17,6 +17,8 @@
#include "vec/exec/scan/new_olap_scan_node.h"
+#include <charconv>
+
#include "common/status.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
@@ -404,57 +406,172 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
- std::unordered_set<std::string> disk_set;
- for (auto& scan_range : _scan_ranges) {
- auto tablet_id = scan_range->tablet_id;
- std::string err;
- TabletSharedPtr tablet =
-
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
- if (tablet == nullptr) {
- auto err_str = fmt::format("failed to get tablet: {}, reason: {}",
tablet_id, err);
- LOG(WARNING) << err_str;
- return Status::InternalError(err_str);
- }
+ bool is_duplicate_key = false;
+ int segment_count = 0;
+ std::vector<std::vector<RowsetReaderSharedPtr>>
rowset_readers_vector(_scan_ranges.size());
+ std::vector<std::vector<int>> tablet_rs_seg_count(_scan_ranges.size());
+
+ // Split tablet segments among scanners; only used in pipeline mode for duplicate-key tables.
+ // 1. If the tablet count is below the scanner thread num, count the segments of all tablets to scan.
+ // TODO: some tablets may have no segments; splitting by segment may be needed in all cases.
+ if (_shared_scan_opt && _scan_ranges.size() <
config::doris_scanner_thread_pool_thread_num) {
+ for (int i = 0; i < _scan_ranges.size(); ++i) {
+ auto& scan_range = _scan_ranges[i];
+ auto tablet_id = scan_range->tablet_id;
+ auto [tablet, status] =
+
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
+
true);
+ RETURN_IF_ERROR(status);
+
+ is_duplicate_key = tablet->keys_type() == DUP_KEYS;
+ if (!is_duplicate_key) {
+ break;
+ }
- std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
- int size_based_scanners_per_tablet = 1;
+ int64_t version = 0;
+ std::from_chars(scan_range->version.c_str(),
+ scan_range->version.c_str() +
scan_range->version.size(), version);
+
+ std::shared_lock rdlock(tablet->get_header_lock());
+ // acquire tablet rowset readers at the beginning of the scan node
+ // to prevent this case: when there are lots of olap scanners to
run for example 10000
+ // the rowsets maybe compacted when the last olap scanner starts
+ Status acquire_reader_st =
+ tablet->capture_rs_readers({0, version},
&rowset_readers_vector[i]);
+ if (!acquire_reader_st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
+ std::stringstream ss;
+ ss << "failed to initialize storage reader. tablet=" <<
tablet->full_name()
+ << ", res=" << acquire_reader_st
+ << ", backend=" << BackendOptions::get_localhost();
+ return Status::InternalError(ss.str());
+ }
- if (config::doris_scan_range_max_mb > 0) {
- size_based_scanners_per_tablet = std::max(
- 1, (int)(tablet->tablet_footprint() /
(config::doris_scan_range_max_mb << 20)));
+ for (const auto& rowset_reader : rowset_readers_vector[i]) {
+ auto num_segments = rowset_reader->rowset()->num_segments();
+ tablet_rs_seg_count[i].emplace_back(num_segments);
+ segment_count += num_segments;
+ }
}
+ }
- int ranges_per_scanner =
- std::max(1, (int)ranges->size() /
- std::min(scanners_per_tablet,
size_based_scanners_per_tablet));
- int num_ranges = ranges->size();
- for (int i = 0; i < num_ranges;) {
- std::vector<doris::OlapScanRange*> scanner_ranges;
- scanner_ranges.push_back((*ranges)[i].get());
- ++i;
- for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
- (*ranges)[i]->end_include == (*ranges)[i -
1]->end_include;
- ++j, ++i) {
- scanner_ranges.push_back((*ranges)[i].get());
+ std::unordered_set<std::string> disk_set;
+ auto build_new_scanner = [&](const TPaloScanRange& scan_range,
+ const std::vector<OlapScanRange*>& key_ranges,
+ const std::vector<RowsetReaderSharedPtr>&
rs_readers,
+ const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets) {
+ NewOlapScanner* scanner = new NewOlapScanner(_state, this,
_limit_per_scanner,
+
_olap_scan_node.is_preaggregation,
+ _need_agg_finalize,
_scanner_profile.get());
+
+ scanner->set_compound_filters(_compound_filters);
+ // Add the scanner to the pool before calling prepare(),
+ // so that the scanner is automatically destroyed if prepare() fails.
+ _scanner_pool.add(scanner);
+ RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges,
_vconjunct_ctx_ptr.get(),
+ _olap_filters, _filter_predicates,
_push_down_functions,
+ _common_vexpr_ctxs_pushdown.get(),
rs_readers,
+ rs_reader_seg_offsets));
+ scanners->push_back((VScanner*)scanner);
+ disk_set.insert(scanner->scan_disk());
+ return Status::OK();
+ };
+ if (is_duplicate_key) {
+ // 2. Split by segment count: each scanner scans at most avg_segment_count segments
+ auto avg_segment_count =
+ std::max(segment_count /
config::doris_scanner_thread_pool_thread_num, 1);
+ for (int i = 0; i < _scan_ranges.size(); ++i) {
+ auto& scan_range = _scan_ranges[i];
+ std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
+ int num_ranges = ranges->size();
+ std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
+ for (int j = 0; j < num_ranges; ++j) {
+ scanner_ranges[j] = (*ranges)[j].get();
+ }
+
+ const auto& rs_seg_count = tablet_rs_seg_count[i];
+ int rs_seg_count_index = 0;
+ int rs_seg_start_scan = 0;
+ int scanner_seg_occupy = 0;
+ std::vector<RowsetReaderSharedPtr> rs_readers;
+ std::vector<std::pair<int, int>> rs_reader_seg_offsets;
+
+ while (rs_seg_count_index < rs_seg_count.size()) {
+ auto max_add_seg_nums = rs_seg_count[rs_seg_count_index] -
rs_seg_start_scan;
+
rs_readers.emplace_back(rowset_readers_vector[i][rs_seg_count_index]->clone());
+
+ if (scanner_seg_occupy + max_add_seg_nums > avg_segment_count)
{
+ auto need_add_seg_nums = avg_segment_count -
scanner_seg_occupy;
+ rs_reader_seg_offsets.emplace_back(
+ rs_seg_start_scan,
+ rs_seg_start_scan + need_add_seg_nums); // only
scan need_add_seg_nums
+ RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_readers,
+ rs_reader_seg_offsets));
+
+ rs_seg_start_scan += need_add_seg_nums;
+ scanner_seg_occupy = 0;
+ rs_readers.clear();
+ rs_reader_seg_offsets.clear();
+ } else if (scanner_seg_occupy + max_add_seg_nums ==
avg_segment_count) {
+ rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
+
rs_seg_count[rs_seg_count_index]);
+ RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_readers,
+ rs_reader_seg_offsets));
+
+ rs_seg_start_scan = 0;
+ scanner_seg_occupy = 0;
+ rs_readers.clear();
+ rs_reader_seg_offsets.clear();
+ rs_seg_count_index++;
+ } else {
+ rs_reader_seg_offsets.emplace_back(rs_seg_start_scan,
+
rs_seg_count[rs_seg_count_index]);
+
+ rs_seg_start_scan = 0;
+ scanner_seg_occupy += max_add_seg_nums;
+ rs_seg_count_index++;
+ }
}
- NewOlapScanner* scanner = new NewOlapScanner(
- _state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation,
- _need_agg_finalize, _scanner_profile.get());
-
- scanner->set_compound_filters(_compound_filters);
- // add scanner to pool before doing prepare.
- // so that scanner can be automatically deconstructed if prepare
failed.
- _scanner_pool.add(scanner);
- RETURN_IF_ERROR(scanner->prepare(
- *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(),
_olap_filters,
- _filter_predicates, _push_down_functions,
_common_vexpr_ctxs_pushdown.get()));
- scanners->push_back((VScanner*)scanner);
- disk_set.insert(scanner->scan_disk());
+ // build one more scanner for the remaining tail segments
+ if (!rs_readers.empty()) {
+ build_new_scanner(*scan_range, scanner_ranges, rs_readers,
rs_reader_seg_offsets);
+ }
+ }
+ } else {
+ for (auto& scan_range : _scan_ranges) {
+ auto tablet_id = scan_range->tablet_id;
+ auto [tablet, status] =
+
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
+
true);
+ RETURN_IF_ERROR(status);
+
+ std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&cond_ranges;
+ int size_based_scanners_per_tablet = 1;
+
+ if (config::doris_scan_range_max_mb > 0) {
+ size_based_scanners_per_tablet =
+ std::max(1, (int)(tablet->tablet_footprint() /
+ (config::doris_scan_range_max_mb <<
20)));
+ }
+ int ranges_per_scanner =
+ std::max(1, (int)ranges->size() /
std::min(scanners_per_tablet,
+
size_based_scanners_per_tablet));
+ int num_ranges = ranges->size();
+ for (int i = 0; i < num_ranges;) {
+ std::vector<doris::OlapScanRange*> scanner_ranges;
+ scanner_ranges.push_back((*ranges)[i].get());
+ ++i;
+ for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
+ (*ranges)[i]->end_include == (*ranges)[i -
1]->end_include;
+ ++j, ++i) {
+ scanner_ranges.push_back((*ranges)[i].get());
+ }
+ RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges,
{}, {}));
+ }
}
+ COUNTER_SET(_num_disks_accessed_counter,
static_cast<int64_t>(disk_set.size()));
}
-
- COUNTER_SET(_num_disks_accessed_counter,
static_cast<int64_t>(disk_set.size()));
// telemetry::set_span_attribute(span, _num_disks_accessed_counter);
// telemetry::set_span_attribute(span, _num_scanners);
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 9a503bd16d..cb41de43c9 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -52,7 +52,9 @@ Status NewOlapScanner::prepare(const TPaloScanRange&
scan_range,
const std::vector<TCondition>& filters,
const FilterPredicates& filter_predicates,
const std::vector<FunctionFilter>&
function_filters,
- VExprContext** common_vexpr_ctxs_pushdown) {
+ VExprContext** common_vexpr_ctxs_pushdown,
+ const std::vector<RowsetReaderSharedPtr>&
rs_readers,
+ const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets) {
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
if (common_vexpr_ctxs_pushdown != nullptr) {
// Copy common_vexpr_ctxs_pushdown from scan node to this scanner's
_common_vexpr_ctxs_pushdown, just necessary.
@@ -70,14 +72,10 @@ Status NewOlapScanner::prepare(const TPaloScanRange&
scan_range,
TTabletId tablet_id = scan_range.tablet_id;
_version = strtoul(scan_range.version.c_str(), nullptr, 10);
{
- std::string err;
- _tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
- if (_tablet.get() == nullptr) {
- std::stringstream ss;
- ss << "failed to get tablet. tablet_id=" << tablet_id << ",
reason=" << err;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
+ auto [tablet, status] =
+
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
true);
+ RETURN_IF_ERROR(status);
+ _tablet = std::move(tablet);
_tablet_schema->copy_from(*_tablet->tablet_schema());
TOlapScanNode& olap_scan_node =
((NewOlapScanNode*)_parent)->_olap_scan_node;
@@ -116,27 +114,32 @@ Status NewOlapScanner::prepare(const TPaloScanRange&
scan_range,
{
std::shared_lock rdlock(_tablet->get_header_lock());
- const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
- if (rowset == nullptr) {
- std::stringstream ss;
- ss << "fail to get latest version of tablet: " << tablet_id;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
+ if (rs_readers.empty()) {
+ const RowsetSharedPtr rowset =
_tablet->rowset_with_max_version();
+ if (rowset == nullptr) {
+ std::stringstream ss;
+ ss << "fail to get latest version of tablet: " <<
tablet_id;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
- // acquire tablet rowset readers at the beginning of the scan node
- // to prevent this case: when there are lots of olap scanners to
run for example 10000
- // the rowsets maybe compacted when the last olap scanner starts
- Version rd_version(0, _version);
- Status acquire_reader_st =
- _tablet->capture_rs_readers(rd_version,
&_tablet_reader_params.rs_readers);
- if (!acquire_reader_st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
- std::stringstream ss;
- ss << "failed to initialize storage reader. tablet=" <<
_tablet->full_name()
- << ", res=" << acquire_reader_st
- << ", backend=" << BackendOptions::get_localhost();
- return Status::InternalError(ss.str());
+ // acquire tablet rowset readers at the beginning of the scan
node
+ // to prevent this case: when there are lots of olap scanners
to run for example 10000
+ // the rowsets maybe compacted when the last olap scanner
starts
+ Version rd_version(0, _version);
+ Status acquire_reader_st =
+ _tablet->capture_rs_readers(rd_version,
&_tablet_reader_params.rs_readers);
+ if (!acquire_reader_st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
+ std::stringstream ss;
+ ss << "failed to initialize storage reader. tablet=" <<
_tablet->full_name()
+ << ", res=" << acquire_reader_st
+ << ", backend=" << BackendOptions::get_localhost();
+ return Status::InternalError(ss.str());
+ }
+ } else {
+ _tablet_reader_params.rs_readers = rs_readers;
+ _tablet_reader_params.rs_readers_segment_offsets =
rs_reader_seg_offsets;
}
// Initialize tablet_reader_params
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index 83968e2535..2a04e021db 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -45,7 +45,9 @@ public:
VExprContext** vconjunct_ctx_ptr, const
std::vector<TCondition>& filters,
const FilterPredicates& filter_predicates,
const std::vector<FunctionFilter>& function_filters,
- VExprContext** common_vexpr_ctxs_pushdown);
+ VExprContext** common_vexpr_ctxs_pushdown,
+ const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
+ const std::vector<std::pair<int, int>>&
rs_reader_seg_offsets = {});
const std::string& scan_disk() const { return _tablet->data_dir()->path();
}
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 0e418256c2..6a38617a9d 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -84,7 +84,6 @@ public:
void set_max_queue_size(int max_queue_size) override {
for (int i = 0; i < max_queue_size; ++i) {
- _blocks_queue_empty.emplace_back(true);
_queue_mutexs.emplace_back(new std::mutex);
_blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
}
@@ -92,7 +91,6 @@ public:
private:
int _next_queue_to_feed = 0;
- std::vector<bool> _blocks_queue_empty;
std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
};
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index a30abe2650..a9dbcd6226 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -75,10 +75,17 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params) {
_reader_context.push_down_agg_type_opt =
read_params.push_down_agg_type_opt;
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
- for (auto& rs_reader : read_params.rs_readers) {
+ DCHECK(read_params.rs_readers_segment_offsets.empty() ||
+ read_params.rs_readers_segment_offsets.size() ==
read_params.rs_readers.size());
+
+ bool is_empty = read_params.rs_readers_segment_offsets.empty();
+ for (int i = 0; i < read_params.rs_readers.size(); ++i) {
+ auto& rs_reader = read_params.rs_readers[i];
// _vcollect_iter.topn_next() will init rs_reader by itself
if (!_vcollect_iter.use_topn_next()) {
- RETURN_NOT_OK(rs_reader->init(&_reader_context));
+ RETURN_NOT_OK(rs_reader->init(
+ &_reader_context,
+ is_empty ? std::pair {0, 0} :
read_params.rs_readers_segment_offsets[i]));
}
Status res = _vcollect_iter.add_child(rs_reader);
if (!res.ok() && !res.is<END_OF_FILE>()) {
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 7195ed3381..097ddb7513 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -53,7 +53,8 @@ Status VerticalBlockReader::_get_segment_iterators(const
ReaderParams& read_para
// segment iterator will be inited here
// In vertical compaction, every group will load segment so we should
cache
// segment to avoid tot many s3 head request
- RETURN_NOT_OK(rs_reader->get_segment_iterators(&_reader_context,
segment_iters, true));
+ RETURN_NOT_OK(
+ rs_reader->get_segment_iterators(&_reader_context,
segment_iters, {0, 0}, true));
// if segments overlapping, all segment iterator should be inited in
// heap merge iterator. If segments are none overlapping, only first
segment of this
// rowset will be inited and push to heap, other segment will be
inited later when current
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 597d156ea0..71ffa474b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -64,7 +64,7 @@ public class AuditEvent {
public int errorCode = 0;
@AuditField(value = "ErrorMessage")
public String errorMessage = "";
- @AuditField(value = "Time")
+ @AuditField(value = "Time(ms)")
public long queryTime = -1;
@AuditField(value = "ScanBytes")
public long scanBytes = -1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]