This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5921163bfa2 [test](beut) add pipeline QueryCache Operator beut (#49904)
5921163bfa2 is described below

commit 5921163bfa25b6c5da45f6e0e07b0a1b44c17f6f
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Wed Apr 16 10:59:05 2025 +0800

    [test](beut) add pipeline QueryCache Operator beut (#49904)
    
    ### What problem does this PR solve?
    add pipeline QueryCache Operator beut
---
 be/src/pipeline/exec/cache_sink_operator.h         |   3 +
 be/src/pipeline/exec/cache_source_operator.cpp     |   2 +-
 be/src/pipeline/exec/cache_source_operator.h       |   5 +
 be/test/pipeline/exec/query_cache_test.cpp         | 125 ++++++++++
 .../operator/query_cache_operator_test.cpp         | 273 +++++++++++++++++++++
 5 files changed, 407 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/cache_sink_operator.h 
b/be/src/pipeline/exec/cache_sink_operator.h
index d4f2113ed06..e77badbb9ac 100644
--- a/be/src/pipeline/exec/cache_sink_operator.h
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -50,6 +50,9 @@ public:
 
     friend class CacheSinkLocalState;
     CacheSinkOperatorX(int sink_id, int child_id, int dest_id);
+#ifdef BE_TEST
+    CacheSinkOperatorX() = default;
+#endif
     ~CacheSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TDataSink",
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp 
b/be/src/pipeline/exec/cache_source_operator.cpp
index ec9f9ecc572..7efe0e7588f 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -64,7 +64,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
             "CacheTabletId", 
std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
 
     // 3. lookup the cache and find proper slot order
-    hit_cache = QueryCache::instance()->lookup(_cache_key, _version, 
&_query_cache_handle);
+    hit_cache = _global_cache->lookup(_cache_key, _version, 
&_query_cache_handle);
     _runtime_profile->add_info_string("HitCache", std::to_string(hit_cache));
     if (hit_cache && !cache_param.force_refresh_query_cache) {
         _hit_cache_results = _query_cache_handle.get_cache_result();
diff --git a/be/src/pipeline/exec/cache_source_operator.h 
b/be/src/pipeline/exec/cache_source_operator.h
index 651f9ff5596..947fd00852b 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -76,6 +76,11 @@ public:
             : Base(pool, plan_node_id, operator_id), _cache_param(cache_param) 
{
         _op_name = "CACHE_SOURCE_OPERATOR";
     };
+
+#ifdef BE_TEST
+    CacheSourceOperatorX() = default;
+#endif
+
     ~CacheSourceOperatorX() override = default;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
diff --git a/be/test/pipeline/exec/query_cache_test.cpp 
b/be/test/pipeline/exec/query_cache_test.cpp
new file mode 100644
index 00000000000..0f0d37f2e4d
--- /dev/null
+++ b/be/test/pipeline/exec/query_cache_test.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/query_cache/query_cache.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "testutil/column_helper.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+using namespace vectorized;
+// Fixture shared by the QueryCache unit tests in this file. It holds no state:
+// each test constructs the cache and the parameters it needs on its own.
+struct QueryCacheTest : public testing::Test {
+    // No per-test preparation is required.
+    void SetUp() override {}
+};
+
+// Smoke test: a global cache can be created with an explicit capacity and a
+// second argument of 16, and can be destroyed again.
+TEST_F(QueryCacheTest, create_global_cache) {
+    // Hold the returned pointer in a unique_ptr (as insert_and_lookup does) so
+    // that it is released on every exit path instead of by a manual delete.
+    std::unique_ptr<QueryCache> cache {QueryCache::create_global_cache(1024 * 1024 * 1024, 16)};
+    ASSERT_TRUE(cache != nullptr);
+}
+
+// Drives QueryCache::build_cache_key through three inputs and checks only the
+// returned status: two scan ranges (expected to fail), one scan range with
+// default cache parameters (expected to fail), and one scan range whose tablet
+// id is present in tablet_to_range together with a digest (expected to succeed).
+TEST_F(QueryCacheTest, build_cache_key) {
+    // Scan-range list shared by the single-tablet cases: exactly one palo scan
+    // range for tablet 42 with version string "114514".
+    auto single_tablet_scan_ranges = [] {
+        TPaloScanRange palo_scan_range;
+        palo_scan_range.__set_tablet_id(42);
+        palo_scan_range.__set_version("114514");
+        TScanRangeParams params;
+        params.scan_range.__set_palo_scan_range(palo_scan_range);
+        return std::vector<TScanRangeParams> {params};
+    };
+
+    // Case 1: two default-constructed scan ranges.
+    {
+        std::vector<TScanRangeParams> scan_ranges(2);
+        TQueryCacheParam cache_param;
+        std::string key;
+        int64_t version = 0;
+        auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &key, &version);
+        std::cout << status.msg() << std::endl;
+        EXPECT_FALSE(status.ok());
+    }
+
+    // Case 2: one scan range, cache parameters left at their defaults.
+    {
+        auto scan_ranges = single_tablet_scan_ranges();
+        TQueryCacheParam cache_param;
+        std::string key;
+        int64_t version = 0;
+        auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &key, &version);
+        std::cout << status.msg() << std::endl;
+        std::cout << version << std::endl;
+        EXPECT_FALSE(status.ok());
+    }
+
+    // Case 3: one scan range, digest set and tablet 42 mapped in tablet_to_range.
+    {
+        auto scan_ranges = single_tablet_scan_ranges();
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("be ut");
+        cache_param.tablet_to_range.insert({42, "test"});
+        std::string key;
+        int64_t version = 0;
+        auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &key, &version);
+        std::cout << status.msg() << std::endl;
+        std::cout << version << std::endl;
+        std::cout << key << std::endl;
+        EXPECT_TRUE(status.ok());
+    }
+}
+
+// Round trip through QueryCache: insert one entry, look it up again, and check
+// that the handle still exposes the same block, slot-order count and version
+// after being move-constructed and then move-assigned.
+TEST_F(QueryCacheTest, insert_and_lookup) {
+    std::unique_ptr<QueryCache> cache {QueryCache::create_global_cache(1024 * 1024 * 1024)};
+    std::string key = "be ut";
+
+    // The block that is stored and that every lookup is expected to return.
+    auto expected_block = [] {
+        return ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    };
+
+    // Insert a one-block result under version 42 with slot orders {1, 2, 3}.
+    {
+        CacheResult cached_blocks;
+        cached_blocks.push_back(std::make_unique<Block>());
+        *cached_blocks.back() = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+        cache->insert(key, 42, cached_blocks, {1, 2, 3}, 1);
+    }
+
+    // Look the entry up and verify it, first directly and then through moves.
+    {
+        auto handle = std::make_unique<QueryCacheHandle>();
+        EXPECT_TRUE(cache->lookup(key, 42, handle.get()));
+        EXPECT_TRUE(
+                ColumnHelper::block_equal(*handle->get_cache_result()->back(), expected_block()));
+        EXPECT_EQ(handle->get_cache_slot_orders()->size(), 3);
+        EXPECT_EQ(handle->get_cache_version(), 42);
+
+        QueryCacheHandle moved_by_ctor {std::move(*handle)};
+        QueryCacheHandle moved_by_assign;
+        moved_by_assign = std::move(moved_by_ctor);
+
+        EXPECT_TRUE(ColumnHelper::block_equal(*moved_by_assign.get_cache_result()->back(),
+                                              expected_block()));
+        EXPECT_EQ(moved_by_assign.get_cache_slot_orders()->size(), 3);
+        EXPECT_EQ(moved_by_assign.get_cache_version(), 42);
+    }
+}
+
+// ./run-be-ut.sh --run --filter=QueryCacheTest.*
+
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/operator/query_cache_operator_test.cpp 
b/be/test/pipeline/operator/query_cache_operator_test.cpp
new file mode 100644
index 00000000000..6a2bff1efc0
--- /dev/null
+++ b/be/test/pipeline/operator/query_cache_operator_test.cpp
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <memory>
+
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
+#include "pipeline/exec/repeat_operator.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
+#include "vec/core/block.h"
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+// Minimal child operator for the cache source under test. Every operator entry
+// point is a stub that returns Status::OK() without touching its arguments.
+// Only row_desc() carries information: it dereferences _mock_row_desc, so that
+// pointer has to be set before row_desc() is called.
+class QueryCacheMockChildOperator : public OperatorXBase {
+public:
+    const RowDescriptor& row_desc() const override { return *_mock_row_desc; }
+
+    Status setup_local_state(RuntimeState* /*state*/, LocalStateInfo& /*info*/) override {
+        return Status::OK();
+    }
+
+    Status get_block(RuntimeState* /*state*/, vectorized::Block* /*block*/,
+                     bool* /*eos*/) override {
+        return Status::OK();
+    }
+
+    Status get_block_after_projects(RuntimeState* /*state*/, vectorized::Block* /*block*/,
+                                    bool* /*eos*/) override {
+        return Status::OK();
+    }
+
+private:
+    // Row descriptor handed out by row_desc(). The tests in this file assign it
+    // directly although it is private; presumably the BE_TEST build relaxes
+    // access control - TODO confirm.
+    std::unique_ptr<MockRowDescriptor> _mock_row_desc;
+};
+
+// Test fixture that connects a CacheSinkOperatorX and a CacheSourceOperatorX
+// through one shared state, backed by a QueryCache instance created per test.
+//
+// SetUp() creates the mock runtime state, the mock child operator, the cache
+// and a single scan range (tablet 42, version "114514"). A test then assigns
+// `sink` and `source`, fills in the source's cache parameters and calls
+// create_local_state().
+struct QueryCacheOperatorTest : public ::testing::Test {
+    void SetUp() override {
+        state = std::make_shared<MockRuntimeState>();
+        state->batsh_size = 10;
+        child_op = std::make_unique<QueryCacheMockChildOperator>();
+        query_cache_uptr.reset(QueryCache::create_global_cache(1024 * 1024 * 1024));
+        query_cache = query_cache_uptr.get();
+
+        // Exactly one palo scan range: tablet 42 with version string "114514".
+        scan_ranges.clear();
+        TScanRangeParams scan_range;
+        TPaloScanRange palo_scan_range;
+        palo_scan_range.__set_tablet_id(42);
+        palo_scan_range.__set_version("114514");
+        scan_range.scan_range.__set_palo_scan_range(palo_scan_range);
+        scan_ranges.push_back(scan_range);
+    }
+
+    // Creates, initialises and opens the sink and source local states.
+    // `sink` and `source` must already be set. Ownership of both local states
+    // is moved into `state`; `sink_local_state` and `source_local_state` stay
+    // behind as non-owning pointers for the tests to inspect.
+    void create_local_state() {
+        shared_state = sink->create_shared_state();
+        {
+            sink_local_state_uptr = CacheSinkLocalState::create_unique(sink.get(), state.get());
+            sink_local_state = sink_local_state_uptr.get();
+            LocalSinkStateInfo info {.task_idx = 0,
+                                     .parent_profile = &profile,
+                                     .sender_id = 0,
+                                     .shared_state = shared_state.get(),
+                                     .shared_state_map = {},
+                                     .tsink = TDataSink {}};
+            EXPECT_TRUE(sink_local_state_uptr->init(state.get(), info).ok());
+            state->emplace_sink_local_state(0, std::move(sink_local_state_uptr));
+        }
+
+        {
+            source_local_state_uptr =
+                    CacheSourceLocalState::create_unique(state.get(), source.get());
+            source_local_state = source_local_state_uptr.get();
+            // Point the source at the fixture's cache; init() looks entries up
+            // through _global_cache.
+            source_local_state->_global_cache = query_cache;
+            LocalStateInfo info {.parent_profile = &profile,
+                                 .scan_ranges = scan_ranges,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = {},
+                                 .task_idx = 0};
+
+            // Check the status through ok(), like every other init()/open()
+            // check in this fixture, instead of an implicit conversion.
+            EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok());
+            // NOTE(review): the -100 argument is taken over unchanged; its
+            // meaning is defined by MockRuntimeState - TODO confirm.
+            state->resize_op_id_to_local_state(-100);
+            state->emplace_local_state(source->operator_id(), std::move(source_local_state_uptr));
+        }
+
+        { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); }
+
+        { EXPECT_TRUE(source_local_state->open(state.get()).ok()); }
+    }
+
+    RuntimeProfile profile {"test"};
+    std::unique_ptr<CacheSinkOperatorX> sink;
+    std::unique_ptr<CacheSourceOperatorX> source;
+
+    // Owning pointers are only populated between create_unique() and the move
+    // into `state`; the raw pointers below are the long-lived observers.
+    std::unique_ptr<CacheSinkLocalState> sink_local_state_uptr;
+
+    CacheSinkLocalState* sink_local_state = nullptr;
+
+    std::unique_ptr<CacheSourceLocalState> source_local_state_uptr;
+    CacheSourceLocalState* source_local_state = nullptr;
+
+    std::shared_ptr<MockRuntimeState> state;
+
+    std::shared_ptr<QueryCacheMockChildOperator> child_op;
+
+    ObjectPool pool;
+
+    std::shared_ptr<BasicSharedState> shared_state;
+
+    // Cache used by the test; `query_cache` is the non-owning view of it.
+    std::unique_ptr<QueryCache> query_cache_uptr;
+    QueryCache* query_cache = nullptr;
+
+    std::vector<TScanRangeParams> scan_ranges;
+};
+
+// No entry is inserted beforehand and entry_max_rows (1000) exceeds the five
+// rows that are sunk. The test expects the source to return those five rows
+// with eos set, the cache to hold one element afterwards, and
+// _need_insert_cache to be true.
+TEST_F(QueryCacheOperatorTest, test_no_hit_cache1) {
+    sink = std::make_unique<CacheSinkOperatorX>();
+    source = std::make_unique<CacheSourceOperatorX>();
+    EXPECT_TRUE(source->set_child(child_op));
+    child_op->_mock_row_desc.reset(
+            new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+
+    TQueryCacheParam param;
+    param.node_id = 0;
+    param.output_slot_mapping[0] = 0;
+    param.tablet_to_range.insert({42, "test"});
+    param.force_refresh_query_cache = false;
+    param.entry_max_bytes = 1024 * 1024;
+    param.entry_max_rows = 1000;
+    source->_cache_param = param;
+
+    create_local_state();
+
+    std::cout << query_cache->get_element_count() << std::endl;
+
+    // Feed one block into the sink and mark it as the last one.
+    {
+        auto input = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5});
+        auto status = sink->sink(state.get(), &input, true);
+        EXPECT_TRUE(status.ok()) << status.msg();
+    }
+
+    // Pull from the source: same rows back, end of stream, one cached element.
+    {
+        Block output;
+        bool eos = false;
+        auto status = source->get_block(state.get(), &output, &eos);
+        EXPECT_TRUE(status.ok()) << status.msg();
+        EXPECT_TRUE(eos);
+        std::cout << output.dump_data() << std::endl;
+        std::cout << query_cache->get_element_count() << std::endl;
+        EXPECT_EQ(output.rows(), 5);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                output, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5})));
+        EXPECT_EQ(query_cache->get_element_count(), 1);
+    }
+
+    EXPECT_TRUE(source_local_state->_need_insert_cache);
+}
+
+// No entry is inserted beforehand and entry_max_rows (3) is smaller than the
+// five rows that are sunk. The test expects the source to still return those
+// five rows with eos set, the cache to stay empty, and _need_insert_cache to
+// be false.
+TEST_F(QueryCacheOperatorTest, test_no_hit_cache2) {
+    sink = std::make_unique<CacheSinkOperatorX>();
+    source = std::make_unique<CacheSourceOperatorX>();
+    EXPECT_TRUE(source->set_child(child_op));
+    child_op->_mock_row_desc.reset(
+            new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+
+    TQueryCacheParam param;
+    param.node_id = 0;
+    param.output_slot_mapping[0] = 0;
+    param.tablet_to_range.insert({42, "test"});
+    param.force_refresh_query_cache = false;
+    param.entry_max_bytes = 1024 * 1024;
+    param.entry_max_rows = 3;
+    source->_cache_param = param;
+
+    create_local_state();
+
+    std::cout << query_cache->get_element_count() << std::endl;
+
+    // Feed one five-row block into the sink and mark it as the last one.
+    {
+        auto input = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5});
+        auto status = sink->sink(state.get(), &input, true);
+        EXPECT_TRUE(status.ok()) << status.msg();
+    }
+
+    // Pull from the source: same rows back, end of stream, nothing cached.
+    {
+        Block output;
+        bool eos = false;
+        auto status = source->get_block(state.get(), &output, &eos);
+        EXPECT_TRUE(status.ok()) << status.msg();
+        EXPECT_TRUE(eos);
+        std::cout << output.dump_data() << std::endl;
+        std::cout << query_cache->get_element_count() << std::endl;
+        EXPECT_EQ(output.rows(), 5);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                output, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5})));
+        EXPECT_EQ(query_cache->get_element_count(), 0);
+    }
+
+    EXPECT_FALSE(source_local_state->_need_insert_cache);
+}
+
+// An entry is inserted under the key and version that build_cache_key derives
+// from the fixture's scan range before the local states are created. The test
+// expects the first get_block() to return the five cached rows without eos and
+// the second get_block() to return an empty block with eos set.
+TEST_F(QueryCacheOperatorTest, test_hit_cache) {
+    sink = std::make_unique<CacheSinkOperatorX>();
+    source = std::make_unique<CacheSourceOperatorX>();
+    EXPECT_TRUE(source->set_child(child_op));
+    child_op->_mock_row_desc.reset(
+            new MockRowDescriptor {{std::make_shared<vectorized::DataTypeInt64>()}, &pool});
+    TQueryCacheParam cache_param;
+    cache_param.node_id = 0;
+    cache_param.output_slot_mapping[0] = 0;
+    cache_param.tablet_to_range.insert({42, "test"});
+    cache_param.force_refresh_query_cache = false;
+    cache_param.entry_max_bytes = 1024 * 1024;
+    cache_param.entry_max_rows = 3;
+
+    // Pre-populate the cache with one five-row block and slot orders {0, 2, 3}.
+    {
+        int64_t version = 0;
+        std::string cache_key;
+        // Check the status through ok() and report its message on failure, as
+        // the other status checks in this file do.
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        CacheResult result;
+        result.push_back(std::make_unique<Block>());
+        *result.back() = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5});
+        query_cache->insert(cache_key, version, result, {0, 2, 3}, 1);
+    }
+
+    source->_cache_param = cache_param;
+    create_local_state();
+
+    std::cout << query_cache->get_element_count() << std::endl;
+    EXPECT_EQ(source_local_state->_slot_orders.size(), 1);
+    EXPECT_EQ(source_local_state->_slot_orders[0], 0);
+
+    {
+        auto block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5});
+        auto st = sink->sink(state.get(), &block, true);
+        EXPECT_TRUE(st.ok()) << st.msg();
+    }
+
+    // First pull: the cached rows, stream not finished yet.
+    {
+        Block block;
+        bool eos = false;
+        auto st = source->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_FALSE(eos);
+        std::cout << block.dump_data() << std::endl;
+        std::cout << query_cache->get_element_count() << std::endl;
+        EXPECT_EQ(block.rows(), 5);
+        EXPECT_TRUE(ColumnHelper::block_equal(
+                block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4, 5})));
+    }
+
+    // Second pull: nothing left, end of stream.
+    {
+        Block block;
+        bool eos = false;
+        auto st = source->get_block(state.get(), &block, &eos);
+        EXPECT_TRUE(st.ok()) << st.msg();
+        EXPECT_TRUE(eos);
+        EXPECT_TRUE(block.empty());
+    }
+
+    // NOTE(review): release() gives up ownership without deleting, so the
+    // cache is intentionally leaked here. Presumably this avoids destroying the
+    // cache while the source local state still holds a handle into it -
+    // TODO confirm, and prefer an explicit teardown order if that is the reason.
+    query_cache_uptr.release();
+}
+
+} // namespace doris::pipeline
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to