This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 3b2d91a5ae8 [Exec](Cache) Support condition cache in Apache doris
(#61385)
3b2d91a5ae8 is described below
commit 3b2d91a5ae803cf800d9c39e1fee71d1734ceefc
Author: HappenLee <[email protected]>
AuthorDate: Wed Mar 18 14:35:47 2026 +0800
[Exec](Cache) Support condition cache in Apache doris (#61385)
### What problem does this PR solve?
Cherry-pick the condition cache implementation into branch-4.1.
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/exprs/hybrid_set.h | 38 +++
be/src/olap/iterators.h | 24 +-
be/src/olap/olap_common.h | 4 +
be/src/olap/parallel_scanner_builder.cpp | 80 ++++-
be/src/olap/parallel_scanner_builder.h | 4 +-
be/src/olap/rowset/beta_rowset_reader.cpp | 16 +
be/src/olap/rowset/rowset_reader_context.h | 2 +
be/src/olap/rowset/segment_v2/condition_cache.cpp | 52 +++
be/src/olap/rowset/segment_v2/condition_cache.h | 137 ++++++++
be/src/olap/rowset/segment_v2/row_ranges.h | 14 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 116 ++++++-
be/src/olap/rowset/segment_v2/segment_iterator.h | 9 +-
be/src/olap/tablet_reader.cpp | 1 +
be/src/olap/tablet_reader.h | 2 +
be/src/pipeline/exec/olap_scan_operator.cpp | 6 +-
be/src/pipeline/exec/olap_scan_operator.h | 4 +
be/src/pipeline/exec/scan_operator.cpp | 20 +-
be/src/pipeline/exec/scan_operator.h | 16 +-
be/src/runtime/exec_env.h | 3 +
be/src/runtime/exec_env_init.cpp | 10 +-
be/src/runtime/memory/cache_policy.h | 5 +
be/src/util/doris_metrics.cpp | 9 +
be/src/util/doris_metrics.h | 4 +
be/src/vec/columns/column_const.h | 7 +-
be/src/vec/exec/scan/olap_scanner.cpp | 7 +-
be/src/vec/exprs/vbitmap_predicate.h | 3 +
be/src/vec/exprs/vbloom_predicate.cpp | 11 +
be/src/vec/exprs/vbloom_predicate.h | 2 +
be/src/vec/exprs/vcast_expr.h | 9 +
be/src/vec/exprs/vdirect_in_predicate.h | 8 +
be/src/vec/exprs/vexpr.cpp | 19 ++
be/src/vec/exprs/vexpr.h | 2 +
be/src/vec/exprs/vexpr_context.cpp | 5 +-
be/src/vec/exprs/vexpr_context.h | 2 +
be/src/vec/exprs/vin_predicate.h | 2 +
be/src/vec/exprs/vlambda_function_call_expr.h | 2 +
be/src/vec/exprs/vlambda_function_expr.h | 2 +
be/src/vec/exprs/vliteral.cpp | 5 +
be/src/vec/exprs/vliteral.h | 2 +
be/src/vec/exprs/vruntimefilter_wrapper.h | 9 +
be/src/vec/exprs/vslot_ref.cpp | 9 +
be/src/vec/exprs/vslot_ref.h | 3 +
.../java/org/apache/doris/qe/SessionVariable.java | 22 ++
gensrc/thrift/PaloInternalService.thrift | 4 +-
.../data/query_p0/cache/condition_cache.out | 73 +++++
.../test_ngram_bloomfilter_index_change.groovy | 3 +-
.../suites/query_p0/cache/condition_cache.groovy | 354 +++++++++++++++++++++
49 files changed, 1091 insertions(+), 56 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fc299779c8d..372a33dc886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1207,6 +1207,9 @@ DEFINE_Int32(inverted_index_query_cache_shards, "256");
// inverted index match bitmap cache size
DEFINE_String(inverted_index_query_cache_limit, "10%");
+// condition cache limit
+DEFINE_Int16(condition_cache_limit, "512");
+
// inverted index
DEFINE_mDouble(inverted_index_ram_buffer_size, "512");
// -1 indicates not working.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 73922709283..f8416557a2e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1258,6 +1258,9 @@ DECLARE_Int32(inverted_index_query_cache_shards);
// inverted index match bitmap cache size
DECLARE_String(inverted_index_query_cache_limit);
+// condition cache limit
+DECLARE_Int16(condition_cache_limit);
+
// inverted index
DECLARE_mDouble(inverted_index_ram_buffer_size);
DECLARE_mInt32(inverted_index_max_buffered_docs);
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 27b32a4325e..6bfee554ce0 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -18,6 +18,7 @@
#pragma once
#include <gen_cpp/internal_service.pb.h>
+#include <pdqsort.h>
#include "common/object_pool.h"
#include "exprs/filter_base.h"
@@ -259,6 +260,7 @@ public:
const uint8_t* __restrict filter
= nullptr) = 0;
virtual void to_pb(PInFilter* filter) = 0;
+ virtual uint64_t get_digest(uint64_t seed) = 0;
class IteratorBase {
public:
@@ -431,6 +433,21 @@ public:
void to_pb(PInFilter* filter) override { set_pb(filter,
get_convertor<ElementType>()); }
+ // Digest of the set contents chained onto `seed` (presumably consumed when
+ // building condition-cache digests for IN-style predicates). Elements are
+ // copied out and sorted so the result does not depend on hash-table
+ // iteration order; `_contain_null` is folded in last.
+ uint64_t get_digest(uint64_t seed) override {
+ std::vector<ElementType> elems(_set.begin(), _set.end());
+ pdqsort(elems.begin(), elems.end());
+ // std::vector<bool> has no data(), so bools are hashed one element at a
+ // time; every other element type is hashed as one contiguous buffer.
+ if constexpr (std::is_same<ElementType, bool>::value) {
+ for (const auto& v : elems) {
+ seed = HashUtil::crc_hash64(&v, sizeof(v), seed);
+ }
+ } else {
+ // NOTE(review): hashes raw object bytes; assumes ElementType carries no
+ // padding/uninitialized bytes — confirm for non-primitive element types.
+ // The (uint32_t) cast would truncate buffers of 4 GiB or more.
+ seed = HashUtil::crc_hash64(elems.data(),
+ (uint32_t)(elems.size() *
sizeof(ElementType)), seed);
+ }
+
+ return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null),
seed);
+ }
+
private:
ContainerType _set;
ObjectPool _pool;
@@ -626,6 +643,16 @@ public:
void to_pb(PInFilter* filter) override { set_pb(filter,
get_convertor<std::string>()); }
+    // Order-independent digest of the string set chained onto `seed`: elements are
+    // sorted first so hash-table iteration order does not matter, and
+    // `_contain_null` is folded in last.
+    uint64_t get_digest(uint64_t seed) override {
+        std::vector<StringRef> elems(_set.begin(), _set.end());
+        pdqsort(elems.begin(), elems.end());
+
+        for (const auto& v : elems) {
+            // Hash the length before the bytes so element boundaries are part of the
+            // digest; otherwise {"ab", "c"} and {"a", "bc"} feed the same byte stream.
+            const uint64_t len = v.size;
+            seed = HashUtil::crc_hash64(&len, sizeof(len), seed);
+            seed = HashUtil::crc_hash64(v.data, (uint32_t)v.size, seed);
+        }
+        return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null), seed);
+    }
+
private:
ContainerType _set;
ObjectPool _pool;
@@ -824,6 +851,17 @@ public:
throw Exception(ErrorCode::INTERNAL_ERROR, "StringValueSet do not
support to_pb");
}
+    // Order-independent digest of the string set chained onto `seed`: elements are
+    // sorted first so hash-table iteration order does not matter, and
+    // `_contain_null` is folded in last.
+    uint64_t get_digest(uint64_t seed) override {
+        std::vector<StringRef> elems(_set.begin(), _set.end());
+        pdqsort(elems.begin(), elems.end());
+
+        for (const auto& v : elems) {
+            // Hash the length before the bytes so element boundaries are part of the
+            // digest; otherwise {"ab", "c"} and {"a", "bc"} feed the same byte stream.
+            const uint64_t len = v.size;
+            seed = HashUtil::crc_hash64(&len, sizeof(len), seed);
+            seed = HashUtil::crc_hash64(v.data, (uint32_t)v.size, seed);
+        }
+
+        return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null), seed);
+    }
+
private:
ContainerType _set;
ObjectPool _pool;
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index ea264e9f746..efbb0a2e00b 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -25,6 +25,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
+#include "olap/row_cursor.h"
#include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/tablet_schema.h"
@@ -35,7 +36,6 @@
namespace doris {
-class RowCursor;
class Schema;
class ColumnPredicate;
@@ -70,6 +70,22 @@ public:
const RowCursor* upper_key = nullptr;
// whether `upper_key` is included in the range
bool include_upper;
+
+        // Digest of this key range chained onto `seed`. Each bound contributes a
+        // presence tag followed (when present) by its key string and inclusiveness
+        // flag. The tag keeps a range with only a lower key X from colliding with a
+        // range with only an upper key X (e.g. `k >= x` vs `k <= x`).
+        uint64_t get_digest(uint64_t seed) const {
+            const uint8_t has_lower = lower_key != nullptr ? 1 : 0;
+            seed = HashUtil::hash64(&has_lower, sizeof(has_lower), seed);
+            if (lower_key != nullptr) {
+                auto key_str = lower_key->to_string();
+                seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
+                seed = HashUtil::hash64(&include_lower, sizeof(include_lower), seed);
+            }
+
+            const uint8_t has_upper = upper_key != nullptr ? 1 : 0;
+            seed = HashUtil::hash64(&has_upper, sizeof(has_upper), seed);
+            if (upper_key != nullptr) {
+                auto key_str = upper_key->to_string();
+                seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
+                seed = HashUtil::hash64(&include_upper, sizeof(include_upper), seed);
+            }
+
+            return seed;
+        }
};
// reader's key ranges, empty if not existed.
@@ -132,6 +148,12 @@ public:
std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
+
+ // Cache for sparse column data to avoid redundant reads
+ // col_unique_id -> cached column_ptr
+ std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
+
+ uint64_t condition_cache_digest = 0;
};
struct CompactionSampleInfo {
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 9768bb2f7e0..9b236464743 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -402,6 +402,10 @@ struct OlapReaderStatistics {
int64_t output_index_result_column_timer = 0;
// number of segment filtered by column stat when creating seg iterator
int64_t filtered_segment_number = 0;
+ // number of segment with condition cache hit
+ int64_t condition_cache_hit_seg_nums = 0;
+ // number of rows filtered by condition cache hit
+ int64_t condition_cache_filtered_rows = 0;
// total number of segment
int64_t total_segment_number = 0;
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 6c76cd3cc77..a978cb56336 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -35,7 +35,7 @@ using namespace vectorized;
Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>&
scanners) {
RETURN_IF_ERROR(_load());
- if (_optimize_index_scan_parallelism) {
+ if (_scan_parallelism_by_segment) {
return _build_scanners_by_segment(scanners);
} else if (_is_dup_mow_key) {
// Default strategy for DUP/MOW tables: split by rowids within segments
@@ -88,7 +88,7 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
auto rows_need = _rows_per_scanner - rows_collected;
// 0.9: try to avoid splitting the segments into
excessively small parts.
- if (rows_need >= remaining_rows * 0.9) {
+ if (rows_need >= remaining_rows * 9 / 10) {
rows_need = remaining_rows;
}
DCHECK_LE(rows_need, remaining_rows);
@@ -171,6 +171,8 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
// for the involved tablets. It preserves delete predicates and key ranges,
and clones
// RowsetReader per scanner to avoid sharing between scanners.
Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>&
scanners) {
+ DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
+
for (auto&& [tablet, version] : _tablets) {
DCHECK(_all_read_sources.contains(tablet->tablet_id()));
auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
@@ -180,8 +182,10 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
}
- // For each RowSet split in the read source, split by segment id and
build
- // one scanner per segment. Keep delete predicates shared.
+ // Collect segments into scanners based on rows count instead of one
scanner per segment
+ TabletReadSource partitial_read_source;
+ int64_t rows_collected = 0;
+
for (auto& rs_split : entire_read_source.rs_splits) {
auto reader = rs_split.rs_reader;
auto rowset = reader->rowset();
@@ -192,23 +196,65 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
continue;
}
- // Build scanners for [i, i+1) segment range, without row-range
slicing.
- for (int64_t i = 0; i < rowset->num_segments(); ++i) {
- RowSetSplits split(reader->clone());
- split.segment_offsets.first = i;
- split.segment_offsets.second = i + 1;
- // No row-ranges slicing; scan whole segment i.
- DCHECK_GE(split.segment_offsets.second,
split.segment_offsets.first + 1);
+            int64_t segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            for (size_t i = 0; i < segments_rows.size(); ++i) {
+                const size_t rows_of_segment = segments_rows[i];
+
+                // Close the current scanner before adding segment `i` only when it is
+                // already reasonably full (>= 90% of rows_per_scanner) and this segment
+                // would push it past the target. Scanners below 90% keep absorbing
+                // segments, which avoids creating excessively small scanners, and a
+                // scanner that reached 90% is always closed on the next overflow.
+                if (rows_collected > 0 && rows_collected + rows_of_segment > _rows_per_scanner &&
+                    rows_collected >= _rows_per_scanner * 9 / 10) {
+                    // Segments [segment_start, i) of this rowset belong to the closing
+                    // scanner. When i == segment_start, the collected rows came entirely
+                    // from earlier rowsets (their splits are already queued), so no split
+                    // is added here: an empty [i, i) range would be invalid.
+                    if (static_cast<int64_t>(i) > segment_start) {
+                        split.segment_offsets.first = segment_start;
+                        split.segment_offsets.second = i;
+                        partitial_read_source.rs_splits.emplace_back(std::move(split));
+                        split = RowSetSplits(reader->clone());
+                    }
+                    DCHECK(!partitial_read_source.rs_splits.empty());
+
+                    scanners.emplace_back(_build_scanner(
+                            tablet, version, _key_ranges,
+                            {.rs_splits = std::move(partitial_read_source.rs_splits),
+                             .delete_predicates = entire_read_source.delete_predicates,
+                             .delete_bitmap = entire_read_source.delete_bitmap}));
+
+                    // Reset for the next scanner.
+                    partitial_read_source = TabletReadSource();
+                    segment_start = i;
+                    rows_collected = 0;
+                }
+
+                // Add the current segment to the current scanner.
+                rows_collected += rows_of_segment;
+            }
- TabletReadSource partitial_read_source;
+ // Add remaining segments in this rowset to a scanner
+ if (rows_collected > 0) {
+ split.segment_offsets.first = segment_start;
+ split.segment_offsets.second = segments_rows.size();
+ DCHECK_GT(split.segment_offsets.second,
split.segment_offsets.first);
partitial_read_source.rs_splits.emplace_back(std::move(split));
+ }
+ }
- scanners.emplace_back(
- _build_scanner(tablet, version, _key_ranges,
- {.rs_splits =
std::move(partitial_read_source.rs_splits),
- .delete_predicates =
entire_read_source.delete_predicates,
- .delete_bitmap =
entire_read_source.delete_bitmap}));
+ // Add remaining segments across all rowsets to a scanner
+ if (rows_collected > 0) {
+ DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
+#ifndef NDEBUG
+ for (auto& split : partitial_read_source.rs_splits) {
+ DCHECK(split.rs_reader != nullptr);
+ DCHECK_LT(split.segment_offsets.first,
split.segment_offsets.second);
}
+#endif
+ scanners.emplace_back(
+ _build_scanner(tablet, version, _key_ranges,
+ {.rs_splits =
std::move(partitial_read_source.rs_splits),
+ .delete_predicates =
entire_read_source.delete_predicates,
+ .delete_bitmap =
entire_read_source.delete_bitmap}));
}
}
diff --git a/be/src/olap/parallel_scanner_builder.h
b/be/src/olap/parallel_scanner_builder.h
index de1ea33b149..7c57711bc70 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -65,7 +65,7 @@ public:
void set_min_rows_per_scanner(int64_t size) { _min_rows_per_scanner =
size; }
- void set_optimize_index_scan_parallelism(bool v) {
_optimize_index_scan_parallelism = v; }
+ void set_scan_parallelism_by_segment(bool v) {
_scan_parallelism_by_segment = v; }
const OlapReaderStatistics* builder_stats() const { return
&_builder_stats; }
@@ -96,7 +96,7 @@ private:
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;
// Force building one scanner per segment when true.
- bool _optimize_index_scan_parallelism {false};
+ bool _scan_parallelism_by_segment {false};
std::shared_ptr<RuntimeProfile> _scanner_profile;
OlapReaderStatistics _builder_stats;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0b44597ed95..43f7f89adea 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -146,6 +146,10 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
read_columns.push_back(cid);
}
}
+ // disable condition cache if you have delete condition
+ _read_context->condition_cache_digest =
+ delete_columns_set.empty() ? _read_context->condition_cache_digest
: 0;
+ // create segment iterators
VLOG_NOTICE << "read columns size: " << read_columns.size();
_input_schema =
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
// output_schema only contains return_columns (excludes extra columns like
delete-predicate columns).
@@ -224,6 +228,14 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_context->runtime_state->query_options().disable_file_cache;
}
+ if (_read_context->condition_cache_digest) {
+ for (const auto& key_range : _read_options.key_ranges) {
+ _read_context->condition_cache_digest =
+
key_range.get_digest(_read_context->condition_cache_digest);
+ }
+ _read_options.condition_cache_digest =
_read_context->condition_cache_digest;
+ }
+
_read_options.io_ctx.expiration_time =
read_context->ttl_seconds > 0 &&
_rowset->rowset_meta()->newest_write_timestamp() > 0
? _rowset->rowset_meta()->newest_write_timestamp() +
read_context->ttl_seconds
@@ -275,6 +287,10 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
+ if (local_options.condition_cache_digest) {
+ local_options.condition_cache_digest =
+
local_options.row_ranges.get_digest(local_options.condition_cache_digest);
+ }
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i,
should_use_cache,
_input_schema,
local_options);
}
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index acf18cf86a4..846c721ca34 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -98,6 +98,8 @@ struct RowsetReaderContext {
std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
+
+ uint64_t condition_cache_digest = 0;
};
} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/condition_cache.cpp
b/be/src/olap/rowset/segment_v2/condition_cache.cpp
new file mode 100644
index 00000000000..f4ebd2584fc
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/condition_cache.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/condition_cache.h"
+
+#include <memory>
+
+#include "util/defer_op.h"
+
+namespace doris::segment_v2 {
+
+// Looks up the cached filter result for `key`. On a hit, `*handle` takes over the
+// LRU handle (keeping the entry pinned until the handle is destroyed) and true is
+// returned; otherwise `*handle` is left untouched and false is returned.
+bool ConditionCache::lookup(const CacheKey& key, ConditionCacheHandle* handle) {
+    // encode() builds a fresh std::string on every call, so do it only once.
+    const std::string encoded_key = key.encode();
+    if (encoded_key.empty()) {
+        return false;
+    }
+    auto* lru_handle = LRUCachePolicy::lookup(encoded_key);
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = ConditionCacheHandle(this, lru_handle);
+    return true;
+}
+
+// Stores `result` (one flag per row granule of the segment) under `key`. A null
+// result is ignored. The handle returned by the LRU insert is wrapped in a
+// temporary ConditionCacheHandle so it is released immediately.
+void ConditionCache::insert(const CacheKey& key, std::shared_ptr<std::vector<bool>> result) {
+    const std::string encoded_key = key.encode();
+    if (encoded_key.empty() || result == nullptr) {
+        return;
+    }
+    // std::vector<bool>::capacity() counts elements (bits in the packed
+    // libstdc++/libc++ specialization), while this SIZE-typed cache is charged in
+    // bytes, so round the bit count up to bytes.
+    const size_t charge_bytes = (result->capacity() + 7) / 8;
+
+    auto cache_value_ptr = std::make_unique<ConditionCache::CacheValue>();
+    cache_value_ptr->filter_result = std::move(result);
+
+    ConditionCacheHandle(this, LRUCachePolicy::insert(encoded_key,
+                                                      (void*)cache_value_ptr.release(),
+                                                      charge_bytes, charge_bytes,
+                                                      CachePriority::NORMAL));
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/condition_cache.h
b/be/src/olap/rowset/segment_v2/condition_cache.h
new file mode 100644
index 00000000000..968f67df825
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/condition_cache.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "olap/lru_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/lru_cache_policy.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+#include "util/time.h"
+
+namespace doris::segment_v2 {
+
+class ConditionCacheHandle;
+
+// LRU cache mapping (rowset, segment, condition digest) to a per-granule filter
+// result: one bool per granule of rows, false meaning no row in that granule
+// passed the conditions the digest was computed from.
+class ConditionCache : public LRUCachePolicy {
+public:
+ using LRUCachePolicy::insert;
+
+ // Key of a condition cache entry: rowset id, segment id, and the digest of the
+ // conditions (plus key/row ranges) evaluated on that segment.
+ struct CacheKey {
+ CacheKey(RowsetId rowset_id_, int64_t segment_id_, uint64_t digest_)
+ : rowset_id(rowset_id_), segment_id(segment_id_),
digest(digest_) {}
+ RowsetId rowset_id;
+ int64_t segment_id;
+ uint64_t digest;
+
+ // Encode to a flat binary which can be used as LRUCache's key:
+ // rowset_id string followed by the raw 8-byte segment_id and digest.
+ // NOTE(review): the raw bytes are native-endian; fine as long as keys are
+ // only compared within one process — confirm.
+ [[nodiscard]] std::string encode() const {
+ char buf[16];
+ memcpy(buf, &segment_id, 8);
+ memcpy(buf + 8, &digest, 8);
+
+ return rowset_id.to_string() + std::string(buf, 16);
+ }
+ };
+
+ // Cached payload: shared so readers can keep the vector alive via the handle.
+ class CacheValue : public LRUCacheValueBase {
+ public:
+ std::shared_ptr<std::vector<bool>> filter_result;
+ };
+
+ // Create a new instance of this class with `new`; the caller owns the returned
+ // pointer (presumably stored in ExecEnv — see instance()).
+ static ConditionCache* create_global_cache(size_t capacity, uint32_t
num_shards = 16) {
+ auto* res = new ConditionCache(capacity, num_shards);
+ return res;
+ }
+
+ // Return the global instance held by ExecEnv.
+ // Client should call create_global_cache before.
+ static ConditionCache* instance() { return
ExecEnv::GetInstance()->get_condition_cache(); }
+
+ ConditionCache() = delete;
+
+ // Size-based (LRUCacheType::SIZE) cache with pruning enabled and no LRU-K.
+ // NOTE(review): the stale-sweep interval reuses
+ // config::inverted_index_cache_stale_sweep_time_sec; confirm a dedicated
+ // condition-cache knob is not intended.
+ ConditionCache(size_t capacity, uint32_t num_shards)
+ : LRUCachePolicy(CachePolicy::CacheType::CONDITION_CACHE,
capacity, LRUCacheType::SIZE,
+
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
+ /*element_count_capacity*/ 0, /*enable_prune*/
true,
+ /*is_lru_k*/ false) {}
+
+ // Returns true and fills `handle` on a hit.
+ bool lookup(const CacheKey& key, ConditionCacheHandle* handle);
+
+ // Caches `filter_result` under `key`.
+ void insert(const CacheKey& key, std::shared_ptr<std::vector<bool>>
filter_result);
+};
+
+// Move-only RAII owner of a pinned ConditionCache entry: releases the LRU handle
+// back to its cache on destruction.
+class ConditionCacheHandle {
+public:
+    ConditionCacheHandle() = default;
+
+    ConditionCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
+            : _cache(cache), _handle(handle) {}
+
+    ~ConditionCacheHandle() {
+        if (_handle != nullptr) {
+            _cache->release(_handle);
+        }
+    }
+
+    ConditionCacheHandle(ConditionCacheHandle&& other) noexcept
+            : _cache(std::exchange(other._cache, nullptr)),
+              _handle(std::exchange(other._handle, nullptr)) {}
+
+    // Swap so that whatever this object held is released by `other`'s destructor.
+    ConditionCacheHandle& operator=(ConditionCacheHandle&& other) noexcept {
+        std::swap(_cache, other._cache);
+        std::swap(_handle, other._handle);
+        return *this;
+    }
+
+    LRUCachePolicy* cache() const { return _cache; }
+
+    // Returns the cached filter result, or nullptr if this handle holds no entry.
+    std::shared_ptr<std::vector<bool>> get_filter_result() const {
+        if (_cache == nullptr || _handle == nullptr) {
+            return nullptr;
+        }
+        return static_cast<ConditionCache::CacheValue*>(_cache->value(_handle))->filter_result;
+    }
+
+private:
+    LRUCachePolicy* _cache = nullptr;
+    Cache::Handle* _handle = nullptr;
+
+    // Don't allow copy and assign
+    DISALLOW_COPY_AND_ASSIGN(ConditionCacheHandle);
+};
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/row_ranges.h
b/be/src/olap/rowset/segment_v2/row_ranges.h
index c98320d069a..2f990faa762 100644
--- a/be/src/olap/rowset/segment_v2/row_ranges.h
+++ b/be/src/olap/rowset/segment_v2/row_ranges.h
@@ -93,6 +93,13 @@ public:
std::string to_string() const { return absl::Substitute("[$0-$1)", _from,
_to); }
+    // Digest of [_from, _to) chained onto `seed`. Both bounds go through a 64-bit
+    // finalizer (MurmurHash3's fmix64). A linear `hash * 31 + x` combine would let
+    // distinct ranges collide (e.g. [0, 100) and [1, 69) yield the same value), and
+    // a digest collision lets a condition cache entry computed for one row range be
+    // reused for another.
+    uint64_t get_digest(uint64_t seed) const {
+        auto mix = [](uint64_t x) {
+            x ^= x >> 33;
+            x *= 0xff51afd7ed558ccdULL;
+            x ^= x >> 33;
+            x *= 0xc4ceb9fe1a85ec53ULL;
+            x ^= x >> 33;
+            return x;
+        };
+        uint64_t hash = mix(seed ^ (static_cast<uint64_t>(_from) + 0x9e3779b97f4a7c15ULL));
+        hash = mix(hash ^ static_cast<uint64_t>(_to));
+        return hash;
+    }
+
private:
int64_t _from;
int64_t _to;
@@ -268,6 +275,13 @@ public:
_count += range_to_add.count();
}
+    // Folds every range, in stored order, into a running digest that starts at `seed`.
+    uint64_t get_digest(uint64_t seed) const {
+        uint64_t digest = seed;
+        for (const auto& range : _ranges) {
+            digest = range.get_digest(digest);
+        }
+        return digest;
+    }
+
private:
std::vector<RowRange> _ranges;
size_t _count;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 654af05b7e5..fb6e5fa171a 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -40,7 +40,6 @@
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
-#include "io/fs/file_reader.h"
#include "io/io_common.h"
#include "olap/bloom_filter_predicate.h"
#include "olap/collection_similarity.h"
@@ -56,6 +55,7 @@
#include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/rowset/segment_v2/column_reader_cache.h"
+#include "olap/rowset/segment_v2/condition_cache.h"
#include "olap/rowset/segment_v2/index_file_reader.h"
#include "olap/rowset/segment_v2/index_iterator.h"
#include "olap/rowset/segment_v2/index_query_context.h"
@@ -102,7 +102,6 @@
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vslot_ref.h"
#include "vec/functions/array/function_array_index.h"
-#include "vec/json/path_in_data.h"
namespace doris {
using namespace ErrorCode;
@@ -112,6 +111,60 @@ namespace segment_v2 {
SegmentIterator::~SegmentIterator() = default;
+// Consults the condition cache for this segment.
+// - Nothing to evaluate (no column predicates and no pushed-down common exprs):
+//   the cache is disabled by zeroing the digest.
+// - Hit: granules whose cached flag is false are removed from _row_bitmap.
+// - Miss: a per-granule flag vector is allocated; it is filled while reading and
+//   inserted into the cache once the segment reaches EOF.
+void SegmentIterator::_init_row_bitmap_by_condition_cache() {
+    const bool has_conditions =
+            !_col_predicates.empty() ||
+            (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty());
+    if (!has_conditions) {
+        _opts.condition_cache_digest = 0;
+        return;
+    }
+    if (!_opts.condition_cache_digest) {
+        return;
+    }
+
+    auto* condition_cache = ConditionCache::instance();
+    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
+                                       _opts.condition_cache_digest);
+
+    // Every lookup with a non-zero digest counts as a search.
+    DorisMetrics::instance()->condition_cache_search_count->increment(1);
+
+    ConditionCacheHandle handle;
+    _find_condition_cache = condition_cache->lookup(cache_key, &handle);
+
+    if (!_find_condition_cache) {
+        // One flag per CONDITION_CACHE_OFFSET-row granule, all false until a row
+        // of that granule is selected.
+        _condition_cache = std::make_shared<std::vector<bool>>(
+                _segment->num_rows() / CONDITION_CACHE_OFFSET + 1, false);
+        return;
+    }
+
+    DorisMetrics::instance()->condition_cache_hit_count->increment(1);
+    if (_opts.runtime_state) {
+        VLOG_DEBUG << "Condition cache hit, query id: "
+                   << print_id(_opts.runtime_state->query_id())
+                   << ", segment id: " << _segment->id()
+                   << ", cache digest: " << _opts.condition_cache_digest
+                   << ", rowset id: " << _opts.rowset_id.to_string();
+    }
+
+    const auto& filter_result = *(handle.get_filter_result());
+    const uint64_t rows_before = _row_bitmap.cardinality();
+    for (size_t i = 0; i < filter_result.size(); ++i) {
+        if (!filter_result[i]) {
+            const uint64_t granule_start = static_cast<uint64_t>(i) * CONDITION_CACHE_OFFSET;
+            _row_bitmap.removeRange(granule_start, granule_start + CONDITION_CACHE_OFFSET);
+        }
+    }
+    // Record condition_cache hit segment number
+    _opts.stats->condition_cache_hit_seg_nums++;
+    // Count rows actually removed: the last granule may extend past num_rows, so
+    // "filtered granules * CONDITION_CACHE_OFFSET" would over-report.
+    _opts.stats->condition_cache_filtered_rows +=
+            static_cast<int64_t>(rows_before - _row_bitmap.cardinality());
+}
+
// A fast range iterator for roaring bitmap. Output ranges use closed-open
form, like [from, to).
// Example:
// input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
@@ -399,6 +452,8 @@ Status SegmentIterator::_lazy_init(vectorized::Block*
block) {
SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
DorisMetrics::instance()->segment_read_total->increment(1);
_row_bitmap.addRange(0, _segment->num_rows());
+ _init_row_bitmap_by_condition_cache();
+
// z-order can not use prefix index
if (_segment->_tablet_schema->sort_type() != SortType::ZORDER &&
_segment->_tablet_schema->cluster_key_uids().empty()) {
@@ -526,9 +581,8 @@ Status SegmentIterator::_get_row_ranges_by_keys() {
auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
}
- // pre-condition: _row_ranges == [0, num_rows)
size_t pre_size = _row_bitmap.cardinality();
- _row_bitmap = RowRanges::ranges_to_roaring(result_ranges);
+ _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges);
_opts.stats->rows_key_range_filtered += (pre_size -
_row_bitmap.cardinality());
return Status::OK();
@@ -630,6 +684,8 @@ Status
SegmentIterator::_get_row_ranges_by_column_conditions() {
++it;
}
}
+ _opts.condition_cache_digest =
+ _common_expr_ctxs_push_down.empty() ? 0 :
_opts.condition_cache_digest;
_opts.stats->rows_inverted_index_filtered += (input_rows -
_row_bitmap.cardinality());
for (auto cid : _schema->column_ids()) {
bool result_true =
_check_all_conditions_passed_inverted_index_for_column(cid);
@@ -2248,11 +2304,22 @@ uint16_t
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
std::vector<rowid_t>&
rowid_vector,
uint16_t* sel_rowid_idx,
size_t select_size,
- vectorized::MutableColumns*
mutable_columns) {
+ vectorized::MutableColumns*
mutable_columns,
+ bool init_condition_cache) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);
- for (size_t i = 0; i < select_size; ++i) {
- rowids[i] = rowid_vector[sel_rowid_idx[i]];
+
+ if (init_condition_cache) {
+ DCHECK(_condition_cache);
+ auto& condition_cache = *_condition_cache;
+ for (size_t i = 0; i < select_size; ++i) {
+ rowids[i] = rowid_vector[sel_rowid_idx[i]];
+ condition_cache[rowids[i] /
SegmentIterator::CONDITION_CACHE_OFFSET] = true;
+ }
+ } else {
+ for (size_t i = 0; i < select_size; ++i) {
+ rowids[i] = rowid_vector[sel_rowid_idx[i]];
+ }
}
for (auto cid : read_column_ids) {
@@ -2296,7 +2363,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
RETURN_IF_CATCH_EXCEPTION({
auto res = _next_batch_internal(block);
- if (res.is<END_OF_FILE>() && block->rows() == 0) {
+ if (res.is<END_OF_FILE>()) {
// Since we have a type check at the caller.
// So a replacement of nothing column with real column is
needed.
const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
@@ -2305,6 +2372,18 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
auto type = idx_to_datatype.find(idx)->second;
block->replace_by_position(idx, type->create_column());
}
+
+            // On EOF after a cache miss, publish the per-granule flags gathered while
+            // reading. `_condition_cache` is moved out, so the null check also keeps a
+            // repeated EOF from inserting an empty pointer.
+            // NOTE(review): the flags are only valid if every row of the segment was
+            // evaluated; confirm no early-exit path (limit/topn) can reach EOF here
+            // with a partially filled vector.
+            if (_opts.condition_cache_digest && !_find_condition_cache &&
+                _condition_cache != nullptr) {
+                auto* condition_cache = ConditionCache::instance();
+                ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
+                                                   _opts.condition_cache_digest);
+                // Guarded like the lookup path: runtime_state may be null.
+                if (_opts.runtime_state) {
+                    VLOG_DEBUG << "Condition cache insert, query id: "
+                               << print_id(_opts.runtime_state->query_id())
+                               << ", rowset id: " << _opts.rowset_id.to_string()
+                               << ", segment id: " << _segment->id()
+                               << ", cache digest: " << _opts.condition_cache_digest;
+                }
+                condition_cache->insert(cache_key, std::move(_condition_cache));
+            }
return res;
}
@@ -2473,11 +2552,22 @@ Status
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
// step4: read non_predicate column
- if (_selected_size > 0 && !_non_predicate_columns.empty()) {
- RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns,
_block_rowids,
- _sel_rowid_idx.data(),
_selected_size,
- &_current_return_columns));
- _replace_version_col_if_needed(_non_predicate_columns,
_selected_size);
+ if (_selected_size > 0) {
+ if (!_non_predicate_columns.empty()) {
+ RETURN_IF_ERROR(_read_columns_by_rowids(
+ _non_predicate_columns, _block_rowids,
_sel_rowid_idx.data(),
+ _selected_size, &_current_return_columns,
+ _opts.condition_cache_digest &&
!_find_condition_cache));
+ _replace_version_col_if_needed(_non_predicate_columns,
_selected_size);
+ } else {
+ if (_opts.condition_cache_digest && !_find_condition_cache) {
+ auto& condition_cache = *_condition_cache;
+ for (size_t i = 0; i < _selected_size; ++i) {
+ auto rowid = _block_rowids[_sel_rowid_idx[i]];
+ condition_cache[rowid /
SegmentIterator::CONDITION_CACHE_OFFSET] = true;
+ }
+ }
+ }
}
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index b6fc5467acb..4d2ab090b19 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -230,7 +230,8 @@ private:
[[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
std::vector<rowid_t>&
rowid_vector,
uint16_t* sel_rowid_idx,
size_t select_size,
- vectorized::MutableColumns*
mutable_columns);
+ vectorized::MutableColumns*
mutable_columns,
+ bool init_condition_cache =
false);
Status copy_column_data_by_selector(vectorized::IColumn* input_col_ptr,
vectorized::MutableColumnPtr&
output_col,
@@ -383,6 +384,8 @@ private:
Status _materialization_of_virtual_column(vectorized::Block* block);
void _prepare_score_column_materialization();
+ void _init_row_bitmap_by_condition_cache();
+
class BitmapRangeIterator;
class BackwardBitmapRangeIterator;
@@ -514,6 +517,10 @@ private:
// key is column uid, value is the sparse column cache
std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr>
_variant_sparse_column_cache;
+
+ bool _find_condition_cache = false;
+ std::shared_ptr<std::vector<bool>> _condition_cache;
+ static constexpr int CONDITION_CACHE_OFFSET = 2048;
};
} // namespace segment_v2
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index c4cb3b2dd10..925dd54b722 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.all_access_paths = read_params.all_access_paths;
_reader_context.predicate_access_paths =
read_params.predicate_access_paths;
+ _reader_context.condition_cache_digest =
read_params.condition_cache_digest;
return Status::OK();
}
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index bb2b66d3384..71efa484fa1 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -206,6 +206,8 @@ public:
std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
CollectionStatisticsPtr collection_statistics;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
+
+ uint64_t condition_cache_digest = 0;
};
TabletReader() = default;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 5260e610083..2082dd7ea87 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -372,6 +372,10 @@ Status OlapScanLocalState::_init_profile() {
_variant_doc_value_column_iter_count =
ADD_COUNTER(_segment_profile, "VariantDocValueColumnIterCount",
TUnit::UNIT);
+ _condition_cache_hit_segment_counter =
+ ADD_COUNTER(_segment_profile, "ConditionCacheSegmentHit",
TUnit::UNIT);
+ _condition_cache_filtered_rows_counter =
+ ADD_COUNTER(_segment_profile, "ConditionCacheFilteredRows",
TUnit::UNIT);
return Status::OK();
}
@@ -516,7 +520,7 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
// TODO: Use optimize_index_scan_parallelism for ann range search
in the future.
// Currently, ann topn is enough
if (_ann_topn_runtime != nullptr) {
- scanner_builder.set_optimize_index_scan_parallelism(true);
+ scanner_builder.set_scan_parallelism_by_segment(true);
}
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index ea14a1e6b06..292081b29a6 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -259,6 +259,10 @@ private:
// total number of segment related to this scan node
RuntimeProfile::Counter* _total_segment_counter = nullptr;
+ // condition cache filter stats
+ RuntimeProfile::Counter* _condition_cache_hit_segment_counter = nullptr;
+ RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
+
// timer about tablet reader
RuntimeProfile::Counter* _tablet_reader_init_timer = nullptr;
RuntimeProfile::Counter* _tablet_reader_capture_rs_readers_timer = nullptr;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 768040b75fa..a1a7e1fb461 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -169,6 +169,20 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
p._common_expr_ctxs_push_down[i]->clone(state,
_common_expr_ctxs_push_down[i]));
}
RETURN_IF_ERROR(_helper.acquire_runtime_filter(state, _conjuncts,
p.row_descriptor()));
+
+ // Disable condition cache in topn filter valid. TODO:: Try to support the
topn filter in condition cache
+ if (state->query_options().condition_cache_digest &&
p._topn_filter_source_node_ids.empty()) {
+ _condition_cache_digest =
state->query_options().condition_cache_digest;
+ for (auto& conjunct : _conjuncts) {
+ _condition_cache_digest =
conjunct->get_digest(_condition_cache_digest);
+ if (!_condition_cache_digest) {
+ break;
+ }
+ }
+ } else {
+ _condition_cache_digest = 0;
+ }
+
RETURN_IF_ERROR(_process_conjuncts(state));
auto status = _eos ? Status::OK() : _prepare_scanners();
@@ -1074,7 +1088,7 @@ template <typename Derived>
Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
auto& p = _parent->cast<typename Derived::Parent>();
std::stringstream result;
- std::copy(p.topn_filter_source_node_ids.begin(),
p.topn_filter_source_node_ids.end(),
+ std::copy(p._topn_filter_source_node_ids.begin(),
p._topn_filter_source_node_ids.end(),
std::ostream_iterator<int>(result, ","));
custom_profile()->add_info_string("TopNFilterSourceNodeIds", result.str());
@@ -1188,7 +1202,7 @@ Status ScanOperatorX<LocalStateType>::init(const
TPlanNode& tnode, RuntimeState*
}
if (tnode.__isset.topn_filter_source_node_ids) {
- topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
+ _topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
}
// Which means the request could be fullfilled in a single segment
iterator request.
@@ -1220,7 +1234,7 @@ Status
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
_colname_to_slot_id[slot->col_name()] = slot->id();
_slot_id_to_slot_desc[slot->id()] = slot;
}
- for (auto id : topn_filter_source_node_ids) {
+ for (auto id : _topn_filter_source_node_ids) {
if (!state->get_query_ctx()->has_runtime_predicate(id)) {
// compatible with older versions fe
continue;
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index aef51db57db..3d72ee6f5bd 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -90,6 +90,8 @@ public:
Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs&
scanner_conjuncts);
+ uint64_t get_condition_cache_digest() const { return
_condition_cache_digest; }
+
protected:
friend class vectorized::ScannerContext;
friend class vectorized::Scanner;
@@ -125,6 +127,8 @@ protected:
std::mutex _conjuncts_lock;
RuntimeFilterConsumerHelper _helper;
+ // magic number as seed to generate hash value for condition cache
+ uint64_t _condition_cache_digest = 0;
};
template <typename LocalStateType>
@@ -171,7 +175,7 @@ class ScanLocalState : public ScanLocalStateBase {
std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool
push_down) {
std::vector<int> result;
- for (int id : _parent->cast<typename
Derived::Parent>().topn_filter_source_node_ids) {
+ for (int id : _parent->cast<typename
Derived::Parent>()._topn_filter_source_node_ids) {
if (!state->get_query_ctx()->has_runtime_predicate(id)) {
// compatible with older versions fe
continue;
@@ -195,10 +199,7 @@ protected:
friend class vectorized::Scanner;
Status _init_profile() override;
- virtual Status _process_conjuncts(RuntimeState* state) {
- RETURN_IF_ERROR(_normalize_conjuncts(state));
- return Status::OK();
- }
+ virtual Status _process_conjuncts(RuntimeState* state) { return
_normalize_conjuncts(state); }
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _storage_no_merge() { return false; }
@@ -430,7 +431,10 @@ protected:
// Record the value of the aggregate function 'count' from doris's be
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;
- std::vector<int> topn_filter_source_node_ids;
+
+ int _query_parallel_instance_num = 0;
+
+ std::vector<int> _topn_filter_source_node_ids;
};
#include "common/compile_check_end.h"
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0696de46007..e239fae0c7b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -70,6 +70,7 @@ class PackedFileManager;
namespace segment_v2 {
class InvertedIndexSearcherCache;
class InvertedIndexQueryCache;
+class ConditionCache;
class TmpFileDirs;
class EncodingInfoResolver;
@@ -382,6 +383,7 @@ public:
segment_v2::EncodingInfoResolver* get_encoding_info_resolver() {
return _encoding_info_resolver;
}
+ segment_v2::ConditionCache* get_condition_cache() { return
_condition_cache; }
QueryCache* get_query_cache() { return _query_cache; }
pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
@@ -536,6 +538,7 @@ private:
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
segment_v2::EncodingInfoResolver* _encoding_info_resolver = nullptr;
+ segment_v2::ConditionCache* _condition_cache = nullptr;
QueryCache* _query_cache = nullptr;
std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;
DeleteBitmapAggCache* _delete_bitmap_agg_cache {nullptr};
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9a2fd612e86..e7d1fca8cff 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -55,6 +55,7 @@
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/page_cache.h"
+#include "olap/rowset/segment_v2/condition_cache.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/schema_cache.h"
@@ -111,7 +112,6 @@
#include "util/dns_cache.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
-#include "util/metrics.h"
#include "util/parse_util.h"
#include "util/pretty_printer.h"
#include "util/threadpool.h"
@@ -661,6 +661,13 @@ Status ExecEnv::init_mem_env() {
// Initialize encoding info resolver
_encoding_info_resolver = new segment_v2::EncodingInfoResolver();
+ // use memory limit
+ int64_t condition_cache_limit = config::condition_cache_limit * 1024L *
1024L;
+ _condition_cache =
ConditionCache::create_global_cache(condition_cache_limit);
+ LOG(INFO) << "Condition cache memory limit: "
+ << PrettyPrinter::print(condition_cache_limit, TUnit::BYTES)
+ << ", origin config value: " << config::condition_cache_limit;
+
// init orc memory pool
_orc_memory_pool = new doris::vectorized::ORCMemoryPool();
_arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
@@ -844,6 +851,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_inverted_index_query_cache);
SAFE_DELETE(_inverted_index_searcher_cache);
SAFE_DELETE(_encoding_info_resolver);
+ SAFE_DELETE(_condition_cache);
SAFE_DELETE(_lookup_connection_cache);
SAFE_DELETE(_schema_cache);
SAFE_DELETE(_segment_loader);
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 180ab001967..3a072de82f1 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -56,6 +56,7 @@ public:
QUERY_CACHE = 20,
TABLET_COLUMN_OBJECT_POOL = 21,
SCHEMA_CLOUD_DICTIONARY_CACHE = 22,
+ CONDITION_CACHE = 23,
};
static std::string type_string(CacheType type) {
@@ -102,6 +103,10 @@ public:
return "QueryCache";
case CacheType::TABLET_COLUMN_OBJECT_POOL:
return "TabletColumnObjectPool";
+ case CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE:
+ return "SchemaCloudDictionaryCache";
+ case CacheType::CONDITION_CACHE:
+ return "ConditionCache";
default:
throw Exception(Status::FatalError("not match type of cache policy
:{}",
static_cast<int>(type)));
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index aad0692df23..7a92b3676f1 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -121,6 +121,13 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(
"(segment_v2) total number of rows in queried segments (before index
pruning)",
segment_read, Labels({{"type", "segment_row_total"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(condition_cache_search_count,
MetricUnit::OPERATIONS,
+ "number of condition cache lookups when
digest != 0",
+ condition_cache, Labels({{"type",
"condition_cache_search"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(condition_cache_hit_count,
MetricUnit::OPERATIONS,
+ "number of condition cache hits",
condition_cache,
+ Labels({{"type",
"condition_cache_hit"}}));
+
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_txn_begin_request_total,
MetricUnit::OPERATIONS,
"", stream_load_txn_request,
Labels({{"type", "begin"}}));
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_txn_commit_request_total,
MetricUnit::OPERATIONS,
@@ -326,6 +333,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
cumulative_compaction_task_pending_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
condition_cache_search_count);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
condition_cache_hit_count);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
stream_load_txn_begin_request_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 20b8a4a3770..95ffb833a37 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -122,6 +122,10 @@ public:
IntCounter* segment_read_total = nullptr;
// total number of rows in queried segments (before index pruning)
IntCounter* segment_row_total = nullptr;
+ // number of condition cache lookups when digest != 0
+ IntCounter* condition_cache_search_count = nullptr;
+ // number of condition cache hits
+ IntCounter* condition_cache_hit_count = nullptr;
IntCounter* stream_load_txn_begin_request_total = nullptr;
IntCounter* stream_load_txn_commit_request_total = nullptr;
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 099b484cf23..4770e6088e2 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -213,12 +213,7 @@ public:
void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const
override {
- auto real_data = data->get_data_at(0);
- if (real_data.data == nullptr) {
- hash = HashUtil::xxHash64NullWithSeed(hash);
- } else {
- hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size,
hash);
- }
+ data->update_xxHash_with_value(0, 1, hash, null_data);
}
void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp
b/be/src/vec/exec/scan/olap_scanner.cpp
index 92346aa68b6..e413b8e6628 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -95,7 +95,8 @@ OlapScanner::OlapScanner(pipeline::ScanLocalStateBase*
parent, OlapScanner::Para
.vir_col_idx_to_type {},
.score_runtime {},
.collection_statistics {},
- .ann_topn_runtime {}}) {
+ .ann_topn_runtime {},
+ .condition_cache_digest =
parent->get_condition_cache_digest()}) {
_tablet_reader_params.set_read_source(std::move(params.read_source),
_state->skip_delete_bitmap());
_has_prepared = false;
@@ -821,6 +822,10 @@ void OlapScanner::_collect_profile_before_close() {
stats.output_index_result_column_timer);
COUNTER_UPDATE(local_state->_filtered_segment_counter,
stats.filtered_segment_number);
COUNTER_UPDATE(local_state->_total_segment_counter,
stats.total_segment_number);
+ COUNTER_UPDATE(local_state->_condition_cache_hit_segment_counter,
+ stats.condition_cache_hit_seg_nums);
+ COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
+ stats.condition_cache_filtered_rows);
COUNTER_UPDATE(local_state->_tablet_reader_init_timer,
stats.tablet_reader_init_timer_ns);
COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer,
diff --git a/be/src/vec/exprs/vbitmap_predicate.h
b/be/src/vec/exprs/vbitmap_predicate.h
index 8fabd985595..cf50f3e069a 100644
--- a/be/src/vec/exprs/vbitmap_predicate.h
+++ b/be/src/vec/exprs/vbitmap_predicate.h
@@ -73,6 +73,9 @@ public:
return fmt::format(" VBitmapPredicate:{}", VExpr::debug_string());
}
+ // not need support bitmap filter get_digest
+ uint64_t get_digest(uint64_t seed) const override { return 0; }
+
private:
Status _do_execute(VExprContext* context, const Block* block, const
uint8_t* __restrict filter,
Selector* selector, size_t count, ColumnPtr&
result_column) const;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp
b/be/src/vec/exprs/vbloom_predicate.cpp
index c9bc8cc5349..40057026d3d 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -115,5 +115,16 @@ void
VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase> filter) {
_filter = filter;
}
+uint64_t VBloomPredicate::get_digest(uint64_t seed) const {
+ seed = _children[0]->get_digest(seed);
+ if (seed) {
+ char* data;
+ int len;
+ _filter->get_data(&data, &len);
+ return HashUtil::hash64(data, len, seed);
+ }
+ return 0;
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h
b/be/src/vec/exprs/vbloom_predicate.h
index da01b005262..59d0baee07e 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -60,6 +60,8 @@ public:
std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const
override { return _filter; }
+ uint64_t get_digest(uint64_t seed) const override;
+
private:
Status _do_execute(VExprContext* context, const Block* block, const
uint8_t* __restrict filter,
Selector* selector, size_t count, ColumnPtr&
result_column) const;
diff --git a/be/src/vec/exprs/vcast_expr.h b/be/src/vec/exprs/vcast_expr.h
index 0e0b708d309..6d23fc3bc14 100644
--- a/be/src/vec/exprs/vcast_expr.h
+++ b/be/src/vec/exprs/vcast_expr.h
@@ -60,6 +60,15 @@ public:
virtual std::string cast_name() const { return "CAST"; }
+ uint64_t get_digest(uint64_t seed) const override {
+ auto res = VExpr::get_digest(seed);
+ if (res) {
+ return HashUtil::hash64(_target_data_type_name.data(),
_target_data_type_name.size(),
+ res);
+ }
+ return 0;
+ }
+
protected:
FunctionBasePtr _function;
std::string _expr_name;
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h
b/be/src/vec/exprs/vdirect_in_predicate.h
index 9db5d48877b..38d8a78904e 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -107,6 +107,14 @@ public:
return true;
}
+ uint64_t get_digest(uint64_t seed) const override {
+ seed = _children[0]->get_digest(seed);
+ if (seed) {
+ return _filter->get_digest(seed);
+ }
+ return seed;
+ }
+
private:
Status _do_execute(VExprContext* context, const Block* block, const
uint8_t* __restrict filter,
Selector* selector, size_t count, ColumnPtr&
result_column,
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index d8fa7949584..4745394a04d 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -825,6 +825,25 @@ Status VExpr::check_constant(const Block& block,
ColumnNumbers arguments) const
return Status::OK();
}
+uint64_t VExpr::get_digest(uint64_t seed) const {
+ auto digest = seed;
+ for (auto child : _children) {
+ digest = child->get_digest(digest);
+ if (digest == 0) {
+ return 0;
+ }
+ }
+
+ auto& fn_name = _fn.name.function_name;
+ if (!fn_name.empty()) {
+ digest = HashUtil::hash64(fn_name.c_str(), fn_name.size(), digest);
+ } else {
+ digest = HashUtil::hash64((const char*)&_node_type,
sizeof(_node_type), digest);
+ digest = HashUtil::hash64((const char*)&_opcode, sizeof(_opcode),
digest);
+ }
+ return digest;
+}
+
ColumnPtr VExpr::get_result_from_const(size_t count) const {
return ColumnConst::create(_constant_col->column_ptr, count);
}
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 6e92f4d5b44..38e96abe6ee 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -352,6 +352,8 @@ public:
bool ann_dist_is_fulfilled() const;
+ virtual uint64_t get_digest(uint64_t seed) const;
+
protected:
/// Simple debug string that provides no expr subclass-specific information
std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index c26b5fec495..c733f5f6748 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -471,6 +471,9 @@ Status VExprContext::evaluate_ann_range_search(
return Status::OK();
}
-#include "common/compile_check_end.h"
+uint64_t VExprContext::get_digest(uint64_t seed) const {
+ return _root->get_digest(seed);
+}
+#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index f4747ce06f3..e7c7c5ebdee 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -387,6 +387,8 @@ public:
common_expr_to_slotref_map,
roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats&
ann_index_stats);
+ uint64_t get_digest(uint64_t seed) const;
+
private:
// Close method is called in vexpr context dector, not need call expicility
void close();
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 6f151614da4..63c0c1b5298 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -61,6 +61,8 @@ public:
bool is_not_in() const { return _is_not_in; };
Status evaluate_inverted_index(VExprContext* context, uint32_t
segment_num_rows) override;
+ uint64_t get_digest(uint64_t seed) const override { return 0; }
+
private:
FunctionBasePtr _function;
std::string _expr_name;
diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h
b/be/src/vec/exprs/vlambda_function_call_expr.h
index f1472e48572..cc99db0ef85 100644
--- a/be/src/vec/exprs/vlambda_function_call_expr.h
+++ b/be/src/vec/exprs/vlambda_function_call_expr.h
@@ -88,6 +88,8 @@ public:
return out.str();
}
+ uint64_t get_digest(uint64_t seed) const override { return 0; }
+
private:
std::string _expr_name;
LambdaFunctionPtr _lambda_function;
diff --git a/be/src/vec/exprs/vlambda_function_expr.h
b/be/src/vec/exprs/vlambda_function_expr.h
index c69d5221ff2..e6d39a186aa 100644
--- a/be/src/vec/exprs/vlambda_function_expr.h
+++ b/be/src/vec/exprs/vlambda_function_expr.h
@@ -54,6 +54,8 @@ public:
const std::string& expr_name() const override { return _expr_name; }
+ uint64_t get_digest(uint64_t seed) const override { return 0; }
+
private:
const std::string _expr_name = "vlambda_function_expr";
};
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index cd02620e7e2..2e64d7c5e9e 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -105,5 +105,10 @@ bool VLiteral::equals(const VExpr& other) {
return true;
}
+uint64_t VLiteral::get_digest(uint64_t seed) const {
+ _column_ptr->update_xxHash_with_value(0, 1, seed, nullptr);
+ return seed;
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 47ea2ba91b9..4f4009db3f7 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -65,6 +65,8 @@ public:
bool equals(const VExpr& other) override;
+ uint64_t get_digest(uint64_t seed) const override;
+
protected:
ColumnPtr _column_ptr;
std::string _expr_name;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 91c60660678..ea37dddef26 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -67,6 +67,15 @@ public:
Status execute_filter(VExprContext* context, const Block* block,
uint8_t* __restrict result_filter_data, size_t rows,
bool accept_null,
bool* can_filter_all) const override;
+
+ uint64_t get_digest(uint64_t seed) const override {
+ seed = _impl->get_digest(seed);
+ if (seed) {
+ return HashUtil::crc_hash64(&_null_aware, sizeof(_null_aware),
seed);
+ }
+ return seed;
+ }
+
VExprSPtr get_impl() const override { return _impl; }
void attach_profile_counter(std::shared_ptr<RuntimeProfile::Counter>
rf_input_rows,
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index d5df1220ad7..173f889b0c8 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -56,6 +56,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor&
state->desc_tbl().debug_string());
}
_column_name = &slot_desc->col_name();
+ _column_uniq_id = slot_desc->col_unique_id();
_column_id = desc.get_column_id(_slot_id);
if (_column_id < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
@@ -136,4 +137,12 @@ bool VSlotRef::equals(const VExpr& other) {
return true;
}
+uint64_t VSlotRef::get_digest(uint64_t seed) const {
+ if (_data_type->get_primitive_type() == TYPE_VARIANT) {
+ return 0;
+ }
+ seed = HashUtil::hash64(&_column_uniq_id, sizeof(int), seed);
+ return HashUtil::hash64(_column_name->c_str(), _column_name->size(), seed);
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index f6f8d24c032..960b4f809a0 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -70,9 +70,12 @@ public:
MOCK_FUNCTION const std::string& column_name() const { return
*_column_name; }
+ uint64_t get_digest(uint64_t seed) const override;
+
private:
int _slot_id;
int _column_id;
+ int _column_uniq_id = -1;
const std::string* _column_name = nullptr;
const std::string _column_label;
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9defb0cbf08..576c3019144 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -198,6 +198,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String QUERY_CACHE_FORCE_REFRESH =
"query_cache_force_refresh";
public static final String QUERY_CACHE_ENTRY_MAX_BYTES =
"query_cache_entry_max_bytes";
public static final String QUERY_CACHE_ENTRY_MAX_ROWS =
"query_cache_entry_max_rows";
+ public static final String ENABLE_CONDITION_CACHE =
"enable_condition_cache";
public static final String ENABLE_COST_BASED_JOIN_REORDER =
"enable_cost_based_join_reorder";
@@ -1405,6 +1406,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT)
public boolean enableLeftSemiDirectReturnOpt = true;
+ @VariableMgr.VarAttr(name = ENABLE_CONDITION_CACHE)
+ public boolean enableConditionCache = true;
+
@VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
public boolean forwardToMaster = true;
@@ -3379,6 +3383,7 @@ public class SessionVariable implements Serializable,
Writable {
public void initFuzzyModeVariables() {
Random random = new SecureRandom();
this.feDebug = true;
+ this.enableConditionCache = Config.pull_request_id % 2 == 0;
this.parallelPipelineTaskNum = random.nextInt(8);
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
@@ -5063,6 +5068,9 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
+ if (enableConditionCache) {
+
tResult.setConditionCacheDigest(getAffectQueryResultVariableHashCode());
+ }
if (maxScanKeyNum > 0) {
tResult.setMaxScanKeyNum(maxScanKeyNum);
@@ -6066,6 +6074,20 @@ public class SessionVariable implements Serializable,
Writable {
}
}
+ public int getAffectQueryResultVariableHashCode() {
+ int hash = 0;
+ for (Field affectQueryResultField : affectQueryResultFields) {
+ String name = affectQueryResultField.getName();
+ try {
+ Object value = affectQueryResultField.get(this);
+ hash = 31 * hash + (value != null ? value.hashCode() : 0);
+ } catch (Throwable t) {
+ throw new IllegalStateException("Can not access
SessionVariable." + name, t);
+ }
+ }
+ return hash;
+ }
+
public static boolean isFeDebug() {
if (ConnectContext.get() != null) {
return ConnectContext.get().getSessionVariable().feDebug;
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 3217eeacb06..9fbc2058232 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -265,7 +265,7 @@ struct TQueryOptions {
91: optional bool runtime_filter_wait_infinitely = false;
- 92: optional i32 wait_full_block_schedule_times = 1; // deprecated
+ 92: optional i32 condition_cache_digest = 0;
93: optional i32 inverted_index_max_expansions = 50;
@@ -368,7 +368,7 @@ struct TQueryOptions {
144: optional bool enable_inverted_index_searcher_cache = true;
145: optional bool enable_inverted_index_query_cache = true;
- 146: optional bool fuzzy_disable_runtime_filter_in_be = false; // deprecated
+ 146: optional bool enable_condition_cache = false; //deprecated
147: optional i32 profile_level = 1;
diff --git a/regression-test/data/query_p0/cache/condition_cache.out
b/regression-test/data/query_p0/cache/condition_cache.out
new file mode 100644
index 00000000000..6bcb15e9813
--- /dev/null
+++ b/regression-test/data/query_p0/cache/condition_cache.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !condition_cache1 --
+2 Bob 30 90
+4 David 28 92
+5 Eve 26 88
+
+-- !condition_cache2 --
+1 Alice 25 85.5
+3 Charlie 22 75.5
+
+-- !condition_cache3 --
+2 Bob 30 90
+4 David 28 92
+5 Eve 26 88
+
+-- !condition_cache4 --
+1 Alice 25 85.5
+3 Charlie 22 75.5
+
+-- !condition_cache5 --
+2 Bob 30 90
+4 David 28 92
+5 Eve 26 88
+
+-- !condition_cache6 --
+1 Alice 25 85.5
+3 Charlie 22 75.5
+
+-- !condition_delete1 --
+4 David 28 92
+5 Eve 26 88
+
+-- !condition_delete2 --
+1 Alice 25 85.5
+3 Charlie 22 75.5
+
+-- !join_no_cache --
+4 David 28 Engineering Senior Developer
+5 Eve 26 Finance Analyst
+
+-- !join_cache1 --
+4 David 28 Engineering Senior Developer
+5 Eve 26 Finance Analyst
+
+-- !join_cache2 --
+4 David 28 Engineering Senior Developer
+5 Eve 26 Finance Analyst
+
+-- !join_bf_cache1 --
+
+-- !join_bf_cache2 --
+1 Alice 25 Marketing Manager
+3 Charlie 22 Engineering Senior Developer
+4 David 28 Finance Analyst
+
+-- !join_cache3 --
+4 David 28 Engineering Senior Developer
+5 Eve 26 Finance Analyst
+
+-- !join_diff_cond --
+1 Alice Engineering 100000
+4 David Engineering 140000
+
+-- !join_after_mod --
+4 David 28 Engineering Senior Developer
+5 Eve 26 Finance Analyst
+6 Frank 32 Engineering Team Lead
+
+-- !cast_diff1 --
+Query A 4 2025-01-01T12:00:00.123499
+
+-- !cast_diff2 --
+
diff --git
a/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
b/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
index e060142ef7f..aebcdfbfb0b 100644
--- a/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
+++ b/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
@@ -56,6 +56,7 @@ suite("test_ngram_bloomfilter_index_change") {
// Test settings
sql "set enable_function_pushdown=true"
+ sql "set enable_condition_cache=false"
sql "set enable_profile=true"
sql "set profile_level=2"
@@ -326,4 +327,4 @@ suite("test_ngram_bloomfilter_index_change") {
// Final cleanup
sql "DROP INDEX idx_ngram_customer_name ON ${tableName};"
sleep(2000)
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/query_p0/cache/condition_cache.groovy b/regression-test/suites/query_p0/cache/condition_cache.groovy
new file mode 100755
index 00000000000..0a6438b63cc
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/condition_cache.groovy
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+// Regression test for the condition cache (enable_condition_cache).
+// The single-table filters and the first join query are run with the cache
+// disabled, then enabled, then once more (expected cache hit); every
+// order_qt_* result is checked against condition_cache.out. Also covers
+// DELETE, key expressions in runtime filters, and CAST precision variants.
+suite("condition_cache") {
+ def tableName = "table_condition_cache"
+ def joinTableName = "table_join_condition_cache"
+
+ def test = {
+ sql "set enable_condition_cache=false"
+ sql "set runtime_filter_type=0"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int NULL,
+ `name` varchar(50) NULL,
+ `age` int NULL,
+ `score` double NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "min_load_replica_num" = "-1",
+ "is_being_synced" = "false",
+ "storage_medium" = "hdd",
+ "storage_format" = "V2",
+ "inverted_index_storage_format" = "V3",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false"
+ )
+ """
+
+ sql """
+ INSERT INTO ${tableName}(id, name, age, score)
+ VALUES
+ (1, "Alice", 25, 85.5),
+ (2, "Bob", 30, 90.0),
+ (3, "Charlie", 22, 75.5),
+ (4, "David", 28, 92.0),
+ (5, "Eve", 26, 88.0)
+ """
+
+ // Create join table
+ sql """ DROP TABLE IF EXISTS ${joinTableName} """
+ sql """
+ CREATE TABLE ${joinTableName} (
+ `id` int NULL,
+ `department` varchar(50) NULL,
+ `position` varchar(50) NULL,
+ `salary` double NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `department`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "min_load_replica_num" = "-1",
+ "is_being_synced" = "false",
+ "storage_medium" = "hdd",
+ "storage_format" = "V2",
+ "inverted_index_storage_format" = "V3",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false"
+ )
+ """
+
+ sql """
+ INSERT INTO ${joinTableName}(id, department, position, salary)
+ VALUES
+ (1, "Engineering", "Developer", 100000),
+ (2, "Marketing", "Manager", 120000),
+ (3, "HR", "Specialist", 80000),
+ (4, "Engineering", "Senior Developer", 140000),
+ (5, "Finance", "Analyst", 95000)
+ """
+
+ // First query with WHERE condition - Run without cache
+ order_qt_condition_cache1 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ // Second query with different WHERE condition - Run without cache
+ order_qt_condition_cache2 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Enable condition cache
+ sql "set enable_condition_cache=true"
+
+ // Run the same first query with cache enabled
+ order_qt_condition_cache3 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ // Run the same second query with cache enabled
+ order_qt_condition_cache4 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Run both queries again to test cache hit
+ order_qt_condition_cache5 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ order_qt_condition_cache6 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Test delete operation impact on condition cache
+ // Delete some data
+ sql "DELETE FROM ${tableName} WHERE age = 30" // Delete Bob's record
+
+ // Run the same queries after delete to see if cache is invalidated
+ order_qt_condition_delete1 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ order_qt_condition_delete2 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Rebuild the table to skip the delete operation: copy the surviving rows
+ // into a fresh table and swap it in. Drop a staging table left over from an
+ // aborted run first, otherwise CREATE TABLE would fail on rerun.
+ def rebuildTableName = "${tableName}_rebuild"
+ sql "DROP TABLE IF EXISTS ${rebuildTableName}"
+ sql "create table ${rebuildTableName} like ${tableName}"
+ sql "insert into ${rebuildTableName} select * from ${tableName}"
+ sql "drop table ${tableName}"
+ sql "alter table ${rebuildTableName} rename ${tableName}"
+
+ // Test a two-table join with runtime_filter_type=2 (bloom filter).
+ // First run it with the condition cache disabled to get the baseline result.
+ sql "set enable_condition_cache=false"
+ sql "set runtime_filter_type=2"
+
+ // Run join query without condition cache
+ order_qt_join_no_cache """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.age > 25 AND t2.salary > 90000
+ """
+
+ // Enable condition cache with bloom filter runtime_filter
+ sql "set enable_condition_cache=true"
+
+ // Run the same join query with condition cache enabled
+ order_qt_join_cache1 """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.age > 25 AND t2.salary > 90000
+ """
+
+ // Run the same join query again to test cache hit
+ order_qt_join_cache2 """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.age > 25 AND t2.salary > 90000
+ """
+
+ // Join on an expression key (t1.id + 10 = t2.id) so the bloom filter is built over an expression
+ order_qt_join_bf_cache1 """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id + 10 = t2.id
+ """
+ // Same join with a different key expression (t1.id + 1 = t2.id); its result differs
+ // from join_bf_cache1, so nothing cached for the previous expression may be reused
+ order_qt_join_bf_cache2 """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id + 1 = t2.id
+ """
+
+ sql "set runtime_filter_type=12"
+
+ // Run the same join query again after changing runtime_filter_type
+ order_qt_join_cache3 """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.age > 25 AND t2.salary > 90000
+ """
+ // Test different join query with same tables and bloom filter
+ order_qt_join_diff_cond """
+ SELECT
+ t1.id,
+ t1.name,
+ t2.department,
+ t2.salary
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.score > 85 AND t2.department = 'Engineering'
+ """
+
+ // Test data modification impact on join condition cache
+ sql "INSERT INTO ${tableName}(id, name, age, score) VALUES (6, 'Frank', 32, 91.0)"
+ sql "INSERT INTO ${joinTableName}(id, department, position, salary) VALUES (6, 'Engineering', 'Team Lead', 150000)"
+
+ // Run the same join query after data modification
+ order_qt_join_after_mod """
+ SELECT
+ t1.id,
+ t1.name,
+ t1.age,
+ t2.department,
+ t2.position
+ FROM ${tableName} t1
+ JOIN ${joinTableName} t2 ON t1.id = t2.id
+ WHERE t1.age > 25 AND t2.salary > 90000
+ """
+
+ // Test cast precision difference
+ // 1. Create table
+ sql """DROP TABLE IF EXISTS test_cast_diff"""
+ sql """
+ CREATE TABLE test_cast_diff (
+ `id` int NULL,
+ `ts` datetime(6) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ // 2. Insert test data whose microsecond parts sit near rounding boundaries
+ sql """
+ INSERT INTO test_cast_diff VALUES
+ (1, '2025-01-01 12:00:00.123449'),
+ (2, '2025-01-01 12:00:00.123450'),
+ (3, '2025-01-01 12:00:00.123455'),
+ (4, '2025-01-01 12:00:00.123499')
+ """
+
+ // 3. Query A: cast to DATETIME(5) and compare with a 5-digit fractional constant
+ order_qt_cast_diff1 """
+ SELECT
+ 'Query A' AS type,
+ id,
+ ts
+ FROM test_cast_diff
+ WHERE CAST(ts AS DATETIME(5)) = '2025-01-01 12:00:00.12350'
+ """
+
+ // 4. Query B: cast to DATETIME(3) but use the same constant string
+ // Note: the constant will be implicitly rounded to 3-digit precision (.124)
+ // Only the CAST precision differs from Query A, so the two predicates must
+ // not share a condition cache entry.
+ order_qt_cast_diff2 """
+ SELECT
+ 'Query B' AS type,
+ id,
+ ts
+ FROM test_cast_diff
+ WHERE CAST(ts AS DATETIME(3)) = '2025-01-01 12:00:00.12350'
+ """
+ }
+
+ test()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]