This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 3b2d91a5ae8 [Exec](Cache) Support condition cache in Apache doris 
(#61385)
3b2d91a5ae8 is described below

commit 3b2d91a5ae803cf800d9c39e1fee71d1734ceefc
Author: HappenLee <[email protected]>
AuthorDate: Wed Mar 18 14:35:47 2026 +0800

    [Exec](Cache) Support condition cache in Apache doris (#61385)
    
    ### What problem does this PR solve?
    
    cherry pick condition cache code
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/exprs/hybrid_set.h                          |  38 +++
 be/src/olap/iterators.h                            |  24 +-
 be/src/olap/olap_common.h                          |   4 +
 be/src/olap/parallel_scanner_builder.cpp           |  80 ++++-
 be/src/olap/parallel_scanner_builder.h             |   4 +-
 be/src/olap/rowset/beta_rowset_reader.cpp          |  16 +
 be/src/olap/rowset/rowset_reader_context.h         |   2 +
 be/src/olap/rowset/segment_v2/condition_cache.cpp  |  52 +++
 be/src/olap/rowset/segment_v2/condition_cache.h    | 137 ++++++++
 be/src/olap/rowset/segment_v2/row_ranges.h         |  14 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 116 ++++++-
 be/src/olap/rowset/segment_v2/segment_iterator.h   |   9 +-
 be/src/olap/tablet_reader.cpp                      |   1 +
 be/src/olap/tablet_reader.h                        |   2 +
 be/src/pipeline/exec/olap_scan_operator.cpp        |   6 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   4 +
 be/src/pipeline/exec/scan_operator.cpp             |  20 +-
 be/src/pipeline/exec/scan_operator.h               |  16 +-
 be/src/runtime/exec_env.h                          |   3 +
 be/src/runtime/exec_env_init.cpp                   |  10 +-
 be/src/runtime/memory/cache_policy.h               |   5 +
 be/src/util/doris_metrics.cpp                      |   9 +
 be/src/util/doris_metrics.h                        |   4 +
 be/src/vec/columns/column_const.h                  |   7 +-
 be/src/vec/exec/scan/olap_scanner.cpp              |   7 +-
 be/src/vec/exprs/vbitmap_predicate.h               |   3 +
 be/src/vec/exprs/vbloom_predicate.cpp              |  11 +
 be/src/vec/exprs/vbloom_predicate.h                |   2 +
 be/src/vec/exprs/vcast_expr.h                      |   9 +
 be/src/vec/exprs/vdirect_in_predicate.h            |   8 +
 be/src/vec/exprs/vexpr.cpp                         |  19 ++
 be/src/vec/exprs/vexpr.h                           |   2 +
 be/src/vec/exprs/vexpr_context.cpp                 |   5 +-
 be/src/vec/exprs/vexpr_context.h                   |   2 +
 be/src/vec/exprs/vin_predicate.h                   |   2 +
 be/src/vec/exprs/vlambda_function_call_expr.h      |   2 +
 be/src/vec/exprs/vlambda_function_expr.h           |   2 +
 be/src/vec/exprs/vliteral.cpp                      |   5 +
 be/src/vec/exprs/vliteral.h                        |   2 +
 be/src/vec/exprs/vruntimefilter_wrapper.h          |   9 +
 be/src/vec/exprs/vslot_ref.cpp                     |   9 +
 be/src/vec/exprs/vslot_ref.h                       |   3 +
 .../java/org/apache/doris/qe/SessionVariable.java  |  22 ++
 gensrc/thrift/PaloInternalService.thrift           |   4 +-
 .../data/query_p0/cache/condition_cache.out        |  73 +++++
 .../test_ngram_bloomfilter_index_change.groovy     |   3 +-
 .../suites/query_p0/cache/condition_cache.groovy   | 354 +++++++++++++++++++++
 49 files changed, 1091 insertions(+), 56 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fc299779c8d..372a33dc886 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1207,6 +1207,9 @@ DEFINE_Int32(inverted_index_query_cache_shards, "256");
 // inverted index match bitmap cache size
 DEFINE_String(inverted_index_query_cache_limit, "10%");
 
+// condition cache limit
+DEFINE_Int16(condition_cache_limit, "512");
+
 // inverted index
 DEFINE_mDouble(inverted_index_ram_buffer_size, "512");
 // -1 indicates not working.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 73922709283..f8416557a2e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1258,6 +1258,9 @@ DECLARE_Int32(inverted_index_query_cache_shards);
 // inverted index match bitmap cache size
 DECLARE_String(inverted_index_query_cache_limit);
 
+// condition cache limit
+DECLARE_Int16(condition_cache_limit);
+
 // inverted index
 DECLARE_mDouble(inverted_index_ram_buffer_size);
 DECLARE_mInt32(inverted_index_max_buffered_docs);
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 27b32a4325e..6bfee554ce0 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <gen_cpp/internal_service.pb.h>
+#include <pdqsort.h>
 
 #include "common/object_pool.h"
 #include "exprs/filter_base.h"
@@ -259,6 +260,7 @@ public:
                                               const uint8_t* __restrict filter 
= nullptr) = 0;
 
     virtual void to_pb(PInFilter* filter) = 0;
+    virtual uint64_t get_digest(uint64_t seed) = 0;
 
     class IteratorBase {
     public:
@@ -431,6 +433,21 @@ public:
 
     void to_pb(PInFilter* filter) override { set_pb(filter, 
get_convertor<ElementType>()); }
 
+    uint64_t get_digest(uint64_t seed) override {
+        std::vector<ElementType> elems(_set.begin(), _set.end());
+        pdqsort(elems.begin(), elems.end());
+        if constexpr (std::is_same<ElementType, bool>::value) {
+            for (const auto& v : elems) {
+                seed = HashUtil::crc_hash64(&v, sizeof(v), seed);
+            }
+        } else {
+            seed = HashUtil::crc_hash64(elems.data(),
+                                        (uint32_t)(elems.size() * 
sizeof(ElementType)), seed);
+        }
+
+        return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null), 
seed);
+    }
+
 private:
     ContainerType _set;
     ObjectPool _pool;
@@ -626,6 +643,16 @@ public:
 
     void to_pb(PInFilter* filter) override { set_pb(filter, 
get_convertor<std::string>()); }
 
+    uint64_t get_digest(uint64_t seed) override {
+        std::vector<StringRef> elems(_set.begin(), _set.end());
+        pdqsort(elems.begin(), elems.end());
+
+        for (const auto& v : elems) {
+            seed = HashUtil::crc_hash64(v.data, (uint32_t)v.size, seed);
+        }
+        return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null), 
seed);
+    }
+
 private:
     ContainerType _set;
     ObjectPool _pool;
@@ -824,6 +851,17 @@ public:
         throw Exception(ErrorCode::INTERNAL_ERROR, "StringValueSet do not 
support to_pb");
     }
 
+    uint64_t get_digest(uint64_t seed) override {
+        std::vector<StringRef> elems(_set.begin(), _set.end());
+        pdqsort(elems.begin(), elems.end());
+
+        for (const auto& v : elems) {
+            seed = HashUtil::crc_hash64(v.data, (uint32_t)v.size, seed);
+        }
+
+        return HashUtil::crc_hash64(&_contain_null, sizeof(_contain_null), 
seed);
+    }
+
 private:
     ContainerType _set;
     ObjectPool _pool;
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index ea264e9f746..efbb0a2e00b 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -25,6 +25,7 @@
 #include "olap/block_column_predicate.h"
 #include "olap/column_predicate.h"
 #include "olap/olap_common.h"
+#include "olap/row_cursor.h"
 #include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
 #include "olap/rowset/segment_v2/row_ranges.h"
 #include "olap/tablet_schema.h"
@@ -35,7 +36,6 @@
 
 namespace doris {
 
-class RowCursor;
 class Schema;
 class ColumnPredicate;
 
@@ -70,6 +70,22 @@ public:
         const RowCursor* upper_key = nullptr;
         // whether `upper_key` is included in the range
         bool include_upper;
+
+        uint64_t get_digest(uint64_t seed) const {
+            if (lower_key != nullptr) {
+                auto key_str = lower_key->to_string();
+                seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
+                seed = HashUtil::hash64(&include_lower, sizeof(include_lower), 
seed);
+            }
+
+            if (upper_key != nullptr) {
+                auto key_str = upper_key->to_string();
+                seed = HashUtil::hash64(key_str.c_str(), key_str.size(), seed);
+                seed = HashUtil::hash64(&include_upper, sizeof(include_upper), 
seed);
+            }
+
+            return seed;
+        }
     };
 
     // reader's key ranges, empty if not existed.
@@ -132,6 +148,12 @@ public:
 
     std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
     CollectionStatisticsPtr collection_statistics;
+
+    // Cache for sparse column data to avoid redundant reads
+    // col_unique_id -> cached column_ptr
+    std::unordered_map<int32_t, vectorized::ColumnPtr> sparse_column_cache;
+
+    uint64_t condition_cache_digest = 0;
 };
 
 struct CompactionSampleInfo {
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 9768bb2f7e0..9b236464743 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -402,6 +402,10 @@ struct OlapReaderStatistics {
     int64_t output_index_result_column_timer = 0;
     // number of segment filtered by column stat when creating seg iterator
     int64_t filtered_segment_number = 0;
+    // number of segment with condition cache hit
+    int64_t condition_cache_hit_seg_nums = 0;
+    // number of rows filtered by condition cache hit
+    int64_t condition_cache_filtered_rows = 0;
     // total number of segment
     int64_t total_segment_number = 0;
 
diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index 6c76cd3cc77..a978cb56336 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -35,7 +35,7 @@ using namespace vectorized;
 
 Status ParallelScannerBuilder::build_scanners(std::list<ScannerSPtr>& 
scanners) {
     RETURN_IF_ERROR(_load());
-    if (_optimize_index_scan_parallelism) {
+    if (_scan_parallelism_by_segment) {
         return _build_scanners_by_segment(scanners);
     } else if (_is_dup_mow_key) {
         // Default strategy for DUP/MOW tables: split by rowids within segments
@@ -88,7 +88,7 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
                     auto rows_need = _rows_per_scanner - rows_collected;
 
                     // 0.9: try to avoid splitting the segments into 
excessively small parts.
-                    if (rows_need >= remaining_rows * 0.9) {
+                    if (rows_need >= remaining_rows * 9 / 10) {
                         rows_need = remaining_rows;
                     }
                     DCHECK_LE(rows_need, remaining_rows);
@@ -171,6 +171,8 @@ Status 
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<ScannerSPtr>&
 // for the involved tablets. It preserves delete predicates and key ranges, 
and clones
 // RowsetReader per scanner to avoid sharing between scanners.
 Status 
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>& 
scanners) {
+    DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
+
     for (auto&& [tablet, version] : _tablets) {
         DCHECK(_all_read_sources.contains(tablet->tablet_id()));
         auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
@@ -180,8 +182,10 @@ Status 
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
             
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
         }
 
-        // For each RowSet split in the read source, split by segment id and 
build
-        // one scanner per segment. Keep delete predicates shared.
+        // Collect segments into scanners based on rows count instead of one 
scanner per segment
+        TabletReadSource partitial_read_source;
+        int64_t rows_collected = 0;
+
         for (auto& rs_split : entire_read_source.rs_splits) {
             auto reader = rs_split.rs_reader;
             auto rowset = reader->rowset();
@@ -192,23 +196,65 @@ Status 
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
                 continue;
             }
 
-            // Build scanners for [i, i+1) segment range, without row-range 
slicing.
-            for (int64_t i = 0; i < rowset->num_segments(); ++i) {
-                RowSetSplits split(reader->clone());
-                split.segment_offsets.first = i;
-                split.segment_offsets.second = i + 1;
-                // No row-ranges slicing; scan whole segment i.
-                DCHECK_GE(split.segment_offsets.second, 
split.segment_offsets.first + 1);
+            int64_t segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            for (size_t i = 0; i < segments_rows.size(); ++i) {
+                const size_t rows_of_segment = segments_rows[i];
+
+                // Check if adding this segment would exceed rows_per_scanner
+                // 0.9: try to avoid splitting the segments into excessively 
small parts.
+                if (rows_collected > 0 && (rows_collected + rows_of_segment > 
_rows_per_scanner &&
+                                           rows_collected < _rows_per_scanner 
* 9 / 10)) {
+                    // Create a new scanner with collected segments
+                    split.segment_offsets.first = segment_start;
+                    split.segment_offsets.second =
+                            i; // Range is [segment_start, i), including all 
segments from segment_start to i-1
+
+                    DCHECK_GT(split.segment_offsets.second, 
split.segment_offsets.first);
+
+                    
partitial_read_source.rs_splits.emplace_back(std::move(split));
+
+                    scanners.emplace_back(_build_scanner(
+                            tablet, version, _key_ranges,
+                            {.rs_splits = 
std::move(partitial_read_source.rs_splits),
+                             .delete_predicates = 
entire_read_source.delete_predicates,
+                             .delete_bitmap = 
entire_read_source.delete_bitmap}));
+
+                    // Reset for next scanner
+                    partitial_read_source = TabletReadSource();
+                    split = RowSetSplits(reader->clone());
+                    segment_start = i;
+                    rows_collected = 0;
+                }
+
+                // Add current segment to the current scanner
+                rows_collected += rows_of_segment;
+            }
 
-                TabletReadSource partitial_read_source;
+            // Add remaining segments in this rowset to a scanner
+            if (rows_collected > 0) {
+                split.segment_offsets.first = segment_start;
+                split.segment_offsets.second = segments_rows.size();
+                DCHECK_GT(split.segment_offsets.second, 
split.segment_offsets.first);
                 partitial_read_source.rs_splits.emplace_back(std::move(split));
+            }
+        }
 
-                scanners.emplace_back(
-                        _build_scanner(tablet, version, _key_ranges,
-                                       {.rs_splits = 
std::move(partitial_read_source.rs_splits),
-                                        .delete_predicates = 
entire_read_source.delete_predicates,
-                                        .delete_bitmap = 
entire_read_source.delete_bitmap}));
+        // Add remaining segments across all rowsets to a scanner
+        if (rows_collected > 0) {
+            DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
+#ifndef NDEBUG
+            for (auto& split : partitial_read_source.rs_splits) {
+                DCHECK(split.rs_reader != nullptr);
+                DCHECK_LT(split.segment_offsets.first, 
split.segment_offsets.second);
             }
+#endif
+            scanners.emplace_back(
+                    _build_scanner(tablet, version, _key_ranges,
+                                   {.rs_splits = 
std::move(partitial_read_source.rs_splits),
+                                    .delete_predicates = 
entire_read_source.delete_predicates,
+                                    .delete_bitmap = 
entire_read_source.delete_bitmap}));
         }
     }
 
diff --git a/be/src/olap/parallel_scanner_builder.h 
b/be/src/olap/parallel_scanner_builder.h
index de1ea33b149..7c57711bc70 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -65,7 +65,7 @@ public:
 
     void set_min_rows_per_scanner(int64_t size) { _min_rows_per_scanner = 
size; }
 
-    void set_optimize_index_scan_parallelism(bool v) { 
_optimize_index_scan_parallelism = v; }
+    void set_scan_parallelism_by_segment(bool v) { 
_scan_parallelism_by_segment = v; }
 
     const OlapReaderStatistics* builder_stats() const { return 
&_builder_stats; }
 
@@ -96,7 +96,7 @@ private:
     std::map<RowsetId, std::vector<size_t>> _all_segments_rows;
 
     // Force building one scanner per segment when true.
-    bool _optimize_index_scan_parallelism {false};
+    bool _scan_parallelism_by_segment {false};
 
     std::shared_ptr<RuntimeProfile> _scanner_profile;
     OlapReaderStatistics _builder_stats;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0b44597ed95..43f7f89adea 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -146,6 +146,10 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
             read_columns.push_back(cid);
         }
     }
+    // Disable the condition cache (digest = 0) when delete conditions are present
+    _read_context->condition_cache_digest =
+            delete_columns_set.empty() ? _read_context->condition_cache_digest 
: 0;
+    // create segment iterators
     VLOG_NOTICE << "read columns size: " << read_columns.size();
     _input_schema = 
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
     // output_schema only contains return_columns (excludes extra columns like 
delete-predicate columns).
@@ -224,6 +228,14 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
                 
_read_context->runtime_state->query_options().disable_file_cache;
     }
 
+    if (_read_context->condition_cache_digest) {
+        for (const auto& key_range : _read_options.key_ranges) {
+            _read_context->condition_cache_digest =
+                    
key_range.get_digest(_read_context->condition_cache_digest);
+        }
+        _read_options.condition_cache_digest = 
_read_context->condition_cache_digest;
+    }
+
     _read_options.io_ctx.expiration_time =
             read_context->ttl_seconds > 0 && 
_rowset->rowset_meta()->newest_write_timestamp() > 0
                     ? _rowset->rowset_meta()->newest_write_timestamp() + 
read_context->ttl_seconds
@@ -275,6 +287,10 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
             DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
             auto local_options = _read_options;
             local_options.row_ranges = _segment_row_ranges[i - seg_start];
+            if (local_options.condition_cache_digest) {
+                local_options.condition_cache_digest =
+                        
local_options.row_ranges.get_digest(local_options.condition_cache_digest);
+            }
             iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, 
should_use_cache,
                                                              _input_schema, 
local_options);
         }
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index acf18cf86a4..846c721ca34 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -98,6 +98,8 @@ struct RowsetReaderContext {
     std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
     CollectionStatisticsPtr collection_statistics;
     std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
+
+    uint64_t condition_cache_digest = 0;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/condition_cache.cpp 
b/be/src/olap/rowset/segment_v2/condition_cache.cpp
new file mode 100644
index 00000000000..f4ebd2584fc
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/condition_cache.cpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/condition_cache.h"
+
+#include <memory>
+
+#include "util/defer_op.h"
+
+namespace doris::segment_v2 {
+
+bool ConditionCache::lookup(const CacheKey& key, ConditionCacheHandle* handle) 
{
+    if (key.encode().empty()) {
+        return false;
+    }
+    auto* lru_handle = LRUCachePolicy::lookup(key.encode());
+    if (lru_handle == nullptr) {
+        return false;
+    }
+    *handle = ConditionCacheHandle(this, lru_handle);
+    return true;
+}
+
+void ConditionCache::insert(const CacheKey& key, 
std::shared_ptr<std::vector<bool>> result) {
+    if (key.encode().empty()) {
+        return;
+    }
+    std::unique_ptr<ConditionCache::CacheValue> cache_value_ptr =
+            std::make_unique<ConditionCache::CacheValue>();
+    cache_value_ptr->filter_result = result;
+
+    ConditionCacheHandle(
+            this,
+            LRUCachePolicy::insert(key.encode(), 
(void*)cache_value_ptr.release(),
+                                   result->capacity(), result->capacity(), 
CachePriority::NORMAL));
+}
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/condition_cache.h 
b/be/src/olap/rowset/segment_v2/condition_cache.h
new file mode 100644
index 00000000000..968f67df825
--- /dev/null
+++ b/be/src/olap/rowset/segment_v2/condition_cache.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "olap/lru_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/lru_cache_policy.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+#include "util/time.h"
+
+namespace doris::segment_v2 {
+
+class ConditionCacheHandle;
+
+class ConditionCache : public LRUCachePolicy {
+public:
+    using LRUCachePolicy::insert;
+
+    // The cache key of the condition LRU cache
+    struct CacheKey {
+        CacheKey(RowsetId rowset_id_, int64_t segment_id_, uint64_t digest_)
+                : rowset_id(rowset_id_), segment_id(segment_id_), 
digest(digest_) {}
+        RowsetId rowset_id;
+        int64_t segment_id;
+        uint64_t digest;
+
+        // Encode to a flat binary which can be used as LRUCache's key
+        [[nodiscard]] std::string encode() const {
+            char buf[16];
+            memcpy(buf, &segment_id, 8);
+            memcpy(buf + 8, &digest, 8);
+
+            return rowset_id.to_string() + std::string(buf, 16);
+        }
+    };
+
+    class CacheValue : public LRUCacheValueBase {
+    public:
+        std::shared_ptr<std::vector<bool>> filter_result;
+    };
+
+    // Create global instance of this class
+    static ConditionCache* create_global_cache(size_t capacity, uint32_t 
num_shards = 16) {
+        auto* res = new ConditionCache(capacity, num_shards);
+        return res;
+    }
+
+    // Return global instance.
+    // Client should call create_global_cache before.
+    static ConditionCache* instance() { return 
ExecEnv::GetInstance()->get_condition_cache(); }
+
+    ConditionCache() = delete;
+
+    ConditionCache(size_t capacity, uint32_t num_shards)
+            : LRUCachePolicy(CachePolicy::CacheType::CONDITION_CACHE, 
capacity, LRUCacheType::SIZE,
+                             
config::inverted_index_cache_stale_sweep_time_sec, num_shards,
+                             /*element_count_capacity*/ 0, /*enable_prune*/ 
true,
+                             /*is_lru_k*/ false) {}
+
+    bool lookup(const CacheKey& key, ConditionCacheHandle* handle);
+
+    void insert(const CacheKey& key, std::shared_ptr<std::vector<bool>> 
filter_result);
+};
+
+class ConditionCacheHandle {
+public:
+    ConditionCacheHandle() = default;
+
+    ConditionCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
+            : _cache(cache), _handle(handle) {}
+
+    ~ConditionCacheHandle() {
+        if (_handle != nullptr) {
+            _cache->release(_handle);
+        }
+    }
+
+    ConditionCacheHandle(ConditionCacheHandle&& other) noexcept {
+        // we can use std::exchange if we switch c++14 on
+        std::swap(_cache, other._cache);
+        std::swap(_handle, other._handle);
+    }
+
+    ConditionCacheHandle& operator=(ConditionCacheHandle&& other) noexcept {
+        std::swap(_cache, other._cache);
+        std::swap(_handle, other._handle);
+        return *this;
+    }
+
+    LRUCachePolicy* cache() const { return _cache; }
+
+    std::shared_ptr<std::vector<bool>> get_filter_result() const {
+        if (!_cache) {
+            return nullptr;
+        }
+        return 
((ConditionCache::CacheValue*)_cache->value(_handle))->filter_result;
+    }
+
+private:
+    LRUCachePolicy* _cache = nullptr;
+    Cache::Handle* _handle = nullptr;
+
+    // Don't allow copy and assign
+    DISALLOW_COPY_AND_ASSIGN(ConditionCacheHandle);
+};
+
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/row_ranges.h 
b/be/src/olap/rowset/segment_v2/row_ranges.h
index c98320d069a..2f990faa762 100644
--- a/be/src/olap/rowset/segment_v2/row_ranges.h
+++ b/be/src/olap/rowset/segment_v2/row_ranges.h
@@ -93,6 +93,13 @@ public:
 
     std::string to_string() const { return absl::Substitute("[$0-$1)", _from, 
_to); }
 
+    uint64_t get_digest(uint64_t seed) const {
+        uint64_t hash = seed;
+        hash = hash * 31 + _from;
+        hash = hash * 31 + _to;
+        return hash;
+    }
+
 private:
     int64_t _from;
     int64_t _to;
@@ -268,6 +275,13 @@ public:
         _count += range_to_add.count();
     }
 
+    uint64_t get_digest(uint64_t seed) const {
+        for (auto range : _ranges) {
+            seed = range.get_digest(seed);
+        }
+        return seed;
+    }
+
 private:
     std::vector<RowRange> _ranges;
     size_t _count;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 654af05b7e5..fb6e5fa171a 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -40,7 +40,6 @@
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/status.h"
-#include "io/fs/file_reader.h"
 #include "io/io_common.h"
 #include "olap/bloom_filter_predicate.h"
 #include "olap/collection_similarity.h"
@@ -56,6 +55,7 @@
 #include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h"
 #include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/rowset/segment_v2/column_reader_cache.h"
+#include "olap/rowset/segment_v2/condition_cache.h"
 #include "olap/rowset/segment_v2/index_file_reader.h"
 #include "olap/rowset/segment_v2/index_iterator.h"
 #include "olap/rowset/segment_v2/index_query_context.h"
@@ -102,7 +102,6 @@
 #include "vec/exprs/vliteral.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/functions/array/function_array_index.h"
-#include "vec/json/path_in_data.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -112,6 +111,60 @@ namespace segment_v2 {
 
 SegmentIterator::~SegmentIterator() = default;
 
+// Consults the condition cache for this segment and prunes `_row_bitmap` on a hit.
+//
+// The cached value holds one flag per block of CONDITION_CACHE_OFFSET rows; a false flag
+// means the whole block can be removed from the bitmap. On a miss, a fresh all-false flag
+// vector is allocated in `_condition_cache`. When there is neither a column predicate nor
+// a pushed-down common expr, the digest is cleared and the cache is not consulted.
+void SegmentIterator::_init_row_bitmap_by_condition_cache() {
+    // Only column predicates and pushed-down common exprs are covered by the condition cache.
+    const bool has_pushdown_expr =
+            _enable_common_expr_pushdown && !_remaining_conjunct_roots.empty();
+    if (_col_predicates.empty() && !has_pushdown_expr) {
+        _opts.condition_cache_digest = 0;
+        return;
+    }
+    // A zero digest means the condition cache is not in use for this scan.
+    if (!_opts.condition_cache_digest) {
+        return;
+    }
+
+    auto* condition_cache = ConditionCache::instance();
+    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
+                                       _opts.condition_cache_digest);
+
+    // Increment search count when digest != 0
+    DorisMetrics::instance()->condition_cache_search_count->increment(1);
+
+    ConditionCacheHandle handle;
+    _find_condition_cache = condition_cache->lookup(cache_key, &handle);
+
+    const uint64_t num_rows = _segment->num_rows();
+    if (!_find_condition_cache) {
+        // Miss: start with every block marked false.
+        _condition_cache = std::make_shared<std::vector<bool>>(
+                num_rows / CONDITION_CACHE_OFFSET + 1, false);
+        return;
+    }
+
+    // Increment hit count if cache lookup is successful
+    DorisMetrics::instance()->condition_cache_hit_count->increment(1);
+    if (_opts.runtime_state) {
+        VLOG_DEBUG << "Condition cache hit, query id: "
+                   << print_id(_opts.runtime_state->query_id())
+                   << ", segment id: " << _segment->id()
+                   << ", cache digest: " << _opts.condition_cache_digest
+                   << ", rowset id: " << _opts.rowset_id.to_string();
+    }
+
+    const auto& filter_result = *(handle.get_filter_result());
+    int64_t filtered_rows = 0;
+    // size_t index: matches filter_result.size() and keeps the row offset out of int arithmetic.
+    for (size_t block = 0; block < filter_result.size(); ++block) {
+        if (filter_result[block]) {
+            continue;
+        }
+        const uint64_t begin = block * CONDITION_CACHE_OFFSET;
+        if (begin >= num_rows) {
+            // Nothing of the segment lies at or beyond num_rows, and later blocks start even later.
+            break;
+        }
+        // Clamp the last block to the segment size so the statistic counts only real rows.
+        const uint64_t end =
+                begin + CONDITION_CACHE_OFFSET < num_rows ? begin + CONDITION_CACHE_OFFSET : num_rows;
+        _row_bitmap.removeRange(begin, end);
+        filtered_rows += static_cast<int64_t>(end - begin);
+    }
+    // Record condition_cache hit segment number
+    _opts.stats->condition_cache_hit_seg_nums++;
+    // Record rows filtered by condition cache hit
+    _opts.stats->condition_cache_filtered_rows += filtered_rows;
+}
+
 // A fast range iterator for roaring bitmap. Output ranges use closed-open 
form, like [from, to).
 // Example:
 //   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
@@ -399,6 +452,8 @@ Status SegmentIterator::_lazy_init(vectorized::Block* 
block) {
     SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
     DorisMetrics::instance()->segment_read_total->increment(1);
     _row_bitmap.addRange(0, _segment->num_rows());
+    _init_row_bitmap_by_condition_cache();
+
     // z-order can not use prefix index
     if (_segment->_tablet_schema->sort_type() != SortType::ZORDER &&
         _segment->_tablet_schema->cluster_key_uids().empty()) {
@@ -526,9 +581,8 @@ Status SegmentIterator::_get_row_ranges_by_keys() {
         auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
         RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
     }
-    // pre-condition: _row_ranges == [0, num_rows)
     size_t pre_size = _row_bitmap.cardinality();
-    _row_bitmap = RowRanges::ranges_to_roaring(result_ranges);
+    _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges);
     _opts.stats->rows_key_range_filtered += (pre_size - 
_row_bitmap.cardinality());
 
     return Status::OK();
@@ -630,6 +684,8 @@ Status 
SegmentIterator::_get_row_ranges_by_column_conditions() {
                     ++it;
                 }
             }
+            _opts.condition_cache_digest =
+                    _common_expr_ctxs_push_down.empty() ? 0 : 
_opts.condition_cache_digest;
             _opts.stats->rows_inverted_index_filtered += (input_rows - 
_row_bitmap.cardinality());
             for (auto cid : _schema->column_ids()) {
                 bool result_true = 
_check_all_conditions_passed_inverted_index_for_column(cid);
@@ -2248,11 +2304,22 @@ uint16_t 
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
 Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids,
                                                 std::vector<rowid_t>& 
rowid_vector,
                                                 uint16_t* sel_rowid_idx, 
size_t select_size,
-                                                vectorized::MutableColumns* 
mutable_columns) {
+                                                vectorized::MutableColumns* 
mutable_columns,
+                                                bool init_condition_cache) {
     SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
     std::vector<rowid_t> rowids(select_size);
-    for (size_t i = 0; i < select_size; ++i) {
-        rowids[i] = rowid_vector[sel_rowid_idx[i]];
+
+    if (init_condition_cache) {
+        DCHECK(_condition_cache);
+        auto& condition_cache = *_condition_cache;
+        for (size_t i = 0; i < select_size; ++i) {
+            rowids[i] = rowid_vector[sel_rowid_idx[i]];
+            condition_cache[rowids[i] / 
SegmentIterator::CONDITION_CACHE_OFFSET] = true;
+        }
+    } else {
+        for (size_t i = 0; i < select_size; ++i) {
+            rowids[i] = rowid_vector[sel_rowid_idx[i]];
+        }
     }
 
     for (auto cid : read_column_ids) {
@@ -2296,7 +2363,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
         RETURN_IF_CATCH_EXCEPTION({
             auto res = _next_batch_internal(block);
 
-            if (res.is<END_OF_FILE>() && block->rows() == 0) {
+            if (res.is<END_OF_FILE>()) {
                 // Since we have a type check at the caller.
                 // So a replacement of nothing column with real column is 
needed.
                 const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
@@ -2305,6 +2372,18 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
                     auto type = idx_to_datatype.find(idx)->second;
                     block->replace_by_position(idx, type->create_column());
                 }
+
+                if (_opts.condition_cache_digest && !_find_condition_cache) {
+                    auto* condition_cache = ConditionCache::instance();
+                    ConditionCache::CacheKey cache_key(_opts.rowset_id, 
_segment->id(),
+                                                       
_opts.condition_cache_digest);
+                    VLOG_DEBUG << "Condition cache insert, query id: "
+                               << print_id(_opts.runtime_state->query_id())
+                               << ", rowset id: " << 
_opts.rowset_id.to_string()
+                               << ", segment id: " << _segment->id()
+                               << ", cache digest: " << 
_opts.condition_cache_digest;
+                    condition_cache->insert(cache_key, 
std::move(_condition_cache));
+                }
                 return res;
             }
 
@@ -2473,11 +2552,22 @@ Status 
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
         }
 
         // step4: read non_predicate column
-        if (_selected_size > 0 && !_non_predicate_columns.empty()) {
-            RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, 
_block_rowids,
-                                                    _sel_rowid_idx.data(), 
_selected_size,
-                                                    &_current_return_columns));
-            _replace_version_col_if_needed(_non_predicate_columns, 
_selected_size);
+        if (_selected_size > 0) {
+            if (!_non_predicate_columns.empty()) {
+                RETURN_IF_ERROR(_read_columns_by_rowids(
+                        _non_predicate_columns, _block_rowids, 
_sel_rowid_idx.data(),
+                        _selected_size, &_current_return_columns,
+                        _opts.condition_cache_digest && 
!_find_condition_cache));
+                _replace_version_col_if_needed(_non_predicate_columns, 
_selected_size);
+            } else {
+                if (_opts.condition_cache_digest && !_find_condition_cache) {
+                    auto& condition_cache = *_condition_cache;
+                    for (size_t i = 0; i < _selected_size; ++i) {
+                        auto rowid = _block_rowids[_sel_rowid_idx[i]];
+                        condition_cache[rowid / 
SegmentIterator::CONDITION_CACHE_OFFSET] = true;
+                    }
+                }
+            }
         }
     }
 
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index b6fc5467acb..4d2ab090b19 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -230,7 +230,8 @@ private:
     [[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>& 
read_column_ids,
                                                  std::vector<rowid_t>& 
rowid_vector,
                                                  uint16_t* sel_rowid_idx, 
size_t select_size,
-                                                 vectorized::MutableColumns* 
mutable_columns);
+                                                 vectorized::MutableColumns* 
mutable_columns,
+                                                 bool init_condition_cache = 
false);
 
     Status copy_column_data_by_selector(vectorized::IColumn* input_col_ptr,
                                         vectorized::MutableColumnPtr& 
output_col,
@@ -383,6 +384,8 @@ private:
     Status _materialization_of_virtual_column(vectorized::Block* block);
     void _prepare_score_column_materialization();
 
+    void _init_row_bitmap_by_condition_cache();
+
     class BitmapRangeIterator;
     class BackwardBitmapRangeIterator;
 
@@ -514,6 +517,10 @@ private:
 
     // key is column uid, value is the sparse column cache
     std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr> 
_variant_sparse_column_cache;
+
+    bool _find_condition_cache = false;
+    std::shared_ptr<std::vector<bool>> _condition_cache;
+    static constexpr int CONDITION_CACHE_OFFSET = 2048;
 };
 
 } // namespace segment_v2
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index c4cb3b2dd10..925dd54b722 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     _reader_context.all_access_paths = read_params.all_access_paths;
     _reader_context.predicate_access_paths = 
read_params.predicate_access_paths;
 
+    _reader_context.condition_cache_digest = 
read_params.condition_cache_digest;
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index bb2b66d3384..71efa484fa1 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -206,6 +206,8 @@ public:
         std::shared_ptr<vectorized::ScoreRuntime> score_runtime;
         CollectionStatisticsPtr collection_statistics;
         std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
+
+        uint64_t condition_cache_digest = 0;
     };
 
     TabletReader() = default;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 5260e610083..2082dd7ea87 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -372,6 +372,10 @@ Status OlapScanLocalState::_init_profile() {
     _variant_doc_value_column_iter_count =
             ADD_COUNTER(_segment_profile, "VariantDocValueColumnIterCount", 
TUnit::UNIT);
 
+    _condition_cache_hit_segment_counter =
+            ADD_COUNTER(_segment_profile, "ConditionCacheSegmentHit", 
TUnit::UNIT);
+    _condition_cache_filtered_rows_counter =
+            ADD_COUNTER(_segment_profile, "ConditionCacheFilteredRows", 
TUnit::UNIT);
     return Status::OK();
 }
 
@@ -516,7 +520,7 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
             // TODO: Use optimize_index_scan_parallelism for ann range search 
in the future.
             // Currently, ann topn is enough
             if (_ann_topn_runtime != nullptr) {
-                scanner_builder.set_optimize_index_scan_parallelism(true);
+                scanner_builder.set_scan_parallelism_by_segment(true);
             }
         }
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index ea14a1e6b06..292081b29a6 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -259,6 +259,10 @@ private:
     // total number of segment related to this scan node
     RuntimeProfile::Counter* _total_segment_counter = nullptr;
 
+    // condition cache filter stats
+    RuntimeProfile::Counter* _condition_cache_hit_segment_counter = nullptr;
+    RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
+
     // timer about tablet reader
     RuntimeProfile::Counter* _tablet_reader_init_timer = nullptr;
     RuntimeProfile::Counter* _tablet_reader_capture_rs_readers_timer = nullptr;
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 768040b75fa..a1a7e1fb461 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -169,6 +169,20 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
                 p._common_expr_ctxs_push_down[i]->clone(state, 
_common_expr_ctxs_push_down[i]));
     }
     RETURN_IF_ERROR(_helper.acquire_runtime_filter(state, _conjuncts, 
p.row_descriptor()));
+
+    // Disable the condition cache when this scan has topn filter source nodes.
+    // TODO: support topn filters in the condition cache.
+    if (state->query_options().condition_cache_digest && 
p._topn_filter_source_node_ids.empty()) {
+        _condition_cache_digest = 
state->query_options().condition_cache_digest;
+        for (auto& conjunct : _conjuncts) {
+            _condition_cache_digest = 
conjunct->get_digest(_condition_cache_digest);
+            if (!_condition_cache_digest) {
+                break;
+            }
+        }
+    } else {
+        _condition_cache_digest = 0;
+    }
+
     RETURN_IF_ERROR(_process_conjuncts(state));
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
@@ -1074,7 +1088,7 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
     auto& p = _parent->cast<typename Derived::Parent>();
     std::stringstream result;
-    std::copy(p.topn_filter_source_node_ids.begin(), 
p.topn_filter_source_node_ids.end(),
+    std::copy(p._topn_filter_source_node_ids.begin(), 
p._topn_filter_source_node_ids.end(),
               std::ostream_iterator<int>(result, ","));
     custom_profile()->add_info_string("TopNFilterSourceNodeIds", result.str());
 
@@ -1188,7 +1202,7 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
     }
 
     if (tnode.__isset.topn_filter_source_node_ids) {
-        topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
+        _topn_filter_source_node_ids = tnode.topn_filter_source_node_ids;
     }
 
     // Which means the request could be fullfilled in a single segment 
iterator request.
@@ -1220,7 +1234,7 @@ Status 
ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
         _colname_to_slot_id[slot->col_name()] = slot->id();
         _slot_id_to_slot_desc[slot->id()] = slot;
     }
-    for (auto id : topn_filter_source_node_ids) {
+    for (auto id : _topn_filter_source_node_ids) {
         if (!state->get_query_ctx()->has_runtime_predicate(id)) {
             // compatible with older versions fe
             continue;
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index aef51db57db..3d72ee6f5bd 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -90,6 +90,8 @@ public:
 
     Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
scanner_conjuncts);
 
+    uint64_t get_condition_cache_digest() const { return 
_condition_cache_digest; }
+
 protected:
     friend class vectorized::ScannerContext;
     friend class vectorized::Scanner;
@@ -125,6 +127,8 @@ protected:
 
     std::mutex _conjuncts_lock;
     RuntimeFilterConsumerHelper _helper;
+    // Digest for the condition cache: the query-option seed folded with each conjunct; 0 = disabled.
+    uint64_t _condition_cache_digest = 0;
 };
 
 template <typename LocalStateType>
@@ -171,7 +175,7 @@ class ScanLocalState : public ScanLocalStateBase {
 
     std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool 
push_down) {
         std::vector<int> result;
-        for (int id : _parent->cast<typename 
Derived::Parent>().topn_filter_source_node_ids) {
+        for (int id : _parent->cast<typename 
Derived::Parent>()._topn_filter_source_node_ids) {
             if (!state->get_query_ctx()->has_runtime_predicate(id)) {
                 // compatible with older versions fe
                 continue;
@@ -195,10 +199,7 @@ protected:
     friend class vectorized::Scanner;
 
     Status _init_profile() override;
-    virtual Status _process_conjuncts(RuntimeState* state) {
-        RETURN_IF_ERROR(_normalize_conjuncts(state));
-        return Status::OK();
-    }
+    virtual Status _process_conjuncts(RuntimeState* state) { return 
_normalize_conjuncts(state); }
     virtual bool _should_push_down_common_expr() { return false; }
 
     virtual bool _storage_no_merge() { return false; }
@@ -430,7 +431,10 @@ protected:
     // Record the value of the aggregate function 'count' from doris's be
     int64_t _push_down_count = -1;
     const int _parallel_tasks = 0;
-    std::vector<int> topn_filter_source_node_ids;
+
+    int _query_parallel_instance_num = 0;
+
+    std::vector<int> _topn_filter_source_node_ids;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0696de46007..e239fae0c7b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -70,6 +70,7 @@ class PackedFileManager;
 namespace segment_v2 {
 class InvertedIndexSearcherCache;
 class InvertedIndexQueryCache;
+class ConditionCache;
 class TmpFileDirs;
 class EncodingInfoResolver;
 
@@ -382,6 +383,7 @@ public:
     segment_v2::EncodingInfoResolver* get_encoding_info_resolver() {
         return _encoding_info_resolver;
     }
+    segment_v2::ConditionCache* get_condition_cache() { return 
_condition_cache; }
     QueryCache* get_query_cache() { return _query_cache; }
 
     pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
@@ -536,6 +538,7 @@ private:
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
     segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
     segment_v2::EncodingInfoResolver* _encoding_info_resolver = nullptr;
+    segment_v2::ConditionCache* _condition_cache = nullptr;
     QueryCache* _query_cache = nullptr;
     std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;
     DeleteBitmapAggCache* _delete_bitmap_agg_cache {nullptr};
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9a2fd612e86..e7d1fca8cff 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -55,6 +55,7 @@
 #include "olap/olap_define.h"
 #include "olap/options.h"
 #include "olap/page_cache.h"
+#include "olap/rowset/segment_v2/condition_cache.h"
 #include "olap/rowset/segment_v2/encoding_info.h"
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/schema_cache.h"
@@ -111,7 +112,6 @@
 #include "util/dns_cache.h"
 #include "util/doris_metrics.h"
 #include "util/mem_info.h"
-#include "util/metrics.h"
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
 #include "util/threadpool.h"
@@ -661,6 +661,13 @@ Status ExecEnv::init_mem_env() {
     // Initialize encoding info resolver
     _encoding_info_resolver = new segment_v2::EncodingInfoResolver();
 
+    // use memory limit
+    int64_t condition_cache_limit = config::condition_cache_limit * 1024L * 
1024L;
+    _condition_cache = 
ConditionCache::create_global_cache(condition_cache_limit);
+    LOG(INFO) << "Condition cache memory limit: "
+              << PrettyPrinter::print(condition_cache_limit, TUnit::BYTES)
+              << ", origin config value: " << config::condition_cache_limit;
+
     // init orc memory pool
     _orc_memory_pool = new doris::vectorized::ORCMemoryPool();
     _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
@@ -844,6 +851,7 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_inverted_index_query_cache);
     SAFE_DELETE(_inverted_index_searcher_cache);
     SAFE_DELETE(_encoding_info_resolver);
+    SAFE_DELETE(_condition_cache);
     SAFE_DELETE(_lookup_connection_cache);
     SAFE_DELETE(_schema_cache);
     SAFE_DELETE(_segment_loader);
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index 180ab001967..3a072de82f1 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -56,6 +56,7 @@ public:
         QUERY_CACHE = 20,
         TABLET_COLUMN_OBJECT_POOL = 21,
         SCHEMA_CLOUD_DICTIONARY_CACHE = 22,
+        CONDITION_CACHE = 23,
     };
 
     static std::string type_string(CacheType type) {
@@ -102,6 +103,10 @@ public:
             return "QueryCache";
         case CacheType::TABLET_COLUMN_OBJECT_POOL:
             return "TabletColumnObjectPool";
+        case CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE:
+            return "SchemaCloudDictionaryCache";
+        case CacheType::CONDITION_CACHE:
+            return "ConditionCache";
         default:
             throw Exception(Status::FatalError("not match type of cache policy 
:{}",
                                                static_cast<int>(type)));
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index aad0692df23..7a92b3676f1 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -121,6 +121,13 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(
         "(segment_v2) total number of rows in queried segments (before index 
pruning)",
         segment_read, Labels({{"type", "segment_row_total"}}));
 
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(condition_cache_search_count, 
MetricUnit::OPERATIONS,
+                                     "number of condition cache lookups when 
digest != 0",
+                                     condition_cache, Labels({{"type", 
"condition_cache_search"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(condition_cache_hit_count, 
MetricUnit::OPERATIONS,
+                                     "number of condition cache hits", 
condition_cache,
+                                     Labels({{"type", 
"condition_cache_hit"}}));
+
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_txn_begin_request_total, 
MetricUnit::OPERATIONS,
                                      "", stream_load_txn_request, 
Labels({{"type", "begin"}}));
 DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(stream_load_txn_commit_request_total, 
MetricUnit::OPERATIONS,
@@ -326,6 +333,8 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
cumulative_compaction_task_pending_total);
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
condition_cache_search_count);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
condition_cache_hit_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total);
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
stream_load_txn_begin_request_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 20b8a4a3770..95ffb833a37 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -122,6 +122,10 @@ public:
     IntCounter* segment_read_total = nullptr;
     // total number of rows in queried segments (before index pruning)
     IntCounter* segment_row_total = nullptr;
+    // number of condition cache lookups when digest != 0
+    IntCounter* condition_cache_search_count = nullptr;
+    // number of condition cache hits
+    IntCounter* condition_cache_hit_count = nullptr;
 
     IntCounter* stream_load_txn_begin_request_total = nullptr;
     IntCounter* stream_load_txn_commit_request_total = nullptr;
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 099b484cf23..4770e6088e2 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -213,12 +213,7 @@ public:
 
     void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
                                   const uint8_t* __restrict null_data) const 
override {
-        auto real_data = data->get_data_at(0);
-        if (real_data.data == nullptr) {
-            hash = HashUtil::xxHash64NullWithSeed(hash);
-        } else {
-            hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, 
hash);
-        }
+        data->update_xxHash_with_value(0, 1, hash, null_data);
     }
 
     void update_crc_with_value(size_t start, size_t end, uint32_t& hash,
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp 
b/be/src/vec/exec/scan/olap_scanner.cpp
index 92346aa68b6..e413b8e6628 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -95,7 +95,8 @@ OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* 
parent, OlapScanner::Para
                                  .vir_col_idx_to_type {},
                                  .score_runtime {},
                                  .collection_statistics {},
-                                 .ann_topn_runtime {}}) {
+                                 .ann_topn_runtime {},
+                                 .condition_cache_digest = 
parent->get_condition_cache_digest()}) {
     _tablet_reader_params.set_read_source(std::move(params.read_source),
                                           _state->skip_delete_bitmap());
     _has_prepared = false;
@@ -821,6 +822,10 @@ void OlapScanner::_collect_profile_before_close() {
                    stats.output_index_result_column_timer);
     COUNTER_UPDATE(local_state->_filtered_segment_counter, 
stats.filtered_segment_number);
     COUNTER_UPDATE(local_state->_total_segment_counter, 
stats.total_segment_number);
+    COUNTER_UPDATE(local_state->_condition_cache_hit_segment_counter,
+                   stats.condition_cache_hit_seg_nums);
+    COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter,
+                   stats.condition_cache_filtered_rows);
 
     COUNTER_UPDATE(local_state->_tablet_reader_init_timer, 
stats.tablet_reader_init_timer_ns);
     COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer,
diff --git a/be/src/vec/exprs/vbitmap_predicate.h 
b/be/src/vec/exprs/vbitmap_predicate.h
index 8fabd985595..cf50f3e069a 100644
--- a/be/src/vec/exprs/vbitmap_predicate.h
+++ b/be/src/vec/exprs/vbitmap_predicate.h
@@ -73,6 +73,9 @@ public:
         return fmt::format(" VBitmapPredicate:{}", VExpr::debug_string());
     }
 
+    // Bitmap filters do not support get_digest; always return 0.
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+
 private:
     Status _do_execute(VExprContext* context, const Block* block, const 
uint8_t* __restrict filter,
                        Selector* selector, size_t count, ColumnPtr& 
result_column) const;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index c9bc8cc5349..40057026d3d 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -115,5 +115,16 @@ void 
VBloomPredicate::set_filter(std::shared_ptr<BloomFilterFuncBase> filter) {
     _filter = filter;
 }
 
+// Digest of this bloom predicate: the first child's digest hashed together with the
+// bloom filter's raw data. Returns 0 when the child yields 0 or when no filter is set.
+uint64_t VBloomPredicate::get_digest(uint64_t seed) const {
+    seed = _children[0]->get_digest(seed);
+    if (seed == 0) {
+        return 0;
+    }
+    // Without a filter there is no filter data to hash, so report "no digest".
+    if (!_filter) {
+        return 0;
+    }
+    // Initialise the out-parameters so they hold defined values even if get_data leaves them unset.
+    char* data = nullptr;
+    int len = 0;
+    _filter->get_data(&data, &len);
+    return HashUtil::hash64(data, len, seed);
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vbloom_predicate.h 
b/be/src/vec/exprs/vbloom_predicate.h
index da01b005262..59d0baee07e 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -60,6 +60,8 @@ public:
 
     std::shared_ptr<BloomFilterFuncBase> get_bloom_filter_func() const 
override { return _filter; }
 
+    uint64_t get_digest(uint64_t seed) const override;
+
 private:
     Status _do_execute(VExprContext* context, const Block* block, const 
uint8_t* __restrict filter,
                        Selector* selector, size_t count, ColumnPtr& 
result_column) const;
diff --git a/be/src/vec/exprs/vcast_expr.h b/be/src/vec/exprs/vcast_expr.h
index 0e0b708d309..6d23fc3bc14 100644
--- a/be/src/vec/exprs/vcast_expr.h
+++ b/be/src/vec/exprs/vcast_expr.h
@@ -60,6 +60,15 @@ public:
 
     virtual std::string cast_name() const { return "CAST"; }
 
+    // Extends the base VExpr digest with the cast's target type name.
+    // A base digest of 0 is passed through unchanged.
+    uint64_t get_digest(uint64_t seed) const override {
+        const auto base_digest = VExpr::get_digest(seed);
+        if (!base_digest) {
+            return 0;
+        }
+        const auto& type_name = _target_data_type_name;
+        return HashUtil::hash64(type_name.data(), type_name.size(), base_digest);
+    }
+
 protected:
     FunctionBasePtr _function;
     std::string _expr_name;
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h 
b/be/src/vec/exprs/vdirect_in_predicate.h
index 9db5d48877b..38d8a78904e 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -107,6 +107,14 @@ public:
         return true;
     }
 
+    // Digest of the first child, then of the in-set filter seeded with that result.
+    // A zero child digest short-circuits and is returned as-is.
+    uint64_t get_digest(uint64_t seed) const override {
+        const uint64_t child_digest = _children[0]->get_digest(seed);
+        if (child_digest == 0) {
+            return child_digest;
+        }
+        return _filter->get_digest(child_digest);
+    }
+
 private:
     Status _do_execute(VExprContext* context, const Block* block, const 
uint8_t* __restrict filter,
                        Selector* selector, size_t count, ColumnPtr& 
result_column,
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index d8fa7949584..4745394a04d 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -825,6 +825,25 @@ Status VExpr::check_constant(const Block& block, 
ColumnNumbers arguments) const
     return Status::OK();
 }
 
+// Default digest of an expression node.
+//
+// Chains the digest through every child in order, then mixes in this node's identity:
+// the function name when one is set, otherwise the node type followed by the opcode.
+// Returns 0 as soon as any child reports 0.
+uint64_t VExpr::get_digest(uint64_t seed) const {
+    auto digest = seed;
+    // Bind by const reference so the elements of _children are not copied while iterating.
+    for (const auto& child : _children) {
+        digest = child->get_digest(digest);
+        if (digest == 0) {
+            return 0;
+        }
+    }
+
+    const auto& fn_name = _fn.name.function_name;
+    if (!fn_name.empty()) {
+        digest = HashUtil::hash64(fn_name.c_str(), fn_name.size(), digest);
+    } else {
+        // No function name: identify the node by the raw bytes of its type and opcode.
+        digest = HashUtil::hash64(reinterpret_cast<const char*>(&_node_type), sizeof(_node_type),
+                                  digest);
+        digest = HashUtil::hash64(reinterpret_cast<const char*>(&_opcode), sizeof(_opcode), digest);
+    }
+    return digest;
+}
+
 ColumnPtr VExpr::get_result_from_const(size_t count) const {
     return ColumnConst::create(_constant_col->column_ptr, count);
 }
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 6e92f4d5b44..38e96abe6ee 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -352,6 +352,8 @@ public:
 
     bool ann_dist_is_fulfilled() const;
 
+    virtual uint64_t get_digest(uint64_t seed) const;
+
 protected:
     /// Simple debug string that provides no expr subclass-specific information
     std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index c26b5fec495..c733f5f6748 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -471,6 +471,9 @@ Status VExprContext::evaluate_ann_range_search(
     return Status::OK();
 }
 
-#include "common/compile_check_end.h"
+uint64_t VExprContext::get_digest(uint64_t seed) const {
+    return _root->get_digest(seed);
+}
 
+#include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index f4747ce06f3..e7c7c5ebdee 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -387,6 +387,8 @@ public:
                     common_expr_to_slotref_map,
             roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& 
ann_index_stats);
 
+    uint64_t get_digest(uint64_t seed) const;
+
 private:
     // Close method is called in vexpr context dector, not need call expicility
     void close();
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 6f151614da4..63c0c1b5298 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -61,6 +61,8 @@ public:
     bool is_not_in() const { return _is_not_in; };
     Status evaluate_inverted_index(VExprContext* context, uint32_t 
segment_num_rows) override;
 
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+
 private:
     FunctionBasePtr _function;
     std::string _expr_name;
diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h 
b/be/src/vec/exprs/vlambda_function_call_expr.h
index f1472e48572..cc99db0ef85 100644
--- a/be/src/vec/exprs/vlambda_function_call_expr.h
+++ b/be/src/vec/exprs/vlambda_function_call_expr.h
@@ -88,6 +88,8 @@ public:
         return out.str();
     }
 
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+
 private:
     std::string _expr_name;
     LambdaFunctionPtr _lambda_function;
diff --git a/be/src/vec/exprs/vlambda_function_expr.h 
b/be/src/vec/exprs/vlambda_function_expr.h
index c69d5221ff2..e6d39a186aa 100644
--- a/be/src/vec/exprs/vlambda_function_expr.h
+++ b/be/src/vec/exprs/vlambda_function_expr.h
@@ -54,6 +54,8 @@ public:
 
     const std::string& expr_name() const override { return _expr_name; }
 
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+
 private:
     const std::string _expr_name = "vlambda_function_expr";
 };
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index cd02620e7e2..2e64d7c5e9e 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -105,5 +105,10 @@ bool VLiteral::equals(const VExpr& other) {
     return true;
 }
 
+uint64_t VLiteral::get_digest(uint64_t seed) const {
+    _column_ptr->update_xxHash_with_value(0, 1, seed, nullptr);
+    return seed;
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 47ea2ba91b9..4f4009db3f7 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -65,6 +65,8 @@ public:
 
     bool equals(const VExpr& other) override;
 
+    uint64_t get_digest(uint64_t seed) const override;
+
 protected:
     ColumnPtr _column_ptr;
     std::string _expr_name;
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h 
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 91c60660678..ea37dddef26 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -67,6 +67,15 @@ public:
     Status execute_filter(VExprContext* context, const Block* block,
                           uint8_t* __restrict result_filter_data, size_t rows, 
bool accept_null,
                           bool* can_filter_all) const override;
+
+    uint64_t get_digest(uint64_t seed) const override {
+        seed = _impl->get_digest(seed);
+        if (seed) {
+            return HashUtil::crc_hash64(&_null_aware, sizeof(_null_aware), 
seed);
+        }
+        return seed;
+    }
+
     VExprSPtr get_impl() const override { return _impl; }
 
     void attach_profile_counter(std::shared_ptr<RuntimeProfile::Counter> 
rf_input_rows,
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index d5df1220ad7..173f889b0c8 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -56,6 +56,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor&
                 state->desc_tbl().debug_string());
     }
     _column_name = &slot_desc->col_name();
+    _column_uniq_id = slot_desc->col_unique_id();
     _column_id = desc.get_column_id(_slot_id);
     if (_column_id < 0) {
         return Status::Error<ErrorCode::INTERNAL_ERROR>(
@@ -136,4 +137,12 @@ bool VSlotRef::equals(const VExpr& other) {
     return true;
 }
 
+uint64_t VSlotRef::get_digest(uint64_t seed) const {
+    if (_data_type->get_primitive_type() == TYPE_VARIANT) {
+        return 0;
+    }
+    seed = HashUtil::hash64(&_column_uniq_id, sizeof(int), seed);
+    return HashUtil::hash64(_column_name->c_str(), _column_name->size(), seed);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index f6f8d24c032..960b4f809a0 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -70,9 +70,12 @@ public:
 
     MOCK_FUNCTION const std::string& column_name() const { return 
*_column_name; }
 
+    uint64_t get_digest(uint64_t seed) const override;
+
 private:
     int _slot_id;
     int _column_id;
+    int _column_uniq_id = -1;
     const std::string* _column_name = nullptr;
     const std::string _column_label;
 };
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 9defb0cbf08..576c3019144 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -198,6 +198,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String QUERY_CACHE_FORCE_REFRESH = 
"query_cache_force_refresh";
     public static final String QUERY_CACHE_ENTRY_MAX_BYTES = 
"query_cache_entry_max_bytes";
     public static final String QUERY_CACHE_ENTRY_MAX_ROWS = 
"query_cache_entry_max_rows";
+    public static final String ENABLE_CONDITION_CACHE = 
"enable_condition_cache";
 
     public static final String ENABLE_COST_BASED_JOIN_REORDER = 
"enable_cost_based_join_reorder";
 
@@ -1405,6 +1406,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT)
     public boolean enableLeftSemiDirectReturnOpt = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_CONDITION_CACHE)
+    public boolean enableConditionCache = true;
+
     @VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
     public boolean forwardToMaster = true;
 
@@ -3379,6 +3383,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public void initFuzzyModeVariables() {
         Random random = new SecureRandom();
         this.feDebug = true;
+        this.enableConditionCache = Config.pull_request_id % 2 == 0;
         this.parallelPipelineTaskNum = random.nextInt(8);
         this.parallelPrepareThreshold = random.nextInt(32) + 1;
         this.enableCommonExprPushdown = random.nextBoolean();
@@ -5063,6 +5068,9 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
         tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
         
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
+        if (enableConditionCache) {
+            
tResult.setConditionCacheDigest(getAffectQueryResultVariableHashCode());
+        }
 
         if (maxScanKeyNum > 0) {
             tResult.setMaxScanKeyNum(maxScanKeyNum);
@@ -6066,6 +6074,20 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
+    public int getAffectQueryResultVariableHashCode() {
+        int hash = 0;
+        for (Field affectQueryResultField : affectQueryResultFields) {
+            String name = affectQueryResultField.getName();
+            try {
+                Object value = affectQueryResultField.get(this);
+                hash = 31 * hash + (value != null ? value.hashCode() : 0);
+            } catch (Throwable t) {
+                throw new IllegalStateException("Can not access 
SessionVariable." + name, t);
+            }
+        }
+        return hash;
+    }
+
     public static boolean isFeDebug() {
         if (ConnectContext.get() != null) {
             return ConnectContext.get().getSessionVariable().feDebug;
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 3217eeacb06..9fbc2058232 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -265,7 +265,7 @@ struct TQueryOptions {
 
   91: optional bool runtime_filter_wait_infinitely = false;
 
-  92: optional i32 wait_full_block_schedule_times = 1; // deprecated
+  92: optional i32 condition_cache_digest = 0;
   
   93: optional i32 inverted_index_max_expansions = 50;
 
@@ -368,7 +368,7 @@ struct TQueryOptions {
 
   144: optional bool enable_inverted_index_searcher_cache = true;
   145: optional bool enable_inverted_index_query_cache = true;
-  146: optional bool fuzzy_disable_runtime_filter_in_be = false; // deprecated
+  146: optional bool enable_condition_cache = false; //deprecated 
 
   147: optional i32 profile_level = 1;
 
diff --git a/regression-test/data/query_p0/cache/condition_cache.out 
b/regression-test/data/query_p0/cache/condition_cache.out
new file mode 100644
index 00000000000..6bcb15e9813
--- /dev/null
+++ b/regression-test/data/query_p0/cache/condition_cache.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !condition_cache1 --
+2      Bob     30      90
+4      David   28      92
+5      Eve     26      88
+
+-- !condition_cache2 --
+1      Alice   25      85.5
+3      Charlie 22      75.5
+
+-- !condition_cache3 --
+2      Bob     30      90
+4      David   28      92
+5      Eve     26      88
+
+-- !condition_cache4 --
+1      Alice   25      85.5
+3      Charlie 22      75.5
+
+-- !condition_cache5 --
+2      Bob     30      90
+4      David   28      92
+5      Eve     26      88
+
+-- !condition_cache6 --
+1      Alice   25      85.5
+3      Charlie 22      75.5
+
+-- !condition_delete1 --
+4      David   28      92
+5      Eve     26      88
+
+-- !condition_delete2 --
+1      Alice   25      85.5
+3      Charlie 22      75.5
+
+-- !join_no_cache --
+4      David   28      Engineering     Senior Developer
+5      Eve     26      Finance Analyst
+
+-- !join_cache1 --
+4      David   28      Engineering     Senior Developer
+5      Eve     26      Finance Analyst
+
+-- !join_cache2 --
+4      David   28      Engineering     Senior Developer
+5      Eve     26      Finance Analyst
+
+-- !join_bf_cache1 --
+
+-- !join_bf_cache2 --
+1      Alice   25      Marketing       Manager
+3      Charlie 22      Engineering     Senior Developer
+4      David   28      Finance Analyst
+
+-- !join_cache3 --
+4      David   28      Engineering     Senior Developer
+5      Eve     26      Finance Analyst
+
+-- !join_diff_cond --
+1      Alice   Engineering     100000
+4      David   Engineering     140000
+
+-- !join_after_mod --
+4      David   28      Engineering     Senior Developer
+5      Eve     26      Finance Analyst
+6      Frank   32      Engineering     Team Lead
+
+-- !cast_diff1 --
+Query A        4       2025-01-01T12:00:00.123499
+
+-- !cast_diff2 --
+
diff --git 
a/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy 
b/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
index e060142ef7f..aebcdfbfb0b 100644
--- a/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
+++ b/regression-test/suites/index_p0/test_ngram_bloomfilter_index_change.groovy
@@ -56,6 +56,7 @@ suite("test_ngram_bloomfilter_index_change") {
 
     // Test settings
     sql "set enable_function_pushdown=true"
+    sql "set enable_condition_cache=false"
     sql "set enable_profile=true"
     sql "set profile_level=2"
 
@@ -326,4 +327,4 @@ suite("test_ngram_bloomfilter_index_change") {
     // Final cleanup
     sql "DROP INDEX idx_ngram_customer_name ON ${tableName};"
     sleep(2000)
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/query_p0/cache/condition_cache.groovy 
b/regression-test/suites/query_p0/cache/condition_cache.groovy
new file mode 100755
index 00000000000..0a6438b63cc
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/condition_cache.groovy
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+suite("condition_cache") {
+    def tableName = "table_condition_cache"
+    def joinTableName = "table_join_condition_cache"
+
+    def test = {
+        sql "set enable_condition_cache=false"
+        sql "set runtime_filter_type=0"
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE ${tableName} (
+            `id` int NULL,
+            `name` varchar(50) NULL,
+            `age` int NULL,
+            `score` double NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "min_load_replica_num" = "-1",
+            "is_being_synced" = "false",
+            "storage_medium" = "hdd",
+            "storage_format" = "V2",
+            "inverted_index_storage_format" = "V3",
+            "light_schema_change" = "true",
+            "disable_auto_compaction" = "false",
+            "enable_single_replica_compaction" = "false"
+        )
+    """
+
+        sql """
+        INSERT INTO ${tableName}(id, name, age, score) 
+        VALUES
+            (1, "Alice", 25, 85.5),
+            (2, "Bob", 30, 90.0),
+            (3, "Charlie", 22, 75.5),
+            (4, "David", 28, 92.0),
+            (5, "Eve", 26, 88.0)
+    """
+
+        // Create join table
+        sql """ DROP TABLE IF EXISTS ${joinTableName} """
+        sql """
+        CREATE TABLE ${joinTableName} (
+            `id` int NULL,
+            `department` varchar(50) NULL,
+            `position` varchar(50) NULL,
+            `salary` double NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `department`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "min_load_replica_num" = "-1",
+            "is_being_synced" = "false",
+            "storage_medium" = "hdd",
+            "storage_format" = "V2",
+            "inverted_index_storage_format" = "V3",
+            "light_schema_change" = "true",
+            "disable_auto_compaction" = "false",
+            "enable_single_replica_compaction" = "false"
+        )
+    """
+
+        sql """
+        INSERT INTO ${joinTableName}(id, department, position, salary) 
+        VALUES
+            (1, "Engineering", "Developer", 100000),
+            (2, "Marketing", "Manager", 120000),
+            (3, "HR", "Specialist", 80000),
+            (4, "Engineering", "Senior Developer", 140000),
+            (5, "Finance", "Analyst", 95000)
+    """
+
+        // First query with WHERE condition - Run without cache
+        order_qt_condition_cache1 """
+        SELECT 
+            id, 
+            name, 
+            age, 
+            score 
+        FROM ${tableName} 
+        WHERE age > 25 AND score > 85
+    """
+
+        // Second query with different WHERE condition - Run without cache
+        order_qt_condition_cache2 """
+        SELECT
+            id,
+            name,
+            age,
+            score
+        FROM ${tableName}
+        WHERE name LIKE 'A%' OR score < 80
+    """
+
+        // Enable condition cache
+        sql "set enable_condition_cache=true"
+
+        // Run the same first query with cache enabled
+        order_qt_condition_cache3 """
+        SELECT 
+            id, 
+            name, 
+            age, 
+            score 
+        FROM ${tableName} 
+        WHERE age > 25 AND score > 85
+    """
+
+        // Run the same second query with cache enabled
+        order_qt_condition_cache4 """
+        SELECT
+            id,
+            name,
+            age,
+            score
+        FROM ${tableName}
+        WHERE name LIKE 'A%' OR score < 80
+    """
+
+        // Run both queries again to test cache hit
+        order_qt_condition_cache5 """
+        SELECT 
+            id, 
+            name, 
+            age, 
+            score 
+        FROM ${tableName} 
+        WHERE age > 25 AND score > 85
+    """
+
+        order_qt_condition_cache6 """
+        SELECT
+            id,
+            name,
+            age,
+            score
+        FROM ${tableName}
+        WHERE name LIKE 'A%' OR score < 80
+    """
+
+        // Test delete operation impact on condition cache
+        // Delete some data
+        sql "DELETE FROM ${tableName} WHERE age = 30"  // Delete Bob's record
+
+        // Run the same queries after delete to see if cache is invalidated
+        order_qt_condition_delete1 """
+        SELECT 
+            id, 
+            name, 
+            age, 
+            score 
+        FROM ${tableName} 
+        WHERE age > 25 AND score > 85
+    """
+
+        order_qt_condition_delete2 """
+        SELECT
+            id,
+            name,
+            age,
+            score
+        FROM ${tableName}
+        WHERE name LIKE 'A%' OR score < 80
+    """
+    
+    // rebuild table to skip the delete operation
+    sql "create table temp like ${tableName}" 
+    sql "insert into temp select * from ${tableName}" 
+    sql "drop table ${tableName}"
+    sql "alter table temp rename ${tableName}" 
+
+        // Test a two-table join with runtime_filter_type=2 (bloom filter)
+        // First, disable condition cache and switch runtime_filter_type to 2
+        sql "set enable_condition_cache=false"
+        sql "set runtime_filter_type=2"
+
+        // Run join query without condition cache
+        order_qt_join_no_cache """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t1.age, 
+            t2.department, 
+            t2.position
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.age > 25 AND t2.salary > 90000
+    """
+
+        // Enable condition cache with bloom filter runtime_filter
+        sql "set enable_condition_cache=true"
+
+        // Run the same join query with condition cache enabled
+        order_qt_join_cache1 """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t1.age, 
+            t2.department, 
+            t2.position
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.age > 25 AND t2.salary > 90000
+    """
+
+        // Run the same join query again to test cache hit
+        order_qt_join_cache2 """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t1.age, 
+            t2.department, 
+            t2.position
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.age > 25 AND t2.salary > 90000
+    """
+    
+       // Run a different join with condition cache enabled: the join key is 
an expression (t1.id + 10), so the bloom filter involves an expr
+        order_qt_join_bf_cache1 """
+        SELECT
+            t1.id,
+            t1.name,
+            t1.age,
+            t2.department,
+            t2.position
+        FROM ${tableName} t1
+        JOIN ${joinTableName} t2 ON t1.id + 10 = t2.id
+    """
+   // Run that join again with condition cache enabled but a different expr 
(t1.id + 1) in the bloom filter
+        order_qt_join_bf_cache2 """
+        SELECT
+            t1.id,
+            t1.name,
+            t1.age,
+            t2.department,
+            t2.position
+        FROM ${tableName} t1
+        JOIN ${joinTableName} t2 ON t1.id + 1 = t2.id
+    """
+
+    sql "set runtime_filter_type=12"
+
+    // Run the same join query again after changing runtime_filter_type
+    order_qt_join_cache3 """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t1.age, 
+            t2.department, 
+            t2.position
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.age > 25 AND t2.salary > 90000
+    """
+        // Test a different join query on the same tables (runtime_filter_type=12)
+        order_qt_join_diff_cond """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t2.department, 
+            t2.salary
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.score > 85 AND t2.department = 'Engineering'
+    """
+
+        // Test data modification impact on join condition cache
+        sql "INSERT INTO ${tableName}(id, name, age, score) VALUES (6, 
'Frank', 32, 91.0)"
+        sql "INSERT INTO ${joinTableName}(id, department, position, salary) 
VALUES (6, 'Engineering', 'Team Lead', 150000)"
+
+        // Run the same join query after data modification
+        order_qt_join_after_mod """
+        SELECT 
+            t1.id, 
+            t1.name, 
+            t1.age, 
+            t2.department, 
+            t2.position
+        FROM ${tableName} t1 
+        JOIN ${joinTableName} t2 ON t1.id = t2.id
+        WHERE t1.age > 25 AND t2.salary > 90000
+    """
+
+        // Test cast precision difference
+        // 1. Create table
+        sql """DROP TABLE IF EXISTS test_cast_diff"""
+        sql """
+        CREATE TABLE test_cast_diff (
+            `id` int NULL,
+            `ts` datetime(6) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+        """
+
+        // 2. Insert test data (excluding id=5 to focus on subtle rounding 
differences)
+        sql """
+        INSERT INTO test_cast_diff VALUES
+            (1, '2025-01-01 12:00:00.123449'),
+            (2, '2025-01-01 12:00:00.123450'),
+            (3, '2025-01-01 12:00:00.123455'),
+            (4, '2025-01-01 12:00:00.123499')
+        """
+
+        // 3. Query A: cast to DATETIME(5) and compare with a 5-digit 
fractional constant
+        order_qt_cast_diff1 """
+        SELECT
+            'Query A' AS type,
+            id,
+            ts
+        FROM test_cast_diff
+        WHERE CAST(ts AS DATETIME(5)) = '2025-01-01 12:00:00.12350'
+        """
+
+        // 4. Query B: cast to DATETIME(3) but use the same constant string
+        // Note: the constant will be implicitly rounded to 3-digit precision 
(.124)
+        order_qt_cast_diff2 """
+        SELECT
+            'Query B' AS type,
+            id,
+            ts
+        FROM test_cast_diff
+        WHERE CAST(ts AS DATETIME(3)) = '2025-01-01 12:00:00.12350'
+        """
+    }
+
+    test()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to