This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5921163bfa2 [test](beut) add pipeline QueryCache Operator beut (#49904) 5921163bfa2 is described below commit 5921163bfa25b6c5da45f6e0e07b0a1b44c17f6f Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Wed Apr 16 10:59:05 2025 +0800 [test](beut) add pipeline QueryCache Operator beut (#49904) ### What problem does this PR solve? add pipeline QueryCache Operator beut --- be/src/pipeline/exec/cache_sink_operator.h | 3 + be/src/pipeline/exec/cache_source_operator.cpp | 2 +- be/src/pipeline/exec/cache_source_operator.h | 5 + be/test/pipeline/exec/query_cache_test.cpp | 125 ++++++++++ .../operator/query_cache_operator_test.cpp | 273 +++++++++++++++++++++ 5 files changed, 407 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/cache_sink_operator.h b/be/src/pipeline/exec/cache_sink_operator.h index d4f2113ed06..e77badbb9ac 100644 --- a/be/src/pipeline/exec/cache_sink_operator.h +++ b/be/src/pipeline/exec/cache_sink_operator.h @@ -50,6 +50,9 @@ public: friend class CacheSinkLocalState; CacheSinkOperatorX(int sink_id, int child_id, int dest_id); +#ifdef BE_TEST + CacheSinkOperatorX() = default; +#endif ~CacheSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp index ec9f9ecc572..7efe0e7588f 100644 --- a/be/src/pipeline/exec/cache_source_operator.cpp +++ b/be/src/pipeline/exec/cache_source_operator.cpp @@ -64,7 +64,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { "CacheTabletId", std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id)); // 3.
lookup the cache and find proper slot order - hit_cache = QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle); + hit_cache = _global_cache->lookup(_cache_key, _version, &_query_cache_handle); _runtime_profile->add_info_string("HitCache", std::to_string(hit_cache)); if (hit_cache && !cache_param.force_refresh_query_cache) { _hit_cache_results = _query_cache_handle.get_cache_result(); diff --git a/be/src/pipeline/exec/cache_source_operator.h b/be/src/pipeline/exec/cache_source_operator.h index 651f9ff5596..947fd00852b 100644 --- a/be/src/pipeline/exec/cache_source_operator.h +++ b/be/src/pipeline/exec/cache_source_operator.h @@ -76,6 +76,11 @@ public: : Base(pool, plan_node_id, operator_id), _cache_param(cache_param) { _op_name = "CACHE_SOURCE_OPERATOR"; }; + +#ifdef BE_TEST + CacheSourceOperatorX() = default; +#endif + ~CacheSourceOperatorX() override = default; Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; diff --git a/be/test/pipeline/exec/query_cache_test.cpp b/be/test/pipeline/exec/query_cache_test.cpp new file mode 100644 index 00000000000..0f0d37f2e4d --- /dev/null +++ b/be/test/pipeline/exec/query_cache_test.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/query_cache/query_cache.h" + +#include <gtest/gtest.h> + +#include <memory> +#include <vector> + +#include "testutil/column_helper.h" +#include "vec/data_types/data_type_number.h" + +namespace doris::pipeline { +using namespace vectorized; +class QueryCacheTest : public testing::Test { +public: + void SetUp() override {} +}; + +TEST_F(QueryCacheTest, create_global_cache) { + std::unique_ptr<QueryCache> cache {QueryCache::create_global_cache(1024 * 1024 * 1024, 16)}; + EXPECT_TRUE(cache != nullptr); +} + +TEST_F(QueryCacheTest, build_cache_key) { + { + std::vector<TScanRangeParams> scan_ranges; + scan_ranges.push_back({}); + scan_ranges.push_back({}); + TQueryCacheParam cache_param; + std::string cache_key; + int64_t version = 0; + auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); + std::cout << st.msg() << std::endl; + EXPECT_FALSE(st.ok()); + } + + { + std::vector<TScanRangeParams> scan_ranges; + TScanRangeParams scan_range; + TPaloScanRange palo_scan_range; + palo_scan_range.__set_tablet_id(42); + palo_scan_range.__set_version("114514"); + scan_range.scan_range.__set_palo_scan_range(palo_scan_range); + scan_ranges.push_back(scan_range); + TQueryCacheParam cache_param; + std::string cache_key; + int64_t version = 0; + auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); + std::cout << st.msg() << std::endl; + std::cout << version << std::endl; + EXPECT_FALSE(st.ok()); + } + { + std::vector<TScanRangeParams> scan_ranges; + TScanRangeParams scan_range; + TPaloScanRange palo_scan_range; + palo_scan_range.__set_tablet_id(42); + palo_scan_range.__set_version("114514"); + scan_range.scan_range.__set_palo_scan_range(palo_scan_range); + scan_ranges.push_back(scan_range); + TQueryCacheParam cache_param; + cache_param.__set_digest("be ut"); + cache_param.tablet_to_range.insert({42, "test"}); + std::string cache_key;
+ int64_t version = 0; + auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); + std::cout << st.msg() << std::endl; + std::cout << version << std::endl; + std::cout << cache_key << std::endl; + EXPECT_TRUE(st.ok()); + } +} + +TEST_F(QueryCacheTest, insert_and_lookup) { + std::unique_ptr<QueryCache> query_cache {QueryCache::create_global_cache(1024 * 1024 * 1024)}; + std::string cache_key = "be ut"; + { + //insert + CacheResult result; + result.push_back(std::make_unique<Block>()); + *result.back() = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + query_cache->insert(cache_key, 42, result, {1, 2, 3}, 1); + } + + { + //lookup + std::unique_ptr<QueryCacheHandle> handle = std::make_unique<QueryCacheHandle>(); + EXPECT_TRUE(query_cache->lookup(cache_key, 42, handle.get())); + EXPECT_TRUE(ColumnHelper::block_equal( + *handle->get_cache_result()->back(), + ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}))); + EXPECT_EQ(handle->get_cache_slot_orders()->size(), 3); + EXPECT_EQ(handle->get_cache_version(), 42); + + QueryCacheHandle handle1 {std::move(*handle)}; + QueryCacheHandle handle2; + handle2 = std::move(handle1); + + EXPECT_TRUE(ColumnHelper::block_equal( + *handle2.get_cache_result()->back(), + ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}))); + EXPECT_EQ(handle2.get_cache_slot_orders()->size(), 3); + EXPECT_EQ(handle2.get_cache_version(), 42); + } +} + +// ./run-be-ut.sh --run --filter=QueryCacheTest.* + +} // namespace doris::pipeline diff --git a/be/test/pipeline/operator/query_cache_operator_test.cpp b/be/test/pipeline/operator/query_cache_operator_test.cpp new file mode 100644 index 00000000000..6a2bff1efc0 --- /dev/null +++ b/be/test/pipeline/operator/query_cache_operator_test.cpp @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include <algorithm> +#include <memory> + +#include "pipeline/exec/cache_sink_operator.h" +#include "pipeline/exec/cache_source_operator.h" +#include "pipeline/exec/repeat_operator.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_descriptors.h" +#include "testutil/mock/mock_runtime_state.h" +#include "testutil/mock/mock_slot_ref.h" +#include "vec/core/block.h" +namespace doris::pipeline { + +using namespace vectorized; + +class QueryCacheMockChildOperator : public OperatorXBase { +public: + Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, + bool* eos) override { + return Status::OK(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { + return Status::OK(); + } + Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override { + return Status::OK(); + } + + const RowDescriptor& row_desc() const override { return *_mock_row_desc; } + +private: + std::unique_ptr<MockRowDescriptor> _mock_row_desc; +}; + +struct QueryCacheOperatorTest : public ::testing::Test { + void SetUp() override { + state = std::make_shared<MockRuntimeState>(); + state->batsh_size = 10; + child_op = std::make_shared<QueryCacheMockChildOperator>(); +
query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 * 1024)); + query_cache = query_cache_uptr.get(); + scan_ranges.clear(); + TScanRangeParams scan_range; + TPaloScanRange palo_scan_range; + palo_scan_range.__set_tablet_id(42); + palo_scan_range.__set_version("114514"); + scan_range.scan_range.__set_palo_scan_range(palo_scan_range); + scan_ranges.push_back(scan_range); + } + void create_local_state() { + shared_state = sink->create_shared_state(); + { + sink_local_state_uptr = CacheSinkLocalState::create_unique(sink.get(), state.get()); + sink_local_state = sink_local_state_uptr.get(); + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = &profile, + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .tsink = TDataSink {}}; + EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok()); + state->emplace_sink_local_state(0, std::move(sink_local_state_uptr)); + } + + { + source_local_state_uptr = + CacheSourceLocalState::create_unique(state.get(), source.get()); + source_local_state = source_local_state_uptr.get(); + source_local_state->_global_cache = query_cache; + LocalStateInfo info {.parent_profile = &profile, + .scan_ranges = scan_ranges, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .task_idx = 0}; + + EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok()); + state->resize_op_id_to_local_state(-100); + state->emplace_local_state(source->operator_id(), std::move(source_local_state_uptr)); + } + + { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); } + + { EXPECT_TRUE(source_local_state->open(state.get()).ok()); } + } + + RuntimeProfile profile {"test"}; + std::unique_ptr<CacheSinkOperatorX> sink; + std::unique_ptr<CacheSourceOperatorX> source; + + std::unique_ptr<CacheSinkLocalState> sink_local_state_uptr; + + CacheSinkLocalState* sink_local_state = nullptr; + + std::unique_ptr<CacheSourceLocalState> source_local_state_uptr; + CacheSourceLocalState* source_local_state = nullptr; +
+ std::unique_ptr<QueryCache> query_cache_uptr; // must be destroyed after `state` + QueryCache* query_cache = nullptr; + + std::shared_ptr<MockRuntimeState> state; + + std::shared_ptr<QueryCacheMockChildOperator> child_op; + + ObjectPool pool; + + std::shared_ptr<BasicSharedState> shared_state; + + std::vector<TScanRangeParams> scan_ranges; +}; + +TEST_F(QueryCacheOperatorTest, test_no_hit_cache1) { + sink = std::make_unique<CacheSinkOperatorX>(); + source = std::make_unique<CacheSourceOperatorX>(); + EXPECT_TRUE(source->set_child(child_op).ok()); + child_op->_mock_row_desc.reset( + new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &pool}); + TQueryCacheParam cache_param; + cache_param.node_id = 0; + cache_param.output_slot_mapping[0] = 0; + cache_param.tablet_to_range.insert({42, "test"}); + cache_param.force_refresh_query_cache = false; + cache_param.entry_max_bytes = 1024 * 1024; + cache_param.entry_max_rows = 1000; + + source->_cache_param = cache_param; + create_local_state(); + + std::cout << query_cache->get_element_count() << std::endl; + + { + auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}); + auto st = sink->sink(state.get(), &block, true); + EXPECT_TRUE(st.ok()) << st.msg(); + } + { + Block block; + bool eos = false; + auto st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + std::cout << block.dump_data() << std::endl; + std::cout << query_cache->get_element_count() << std::endl; + EXPECT_EQ(block.rows(), 5); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}))); + EXPECT_EQ(query_cache->get_element_count(), 1); + } + EXPECT_TRUE(source_local_state->_need_insert_cache); +} + +TEST_F(QueryCacheOperatorTest, test_no_hit_cache2) { + sink = std::make_unique<CacheSinkOperatorX>(); + source = std::make_unique<CacheSourceOperatorX>(); + EXPECT_TRUE(source->set_child(child_op).ok()); + child_op->_mock_row_desc.reset( + new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &pool}); + TQueryCacheParam cache_param; + cache_param.node_id = 0; + cache_param.output_slot_mapping[0] = 0; + cache_param.tablet_to_range.insert({42, "test"}); + cache_param.force_refresh_query_cache = false; + cache_param.entry_max_bytes = 1024 * 1024; + cache_param.entry_max_rows = 3; + + source->_cache_param = cache_param; + create_local_state(); + + std::cout << query_cache->get_element_count() << std::endl; + + { + auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}); + auto st = sink->sink(state.get(), &block, true); + EXPECT_TRUE(st.ok()) << st.msg(); + } + { + Block block; + bool eos = false; + auto st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + std::cout << block.dump_data() << std::endl; + std::cout << query_cache->get_element_count() << std::endl; + EXPECT_EQ(block.rows(), 5); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}))); + EXPECT_EQ(query_cache->get_element_count(), 0); + } + EXPECT_FALSE(source_local_state->_need_insert_cache); +} + +TEST_F(QueryCacheOperatorTest, test_hit_cache) { + sink = std::make_unique<CacheSinkOperatorX>(); + source = std::make_unique<CacheSourceOperatorX>(); + EXPECT_TRUE(source->set_child(child_op).ok()); + child_op->_mock_row_desc.reset( + new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &pool}); + TQueryCacheParam cache_param; + cache_param.node_id = 0; + cache_param.output_slot_mapping[0] = 0; + cache_param.tablet_to_range.insert({42, "test"}); + cache_param.force_refresh_query_cache = false; + cache_param.entry_max_bytes = 1024 * 1024; + cache_param.entry_max_rows = 3; + + { + int64_t version = 0; + std::string cache_key; + EXPECT_TRUE(QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version).ok()); + CacheResult result; + result.push_back(std::make_unique<Block>()); + *result.back() =
ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}); + query_cache->insert(cache_key, version, result, {0, 2, 3}, 1); + } + + source->_cache_param = cache_param; + create_local_state(); + + std::cout << query_cache->get_element_count() << std::endl; + EXPECT_EQ(source_local_state->_slot_orders.size(), 1); + EXPECT_EQ(source_local_state->_slot_orders[0], 0); + + { + auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}); + auto st = sink->sink(state.get(), &block, true); + EXPECT_TRUE(st.ok()) << st.msg(); + } + + { + Block block; + bool eos = false; + auto st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_FALSE(eos); + std::cout << block.dump_data() << std::endl; + std::cout << query_cache->get_element_count() << std::endl; + EXPECT_EQ(block.rows(), 5); + EXPECT_TRUE(ColumnHelper::block_equal( + block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5}))); + } + + { + Block block; + bool eos = false; + auto st = source->get_block(state.get(), &block, &eos); + EXPECT_TRUE(st.ok()) << st.msg(); + EXPECT_TRUE(eos); + EXPECT_TRUE(block.empty()); + } + + // Cache outlives `state` (declared first), so the local state's handle is released safely. +} + +} // namespace doris::pipeline --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org