This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f83e5d3df6 [improvement](query) Align olap scan schema with storage 
keys (#64413)
2f83e5d3df6 is described below

commit 2f83e5d3df65d2eaae3265034053df3f363f554c
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 1 19:13:33 2026 +0800

    [improvement](query) Align olap scan schema with storage keys (#64413)
    
    This is an alternative to keeping separate storage and execution schemas
    in the BE segment iterator for storage expression pushdown.
    
    For AGG key tables and non-MOW unique key tables, the planner now
    expands the OLAP scan tuple so the scan schema starts with the storage
    key columns in storage order. Missing key slots are marked in thrift as
    FE-filled key slots. The scan node projects back to the original
    execution output after storage filters and runtime filters are
    translated.
    
    On the BE side, the marked key slots are mapped to tablet column ids.
    Direct reader paths can fill those storage-only key columns without
    reading their data pages, while merge and aggregation reader paths still
    keep real key columns for storage semantics. Since FE now provides the
    storage-aligned key prefix, the non-direct scanner no longer rebuilds
    the return column list as "all keys plus value outputs"; it only checks
    the key prefix and keeps the existing sequence-column expansion.
---
 be/src/exec/operator/olap_scan_operator.cpp        |  10 -
 be/src/exec/operator/olap_scan_operator.h          |   3 -
 be/src/exec/scan/olap_scanner.cpp                  |  38 +-
 be/src/exec/scan/olap_scanner.h                    |  10 +-
 be/src/exec/scan/scanner_scheduler.cpp             |  11 +-
 be/src/storage/iterator/vcollect_iterator.cpp      |  24 +-
 be/src/storage/iterators.h                         |   8 +-
 be/src/storage/rowset/beta_rowset_reader.cpp       |   3 +-
 be/src/storage/rowset/rowset_reader_context.h      |   7 +-
 be/src/storage/schema.cpp                          |   6 +
 be/src/storage/schema.h                            |   4 +
 be/src/storage/segment/segment_iterator.cpp        | 394 +++++++--------------
 be/src/storage/segment/segment_iterator.h          |  26 +-
 be/src/storage/tablet/tablet_reader.cpp            |   7 +-
 be/src/storage/tablet/tablet_reader.h              |   8 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  33 ++
 .../trees/plans/physical/PhysicalOlapScan.java     |   5 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  25 ++
 .../translator/PhysicalPlanTranslatorTest.java     |  52 ++-
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 20 files changed, 301 insertions(+), 375 deletions(-)

diff --git a/be/src/exec/operator/olap_scan_operator.cpp 
b/be/src/exec/operator/olap_scan_operator.cpp
index 8c103a7072c..5fda916c405 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -1006,16 +1006,6 @@ Status OlapScanLocalState::open(RuntimeState* state) {
             RETURN_IF_ERROR(virtual_column_expr_ctx->open(state));
 
             _slot_id_to_virtual_column_expr[slot_desc->id()] = 
virtual_column_expr_ctx;
-            _slot_id_to_col_type[slot_desc->id()] = 
slot_desc->get_data_type_ptr();
-            int col_pos = 
p.intermediate_row_desc().get_column_id(slot_desc->id());
-            if (col_pos < 0) {
-                return Status::InternalError(
-                        "Invalid virtual slot, can not find its information. 
Slot desc:\n{}\nRow "
-                        "desc:\n{}",
-                        slot_desc->debug_string(), 
p.row_desc().debug_string());
-            } else {
-                _slot_id_to_index_in_block[slot_desc->id()] = col_pos;
-            }
         }
     }
 
diff --git a/be/src/exec/operator/olap_scan_operator.h 
b/be/src/exec/operator/olap_scan_operator.h
index c30aad5cf29..309c99191bd 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -339,9 +339,6 @@ private:
     std::vector<TabletReadSource> _read_sources;
 
     std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
-    std::map<SlotId, size_t> _slot_id_to_index_in_block;
-    // this map is needed for scanner opening.
-    std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
 
     // ---- Runtime-filter partition pruning ----
     // Attaches this per-instance pruner to the shared parse result owned by
diff --git a/be/src/exec/scan/olap_scanner.cpp 
b/be/src/exec/scan/olap_scanner.cpp
index 9f6fc8ef2b6..b5ec0443045 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -92,12 +92,11 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, 
OlapScanner::Params&& param
                                  .rs_splits {},
                                  .return_columns {},
                                  .output_columns {},
+                                 .extra_columns {},
                                  .common_expr_ctxs_push_down {},
                                  .topn_filter_source_node_ids {},
                                  .key_group_cluster_key_idxes {},
                                  .virtual_column_exprs {},
-                                 .vir_cid_to_idx_in_block {},
-                                 .vir_col_idx_to_type {},
                                  .score_runtime {},
                                  .collection_statistics {},
                                  .ann_topn_runtime {},
@@ -179,8 +178,6 @@ Status OlapScanner::_prepare_impl() {
         _slot_id_to_virtual_column_expr[pair.first] = context;
     }
 
-    _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
-    _slot_id_to_col_type = local_state->_slot_id_to_col_type;
     _score_runtime = local_state->_score_runtime;
     // All scanners share the same ann_topn_runtime.
     _ann_topn_runtime = local_state->_ann_topn_runtime;
@@ -399,8 +396,6 @@ Status OlapScanner::_init_tablet_reader_params(
 
     _tablet_reader_params.common_expr_ctxs_push_down = 
_common_expr_ctxs_push_down;
     _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
-    _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
-    _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
     _tablet_reader_params.score_runtime = _score_runtime;
     _tablet_reader_params.output_columns = 
((OlapScanLocalState*)_local_state)->_output_column_ids;
     _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
@@ -696,6 +691,11 @@ Status OlapScanner::_init_variant_columns() {
 }
 
 Status OlapScanner::_init_return_columns() {
+    // For OLAP scan, _output_tuple_desc is the storage-aligned scan tuple
+    // descriptor. extra_key_column_slot_ids marks extra key slots that are
+    // present only for scan-schema alignment. For example, on an AGG table 
with
+    // keys (k1, k2), a query returning only k2 may still scan (k1, k2); k1 is
+    // an extra column and can be removed by the projection output tuple.
     for (auto* slot : _output_tuple_desc->slots()) {
         // variant column using path to index a column
         int32_t index = 0;
@@ -716,19 +716,25 @@ Status OlapScanner::_init_return_columns() {
         }
 
         if (slot->get_virtual_column_expr()) {
-            ColumnId virtual_column_cid = index;
-            _virtual_column_exprs[virtual_column_cid] = 
_slot_id_to_virtual_column_expr[slot->id()];
-            size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
-            _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
-            _vir_col_idx_to_type[idx_in_block] = 
_slot_id_to_col_type[slot->id()];
-
-            VLOG_DEBUG << fmt::format(
-                    "Virtual column, slot id: {}, cid {}, column index: {}, 
type: {}", slot->id(),
-                    virtual_column_cid, 
_vir_cid_to_idx_in_block[virtual_column_cid],
-                    _vir_col_idx_to_type[idx_in_block]->get_name());
+            _virtual_column_exprs[index] = 
_slot_id_to_virtual_column_expr[slot->id()];
+
+            VLOG_DEBUG << fmt::format("Virtual column, slot id: {}, cid {}, 
type: {}", slot->id(),
+                                      index, 
slot->get_data_type_ptr()->get_name());
         }
 
         const auto& column = tablet_schema->column(index);
+        auto* olap_local_state = 
static_cast<OlapScanLocalState*>(_local_state);
+        const auto& olap_scan_node = olap_local_state->olap_scan_node();
+        if (olap_scan_node.__isset.extra_key_column_slot_ids &&
+            olap_scan_node.extra_key_column_slot_ids.contains(slot->id())) {
+            DORIS_CHECK(column.is_key());
+            if (_tablet_reader_params.direct_mode) {
+                // Direct readers can synthesize extra storage keys because 
they are only
+                // placeholders before the scan projection removes them. 
Merge/aggregation
+                // readers must still read real key values to preserve storage 
semantics.
+                _tablet_reader_params.extra_columns.insert(index);
+            }
+        }
         int32_t unique_id =
                 column.unique_id() >= 0 ? column.unique_id() : 
column.parent_unique_id();
         if (!slot->all_access_paths().empty()) {
diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h
index 4a09b6e3948..dc308d34713 100644
--- a/be/src/exec/scan/olap_scanner.h
+++ b/be/src/exec/scan/olap_scanner.h
@@ -18,9 +18,9 @@
 #pragma once
 
 #include <gen_cpp/PaloInternalService_types.h>
-#include <stdint.h>
 
 #include <cstddef>
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <optional>
@@ -121,17 +121,11 @@ public:
 
     std::unordered_set<uint32_t> _tablet_columns_convert_to_null_set;
 
-    // This three fields are copied from OlapScanLocalState.
+    // This field is copied from OlapScanLocalState.
     std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
-    std::map<SlotId, size_t> _slot_id_to_index_in_block;
-    std::map<SlotId, DataTypePtr> _slot_id_to_col_type;
 
     // ColumnId of virtual column to its expr context
     std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
-    // ColumnId of virtual column to its index in block
-    std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
-    // The idx of vir_col in block to its data type.
-    std::map<size_t, DataTypePtr> _vir_col_idx_to_type;
     std::shared_ptr<ScoreRuntime> _score_runtime;
 
     std::shared_ptr<segment_v2::AnnTopNRuntime> _ann_topn_runtime;
diff --git a/be/src/exec/scan/scanner_scheduler.cpp 
b/be/src/exec/scan/scanner_scheduler.cpp
index 7e1deb2bae7..230a54048a2 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -362,18 +362,17 @@ void 
ScannerScheduler::_make_sure_virtual_col_is_materialized(
             continue;
         }
 
-        std::vector<std::string> vcid_to_idx;
-
-        for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
-            vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, 
pair.second));
+        std::vector<ColumnId> virtual_column_ids;
+        for (const auto& pair : olap_scanner->_virtual_column_exprs) {
+            virtual_column_ids.push_back(pair.first);
         }
 
         std::string error_msg = fmt::format(
                 "Column in idx {} is nothing, block columns {}, normal_columns 
"
                 "{}, "
-                "vir_cid_to_idx_in_block_msg {}",
+                "virtual_column_ids [{}]",
                 idx, free_block->columns(), 
olap_scanner->_return_columns.size(),
-                fmt::format("_vir_cid_to_idx_in_block:[{}]", 
fmt::join(vcid_to_idx, ",")));
+                fmt::join(virtual_column_ids, ","));
         throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
     }
 #endif
diff --git a/be/src/storage/iterator/vcollect_iterator.cpp 
b/be/src/storage/iterator/vcollect_iterator.cpp
index e6386ea9029..e4fa85042d3 100644
--- a/be/src/storage/iterator/vcollect_iterator.cpp
+++ b/be/src/storage/iterator/vcollect_iterator.cpp
@@ -287,23 +287,23 @@ Status VCollectIterator::_topn_next(Block* block) {
         return Status::Error<END_OF_FILE>("");
     }
 
+    // `block` is reused below as the per-rowset read buffer. Keep TopN 
candidates in a
+    // separate mutable block with the same schema so stored row positions 
remain stable.
     auto clone_block = block->clone_empty();
-    // Initialize virtual slot columns by schema (avoid runtime type checks):
-    // use _reader_context.vir_col_idx_to_type to construct real columns for 
those positions.
-    if (!_reader->_reader_context.vir_col_idx_to_type.empty()) {
-        const auto& idx_to_type = _reader->_reader_context.vir_col_idx_to_type;
-        for (const auto& kv : idx_to_type) {
-            size_t idx = kv.first;
-            if (idx < clone_block.columns()) {
-                clone_block.get_by_position(idx).column = 
kv.second->create_column();
-            }
-        }
-    }
+    // Initialize virtual slot columns by schema (avoid runtime type checks).
+    for (const auto& [cid, expr_ctx] : 
_reader->_reader_context.virtual_column_exprs) {
+        auto it = std::find(_reader->_return_columns.begin(), 
_reader->_return_columns.end(), cid);
+        DORIS_CHECK(it != _reader->_return_columns.end());
+        auto idx = 
cast_set<size_t>(std::distance(_reader->_return_columns.begin(), it));
+        DORIS_CHECK(idx < clone_block.columns());
+        clone_block.get_by_position(idx).column = 
expr_ctx->root()->data_type()->create_column();
+    }
+    const size_t clone_block_columns = clone_block.columns();
     MutableBlock mutable_block = 
MutableBlock::build_mutable_block(std::move(clone_block));
 
     const std::vector<uint32_t>* sort_columns = 
_reader->_reader_context.read_orderby_key_columns;
     for (auto column_idx : *sort_columns) {
-        DORIS_CHECK(column_idx < clone_block.columns());
+        DORIS_CHECK(column_idx < clone_block_columns);
     }
     size_t first_sort_column_idx = (*sort_columns)[0];
 
diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h
index 59071909da7..3b6fa114be3 100644
--- a/be/src/storage/iterators.h
+++ b/be/src/storage/iterators.h
@@ -19,6 +19,7 @@
 
 #include <cstddef>
 #include <memory>
+#include <set>
 
 #include "common/status.h"
 #include "core/block/block.h"
@@ -130,6 +131,11 @@ public:
     io::IOContext io_ctx;
     VExprContextSPtrs common_expr_ctxs_push_down;
     const std::set<int32_t>* output_columns = nullptr;
+    // Extra storage key columns that are included only to keep the scan schema
+    // aligned with the storage key prefix. SegmentIterator can synthesize
+    // placeholders only after proving predicates, delete conditions, and
+    // expressions do not need their real values.
+    std::set<ColumnId> extra_columns;
     // runtime state
     RuntimeState* runtime_state = nullptr;
     RowsetId rowset_id;
@@ -148,8 +154,6 @@ public:
 
     std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
     std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
-    std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
-    std::map<size_t, DataTypePtr> vir_col_idx_to_type;
 
     std::map<int32_t, TColumnAccessPaths> all_access_paths;
     std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp 
b/be/src/storage/rowset/beta_rowset_reader.cpp
index 56b580e152d..6798c1c6934 100644
--- a/be/src/storage/rowset/beta_rowset_reader.cpp
+++ b/be/src/storage/rowset/beta_rowset_reader.cpp
@@ -107,8 +107,6 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     _read_options.predicate_access_paths = 
_read_context->predicate_access_paths;
 
     _read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
-    _read_options.vir_cid_to_idx_in_block = 
_read_context->vir_cid_to_idx_in_block;
-    _read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
     _read_options.score_runtime = _read_context->score_runtime;
     _read_options.collection_statistics = _read_context->collection_statistics;
     _read_options.rowset_id = _rowset->rowset_id();
@@ -152,6 +150,7 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     // create segment iterators
     VLOG_NOTICE << "read columns size: " << read_columns.size();
     _input_schema = 
std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
+    _read_options.extra_columns = _read_context->extra_columns;
     // output_schema only contains return_columns (excludes extra columns like 
delete-predicate columns).
     // It is used by merge/union iterators to determine how many columns to 
copy to the output block.
     _output_schema = 
std::make_shared<Schema>(_read_context->tablet_schema->columns(),
diff --git a/be/src/storage/rowset/rowset_reader_context.h 
b/be/src/storage/rowset/rowset_reader_context.h
index cb61a6b0ad9..13090805ff4 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
 #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
 
+#include <set>
 #include <vector>
 
 #include "exprs/score_runtime.h"
@@ -77,9 +78,6 @@ struct RowsetReaderContext {
     // Effective adaptive batch size byte budget. 0 means disabled internally.
     size_t preferred_block_size_bytes = 8388608UL;
 
-    // Points to the "true" output column list before non-direct-mode 
expansion.
-    // Used by BlockReader to map expanded storage columns back to the 
requested output layout.
-    const std::vector<ColumnId>* origin_return_columns = nullptr;
     bool is_unique = false;
     //record row num merged in generic iterator
     uint64_t* merged_rows = nullptr;
@@ -90,14 +88,13 @@ struct RowsetReaderContext {
     RowIdConversion* rowid_conversion = nullptr;
     bool is_key_column_group = false;
     const std::set<int32_t>* output_columns = nullptr;
+    std::set<ColumnId> extra_columns;
     RowsetId rowset_id;
     // slots that cast may be eliminated in storage layer
     std::map<std::string, DataTypePtr> target_cast_type_for_variants;
     int64_t ttl_seconds = 0;
 
     std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
-    std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
-    std::map<size_t, DataTypePtr> vir_col_idx_to_type;
 
     std::map<int32_t, TColumnAccessPaths> all_access_paths;
     std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
diff --git a/be/src/storage/schema.cpp b/be/src/storage/schema.cpp
index 27dd40e9d74..7ddd766a8ac 100644
--- a/be/src/storage/schema.cpp
+++ b/be/src/storage/schema.cpp
@@ -55,6 +55,7 @@ Schema& Schema::operator=(const Schema& other) {
 
 void Schema::_copy_from(const Schema& other) {
     _col_ids = other._col_ids;
+    _column_id_to_index = other._column_id_to_index;
     _num_key_columns = other._num_key_columns;
     _delete_sign_idx = other._delete_sign_idx;
     _has_sequence_col = other._has_sequence_col;
@@ -77,6 +78,11 @@ void Schema::_init(const std::vector<TabletColumnPtr>& cols, 
const std::vector<C
     _cols.resize(cols.size());
 
     std::unordered_set<uint32_t> col_id_set(col_ids.begin(), col_ids.end());
+    _column_id_to_index.assign(cols.size(), -1);
+    for (size_t i = 0; i < col_ids.size(); ++i) {
+        _column_id_to_index[col_ids[i]] = static_cast<int>(i);
+    }
+
     for (int cid = 0; cid < cols.size(); ++cid) {
         if (col_id_set.find(cid) == col_id_set.end()) {
             continue;
diff --git a/be/src/storage/schema.h b/be/src/storage/schema.h
index dcd27fe9c0b..5fd5a4b01f2 100644
--- a/be/src/storage/schema.h
+++ b/be/src/storage/schema.h
@@ -96,6 +96,8 @@ public:
     size_t num_column_ids() const { return _col_ids.size(); }
     const std::vector<ColumnId>& column_ids() const { return _col_ids; }
     ColumnId column_id(size_t index) const { return _col_ids[index]; }
+    int column_index(ColumnId cid) const { return _column_id_to_index[cid]; }
+    const std::vector<int>& column_id_to_index() const { return 
_column_id_to_index; }
     int32_t delete_sign_idx() const { return _delete_sign_idx; }
     bool has_sequence_col() const { return _has_sequence_col; }
     int32_t rowid_col_idx() const { return _rowid_col_idx; }
@@ -119,6 +121,8 @@ private:
     // NOTE: _cols[cid] can only be accessed when the cid is
     // contained in _col_ids
     std::vector<TabletColumnPtr> _cols;
+    // Tablet column id -> slot index in this Schema.
+    std::vector<int> _column_id_to_index;
 
     size_t _num_key_columns;
     int32_t _delete_sign_idx = -1;
diff --git a/be/src/storage/segment/segment_iterator.cpp 
b/be/src/storage/segment/segment_iterator.cpp
index d0b5a0f3c05..9d3e5d98e47 100644
--- a/be/src/storage/segment/segment_iterator.cpp
+++ b/be/src/storage/segment/segment_iterator.cpp
@@ -17,7 +17,6 @@
 
 #include "storage/segment/segment_iterator.h"
 
-#include <assert.h>
 #include <gen_cpp/Exprs_types.h>
 #include <gen_cpp/Opcodes_types.h>
 #include <gen_cpp/Types_types.h>
@@ -25,6 +24,7 @@
 
 #include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
+#include <cassert>
 #include <cstdint>
 #include <memory>
 #include <numeric>
@@ -71,7 +71,6 @@
 #include "io/cache/cached_remote_file_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/io_common.h"
-#include "runtime/descriptors.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
@@ -113,127 +112,11 @@
 #include "storage/utils.h"
 #include "util/concurrency_stats.h"
 #include "util/defer_op.h"
-#include "util/json/path_in_data.h"
 #include "util/simd/bits.h"
 
 namespace doris {
 using namespace ErrorCode;
 namespace segment_v2 {
-namespace {
-
-Status tablet_column_id_by_slot(const TabletSchemaSPtr& tablet_schema, const 
SlotDescriptor* slot,
-                                ColumnId* cid) {
-    int32_t field_index = -1;
-    if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
-        field_index = tablet_schema->field_index(
-                
PathInData(tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
-                           slot->column_paths()));
-    } else {
-        field_index = slot->col_unique_id() >= 0 ? 
tablet_schema->field_index(slot->col_unique_id())
-                                                 : 
tablet_schema->field_index(slot->col_name());
-    }
-    if (field_index < 0) {
-        return Status::InternalError(
-                "field name is invalid. field={}, field_name_to_index={}, 
col_unique_id={}",
-                slot->col_name(), tablet_schema->get_all_field_names(), 
slot->col_unique_id());
-    }
-    *cid = field_index;
-    return Status::OK();
-}
-
-Status rebind_storage_expr_to_reader_schema(
-        const StorageReadOptions& opts, const VExprSPtr& expr,
-        const std::unordered_map<ColumnId, size_t>& cid_to_pos) {
-    DORIS_CHECK(expr != nullptr);
-
-    if (expr->is_slot_ref()) {
-        auto slot_ref = std::static_pointer_cast<VSlotRef>(expr);
-        auto* slot = 
opts.runtime_state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
-        if (slot == nullptr) {
-            return Status::InternalError("slot {} is not found in descriptor 
table",
-                                         slot_ref->slot_id());
-        }
-
-        ColumnId cid = 0;
-        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, 
&cid));
-        auto pos_it = cid_to_pos.find(cid);
-        if (pos_it == cid_to_pos.end()) {
-            return Status::InternalError("slot {} column {} with cid {} is not 
in reader schema",
-                                         slot_ref->slot_id(), 
slot->col_name(), cid);
-        }
-        slot_ref->set_column_id(cast_set<int>(pos_it->second));
-    } else if (expr->is_virtual_slot_ref()) {
-        auto virtual_slot_ref = std::static_pointer_cast<VirtualSlotRef>(expr);
-        auto* slot =
-                
opts.runtime_state->desc_tbl().get_slot_descriptor(virtual_slot_ref->slot_id());
-        if (slot == nullptr) {
-            return Status::InternalError("slot {} is not found in descriptor 
table",
-                                         virtual_slot_ref->slot_id());
-        }
-
-        ColumnId cid = 0;
-        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, 
&cid));
-        auto pos_it = cid_to_pos.find(cid);
-        if (pos_it == cid_to_pos.end()) {
-            return Status::InternalError(
-                    "virtual slot {} column {} with cid {} is not in reader 
schema",
-                    virtual_slot_ref->slot_id(), slot->col_name(), cid);
-        }
-        virtual_slot_ref->set_column_id(cast_set<int>(pos_it->second));
-        // A virtual slot has its own output position in the reader block, and 
its
-        // materialization expression may also contain real slot refs. Rebind 
both
-        // sides so evaluating the virtual expression reads from the same block
-        // layout used by SegmentIterator.
-        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(
-                opts, virtual_slot_ref->get_virtual_column_expr(), 
cid_to_pos));
-    }
-
-    for (const auto& child : expr->children()) {
-        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, child, 
cid_to_pos));
-    }
-    return Status::OK();
-}
-
-} // namespace
-
-Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, 
const Schema& schema,
-                                             const VExprContextSPtrs& 
common_exprs,
-                                             std::map<ColumnId, 
VExprContextSPtr>& virtual_exprs) {
-    if (common_exprs.empty() && virtual_exprs.empty()) {
-        return Status::OK();
-    }
-    DORIS_CHECK(opts.runtime_state != nullptr);
-    DORIS_CHECK(opts.tablet_schema != nullptr);
-
-    const auto keys_type = opts.tablet_schema->keys_type();
-    if (keys_type == KeysType::DUP_KEYS ||
-        (keys_type == KeysType::UNIQUE_KEYS && 
opts.enable_unique_key_merge_on_write)) {
-        return Status::OK();
-    }
-
-    // Storage exprs are prepared with RowDescriptor, so 
VSlotRef/VirtualSlotRef column_id points to
-    // the scan tuple column ordinal. SegmentIterator evaluates cloned exprs 
on a block built from
-    // the reader schema instead. AGG_KEYS and non-MOW UNIQUE_KEYS readers may 
expand the reader
-    // schema, for example by filling all key columns before 
merging/aggregating rows, so the scan
-    // tuple ordinal is not always the same as the runtime block ordinal.
-    //
-    // DUP_KEYS and UNIQUE_KEYS MOW use direct readers for query scans, so 
their reader block keeps
-    // the scan tuple layout and can skip this per-segment expression-tree 
traversal. For merge/agg
-    // readers, the reader schema is the source of truth: map tablet column id 
to reader-block
-    // position and rebind every storage expr slot to that position.
-    std::unordered_map<ColumnId, size_t> cid_to_pos;
-    for (size_t pos = 0; pos < schema.num_column_ids(); ++pos) {
-        cid_to_pos.emplace(schema.column_id(cast_set<int>(pos)), pos);
-    }
-
-    for (const auto& ctx : common_exprs) {
-        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, 
ctx->root(), cid_to_pos));
-    }
-    for (const auto& [_, ctx] : virtual_exprs) {
-        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, 
ctx->root(), cid_to_pos));
-    }
-    return Status::OK();
-}
 
 SegmentIterator::~SegmentIterator() = default;
 
@@ -530,7 +413,6 @@ Status SegmentIterator::_init_impl(const 
StorageReadOptions& opts) {
     }
 
     _virtual_column_exprs = _opts.virtual_column_exprs;
-    _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
     _score_runtime = _opts.score_runtime;
     _ann_topn_runtime = _opts.ann_topn_runtime;
 
@@ -582,10 +464,8 @@ Status SegmentIterator::_init_impl(const 
StorageReadOptions& opts) {
 
     RETURN_IF_ERROR(_construct_compound_expr_context());
     VLOG_DEBUG << fmt::format(
-            "Segment iterator init, virtual_column_exprs size: {}, "
-            "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
-            _opts.virtual_column_exprs.size(), 
_opts.vir_cid_to_idx_in_block.size(),
-            _common_expr_ctxs_push_down.size());
+            "Segment iterator init, virtual_column_exprs size: {}, 
common_expr_pushdown size: {}",
+            _opts.virtual_column_exprs.size(), 
_common_expr_ctxs_push_down.size());
     _initialize_predicate_results();
     return Status::OK();
 }
@@ -1002,6 +882,7 @@ Status SegmentIterator::_apply_ann_topn_predicate() {
 
     VLOG_DEBUG << fmt::format("Try apply ann topn: {}", 
_ann_topn_runtime->debug_string());
     size_t src_col_idx = _ann_topn_runtime->get_src_column_idx();
+    // AnnTopNRuntime keeps VSlotRef::column_id(), which is the scan schema 
ordinal.
     ColumnId src_cid = _schema->column_id(src_col_idx);
     IndexIterator* ann_index_iterator = _index_iterators[src_cid].get();
     bool has_ann_index = _column_has_ann_index(src_cid);
@@ -1290,8 +1171,9 @@ Status 
SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) {
     auto node_type = expr->node_type();
     if (node_type == TExprNodeType::SLOT_REF) {
         auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr);
-        _is_common_expr_column[_schema->column_id(slot_expr->column_id())] = 
true;
-        
_common_expr_columns.insert(_schema->column_id(slot_expr->column_id()));
+        auto cid = _schema->column_id(slot_expr->column_id());
+        _is_common_expr_column[cid] = true;
+        _common_expr_columns.insert(cid);
     } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) {
         std::shared_ptr<VirtualSlotRef> virtual_slot_ref =
                 std::dynamic_pointer_cast<VirtualSlotRef>(expr);
@@ -1512,6 +1394,9 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
     if (_opts.runtime_state && 
!_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
         return true;
     }
+    if (_can_skip_reading_extra_column(cid)) {
+        return false;
+    }
     // only support DUP_KEYS and UNIQUE_KEYS with MOW
     if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
            (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
@@ -1519,7 +1404,7 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
         return true;
     }
     // this is a virtual column, we always need to read data
-    if (this->_vir_cid_to_idx_in_block.contains(cid)) {
+    if (_virtual_column_exprs.contains(cid)) {
         return true;
     }
 
@@ -1684,8 +1569,8 @@ Status SegmentIterator::_init_return_column_iterators() {
     }
 
 #ifndef NDEBUG
-    for (auto pair : _vir_cid_to_idx_in_block) {
-        ColumnId vir_col_cid = pair.first;
+    for (const auto& entry : _virtual_column_exprs) {
+        ColumnId vir_col_cid = entry.first;
         DCHECK(_column_iterators[vir_col_cid] != nullptr)
                 << "Virtual column iterator for " << vir_col_cid << " should 
not be null";
         ColumnIterator* column_iter = _column_iterators[vir_col_cid].get();
@@ -2083,19 +1968,6 @@ Status SegmentIterator::_vec_init_lazy_materialization() 
{
         _is_need_short_eval = true;
     }
 
-    // ColumnId to column index in block
-    // ColumnId will contail all columns in tablet schema, including virtual 
columns and global rowid column,
-    _schema_block_id_map.resize(_schema->columns().size(), -1);
-    // Use cols read by query to initialize _schema_block_id_map.
-    // We need to know the index of each column in the block.
-    // There is an assumption here that the columns in the block are in the 
same order as in the read schema.
-    // TODO: A probelm is that, delete condition columns will exist in 
_schema->column_ids but not in block if
-    // delete column is not read by the query.
-    for (int i = 0; i < _schema->num_column_ids(); i++) {
-        auto cid = _schema->column_id(i);
-        _schema_block_id_map[cid] = i;
-    }
-
     // Step2: extract columns that can execute expr context
     _is_common_expr_column.resize(_schema->columns().size(), false);
     if (!_common_expr_ctxs_push_down.empty()) {
@@ -2109,13 +1981,13 @@ Status 
SegmentIterator::_vec_init_lazy_materialization() {
                 // if delete condition column not in the block, no filter is 
needed
                 // and will be removed from _columns_to_filter in the first 
next_batch.
                 if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
-                    auto loc = _schema_block_id_map[cid];
+                    auto loc = _schema->column_index(cid);
                     _columns_to_filter.push_back(loc);
                 }
             }
 
-            for (auto pair : _vir_cid_to_idx_in_block) {
-                _columns_to_filter.push_back(cast_set<ColumnId>(pair.second));
+            for (const auto& entry : _virtual_column_exprs) {
+                
_columns_to_filter.push_back(_schema->column_index(entry.first));
             }
         }
     }
@@ -2187,11 +2059,11 @@ Status 
SegmentIterator::_vec_init_lazy_materialization() {
             "_non_predicate_columns: [{}], "
             "_cols_read_by_common_expr: [{}], "
             "columns_to_filter: [{}], "
-            "_schema_block_id_map: [{}]",
+            "schema_column_id_to_index: [{}]",
             _lazy_materialization_read, _col_predicates.size(),
             fmt::join(_predicate_column_ids, ","), 
fmt::join(_non_predicate_columns, ","),
             fmt::join(_common_expr_column_ids, ","), 
fmt::join(_columns_to_filter, ","),
-            fmt::join(_schema_block_id_map, ","));
+            fmt::join(_schema->column_id_to_index(), ","));
     return Status::OK();
 }
 
@@ -2226,31 +2098,47 @@ bool 
SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredica
     }
 }
 
-bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, 
bool fill_defaults,
+// These placeholders are used only when the real column data is skipped after
+// index/count pushdown has already identified the matching rows. The value is
+// irrelevant, but nullable columns must stay non-NULL so COUNT(col) can count
+// the matched rows instead of treating every placeholder as NULL.
+static void insert_many_not_null_defaults(MutableColumnPtr& column, size_t 
num) {
+    if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(column.get())) {
+        nullable_column->insert_not_null_elements(num);
+        return;
+    }
+    column->insert_many_defaults(num);
+}
+
+bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column,
                                     size_t num_of_defaults) {
     if (_need_read_data(cid)) {
         return false;
     }
-    if (!fill_defaults) {
-        return true;
-    }
-    if (is_column_nullable(*column)) {
-        auto nullable_col_ptr = 
reinterpret_cast<ColumnNullable*>(column.get());
-        
nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
-        
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
-    } else {
-        // assert(column->is_const());
-        column->insert_many_defaults(num_of_defaults);
-    }
+    insert_many_not_null_defaults(column, num_of_defaults);
     return true;
 }
 
+bool SegmentIterator::_can_skip_reading_extra_column(ColumnId cid) {
+    if (!_opts.extra_columns.contains(cid) || _is_pred_column.empty()) {
+        return false;
+    }
+    DCHECK_EQ(_is_pred_column.size(), _is_common_expr_column.size());
+    DCHECK_LT(cid, _is_pred_column.size());
+
+    // extra_columns is only an optimization hint. The real value is still
+    // required when the column participates in expression materialization or
+    // any predicate path.
+    return !_virtual_column_exprs.contains(cid) && !_has_delete_predicate(cid) 
&&
+           !_is_pred_column[cid] && !_is_common_expr_column[cid];
+}
+
 Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
                                       MutableColumns& column_block, size_t 
nrows) {
     for (auto cid : column_ids) {
         auto& column = column_block[cid];
         size_t rows_read = nrows;
-        if (_prune_column(cid, column, true, rows_read)) {
+        if (_prune_column(cid, column, rows_read)) {
             continue;
         }
         RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, 
column));
@@ -2304,7 +2192,7 @@ Status SegmentIterator::_init_current_block(Block* block,
         }
     }
 
-    for (auto entry : _virtual_column_exprs) {
+    for (const auto& entry : _virtual_column_exprs) {
         auto cid = entry.first;
         current_columns[cid] = ColumnNothing::create(0);
         current_columns[cid]->reserve(nrows_read_limit);
@@ -2317,11 +2205,11 @@ Status SegmentIterator::_output_non_pred_columns(Block* 
block) {
     SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
     VLOG_DEBUG << fmt::format(
             "Output non-predicate columns, _non_predicate_columns: [{}], "
-            "_schema_block_id_map: [{}]",
-            fmt::join(_non_predicate_columns, ","), 
fmt::join(_schema_block_id_map, ","));
+            "schema_column_id_to_index: [{}]",
+            fmt::join(_non_predicate_columns, ","), 
fmt::join(_schema->column_id_to_index(), ","));
     RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
     for (auto cid : _non_predicate_columns) {
-        auto loc = _schema_block_id_map[cid];
+        auto loc = _schema->column_index(cid);
         // Whether a delete predicate column gets output depends on how the 
caller builds
         // the block passed to next_batch(). Both calling paths now build the 
block with
         // only the output schema (return_columns), so delete predicate 
columns are skipped:
@@ -2338,7 +2226,7 @@ Status SegmentIterator::_output_non_pred_columns(Block* 
block) {
         if (loc < block->columns()) {
             bool column_in_block_is_nothing = check_and_get_column<const 
ColumnNothing>(
                     block->get_by_position(loc).column.get());
-            bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid);
+            bool column_is_normal = !_virtual_column_exprs.contains(cid);
             bool return_column_is_nothing =
                     check_and_get_column<const 
ColumnNothing>(_current_return_columns[cid].get());
             VLOG_DEBUG << fmt::format(
@@ -2408,7 +2296,7 @@ Status SegmentIterator::_read_columns_by_index(uint32_t 
nrows_read_limit, uint16
                 VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
                 continue;
             }
-            if (_prune_column(cid, column, true, nrows_read)) {
+            if (_prune_column(cid, column, nrows_read)) {
                 VLOG_DEBUG << fmt::format("Column {} is pruned. No need to 
read data.", cid);
                 continue;
             }
@@ -2789,7 +2677,7 @@ Status 
SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
         if (_no_need_read_key_data(cid, colunm, select_size)) {
             continue;
         }
-        if (_prune_column(cid, colunm, true, select_size)) {
+        if (_prune_column(cid, colunm, select_size)) {
             continue;
         }
 
@@ -2850,10 +2738,9 @@ Status SegmentIterator::next_batch(Block* block) {
             if (res.is<END_OF_FILE>()) {
                 // Since we have a type check at the caller.
                 // So a replacement of nothing column with real column is 
needed.
-                const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
-                for (const auto& pair : _vir_cid_to_idx_in_block) {
-                    size_t idx = pair.second;
-                    auto type = idx_to_datatype.find(idx)->second;
+                for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+                    auto idx = _schema->column_index(cid);
+                    auto type = expr_ctx->root()->data_type();
                     block->replace_by_position(idx, type->create_column());
                 }
 
@@ -3039,7 +2926,7 @@ Status SegmentIterator::_next_batch_internal(Block* 
block) {
                         
RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                     }
 
-                    DCHECK(block->columns() > 
_schema_block_id_map[*_common_expr_columns.begin()]);
+                    DCHECK(block->columns() > 
_schema->column_index(*_common_expr_columns.begin()));
                     RETURN_IF_ERROR(
                             _process_common_expr(_sel_rowid_idx.data(), 
_selected_size, block));
                 }
@@ -3091,12 +2978,12 @@ Status SegmentIterator::_next_batch_internal(Block* 
block) {
     if (!_virtual_column_exprs.empty()) {
         bool use_sel = _is_need_vec_eval || _is_need_short_eval || 
_is_need_expr_eval;
         uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
-        std::vector<VExprContext*> vir_ctxs;
+        VExprContextSPtrs vir_ctxs;
         vir_ctxs.reserve(_virtual_column_exprs.size());
         for (auto& [cid, ctx] : _virtual_column_exprs) {
-            vir_ctxs.push_back(ctx.get());
+            vir_ctxs.push_back(ctx);
         }
-        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, 
block);
+        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size);
     }
     RETURN_IF_ERROR(_materialization_of_virtual_column(block));
     if (_opts.read_limit > 0) {
@@ -3108,7 +2995,7 @@ Status SegmentIterator::_next_batch_internal(Block* 
block) {
 Status SegmentIterator::_process_columns(const std::vector<ColumnId>& 
column_ids, Block* block) {
     RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
     for (auto cid : column_ids) {
-        auto loc = _schema_block_id_map[cid];
+        auto loc = _schema->column_index(cid);
         block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
     }
     return Status::OK();
@@ -3119,12 +3006,10 @@ void SegmentIterator::_fill_column_nothing() {
     // Because:
     // 1. Before each batch, _init_return_columns is called to initialize 
_current_return_columns, and virtual columns in _current_return_columns are 
initialized as ColumnNothing.
     // 2. When select_size == 0, the read method of VirtualColumnIterator will 
definitely not be called, so the corresponding Column remains a ColumnNothing
-    for (const auto pair : _vir_cid_to_idx_in_block) {
-        auto cid = pair.first;
-        auto pos = pair.second;
+    for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
         [[maybe_unused]] const auto* nothing_col =
                 assert_cast<const 
ColumnNothing*>(_current_return_columns[cid].get());
-        _current_return_columns[cid] = 
_opts.vir_col_idx_to_type[pos]->create_column();
+        _current_return_columns[cid] = 
expr_ctx->root()->data_type()->create_column();
     }
 }
 
@@ -3140,17 +3025,15 @@ Status SegmentIterator::_check_output_block(Block* 
block) {
                     idx, block->columns(), _schema->num_column_ids(), 
_virtual_column_exprs.size());
         } else if (check_and_get_column<ColumnNothing>(entry.column.get())) {
             if (rows > 0) {
-                std::vector<std::string> vcid_to_idx;
-                for (const auto& pair : _vir_cid_to_idx_in_block) {
-                    vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, 
pair.second));
+                std::vector<ColumnId> virtual_column_ids;
+                for (const auto& pair : _virtual_column_exprs) {
+                    virtual_column_ids.push_back(pair.first);
                 }
-                std::string vir_cid_to_idx_in_block_msg =
-                        fmt::format("_vir_cid_to_idx_in_block:[{}]", 
fmt::join(vcid_to_idx, ","));
                 return Status::InternalError(
                         "Column in idx {} is nothing, block columns {}, 
normal_columns {}, "
-                        "vir_cid_to_idx_in_block_msg {}",
+                        "virtual_column_ids [{}]",
                         idx, block->columns(), _schema->num_column_ids(),
-                        vir_cid_to_idx_in_block_msg);
+                        fmt::join(virtual_column_ids, ","));
             }
         } else if (entry.column->size() != rows) {
             return Status::InternalError(
@@ -3165,10 +3048,6 @@ Status SegmentIterator::_check_output_block(Block* 
block) {
     return Status::OK();
 }
 
-Status SegmentIterator::_process_column_predicate() {
-    return Status::OK();
-}
-
 Status SegmentIterator::_process_eof(Block* block) {
     // Convert all columns in _current_return_columns to schema column
     RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
@@ -3187,32 +3066,10 @@ Status SegmentIterator::_process_eof(Block* block) {
 
 Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
                                              Block* block) {
-    // Here we just use col0 as row_number indicator. when reach here, we will 
calculate the predicates first.
-    //  then use the result to reduce our data read(that is, expr push down). 
there's now row in block means the first
-    //  column is not in common expr. so it's safe to replace it temporarily 
to provide correct `selected_size`.
     VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected 
size {}", block->rows(),
                               _selected_size);
 
-    bool need_mock_col = block->rows() != selected_size;
-    MutableColumnPtr col0;
-    if (need_mock_col) {
-        col0 = std::move(*block->get_by_position(0).column).mutate();
-        block->replace_by_position(
-                0, 
block->get_by_position(0).type->create_column_const_with_default_value(
-                           _selected_size));
-    }
-
-    std::vector<VExprContext*> common_ctxs;
-    common_ctxs.reserve(_common_expr_ctxs_push_down.size());
-    for (auto& ctx : _common_expr_ctxs_push_down) {
-        common_ctxs.push_back(ctx.get());
-    }
-    _output_index_result_column(common_ctxs, _sel_rowid_idx.data(), 
_selected_size, block);
-    RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), 
_selected_size, block));
-
-    if (need_mock_col) {
-        block->replace_by_position(0, std::move(col0));
-    }
+    RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));
 
     VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, 
selected size {}",
                               block->rows(), _selected_size);
@@ -3223,14 +3080,24 @@ Status SegmentIterator::_execute_common_expr(uint16_t* 
sel_rowid_idx, uint16_t&
                                              Block* block) {
     SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
     DCHECK(!_common_expr_ctxs_push_down.empty());
-    DCHECK(block->rows() != 0);
-    int prev_columns = block->columns();
+    _output_index_result_column(_common_expr_ctxs_push_down, sel_rowid_idx, 
selected_size);
+
     uint16_t original_size = selected_size;
     _opts.stats->expr_cond_input_rows += original_size;
 
-    IColumn::Filter filter;
-    RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-            _common_expr_ctxs_push_down, block, _columns_to_filter, 
prev_columns, filter));
+    // Some output columns may stay empty until after common expr filtering. 
Use the
+    // selected row count instead of Block::rows(), which is derived from the 
first column.
+    IColumn::Filter filter(selected_size, 1);
+    bool can_filter_all = false;
+    auto* __restrict filter_data = filter.data();
+    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
+        RETURN_IF_ERROR(expr_ctx->execute_filter(block, filter_data, 
selected_size, false,
+                                                 &can_filter_all));
+        if (can_filter_all) {
+            break;
+        }
+    }
+    RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, 
_columns_to_filter, filter));
 
     selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, 
filter);
     _opts.stats->rows_expr_cond_filtered += original_size - selected_size;
@@ -3279,15 +3146,14 @@ uint16_t 
SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
     }
 }
 
-void SegmentIterator::_output_index_result_column(const 
std::vector<VExprContext*>& expr_ctxs,
-                                                  uint16_t* sel_rowid_idx, 
uint16_t select_size,
-                                                  Block* block) {
+void SegmentIterator::_output_index_result_column(const VExprContextSPtrs& 
expr_ctxs,
+                                                  uint16_t* sel_rowid_idx, 
uint16_t select_size) {
     SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
-    if (block->rows() == 0) {
+    if (select_size == 0) {
         return;
     }
-    for (auto* expr_ctx_ptr : expr_ctxs) {
-        auto index_ctx = expr_ctx_ptr->get_index_context();
+    for (const auto& expr_ctx : expr_ctxs) {
+        auto index_ctx = expr_ctx->get_index_context();
         if (index_ctx == nullptr) {
             continue;
         }
@@ -3297,7 +3163,7 @@ void SegmentIterator::_output_index_result_column(const 
std::vector<VExprContext
             const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
             auto index_result_column = ColumnUInt8::create();
             ColumnUInt8::Container& vec_match_pred = 
index_result_column->get_data();
-            vec_match_pred.resize(block->rows());
+            vec_match_pred.resize(select_size);
             std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);
 
             const auto& null_bitmap = result_bitmap.get_null_bitmap();
@@ -3309,7 +3175,7 @@ void SegmentIterator::_output_index_result_column(const 
std::vector<VExprContext
             if (has_null_bitmap && expr_returns_nullable) {
                 null_map_column = ColumnUInt8::create();
                 auto& null_map_vec = null_map_column->get_data();
-                null_map_vec.resize(block->rows());
+                null_map_vec.resize(select_size);
                 std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
                 null_map_data = &null_map_column->get_data();
             }
@@ -3326,7 +3192,7 @@ void SegmentIterator::_output_index_result_column(const 
std::vector<VExprContext
                 }
             }
 
-            DCHECK(block->rows() == vec_match_pred.size());
+            DCHECK(select_size == vec_match_pred.size());
 
             if (null_map_column) {
                 index_ctx->set_index_result_column_for_expr(
@@ -3414,8 +3280,6 @@ Status 
SegmentIterator::_construct_compound_expr_context() {
         context->set_index_context(inverted_index_context);
         expr_ctx = context;
     }
-    RETURN_IF_ERROR(rebind_storage_exprs_to_reader_schema(
-            _opts, *_schema, _common_expr_ctxs_push_down, 
_virtual_column_exprs));
     return Status::OK();
 }
 
@@ -3547,8 +3411,8 @@ void 
SegmentIterator::_calculate_common_expr_index_exec_status() {
                             for (const auto& vir_child : vir_node->children()) 
{
                                 if (vir_child->is_slot_ref()) {
                                     auto* inner_slot_ref = 
assert_cast<VSlotRef*>(vir_child.get());
-                                    
_common_expr_index_exec_status[_schema->column_id(
-                                            
inner_slot_ref->column_id())][expr.get()] = false;
+                                    auto cid = 
_schema->column_id(inner_slot_ref->column_id());
+                                    
_common_expr_index_exec_status[cid][expr.get()] = false;
                                     
_common_expr_to_slotref_map[root_expr_ctx.get()]
                                                                
[inner_slot_ref->column_id()] =
                                                                        
expr.get();
@@ -3565,8 +3429,8 @@ void 
SegmentIterator::_calculate_common_expr_index_exec_status() {
                 auto expr_without_cast = VExpr::expr_without_cast(child);
                 if (expr_without_cast->is_slot_ref() && expr->op() != 
TExprOpcode::CAST) {
                     auto* column_slot_ref = 
assert_cast<VSlotRef*>(expr_without_cast.get());
-                    
_common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
-                                                  [expr.get()] = false;
+                    auto cid = 
_schema->column_id(column_slot_ref->column_id());
+                    _common_expr_index_exec_status[cid][expr.get()] = false;
                     
_common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
                             expr.get();
                 }
@@ -3610,14 +3474,7 @@ bool SegmentIterator::_no_need_read_key_data(ColumnId 
cid, MutableColumnPtr& col
         return false;
     }
 
-    if (is_column_nullable(*column)) {
-        auto* nullable_col_ptr = 
reinterpret_cast<ColumnNullable*>(column.get());
-        
nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read);
-        
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read);
-    } else {
-        column->insert_many_defaults(nrows_read);
-    }
-
+    insert_many_not_null_defaults(column, nrows_read);
     return true;
 }
 
@@ -3668,55 +3525,46 @@ bool SegmentIterator::_can_opt_limit_reads() {
 
 // Before get next batch. make sure all virtual columns in block has type 
ColumnNothing.
 void SegmentIterator::_init_virtual_columns(Block* block) {
-    for (const auto& pair : _vir_cid_to_idx_in_block) {
-        auto& col_with_type_and_name = block->get_by_position(pair.second);
+    for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+        auto idx = _schema->column_index(cid);
+        auto& col_with_type_and_name = block->get_by_position(idx);
         col_with_type_and_name.column = ColumnNothing::create(0);
-        col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second];
+        col_with_type_and_name.type = expr_ctx->root()->data_type();
     }
 }
 
 Status SegmentIterator::_materialization_of_virtual_column(Block* block) {
     // Some expr can not process empty block, such as function `element_at`.
     // So materialize virtual column in advance to avoid errors.
-    if (block->rows() == 0) {
-        for (const auto& pair : _vir_cid_to_idx_in_block) {
-            auto& col_with_type_and_name = block->get_by_position(pair.second);
-            col_with_type_and_name.column = 
_opts.vir_col_idx_to_type[pair.second]->create_column();
-            col_with_type_and_name.type = 
_opts.vir_col_idx_to_type[pair.second];
+    if (_selected_size == 0) {
+        for (const auto& [cid, expr_ctx] : _virtual_column_exprs) {
+            auto idx = _schema->column_index(cid);
+            auto& col_with_type_and_name = block->get_by_position(idx);
+            col_with_type_and_name.column = 
expr_ctx->root()->data_type()->create_column();
+            col_with_type_and_name.type = expr_ctx->root()->data_type();
         }
         return Status::OK();
     }
+    if (_virtual_column_exprs.empty()) {
+        return Status::OK();
+    }
 
     for (const auto& cid_and_expr : _virtual_column_exprs) {
         auto cid = cid_and_expr.first;
         auto column_expr = cid_and_expr.second;
-        size_t idx_in_block = _vir_cid_to_idx_in_block[cid];
-        if (block->columns() <= idx_in_block) {
-            return Status::InternalError(
-                    "Virtual column index {} is out of range, block columns 
{}, "
-                    "virtual columns size {}, virtual column expr {}",
-                    idx_in_block, block->columns(), 
_vir_cid_to_idx_in_block.size(),
-                    column_expr->root()->debug_string());
-        } else if (block->get_by_position(idx_in_block).column.get() == 
nullptr) {
-            return Status::InternalError(
-                    "Virtual column index {} is null, block columns {}, 
virtual columns size "
-                    "{}, "
-                    "virtual column expr {}",
-                    idx_in_block, block->columns(), 
_vir_cid_to_idx_in_block.size(),
-                    column_expr->root()->debug_string());
-        }
-        if (check_and_get_column<const ColumnNothing>(
-                    block->get_by_position(idx_in_block).column.get())) {
+        auto materialized_pos = _schema->column_index(cid);
+        auto& column = block->get_by_position(materialized_pos).column;
+        if (check_and_get_column<const ColumnNothing>(column.get())) {
             VLOG_DEBUG << fmt::format("Virtual column is doing 
materialization, cid {}, col idx {}",
-                                      cid, idx_in_block);
+                                      cid, materialized_pos);
             ColumnPtr result_column;
-            RETURN_IF_ERROR(column_expr->execute(block, result_column));
+            // The first block column may still be ColumnNothing(0) for a 
virtual column, while
+            // predicates have already reduced _selected_size. Evaluate the 
expression over the
+            // selected row count instead of Block::rows().
+            
RETURN_IF_ERROR(column_expr->root()->execute_column(column_expr.get(), block, 
nullptr,
+                                                                
_selected_size, result_column));
 
-            block->replace_by_position(idx_in_block, std::move(result_column));
-            if (block->get_by_position(idx_in_block).column->size() == 0) {
-                LOG_WARNING("Result of expr column {} is empty. cid {}, 
idx_in_block {}",
-                            column_expr->root()->debug_string(), cid, 
idx_in_block);
-            }
+            block->replace_by_position(materialized_pos, 
std::move(result_column));
         }
     }
     return Status::OK();
diff --git a/be/src/storage/segment/segment_iterator.h 
b/be/src/storage/segment/segment_iterator.h
index da6b5b870ab..c6ae5a47ae5 100644
--- a/be/src/storage/segment/segment_iterator.h
+++ b/be/src/storage/segment/segment_iterator.h
@@ -18,9 +18,9 @@
 #pragma once
 
 #include <gen_cpp/Exprs_types.h>
-#include <stddef.h>
-#include <stdint.h>
 
+#include <cstddef>
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <ostream>
@@ -241,7 +241,7 @@ private:
                                                    uint16_t* sel_rowid_idx, 
uint16_t select_size) {
         SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
         for (auto cid : column_ids) {
-            int block_cid = _schema_block_id_map[cid];
+            int block_cid = _schema->column_index(cid);
             // Only the additional deleted filter condition need to 
materialize column be at the end of the block
             // We should not to materialize the column of query engine do not 
need. So here just return OK.
             // Eg:
@@ -276,7 +276,6 @@ private:
     bool _can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> 
predicate);
 
     [[nodiscard]] Status _extract_common_expr_columns(const VExprSPtr& expr);
-    // same with _extract_common_expr_columns, but only extract columns that 
can be used for index
     [[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
                                               Block* block);
     Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t& 
selected_size, Block* block);
@@ -292,12 +291,11 @@ private:
 
     bool _check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred);
 
-    void _output_index_result_column(const std::vector<VExprContext*>& 
expr_ctxs,
-                                     uint16_t* sel_rowid_idx, uint16_t 
select_size, Block* block);
+    void _output_index_result_column(const VExprContextSPtrs& expr_ctxs, 
uint16_t* sel_rowid_idx,
+                                     uint16_t select_size);
 
     bool _need_read_data(ColumnId cid);
-    bool _prune_column(ColumnId cid, MutableColumnPtr& column, bool 
fill_defaults,
-                       size_t num_of_defaults);
+    bool _prune_column(ColumnId cid, MutableColumnPtr& column, size_t 
num_of_defaults);
 
     Status _construct_compound_expr_context();
 
@@ -306,7 +304,7 @@ private:
         for (auto cid : col_ids) {
             auto ord = key.field(cid) <=> (*_seek_block[cid])[0];
             if (ord != std::strong_ordering::equal) {
-                return ord < 0 ? -1 : 1;
+                return ord == std::strong_ordering::less ? -1 : 1;
             }
         }
         return 0;
@@ -317,6 +315,7 @@ private:
     bool _no_need_read_key_data(ColumnId cid, MutableColumnPtr& column, size_t 
nrows_read);
 
     bool _has_delete_predicate(ColumnId cid);
+    bool _can_skip_reading_extra_column(ColumnId cid);
 
     bool _can_opt_limit_reads();
 
@@ -328,8 +327,6 @@ private:
 
     Status _process_eof(Block* block);
 
-    Status _process_column_predicate();
-
     void _fill_column_nothing();
 
     Status _process_columns(const std::vector<ColumnId>& column_ids, Block* 
block);
@@ -394,11 +391,11 @@ private:
     // so we need a field to stand for columns first time to read
     std::vector<ColumnId> _predicate_column_ids;
     std::vector<ColumnId> _common_expr_column_ids;
-    // TODO: Should use std::vector<size_t>
+    // Block slot indexes to filter after common expr evaluation. This is not
+    // tablet column ids because Block::filter_block_internal filters by block
+    // position.
     std::vector<ColumnId> _columns_to_filter;
     std::vector<bool> _converted_column_ids;
-    // TODO: Should use std::vector<size_t>
-    std::vector<int> _schema_block_id_map; // map from schema column id to 
column idx in Block
 
     // the actual init process is delayed to the first call to next_batch()
     bool _lazy_inited;
@@ -473,7 +470,6 @@ private:
 
     // cid to virtual column expr
     std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
-    std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
 
     IndexQueryContextPtr _index_query_context;
 
diff --git a/be/src/storage/tablet/tablet_reader.cpp 
b/be/src/storage/tablet/tablet_reader.cpp
index 2e7e73a3676..4ae7945c688 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -204,14 +204,13 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     _reader_context.is_key_column_group = read_params.is_key_column_group;
     _reader_context.common_expr_ctxs_push_down = 
read_params.common_expr_ctxs_push_down;
     _reader_context.output_columns = &read_params.output_columns;
+    _reader_context.extra_columns = read_params.extra_columns;
     _reader_context.push_down_agg_type_opt = 
read_params.push_down_agg_type_opt;
     _reader_context.ttl_seconds = _tablet->ttl_seconds();
     _reader_context.score_runtime = read_params.score_runtime;
     _reader_context.collection_statistics = read_params.collection_statistics;
 
     _reader_context.virtual_column_exprs = read_params.virtual_column_exprs;
-    _reader_context.vir_cid_to_idx_in_block = 
read_params.vir_cid_to_idx_in_block;
-    _reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type;
     _reader_context.ann_topn_runtime = read_params.ann_topn_runtime;
 
     _reader_context.condition_cache_digest = 
read_params.condition_cache_digest;
@@ -229,10 +228,6 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW
     _reader_context.general_read_limit = read_params.general_read_limit;
 
-    // Preserve the original requested output layout so BlockReader can map 
expanded storage
-    // columns (for non-direct AGG/UNIQUE paths) back to the final output 
block.
-    _reader_context.origin_return_columns = read_params.origin_return_columns;
-
     return Status::OK();
 }
 
diff --git a/be/src/storage/tablet/tablet_reader.h 
b/be/src/storage/tablet/tablet_reader.h
index 3d03caf6d74..4eaeaa5f6c5 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -158,6 +158,12 @@ public:
         std::vector<ColumnId> return_columns;
         // output_columns only contain columns in OrderByExprs and outputExprs
         std::set<int32_t> output_columns;
+        // Extra storage key columns that are present only for scan-schema 
alignment.
+        // Example: for AGG keys (k1, k2), a query that returns k2 can scan
+        // (k1, k2) and project away k1. Direct readers may avoid reading such
+        // columns only if the lower iterator proves their real values are not
+        // required by predicates, delete conditions, or expressions.
+        std::set<ColumnId> extra_columns;
         RuntimeProfile* profile = nullptr;
         RuntimeState* runtime_state = nullptr;
 
@@ -209,8 +215,6 @@ public:
         int64_t batch_size = -1;
 
         std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
-        std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
-        std::map<size_t, DataTypePtr> vir_col_idx_to_type;
 
         std::shared_ptr<ScoreRuntime> score_runtime;
         CollectionStatisticsPtr collection_statistics;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 14e063ac7f2..1c037c65edb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -36,6 +36,7 @@ import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.AliasFunction;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.RowBinlogTableWrapper;
 import org.apache.doris.catalog.TableIf;
@@ -3002,6 +3003,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     .collect(Collectors.toSet());
             requiredWithVirtualColumns.addAll(virtualColumnInputSlotIds);
         }
+        if (scanNode instanceof OlapScanNode) {
+            preserveExtraStorageKeySlots((OlapScanNode) scanNode, 
requiredWithVirtualColumns);
+        }
         // Find the smallest column, for count(*) or other situation that slot 
is empty after prune
         SlotDescriptor smallest = 
getSmallestSlot(scanNode.getTupleDesc().getSlots());
         scanNode.getTupleDesc().getSlots().removeIf(s -> 
!requiredWithVirtualColumns.contains(s.getId()));
@@ -3021,6 +3025,35 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
     }
 
+    private void preserveExtraStorageKeySlots(OlapScanNode scanNode, 
Set<SlotId> requiredSlotIds) {
+        if (!shouldPreserveStorageKeySlots(scanNode)) {
+            return;
+        }
+        for (SlotDescriptor slot : scanNode.getTupleDesc().getSlots()) {
+            Column column = slot.getColumn();
+            if (column == null || !column.isKey()) {
+                // OLAP scan tuples follow the storage schema, where key 
columns form the prefix.
+                break;
+            }
+            if (!requiredSlotIds.contains(slot.getId())) {
+                scanNode.getExtraKeyColumnSlotIds().add(slot.getId().asInt());
+                requiredSlotIds.add(slot.getId());
+            }
+        }
+    }
+
+    private boolean shouldPreserveStorageKeySlots(OlapScanNode scanNode) {
+        if (scanNode.isIncrementalScan()) {
+            return false;
+        }
+        long selectedIndexId = scanNode.getSelectedIndexId() == -1
+                ? scanNode.getOlapTable().getBaseIndexId()
+                : scanNode.getSelectedIndexId();
+        KeysType keysType = 
scanNode.getOlapTable().getIndexMetaByIndexId(selectedIndexId).getKeysType();
+        return keysType == KeysType.AGG_KEYS
+                || (keysType == KeysType.UNIQUE_KEYS && 
!scanNode.getOlapTable().getEnableUniqueKeyMergeOnWrite());
+    }
+
     /**
      * Get the smallest slot in the slot list.
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index d60ff2d5e34..d65ddcd5c6f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -436,9 +436,8 @@ public class PhysicalOlapScan extends 
PhysicalCatalogRelation implements OlapSca
                 selectedIndexId, selectedTabletIds, selectedPartitionIds, 
hasPartitionPredicate,
                 distributionSpec, preAggStatus, baseOutputs, groupExpression, 
getLogicalProperties(),
                 getPhysicalProperties(), statistics, tableSample, 
operativeSlots, virtualColumns, scoreOrderKeys,
-                scoreLimit,
-                scoreRangeInfo, annOrderKeys, annLimit, tableAlias, 
partitionPrunablePredicates, incrementalScan,
-                scanParams));
+                scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, 
tableAlias, partitionPrunablePredicates,
+                incrementalScan, scanParams));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 2f93e50d3d5..1f2c7e97746 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -189,6 +189,7 @@ public class OlapScanNode extends ScanNode {
 
     private SortInfo sortInfo = null;
     private Set<Integer> outputColumnUniqueIds = new HashSet<>();
+    private Set<Integer> extraKeyColumnSlotIds = new HashSet<>();
 
     // When scan match sort_info, we can push limit into OlapScanNode.
     // It's limit for scanner instead of scanNode so we add a new limit.
@@ -269,6 +270,10 @@ public class OlapScanNode extends ScanNode {
         this.tableSample = tSample;
     }
 
+    public Set<Integer> getExtraKeyColumnSlotIds() {
+        return extraKeyColumnSlotIds;
+    }
+
     public void setNereidsPrunedTabletIds(Set<Long> nereidsPrunedTabletIds) {
         this.nereidsPrunedTabletIds = nereidsPrunedTabletIds;
     }
@@ -1124,6 +1129,12 @@ public class OlapScanNode extends ScanNode {
                             .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
             output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
         }
+        if (!extraKeyColumnSlotIds.isEmpty()) {
+            String extraKeyColumns = extraKeyColumnSlotIds.stream().sorted()
+                    .map(this::getExtraKeyColumnExplainName)
+                    .collect(Collectors.joining(","));
+            output.append(prefix).append("EXTRA KEY COLUMNS: 
").append(extraKeyColumns).append("\n");
+        }
 
         if (!conjuncts.isEmpty()) {
             Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
@@ -1164,6 +1175,17 @@ public class OlapScanNode extends ScanNode {
         return output.toString();
     }
 
+    private String getExtraKeyColumnExplainName(Integer slotId) {
+        SlotDescriptor extraKeySlot = desc.getSlots().stream()
+                .filter(slot -> slot.getId().asInt() == slotId)
+                .findFirst()
+                .orElse(null);
+        Preconditions.checkNotNull(extraKeySlot, "missing extra key slot %s", 
slotId);
+        Column column = extraKeySlot.getColumn();
+        Preconditions.checkNotNull(column, "missing column for extra key slot 
%s", slotId);
+        return column.getName();
+    }
+
     @Override
     public int getNumInstances() {
         // In pipeline exec engine, the instance num equals be_num * parallel 
instance.
@@ -1318,6 +1340,9 @@ public class OlapScanNode extends ScanNode {
         if (outputColumnUniqueIds != null) {
             msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
         }
+        if (!extraKeyColumnSlotIds.isEmpty()) {
+            msg.olap_scan_node.setExtraKeyColumnSlotIds(extraKeyColumnSlotIds);
+        }
 
         msg.olap_scan_node.setDistributeColumnIds(new 
ArrayList<>(distributionColumnIds));
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
index c31f96792f2..a6617e595ba 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java
@@ -55,6 +55,8 @@ import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.RepeatNode;
 import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.utframe.TestWithFeService;
 
@@ -101,13 +103,11 @@ public class PhysicalPlanTranslatorTest extends 
TestWithFeService {
         OlapTable t1 = PlanConstructor.newOlapTable(0, "t1", 0, 
KeysType.AGG_KEYS);
         List<String> qualifier = new ArrayList<>();
         qualifier.add("test");
-        List<Slot> t1Output = new ArrayList<>();
-        SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE);
-        SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE);
-        SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE);
-        t1Output.add(col1);
-        t1Output.add(col2);
-        t1Output.add(col3);
+        SlotReference col1 = 
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(),
+                t1, t1.getBaseSchema().get(0), qualifier);
+        SlotReference col2 = 
SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(),
+                t1, t1.getBaseSchema().get(1), qualifier);
+        List<Slot> t1Output = ImmutableList.of(col1, col2);
         LogicalProperties t1Properties = new LogicalProperties(new 
Supplier<List<Slot>>() {
             @Override
             public List<Slot> get() {
@@ -119,11 +119,11 @@ public class PhysicalPlanTranslatorTest extends 
TestWithFeService {
                 return DataTrait.EMPTY_TRAIT;
             }
         });
-        PhysicalOlapScan scan = new 
PhysicalOlapScan(StatementScopeIdGenerator.newRelationId(), t1, qualifier, 
t1.getBaseIndexId(),
-                Collections.emptyList(), Collections.emptyList(), null, 
PreAggStatus.on(),
-                ImmutableList.of(), Optional.empty(), t1Properties, 
Optional.empty(),
-                ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), 
Optional.empty(),
-                Optional.empty(), ImmutableList.of(), Optional.empty());
+        PhysicalOlapScan scan = new 
PhysicalOlapScan(StatementScopeIdGenerator.newRelationId(), t1, qualifier,
+                t1.getBaseIndexId(), Collections.emptyList(), 
Collections.emptyList(), null, PreAggStatus.on(),
+                ImmutableList.of(), Optional.empty(), t1Properties, 
Optional.empty(), ImmutableList.of(),
+                ImmutableList.of(), ImmutableList.of(), Optional.empty(), 
Optional.empty(), ImmutableList.of(),
+                Optional.empty());
         Literal t1FilterRight = new IntegerLiteral(1);
         Expression t1FilterExpr = new GreaterThan(col1, t1FilterRight);
         PhysicalFilter<PhysicalOlapScan> filter =
@@ -139,6 +139,34 @@ public class PhysicalPlanTranslatorTest extends 
TestWithFeService {
         List<OlapScanNode> scanNodeList = new ArrayList<>();
         planNode.collect(OlapScanNode.class, scanNodeList);
         Assertions.assertEquals(2, 
scanNodeList.get(0).getTupleDesc().getSlots().size());
+        
Assertions.assertTrue(scanNodeList.get(0).getExtraKeyColumnSlotIds().isEmpty());
+
+        List<NamedExpression> extraKeyProjectList = new ArrayList<>();
+        extraKeyProjectList.add(col2);
+        PhysicalProject<PhysicalOlapScan> extraKeyProject = new 
PhysicalProject<>(extraKeyProjectList,
+                placeHolder, scan);
+        PlanTranslatorContext extraKeyPlanTranslatorContext = new 
PlanTranslatorContext();
+        PhysicalPlanTranslator extraKeyTranslator = new 
PhysicalPlanTranslator(extraKeyPlanTranslatorContext, null);
+        PlanFragment extraKeyFragment = 
extraKeyTranslator.visitPhysicalProject(extraKeyProject,
+                extraKeyPlanTranslatorContext);
+        List<OlapScanNode> extraKeyScanNodeList = new ArrayList<>();
+        extraKeyFragment.getPlanRoot().collect(OlapScanNode.class, 
extraKeyScanNodeList);
+        OlapScanNode extraKeyScanNode = extraKeyScanNodeList.get(0);
+        Assertions.assertEquals(2, 
extraKeyScanNode.getTupleDesc().getSlots().size());
+        Assertions.assertEquals("id", 
extraKeyScanNode.getTupleDesc().getSlots().get(0).getColumn().getName());
+        Assertions.assertEquals("name", 
extraKeyScanNode.getTupleDesc().getSlots().get(1).getColumn().getName());
+        Assertions.assertEquals(1, 
extraKeyScanNode.getOutputTupleDesc().getSlots().size());
+        Assertions.assertEquals("name", 
extraKeyScanNode.getOutputTupleDesc().getSlots().get(0).getColumn().getName());
+        Assertions.assertEquals(1, extraKeyScanNode.getProjectList().size());
+
+        int extraKeySlotId = 
extraKeyScanNode.getTupleDesc().getSlots().get(0).getId().asInt();
+        Assertions.assertEquals(ImmutableSet.of(extraKeySlotId), 
extraKeyScanNode.getExtraKeyColumnSlotIds());
+        Assertions.assertTrue(extraKeyScanNode.getNodeExplainString("", 
TExplainLevel.NORMAL)
+                .contains("EXTRA KEY COLUMNS: id"));
+        TPlanNode thriftScanNode = 
extraKeyScanNode.treeToThrift().getNodes().get(0);
+        
Assertions.assertTrue(thriftScanNode.olap_scan_node.isSetExtraKeyColumnSlotIds());
+        Assertions.assertEquals(ImmutableSet.of(extraKeySlotId),
+                thriftScanNode.olap_scan_node.getExtraKeyColumnSlotIds());
     }
 
     @Test
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index fb8ef30150e..5079fb2b710 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1008,6 +1008,8 @@ struct TOlapScanNode {
   // Only partitions that are candidates for pruning are included; partitions 
FE
   // does not want pruned (e.g. default catch-all) are omitted from this list.
   27: optional list<TPartitionBoundary> partition_boundaries
+  // Slot ids of extra storage key columns used only to align the scan tuple 
with storage schema.
+  28: optional set<i32> extra_key_column_slot_ids
 }
 
 struct TEqJoinCondition {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to