This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new 0ae7e1ce986 [feature](be) Support parquet minmax aggregate pushdown 
(#63868)
0ae7e1ce986 is described below

commit 0ae7e1ce9869a2950a924e398fa29e38d4ce23ce
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 14:32:40 2026 +0800

    [feature](be) Support parquet minmax aggregate pushdown (#63868)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary: Add a metadata-backed MIN/MAX aggregate pushdown path
    for external Parquet readers and gate Iceberg v2 aggregate pushdown when
    delete files are present.
    
    ### Release note
    
    Support min/max aggregate pushdown for eligible external Parquet scans.
    
    ### Check List (For Author)
    
    - Test: Unit Test / Manual test
    
    - Added AggregateReaderTest and
    ParquetReaderTest.minmax_pushdown_from_statistics.
    
        - Manual test: git diff --check and git diff --cached --check.
    
    - Not run: run-be-ut.sh failed because this environment only has JDK 11
    but the script requires JDK 17; the clang-format script failed because
    llvm@16 is not installed.
    
    - Behavior changed: Yes, eligible Parquet scans can return min/max
    aggregate rows from footer statistics; unsafe Iceberg delete-file scans
    disable aggregate pushdown.
    
    - Does this need documentation: No
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/format/new_parquet/parquet_reader.cpp       | 108 +++-
 be/src/format/new_parquet/parquet_reader.h         |   3 +
 be/src/format/reader/column_mapper.cpp             |  66 ++-
 be/src/format/reader/column_mapper.h               |   6 +-
 be/src/format/reader/file_reader.h                 |  43 +-
 be/src/format/reader/table_reader.cpp              |  22 +-
 be/src/format/reader/table_reader.h                | 170 +++++-
 be/src/format/table/iceberg_reader_v2.cpp          |  50 +-
 be/src/format/table/iceberg_reader_v2.h            |  11 +-
 be/test/format/new_parquet/parquet_reader_test.cpp |  31 +-
 be/test/format/reader/expr/cast_test.cpp           | 193 +++++-
 be/test/format/reader/table_reader_test.cpp        | 654 ++++++++++++++++++++-
 12 files changed, 1208 insertions(+), 149 deletions(-)

diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 2626df205fa..26093575c11 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -323,35 +323,27 @@ Status ParquetReader::_read_filter_columns(int64_t 
batch_rows, Block* file_block
 Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* 
file_block,
                                                 SelectionVector* selection,
                                                 uint16_t* selected_rows) {
-    // Expression filters may reference several predicate columns. Execute 
them only after all
+    // Conjuncts may reference several predicate columns. Execute them only 
after all referenced
     // predicate columns in the file-local block have been materialized.
-    for (const auto& expression_filter : _request->expression_filters) {
-        if (expression_filter.conjunct == nullptr) {
-            if (expression_filter.delete_conjunct == nullptr) {
-                continue;
-            }
-        } else {
-            if (*selected_rows == 0) {
-                break;
-            }
-            IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
-            bool can_filter_all = false;
-            RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
-                    file_block, filter.data(), 
static_cast<size_t>(batch_rows), false,
-                    &can_filter_all));
-            *selected_rows =
-                    can_filter_all ? 0
-                                   : _apply_filter_to_selection(filter, 
selection, *selected_rows);
-        }
+    for (const auto& conjunct : _request->conjuncts) {
         if (*selected_rows == 0) {
             break;
         }
-        if (expression_filter.delete_conjunct == nullptr) {
-            continue;
+        IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+        bool can_filter_all = false;
+        RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(),
+                                                 
static_cast<size_t>(batch_rows), false,
+                                                 &can_filter_all));
+        *selected_rows =
+                can_filter_all ? 0 : _apply_filter_to_selection(filter, 
selection, *selected_rows);
+    }
+    for (const auto& delete_conjunct : _request->delete_conjuncts) {
+        if (*selected_rows == 0) {
+            break;
         }
         int result_column_id = -1;
-        RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute(
-                expression_filter.delete_conjunct.get(), file_block, 
&result_column_id));
+        
RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(), 
file_block,
+                                                         &result_column_id));
         DORIS_CHECK(result_column_id >= 0 &&
                     result_column_id < 
static_cast<int>(file_block->columns()));
         const auto& delete_filter = assert_cast<const ColumnUInt8&>(
@@ -745,6 +737,76 @@ Status ParquetReader::get_block(Block* file_block, size_t* 
rows, bool* eof) {
     }
 }
 
+Status ParquetReader::get_aggregate_result(const reader::FileAggregateRequest& 
request,
+                                           reader::FileAggregateResult* 
result) {
+    DORIS_CHECK(result != nullptr);
+    if (_state == nullptr || _state->metadata == nullptr || _state->schema == 
nullptr) {
+        return Status::Uninitialized("ParquetReader is not open");
+    }
+    result->count = 0;
+    result->columns.clear();
+    if (request.agg_type != TPushAggOp::type::COUNT &&
+        request.agg_type != TPushAggOp::type::MINMAX) {
+        return Status::NotSupported("Unsupported parquet aggregate pushdown 
type {}",
+                                    request.agg_type);
+    }
+
+    // Sum the row counts of all selected row groups. For MIN/MAX aggregates, 
the count is used to determine whether any row group was selected.
+    for (const auto row_group_idx : _state->selected_row_groups) {
+        auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
+        DORIS_CHECK(row_group_metadata != nullptr);
+        result->count += row_group_metadata->num_rows();
+    }
+    if (request.agg_type == TPushAggOp::type::COUNT) {
+        return Status::OK();
+    }
+
+    result->columns.resize(request.columns.size());
+    for (size_t request_column_idx = 0; request_column_idx < 
request.columns.size();
+         ++request_column_idx) {
+        const auto file_column_id = 
request.columns[request_column_idx].file_column_id;
+        if (file_column_id < 0 ||
+            file_column_id >= 
static_cast<int32_t>(_state->file_schema.size())) {
+            return Status::InvalidArgument("Invalid parquet aggregate column 
id {}",
+                                           file_column_id);
+        }
+        const auto& column_schema = _state->file_schema[file_column_id];
+        DORIS_CHECK(column_schema != nullptr);
+        // TODO: Support min/max pushdown for complex column by traversing 
down to the leaf column readers. This requires supporting complex column 
statistics in parquet file reader, which is currently not implemented in 
parquet-cpp.
+        if (column_schema->leaf_column_id < 0) {
+            return Status::NotSupported(
+                    "Parquet aggregate pushdown only supports primitive column 
{}",
+                    column_schema->name);
+        }
+
+        auto& aggregate_column = result->columns[request_column_idx];
+        for (const auto row_group_idx : _state->selected_row_groups) {
+            auto row_group_metadata = 
_state->metadata->RowGroup(row_group_idx);
+            DORIS_CHECK(row_group_metadata != nullptr);
+            auto column_chunk = 
row_group_metadata->ColumnChunk(column_schema->leaf_column_id);
+            DORIS_CHECK(column_chunk != nullptr);
+            const auto statistics = 
ParquetStatisticsUtils::TransformColumnStatistics(
+                    *column_schema, column_chunk->statistics());
+            if (!statistics.has_min_max) {
+                return Status::NotSupported("Missing parquet min/max 
statistics for column {}",
+                                            column_schema->name);
+            }
+            if (!aggregate_column.has_min || statistics.min_value < 
aggregate_column.min_value) {
+                aggregate_column.min_value = statistics.min_value;
+                aggregate_column.has_min = true;
+            }
+            if (!aggregate_column.has_max || aggregate_column.max_value < 
statistics.max_value) {
+                aggregate_column.max_value = statistics.max_value;
+                aggregate_column.has_max = true;
+            }
+        }
+        if (!aggregate_column.has_min || !aggregate_column.has_max) {
+            return Status::NotSupported("No parquet row group selected for 
min/max pushdown");
+        }
+    }
+    return Status::OK();
+}
+
 Status ParquetReader::close() {
     if (_state != nullptr) {
         if (_state->file_reader != nullptr) {
diff --git a/be/src/format/new_parquet/parquet_reader.h 
b/be/src/format/new_parquet/parquet_reader.h
index 14a891c75e1..85d766f8882 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -69,6 +69,9 @@ public:
     // 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。
     Status get_block(Block* file_block, size_t* rows, bool* eof) override;
 
+    Status get_aggregate_result(const reader::FileAggregateRequest& request,
+                                reader::FileAggregateResult* result) override;
+
     Status close() override;
 
 protected:
diff --git a/be/src/format/reader/column_mapper.cpp 
b/be/src/format/reader/column_mapper.cpp
index e8e7442a8d7..c6114b20df3 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -43,6 +43,13 @@ struct FileSlotRewriteInfo {
     std::string file_column_name;
 };
 
+static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref,
+                                      const FileSlotRewriteInfo& rewrite_info) 
{
+    return TableSlotRef::create_shared(slot_ref.slot_id(),
+                                       
cast_set<int>(rewrite_info.block_position), -1,
+                                       rewrite_info.file_type, 
rewrite_info.file_column_name);
+}
+
 static VExprSPtr rewrite_table_expr_to_file_expr(
         const VExprSPtr& expr,
         const std::map<int32_t, FileSlotRewriteInfo>& 
table_column_to_file_slot) {
@@ -54,9 +61,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
         const auto rewrite_it = 
table_column_to_file_slot.find(slot_ref->slot_id());
         if (rewrite_it != table_column_to_file_slot.end()) {
             const auto& rewrite_info = rewrite_it->second;
-            auto file_slot = TableSlotRef::create_shared(
-                    slot_ref->slot_id(), 
cast_set<int>(rewrite_info.block_position), -1,
-                    rewrite_info.file_type, rewrite_info.file_column_name);
+            auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info);
             if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
                 return file_slot;
             }
@@ -66,6 +71,27 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
         }
         return expr;
     }
+    // rewrite_table_expr_to_file_expr localizes the expression tree in-place 
because VExpr does
+    // not provide a generic deep-clone API. A previous split may already have 
inserted Cast(slot)
+    // for the same table-level conjunct. Keep that rewrite idempotent: 
rewrite the cast child
+    // from table slot to the current split's file slot, and drop the cast 
when the current split
+    // no longer needs it.
+    if (dynamic_cast<const Cast*>(expr.get()) != nullptr && 
expr->get_num_children() == 1) {
+        const auto& child = expr->children()[0];
+        if (child->is_slot_ref()) {
+            const auto* slot_ref = assert_cast<const VSlotRef*>(child.get());
+            const auto rewrite_it = 
table_column_to_file_slot.find(slot_ref->slot_id());
+            if (rewrite_it != table_column_to_file_slot.end() &&
+                expr->data_type()->equals(*rewrite_it->second.table_type)) {
+                auto rewritten_child = create_file_slot_ref(*slot_ref, 
rewrite_it->second);
+                if 
(rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
+                    return rewritten_child;
+                }
+                expr->set_children({std::move(rewritten_child)});
+                return expr;
+            }
+        }
+    }
 
     // VExpr currently does not provide a generic deep-clone API for arbitrary 
expression types.
     // Keep all slot-localization mutation inside ColumnMapper and rebuild it 
for every split
@@ -85,13 +111,28 @@ static constexpr const char* 
ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update
 
 static void add_scan_column(FileScanRequest* file_request, ColumnId 
file_column_id,
                             std::vector<ColumnId>* scan_columns) {
+    if (scan_columns == &file_request->non_predicate_columns &&
+        std::find(file_request->predicate_columns.begin(), 
file_request->predicate_columns.end(),
+                  file_column_id) != file_request->predicate_columns.end()) {
+        return;
+    }
     // column_positions is the global read-column index for this scan request, 
so it also
     // deduplicates predicate_columns and non_predicate_columns across all 
filter/projection paths.
-    if (file_request->column_positions.count(file_column_id) == 0) {
+    const bool newly_added = 
file_request->column_positions.count(file_column_id) == 0;
+    if (newly_added) {
         file_request->column_positions.emplace(file_column_id,
                                                
file_request->column_positions.size());
+    }
+    if (std::find(scan_columns->begin(), scan_columns->end(), file_column_id) 
==
+        scan_columns->end()) {
         scan_columns->push_back(file_column_id);
     }
+    if (scan_columns == &file_request->predicate_columns) {
+        file_request->non_predicate_columns.erase(
+                std::remove(file_request->non_predicate_columns.begin(),
+                            file_request->non_predicate_columns.end(), 
file_column_id),
+                file_request->non_predicate_columns.end());
+    }
 }
 
 static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
@@ -293,7 +334,8 @@ Status TableColumnMapper::create_scan_request(const 
std::vector<TableFilter>& ta
     file_request->non_predicate_columns.clear();
     file_request->column_positions.clear();
     file_request->complex_projections.clear();
-    file_request->expression_filters.clear();
+    file_request->conjuncts.clear();
+    file_request->delete_conjuncts.clear();
     file_request->column_predicate_filters.clear();
     file_request->reader_expression_map.clear();
     // 1. Build referenced non-predicate columns
@@ -379,19 +421,9 @@ Status TableColumnMapper::localize_filters(const 
std::vector<TableFilter>& table
             continue;
         }
         if (table_filter.conjunct != nullptr) {
-            FileExpressionFilter expression_filter;
-            expression_filter.conjunct =
+            file_request->conjuncts.push_back(
                     
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
-                            table_filter.conjunct->root(), 
table_column_to_file_slot));
-            
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
-            for (const auto table_column_id : table_filter.slot_ids) {
-                const auto* mapping = _find_mapping(table_column_id);
-                if (mapping == nullptr || 
!mapping->file_column_id.has_value()) {
-                    continue;
-                }
-                
expression_filter.file_column_ids.push_back(*mapping->file_column_id);
-            }
-            
file_request->expression_filters.push_back(std::move(expression_filter));
+                            table_filter.conjunct->root(), 
table_column_to_file_slot)));
         }
     }
     for (const auto& [table_column_id, predicates] : table_column_predicates) {
diff --git a/be/src/format/reader/column_mapper.h 
b/be/src/format/reader/column_mapper.h
index 75b53f68d2d..e1839652a47 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -106,7 +106,7 @@ public:
 
     // 把 table-level scan 请求转换成 file-local scan 请求。
     // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
-    // projected_file_columns、expression_filters、column_predicate_filters 和
+    // 
projected_file_columns、conjuncts、delete_conjuncts、column_predicate_filters 和
     // reader_expression_map。
     virtual Status create_scan_request(const std::vector<TableFilter>& 
table_filters,
                                        const TableColumnPredicates& 
table_column_predicates,
@@ -149,7 +149,9 @@ private:
     }
 
     bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& 
file_type) const {
-        return table_type == file_type;
+        DORIS_CHECK(table_type != nullptr);
+        DORIS_CHECK(file_type != nullptr);
+        return table_type->equals(*file_type);
     }
 
     TableColumnMapperOptions _options;
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index 28de8f068b0..7e6d18acedc 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -27,7 +27,9 @@
 
 #include "common/status.h"
 #include "core/data_type/data_type.h"
+#include "core/field.h"
 #include "exprs/vexpr_fwd.h"
+#include "gen_cpp/PlanNodes_types.h"
 #include "io/file_factory.h"
 #include "io/fs/file_reader_writer_fwd.h"
 
@@ -75,15 +77,6 @@ struct FieldProjection {
     std::vector<FieldProjection> children;
 };
 
-// File-local expression filter. It may reference multiple predicate_columns, 
so FileReader should
-// evaluate it after all referenced predicate columns have been materialized 
in the file-local block.
-struct FileExpressionFilter {
-    VExprContextSPtr conjunct;
-    // DeletePredicate
-    VExprContextSPtr delete_conjunct;
-    std::vector<ColumnId> file_column_ids;
-};
-
 // File-local single-column predicates for file-layer pruning, such as 
min/max, page index,
 // dictionary and bloom filter. Predicates must all belong to file_column_id.
 struct FileColumnPredicateFilter {
@@ -108,12 +101,37 @@ struct FileScanRequest {
     std::vector<ColumnId> non_predicate_columns;
     std::map<ColumnId, size_t> column_positions; // file_column_id -> 
file-local block position
     std::map<ColumnId, FieldProjection> complex_projections;
-    std::vector<FileExpressionFilter> expression_filters;
+    // Complex conjuncts converted to file-local predicates from table-level 
predicates.
+    VExprContextSPtrs conjuncts;
+    // Delete predicates converted to file-local predicates.
+    VExprContextSPtrs delete_conjuncts;
+    // Only simple predicates that can be directly evaluated on a column, such 
as `a > 1`. Currently they are used for zone-map filtering.
     std::vector<FileColumnPredicateFilter> column_predicate_filters;
     // fallback path if filters cannot be localized to file-local predicates. 
The expression can reference projected_file_columns and partition columns.
     std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
 };
 
+struct FileAggregateRequest {
+    struct Column {
+        ColumnId file_column_id = -1;
+    };
+
+    TPushAggOp::type agg_type = TPushAggOp::type::NONE;
+    std::vector<Column> columns;
+};
+
+struct FileAggregateResult {
+    struct Column {
+        bool has_min = false;
+        bool has_max = false;
+        Field min_value;
+        Field max_value;
+    };
+
+    int64_t count = 0;
+    std::vector<Column> columns;
+};
+
 // 文件物理读取层通用接口。
 // 该接口只描述 file-local schema、file-local scan request 和 file-local block。
 // TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
@@ -188,6 +206,11 @@ public:
         return Status::OK();
     }
 
+    virtual Status get_aggregate_result(const FileAggregateRequest& request,
+                                        FileAggregateResult* result) {
+        return Status::NotSupported("FileReader does not support aggregate 
pushdown");
+    }
+
     // 关闭当前物理文件 reader 并释放文件层状态。
     // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
     virtual Status close() {
diff --git a/be/src/format/reader/table_reader.cpp 
b/be/src/format/reader/table_reader.cpp
index 8289d637d78..2c92b9ca1a1 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -153,6 +153,7 @@ Status TableReader::init(TableReadOptions options) {
     _io_ctx = options.io_ctx;
     _runtime_state = options.runtime_state;
     _scanner_profile = options.scanner_profile;
+    _push_down_agg_type = options.push_down_agg_type;
     _projected_columns = std::move(options.projected_columns);
     _system_properties = create_system_properties(_scan_params);
     _profile = std::move(options.profile);
@@ -173,19 +174,13 @@ Status TableReader::_build_table_filters_from_conjuncts() 
{
 
 Status TableReader::_open_local_filter_exprs(const FileScanRequest& 
file_request) {
     RowDescriptor row_desc;
-    for (const auto& expression_filter : file_request.expression_filters) {
-        if (expression_filter.conjunct == nullptr) {
-            if (expression_filter.delete_conjunct == nullptr) {
-                continue;
-            }
-        } else {
-            
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
-            RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
-        }
-        if (expression_filter.delete_conjunct != nullptr) {
-            
RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state, 
row_desc));
-            
RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state));
-        }
+    for (const auto& conjunct : file_request.conjuncts) {
+        RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc));
+        RETURN_IF_ERROR(conjunct->open(_runtime_state));
+    }
+    for (const auto& delete_conjunct : file_request.delete_conjuncts) {
+        RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc));
+        RETURN_IF_ERROR(delete_conjunct->open(_runtime_state));
     }
     return Status::OK();
 }
@@ -235,6 +230,7 @@ Status TableReader::prepare_split(const SplitReadOptions& 
options) {
     _current_task = std::make_unique<ScanTask>();
     _current_task->data_file = create_file_description(options.current_range);
     _delete_rows = nullptr;
+    _aggregate_pushdown_tried = false;
     return _parse_delete_predicates(options);
 }
 
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index de7626dfb24..83e0ec44fc8 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -36,6 +36,7 @@
 #include "core/data_type/data_type_nullable.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/data_type_struct.h"
+#include "core/field.h"
 #include "exprs/vexpr_context.h"
 #include "exprs/vexpr_fwd.h"
 #include "format/new_parquet/column_reader.h"
@@ -43,6 +44,7 @@
 #include "format/reader/expr/delete_predicate.h"
 #include "format/reader/expr/slot_ref.h"
 #include "format/reader/file_reader.h"
+#include "gen_cpp/PlanNodes_types.h"
 #include "runtime/descriptors.h"
 
 namespace doris {
@@ -66,8 +68,8 @@ struct TableColumn {
     bool is_partition_key = false;
 };
 
-// table-level filter。
-// TableColumnMapper 负责把它转换成 FileExpressionFilter 或 reader_expression_map。
+// All complex predicates on table/global schema, which cannot be directly 
localized to file
+// schema. They will be evaluated at table level and may depend on multiple 
columns.
 struct TableFilter {
     // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
     VExprContextSPtr conjunct;
@@ -108,21 +110,29 @@ struct ReadProfile {
 };
 
 struct TableReadOptions {
+    // Columns that need to be read from file and output by table reader. They all use 
table/global
+    // schema semantics.
     const std::vector<TableColumn> projected_columns;
+    // Simple single-column predicates, which are parsed by the scan operator.
     const TableColumnPredicates column_predicates;
-    // All conjuncts from scan operator
+    // All complex conjuncts from scan operator
     const VExprContext conjuncts;
+    // File format of the underlying data files, needed for reader 
initialization and reader-level
+    // filter pushdown.
     const FileFormat format;
     TFileScanRangeParams* scan_params;
     std::shared_ptr<io::IOContext> io_ctx;
     RuntimeState* runtime_state;
     RuntimeProfile* scanner_profile;
     const bool allow_missing_columns = true;
+    // Push-down aggregate type.
+    const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
 
     std::unique_ptr<ReadProfile> profile;
 };
 
 struct SplitReadOptions {
+    // Split-level information for reader initialization, which may include 
file path, partition values, delete file info, etc. The content is table format 
specific and opaque to table reader base class; it's the responsibility of the 
concrete table reader implementation to parse necessary information for reader 
initialization and filter pushdown.
     std::map<std::string, Field> partition_values;
     ShardedKVCache* cache;
     TFileRangeDesc current_range;
@@ -175,6 +185,18 @@ public:
                 }
             }
 
+            // Materialize a reduced row set for upper aggregate operators 
when aggregate
+            // pushdown can be applied. This is not the final aggregate 
result: COUNT emits
+            // `count` default rows for the upper COUNT(*), and MIN/MAX emits 
two rows containing
+            // file-level min/max values for the upper MIN/MAX.
+            if (!_aggregate_pushdown_tried) {
+                bool pushed_down = false;
+                
RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down));
+                if (pushed_down) {
+                    return Status::OK();
+                }
+            }
+
             bool current_eof = false;
             _data_reader.block_template.clear_column_data();
             size_t current_rows = 0;
@@ -329,10 +351,8 @@ protected:
                 std::make_shared<DataTypeInt64>(),
                 
parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME));
 
-        FileExpressionFilter delete_filter;
-        delete_filter.delete_conjunct = 
VExprContext::create_shared(std::move(delete_predicate));
-        delete_filter.file_column_ids.push_back(row_position_column_id);
-        request->expression_filters.push_back(std::move(delete_filter));
+        request->delete_conjuncts.push_back(
+                VExprContext::create_shared(std::move(delete_predicate)));
         return Status::OK();
     }
 
@@ -343,7 +363,6 @@ protected:
         _data_reader.reader.reset();
         _data_reader.column_mapper.clear();
         _table_filters.clear();
-        _table_column_predicates.clear();
         _data_reader.file_schema.clear();
         _data_reader.block_schema.clear();
         _data_reader.block_template.clear();
@@ -368,6 +387,62 @@ protected:
     // Materialize virtual columns in table block, such as _row_id and 
_last_updated_sequence_number in Iceberg. This is called after finalize_chunk, 
so the virtual column can be referenced in finalize_expr.
     virtual Status materialize_virtual_columns(Block* table_block) { return 
Status::OK(); }
 
+    Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* 
pushed_down) {
+        DORIS_CHECK(block != nullptr);
+        DORIS_CHECK(pushed_down != nullptr);
+        *pushed_down = false;
+        block->clear_column_data(_projected_columns.size());
+        _aggregate_pushdown_tried = true;
+        if (!_supports_aggregate_pushdown(_push_down_agg_type)) {
+            return Status::OK();
+        }
+
+        FileAggregateRequest file_request;
+        _build_file_aggregate_request(_push_down_agg_type, &file_request);
+        FileAggregateResult file_result;
+        const auto status = 
_data_reader.reader->get_aggregate_result(file_request, &file_result);
+        if (status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) {
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(
+                _materialize_aggregate_pushdown_rows(_push_down_agg_type, 
file_result, block));
+        *pushed_down = true;
+        RETURN_IF_ERROR(close_current_reader());
+        return Status::OK();
+    }
+
+    virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const 
{
+        // Only COUNT and MIN/MAX can be pushed down.
+        if (agg_type != TPushAggOp::type::COUNT && agg_type != 
TPushAggOp::type::MINMAX) {
+            return false;
+        }
+        // Only support aggregate pushdown when there is no delete, filter, or 
column predicate, so
+        // the reduced rows consumed by the upper aggregate remain 
semantically equivalent to a
+        // normal scan.
+        if (_delete_rows != nullptr && !_delete_rows->empty()) {
+            return false;
+        }
+        if (!_table_filters.empty() || !_table_column_predicates.empty()) {
+            return false;
+        }
+        if (agg_type == TPushAggOp::type::COUNT) {
+            return true;
+        }
+        // For MIN/MAX, only support direct file-to-table column mappings. The 
two emitted rows
+        // must be enough for the upper MIN/MAX aggregate without evaluating 
projections, default
+        // expressions or virtual columns.
+        for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+            if (!mapping.file_column_id.has_value() || 
mapping.has_complex_projection ||
+                mapping.virtual_column_type != TableVirtualColumnType::INVALID 
||
+                mapping.default_expr != nullptr || mapping.file_type == 
nullptr ||
+                mapping.table_type == nullptr) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     Status _materialize_mapping_column(const ColumnMapping& mapping, Block* 
current_block,
                                        const size_t rows, ColumnPtr* column) {
         if (mapping.projection != nullptr) {
@@ -411,6 +486,82 @@ protected:
         return Status::OK();
     }
 
+    void _build_file_aggregate_request(TPushAggOp::type agg_type,
+                                       FileAggregateRequest* request) const {
+        DORIS_CHECK(request != nullptr);
+        DORIS_CHECK(_supports_aggregate_pushdown(agg_type));
+        request->agg_type = agg_type;
+        request->columns.clear();
+        if (agg_type == TPushAggOp::type::COUNT) {
+            return;
+        }
+        request->columns.reserve(_data_reader.column_mapper.mappings().size());
+        for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+            DORIS_CHECK(mapping.file_column_id.has_value());
+            request->columns.push_back({*mapping.file_column_id});
+        }
+    }
+
+    Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type,
+                                                const FileAggregateResult& 
file_result,
+                                                Block* block) {
+        if (agg_type == TPushAggOp::type::COUNT) {
+            // COUNT pushdown is not a final count value. It emits `count` 
default rows so the
+            // upper COUNT(*) aggregate can count them and produce the final 
result, including
+            // zero rows when count is 0.
+            for (size_t column_idx = 0; column_idx < block->columns(); 
++column_idx) {
+                block->replace_by_position(column_idx,
+                                           block->get_by_position(column_idx)
+                                                   
.type->create_column_const_with_default_value(
+                                                           
cast_set<size_t>(file_result.count)));
+            }
+            return Status::OK();
+        }
+        // MIN/MAX pushdown emits two rows, min first and max second, for each 
projected column.
+        // The upper MIN/MAX aggregate consumes those two rows to produce the 
final aggregate value.
+        DORIS_CHECK(file_result.columns.size() == 
_data_reader.column_mapper.mappings().size());
+        DORIS_CHECK(block->columns() == 
_data_reader.column_mapper.mappings().size());
+        Block file_block;
+        file_block.reserve(_data_reader.block_schema.size());
+        for (const auto& field : _data_reader.block_schema) {
+            file_block.insert({field.type->create_column(), field.type, 
field.name});
+        }
+        for (size_t column_idx = 0; column_idx < file_result.columns.size(); 
++column_idx) {
+            const auto& result_column = file_result.columns[column_idx];
+            if (!result_column.has_min || !result_column.has_max) {
+                return Status::NotSupported("Missing min/max aggregate result 
for column {}",
+                                            
_projected_columns[column_idx].name);
+            }
+            const auto& mapping = 
_data_reader.column_mapper.mappings()[column_idx];
+            DORIS_CHECK(mapping.file_column_id.has_value());
+            bool found_file_column = false;
+            for (size_t block_position = 0; block_position < 
_data_reader.block_schema.size();
+                 ++block_position) {
+                if (_data_reader.block_schema[block_position].id == 
*mapping.file_column_id) {
+                    found_file_column = true;
+                    auto column =
+                            
file_block.get_by_position(block_position).column->assume_mutable();
+                    if (column->empty()) {
+                        column->insert(result_column.min_value);
+                        column->insert(result_column.max_value);
+                        file_block.replace_by_position(block_position, 
std::move(column));
+                    }
+                    break;
+                }
+            }
+            DORIS_CHECK(found_file_column);
+        }
+        for (size_t column_idx = 0; column_idx < 
_data_reader.column_mapper.mappings().size();
+             ++column_idx) {
+            ColumnPtr table_column;
+            RETURN_IF_ERROR(
+                    
_materialize_mapping_column(_data_reader.column_mapper.mappings()[column_idx],
+                                                &file_block, 2, 
&table_column));
+            block->replace_by_position(column_idx, std::move(table_column));
+        }
+        return Status::OK();
+    }
+
     struct DataReader {
         std::unique_ptr<FileReader> reader;
         TableColumnMapper column_mapper;
@@ -426,6 +577,7 @@ protected:
     std::shared_ptr<io::FileSystemProperties> _system_properties;
     // partition key -> value
     std::map<std::string, Field> _partition_values;
+    // Predicates built from scan conjuncts before file-level localization.
     std::vector<TableFilter> _table_filters;
     TableColumnPredicates _table_column_predicates;
     VExprContext _conjuncts {nullptr};
@@ -437,6 +589,8 @@ protected:
     RuntimeState* _runtime_state;
     RuntimeProfile* _scanner_profile;
     FileFormat _format;
+    TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
+    bool _aggregate_pushdown_tried = false;
 
 private:
     static const SchemaField* _find_schema_field(const 
std::vector<SchemaField>& schema,
diff --git a/be/src/format/table/iceberg_reader_v2.cpp 
b/be/src/format/table/iceberg_reader_v2.cpp
index ed6649fce2c..f9587361dcf 100644
--- a/be/src/format/table/iceberg_reader_v2.cpp
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -115,6 +115,13 @@ Status 
IcebergTableReader::customize_file_scan_request(reader::FileScanRequest*
     return Status::OK();
 }
 
+bool IcebergTableReader::_supports_aggregate_pushdown(TPushAggOp::type 
agg_type) const {
+    if (!TableReader::_supports_aggregate_pushdown(agg_type)) {
+        return false;
+    }
+    return _equality_delete_filters.empty();
+}
+
 Status IcebergTableReader::_parse_deletion_vector_file(const 
TTableFormatFileDesc& t_desc,
                                                        DeleteFileDesc* desc,
                                                        bool* has_delete_file) {
@@ -184,7 +191,6 @@ Status IcebergTableReader::_init_delete_predicates(const 
TTableFormatFileDesc& t
             equality_delete_files.push_back(delete_file);
         }
     }
-
     // `_delete_rows != nullptr` means DeleteVector is parsed
     if (_delete_rows != nullptr) {
         _position_delete_rows_storage = *_delete_rows;
@@ -250,14 +256,13 @@ std::unique_ptr<io::FileDescription> 
IcebergTableReader::_delete_file_descriptio
     return file_description;
 }
 
-const reader::SchemaField* IcebergTableReader::_find_delete_field(
-        const std::vector<reader::SchemaField>& schema, const std::string& 
name) {
-    for (const auto& field : schema) {
-        if (field.name == name) {
-            return &field;
-        }
+std::string IcebergTableReader::_data_file_path() const {
+    if (_iceberg_params != nullptr && 
_iceberg_params->__isset.original_file_path) {
+        return _iceberg_params->original_file_path;
     }
-    return nullptr;
+    DORIS_CHECK(_current_task != nullptr);
+    DORIS_CHECK(_current_task->data_file != nullptr);
+    return _current_task->data_file->path;
 }
 
 Status 
IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest* 
request) {
@@ -273,8 +278,6 @@ Status 
IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe
     for (const auto& filter : _equality_delete_filters) {
         auto delete_predicate =
                 std::make_shared<EqualityDeletePredicate>(filter.delete_block, 
filter.field_ids);
-        reader::FileExpressionFilter expression_filter;
-        expression_filter.delete_conjunct = 
VExprContext::create_shared(delete_predicate);
         DCHECK_EQ(filter.field_ids.size(), filter.key_types.size());
         for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) {
             const int field_id = filter.field_ids[idx];
@@ -301,22 +304,13 @@ Status 
IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe
                 cast_expr->add_child(std::move(slot));
                 delete_predicate->add_child(std::move(cast_expr));
             }
-            expression_filter.file_column_ids.push_back(field_it->id);
         }
-        request->expression_filters.push_back(std::move(expression_filter));
+        request->delete_conjuncts.push_back(
+                VExprContext::create_shared(std::move(delete_predicate)));
     }
     return Status::OK();
 }
 
-std::string IcebergTableReader::_data_file_path() const {
-    if (_iceberg_params != nullptr && 
_iceberg_params->__isset.original_file_path) {
-        return _iceberg_params->original_file_path;
-    }
-    DORIS_CHECK(_current_task != nullptr);
-    DORIS_CHECK(_current_task->data_file != nullptr);
-    return _current_task->data_file->path;
-}
-
 Status IcebergTableReader::_read_parquet_position_delete_file(
         const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& 
scan_params,
         IcebergDeleteFileIOContext* delete_io_ctx, 
PositionDeleteRowsCollector* collector) {
@@ -344,8 +338,15 @@ Status 
IcebergTableReader::_read_parquet_position_delete_file(
 
     std::vector<reader::SchemaField> schema;
     RETURN_IF_ERROR(reader.get_schema(&schema));
-    const auto* file_path_field = _find_delete_field(schema, 
ICEBERG_FILE_PATH);
-    const auto* pos_field = _find_delete_field(schema, ICEBERG_ROW_POS);
+    reader::SchemaField* file_path_field = nullptr;
+    reader::SchemaField* pos_field = nullptr;
+    for (auto& field : schema) {
+        if (field.name == ICEBERG_FILE_PATH) {
+            file_path_field = &field;
+        } else if (field.name == ICEBERG_ROW_POS) {
+            pos_field = &field;
+        }
+    }
     if (file_path_field == nullptr || pos_field == nullptr) {
         return Status::InternalError("Position delete parquet file is missing 
required columns");
     }
@@ -381,9 +382,8 @@ Status IcebergTableReader::_init_position_delete_rows(
     TFileScanRangeParams delete_scan_params =
             _scan_params == nullptr ? TFileScanRangeParams() : *_scan_params;
     reader::DeleteRows position_delete_rows;
-    const auto data_file_path = _data_file_path();
     IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
-    PositionDeleteRowsCollector collector(data_file_path, 
&position_delete_rows);
+    PositionDeleteRowsCollector collector(_data_file_path(), 
&position_delete_rows);
     for (const auto& delete_file : delete_files) {
         RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, 
delete_scan_params,
                                                            &delete_io_ctx, 
&collector));
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index 497a989289a..a543ae0797d 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -53,6 +53,8 @@ protected:
 
     Status customize_file_scan_request(reader::FileScanRequest* file_request) 
override;
 
+    bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const 
override;
+
     Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, 
DeleteFileDesc* desc,
                                        bool* has_delete_file) override;
 
@@ -93,18 +95,17 @@ private:
     static std::unique_ptr<io::FileDescription> _delete_file_description(
             const TFileRangeDesc& range);
 
-    static const reader::SchemaField* _find_delete_field(
-            const std::vector<reader::SchemaField>& schema, const std::string& 
name);
+    std::string _data_file_path() const;
 
+    // Append the row position column to the file scan request for position 
delete handling.
     Status _append_row_position_output_column(reader::FileScanRequest* 
request);
-
+    // Append equality delete predicates to file scan request based on the 
delete files in iceberg
+    // params. DeleteVector and position delete files use the common 
DeleteRows path in TableReader.
     Status _append_equality_delete_predicates(reader::FileScanRequest* 
request);
 
     Status _init_equality_delete_predicates(
             const std::vector<TIcebergDeleteFileDesc>& delete_files);
 
-    std::string _data_file_path() const;
-
     // Read equality/position delete files.
     Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc& 
delete_file,
                                               const TFileScanRangeParams& 
scan_params,
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 255ad574a26..6d0156af9ce 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -485,9 +485,7 @@ TEST_F(NewParquetReaderTest, 
ReadPredicateAndNonPredicateColumnsWithSelection) {
     auto request = std::make_unique<reader::FileScanRequest>();
     request->predicate_columns = {0};
     request->non_predicate_columns = {1};
-    reader::FileExpressionFilter expression_filter;
-    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    request->expression_filters.push_back(std::move(expression_filter));
+    request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
     reader::FileColumnPredicateFilter column_filter;
     column_filter.file_column_id = 0;
     
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
@@ -532,9 +530,7 @@ TEST_F(NewParquetReaderTest, 
ReadMultiPredicateColumnsBeforeExpressionFilter) {
     auto request = std::make_unique<reader::FileScanRequest>();
     request->predicate_columns = {0, 1};
     request->non_predicate_columns = {};
-    reader::FileExpressionFilter expression_filter;
-    expression_filter.conjunct = create_int32_sum_greater_than_conjunct(0, 1, 
7);
-    request->expression_filters.push_back(std::move(expression_filter));
+    request->conjuncts.push_back(create_int32_sum_greater_than_conjunct(0, 1, 
7));
     ASSERT_TRUE(reader->open(request).ok());
 
     size_t rows = 0;
@@ -567,9 +563,7 @@ TEST_F(NewParquetReaderTest, 
PredicateFiltersRowGroupsByStatistics) {
     auto request = std::make_unique<reader::FileScanRequest>();
     request->predicate_columns = {0};
     request->non_predicate_columns = {1};
-    reader::FileExpressionFilter expression_filter;
-    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    request->expression_filters.push_back(std::move(expression_filter));
+    request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
     reader::FileColumnPredicateFilter column_filter;
     column_filter.file_column_id = 0;
     
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
@@ -756,9 +750,7 @@ TEST_F(NewParquetReaderTest, 
RowPositionReaderKeepsPositionsAfterSelection) {
             {0, 0},
             {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
     };
-    reader::FileExpressionFilter expression_filter;
-    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    request->expression_filters.push_back(std::move(expression_filter));
+    request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
     ASSERT_TRUE(reader->open(request).ok());
 
     size_t rows = 0;
@@ -800,11 +792,7 @@ TEST_F(NewParquetReaderTest, 
DeletePredicateFiltersRowPositions) {
             {0, 0},
             {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
     };
-    reader::FileExpressionFilter delete_filter;
-    delete_filter.delete_conjunct = 
VExprContext::create_shared(std::move(delete_predicate));
-    delete_filter.file_column_ids.push_back(
-            parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
-    request->expression_filters.push_back(std::move(delete_filter));
+    
request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate)));
     ASSERT_TRUE(reader->open(request).ok());
 
     size_t rows = 0;
@@ -846,13 +834,8 @@ TEST_F(NewParquetReaderTest, 
QueryPredicateAndDeletePredicateFilterRowPositions)
             {0, 0},
             {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
     };
-    reader::FileExpressionFilter expression_filter;
-    expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
-    expression_filter.delete_conjunct = 
VExprContext::create_shared(std::move(delete_predicate));
-    expression_filter.file_column_ids.push_back(0);
-    expression_filter.file_column_ids.push_back(
-            parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
-    request->expression_filters.push_back(std::move(expression_filter));
+    request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
+    
request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate)));
     ASSERT_TRUE(reader->open(request).ok());
 
     size_t rows = 0;
diff --git a/be/test/format/reader/expr/cast_test.cpp 
b/be/test/format/reader/expr/cast_test.cpp
index a236d327a1f..93858dbf53e 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -238,6 +238,26 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastProjectionForTypeMismatch) {
     mapping.projection->close();
 }
 
+TEST_F(CastTest, ColumnMapperTreatsEquivalentTypesAsTrivial) {
+    reader::TableColumnMapper mapper;
+    reader::TableColumn table_column;
+    table_column.id = 7;
+    table_column.name = "value";
+    table_column.type = std::make_shared<DataTypeInt32>();
+    std::vector<reader::TableColumn> projected_columns {table_column};
+
+    reader::SchemaField file_field;
+    file_field.id = 0;
+    file_field.name = "value";
+    file_field.type = std::make_shared<DataTypeInt32>();
+    std::vector<reader::SchemaField> file_schema {file_field};
+
+    auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+    ASSERT_TRUE(status.ok()) << status;
+    ASSERT_EQ(mapper.mappings().size(), 1);
+    EXPECT_TRUE(mapper.mappings()[0].is_trivial);
+}
+
 TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
     reader::TableColumnMapper mapper;
     reader::TableColumn table_column;
@@ -264,9 +284,9 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastFilterForTypeMismatch) {
     reader::FileScanRequest file_request;
     ASSERT_TRUE(
             mapper.create_scan_request({table_filter}, {}, projected_columns, 
&file_request).ok());
-    ASSERT_EQ(file_request.expression_filters.size(), 1);
+    ASSERT_EQ(file_request.conjuncts.size(), 1);
     ASSERT_EQ(file_request.predicate_columns, 
std::vector<reader::ColumnId>({0}));
-    const auto& localized_expr = 
file_request.expression_filters[0].conjunct->root();
+    const auto& localized_expr = file_request.conjuncts[0]->root();
     ASSERT_EQ(localized_expr->get_num_children(), 1);
     const auto& localized_child = localized_expr->children()[0];
     ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
@@ -279,7 +299,7 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastFilterForTypeMismatch) {
 
     Block block;
     block.insert(ColumnHelper::create_column_with_name<DataTypeInt32>({11, 
22}));
-    auto* conjunct = file_request.expression_filters[0].conjunct.get();
+    auto* conjunct = file_request.conjuncts[0].get();
     status = conjunct->prepare(&state, RowDescriptor());
     ASSERT_TRUE(status.ok()) << status;
     status = conjunct->open(&state);
@@ -293,7 +313,172 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastFilterForTypeMismatch) {
     EXPECT_EQ(filter[0], 0);
     EXPECT_EQ(filter[1], 1);
 
-    file_request.expression_filters[0].conjunct->close();
+    file_request.conjuncts[0]->close();
+}
+
+TEST_F(CastTest, ColumnMapperDoesNotNestCastFilterAcrossScanRequests) {
+    reader::TableColumnMapper mapper;
+    reader::TableColumn table_column;
+    table_column.id = 7;
+    table_column.name = "value";
+    table_column.type = std::make_shared<DataTypeInt64>();
+    std::vector<reader::TableColumn> projected_columns {table_column};
+
+    reader::SchemaField file_field;
+    file_field.id = 0;
+    file_field.name = "value";
+    file_field.type = std::make_shared<DataTypeInt32>();
+    std::vector<reader::SchemaField> file_schema {file_field};
+
+    auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+    ASSERT_TRUE(status.ok()) << status;
+
+    auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+    predicate->add_child(TableSlotRef::create_shared(7, 7, -1, 
table_column.type, "value"));
+    reader::TableFilter table_filter;
+    table_filter.conjunct = VExprContext::create_shared(predicate);
+    table_filter.slot_ids = {7};
+
+    reader::FileScanRequest first_request;
+    ASSERT_TRUE(
+            mapper.create_scan_request({table_filter}, {}, projected_columns, 
&first_request).ok());
+    reader::FileScanRequest second_request;
+    ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, 
projected_columns, &second_request)
+                        .ok());
+
+    ASSERT_EQ(second_request.conjuncts.size(), 1);
+    const auto& localized_expr = second_request.conjuncts[0]->root();
+    ASSERT_EQ(localized_expr->get_num_children(), 1);
+    const auto& localized_child = localized_expr->children()[0];
+    ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
+    ASSERT_EQ(localized_child->get_num_children(), 1);
+    const auto* localized_slot =
+            assert_cast<const 
TableSlotRef*>(localized_child->children()[0].get());
+    EXPECT_EQ(localized_slot->column_id(), 0);
+}
+
+TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) {
+    reader::TableColumn table_column;
+    table_column.id = 7;
+    table_column.name = "value";
+    table_column.type = std::make_shared<DataTypeInt64>();
+    std::vector<reader::TableColumn> projected_columns {table_column};
+
+    auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+    predicate->add_child(TableSlotRef::create_shared(7, 7, -1, 
table_column.type, "value"));
+    reader::TableFilter table_filter;
+    table_filter.conjunct = VExprContext::create_shared(predicate);
+    table_filter.slot_ids = {7};
+
+    reader::SchemaField int_file_field;
+    int_file_field.id = 0;
+    int_file_field.name = "value";
+    int_file_field.type = std::make_shared<DataTypeInt32>();
+
+    reader::TableColumnMapper int_mapper;
+    ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {}, 
{int_file_field}).ok());
+    reader::FileScanRequest int_request;
+    ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {}, 
projected_columns, &int_request)
+                        .ok());
+
+    const auto& int_localized_expr = int_request.conjuncts[0]->root();
+    ASSERT_EQ(int_localized_expr->get_num_children(), 1);
+    ASSERT_NE(dynamic_cast<const 
Cast*>(int_localized_expr->children()[0].get()), nullptr);
+
+    reader::SchemaField bigint_file_field;
+    bigint_file_field.id = 0;
+    bigint_file_field.name = "value";
+    bigint_file_field.type = std::make_shared<DataTypeInt64>();
+
+    reader::TableColumnMapper bigint_mapper;
+    ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, 
{bigint_file_field}).ok());
+    reader::FileScanRequest bigint_request;
+    ASSERT_TRUE(bigint_mapper
+                        .create_scan_request({table_filter}, {}, 
projected_columns, &bigint_request)
+                        .ok());
+
+    const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root();
+    ASSERT_EQ(bigint_localized_expr->get_num_children(), 1);
+    const auto& bigint_localized_child = bigint_localized_expr->children()[0];
+    const auto* localized_slot = assert_cast<const 
TableSlotRef*>(bigint_localized_child.get());
+    EXPECT_EQ(localized_slot->column_id(), 0);
+    EXPECT_TRUE(localized_slot->data_type()->equals(*bigint_file_field.type));
+
+    Block block;
+    block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({11, 
22}));
+    auto* conjunct = bigint_request.conjuncts[0].get();
+    auto status = conjunct->prepare(&state, RowDescriptor());
+    ASSERT_TRUE(status.ok()) << status;
+    status = conjunct->open(&state);
+    ASSERT_TRUE(status.ok()) << status;
+    IColumn::Filter filter(block.rows(), 1);
+    bool can_filter_all = false;
+    status = conjunct->execute_filter(&block, filter.data(), block.rows(), 
false, &can_filter_all);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(can_filter_all);
+    ASSERT_EQ(filter.size(), 2);
+    EXPECT_EQ(filter[0], 0);
+    EXPECT_EQ(filter[1], 1);
+    conjunct->close();
+}
+
+TEST_F(CastTest, ColumnMapperKeepsTableSlotIdWhenFileBlockPositionChanges) {
+    reader::TableColumn table_column;
+    table_column.id = 7;
+    table_column.name = "value";
+    table_column.type = std::make_shared<DataTypeInt64>();
+    std::vector<reader::TableColumn> projected_columns {table_column};
+
+    reader::SchemaField file_field;
+    file_field.id = 10;
+    file_field.name = "value";
+    file_field.type = std::make_shared<DataTypeInt64>();
+
+    reader::TableColumnMapper mapper;
+    ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, 
{file_field}).ok());
+
+    auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+    predicate->add_child(TableSlotRef::create_shared(7, 7, -1, 
table_column.type, "value"));
+    reader::TableFilter table_filter;
+    table_filter.conjunct = VExprContext::create_shared(predicate);
+    table_filter.slot_ids = {7};
+
+    reader::FileScanRequest first_request;
+    ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, 
&first_request).ok());
+    ASSERT_EQ(first_request.conjuncts.size(), 1);
+    const auto* first_slot = assert_cast<const TableSlotRef*>(
+            first_request.conjuncts[0]->root()->children()[0].get());
+    EXPECT_EQ(first_slot->slot_id(), 7);
+    EXPECT_EQ(first_slot->column_id(), 0);
+
+    reader::FileScanRequest second_request;
+    second_request.column_positions.emplace(9, 0);
+    second_request.column_positions.emplace(10, 1);
+    second_request.non_predicate_columns.push_back(9);
+    ASSERT_TRUE(mapper.localize_filters({table_filter}, {}, 
&second_request).ok());
+    ASSERT_EQ(second_request.conjuncts.size(), 1);
+    const auto* second_slot = assert_cast<const TableSlotRef*>(
+            second_request.conjuncts[0]->root()->children()[0].get());
+    EXPECT_EQ(second_slot->slot_id(), 7);
+    EXPECT_EQ(second_slot->column_id(), 1);
+
+    Block block;
+    block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({100, 
100}));
+    block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({11, 
22}));
+    auto* conjunct = second_request.conjuncts[0].get();
+    auto status = conjunct->prepare(&state, RowDescriptor());
+    ASSERT_TRUE(status.ok()) << status;
+    status = conjunct->open(&state);
+    ASSERT_TRUE(status.ok()) << status;
+    IColumn::Filter filter(block.rows(), 1);
+    bool can_filter_all = false;
+    status = conjunct->execute_filter(&block, filter.data(), block.rows(), 
false, &can_filter_all);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(can_filter_all);
+    ASSERT_EQ(filter.size(), 2);
+    EXPECT_EQ(filter[0], 0);
+    EXPECT_EQ(filter[1], 1);
+    conjunct->close();
 }
 
 } // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp 
b/be/test/format/reader/table_reader_test.cpp
index 8a72937002d..1bb6eaf26be 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -268,14 +268,38 @@ void write_parquet_file(const std::string& file_path, 
int32_t id, const std::str
                                                       builder.build()));
 }
 
+void write_iceberg_equality_delete_parquet_file(const std::string& file_path, 
int32_t field_id,
+                                                int32_t value) {
+    const auto metadata =
+            arrow::key_value_metadata({"PARQUET:field_id"}, 
{std::to_string(field_id)});
+    auto schema = arrow::schema({
+            arrow::field("id", arrow::int32(), false)->WithMetadata(metadata),
+    });
+    auto table = arrow::Table::Make(schema, {build_int32_array({value})});
+
+    auto file_result = arrow::io::FileOutputStream::Open(file_path);
+    ASSERT_TRUE(file_result.ok()) << file_result.status();
+    std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+    ::parquet::WriterProperties::Builder builder;
+    builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+    builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+    builder.compression(::parquet::Compression::UNCOMPRESSED);
+    PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table, 
arrow::default_memory_pool(), out, 1,
+                                                      builder.build()));
+}
+
 void write_int_pair_parquet_file(const std::string& file_path, const 
std::vector<int32_t>& ids,
                                  const std::vector<int32_t>& scores,
                                  const std::vector<std::string>& values,
                                  int64_t row_group_size = -1) {
+    const auto id_metadata = arrow::key_value_metadata({"PARQUET:field_id"}, 
{"0"});
+    const auto score_metadata = 
arrow::key_value_metadata({"PARQUET:field_id"}, {"1"});
+    const auto value_metadata = 
arrow::key_value_metadata({"PARQUET:field_id"}, {"2"});
     auto schema = arrow::schema({
-            arrow::field("id", arrow::int32(), false),
-            arrow::field("score", arrow::int32(), false),
-            arrow::field("value", arrow::utf8(), false),
+            arrow::field("id", arrow::int32(), 
false)->WithMetadata(id_metadata),
+            arrow::field("score", arrow::int32(), 
false)->WithMetadata(score_metadata),
+            arrow::field("value", arrow::utf8(), 
false)->WithMetadata(value_metadata),
     });
     auto table = arrow::Table::Make(schema, {build_int32_array(ids), 
build_int32_array(scores),
                                              build_string_array(values)});
@@ -398,6 +422,16 @@ TIcebergDeleteFileDesc 
make_iceberg_position_delete_file(const std::string& path
     return delete_file;
 }
 
+TIcebergDeleteFileDesc make_iceberg_equality_delete_file(const std::string& 
path,
+                                                         const 
std::vector<int32_t>& field_ids) {
+    TIcebergDeleteFileDesc delete_file;
+    delete_file.__set_content(2);
+    delete_file.__set_path(path);
+    delete_file.__set_field_ids(field_ids);
+    delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET);
+    return delete_file;
+}
+
 TFileScanRangeParams make_local_parquet_scan_params() {
     TFileScanRangeParams scan_params;
     scan_params.__set_file_type(TFileType::FILE_LOCAL);
@@ -557,6 +591,268 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
     std::filesystem::remove_all(test_dir);
 }
 
+TEST(TableReaderTest, PushDownCountFromNewParquetReader) {  // COUNT pushdown on a plain Parquet split: the first block must report one row per row written.
+    const auto test_dir = std::filesystem::temp_directory_path() / 
"doris_table_reader_count_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40, 
50},
+                                {"one", "two", "three", "four", "five"}, 2);  // 5 rows; the trailing 2 is the helper's row_group_size argument
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));  // only "id" is projected
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},  // no predicates and no conjunct in this test
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,  // the setting under test
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);  // the first call is expected to return data without signalling end-of-stream
+    ASSERT_EQ(block.rows(), 5);  // equals the number of rows written; column contents are deliberately not inspected
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails, so the scratch directory can be left behind
+}
+
+TEST(TableReaderTest, PushDownMinMaxFromNewParquetReader) {  // MINMAX pushdown on two int32 columns: the block must hold exactly the min row and the max row.
+    const auto test_dir = std::filesystem::temp_directory_path() / 
"doris_table_reader_minmax_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20},
+                                {"three", "one", "five", "two"}, 2);  // unsorted values; row_group_size argument is 2 for the 4 rows
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+    projected_columns.push_back(make_table_column(1, "score", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::MINMAX,  // the setting under test
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 2);  // two rows for four input rows: row 0 is checked as the minimum, row 1 as the maximum
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    const auto& score_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    EXPECT_EQ(id_column.get_element(0), 1);  // min of {3, 1, 5, 2}
+    EXPECT_EQ(id_column.get_element(1), 5);  // max of {3, 1, 5, 2}
+    EXPECT_EQ(score_column.get_element(0), 10);  // min of {30, 10, 50, 20}
+    EXPECT_EQ(score_column.get_element(1), 50);  // max of {30, 10, 50, 20}
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, PushDownMinMaxCastsFileValueToTableType) {  // MINMAX pushdown where the table column type (Int64) differs from the type requested when writing the file.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_minmax_cast_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20},
+                                {"three", "one", "five", "two"}, 2);  // the helper takes the ids as std::vector<int32_t>
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt64>()));  // table side asks for Int64
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::MINMAX,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 2);  // one min row and one max row
+    const auto& id_column = assert_cast<const 
ColumnInt64&>(*block.get_by_position(0).column);  // the output column is expected to already be Int64
+    EXPECT_EQ(id_column.get_element(0), 1);  // min of {3, 1, 5, 2}
+    EXPECT_EQ(id_column.get_element(1), 5);  // max of {3, 1, 5, 2}
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, PushDownCountFallsBackWithTableConjunct) {  // COUNT is requested together with a conjunct: the block must contain the filtered row, not a count of all rows.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_count_conjunct_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});  // row_group_size left at the helper's default
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(
+                                            
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),  // presumably "id > 2"; the expression class is defined outside this hunk -- confirm
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 1);  // 1 of the 3 written rows is expected back
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    EXPECT_EQ(id_column.get_element(0), 3);  // a real value is checked, so the row must have been read from the file
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, PushDownCountFallsBackWithColumnPredicate) {  // COUNT is requested together with a column predicate: the block must contain the filtered row, not a count of all rows.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_count_predicate_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"}, 1);  // row_group_size argument is 1
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    TableColumnPredicates column_predicates;
+    
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+            0, "id", std::make_shared<DataTypeInt32>(), 
Field::create_field<TYPE_INT>(2), false));  // GT predicate on "id" with literal 2, stored under key 0
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = 
std::move(column_predicates),  // column_predicates must not be used again after this move
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 1);  // 1 of the 3 written rows is expected back
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    EXPECT_EQ(id_column.get_element(0), 3);  // the only written id greater than 2
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, PushDownMinMaxFallsBackWithoutDirectFileMapping) {  // MINMAX is requested for a projected column named "missing_id": one row with value 0 is expected instead of a min/max pair.
+    const auto test_dir = std::filesystem::temp_directory_path() /
+                          "doris_table_reader_minmax_missing_mapping_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_parquet_file(file_path, 1, "one");  // helper defined outside this hunk; presumably writes a single row -- confirm
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            make_table_column(99, "missing_id", 
std::make_shared<DataTypeInt32>()));  // presumably neither id 99 nor this name exists in the written file -- confirm against write_parquet_file
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,  // presumably what lets init/prepare succeed without a matching file column -- verify
+                                    .push_down_agg_type = 
TPushAggOp::type::MINMAX,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 1);  // a single row, unlike the two-row min/max result asserted by the sibling MINMAX tests
+    EXPECT_EQ(block.get_by_position(0).column->get_int(0), 0);  // presumably the fill value used for a column absent from the file -- confirm
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
 TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_conjunct_filter_test";
@@ -644,7 +940,61 @@ TEST(TableReaderTest, 
OpenReaderBuildsColumnPredicateFilters) {
     write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one", 
"two", "three"}, 1);
 
     std::vector<TableColumn> projected_columns;
-    projected_columns.push_back(make_table_column(2, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(2, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    TableColumnPredicates column_predicates;
+    
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+            0, "id", std::make_shared<DataTypeInt32>(), 
Field::create_field<TYPE_INT>(2), false));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = 
std::move(column_predicates),
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = nullptr,
+                                    .io_ctx = nullptr,
+                                    .runtime_state = &state,
+                                    .scanner_profile = nullptr,
+                                    .allow_missing_columns = true,
+                                    .profile = nullptr,
+                            })
+                        .ok());
+
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& value_column = assert_cast<const 
ColumnString&>(*block.get_by_position(0).column);
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    ASSERT_EQ(id_column.size(), 1);
+    ASSERT_EQ(value_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 3);
+    EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, ColumnPredicateSurvivesReopenSplit) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_predicate_reopen_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const std::vector<std::string> file_paths = {
+            (test_dir / "split_1.parquet").string(),
+            (test_dir / "split_2.parquet").string(),
+    };
+    write_int_pair_parquet_file(file_paths[0], {1, 3}, {10, 30}, {"one", 
"three"}, 1);
+    write_int_pair_parquet_file(file_paths[1], {2, 4}, {20, 40}, {"two", 
"four"}, 1);
+
+    std::vector<TableColumn> projected_columns;
     projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
 
     TableColumnPredicates column_predicates;
@@ -667,21 +1017,22 @@ TEST(TableReaderTest, 
OpenReaderBuildsColumnPredicateFilters) {
                             })
                         .ok());
 
-    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+    std::vector<int32_t> ids;
+    for (const auto& file_path : file_paths) {
+        ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
 
-    Block block = build_table_block(projected_columns);
-    bool eos = false;
-    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
-    ASSERT_FALSE(eos);
+        Block block = build_table_block(projected_columns);
+        bool eos = false;
+        ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+        ASSERT_FALSE(eos);
+        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+        ASSERT_EQ(id_column.size(), 1);
+        ids.push_back(id_column.get_element(0));
 
-    const auto& value_column = assert_cast<const 
ColumnString&>(*block.get_by_position(0).column);
-    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
-    ASSERT_EQ(id_column.size(), 1);
-    ASSERT_EQ(value_column.size(), 1);
-    EXPECT_EQ(id_column.get_element(0), 3);
-    EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+        ASSERT_TRUE(reader.close().ok());
+    }
 
-    ASSERT_TRUE(reader.close().ok());
+    EXPECT_EQ(ids, std::vector<int32_t>({3, 4}));
     std::filesystem::remove_all(test_dir);
 }
 
@@ -763,6 +1114,51 @@ TEST(TableReaderTest, 
CreateScanRequestDeduplicatesSharedPredicateColumns) {
     }
 }
 
+TEST(TableReaderTest, 
CreateScanRequestPromotesProjectedColumnToPredicateColumn) {  // A projected column that a table filter references must be listed as a predicate column in the scan request.
+    const auto int_type = std::make_shared<DataTypeInt32>();
+    const std::vector<TableColumn> projected_columns = {
+            make_table_column(0, "id", int_type),
+            make_table_column(1, "score", int_type),
+    };
+    const std::vector<SchemaField> file_schema = {  // two top-level int fields whose ids and paths match the projected columns
+            {.id = 0,
+             .name = "id",
+             .type = int_type,
+             .children = {},
+             .file_path = {0},
+             .field_id_path = {0},
+             .name_path = {"id"},
+             .column_type = DATA_COLUMN},
+            {.id = 1,
+             .name = "score",
+             .type = int_type,
+             .children = {},
+             .file_path = {1},
+             .field_id_path = {1},
+             .name_path = {"score"},
+             .column_type = DATA_COLUMN},
+    };
+
+    TableColumnMapper mapper;
+    ASSERT_TRUE(mapper.create_mapping(projected_columns, {}, 
file_schema).ok());
+
+    TableFilter table_filter {
+            .conjunct = VExprContext::create_shared(
+                    std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),  // presumably "id > 1"; the expression class is defined outside this hunk -- confirm
+            .slot_ids = {0},  // the filter references only slot 0
+    };
+
+    FileScanRequest file_request;
+    ASSERT_TRUE(
+            mapper.create_scan_request({table_filter}, {}, projected_columns, 
&file_request).ok());
+
+    EXPECT_EQ(file_request.predicate_columns, std::vector<ColumnId>({0}));  // "id" is filtered on
+    EXPECT_EQ(file_request.non_predicate_columns, std::vector<ColumnId>({1}));  // "score" is only projected
+    ASSERT_EQ(file_request.column_positions.size(), 2);
+    EXPECT_EQ(file_request.column_positions.at(0), 1);  // NOTE(review): key 0 maps to 1 and key 1 maps to 0; the rule that assigns positions is not visible here -- confirm it is intended
+    EXPECT_EQ(file_request.column_positions.at(1), 0);
+}
+
 TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_multi_conjunct_test";
@@ -1194,6 +1590,7 @@ TEST(TableReaderTest, 
IcebergTableReaderAppliesDeletionVectorFile) {
                                     .runtime_state = &state,
                                     .scanner_profile = &profile,
                                     .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,
                                     .profile = 
make_table_read_profile(&profile),
                             })
                         .ok());
@@ -1210,6 +1607,226 @@ TEST(TableReaderTest, 
IcebergTableReaderAppliesDeletionVectorFile) {
     std::filesystem::remove_all(test_dir);
 }
 
+TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithDeletes) {  // COUNT is requested on an Iceberg split that has a deletion vector: the surviving rows themselves must come back.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_aggregate_delete_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    const auto dv_path = (test_dir / "delete-vector.bin").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+    const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0});  // presumably marks row position 0 as deleted -- helper is outside this hunk, confirm
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeProfile profile("test_profile");
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    auto scan_params = make_local_parquet_scan_params();
+    io::FileReaderStats file_reader_stats;
+    io::FileCacheStatistics file_cache_stats;
+    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+    ShardedKVCache cache(1);
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = &scan_params,
+                                    .io_ctx = io_ctx,
+                                    .runtime_state = &state,
+                                    .scanner_profile = &profile,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,  // requested, but the assertions below expect real row values
+                                    .profile = 
make_table_read_profile(&profile),
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    split_options.cache = &cache;
+    
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+            file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)}));  // attaches the deletion vector to this split
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 2);  // 3 rows written, 2 expected after the delete is applied
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    EXPECT_EQ(id_column.get_element(0), 2);  // id 1 (the first written row) is the one expected to be gone
+    EXPECT_EQ(id_column.get_element(1), 3);
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, 
IcebergTableReaderDoesNotPushDownAggregateWithPositionDelete) {  // COUNT is requested on an Iceberg split that has a position-delete file: the surviving rows themselves must come back.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_aggregate_position_delete_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    const auto delete_file_path = (test_dir / 
"position-delete.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+    write_position_delete_parquet_file(delete_file_path, {file_path}, {1});  // presumably records (file_path, position 1) as deleted -- helper is outside this hunk, confirm
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeProfile profile("test_profile");
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    auto scan_params = make_local_parquet_scan_params();
+    io::FileReaderStats file_reader_stats;
+    io::FileCacheStatistics file_cache_stats;
+    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+    ShardedKVCache cache(1);
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = &scan_params,
+                                    .io_ctx = io_ctx,
+                                    .runtime_state = &state,
+                                    .scanner_profile = &profile,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,  // requested, but the assertions below expect real row values
+                                    .profile = 
make_table_read_profile(&profile),
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    split_options.cache = &cache;
+    
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+            file_path, {make_iceberg_position_delete_file(delete_file_path)}));  // attaches the position-delete file to this split
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 2);  // 3 rows written, 2 expected after the delete is applied
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    EXPECT_EQ(id_column.get_element(0), 1);
+    EXPECT_EQ(id_column.get_element(1), 3);  // id 2 (the second written row) is the one expected to be gone
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, IcebergPositionDeleteFallsBackToSplitPath) {  // A position delete must still apply when the Iceberg descriptor is built by hand with only a format version and delete files.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_position_delete_path_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    const auto delete_file_path = (test_dir / 
"position-delete.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+    write_position_delete_parquet_file(delete_file_path, {file_path}, {1});  // presumably records (file_path, position 1) as deleted -- helper is outside this hunk, confirm
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeProfile profile("test_profile");
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    auto scan_params = make_local_parquet_scan_params();
+    io::FileReaderStats file_reader_stats;
+    io::FileCacheStatistics file_cache_stats;
+    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+    ShardedKVCache cache(1);
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = &scan_params,
+                                    .io_ctx = io_ctx,
+                                    .runtime_state = &state,
+                                    .scanner_profile = &profile,
+                                    .allow_missing_columns = true,
+                                    .profile = 
make_table_read_profile(&profile),  // no push_down_agg_type is set in this test
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    split_options.cache = &cache;
+    TTableFormatFileDesc table_format_params;  // built field by field instead of via make_iceberg_table_format_desc(file_path, ...)
+    TIcebergFileDesc iceberg_params;
+    iceberg_params.__set_format_version(2);
+    
iceberg_params.__set_delete_files({make_iceberg_position_delete_file(delete_file_path)});
+    table_format_params.__set_iceberg_params(iceberg_params);  // NOTE(review): presumably leaves the data-file path unset so the reader has to use the split's own path -- confirm against the helper
+    split_options.current_range.__set_table_format_params(table_format_params);
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    EXPECT_EQ(read_iceberg_ids(&reader, projected_columns), 
std::vector<int32_t>({1, 3}));  // id 2 (the second written row) is expected to be gone
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
+TEST(TableReaderTest, 
IcebergTableReaderDoesNotPushDownAggregateWithEqualityDelete) {  // COUNT is requested on an Iceberg split that has an equality-delete file: the surviving rows themselves must come back.
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_iceberg_aggregate_equality_delete_test";
+    std::filesystem::remove_all(test_dir);  // start from an empty scratch directory
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    const auto delete_file_path = (test_dir / 
"equality-delete.parquet").string();
+    write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one", 
"two", "three"});
+    write_iceberg_equality_delete_parquet_file(delete_file_path, 0, 2);  // presumably (field id 0, value 2), i.e. delete rows whose id equals 2 -- helper is outside this hunk, confirm
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeProfile profile("test_profile");
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    auto scan_params = make_local_parquet_scan_params();
+    io::FileReaderStats file_reader_stats;
+    io::FileCacheStatistics file_cache_stats;
+    auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+    ShardedKVCache cache(1);
+    doris::iceberg::IcebergTableReader reader;
+    ASSERT_TRUE(reader.init({
+                                    .projected_columns = projected_columns,
+                                    .column_predicates = {},
+                                    .conjuncts = VExprContext(nullptr),
+                                    .format = FileFormat::PARQUET,
+                                    .scan_params = &scan_params,
+                                    .io_ctx = io_ctx,
+                                    .runtime_state = &state,
+                                    .scanner_profile = &profile,
+                                    .allow_missing_columns = true,
+                                    .push_down_agg_type = 
TPushAggOp::type::COUNT,  // requested, but the assertions below expect real row values
+                                    .profile = 
make_table_read_profile(&profile),
+                            })
+                        .ok());
+
+    auto split_options = build_split_options(file_path);
+    split_options.cache = &cache;
+    
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+            file_path, {make_iceberg_equality_delete_file(delete_file_path, 
{0})}));  // descriptor carries field id 0 for the equality-delete file
+    ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), 2);  // 3 rows written, 2 expected after the delete is applied
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    EXPECT_EQ(id_column.get_element(0), 1);
+    EXPECT_EQ(id_column.get_element(1), 3);  // id 2 is the row expected to be gone
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);  // NOTE(review): not reached when an ASSERT_* above fails
+}
+
 TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_iceberg_position_delete_file_test";
@@ -1332,8 +1949,9 @@ TEST(TableReaderTest, 
RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColum
     EXPECT_EQ(request.non_predicate_columns, std::vector<ColumnId>({0}));
     ASSERT_TRUE(request.column_positions.contains(row_position_column_id));
     EXPECT_EQ(request.column_positions.at(row_position_column_id), 1);
-    ASSERT_EQ(request.expression_filters.size(), 1);
-    EXPECT_NE(request.expression_filters[0].delete_conjunct, nullptr);
+    ASSERT_TRUE(request.conjuncts.empty());
+    ASSERT_EQ(request.delete_conjuncts.size(), 1);
+    EXPECT_NE(request.delete_conjuncts[0], nullptr);
 }
 
 TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to