This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new e65a09e8b2a [test](be) Cover parquet conjunct local filter (#63705)
e65a09e8b2a is described below

commit e65a09e8b2a1246cbaa264bad5fd3a2966a700bb
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 09:49:33 2026 +0800

    [test](be) Cover parquet conjunct local filter (#63705)
    
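    Problem Summary: NewParquetReaderTest only populated
    FileLocalFilter::predicates for local predicate filtering. Parquet row
    group pruning still uses predicates, while row filtering now evaluates
    conjunct: ParquetReader::_read_filter_columns calls
    VExprContext::execute_filter on the filter columns instead of evaluating
    each predicate, and open() no longer rejects conjunct filters as
    NotSupported. The tests therefore need to populate both predicates and
    conjunct with matching semantics.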
---
 be/src/format/new_parquet/parquet_reader.cpp       | 56 +++++++++++-----------
 be/src/format/new_parquet/parquet_reader.h         |  6 +--
 be/test/format/new_parquet/parquet_reader_test.cpp | 44 ++++++++++++++++-
 3 files changed, 73 insertions(+), 33 deletions(-)

diff --git a/be/src/format/new_parquet/parquet_reader.cpp 
b/be/src/format/new_parquet/parquet_reader.cpp
index 7f442808523..ff9d939b4d0 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -31,6 +31,7 @@
 #include "common/exception.h"
 #include "core/block/block.h"
 #include "core/data_type/data_type_nullable.h"
+#include "exprs/vexpr_context.h"
 #include "format/new_parquet/column_reader.h"
 #include "format/new_parquet/parquet_column_schema.h"
 #include "format/new_parquet/parquet_statistics.h"
@@ -193,13 +194,8 @@ void ParquetReader::_fill_schema_field(const 
ParquetColumnSchema& column_schema,
     }
 }
 
-bool ParquetReader::_has_structured_filter(const reader::FileLocalFilter& 
local_filter) {
-    for (const auto& predicate : local_filter.predicates) {
-        if (predicate != nullptr) {
-            return true;
-        }
-    }
-    return false;
+bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter& 
local_filter) {
+    return local_filter.conjunct != nullptr;
 }
 
 Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* 
file_block,
@@ -220,24 +216,27 @@ Status ParquetReader::_read_filter_columns(int64_t 
batch_rows, Block* file_block
             return Status::Corruption("Parquet filter column {} returned {} 
rows, expected {} rows",
                                       column_reader->name(), column_rows, 
batch_rows);
         }
+        file_block->replace_by_position(block_position, std::move(column));
 
         for (const auto& local_filter : _request->local_filters) {
             if (local_filter.file_column_id != file_field_id ||
-                !_has_structured_filter(local_filter)) {
+                !_has_expression_filter(local_filter)) {
                 continue;
             }
             if (*selected_rows == 0) {
                 break;
             }
-            for (const auto& predicate : local_filter.predicates) {
-                *selected_rows = predicate->evaluate(*column, 
selection->data(), *selected_rows);
-                if (*selected_rows == 0) {
-                    break;
-                }
-            }
+            IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+            bool can_filter_all = false;
+            RETURN_IF_ERROR(local_filter.conjunct->execute_filter(
+                    file_block, filter.data(), 
static_cast<size_t>(batch_rows), false,
+                    &can_filter_all));
+            *selected_rows =
+                    can_filter_all
+                            ? 0
+                            : _apply_filter_to_selection(filter, selection, 
*selected_rows);
             break;
         }
-        file_block->replace_by_position(block_position, std::move(column));
         if (*selected_rows == 0) {
             break;
         }
@@ -245,18 +244,6 @@ Status ParquetReader::_read_filter_columns(int64_t 
batch_rows, Block* file_block
     return Status::OK();
 }
 
-Status ParquetReader::_validate_supported_local_filters(
-        const std::vector<reader::FileLocalFilter>& local_filters) {
-    for (const auto& local_filter : local_filters) {
-        if (local_filter.conjunct != nullptr) {
-            return Status::NotSupported(
-                    "Parquet expression filter fallback is not implemented for 
field {}",
-                    local_filter.file_column_id);
-        }
-    }
-    return Status::OK();
-}
-
 IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector& 
selection,
                                                     uint16_t selected_rows, 
int64_t batch_rows) {
     IColumn::Filter filter(static_cast<size_t>(batch_rows), 0);
@@ -266,6 +253,19 @@ IColumn::Filter ParquetReader::_selection_to_filter(const 
SelectionVector& selec
     return filter;
 }
 
+uint16_t ParquetReader::_apply_filter_to_selection(const IColumn::Filter& 
filter,
+                                                   SelectionVector* selection,
+                                                   uint16_t selected_rows) {
+    uint16_t new_selected_rows = 0;
+    for (uint16_t selection_idx = 0; selection_idx < selected_rows; 
++selection_idx) {
+        const auto row_idx = selection->get_index(selection_idx);
+        if (filter[row_idx] != 0) {
+            selection->set_index(new_selected_rows++, 
static_cast<SelectionVector::Index>(row_idx));
+        }
+    }
+    return new_selected_rows;
+}
+
 Status ParquetReader::_open_next_row_group(bool* has_row_group) {
     *has_row_group = false;
     while (_state->next_row_group_idx < _state->selected_row_groups.size()) {
@@ -456,8 +456,6 @@ Status 
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
                                            local_filter.file_column_id);
         }
     }
-    
RETURN_IF_ERROR(_validate_supported_local_filters(_request->local_filters));
-
     RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata, 
_state->file_schema,
                                                     *_request, 
&_state->selected_row_groups));
     RETURN_IF_ERROR(_reset_reader_position());
diff --git a/be/src/format/new_parquet/parquet_reader.h 
b/be/src/format/new_parquet/parquet_reader.h
index 6920f8c4d78..40213ebb0d6 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -121,13 +121,13 @@ private:
     void _reset_current_row_group();
     void _fill_schema_field(const ParquetColumnSchema& column_schema,
                             reader::SchemaField* field) const;
-    bool _has_structured_filter(const reader::FileLocalFilter& local_filter);
+    bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
     Status _read_filter_columns(int64_t batch_rows, Block* file_block, 
SelectionVector* selection,
                                 uint16_t* selected_rows);
-    Status _validate_supported_local_filters(
-            const std::vector<reader::FileLocalFilter>& local_filters);
     IColumn::Filter _selection_to_filter(const SelectionVector& selection, 
uint16_t selected_rows,
                                          int64_t batch_rows);
+    uint16_t _apply_filter_to_selection(const IColumn::Filter& filter, 
SelectionVector* selection,
+                                        uint16_t selected_rows);
     Status _open_next_row_group(bool* has_row_group);
     Status _read_current_row_group_batch(int64_t batch_rows, Block* 
file_block, size_t* rows);
 
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp 
b/be/test/format/new_parquet/parquet_reader_test.cpp
index e805243ca22..7e28b7fce5b 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -33,6 +33,8 @@
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/primitive_type.h"
 #include "core/field.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
 #include "format/new_parquet/parquet_reader.h"
 #include "format/reader/file_reader.h"
 #include "gen_cpp/Types_types.h"
@@ -45,6 +47,43 @@ namespace {
 
 constexpr int64_t ROW_COUNT = 5;
 
+class Int32GreaterThanExpr final : public VExpr {
+public:
+    Int32GreaterThanExpr(int column_id, int32_t value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false),
+              _column_id(column_id),
+              _value(value) {}
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        const auto& input =
+                assert_cast<const 
ColumnInt32&>(*block->get_by_position(_column_id).column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            const size_t input_row = selector == nullptr ? row : 
(*selector)[row];
+            result_data[row] = input.get_element(input_row) > _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int _column_id;
+    const int32_t _value;
+    const std::string _expr_name = "Int32GreaterThanExpr";
+};
+
+VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t 
value) {
+    auto ctx = 
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id, 
value));
+    ctx->_prepared = true;
+    ctx->_opened = true;
+    return ctx;
+}
+
 std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
     std::shared_ptr<arrow::Array> array;
     EXPECT_TRUE(builder->Finish(&array).ok());
@@ -265,6 +304,7 @@ TEST_F(NewParquetReaderTest, 
ReadPredicateAndNonPredicateColumnsWithSelection) {
     request->non_predicate_columns = {1};
     reader::FileLocalFilter filter;
     filter.file_column_id = 0;
+    filter.conjunct = create_int32_greater_than_conjunct(0, 2);
     filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
             0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
     request->local_filters.push_back(std::move(filter));
@@ -306,9 +346,11 @@ TEST_F(NewParquetReaderTest, 
PredicateFiltersRowGroupsByStatistics) {
     std::vector<reader::SchemaField> schema;
     ASSERT_TRUE(reader->get_schema(&schema).ok());
     auto request = std::make_unique<reader::FileScanRequest>();
-    request->non_predicate_columns = {0, 1};
+    request->predicate_columns = {0};
+    request->non_predicate_columns = {1};
     reader::FileLocalFilter filter;
     filter.file_column_id = 0;
+    filter.conjunct = create_int32_greater_than_conjunct(0, 2);
     filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
             0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
     request->local_filters.push_back(std::move(filter));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to