This is an automated email from the ASF dual-hosted git repository.

zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 990ed6f167c [exec](query-cache) Support multiple tablets in 
query-cache key building (#60989)
990ed6f167c is described below

commit 990ed6f167c188e0446ecfcfa0f172948f39d8c3
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 12 17:46:04 2026 +0800

    [exec](query-cache) Support multiple tablets in query-cache key building 
(#60989)
    
    Support multiple tablets in query-cache key building
    - What it does: Extend query cache key construction to support multiple
    tablet scan ranges per operator — keys now include a sorted list of
    tablet IDs, and key building validates that all tablets share the same
    version and the same tablet_to_range value.
    - Key implementation changes: updated
    be/src/runtime/query_cache/query_cache.h — QueryCache::build_cache_key
    now accepts multiple TScanRangeParams, validates digest and
    tablet_to_range, collects & sorts tablet IDs, enforces a single version
    and identical tablet_to_range across those tablets, sets version to the
    common value, and composes the cache key as digest + (binary tablet ids
    in sorted order) + tablet_range.
    - Operator changes & profiling:
    be/src/exec/operator/cache_source_operator.cpp — removes the previous
    single-scan-range restriction, calls into the new build_cache_key, and
    writes a comma-separated sorted CacheTabletId string into the runtime
    profile for debugging.
    - Tests added/updated:
    be/test/exec/operator/query_cache_operator_test.cpp — tests updated to
    set digest and check Status return values;
    be/test/exec/pipeline/query_cache_test.cpp — new
    build_cache_key_multiple_tablets unit test covers success and failure
    cases (different versions, mismatched tablet_to_range, missing tablets,
    empty scan_ranges), and existing tests adjusted accordingly.
    - Compatibility & risk: cache key format changes (now includes multiple
    tablet ids in sorted binary form). An instance with exactly one scan
    range still produces digest + tablet id + tablet_range, but an instance
    that now covers several tablets produces a new combined key, so
    existing cache entries keyed with the old single-tablet format will not
    match it and may be "missed" until re-populated; callers/planners that
    assumed exactly one scan range must be reviewed. Recommended: run the
    updated unit tests and pipeline regression tests; if rolling out,
    consider invalidating old cache entries or coordinating versions.
---
 be/src/exec/operator/cache_source_operator.cpp     |  19 ++-
 be/src/runtime/query_cache/query_cache.h           |  85 ++++++++---
 .../exec/operator/query_cache_operator_test.cpp    |  10 +-
 be/test/exec/pipeline/query_cache_test.cpp         | 170 ++++++++++++++++++++-
 4 files changed, 256 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/operator/cache_source_operator.cpp 
b/be/src/exec/operator/cache_source_operator.cpp
index d3ca3466773..91d48d77470 100644
--- a/be/src/exec/operator/cache_source_operator.cpp
+++ b/be/src/exec/operator/cache_source_operator.cpp
@@ -37,9 +37,6 @@ Status CacheSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
             
->data_queue.set_source_dependency(_shared_state->source_deps.front());
     const auto& scan_ranges = info.scan_ranges;
     bool hit_cache = false;
-    if (scan_ranges.size() > 1) {
-        return Status::InternalError("CacheSourceOperator only support one 
scan range, plan error");
-    }
 
     const auto& cache_param = 
_parent->cast<CacheSourceOperatorX>()._cache_param;
     // 1. init the slot orders
@@ -59,8 +56,20 @@ Status CacheSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
 
     // 2. build cache key by digest_tablet_id
     RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param, 
&_cache_key, &_version));
-    custom_profile()->add_info_string(
-            "CacheTabletId", 
std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
+    std::vector<int64_t> cache_tablet_ids;
+    cache_tablet_ids.reserve(scan_ranges.size());
+    for (const auto& scan_range : scan_ranges) {
+        
cache_tablet_ids.push_back(scan_range.scan_range.palo_scan_range.tablet_id);
+    }
+    std::sort(cache_tablet_ids.begin(), cache_tablet_ids.end());
+    std::string tablet_ids_str;
+    for (size_t i = 0; i < cache_tablet_ids.size(); ++i) {
+        tablet_ids_str += std::to_string(cache_tablet_ids[i]);
+        if (i < cache_tablet_ids.size() - 1) {
+            tablet_ids_str += ",";
+        }
+    }
+    custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
 
     // 3. lookup the cache and find proper slot order
     hit_cache = _global_cache->lookup(_cache_key, _version, 
&_query_cache_handle);
diff --git a/be/src/runtime/query_cache/query_cache.h 
b/be/src/runtime/query_cache/query_cache.h
index 45c086c35cc..6fd8db2eaa2 100644
--- a/be/src/runtime/query_cache/query_cache.h
+++ b/be/src/runtime/query_cache/query_cache.h
@@ -109,27 +109,76 @@ public:
     static Status build_cache_key(const std::vector<TScanRangeParams>& 
scan_ranges,
                                   const TQueryCacheParam& cache_param, 
std::string* cache_key,
                                   int64_t* version) {
-        if (scan_ranges.size() > 1) {
-            return Status::InternalError(
-                    "CacheSourceOperator only support one scan range, plan 
error");
+        if (scan_ranges.empty()) {
+            return Status::InternalError("scan_ranges is empty, plan error");
         }
-        auto& scan_range = scan_ranges[0];
-        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
-        auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
-        std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
-                        scan_range.scan_range.palo_scan_range.version.data() +
-                                
scan_range.scan_range.palo_scan_range.version.size(),
-                        *version);
-
-        auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
-        if (find_tablet == cache_param.tablet_to_range.end()) {
-            return Status::InternalError("Not find tablet in 
partition_to_tablets, plan error");
+
+        std::string digest;
+        try {
+            digest = cache_param.digest;
+        } catch (const std::exception&) {
+            return Status::InternalError("digest is invalid, plan error");
+        }
+        if (digest.empty()) {
+            return Status::InternalError("digest is empty, plan error");
+        }
+
+        if (cache_param.tablet_to_range.empty()) {
+            return Status::InternalError("tablet_to_range is empty, plan 
error");
+        }
+
+        std::vector<int64_t> tablet_ids;
+        tablet_ids.reserve(scan_ranges.size());
+        for (const auto& scan_range : scan_ranges) {
+            auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+            tablet_ids.push_back(tablet_id);
+        }
+        std::sort(tablet_ids.begin(), tablet_ids.end());
+
+        int64_t first_version = -1;
+        std::string first_tablet_range;
+        for (size_t i = 0; i < tablet_ids.size(); ++i) {
+            auto tablet_id = tablet_ids[i];
+
+            auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+            if (find_tablet == cache_param.tablet_to_range.end()) {
+                return Status::InternalError("Not find tablet in 
partition_to_tablets, plan error");
+            }
+
+            auto scan_range_iter =
+                    std::find_if(scan_ranges.begin(), scan_ranges.end(),
+                                 [&tablet_id](const TScanRangeParams& range) {
+                                     return 
range.scan_range.palo_scan_range.tablet_id == tablet_id;
+                                 });
+            int64_t current_version = -1;
+            
std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
+                            
scan_range_iter->scan_range.palo_scan_range.version.data() +
+                                    
scan_range_iter->scan_range.palo_scan_range.version.size(),
+                            current_version);
+
+            if (i == 0) {
+                first_version = current_version;
+                first_tablet_range = find_tablet->second;
+            } else {
+                if (current_version != first_version) {
+                    return Status::InternalError(
+                            "All tablets in one instance must have the same 
version, plan error");
+                }
+                if (find_tablet->second != first_tablet_range) {
+                    return Status::InternalError(
+                            "All tablets in one instance must have the same 
tablet_to_range, plan "
+                            "error");
+                }
+            }
         }
 
-        *cache_key = cache_param.digest +
-                     std::string(reinterpret_cast<char*>(&tablet_id), 
sizeof(tablet_id)) +
-                     find_tablet->second;
+        *version = first_version;
+
+        *cache_key = digest;
+        for (auto tablet_id : tablet_ids) {
+            *cache_key += std::string(reinterpret_cast<char*>(&tablet_id), 
sizeof(tablet_id));
+        }
+        *cache_key += first_tablet_range;
 
         return Status::OK();
     }
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp 
b/be/test/exec/operator/query_cache_operator_test.cpp
index 0f3ae8acce8..7f941281fe6 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -95,7 +95,7 @@ struct QueryCacheOperatorTest : public ::testing::Test {
                                  .shared_state_map = {},
                                  .task_idx = 0};
 
-            EXPECT_TRUE(source_local_state_uptr->init(state.get(), info));
+            EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok());
             state->resize_op_id_to_local_state(-100);
             state->emplace_local_state(source->operator_id(), 
std::move(source_local_state_uptr));
         }
@@ -138,6 +138,7 @@ TEST_F(QueryCacheOperatorTest, test_no_hit_cache1) {
             new MockRowDescriptor {{std::make_shared<DataTypeInt64>()}, 
&pool});
     TQueryCacheParam cache_param;
     cache_param.node_id = 0;
+    cache_param.digest = "test_digest";
     cache_param.output_slot_mapping[0] = 0;
     cache_param.tablet_to_range.insert({42, "test"});
     cache_param.force_refresh_query_cache = false;
@@ -178,6 +179,7 @@ TEST_F(QueryCacheOperatorTest, test_no_hit_cache2) {
             new MockRowDescriptor {{std::make_shared<DataTypeInt64>()}, 
&pool});
     TQueryCacheParam cache_param;
     cache_param.node_id = 0;
+    cache_param.digest = "test_digest";
     cache_param.output_slot_mapping[0] = 0;
     cache_param.tablet_to_range.insert({42, "test"});
     cache_param.force_refresh_query_cache = false;
@@ -218,6 +220,7 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
             new MockRowDescriptor {{std::make_shared<DataTypeInt64>()}, 
&pool});
     TQueryCacheParam cache_param;
     cache_param.node_id = 0;
+    cache_param.digest = "test_digest";
     cache_param.output_slot_mapping[0] = 0;
     cache_param.tablet_to_range.insert({42, "test"});
     cache_param.force_refresh_query_cache = false;
@@ -227,7 +230,8 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
     {
         int64_t version = 0;
         std::string cache_key;
-        EXPECT_TRUE(QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version));
+        EXPECT_TRUE(
+                QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version).ok());
         CacheResult result;
         result.push_back(std::make_unique<Block>());
         *result.back() = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 
4, 5});
@@ -270,4 +274,4 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
     }
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/pipeline/query_cache_test.cpp 
b/be/test/exec/pipeline/query_cache_test.cpp
index a478aa52689..ec0f8c6c697 100644
--- a/be/test/exec/pipeline/query_cache_test.cpp
+++ b/be/test/exec/pipeline/query_cache_test.cpp
@@ -39,9 +39,24 @@ TEST_F(QueryCacheTest, create_global_cache) {
 TEST_F(QueryCacheTest, build_cache_key) {
     {
         std::vector<TScanRangeParams> scan_ranges;
-        scan_ranges.push_back({});
-        scan_ranges.push_back({});
+        TScanRangeParams scan_range1;
+        TPaloScanRange palp_scan_range1;
+        palp_scan_range1.__set_tablet_id(1);
+        palp_scan_range1.__set_version("100");
+        scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+        scan_ranges.emplace_back(scan_range1);
+
+        TScanRangeParams scan_range2;
+        TPaloScanRange palp_scan_range2;
+        palp_scan_range2.__set_tablet_id(2);
+        palp_scan_range2.__set_version("100");
+        scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+        scan_ranges.emplace_back(scan_range2);
+
         TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        cache_param.tablet_to_range.insert({1, "range_abc"});
+        cache_param.tablet_to_range.insert({2, "range_xyz"});
         std::string cache_key;
         int64_t version = 0;
         auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
@@ -58,6 +73,7 @@ TEST_F(QueryCacheTest, build_cache_key) {
         scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
         scan_ranges.push_back(scan_range);
         TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
         std::string cache_key;
         int64_t version = 0;
         auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
@@ -86,6 +102,156 @@ TEST_F(QueryCacheTest, build_cache_key) {
     }
 }
 
+TEST_F(QueryCacheTest, build_cache_key_multiple_tablets) {
+    {
+        std::vector<TScanRangeParams> scan_ranges;
+        TScanRangeParams scan_range1;
+        TPaloScanRange palp_scan_range1;
+        palp_scan_range1.__set_tablet_id(3);
+        palp_scan_range1.__set_version("100");
+        scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+        scan_ranges.push_back(scan_range1);
+
+        TScanRangeParams scan_range2;
+        TPaloScanRange palp_scan_range2;
+        palp_scan_range2.__set_tablet_id(1);
+        palp_scan_range2.__set_version("100");
+        scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+        scan_ranges.push_back(scan_range2);
+
+        TScanRangeParams scan_range3;
+        TPaloScanRange palp_scan_range3;
+        palp_scan_range3.__set_tablet_id(2);
+        palp_scan_range3.__set_version("100");
+        scan_range3.scan_range.__set_palo_scan_range(palp_scan_range3);
+        scan_ranges.push_back(scan_range3);
+
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        cache_param.tablet_to_range.insert({1, "range_abc"});
+        cache_param.tablet_to_range.insert({2, "range_abc"});
+        cache_param.tablet_to_range.insert({3, "range_abc"});
+
+        std::string cache_key;
+        int64_t version = 0;
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+
+        EXPECT_TRUE(st.ok());
+        EXPECT_EQ(version, 100);
+
+        int64_t expected_tablet1 = 1;
+        int64_t expected_tablet2 = 2;
+        int64_t expected_tablet3 = 3;
+        std::string expected_key =
+                "test_digest" +
+                std::string(reinterpret_cast<char*>(&expected_tablet1), 
sizeof(expected_tablet1)) +
+                std::string(reinterpret_cast<char*>(&expected_tablet2), 
sizeof(expected_tablet2)) +
+                std::string(reinterpret_cast<char*>(&expected_tablet3), 
sizeof(expected_tablet3)) +
+                "range_abc";
+
+        EXPECT_EQ(cache_key, expected_key);
+    }
+
+    {
+        std::vector<TScanRangeParams> scan_ranges;
+        TScanRangeParams scan_range1;
+        TPaloScanRange palp_scan_range1;
+        palp_scan_range1.__set_tablet_id(1);
+        palp_scan_range1.__set_version("100");
+        scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+        scan_ranges.push_back(scan_range1);
+
+        TScanRangeParams scan_range2;
+        TPaloScanRange palp_scan_range2;
+        palp_scan_range2.__set_tablet_id(2);
+        palp_scan_range2.__set_version("200");
+        scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+        scan_ranges.push_back(scan_range2);
+
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        cache_param.tablet_to_range.insert({1, "range_abc"});
+        cache_param.tablet_to_range.insert({2, "range_abc"});
+
+        std::string cache_key;
+        int64_t version = 0;
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+
+        EXPECT_FALSE(st.ok());
+        EXPECT_TRUE(st.msg().find("same version") != std::string::npos);
+    }
+
+    {
+        std::vector<TScanRangeParams> scan_ranges;
+        TScanRangeParams scan_range1;
+        TPaloScanRange palp_scan_range1;
+        palp_scan_range1.__set_tablet_id(1);
+        palp_scan_range1.__set_version("100");
+        scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+        scan_ranges.push_back(scan_range1);
+
+        TScanRangeParams scan_range2;
+        TPaloScanRange palp_scan_range2;
+        palp_scan_range2.__set_tablet_id(2);
+        palp_scan_range2.__set_version("100");
+        scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+        scan_ranges.push_back(scan_range2);
+
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        cache_param.tablet_to_range.insert({1, "range_abc"});
+        cache_param.tablet_to_range.insert({2, "range_xyz"});
+
+        std::string cache_key;
+        int64_t version = 0;
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+
+        EXPECT_FALSE(st.ok());
+        EXPECT_TRUE(st.msg().find("same tablet_to_range") != 
std::string::npos);
+    }
+
+    {
+        std::vector<TScanRangeParams> scan_ranges;
+        TScanRangeParams scan_range1;
+        TPaloScanRange palp_scan_range1;
+        palp_scan_range1.__set_tablet_id(1);
+        palp_scan_range1.__set_version("100");
+        scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+        scan_ranges.push_back(scan_range1);
+
+        TScanRangeParams scan_range2;
+        TPaloScanRange palp_scan_range2;
+        palp_scan_range2.__set_tablet_id(2);
+        palp_scan_range2.__set_version("100");
+        scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+        scan_ranges.push_back(scan_range2);
+
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        cache_param.tablet_to_range.insert({1, "range_abc"});
+        cache_param.tablet_to_range.insert({3, "range_abc"});
+
+        std::string cache_key;
+        int64_t version = 0;
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+
+        EXPECT_FALSE(st.ok());
+        EXPECT_TRUE(st.msg().find("Not find tablet") != std::string::npos);
+    }
+
+    {
+        std::vector<TScanRangeParams> scan_ranges;
+        TQueryCacheParam cache_param;
+        cache_param.__set_digest("test_digest");
+        std::string cache_key;
+        int64_t version = 0;
+        auto st = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+
+        EXPECT_FALSE(st.ok());
+        EXPECT_TRUE(st.msg().find("empty") != std::string::npos);
+    }
+}
+
 TEST_F(QueryCacheTest, insert_and_lookup) {
     std::unique_ptr<QueryCache> query_cache 
{QueryCache::create_global_cache(1024 * 1024 * 1024)};
     std::string cache_key = "be ut";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to