This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9305725f5ed [branch-3.0](pick) pick #47501 #47535 (#47602)
9305725f5ed is described below
commit 9305725f5ed7e983182eca5f6e4078406efc8266
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 8 15:15:24 2025 +0800
[branch-3.0](pick) pick #47501 #47535 (#47602)
---
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/olap/parallel_scanner_builder.cpp | 11 +-
be/src/olap/parallel_scanner_builder.h | 5 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 119 ++++++++++++++-------
be/src/pipeline/exec/olap_scan_operator.h | 5 +
be/src/pipeline/exec/operator.h | 2 +
be/src/pipeline/local_exchange/local_exchanger.cpp | 4 +-
be/src/pipeline/pipeline_task.cpp | 3 +
9 files changed, 109 insertions(+), 46 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 73d801f0828..089766214cf 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1031,6 +1031,9 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");
+// Enable prefetching (holding) tablets before the scan operator is opened.
+DEFINE_mBool(enable_prefetch_tablet, "true");
+
// Set config randomly to check more issues in github workflow
DEFINE_Bool(enable_fuzzy_mode, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a750c48e921..e3900c0dd86 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1076,6 +1076,9 @@ DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);
+// Enable prefetching (holding) tablets before the scan operator is opened.
+DECLARE_mBool(enable_prefetch_tablet);
+
// Set config randomly to check more issues in github workflow
DECLARE_Bool(enable_fuzzy_mode);
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 88c69ab5c9a..103e6341d7c 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -164,17 +164,15 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
*/
Status ParallelScannerBuilder::_load() {
_total_rows = 0;
+ size_t idx = 0;
for (auto&& [tablet, version] : _tablets) {
const auto tablet_id = tablet->tablet_id();
- auto& read_source = _all_read_sources[tablet_id];
- RETURN_IF_ERROR(tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false));
- if (!_state->skip_delete_predicate()) {
- read_source.fill_delete_predicates();
- }
+ _all_read_sources[tablet_id] = _read_sources[idx];
+ const auto& read_source = _all_read_sources[tablet_id];
+
bool enable_segment_cache =
_state->query_options().__isset.enable_segment_cache
?
_state->query_options().enable_segment_cache
: true;
-
for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
@@ -190,6 +188,7 @@ Status ParallelScannerBuilder::_load() {
}
_total_rows += rowset->num_rows();
}
+ idx++;
}
_rows_per_scanner = _total_rows / _max_scanners_count;
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index 7c6b5648e89..1f371e3129a 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -44,6 +44,7 @@ class ParallelScannerBuilder {
public:
ParallelScannerBuilder(pipeline::OlapScanLocalState* parent,
const std::vector<TabletWithVersion>& tablets,
+ std::vector<TabletReader::ReadSource>& read_sources,
const std::shared_ptr<RuntimeProfile>& profile,
const std::vector<OlapScanRange*>& key_ranges,
RuntimeState* state,
int64_t limit, bool is_dup_mow_key, bool
is_preaggregation)
@@ -54,7 +55,8 @@ public:
_is_dup_mow_key(is_dup_mow_key),
_is_preaggregation(is_preaggregation),
_tablets(tablets.cbegin(), tablets.cend()),
- _key_ranges(key_ranges.cbegin(), key_ranges.cend()) {}
+ _key_ranges(key_ranges.cbegin(), key_ranges.cend()),
+ _read_sources(read_sources) {}
Status build_scanners(std::list<VScannerSPtr>& scanners);
@@ -93,6 +95,7 @@ private:
std::vector<TabletWithVersion> _tablets;
std::vector<OlapScanRange*> _key_ranges;
std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
+ std::vector<TabletReader::ReadSource>& _read_sources;
};
} // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index bc92acc134a..dca47227914 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -22,7 +22,9 @@
#include <memory>
#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
+#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
@@ -312,33 +314,7 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;
- std::vector<TabletWithVersion> tablets;
- tablets.reserve(_scan_ranges.size());
- for (auto&& scan_range : _scan_ranges) {
- // TODO(plat1ko): Get cloud tablet in parallel
- auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
- int64_t version = 0;
- std::from_chars(scan_range->version.data(),
- scan_range->version.data() +
scan_range->version.size(), version);
- tablets.emplace_back(std::move(tablet), version);
- }
-
- if (config::is_cloud_mode()) {
- int64_t duration_ns = 0;
- {
- SCOPED_RAW_TIMER(&duration_ns);
- std::vector<std::function<Status()>> tasks;
- tasks.reserve(_scan_ranges.size());
- for (auto&& [tablet, version] : tablets) {
- tasks.emplace_back([tablet, version]() {
- return
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
- });
- }
- RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
- }
- _sync_rowset_timer->update(duration_ns);
- }
-
+ RETURN_IF_ERROR(hold_tablets());
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
@@ -351,8 +327,9 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
key_ranges.emplace_back(range.get());
}
- ParallelScannerBuilder scanner_builder(this, tablets,
_scanner_profile, key_ranges, state(),
- p._limit, true,
p._olap_scan_node.is_preaggregation);
+ ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources,
_scanner_profile,
+ key_ranges, state(), p._limit,
true,
+
p._olap_scan_node.is_preaggregation);
int max_scanners_count = state()->parallel_scan_max_scanners_count();
@@ -377,18 +354,19 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
-
- for (auto& scan_range : _scan_ranges) {
- auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+ for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size();
scan_range_idx++) {
int64_t version = 0;
- std::from_chars(scan_range->version.data(),
- scan_range->version.data() +
scan_range->version.size(), version);
+ std::from_chars(_scan_ranges[scan_range_idx]->version.data(),
+ _scan_ranges[scan_range_idx]->version.data() +
+ _scan_ranges[scan_range_idx]->version.size(),
+ version);
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges =
&_cond_ranges;
int size_based_scanners_per_tablet = 1;
if (config::doris_scan_range_max_mb > 0) {
- size_based_scanners_per_tablet = std::max(
- 1, (int)(tablet->tablet_footprint() /
(config::doris_scan_range_max_mb << 20)));
+ size_based_scanners_per_tablet =
+ std::max(1,
(int)(_tablets[scan_range_idx].tablet->tablet_footprint() /
+ (config::doris_scan_range_max_mb <<
20)));
}
int ranges_per_scanner =
std::max(1, (int)ranges->size() /
@@ -410,9 +388,9 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
state(),
_scanner_profile.get(),
scanner_ranges,
- tablet,
+ _tablets[scan_range_idx].tablet,
version,
- {},
+ _read_sources[scan_range_idx],
p._limit,
p._olap_scan_node.is_preaggregation,
});
@@ -420,10 +398,70 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
scanners->push_back(std::move(scanner));
}
}
+ _tablets.clear();
+ _read_sources.clear();
return Status::OK();
}
+// Holds the tablets this local state will scan so the work can happen before the operator
+// is opened. For every scan range it looks up the tablet and parses the version carried by
+// the scan range. In cloud mode it also records the tablet with the cloud tablet hotspot
+// tracker and syncs rowsets up to that version via cloud::bthread_fork_join. Finally it
+// captures the rowset readers for [0, version] and, unless the query skips delete
+// predicates, fills in the delete predicates.
+//
+// Called from PipelineTask::execute() while the task waits to start (when
+// config::enable_prefetch_tablet is on) and again from _init_scanners(), so it must be
+// idempotent: a non-empty `_tablets` means the tablets are already held and the call is a
+// no-op. Results are built in locals and published only after every step succeeded, so a
+// failed attempt leaves `_tablets`/`_read_sources` empty instead of half-populated with
+// default-constructed entries that a later call would mistake for "already held".
+//
+// Logs a warning when holding takes longer than 5 seconds.
+Status OlapScanLocalState::hold_tablets() {
+    if (!_tablets.empty()) {
+        return Status::OK();
+    }
+    MonotonicStopWatch timer;
+    timer.start();
+    std::vector<TabletWithVersion> tablets(_scan_ranges.size());
+    std::vector<TabletReader::ReadSource> read_sources(_scan_ranges.size());
+    for (size_t i = 0; i < _scan_ranges.size(); i++) {
+        int64_t version = 0;
+        std::from_chars(_scan_ranges[i]->version.data(),
+                        _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), version);
+        auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id));
+        tablets[i] = {std::move(tablet), version};
+
+        if (config::is_cloud_mode()) {
+            // FIXME(plat1ko): Avoid pointer cast
+            ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(
+                    *tablets[i].tablet);
+        }
+    }
+
+    if (config::is_cloud_mode()) {
+        int64_t duration_ns = 0;
+        {
+            SCOPED_RAW_TIMER(&duration_ns);
+            std::vector<std::function<Status()>> tasks;
+            tasks.reserve(tablets.size());
+            for (auto&& [cur_tablet, cur_version] : tablets) {
+                tasks.emplace_back([cur_tablet, cur_version]() {
+                    return std::dynamic_pointer_cast<CloudTablet>(cur_tablet)
+                            ->sync_rowsets(cur_version);
+                });
+            }
+            RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+        }
+        _sync_rowset_timer->update(duration_ns);
+    }
+    for (size_t i = 0; i < tablets.size(); i++) {
+        RETURN_IF_ERROR(tablets[i].tablet->capture_rs_readers(
+                {0, tablets[i].version}, &read_sources[i].rs_splits,
+                RuntimeFilterConsumer::_state->skip_missing_version()));
+        if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
+            read_sources[i].fill_delete_predicates();
+        }
+    }
+    // Publish only after every step above succeeded (see the idempotence note above).
+    _tablets = std::move(tablets);
+    _read_sources = std::move(read_sources);
+
+    timer.stop();
+    double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC;
+    if (cost_secs > 5) {
+        LOG_WARNING(
+                "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, "
+                "ScanRangeNum={})",
+                cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
+                _scan_ranges.size());
+    }
+    return Status::OK();
+}
+
TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
}
@@ -633,4 +671,9 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool,
const TPlanNode& tnode, i
}
}
+// Fetches this operator's OlapScanLocalState for `state` and delegates to its
+// hold_tablets(), so the pipeline task can prefetch tablets before the operator is opened.
+Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
+    auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state);
+    return local_state.hold_tablets();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 8559bfec2a8..d15a61b7d0f 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -22,6 +22,7 @@
#include <string>
#include "common/status.h"
+#include "olap/tablet_reader.h"
#include "operator.h"
#include "pipeline/exec/scan_operator.h"
@@ -49,6 +50,7 @@ public:
std::to_string(_parent->node_id()),
std::to_string(_parent->nereids_id()),
olap_scan_node().table_name);
}
+ Status hold_tablets();
private:
friend class vectorized::NewOlapScanner;
@@ -211,6 +213,8 @@ private:
RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
std::mutex _profile_mtx;
+ std::vector<TabletWithVersion> _tablets;
+ std::vector<TabletReader::ReadSource> _read_sources;
};
class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
@@ -218,6 +222,7 @@ public:
OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs, int parallel_tasks,
const TQueryCacheParam& cache_param);
+ Status hold_tablets(RuntimeState* state) override;
private:
friend class OlapScanLocalState;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index fcd28f96d6e..7abde975fd0 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -647,6 +647,8 @@ public:
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const
{ return true; }
+    // Hook invoked on a pipeline task's source operator so it can hold (prefetch) the
+    // tablets it will read before the open phase. The default holds nothing and returns OK.
+    [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); }
Status open(RuntimeState* state) override;
[[nodiscard]] virtual Status get_block(RuntimeState* state,
vectorized::Block* block,
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 23f91cca631..3ec4f537e47 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -150,9 +150,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
const auto* offset_start =
partitioned_block.second.row_idxs->data() +
partitioned_block.second.offset_start;
auto block_wrapper = partitioned_block.first;
+ Defer defer {[&]() {
+ block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
+ }};
RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block,
offset_start,
offset_start +
partitioned_block.second.length));
- block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
} while (mutable_block.rows() < state->batch_size() && !*eos &&
_dequeue_data(local_state, partitioned_block, eos, block));
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 9d83c475778..5b5698936d7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -311,6 +311,9 @@ Status PipelineTask::execute(bool* eos) {
query_context()->update_cpu_time(delta_cpu_time);
}};
if (_wait_to_start()) {
+ if (config::enable_prefetch_tablet) {
+ RETURN_IF_ERROR(_source->hold_tablets(_state));
+ }
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]