This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e993b6a5e1f [feature-WIP](query cache) cache tablets aggregate result,
BE part (#40171) (#42375)
e993b6a5e1f is described below
commit e993b6a5e1f8935fbbc427d240751023dcbeb0ca
Author: HappenLee <[email protected]>
AuthorDate: Thu Oct 24 21:15:27 2024 +0800
[feature-WIP](query cache) cache tablets aggregate result, BE part (#40171)
(#42375)
Support caching the aggregate result of each tablet.
For example:
SQL 1:
```sql
select key, sum(value)
from tbl
where dt between '2024-08-01' and '2024-08-10'
group by key
```
SQL 2:
```sql
select key, sum(value)
from tbl
where dt between '2024-08-05' and '2024-08-15'
group by key
```
SQL 1 will populate the cache with the aggregate results of the tablets
whose partitions fall between '2024-08-01' and '2024-08-10'.
SQL 2 will then reuse the cached tablet aggregates for the partitions between
'2024-08-05' and '2024-08-10', and compute the aggregates only for the
partitions between '2024-08-11' and '2024-08-15'.
At present, only simple aggregations that do not contain a join with a
runtime filter are supported. Enable the feature with:
```sql
set enable_query_cache=true;
```
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 3 +
be/src/pipeline/dependency.h | 7 +
be/src/pipeline/exec/cache_sink_operator.cpp | 73 +++++++++
be/src/pipeline/exec/cache_sink_operator.h | 73 +++++++++
be/src/pipeline/exec/cache_source_operator.cpp | 199 +++++++++++++++++++++++++
be/src/pipeline/exec/cache_source_operator.h | 104 +++++++++++++
be/src/pipeline/exec/olap_scan_operator.cpp | 34 +++--
be/src/pipeline/exec/olap_scan_operator.h | 4 +-
be/src/pipeline/exec/operator.cpp | 7 +-
be/src/pipeline/pipeline_fragment_context.cpp | 80 ++++++++--
be/src/pipeline/query_cache/query_cache.cpp | 69 +++++++++
be/src/pipeline/query_cache/query_cache.h | 151 +++++++++++++++++++
be/src/runtime/exec_env.h | 7 +
be/src/runtime/exec_env_init.cpp | 9 +-
be/src/runtime/memory/cache_policy.h | 3 +
be/src/vec/core/block.cpp | 2 -
17 files changed, 802 insertions(+), 25 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b077deac04f..5a8c607bad7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1374,6 +1374,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144");
DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");
+DEFINE_Int32(query_cache_size, "512");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 734d73f46d8..7e70e067f3a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1454,6 +1454,9 @@ DECLARE_mInt32(lz4_compression_block_size);
DECLARE_mBool(enable_pipeline_task_leakage_detect);
+// MB
+DECLARE_Int32(query_cache_size);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index d0c16a3ff5a..9364170898d 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -35,6 +35,7 @@
#include "pipeline/exec/join/process_hash_table_probe.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/spill/spill_stream.h"
@@ -541,6 +542,12 @@ public:
const int _child_count;
};
+struct CacheSharedState : public BasicSharedState { // Shared state used by both the cache sink and the cache source local states.
+ ENABLE_FACTORY_CREATOR(CacheSharedState)
+public:
+ DataQueue data_queue; // CacheSinkOperatorX::sink pushes blocks here; CacheSourceOperatorX::get_block pops them.
+};
+
class MultiCastDataStreamer;
struct MultiCastSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp
b/be/src/pipeline/exec/cache_sink_operator.cpp
new file mode 100644
index 00000000000..b8b5b534659
--- /dev/null
+++ b/be/src/pipeline/exec/cache_sink_operator.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cache_sink_operator.h"
+
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+
+namespace doris::pipeline {
+
+// Initializes the sink-side local state and registers this sink's dependency
+// on sub-queue 0 of the shared data queue.
+Status CacheSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+
+    auto& queue = _shared_state->data_queue;
+    queue.set_sink_dependency(_dependency, 0);
+    return Status::OK();
+}
+
+// Opens the sink-side local state and caps the length of the shared queue's
+// sub-queues at the value reported by RuntimeState::data_queue_max_blocks().
+Status CacheSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    auto& queue = _shared_state->data_queue;
+    queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
+    return Status::OK();
+}
+
+// Builds the cache sink. `child_id` is forwarded as both the second and the
+// third argument of the DataSinkOperatorX constructor.
+CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int child_id)
+        : Base(sink_id, child_id, child_id) {
+    _name = "CACHE_SINK_OPERATOR";
+}
+
+// There is nothing to open at the operator level; this always succeeds.
+Status CacheSinkOperatorX::open(RuntimeState* /*state*/) {
+    return Status::OK();
+}
+
+// Forwards one input block to the shared data queue.
+// A non-empty block is moved into sub-queue 0; when `eos` is set, sub-queue 0
+// is marked as finished. Empty blocks only update the row counter.
+Status CacheSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+
+    // Read the row count before the block is moved away below.
+    const auto input_rows = in_block->rows();
+    COUNTER_UPDATE(local_state.rows_input_counter(), static_cast<int64_t>(input_rows));
+
+    auto& queue = local_state._shared_state->data_queue;
+    if (input_rows > 0) {
+        queue.push_block(vectorized::Block::create_unique(std::move(*in_block)), 0);
+    }
+    if (UNLIKELY(eos)) {
+        queue.set_finish(0);
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/cache_sink_operator.h
b/be/src/pipeline/exec/cache_sink_operator.h
new file mode 100644
index 00000000000..9c4beb48df2
--- /dev/null
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class CacheSinkOperatorX;
+// Local state of the cache sink operator, parameterized on CacheSharedState.
+// CacheSinkOperatorX is a friend so that it can reach the shared data queue.
+class CacheSinkLocalState final : public PipelineXSinkLocalState<CacheSharedState> {
+public:
+    using Base = PipelineXSinkLocalState<CacheSharedState>;
+    using Parent = CacheSinkOperatorX;
+
+    ENABLE_FACTORY_CREATOR(CacheSinkLocalState);
+
+    CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+
+    friend class CacheSinkOperatorX;
+};
+
+// Sink half of the query-cache operator pair. It is constructed from ids only
+// (initialization from a TDataSink is rejected) and feeds the DataQueue held
+// by CacheSharedState.
+class CacheSinkOperatorX final : public DataSinkOperatorX<CacheSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<CacheSinkLocalState>;
+    friend class CacheSinkLocalState;
+
+    CacheSinkOperatorX(int sink_id, int child_id);
+    ~CacheSinkOperatorX() override = default;
+
+    // Thrift-based initialization is not supported for this sink; always fails.
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", Base::_name);
+    }
+
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
+
+    // Creates the CacheSharedState, tagging it with this operator's id and
+    // with every destination operator id.
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        auto shared = std::make_shared<CacheSharedState>();
+        shared->id = operator_id();
+        for (auto& dest : dests_id()) {
+            shared->related_op_ids.insert(dest);
+        }
+        return shared;
+    }
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp
b/be/src/pipeline/exec/cache_source_operator.cpp
new file mode 100644
index 00000000000..5f8c5befc6a
--- /dev/null
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/cache_source_operator.h"
+
+#include <functional>
+#include <utility>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+
+// Prepares the source side of the query cache for one task:
+//   1. derives this query's output slot order from `output_slot_mapping`,
+//   2. builds the cache key and version for the (single) scan range,
+//   3. probes the global QueryCache and, on a usable hit, works out how the
+//      cached columns must be permuted to match this query's slot order.
+// Returns InternalError when the plan does not supply exactly one scan range
+// or when a slot id cannot be mapped.
+Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    static_cast<CacheSharedState*>(_dependency->shared_state())
+            ->data_queue.set_source_dependency(_shared_state->source_deps.front());
+
+    const auto& scan_ranges = info.scan_ranges;
+    // scan_ranges[0] is dereferenced below, so an empty list must be rejected
+    // as well as a list with more than one entry.
+    if (scan_ranges.size() != 1) {
+        return Status::InternalError("CacheSourceOperator only support one scan range, plan error");
+    }
+
+    const auto& parent = _parent->cast<CacheSourceOperatorX>();
+    const auto& cache_param = parent._cache_param;
+
+    // 1. init the slot orders
+    const auto& tuple_descs = parent.row_desc().tuple_descriptors();
+    for (auto tuple_desc : tuple_descs) {
+        for (auto slot_desc : tuple_desc->slots()) {
+            const auto mapped = cache_param.output_slot_mapping.find(slot_desc->id());
+            if (mapped == cache_param.output_slot_mapping.end()) {
+                return Status::InternalError(
+                        fmt::format("Cache can not find the mapping slot id {}, node id {}",
+                                    slot_desc->id(), cache_param.node_id));
+            }
+            _slot_orders.emplace_back(mapped->second);
+        }
+    }
+
+    // 2. build cache key by digest_tablet_id
+    RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param, &_cache_key, &_version));
+    _runtime_profile->add_info_string(
+            "CacheTabletId", std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
+
+    // 3. lookup the cache and find proper slot order
+    const bool hit_cache =
+            QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle);
+    _runtime_profile->add_info_string("HitCache", hit_cache ? "1" : "0");
+    if (!hit_cache || cache_param.force_refresh_query_cache) {
+        // Miss (or forced refresh): get_block() will compute and insert the result.
+        return Status::OK();
+    }
+
+    _hit_cache_results = _query_cache_handle.get_cache_result();
+    const auto* hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders();
+
+    // Compare the whole vectors: a mismatch at ANY position (not just the last
+    // one) means the cached columns have to be re-ordered.
+    if (_slot_orders == *hit_cache_slot_orders) {
+        return Status::OK();
+    }
+
+    for (auto slot_id : _slot_orders) {
+        const auto find_res =
+                std::find(hit_cache_slot_orders->begin(), hit_cache_slot_orders->end(), slot_id);
+        if (find_res == hit_cache_slot_orders->end()) {
+            return Status::InternalError(fmt::format(
+                    "Cache can not find the mapping slot id {}, node id {}, "
+                    "hit_cache_column_orders [{}]",
+                    slot_id, cache_param.node_id, fmt::join(*hit_cache_slot_orders, ",")));
+        }
+        // Position of this slot inside the cached block.
+        _hit_cache_column_orders.emplace_back(find_res - hit_cache_slot_orders->begin());
+    }
+
+    return Status::OK();
+}
+
+// Opens the base local state, charging the elapsed time to the exec and open
+// timers. The status of Base::open() is returned unchanged.
+Status CacheSourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+// Renders the base debug string and, when a shared state is attached, appends
+// the finish / has-data status of the shared data queue.
+std::string CacheSourceLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer buf;
+    fmt::format_to(buf, "{}", Base::debug_string(indentation_level));
+    if (_shared_state) {
+        auto& queue = _shared_state->data_queue;
+        fmt::format_to(buf, ", data_queue: (is_all_finish = {}, has_data = {})",
+                       queue.is_all_finish(), queue.remaining_has_data());
+    }
+    return fmt::to_string(buf);
+}
+
+Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos) { // Fills `block` from the shared data queue (no cached result) or from the cached blocks (cache hit), and sets `*eos`.
+ auto& local_state = get_local_state(state);
+ SCOPED_TIMER(local_state.exec_time_counter());
+
+ block->clear_column_data(_row_descriptor.num_materialized_slots());
+ bool need_clone_empty = block->columns() == 0; // An output block without columns gets its layout cloned from the first incoming block below.
+
+ if (local_state._hit_cache_results == nullptr) { // Miss path: init() did not select a cached result.
+ Defer insert_cache([&] { // NOTE(review): Defer presumably runs this lambda when the enclosing scope exits - confirm.
+ if (*eos && local_state._need_insert_cache) { // Publish only once the stream has ended and caching was not abandoned.
+ local_state._runtime_profile->add_info_string("InsertCache",
"1");
+ local_state._global_cache->insert(local_state._cache_key,
local_state._version,
+
local_state._local_cache_blocks,
+ local_state._slot_orders,
+
local_state._current_query_cache_bytes);
+ local_state._local_cache_blocks.clear();
+ }
+ });
+
+ std::unique_ptr<vectorized::Block> output_block;
+ int child_idx = 0;
+
RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block,
+
&child_idx));
+ // Here, check the value of `_has_data(state)` again after
`data_queue.is_all_finish()` is TRUE
+ // as there may be one or more blocks when
`data_queue.is_all_finish()` is TRUE.
+ *eos = !_has_data(state) &&
local_state._shared_state->data_queue.is_all_finish();
+
+ if (!output_block) { // Nothing was dequeued on this call; `*eos` has already been set above.
+ return Status::OK();
+ }
+
+ if (local_state._need_insert_cache) { // Still collecting blocks for a later cache insert.
+ if (need_clone_empty) {
+ *block = output_block->clone_empty();
+ }
+ RETURN_IF_ERROR(
+
vectorized::MutableBlock::build_mutable_block(block).merge(*output_block));
+ local_state._current_query_cache_rows += output_block->rows();
+ auto mem_consume = output_block->allocated_bytes();
+ local_state._current_query_cache_bytes += mem_consume;
+ local_state._mem_tracker->consume(mem_consume);
+
+ if (_cache_param.entry_max_bytes <
local_state._current_query_cache_bytes ||
+ _cache_param.entry_max_rows <
local_state._current_query_cache_rows) {
+ // over the max bytes, pass through the data, no need to do
cache
+ local_state._local_cache_blocks.clear();
+ local_state._need_insert_cache = false;
+ local_state._runtime_profile->add_info_string("InsertCache",
"0");
+ } else {
+
local_state._local_cache_blocks.emplace_back(std::move(output_block));
+ }
+ } else { // Caching was abandoned earlier: move the dequeued block straight into the output.
+ *block = std::move(*output_block);
+ }
+ } else { // Hit path: emit at most one cached block per call.
+ if (local_state._hit_cache_pos <
local_state._hit_cache_results->size()) {
+ const auto& hit_cache_block =
+
local_state._hit_cache_results->at(local_state._hit_cache_pos++);
+ if (need_clone_empty) {
+ *block = hit_cache_block->clone_empty();
+ }
+ RETURN_IF_ERROR(
+
vectorized::MutableBlock::build_mutable_block(block).merge(*hit_cache_block));
+ if (!local_state._hit_cache_column_orders.empty()) { // Non-empty only when init() found that the cached slot order differs from this query's.
+ auto datas = block->get_columns_with_type_and_name();
+ block->clear();
+ for (auto loc : local_state._hit_cache_column_orders) { // Re-insert the columns in this query's slot order.
+ block->insert(datas[loc]);
+ }
+ }
+ } else {
+ *eos = true; // Every cached block has been emitted.
+ }
+ }
+
+ local_state.reached_limit(block, eos);
+ return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/cache_source_operator.h
b/be/src/pipeline/exec/cache_source_operator.h
new file mode 100644
index 00000000000..e764323846b
--- /dev/null
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/query_cache/query_cache.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class CacheSourceOperatorX;
+// Local state of the cache source operator. It holds the cache key/version of
+// the scanned tablet, the blocks collected for a pending cache insert, and the
+// handle and cursor used while replaying a cache hit.
+class CacheSourceLocalState final : public PipelineXLocalState<CacheSharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(CacheSourceLocalState);
+    using Base = PipelineXLocalState<CacheSharedState>;
+    using Parent = CacheSourceOperatorX;
+
+    CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+
+    [[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
+
+private:
+    friend class CacheSourceOperatorX;
+    friend class OperatorX<CacheSourceLocalState>;
+
+    // Cache instance that get_block() inserts freshly computed results into.
+    QueryCache* _global_cache = QueryCache::instance();
+
+    // Key and version produced by QueryCache::build_cache_key() in init().
+    std::string _cache_key;
+    int64_t _version = 0;
+    // Blocks accumulated on a miss; handed to the cache once eos is reached.
+    std::vector<vectorized::BlockUPtr> _local_cache_blocks;
+    // Mapped slot ids (from output_slot_mapping) in this query's output order.
+    std::vector<int> _slot_orders;
+    // Running totals compared against entry_max_bytes / entry_max_rows.
+    size_t _current_query_cache_bytes = 0;
+    size_t _current_query_cache_rows = 0;
+    // Cleared once the collected result exceeds one of the entry limits.
+    bool _need_insert_cache = true;
+
+    // Handle filled by QueryCache::lookup() in init().
+    QueryCacheHandle _query_cache_handle;
+    // Cached blocks to replay; stays nullptr when there is no usable hit.
+    std::vector<vectorized::BlockUPtr>* _hit_cache_results = nullptr;
+    // Column positions inside the cached blocks, in this query's slot order;
+    // left empty when no re-ordering was computed.
+    std::vector<int> _hit_cache_column_orders;
+    // Index of the next cached block that get_block() will emit.
+    int _hit_cache_pos = 0;
+};
+
+// Source half of the query-cache operator pair. It either replays a cached
+// result or forwards (and collects for caching) the blocks that the cache sink
+// pushed into the shared data queue. Row descriptors are delegated to the
+// child operator.
+class CacheSourceOperatorX final : public OperatorX<CacheSourceLocalState> {
+public:
+    using Base = OperatorX<CacheSourceLocalState>;
+
+    CacheSourceOperatorX(ObjectPool* pool, int plan_node_id, int operator_id,
+                         const TQueryCacheParam& cache_param)
+            : Base(pool, plan_node_id, operator_id), _cache_param(cache_param) {
+        _op_name = "CACHE_SOURCE_OPERATOR";
+    }
+    ~CacheSourceOperatorX() override = default;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+
+    bool is_source() const override { return true; }
+
+    // Propagates the status of Base::open() instead of discarding it, so that a
+    // failure while opening the base operator is reported to the caller.
+    Status open(RuntimeState* state) override { return Base::open(state); }
+
+    const RowDescriptor& intermediate_row_desc() const override {
+        return _child->intermediate_row_desc();
+    }
+    RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
+    const RowDescriptor& row_desc() const override { return _child->row_desc(); }
+
+private:
+    friend class CacheSourceLocalState;
+
+    // Copy of the query cache parameters passed to the constructor.
+    TQueryCacheParam _cache_param;
+
+    // True while the shared data queue still reports remaining data.
+    bool _has_data(RuntimeState* state) const {
+        auto& local_state = get_local_state(state);
+        return local_state._shared_state->data_queue.remaining_has_data();
+    }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index aa6f0ed49f0..09e999d4737 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -29,15 +29,13 @@
#include "olap/tablet_manager.h"
#include "pipeline/common/runtime_filter_consumer.h"
#include "pipeline/exec/scan_operator.h"
+#include "pipeline/query_cache/query_cache.h"
#include "service/backend_options.h"
#include "util/to_string.h"
#include "vec/exec/scan/new_olap_scanner.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/in.h"
@@ -147,7 +145,6 @@ Status OlapScanLocalState::_init_profile() {
ADD_COUNTER(_segment_profile, "InvertedIndexDowngradeCount",
TUnit::UNIT);
_output_index_result_column_timer = ADD_TIMER(_segment_profile,
"OutputIndexResultColumnTimer");
-
_filtered_segment_counter = ADD_COUNTER(_segment_profile,
"NumSegmentFiltered", TUnit::UNIT);
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal",
TUnit::UNIT);
_tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
@@ -395,10 +392,25 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() const
{
void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
- for (auto& scan_range : scan_ranges) {
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- _scan_ranges.emplace_back(new
TPaloScanRange(scan_range.scan_range.palo_scan_range));
- COUNTER_UPDATE(_tablet_counter, 1);
+ const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param;
+ bool hit_cache = false;
+ if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache)
{
+ std::string cache_key;
+ int64_t version = 0;
+ auto status = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+ if (!status.ok()) {
+ throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
status.msg());
+ }
+ doris::QueryCacheHandle handle;
+ hit_cache = QueryCache::instance()->lookup(cache_key, version,
&handle);
+ }
+
+ if (!hit_cache) {
+ for (auto& scan_range : scan_ranges) {
+ DCHECK(scan_range.scan_range.__isset.palo_scan_range);
+ _scan_ranges.emplace_back(new
TPaloScanRange(scan_range.scan_range.palo_scan_range));
+ COUNTER_UPDATE(_tablet_counter, 1);
+ }
}
}
@@ -572,9 +584,11 @@ void OlapScanLocalState::add_filter_info(int id, const
PredicateFilterInfo& upda
}
OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id,
- const DescriptorTbl& descs, int
parallel_tasks)
+ const DescriptorTbl& descs, int
parallel_tasks,
+ const TQueryCacheParam& param)
: ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs,
parallel_tasks),
- _olap_scan_node(tnode.olap_scan_node) {
+ _olap_scan_node(tnode.olap_scan_node),
+ _cache_param(param) {
_output_tuple_id = tnode.olap_scan_node.tuple_id;
if (_olap_scan_node.__isset.sort_info &&
_olap_scan_node.__isset.sort_limit) {
_limit_per_scanner = _olap_scan_node.sort_limit;
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 6a03a46e65e..4465ce5690e 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -198,11 +198,13 @@ private:
class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
public:
OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
- const DescriptorTbl& descs, int parallel_tasks);
+ const DescriptorTbl& descs, int parallel_tasks,
+ const TQueryCacheParam& cache_param);
private:
friend class OlapScanLocalState;
TOlapScanNode _olap_scan_node;
+ TQueryCacheParam _cache_param;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index d65769254b9..4a93bac67fe 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -17,7 +17,6 @@
#include "operator.h"
-#include "common/logging.h"
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/aggregation_sink_operator.h"
@@ -25,6 +24,8 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
@@ -694,6 +695,7 @@ DECLARE_OPERATOR(SetSinkLocalState<true>)
DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
+DECLARE_OPERATOR(CacheSinkLocalState)
#undef DECLARE_OPERATOR
@@ -725,6 +727,7 @@ DECLARE_OPERATOR(SchemaScanLocalState)
DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
+DECLARE_OPERATOR(CacheSourceLocalState)
#undef DECLARE_OPERATOR
@@ -754,6 +757,7 @@ template class
PipelineXSinkLocalState<MultiCastSharedState>;
template class PipelineXSinkLocalState<SetSharedState>;
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
template class PipelineXSinkLocalState<BasicSharedState>;
+template class PipelineXSinkLocalState<CacheSharedState>;
template class PipelineXLocalState<HashJoinSharedState>;
template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -765,6 +769,7 @@ template class PipelineXLocalState<AggSharedState>;
template class PipelineXLocalState<PartitionedAggSharedState>;
template class PipelineXLocalState<FakeSharedState>;
template class PipelineXLocalState<UnionSharedState>;
+template class PipelineXLocalState<CacheSharedState>;
template class PipelineXLocalState<MultiCastSharedState>;
template class PipelineXLocalState<PartitionSortNodeSharedState>;
template class PipelineXLocalState<SetSharedState>;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index b1ee5933d27..7f3fa348237 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -43,6 +43,8 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
@@ -1210,10 +1212,13 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
+ bool enable_query_cache = request.fragment.__isset.query_cache_param;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
- op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
+ op.reset(new OlapScanOperatorX(
+ pool, tnode, next_operator_id(), descs, _num_instances,
+ enable_query_cache ? request.fragment.query_cache_param :
TQueryCacheParam {}));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
if (request.__isset.parallel_instances) {
cur_pipe->set_num_tasks(request.parallel_instances);
@@ -1286,6 +1291,26 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
": group by and output is empty");
}
+ auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
+ auto cache_node_id =
request.local_params[0].per_node_scan_ranges.begin()->first;
+ auto cache_source_id = next_operator_id();
+ op.reset(new CacheSourceOperatorX(pool, cache_node_id,
cache_source_id,
+
request.fragment.query_cache_param));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+
+ const auto downstream_pipeline_id = cur_pipe->id();
+ if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+ _dag.insert({downstream_pipeline_id, {}});
+ }
+ new_pipe = add_pipeline(cur_pipe);
+ _dag[downstream_pipeline_id].push_back(new_pipe->id());
+
+ DataSinkOperatorPtr cache_sink(
+ new CacheSinkOperatorX(next_sink_operator_id(),
cache_source_id));
+ cache_sink->set_dests_id({op->operator_id()});
+ RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
+ return Status::OK();
+ };
const bool group_by_limit_opt =
tnode.agg_node.__isset.agg_sort_info_by_group_key &&
tnode.limit > 0;
@@ -1298,24 +1323,59 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
- op.reset(new DistinctStreamingAggOperatorX(pool,
next_operator_id(), tnode, descs,
-
_require_bucket_distribution));
-
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
- _require_bucket_distribution =
- _require_bucket_distribution ||
op->require_data_distribution();
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ if (enable_query_cache) {
+ PipelinePtr new_pipe;
+ RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+
+ op.reset(new DistinctStreamingAggOperatorX(pool,
next_operator_id(), tnode, descs,
+
_require_bucket_distribution));
+ op->set_followed_by_shuffled_operator(false);
+ _require_bucket_distribution = true;
+ RETURN_IF_ERROR(new_pipe->add_operator(op));
+ RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+ cur_pipe = new_pipe;
+ } else {
+ op.reset(new DistinctStreamingAggOperatorX(pool,
next_operator_id(), tnode, descs,
+
_require_bucket_distribution));
+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
+ _require_bucket_distribution =
+ _require_bucket_distribution ||
op->require_data_distribution();
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ }
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
- op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ if (enable_query_cache) {
+ PipelinePtr new_pipe;
+ RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+
+ op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(op));
+ cur_pipe = new_pipe;
+ } else {
+ op.reset(new StreamingAggOperatorX(pool, next_operator_id(),
tnode, descs));
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ }
} else {
+ // create new pipeline to add query cache operator
+ PipelinePtr new_pipe;
+ if (enable_query_cache) {
+ RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+ }
+
if (enable_spill) {
op.reset(new PartitionedAggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
} else {
op.reset(new AggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
}
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ if (enable_query_cache) {
+ RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+ RETURN_IF_ERROR(new_pipe->add_operator(op));
+ cur_pipe = new_pipe;
+ } else {
+ RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ }
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
diff --git a/be/src/pipeline/query_cache/query_cache.cpp
b/be/src/pipeline/query_cache/query_cache.cpp
new file mode 100644
index 00000000000..20e342e140f
--- /dev/null
+++ b/be/src/pipeline/query_cache/query_cache.cpp
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "query_cache.h"
+
+namespace doris {
+
+// Returns a pointer to the slot-order vector stored in the cache entry this handle refers to.
+// The handle must currently hold an entry (DCHECK-ed).
+std::vector<int>* QueryCacheHandle::get_cache_slot_orders() {
+    DCHECK(_handle);
+    auto* raw_value = reinterpret_cast<LRUHandle*>(_handle)->value;
+    auto* cache_value = static_cast<QueryCache::CacheValue*>(raw_value);
+    return &cache_value->slot_orders;
+}
+
+// Returns a pointer to the cached result blocks stored in the cache entry this handle refers to.
+// The handle must currently hold an entry (DCHECK-ed).
+CacheResult* QueryCacheHandle::get_cache_result() {
+    DCHECK(_handle);
+    auto* raw_value = reinterpret_cast<LRUHandle*>(_handle)->value;
+    auto* cache_value = static_cast<QueryCache::CacheValue*>(raw_value);
+    return &cache_value->result;
+}
+
+// Returns the version number recorded in the cache entry this handle refers to.
+// The handle must currently hold an entry (DCHECK-ed).
+int64_t QueryCacheHandle::get_cache_version() {
+    DCHECK(_handle);
+    auto* raw_value = reinterpret_cast<LRUHandle*>(_handle)->value;
+    return static_cast<QueryCache::CacheValue*>(raw_value)->version;
+}
+
+// Stores a copy of `res` in the cache under `key`, tagged with `version` and `slot_orders`.
+// Every source block is duplicated into a freshly created block (empty clone, then merge),
+// so the blocks in `res` are not moved from. `cache_size` is passed to
+// LRUCachePolicy::insert for both of its size arguments.
+void QueryCache::insert(const CacheKey& key, int64_t version, CacheResult& res,
+                        const std::vector<int>& slot_orders, int64_t cache_size) {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker());
+
+    CacheResult copied_blocks;
+    for (auto& src_block : res) {
+        auto& dst_block = copied_blocks.emplace_back(vectorized::Block::create_unique());
+        dst_block->swap(src_block->clone_empty());
+        // The status returned by merge() is intentionally discarded here.
+        (void)vectorized::MutableBlock(dst_block.get()).merge(*src_block);
+    }
+
+    auto owned_value = std::make_unique<QueryCache::CacheValue>(version, std::move(copied_blocks),
+                                                                slot_orders);
+
+    // The unique_ptr gives up the value here; the raw pointer is handed to the LRU cache.
+    auto* lru_handle =
+            LRUCachePolicy::insert(key, static_cast<void*>(owned_value.release()), cache_size,
+                                   cache_size, CachePriority::NORMAL);
+    // A temporary QueryCacheHandle releases the returned handle as soon as it is destroyed.
+    QueryCacheHandle(this, lru_handle);
+}
+
+// Looks up `key` and transfers the entry into `*handle` only when the version stored in the
+// entry equals `version`. Returns true on such a hit. Returns false on a miss or a version
+// mismatch, in which case `*handle` is left untouched and the temporary handle releases the
+// entry when it goes out of scope.
+bool QueryCache::lookup(const CacheKey& key, int64_t version, doris::QueryCacheHandle* handle) {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker());
+
+    auto* found = LRUCachePolicy::lookup(key);
+    if (!found) {
+        return false;
+    }
+
+    QueryCacheHandle candidate(this, found);
+    if (candidate.get_cache_version() != version) {
+        return false;
+    }
+
+    *handle = std::move(candidate);
+    return true;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/query_cache/query_cache.h
b/be/src/pipeline/query_cache/query_cache.h
new file mode 100644
index 00000000000..a905831b530
--- /dev/null
+++ b/be/src/pipeline/query_cache/query_cache.h
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <charconv>
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+#include <system_error>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "olap/lru_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/lru_cache_policy.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+#include "util/time.h"
+
+namespace doris {
+
+using CacheResult = std::vector<vectorized::BlockUPtr>;
+// Owning wrapper around an entry handle obtained from the query LRU cache.
+// When a wrapper that still holds a handle is destroyed, it releases that handle back to the
+// cache, so callers must keep the wrapper alive for as long as they use what it points to.
+// The type is move-only; copying is disabled.
+class QueryCacheHandle {
+public:
+    QueryCacheHandle() = default;
+    QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
+            : _cache(cache), _handle(handle) {}
+
+    ~QueryCacheHandle() {
+        if (_handle == nullptr) {
+            return;
+        }
+        CHECK(_cache != nullptr);
+        // Release under the query cache memory tracker.
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->query_cache_mem_tracker());
+        _cache->release(_handle);
+    }
+
+    // Takes over the cache/handle pair of `other`, leaving `other` empty.
+    QueryCacheHandle(QueryCacheHandle&& other) noexcept
+            : _cache(other._cache), _handle(other._handle) {
+        other._cache = nullptr;
+        other._handle = nullptr;
+    }
+
+    // Exchanges contents with `other`: whatever this object held before the assignment is
+    // released when `other` is destroyed, not immediately.
+    QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept {
+        std::swap(_handle, other._handle);
+        std::swap(_cache, other._cache);
+        return *this;
+    }
+
+    // Accessors into the cached value; each requires that a handle is held.
+    std::vector<int>* get_cache_slot_orders();
+
+    CacheResult* get_cache_result();
+
+    int64_t get_cache_version();
+
+private:
+    LRUCachePolicy* _cache = nullptr;
+    Cache::Handle* _handle = nullptr;
+
+    // Copy construction and copy assignment are disabled.
+    DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle);
+};
+
+// LRU cache of type QUERY_CACHE that stores aggregate result blocks per tablet.
+// Entries are keyed by the string produced by build_cache_key() and carry the data version
+// they were computed against, so lookup() can reject stale entries.
+class QueryCache : public LRUCachePolicy {
+public:
+    using LRUCachePolicy::insert;
+
+    // Payload stored for each cache entry.
+    struct CacheValue : public LRUCacheValueBase {
+        int64_t version;
+        CacheResult result;
+        std::vector<int> slot_orders;
+
+        CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so)
+                : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {}
+    };
+
+    // Create global instance of this class.
+    // The instance is allocated with `new`; the caller is responsible for deleting it.
+    static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
+        return new QueryCache(capacity, num_shards);
+    }
+
+    // Builds the cache key for a scan and parses the data version of its scan range.
+    //
+    // scan_ranges: must contain exactly one scan range; anything else is a plan error.
+    // cache_param: supplies the query digest and the tablet_to_range map.
+    // cache_key:   output, set to digest + raw bytes of the tablet id + the value mapped to
+    //              the tablet in tablet_to_range.
+    // version:     output, parsed from the scan range's version string.
+    //
+    // Returns InternalError when the number of scan ranges is not one, when the version string
+    // cannot be parsed as an integer, or when the tablet is missing from tablet_to_range.
+    static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges,
+                                  const TQueryCacheParam& cache_param, std::string* cache_key,
+                                  int64_t* version) {
+        // Reject an empty vector as well: indexing scan_ranges[0] on it would be undefined
+        // behavior.
+        if (scan_ranges.size() != 1) {
+            return Status::InternalError(
+                    "CacheSourceOperator only support one scan range, plan error");
+        }
+        const auto& scan_range = scan_ranges[0];
+        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
+        const auto& palo_scan_range = scan_range.scan_range.palo_scan_range;
+        auto tablet_id = palo_scan_range.tablet_id;
+
+        // Do not continue with an unset *version if the string is not a number.
+        const auto& version_str = palo_scan_range.version;
+        const auto parse_result = std::from_chars(
+                version_str.data(), version_str.data() + version_str.size(), *version);
+        if (parse_result.ec != std::errc()) {
+            return Status::InternalError("Failed to parse version of scan range, plan error");
+        }
+
+        auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+        if (find_tablet == cache_param.tablet_to_range.end()) {
+            return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
+        }
+
+        // The tablet id is appended as its raw in-memory bytes, not as decimal text.
+        *cache_key = cache_param.digest +
+                     std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id)) +
+                     find_tablet->second;
+
+        return Status::OK();
+    }
+
+    // Return global instance.
+    // Client should call create_global_cache before.
+    static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); }
+
+    QueryCache() = delete;
+
+    // NOTE(review): 3600 * 24 is presumably a sweep/expiry interval in seconds (one day);
+    // confirm against the LRUCachePolicy constructor.
+    QueryCache(size_t capacity, uint32_t num_shards)
+            : LRUCachePolicy(CachePolicy::CacheType::QUERY_CACHE, capacity, LRUCacheType::SIZE,
+                             3600 * 24, num_shards) {}
+
+    // Fills `handle` and returns true only when `key` is cached with exactly `version`.
+    bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle);
+
+    // Caches a copy of `result` under `key` together with `version` and `slot_orders`.
+    void insert(const CacheKey& key, int64_t version, CacheResult& result,
+                const std::vector<int>& slot_orders, int64_t cache_size);
+};
+} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index e77a1c7ae41..61cebad10b9 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -65,6 +65,7 @@ class InvertedIndexQueryCache;
class TmpFileDirs;
} // namespace segment_v2
+class QueryCache;
class WorkloadSchedPolicyMgr;
class BfdParser;
class BrokerMgr;
@@ -187,6 +188,9 @@ public:
std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
return _point_query_executor_mem_tracker;
}
+ // Returns the shared memory tracker that query cache code switches to while it
+ // inserts, looks up or releases cache entries.
+ std::shared_ptr<MemTrackerLimiter> query_cache_mem_tracker() {
+ return _query_cache_mem_tracker;
+ }
std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() {
return _block_compression_mem_tracker;
}
@@ -305,6 +309,7 @@ public:
segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() {
return _inverted_index_query_cache;
}
+ // Returns the QueryCache pointer held by ExecEnv; it is nullptr until the cache is created.
+ QueryCache* get_query_cache() { return _query_cache; }
std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return
_dummy_lru_cache; }
pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
@@ -366,6 +371,7 @@ private:
// Tracking memory may be shared between multiple queries.
std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _query_cache_mem_tracker;
// TODO, looking forward to more accurate tracking.
std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
@@ -437,6 +443,7 @@ private:
CacheManager* _cache_manager = nullptr;
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
+ QueryCache* _query_cache = nullptr;
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index eb9fa12ea4b..c91e623b990 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -56,6 +56,7 @@
#include "olap/tablet_schema_cache.h"
#include "olap/wal/wal_manager.h"
#include "pipeline/pipeline_tracing.h"
+#include "pipeline/query_cache/query_cache.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/broker_mgr.h"
@@ -584,6 +585,9 @@ Status ExecEnv::_init_mem_env() {
_orc_memory_pool = new doris::vectorized::ORCMemoryPool();
_arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
+ // Create the query cache. config::query_cache_size is expressed in MB (see the log line
+ // below) and is converted to bytes for the cache capacity.
+ _query_cache = QueryCache::create_global_cache(config::query_cache_size *
1024L * 1024L);
+ LOG(INFO) << "query cache memory limit: " << config::query_cache_size <<
"MB";
+
return Status::OK();
}
@@ -600,7 +604,9 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"SegCompaction");
_point_query_executor_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"PointQueryExecutor");
- _block_compression_mem_tracker =
+ // Global tracker used by the query cache.
+ _query_cache_mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"QueryCache");
+ _block_compression_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"BlockCompression");
_rowid_storage_reader_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"RowIdStorageReader");
@@ -690,6 +696,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_schema_cache);
SAFE_DELETE(_segment_loader);
SAFE_DELETE(_row_cache);
+ SAFE_DELETE(_query_cache);
// Free resource after threads are stopped.
// Some threads are still running, like threads created by
_new_load_stream_mgr ...
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index c43ca0b2fb7..5241efb9c29 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -48,6 +48,7 @@ public:
CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
NONE = 18, // not be used
FOR_UT_CACHE_NUMBER = 19,
+ QUERY_CACHE = 20
};
static std::string type_string(CacheType type) {
@@ -90,6 +91,8 @@ public:
return "CloudTxnDeleteBitmapCache";
case CacheType::FOR_UT_CACHE_NUMBER:
return "ForUTCacheNumber";
+ case CacheType::QUERY_CACHE:
+ // PascalCase, matching the other cache type names returned by this function.
+ return "QueryCache";
default:
LOG(FATAL) << "not match type of cache policy :" <<
static_cast<int>(type);
}
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 69231804336..95cedf4c362 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -45,12 +45,10 @@
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
#include "util/slice.h"
-#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]