This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 990ed6f167c [exec](query-cache) Support multiple tablets in
query-cache key building (#60989)
990ed6f167c is described below
commit 990ed6f167c188e0446ecfcfa0f172948f39d8c3
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 12 17:46:04 2026 +0800
[exec](query-cache) Support multiple tablets in query-cache key building
(#60989)
Support multiple tablets in query-cache key building
- What it does: Extend query cache key construction to support multiple
tablet scan ranges per operator. Keys now include the sorted list of
tablet IDs, and key building validates that all tablets share the same
version and the same tablet_to_range entry.
- Key implementation changes: updated
be/src/runtime/query_cache/query_cache.h. QueryCache::build_cache_key
now accepts multiple TScanRangeParams. It rejects empty scan_ranges, an
empty digest, or an empty tablet_to_range. It collects and sorts the
tablet IDs and requires every tablet to be present in tablet_to_range.
It enforces a single version and an identical tablet_to_range value
across those tablets, and sets version to that common value. Finally it
composes the cache key as digest + (binary tablet IDs in sorted order) +
the shared tablet_to_range value.
- Operator changes & profiling:
be/src/exec/operator/cache_source_operator.cpp removes the previous
single-scan-range restriction and calls the updated build_cache_key. For
debugging, it writes the sorted, comma-separated tablet IDs to the
CacheTabletId runtime-profile entry.
- Tests added/updated:
be/test/exec/operator/query_cache_operator_test.cpp: tests updated to
set cache_param.digest, since an empty digest is now rejected. They also
now check returned Status values with .ok().
be/test/exec/pipeline/query_cache_test.cpp: the new
build_cache_key_multiple_tablets unit test covers the success case and
the failure cases (different versions, mismatched tablet_to_range,
missing tablets, empty scan_ranges). Existing tests are adjusted
accordingly.
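- Compatibility & risk: for a single tablet, the cache key is
byte-identical to the previous format (digest + binary tablet ID +
tablet range). The previous code rejected more than one scan range, so
existing cache entries keep matching. Only multi-tablet keys (sorted
binary tablet IDs) are new.
The behavioral risk is stricter validation: build_cache_key now fails
when the digest or tablet_to_range is empty, when scan_ranges is empty,
or when the tablets disagree on version or range. Callers and planners
that assumed exactly one scan range, or that may send an empty digest,
must be reviewed.
Recommended: run the updated unit tests and the pipeline regression
tests before rolling out.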
---
be/src/exec/operator/cache_source_operator.cpp | 19 ++-
be/src/runtime/query_cache/query_cache.h | 85 ++++++++---
.../exec/operator/query_cache_operator_test.cpp | 10 +-
be/test/exec/pipeline/query_cache_test.cpp | 170 ++++++++++++++++++++-
4 files changed, 256 insertions(+), 28 deletions(-)
diff --git a/be/src/exec/operator/cache_source_operator.cpp
b/be/src/exec/operator/cache_source_operator.cpp
index d3ca3466773..91d48d77470 100644
--- a/be/src/exec/operator/cache_source_operator.cpp
+++ b/be/src/exec/operator/cache_source_operator.cpp
@@ -37,9 +37,6 @@ Status CacheSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
->data_queue.set_source_dependency(_shared_state->source_deps.front());
const auto& scan_ranges = info.scan_ranges;
bool hit_cache = false;
- if (scan_ranges.size() > 1) {
- return Status::InternalError("CacheSourceOperator only support one
scan range, plan error");
- }
const auto& cache_param =
_parent->cast<CacheSourceOperatorX>()._cache_param;
// 1. init the slot orders
@@ -59,8 +56,20 @@ Status CacheSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
// 2. build cache key by digest_tablet_id
RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param,
&_cache_key, &_version));
- custom_profile()->add_info_string(
- "CacheTabletId",
std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
+ std::vector<int64_t> cache_tablet_ids;
+ cache_tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+
cache_tablet_ids.push_back(scan_range.scan_range.palo_scan_range.tablet_id);
+ }
+ std::sort(cache_tablet_ids.begin(), cache_tablet_ids.end());
+ std::string tablet_ids_str;
+ for (size_t i = 0; i < cache_tablet_ids.size(); ++i) {
+ tablet_ids_str += std::to_string(cache_tablet_ids[i]);
+ if (i < cache_tablet_ids.size() - 1) {
+ tablet_ids_str += ",";
+ }
+ }
+ custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
// 3. lookup the cache and find proper slot order
hit_cache = _global_cache->lookup(_cache_key, _version,
&_query_cache_handle);
diff --git a/be/src/runtime/query_cache/query_cache.h
b/be/src/runtime/query_cache/query_cache.h
index 45c086c35cc..6fd8db2eaa2 100644
--- a/be/src/runtime/query_cache/query_cache.h
+++ b/be/src/runtime/query_cache/query_cache.h
@@ -109,27 +109,76 @@ public:
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
+ std::sort(tablet_ids.begin(), tablet_ids.end());
+
+ int64_t first_version = -1;
+ std::string first_tablet_range;
+ for (size_t i = 0; i < tablet_ids.size(); ++i) {
+ auto tablet_id = tablet_ids[i];
+
+ auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+ if (find_tablet == cache_param.tablet_to_range.end()) {
+ return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+ }
+
+ auto scan_range_iter =
+ std::find_if(scan_ranges.begin(), scan_ranges.end(),
+ [&tablet_id](const TScanRangeParams& range) {
+ return
range.scan_range.palo_scan_range.tablet_id == tablet_id;
+ });
+ int64_t current_version = -1;
+
std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
+
scan_range_iter->scan_range.palo_scan_range.version.data() +
+
scan_range_iter->scan_range.palo_scan_range.version.size(),
+ current_version);
+
+ if (i == 0) {
+ first_version = current_version;
+ first_tablet_range = find_tablet->second;
+ } else {
+ if (current_version != first_version) {
+ return Status::InternalError(
+ "All tablets in one instance must have the same
version, plan error");
+ }
+ if (find_tablet->second != first_tablet_range) {
+ return Status::InternalError(
+ "All tablets in one instance must have the same
tablet_to_range, plan "
+ "error");
+ }
+ }
}
- *cache_key = cache_param.digest +
- std::string(reinterpret_cast<char*>(&tablet_id),
sizeof(tablet_id)) +
- find_tablet->second;
+ *version = first_version;
+
+ *cache_key = digest;
+ for (auto tablet_id : tablet_ids) {
+ *cache_key += std::string(reinterpret_cast<char*>(&tablet_id),
sizeof(tablet_id));
+ }
+ *cache_key += first_tablet_range;
return Status::OK();
}
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp
b/be/test/exec/operator/query_cache_operator_test.cpp
index 0f3ae8acce8..7f941281fe6 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -95,7 +95,7 @@ struct QueryCacheOperatorTest : public ::testing::Test {
.shared_state_map = {},
.task_idx = 0};
- EXPECT_TRUE(source_local_state_uptr->init(state.get(), info));
+ EXPECT_TRUE(source_local_state_uptr->init(state.get(), info).ok());
state->resize_op_id_to_local_state(-100);
state->emplace_local_state(source->operator_id(),
std::move(source_local_state_uptr));
}
@@ -138,6 +138,7 @@ TEST_F(QueryCacheOperatorTest, test_no_hit_cache1) {
new MockRowDescriptor {{std::make_shared<DataTypeInt64>()},
&pool});
TQueryCacheParam cache_param;
cache_param.node_id = 0;
+ cache_param.digest = "test_digest";
cache_param.output_slot_mapping[0] = 0;
cache_param.tablet_to_range.insert({42, "test"});
cache_param.force_refresh_query_cache = false;
@@ -178,6 +179,7 @@ TEST_F(QueryCacheOperatorTest, test_no_hit_cache2) {
new MockRowDescriptor {{std::make_shared<DataTypeInt64>()},
&pool});
TQueryCacheParam cache_param;
cache_param.node_id = 0;
+ cache_param.digest = "test_digest";
cache_param.output_slot_mapping[0] = 0;
cache_param.tablet_to_range.insert({42, "test"});
cache_param.force_refresh_query_cache = false;
@@ -218,6 +220,7 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
new MockRowDescriptor {{std::make_shared<DataTypeInt64>()},
&pool});
TQueryCacheParam cache_param;
cache_param.node_id = 0;
+ cache_param.digest = "test_digest";
cache_param.output_slot_mapping[0] = 0;
cache_param.tablet_to_range.insert({42, "test"});
cache_param.force_refresh_query_cache = false;
@@ -227,7 +230,8 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
{
int64_t version = 0;
std::string cache_key;
- EXPECT_TRUE(QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version));
+ EXPECT_TRUE(
+ QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version).ok());
CacheResult result;
result.push_back(std::make_unique<Block>());
*result.back() = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3,
4, 5});
@@ -270,4 +274,4 @@ TEST_F(QueryCacheOperatorTest, test_hit_cache) {
}
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/exec/pipeline/query_cache_test.cpp
b/be/test/exec/pipeline/query_cache_test.cpp
index a478aa52689..ec0f8c6c697 100644
--- a/be/test/exec/pipeline/query_cache_test.cpp
+++ b/be/test/exec/pipeline/query_cache_test.cpp
@@ -39,9 +39,24 @@ TEST_F(QueryCacheTest, create_global_cache) {
TEST_F(QueryCacheTest, build_cache_key) {
{
std::vector<TScanRangeParams> scan_ranges;
- scan_ranges.push_back({});
- scan_ranges.push_back({});
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(1);
+ palp_scan_range1.__set_version("100");
+ scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+ scan_ranges.emplace_back(scan_range1);
+
+ TScanRangeParams scan_range2;
+ TPaloScanRange palp_scan_range2;
+ palp_scan_range2.__set_tablet_id(2);
+ palp_scan_range2.__set_version("100");
+ scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+ scan_ranges.emplace_back(scan_range2);
+
TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ cache_param.tablet_to_range.insert({1, "range_abc"});
+ cache_param.tablet_to_range.insert({2, "range_xyz"});
std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
@@ -58,6 +73,7 @@ TEST_F(QueryCacheTest, build_cache_key) {
scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
scan_ranges.push_back(scan_range);
TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
@@ -86,6 +102,156 @@ TEST_F(QueryCacheTest, build_cache_key) {
}
}
+TEST_F(QueryCacheTest, build_cache_key_multiple_tablets) {
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(3);
+ palp_scan_range1.__set_version("100");
+ scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+ scan_ranges.push_back(scan_range1);
+
+ TScanRangeParams scan_range2;
+ TPaloScanRange palp_scan_range2;
+ palp_scan_range2.__set_tablet_id(1);
+ palp_scan_range2.__set_version("100");
+ scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+ scan_ranges.push_back(scan_range2);
+
+ TScanRangeParams scan_range3;
+ TPaloScanRange palp_scan_range3;
+ palp_scan_range3.__set_tablet_id(2);
+ palp_scan_range3.__set_version("100");
+ scan_range3.scan_range.__set_palo_scan_range(palp_scan_range3);
+ scan_ranges.push_back(scan_range3);
+
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ cache_param.tablet_to_range.insert({1, "range_abc"});
+ cache_param.tablet_to_range.insert({2, "range_abc"});
+ cache_param.tablet_to_range.insert({3, "range_abc"});
+
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(version, 100);
+
+ int64_t expected_tablet1 = 1;
+ int64_t expected_tablet2 = 2;
+ int64_t expected_tablet3 = 3;
+ std::string expected_key =
+ "test_digest" +
+ std::string(reinterpret_cast<char*>(&expected_tablet1),
sizeof(expected_tablet1)) +
+ std::string(reinterpret_cast<char*>(&expected_tablet2),
sizeof(expected_tablet2)) +
+ std::string(reinterpret_cast<char*>(&expected_tablet3),
sizeof(expected_tablet3)) +
+ "range_abc";
+
+ EXPECT_EQ(cache_key, expected_key);
+ }
+
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(1);
+ palp_scan_range1.__set_version("100");
+ scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+ scan_ranges.push_back(scan_range1);
+
+ TScanRangeParams scan_range2;
+ TPaloScanRange palp_scan_range2;
+ palp_scan_range2.__set_tablet_id(2);
+ palp_scan_range2.__set_version("200");
+ scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+ scan_ranges.push_back(scan_range2);
+
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ cache_param.tablet_to_range.insert({1, "range_abc"});
+ cache_param.tablet_to_range.insert({2, "range_abc"});
+
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+
+ EXPECT_FALSE(st.ok());
+ EXPECT_TRUE(st.msg().find("same version") != std::string::npos);
+ }
+
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(1);
+ palp_scan_range1.__set_version("100");
+ scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+ scan_ranges.push_back(scan_range1);
+
+ TScanRangeParams scan_range2;
+ TPaloScanRange palp_scan_range2;
+ palp_scan_range2.__set_tablet_id(2);
+ palp_scan_range2.__set_version("100");
+ scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+ scan_ranges.push_back(scan_range2);
+
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ cache_param.tablet_to_range.insert({1, "range_abc"});
+ cache_param.tablet_to_range.insert({2, "range_xyz"});
+
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+
+ EXPECT_FALSE(st.ok());
+ EXPECT_TRUE(st.msg().find("same tablet_to_range") !=
std::string::npos);
+ }
+
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(1);
+ palp_scan_range1.__set_version("100");
+ scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
+ scan_ranges.push_back(scan_range1);
+
+ TScanRangeParams scan_range2;
+ TPaloScanRange palp_scan_range2;
+ palp_scan_range2.__set_tablet_id(2);
+ palp_scan_range2.__set_version("100");
+ scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
+ scan_ranges.push_back(scan_range2);
+
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ cache_param.tablet_to_range.insert({1, "range_abc"});
+ cache_param.tablet_to_range.insert({3, "range_abc"});
+
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+
+ EXPECT_FALSE(st.ok());
+ EXPECT_TRUE(st.msg().find("Not find tablet") != std::string::npos);
+ }
+
+ {
+ std::vector<TScanRangeParams> scan_ranges;
+ TQueryCacheParam cache_param;
+ cache_param.__set_digest("test_digest");
+ std::string cache_key;
+ int64_t version = 0;
+ auto st = QueryCache::build_cache_key(scan_ranges, cache_param,
&cache_key, &version);
+
+ EXPECT_FALSE(st.ok());
+ EXPECT_TRUE(st.msg().find("empty") != std::string::npos);
+ }
+}
+
TEST_F(QueryCacheTest, insert_and_lookup) {
std::unique_ptr<QueryCache> query_cache
{QueryCache::create_global_cache(1024 * 1024 * 1024)};
std::string cache_key = "be ut";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]