This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new e65a09e8b2a [test](be) Cover parquet conjunct local filter (#63705)
e65a09e8b2a is described below
commit e65a09e8b2a1246cbaa264bad5fd3a2966a700bb
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 09:49:33 2026 +0800
[test](be) Cover parquet conjunct local filter (#63705)
Problem Summary: NewParquetReaderTest populated only
FileLocalFilter::predicates when exercising local filtering. Parquet row
group pruning still reads FileLocalFilter::predicates, while row-level
filtering now evaluates FileLocalFilter::conjunct, so the tests must
populate both fields with matching semantics.
---
be/src/format/new_parquet/parquet_reader.cpp | 56 +++++++++++-----------
be/src/format/new_parquet/parquet_reader.h | 6 +--
be/test/format/new_parquet/parquet_reader_test.cpp | 44 ++++++++++++++++-
3 files changed, 73 insertions(+), 33 deletions(-)
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 7f442808523..ff9d939b4d0 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -31,6 +31,7 @@
#include "common/exception.h"
#include "core/block/block.h"
#include "core/data_type/data_type_nullable.h"
+#include "exprs/vexpr_context.h"
#include "format/new_parquet/column_reader.h"
#include "format/new_parquet/parquet_column_schema.h"
#include "format/new_parquet/parquet_statistics.h"
@@ -193,13 +194,8 @@ void ParquetReader::_fill_schema_field(const
ParquetColumnSchema& column_schema,
}
}
-bool ParquetReader::_has_structured_filter(const reader::FileLocalFilter&
local_filter) {
- for (const auto& predicate : local_filter.predicates) {
- if (predicate != nullptr) {
- return true;
- }
- }
- return false;
+bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter&
local_filter) {
+ return local_filter.conjunct != nullptr;
}
Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block*
file_block,
@@ -220,24 +216,27 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
return Status::Corruption("Parquet filter column {} returned {}
rows, expected {} rows",
column_reader->name(), column_rows,
batch_rows);
}
+ file_block->replace_by_position(block_position, std::move(column));
for (const auto& local_filter : _request->local_filters) {
if (local_filter.file_column_id != file_field_id ||
- !_has_structured_filter(local_filter)) {
+ !_has_expression_filter(local_filter)) {
continue;
}
if (*selected_rows == 0) {
break;
}
- for (const auto& predicate : local_filter.predicates) {
- *selected_rows = predicate->evaluate(*column,
selection->data(), *selected_rows);
- if (*selected_rows == 0) {
- break;
- }
- }
+ IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+ bool can_filter_all = false;
+ RETURN_IF_ERROR(local_filter.conjunct->execute_filter(
+ file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
+ &can_filter_all));
+ *selected_rows =
+ can_filter_all
+ ? 0
+ : _apply_filter_to_selection(filter, selection,
*selected_rows);
break;
}
- file_block->replace_by_position(block_position, std::move(column));
if (*selected_rows == 0) {
break;
}
@@ -245,18 +244,6 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
return Status::OK();
}
-Status ParquetReader::_validate_supported_local_filters(
- const std::vector<reader::FileLocalFilter>& local_filters) {
- for (const auto& local_filter : local_filters) {
- if (local_filter.conjunct != nullptr) {
- return Status::NotSupported(
- "Parquet expression filter fallback is not implemented for
field {}",
- local_filter.file_column_id);
- }
- }
- return Status::OK();
-}
-
IColumn::Filter ParquetReader::_selection_to_filter(const SelectionVector&
selection,
uint16_t selected_rows,
int64_t batch_rows) {
IColumn::Filter filter(static_cast<size_t>(batch_rows), 0);
@@ -266,6 +253,19 @@ IColumn::Filter ParquetReader::_selection_to_filter(const
SelectionVector& selec
return filter;
}
+uint16_t ParquetReader::_apply_filter_to_selection(const IColumn::Filter&
filter,
+ SelectionVector* selection,
+ uint16_t selected_rows) {
+ uint16_t new_selected_rows = 0;
+ for (uint16_t selection_idx = 0; selection_idx < selected_rows;
++selection_idx) {
+ const auto row_idx = selection->get_index(selection_idx);
+ if (filter[row_idx] != 0) {
+ selection->set_index(new_selected_rows++,
static_cast<SelectionVector::Index>(row_idx));
+ }
+ }
+ return new_selected_rows;
+}
+
Status ParquetReader::_open_next_row_group(bool* has_row_group) {
*has_row_group = false;
while (_state->next_row_group_idx < _state->selected_row_groups.size()) {
@@ -456,8 +456,6 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
local_filter.file_column_id);
}
}
-
RETURN_IF_ERROR(_validate_supported_local_filters(_request->local_filters));
-
RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata,
_state->file_schema,
*_request,
&_state->selected_row_groups));
RETURN_IF_ERROR(_reset_reader_position());
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index 6920f8c4d78..40213ebb0d6 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -121,13 +121,13 @@ private:
void _reset_current_row_group();
void _fill_schema_field(const ParquetColumnSchema& column_schema,
reader::SchemaField* field) const;
- bool _has_structured_filter(const reader::FileLocalFilter& local_filter);
+ bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
Status _read_filter_columns(int64_t batch_rows, Block* file_block,
SelectionVector* selection,
uint16_t* selected_rows);
- Status _validate_supported_local_filters(
- const std::vector<reader::FileLocalFilter>& local_filters);
IColumn::Filter _selection_to_filter(const SelectionVector& selection,
uint16_t selected_rows,
int64_t batch_rows);
+ uint16_t _apply_filter_to_selection(const IColumn::Filter& filter,
SelectionVector* selection,
+ uint16_t selected_rows);
Status _open_next_row_group(bool* has_row_group);
Status _read_current_row_group_batch(int64_t batch_rows, Block*
file_block, size_t* rows);
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index e805243ca22..7e28b7fce5b 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -33,6 +33,8 @@
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/field.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/file_reader.h"
#include "gen_cpp/Types_types.h"
@@ -45,6 +47,43 @@ namespace {
constexpr int64_t ROW_COUNT = 5;
+class Int32GreaterThanExpr final : public VExpr {
+public:
+ Int32GreaterThanExpr(int column_id, int32_t value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false),
+ _column_id(column_id),
+ _value(value) {}
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ const auto& input =
+ assert_cast<const
ColumnInt32&>(*block->get_by_position(_column_id).column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ const size_t input_row = selector == nullptr ? row :
(*selector)[row];
+ result_data[row] = input.get_element(input_row) > _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int _column_id;
+ const int32_t _value;
+ const std::string _expr_name = "Int32GreaterThanExpr";
+};
+
+VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t
value) {
+ auto ctx =
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id,
value));
+ ctx->_prepared = true;
+ ctx->_opened = true;
+ return ctx;
+}
+
std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(builder->Finish(&array).ok());
@@ -265,6 +304,7 @@ TEST_F(NewParquetReaderTest,
ReadPredicateAndNonPredicateColumnsWithSelection) {
request->non_predicate_columns = {1};
reader::FileLocalFilter filter;
filter.file_column_id = 0;
+ filter.conjunct = create_int32_greater_than_conjunct(0, 2);
filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
request->local_filters.push_back(std::move(filter));
@@ -306,9 +346,11 @@ TEST_F(NewParquetReaderTest,
PredicateFiltersRowGroupsByStatistics) {
std::vector<reader::SchemaField> schema;
ASSERT_TRUE(reader->get_schema(&schema).ok());
auto request = std::make_unique<reader::FileScanRequest>();
- request->non_predicate_columns = {0, 1};
+ request->predicate_columns = {0};
+ request->non_predicate_columns = {1};
reader::FileLocalFilter filter;
filter.file_column_id = 0;
+ filter.conjunct = create_int32_greater_than_conjunct(0, 2);
filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
0, "id", schema[0].type, Field::create_field<TYPE_INT>(2), false));
request->local_filters.push_back(std::move(filter));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]