This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5921163bfa2 [test](beut) add pipeline QueryCache Operator beut (#49904)
5921163bfa2 is described below
commit 5921163bfa25b6c5da45f6e0e07b0a1b44c17f6f
Author: Mryange <[email protected]>
AuthorDate: Wed Apr 16 10:59:05 2025 +0800
[test](beut) add pipeline QueryCache Operator beut (#49904)
### What problem does this PR solve?
Add BE unit tests (beut) for the pipeline QueryCache operators (cache sink and cache source) and for QueryCache itself.
---
be/src/pipeline/exec/cache_sink_operator.h | 3 +
be/src/pipeline/exec/cache_source_operator.cpp | 2 +-
be/src/pipeline/exec/cache_source_operator.h | 5 +
be/test/pipeline/exec/query_cache_test.cpp | 125 ++++++++++
.../operator/query_cache_operator_test.cpp | 273 +++++++++++++++++++++
5 files changed, 407 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/cache_sink_operator.h b/be/src/pipeline/exec/cache_sink_operator.h
index d4f2113ed06..e77badbb9ac 100644
--- a/be/src/pipeline/exec/cache_sink_operator.h
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -50,6 +50,9 @@ public:
friend class CacheSinkLocalState;
CacheSinkOperatorX(int sink_id, int child_id, int dest_id);
+#ifdef BE_TEST
+ CacheSinkOperatorX() = default;
+#endif
~CacheSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp
index ec9f9ecc572..7efe0e7588f 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -64,7 +64,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
"CacheTabletId",
std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
// 3. lookup the cache and find proper slot order
- hit_cache = QueryCache::instance()->lookup(_cache_key, _version,
&_query_cache_handle);
+ hit_cache = _global_cache->lookup(_cache_key, _version,
&_query_cache_handle);
_runtime_profile->add_info_string("HitCache", std::to_string(hit_cache));
if (hit_cache && !cache_param.force_refresh_query_cache) {
_hit_cache_results = _query_cache_handle.get_cache_result();
diff --git a/be/src/pipeline/exec/cache_source_operator.h b/be/src/pipeline/exec/cache_source_operator.h
index 651f9ff5596..947fd00852b 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -76,6 +76,11 @@ public:
: Base(pool, plan_node_id, operator_id), _cache_param(cache_param)
{
_op_name = "CACHE_SOURCE_OPERATOR";
};
+
+#ifdef BE_TEST
+ CacheSourceOperatorX() = default;
+#endif
+
~CacheSourceOperatorX() override = default;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
diff --git a/be/test/pipeline/exec/query_cache_test.cpp b/be/test/pipeline/exec/query_cache_test.cpp
new file mode 100644
index 00000000000..0f0d37f2e4d
--- /dev/null
+++ b/be/test/pipeline/exec/query_cache_test.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/query_cache/query_cache.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "testutil/column_helper.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+using namespace vectorized;
+class QueryCacheTest : public testing::Test {
+public:
+ void SetUp() override {}
+};
+
+TEST_F(QueryCacheTest, create_global_cache) {
+ auto* cache = QueryCache::create_global_cache(1024 * 1024 * 1024, 16);
+ delete cache;
+}
+
+TEST_F(QueryCacheTest, build_cache_key) {
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ scan_ranges.push_back({});
+ scan_ranges.push_back({});
+ TQueryCacheParam cache_param;
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+ std::cout << st.msg() << std::endl;
+ EXPECT_FALSE(st.ok());
+ }
+
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range;
+ TPaloScanRange palp_scan_range;
+ palp_scan_range.__set_tablet_id(42);
+ palp_scan_range.__set_version("114514");
+ scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
+ scan_ranges.push_back(scan_range);
+ TQueryCacheParam cache_param;
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+ std::cout << st.msg() << std::endl;
+ std::cout << version << std::endl;
+ EXPECT_FALSE(st.ok());
+ }
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range;
+ TPaloScanRange palp_scan_range;
+ palp_scan_range.__set_tablet_id(42);
+ palp_scan_range.__set_version("114514");
+ scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
+ scan_ranges.push_back(scan_range);
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("be ut");
+ cache_param.tablet_to_range.insert({42, "test"});
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+ std::cout << st.msg() << std::endl;
+ std::cout << version << std::endl;
+ std::cout << cache_key << std::endl;
+ EXPECT_TRUE(st.ok());
+ }
+}
+
+TEST_F(QueryCacheTest, insert_and_lookup) {
+ std::unique_ptr<QueryCache> query_cache
{QueryCache::create_global_cache(1024 * 1024 * 1024)};
+ std::string cache_key = "be ut";
+ {
+ //insert
+ CacheResult result;
+ result.push_back(std::make_unique<Block>());
+ *result.back() = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3,
4, 5});
+ query_cache->insert(cache_key, 42, result, {1, 2, 3}, 1);
+ }
+
+ {
+ //lookup
+ std::unique_ptr<QueryCacheHandle> handle =
std::make_unique<QueryCacheHandle>();
+ EXPECT_TRUE(query_cache->lookup(cache_key, 42, handle.get()));
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ *handle->get_cache_result()->back(),
+ ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5})));
+ EXPECT_EQ(handle->get_cache_slot_orders()->size(), 3);
+ EXPECT_EQ(handle->get_cache_version(), 42);
+
+ QueryCacheHandle handle1 {std::move(*handle)};
+ QueryCacheHandle handle2;
+ handle2 = std::move(handle1);
+
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ *handle2.get_cache_result()->back(),
+ ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5})));
+ EXPECT_EQ(handle2.get_cache_slot_orders()->size(), 3);
+ EXPECT_EQ(handle2.get_cache_version(), 42);
+ }
+}
+
+// ./run-be-ut.sh --run --filter=QueryCacheTest.*
+
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/query_cache_operator_test.cpp b/be/test/pipeline/operator/query_cache_operator_test.cpp
new file mode 100644
index 00000000000..6a2bff1efc0
--- /dev/null
+++ b/be/test/pipeline/operator/query_cache_operator_test.cpp
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
+#include "pipeline/exec/repeat_operator.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+class QueryCacheMockChildOperator : public OperatorXBase {
+public:
+ Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block,
+ bool* eos) override {
+ return Status::OK();
+ }
+
+ Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override {
+ return Status::OK();
+ }
+ Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
+ return Status::OK();
+ }
+
+ const RowDescriptor& row_desc() const override { return *_mock_row_desc; }
+
+private:
+ std::unique_ptr<MockRowDescriptor> _mock_row_desc;
+};
+
+struct QueryCacheOperatorTest : public ::testing::Test {
+ void SetUp() override {
+ state = std::make_shared<MockRuntimeState>();
+ state->batsh_size = 10;
+ child_op = std::make_unique<QueryCacheMockChildOperator>();
+ query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 *
1024));
+ query_cache = query_cache_uptr.get();
+ scan_ranges.clear();
+ TScanRangeParams scan_range;
+ TPaloScanRange palp_scan_range;
+ palp_scan_range.__set_tablet_id(42);
+ palp_scan_range.__set_version("114514");
+ scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
+ scan_ranges.push_back(scan_range);
+ }
+ void create_local_state() {
+ shared_state = sink->create_shared_state();
+ {
+ sink_local_state_uptr = CacheSinkLocalState
::create_unique(sink.get(), state.get());
+ sink_local_state = sink_local_state_uptr.get();
+ LocalSinkStateInfo info {.task_idx = 0,
+ .parent_profile = &profile,
+ .sender_id = 0,
+ .shared_state = shared_state.get(),
+ .shared_state_map = {},
+ .tsink = TDataSink {}};
+ EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok());
+ state->emplace_sink_local_state(0,
std::move(sink_local_state_uptr));
+ }
+
+ {
+ source_local_state_uptr =
+ CacheSourceLocalState::create_unique(state.get(),
source.get());
+ source_local_state = source_local_state_uptr.get();
+ source_local_state->_global_cache = query_cache;
+ LocalStateInfo info {.parent_profile = &profile,
+ .scan_ranges = scan_ranges,
+ .shared_state = shared_state.get(),
+ .shared_state_map = {},
+ .task_idx = 0};
+
+ EXPECT_TRUE(source_local_state_uptr->init(state.get(), info));
+ state->resize_op_id_to_local_state(-100);
+ state->emplace_local_state(source->operator_id(),
std::move(source_local_state_uptr));
+ }
+
+ { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); }
+
+ { EXPECT_TRUE(source_local_state->open(state.get()).ok()); }
+ }
+
+ RuntimeProfile profile {"test"};
+ std::unique_ptr<CacheSinkOperatorX> sink;
+ std::unique_ptr<CacheSourceOperatorX> source;
+
+ std::unique_ptr<CacheSinkLocalState> sink_local_state_uptr;
+
+ CacheSinkLocalState* sink_local_state;
+
+ std::unique_ptr<CacheSourceLocalState> source_local_state_uptr;
+ CacheSourceLocalState* source_local_state;
+
+ std::shared_ptr<MockRuntimeState> state;
+
+ std::shared_ptr<QueryCacheMockChildOperator> child_op;
+
+ ObjectPool pool;
+
+ std::shared_ptr<BasicSharedState> shared_state;
+
+ std::unique_ptr<QueryCache> query_cache_uptr;
+ QueryCache* query_cache;
+
+ std::vector<TScanRangeParams> scan_ranges;
+};
+
+TEST_F(QueryCacheOperatorTest, test_no_hit_cache1) {
+ sink = std::make_unique<CacheSinkOperatorX>();
+ source = std::make_unique<CacheSourceOperatorX>();
+ EXPECT_TRUE(source->set_child(child_op));
+ child_op->_mock_row_desc.reset(
+ new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+ TQueryCacheParam cache_param;
+ cache_param.node_id = 0;
+ cache_param.output_slot_mapping[0] = 0;
+ cache_param.tablet_to_range.insert({42, "test"});
+ cache_param.force_refresh_query_cache = false;
+ cache_param.entry_max_bytes = 1024 * 1024;
+ cache_param.entry_max_rows = 1000;
+
+ source->_cache_param = cache_param;
+ create_local_state();
+
+ std::cout << query_cache->get_element_count() << std::endl;
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5});
+ auto st = sink->sink(state.get(), &block, true);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ }
+ {
+ Block block;
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(eos);
+ std::cout << block.dump_data() << std::endl;
+ std::cout << query_cache->get_element_count() << std::endl;
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5})));
+ EXPECT_EQ(query_cache->get_element_count(), 1);
+ }
+ EXPECT_TRUE(source_local_state->_need_insert_cache);
+}
+
+TEST_F(QueryCacheOperatorTest, test_no_hit_cache2) {
+ sink = std::make_unique<CacheSinkOperatorX>();
+ source = std::make_unique<CacheSourceOperatorX>();
+ EXPECT_TRUE(source->set_child(child_op));
+ child_op->_mock_row_desc.reset(
+ new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+ TQueryCacheParam cache_param;
+ cache_param.node_id = 0;
+ cache_param.output_slot_mapping[0] = 0;
+ cache_param.tablet_to_range.insert({42, "test"});
+ cache_param.force_refresh_query_cache = false;
+ cache_param.entry_max_bytes = 1024 * 1024;
+ cache_param.entry_max_rows = 3;
+
+ source->_cache_param = cache_param;
+ create_local_state();
+
+ std::cout << query_cache->get_element_count() << std::endl;
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5});
+ auto st = sink->sink(state.get(), &block, true);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ }
+ {
+ Block block;
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(eos);
+ std::cout << block.dump_data() << std::endl;
+ std::cout << query_cache->get_element_count() << std::endl;
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5})));
+ EXPECT_EQ(query_cache->get_element_count(), 0);
+ }
+ EXPECT_FALSE(source_local_state->_need_insert_cache);
+}
+
+TEST_F(QueryCacheOperatorTest, test_hit_cache) {
+ sink = std::make_unique<CacheSinkOperatorX>();
+ source = std::make_unique<CacheSourceOperatorX>();
+ EXPECT_TRUE(source->set_child(child_op));
+ child_op->_mock_row_desc.reset(
+ new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+ TQueryCacheParam cache_param;
+ cache_param.node_id = 0;
+ cache_param.output_slot_mapping[0] = 0;
+ cache_param.tablet_to_range.insert({42, "test"});
+ cache_param.force_refresh_query_cache = false;
+ cache_param.entry_max_bytes = 1024 * 1024;
+ cache_param.entry_max_rows = 3;
+
+ {
+ int64_t version = 0;
+ std::string cache_key;
+ EXPECT_TRUE(QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version));
+ CacheResult result;
+ result.push_back(std::make_unique<Block>());
+ *result.back() = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3,
4, 5});
+ query_cache->insert(cache_key, version, result, {0, 2, 3}, 1);
+ }
+
+ source->_cache_param = cache_param;
+ create_local_state();
+
+ std::cout << query_cache->get_element_count() << std::endl;
+ EXPECT_EQ(source_local_state->_slot_orders.size(), 1);
+ EXPECT_EQ(source_local_state->_slot_orders[0], 0);
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5});
+ auto st = sink->sink(state.get(), &block, true);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ }
+
+ {
+ Block block;
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_FALSE(eos);
+ std::cout << block.dump_data() << std::endl;
+ std::cout << query_cache->get_element_count() << std::endl;
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5})));
+ }
+
+ {
+ Block block;
+ bool eos = false;
+ auto st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(eos);
+ EXPECT_TRUE(block.empty());
+ }
+
+ query_cache_uptr.release();
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]