This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 0ae7e1ce986 [feature](be) Support parquet minmax aggregate pushdown
(#63868)
0ae7e1ce986 is described below
commit 0ae7e1ce9869a2950a924e398fa29e38d4ce23ce
Author: Gabriel <[email protected]>
AuthorDate: Fri May 29 14:32:40 2026 +0800
[feature](be) Support parquet minmax aggregate pushdown (#63868)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Add a metadata-backed MIN/MAX aggregate pushdown path
for external Parquet readers, and disable Iceberg v2 aggregate pushdown
when delete files are present.
### Release note
Support min/max aggregate pushdown for eligible external Parquet scans.
### Check List (For Author)
- Test: Unit Test / Manual test
- Added AggregateReaderTest and
ParquetReaderTest.minmax_pushdown_from_statistics.
- Manual test: git diff --check and git diff --cached --check.
- Not run: run-be-ut.sh failed because it requires JDK 17 and this
environment only has JDK 11; the clang-format script failed because
llvm@16 is not installed.
- Behavior changed: Yes. Eligible Parquet scans can return MIN/MAX
aggregate rows from footer statistics, and aggregate pushdown is disabled
for Iceberg scans whose delete files would make it unsafe.
- Does this need documentation: No
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/format/new_parquet/parquet_reader.cpp | 108 +++-
be/src/format/new_parquet/parquet_reader.h | 3 +
be/src/format/reader/column_mapper.cpp | 66 ++-
be/src/format/reader/column_mapper.h | 6 +-
be/src/format/reader/file_reader.h | 43 +-
be/src/format/reader/table_reader.cpp | 22 +-
be/src/format/reader/table_reader.h | 170 +++++-
be/src/format/table/iceberg_reader_v2.cpp | 50 +-
be/src/format/table/iceberg_reader_v2.h | 11 +-
be/test/format/new_parquet/parquet_reader_test.cpp | 31 +-
be/test/format/reader/expr/cast_test.cpp | 193 +++++-
be/test/format/reader/table_reader_test.cpp | 654 ++++++++++++++++++++-
12 files changed, 1208 insertions(+), 149 deletions(-)
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 2626df205fa..26093575c11 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -323,35 +323,27 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block*
file_block,
SelectionVector* selection,
uint16_t* selected_rows) {
- // Expression filters may reference several predicate columns. Execute
them only after all
+ // Conjuncts may reference several predicate columns. Execute them only
after all referenced
// predicate columns in the file-local block have been materialized.
- for (const auto& expression_filter : _request->expression_filters) {
- if (expression_filter.conjunct == nullptr) {
- if (expression_filter.delete_conjunct == nullptr) {
- continue;
- }
- } else {
- if (*selected_rows == 0) {
- break;
- }
- IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
- bool can_filter_all = false;
- RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
- file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
- &can_filter_all));
- *selected_rows =
- can_filter_all ? 0
- : _apply_filter_to_selection(filter,
selection, *selected_rows);
- }
+ for (const auto& conjunct : _request->conjuncts) {
if (*selected_rows == 0) {
break;
}
- if (expression_filter.delete_conjunct == nullptr) {
- continue;
+ IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+ bool can_filter_all = false;
+ RETURN_IF_ERROR(conjunct->execute_filter(file_block, filter.data(),
+
static_cast<size_t>(batch_rows), false,
+ &can_filter_all));
+ *selected_rows =
+ can_filter_all ? 0 : _apply_filter_to_selection(filter,
selection, *selected_rows);
+ }
+ for (const auto& delete_conjunct : _request->delete_conjuncts) {
+ if (*selected_rows == 0) {
+ break;
}
int result_column_id = -1;
- RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute(
- expression_filter.delete_conjunct.get(), file_block,
&result_column_id));
+
RETURN_IF_ERROR(delete_conjunct->root()->execute(delete_conjunct.get(),
file_block,
+ &result_column_id));
DORIS_CHECK(result_column_id >= 0 &&
result_column_id <
static_cast<int>(file_block->columns()));
const auto& delete_filter = assert_cast<const ColumnUInt8&>(
@@ -745,6 +737,76 @@ Status ParquetReader::get_block(Block* file_block, size_t*
rows, bool* eof) {
}
}
+Status ParquetReader::get_aggregate_result(const reader::FileAggregateRequest&
request,
+ reader::FileAggregateResult*
result) {
+ DORIS_CHECK(result != nullptr);
+ if (_state == nullptr || _state->metadata == nullptr || _state->schema ==
nullptr) {
+ return Status::Uninitialized("ParquetReader is not open");
+ }
+ result->count = 0;
+ result->columns.clear();
+ if (request.agg_type != TPushAggOp::type::COUNT &&
+ request.agg_type != TPushAggOp::type::MINMAX) {
+ return Status::NotSupported("Unsupported parquet aggregate pushdown
type {}",
+ request.agg_type);
+ }
+
+ // Aggregate row count in all selected row groups. For MIN/MAX aggregate,
this is used to determine whether there is no row group selected.
+ for (const auto row_group_idx : _state->selected_row_groups) {
+ auto row_group_metadata = _state->metadata->RowGroup(row_group_idx);
+ DORIS_CHECK(row_group_metadata != nullptr);
+ result->count += row_group_metadata->num_rows();
+ }
+ if (request.agg_type == TPushAggOp::type::COUNT) {
+ return Status::OK();
+ }
+
+ result->columns.resize(request.columns.size());
+ for (size_t request_column_idx = 0; request_column_idx <
request.columns.size();
+ ++request_column_idx) {
+ const auto file_column_id =
request.columns[request_column_idx].file_column_id;
+ if (file_column_id < 0 ||
+ file_column_id >=
static_cast<int32_t>(_state->file_schema.size())) {
+ return Status::InvalidArgument("Invalid parquet aggregate column
id {}",
+ file_column_id);
+ }
+ const auto& column_schema = _state->file_schema[file_column_id];
+ DORIS_CHECK(column_schema != nullptr);
+ // TODO: Support min/max pushdown for complex column by traversing
down to the leaf column readers. This requires supporting complex column
statistics in parquet file reader, which is currently not implemented in
parquet-cpp.
+ if (column_schema->leaf_column_id < 0) {
+ return Status::NotSupported(
+ "Parquet aggregate pushdown only supports primitive column
{}",
+ column_schema->name);
+ }
+
+ auto& aggregate_column = result->columns[request_column_idx];
+ for (const auto row_group_idx : _state->selected_row_groups) {
+ auto row_group_metadata =
_state->metadata->RowGroup(row_group_idx);
+ DORIS_CHECK(row_group_metadata != nullptr);
+ auto column_chunk =
row_group_metadata->ColumnChunk(column_schema->leaf_column_id);
+ DORIS_CHECK(column_chunk != nullptr);
+ const auto statistics =
ParquetStatisticsUtils::TransformColumnStatistics(
+ *column_schema, column_chunk->statistics());
+ if (!statistics.has_min_max) {
+ return Status::NotSupported("Missing parquet min/max
statistics for column {}",
+ column_schema->name);
+ }
+ if (!aggregate_column.has_min || statistics.min_value <
aggregate_column.min_value) {
+ aggregate_column.min_value = statistics.min_value;
+ aggregate_column.has_min = true;
+ }
+ if (!aggregate_column.has_max || aggregate_column.max_value <
statistics.max_value) {
+ aggregate_column.max_value = statistics.max_value;
+ aggregate_column.has_max = true;
+ }
+ }
+ if (!aggregate_column.has_min || !aggregate_column.has_max) {
+ return Status::NotSupported("No parquet row group selected for
min/max pushdown");
+ }
+ }
+ return Status::OK();
+}
+
Status ParquetReader::close() {
if (_state != nullptr) {
if (_state->file_reader != nullptr) {
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index 14a891c75e1..85d766f8882 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -69,6 +69,9 @@ public:
// 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。
Status get_block(Block* file_block, size_t* rows, bool* eof) override;
+ Status get_aggregate_result(const reader::FileAggregateRequest& request,
+ reader::FileAggregateResult* result) override;
+
Status close() override;
protected:
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index e8e7442a8d7..c6114b20df3 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -43,6 +43,13 @@ struct FileSlotRewriteInfo {
std::string file_column_name;
};
+static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref,
+ const FileSlotRewriteInfo& rewrite_info)
{
+ return TableSlotRef::create_shared(slot_ref.slot_id(),
+
cast_set<int>(rewrite_info.block_position), -1,
+ rewrite_info.file_type,
rewrite_info.file_column_name);
+}
+
static VExprSPtr rewrite_table_expr_to_file_expr(
const VExprSPtr& expr,
const std::map<int32_t, FileSlotRewriteInfo>&
table_column_to_file_slot) {
@@ -54,9 +61,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
const auto rewrite_it =
table_column_to_file_slot.find(slot_ref->slot_id());
if (rewrite_it != table_column_to_file_slot.end()) {
const auto& rewrite_info = rewrite_it->second;
- auto file_slot = TableSlotRef::create_shared(
- slot_ref->slot_id(),
cast_set<int>(rewrite_info.block_position), -1,
- rewrite_info.file_type, rewrite_info.file_column_name);
+ auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info);
if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
return file_slot;
}
@@ -66,6 +71,27 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
}
return expr;
}
+ // rewrite_table_expr_to_file_expr localizes the expression tree in-place
because VExpr does
+ // not provide a generic deep-clone API. A previous split may already have
inserted Cast(slot)
+ // for the same table-level conjunct. Keep that rewrite idempotent:
rewrite the cast child
+ // from table slot to the current split's file slot, and drop the cast
when the current split
+ // no longer needs it.
+ if (dynamic_cast<const Cast*>(expr.get()) != nullptr &&
expr->get_num_children() == 1) {
+ const auto& child = expr->children()[0];
+ if (child->is_slot_ref()) {
+ const auto* slot_ref = assert_cast<const VSlotRef*>(child.get());
+ const auto rewrite_it =
table_column_to_file_slot.find(slot_ref->slot_id());
+ if (rewrite_it != table_column_to_file_slot.end() &&
+ expr->data_type()->equals(*rewrite_it->second.table_type)) {
+ auto rewritten_child = create_file_slot_ref(*slot_ref,
rewrite_it->second);
+ if
(rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
+ return rewritten_child;
+ }
+ expr->set_children({std::move(rewritten_child)});
+ return expr;
+ }
+ }
+ }
// VExpr currently does not provide a generic deep-clone API for arbitrary
expression types.
// Keep all slot-localization mutation inside ColumnMapper and rebuild it
for every split
@@ -85,13 +111,28 @@ static constexpr const char*
ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_update
static void add_scan_column(FileScanRequest* file_request, ColumnId
file_column_id,
std::vector<ColumnId>* scan_columns) {
+ if (scan_columns == &file_request->non_predicate_columns &&
+ std::find(file_request->predicate_columns.begin(),
file_request->predicate_columns.end(),
+ file_column_id) != file_request->predicate_columns.end()) {
+ return;
+ }
// column_positions is the global read-column index for this scan request,
so it also
// deduplicates predicate_columns and non_predicate_columns across all
filter/projection paths.
- if (file_request->column_positions.count(file_column_id) == 0) {
+ const bool newly_added =
file_request->column_positions.count(file_column_id) == 0;
+ if (newly_added) {
file_request->column_positions.emplace(file_column_id,
file_request->column_positions.size());
+ }
+ if (std::find(scan_columns->begin(), scan_columns->end(), file_column_id)
==
+ scan_columns->end()) {
scan_columns->push_back(file_column_id);
}
+ if (scan_columns == &file_request->predicate_columns) {
+ file_request->non_predicate_columns.erase(
+ std::remove(file_request->non_predicate_columns.begin(),
+ file_request->non_predicate_columns.end(),
file_column_id),
+ file_request->non_predicate_columns.end());
+ }
}
static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
@@ -293,7 +334,8 @@ Status TableColumnMapper::create_scan_request(const
std::vector<TableFilter>& ta
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
file_request->complex_projections.clear();
- file_request->expression_filters.clear();
+ file_request->conjuncts.clear();
+ file_request->delete_conjuncts.clear();
file_request->column_predicate_filters.clear();
file_request->reader_expression_map.clear();
// 1. Build referenced non-predicate columns
@@ -379,19 +421,9 @@ Status TableColumnMapper::localize_filters(const
std::vector<TableFilter>& table
continue;
}
if (table_filter.conjunct != nullptr) {
- FileExpressionFilter expression_filter;
- expression_filter.conjunct =
+ file_request->conjuncts.push_back(
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
- table_filter.conjunct->root(),
table_column_to_file_slot));
-
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
- for (const auto table_column_id : table_filter.slot_ids) {
- const auto* mapping = _find_mapping(table_column_id);
- if (mapping == nullptr ||
!mapping->file_column_id.has_value()) {
- continue;
- }
-
expression_filter.file_column_ids.push_back(*mapping->file_column_id);
- }
-
file_request->expression_filters.push_back(std::move(expression_filter));
+ table_filter.conjunct->root(),
table_column_to_file_slot)));
}
}
for (const auto& [table_column_id, predicates] : table_column_predicates) {
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
index 75b53f68d2d..e1839652a47 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -106,7 +106,7 @@ public:
// 把 table-level scan 请求转换成 file-local scan 请求。
// table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
- // projected_file_columns、expression_filters、column_predicate_filters 和
+ //
projected_file_columns、conjuncts、delete_conjuncts、column_predicate_filters 和
// reader_expression_map。
virtual Status create_scan_request(const std::vector<TableFilter>&
table_filters,
const TableColumnPredicates&
table_column_predicates,
@@ -149,7 +149,9 @@ private:
}
bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr&
file_type) const {
- return table_type == file_type;
+ DORIS_CHECK(table_type != nullptr);
+ DORIS_CHECK(file_type != nullptr);
+ return table_type->equals(*file_type);
}
TableColumnMapperOptions _options;
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index 28de8f068b0..7e6d18acedc 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -27,7 +27,9 @@
#include "common/status.h"
#include "core/data_type/data_type.h"
+#include "core/field.h"
#include "exprs/vexpr_fwd.h"
+#include "gen_cpp/PlanNodes_types.h"
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
@@ -75,15 +77,6 @@ struct FieldProjection {
std::vector<FieldProjection> children;
};
-// File-local expression filter. It may reference multiple predicate_columns,
so FileReader should
-// evaluate it after all referenced predicate columns have been materialized
in the file-local block.
-struct FileExpressionFilter {
- VExprContextSPtr conjunct;
- // DeletePredicate
- VExprContextSPtr delete_conjunct;
- std::vector<ColumnId> file_column_ids;
-};
-
// File-local single-column predicates for file-layer pruning, such as
min/max, page index,
// dictionary and bloom filter. Predicates must all belong to file_column_id.
struct FileColumnPredicateFilter {
@@ -108,12 +101,37 @@ struct FileScanRequest {
std::vector<ColumnId> non_predicate_columns;
std::map<ColumnId, size_t> column_positions; // file_column_id ->
file-local block position
std::map<ColumnId, FieldProjection> complex_projections;
- std::vector<FileExpressionFilter> expression_filters;
+ // Complex conjuncts converted to file-local predicates from table-level
predicates.
+ VExprContextSPtrs conjuncts;
+ // Delete predicates converted to file-local predicates.
+ VExprContextSPtrs delete_conjuncts;
+ // Only simple predicates that can be directly evaluated on column, such
as `a` > 1. Now we use it for zone-map filtering.
std::vector<FileColumnPredicateFilter> column_predicate_filters;
// fallback path if filters cannot be localized to file-local predicates.
The expression can reference projected_file_columns and partition columns.
std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
};
+struct FileAggregateRequest {
+ struct Column {
+ ColumnId file_column_id = -1;
+ };
+
+ TPushAggOp::type agg_type = TPushAggOp::type::NONE;
+ std::vector<Column> columns;
+};
+
+struct FileAggregateResult {
+ struct Column {
+ bool has_min = false;
+ bool has_max = false;
+ Field min_value;
+ Field max_value;
+ };
+
+ int64_t count = 0;
+ std::vector<Column> columns;
+};
+
// 文件物理读取层通用接口。
// 该接口只描述 file-local schema、file-local scan request 和 file-local block。
// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
@@ -188,6 +206,11 @@ public:
return Status::OK();
}
+ virtual Status get_aggregate_result(const FileAggregateRequest& request,
+ FileAggregateResult* result) {
+ return Status::NotSupported("FileReader does not support aggregate
pushdown");
+ }
+
// 关闭当前物理文件 reader 并释放文件层状态。
// 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
virtual Status close() {
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index 8289d637d78..2c92b9ca1a1 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -153,6 +153,7 @@ Status TableReader::init(TableReadOptions options) {
_io_ctx = options.io_ctx;
_runtime_state = options.runtime_state;
_scanner_profile = options.scanner_profile;
+ _push_down_agg_type = options.push_down_agg_type;
_projected_columns = std::move(options.projected_columns);
_system_properties = create_system_properties(_scan_params);
_profile = std::move(options.profile);
@@ -173,19 +174,13 @@ Status TableReader::_build_table_filters_from_conjuncts()
{
Status TableReader::_open_local_filter_exprs(const FileScanRequest&
file_request) {
RowDescriptor row_desc;
- for (const auto& expression_filter : file_request.expression_filters) {
- if (expression_filter.conjunct == nullptr) {
- if (expression_filter.delete_conjunct == nullptr) {
- continue;
- }
- } else {
-
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
- RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
- }
- if (expression_filter.delete_conjunct != nullptr) {
-
RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state,
row_desc));
-
RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state));
- }
+ for (const auto& conjunct : file_request.conjuncts) {
+ RETURN_IF_ERROR(conjunct->prepare(_runtime_state, row_desc));
+ RETURN_IF_ERROR(conjunct->open(_runtime_state));
+ }
+ for (const auto& delete_conjunct : file_request.delete_conjuncts) {
+ RETURN_IF_ERROR(delete_conjunct->prepare(_runtime_state, row_desc));
+ RETURN_IF_ERROR(delete_conjunct->open(_runtime_state));
}
return Status::OK();
}
@@ -235,6 +230,7 @@ Status TableReader::prepare_split(const SplitReadOptions&
options) {
_current_task = std::make_unique<ScanTask>();
_current_task->data_file = create_file_description(options.current_range);
_delete_rows = nullptr;
+ _aggregate_pushdown_tried = false;
return _parse_delete_predicates(options);
}
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index de7626dfb24..83e0ec44fc8 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -36,6 +36,7 @@
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_struct.h"
+#include "core/field.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "format/new_parquet/column_reader.h"
@@ -43,6 +44,7 @@
#include "format/reader/expr/delete_predicate.h"
#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
+#include "gen_cpp/PlanNodes_types.h"
#include "runtime/descriptors.h"
namespace doris {
@@ -66,8 +68,8 @@ struct TableColumn {
bool is_partition_key = false;
};
-// table-level filter。
-// TableColumnMapper 负责把它转换成 FileExpressionFilter 或 reader_expression_map。
+// All complex predicates on table/global schema, which cannot be directly
localized to file
+// schema. They will be evaluated at table level and may depend on multiple
columns.
struct TableFilter {
// 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
VExprContextSPtr conjunct;
@@ -108,21 +110,29 @@ struct ReadProfile {
};
struct TableReadOptions {
+ // Columns need to be read from file and output by table reader. They are
all in table/global
+ // schema semantics.
const std::vector<TableColumn> projected_columns;
+ // Simple predicates for a single column, which is parsed on scan operator.
const TableColumnPredicates column_predicates;
- // All conjuncts from scan operator
+ // All complex conjuncts from scan operator
const VExprContext conjuncts;
+ // File format of the underlying data files, needed for reader
initialization and reader-level
+ // filter pushdown.
const FileFormat format;
TFileScanRangeParams* scan_params;
std::shared_ptr<io::IOContext> io_ctx;
RuntimeState* runtime_state;
RuntimeProfile* scanner_profile;
const bool allow_missing_columns = true;
+ // Push-down aggregate type.
+ const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
std::unique_ptr<ReadProfile> profile;
};
struct SplitReadOptions {
+ // Split-level information for reader initialization, which may include
file path, partition values, delete file info, etc. The content is table format
specific and opaque to table reader base class; it's the responsibility of the
concrete table reader implementation to parse necessary information for reader
initialization and filter pushdown.
std::map<std::string, Field> partition_values;
ShardedKVCache* cache;
TFileRangeDesc current_range;
@@ -175,6 +185,18 @@ public:
}
}
+ // Materialize a reduced row set for upper aggregate operators
when aggregate
+ // pushdown can be applied. This is not the final aggregate
result: COUNT emits
+ // `count` default rows for the upper COUNT(*), and MIN/MAX emits
two rows containing
+ // file-level min/max values for the upper MIN/MAX.
+ if (!_aggregate_pushdown_tried) {
+ bool pushed_down = false;
+
RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down));
+ if (pushed_down) {
+ return Status::OK();
+ }
+ }
+
bool current_eof = false;
_data_reader.block_template.clear_column_data();
size_t current_rows = 0;
@@ -329,10 +351,8 @@ protected:
std::make_shared<DataTypeInt64>(),
parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME));
- FileExpressionFilter delete_filter;
- delete_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
- delete_filter.file_column_ids.push_back(row_position_column_id);
- request->expression_filters.push_back(std::move(delete_filter));
+ request->delete_conjuncts.push_back(
+ VExprContext::create_shared(std::move(delete_predicate)));
return Status::OK();
}
@@ -343,7 +363,6 @@ protected:
_data_reader.reader.reset();
_data_reader.column_mapper.clear();
_table_filters.clear();
- _table_column_predicates.clear();
_data_reader.file_schema.clear();
_data_reader.block_schema.clear();
_data_reader.block_template.clear();
@@ -368,6 +387,62 @@ protected:
// Materialize virtual columns in table block, such as _row_id and
_last_updated_sequence_number in Iceberg. This is called after finalize_chunk,
so the virtual column can be referenced in finalize_expr.
virtual Status materialize_virtual_columns(Block* table_block) { return
Status::OK(); }
+ Status _try_materialize_aggregate_pushdown_rows(Block* block, bool*
pushed_down) {
+ DORIS_CHECK(block != nullptr);
+ DORIS_CHECK(pushed_down != nullptr);
+ *pushed_down = false;
+ block->clear_column_data(_projected_columns.size());
+ _aggregate_pushdown_tried = true;
+ if (!_supports_aggregate_pushdown(_push_down_agg_type)) {
+ return Status::OK();
+ }
+
+ FileAggregateRequest file_request;
+ _build_file_aggregate_request(_push_down_agg_type, &file_request);
+ FileAggregateResult file_result;
+ const auto status =
_data_reader.reader->get_aggregate_result(file_request, &file_result);
+ if (status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(
+ _materialize_aggregate_pushdown_rows(_push_down_agg_type,
file_result, block));
+ *pushed_down = true;
+ RETURN_IF_ERROR(close_current_reader());
+ return Status::OK();
+ }
+
+ virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const
{
+ // Only COUNT and MIN/MAX can be push down.
+ if (agg_type != TPushAggOp::type::COUNT && agg_type !=
TPushAggOp::type::MINMAX) {
+ return false;
+ }
+ // Only support aggregate pushdown when there is no delete, filter and
column predicate, so
+ // the reduced rows consumed by the upper aggregate remain
semantically equivalent to a
+ // normal scan.
+ if (_delete_rows != nullptr && !_delete_rows->empty()) {
+ return false;
+ }
+ if (!_table_filters.empty() || !_table_column_predicates.empty()) {
+ return false;
+ }
+ if (agg_type == TPushAggOp::type::COUNT) {
+ return true;
+ }
+ // For MIN/MAX, only support direct file-to-table column mappings. The
two emitted rows
+ // must be enough for the upper MIN/MAX aggregate without evaluating
projections, default
+ // expressions or virtual columns.
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+ if (!mapping.file_column_id.has_value() ||
mapping.has_complex_projection ||
+ mapping.virtual_column_type != TableVirtualColumnType::INVALID
||
+ mapping.default_expr != nullptr || mapping.file_type ==
nullptr ||
+ mapping.table_type == nullptr) {
+ return false;
+ }
+ }
+ return true;
+ }
+
Status _materialize_mapping_column(const ColumnMapping& mapping, Block*
current_block,
const size_t rows, ColumnPtr* column) {
if (mapping.projection != nullptr) {
@@ -411,6 +486,82 @@ protected:
return Status::OK();
}
+ void _build_file_aggregate_request(TPushAggOp::type agg_type,
+ FileAggregateRequest* request) const {
+ DORIS_CHECK(request != nullptr);
+ DORIS_CHECK(_supports_aggregate_pushdown(agg_type));
+ request->agg_type = agg_type;
+ request->columns.clear();
+ if (agg_type == TPushAggOp::type::COUNT) {
+ return;
+ }
+ request->columns.reserve(_data_reader.column_mapper.mappings().size());
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+ DORIS_CHECK(mapping.file_column_id.has_value());
+ request->columns.push_back({*mapping.file_column_id});
+ }
+ }
+
+ Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type,
+ const FileAggregateResult&
file_result,
+ Block* block) {
+ if (agg_type == TPushAggOp::type::COUNT) {
+ // COUNT pushdown is not a final count value. It emits `count`
default rows so the
+ // upper COUNT(*) aggregate can count them and produce the final
result, including
+ // zero rows when count is 0.
+ for (size_t column_idx = 0; column_idx < block->columns();
++column_idx) {
+ block->replace_by_position(column_idx,
+ block->get_by_position(column_idx)
+
.type->create_column_const_with_default_value(
+
cast_set<size_t>(file_result.count)));
+ }
+ return Status::OK();
+ }
+ // MIN/MAX pushdown emits two rows, min first and max second, for each
projected column.
+ // The upper MIN/MAX aggregate consumes those two rows to produce the
final aggregate value.
+ DORIS_CHECK(file_result.columns.size() ==
_data_reader.column_mapper.mappings().size());
+ DORIS_CHECK(block->columns() ==
_data_reader.column_mapper.mappings().size());
+ Block file_block;
+ file_block.reserve(_data_reader.block_schema.size());
+ for (const auto& field : _data_reader.block_schema) {
+ file_block.insert({field.type->create_column(), field.type,
field.name});
+ }
+ for (size_t column_idx = 0; column_idx < file_result.columns.size();
++column_idx) {
+ const auto& result_column = file_result.columns[column_idx];
+ if (!result_column.has_min || !result_column.has_max) {
+ return Status::NotSupported("Missing min/max aggregate result
for column {}",
+
_projected_columns[column_idx].name);
+ }
+ const auto& mapping =
_data_reader.column_mapper.mappings()[column_idx];
+ DORIS_CHECK(mapping.file_column_id.has_value());
+ bool found_file_column = false;
+ for (size_t block_position = 0; block_position <
_data_reader.block_schema.size();
+ ++block_position) {
+ if (_data_reader.block_schema[block_position].id ==
*mapping.file_column_id) {
+ found_file_column = true;
+ auto column =
+
file_block.get_by_position(block_position).column->assume_mutable();
+ if (column->empty()) {
+ column->insert(result_column.min_value);
+ column->insert(result_column.max_value);
+ file_block.replace_by_position(block_position,
std::move(column));
+ }
+ break;
+ }
+ }
+ DORIS_CHECK(found_file_column);
+ }
+ for (size_t column_idx = 0; column_idx <
_data_reader.column_mapper.mappings().size();
+ ++column_idx) {
+ ColumnPtr table_column;
+ RETURN_IF_ERROR(
+
_materialize_mapping_column(_data_reader.column_mapper.mappings()[column_idx],
+ &file_block, 2,
&table_column));
+ block->replace_by_position(column_idx, std::move(table_column));
+ }
+ return Status::OK();
+ }
+
struct DataReader {
std::unique_ptr<FileReader> reader;
TableColumnMapper column_mapper;
@@ -426,6 +577,7 @@ protected:
std::shared_ptr<io::FileSystemProperties> _system_properties;
// partition key -> value
std::map<std::string, Field> _partition_values;
+ // Predicates built from scan conjuncts before file-level localization.
std::vector<TableFilter> _table_filters;
TableColumnPredicates _table_column_predicates;
VExprContext _conjuncts {nullptr};
@@ -437,6 +589,8 @@ protected:
RuntimeState* _runtime_state;
RuntimeProfile* _scanner_profile;
FileFormat _format;
+ TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE;
+ bool _aggregate_pushdown_tried = false;
private:
static const SchemaField* _find_schema_field(const
std::vector<SchemaField>& schema,
diff --git a/be/src/format/table/iceberg_reader_v2.cpp
b/be/src/format/table/iceberg_reader_v2.cpp
index ed6649fce2c..f9587361dcf 100644
--- a/be/src/format/table/iceberg_reader_v2.cpp
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -115,6 +115,13 @@ Status
IcebergTableReader::customize_file_scan_request(reader::FileScanRequest*
return Status::OK();
}
+// Decides whether this Iceberg split may answer a pushed-down aggregate
+// (e.g. COUNT / MINMAX) from file metadata instead of scanning rows.
+// The generic TableReader checks run first; on top of them, pushdown is
+// refused whenever equality delete filters were built for the split, because
+// those deletes are applied as row-level delete conjuncts at read time and a
+// metadata-only answer would not see them.
+// NOTE(review): deletion vectors / position deletes are not checked here;
+// presumably TableReader::_supports_aggregate_pushdown covers its common
+// DeleteRows path — confirm before relying on this gate.
+bool IcebergTableReader::_supports_aggregate_pushdown(TPushAggOp::type
agg_type) const {
+ if (!TableReader::_supports_aggregate_pushdown(agg_type)) {
+ return false;
+ }
+ // Any equality delete can remove rows that file-level statistics still describe.
+ return _equality_delete_filters.empty();
+}
+
Status IcebergTableReader::_parse_deletion_vector_file(const
TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
bool* has_delete_file) {
@@ -184,7 +191,6 @@ Status IcebergTableReader::_init_delete_predicates(const
TTableFormatFileDesc& t
equality_delete_files.push_back(delete_file);
}
}
-
// `_delete_rows != nullptr` means DeleteVector is parsed
if (_delete_rows != nullptr) {
_position_delete_rows_storage = *_delete_rows;
@@ -250,14 +256,13 @@ std::unique_ptr<io::FileDescription>
IcebergTableReader::_delete_file_descriptio
return file_description;
}
-const reader::SchemaField* IcebergTableReader::_find_delete_field(
- const std::vector<reader::SchemaField>& schema, const std::string&
name) {
- for (const auto& field : schema) {
- if (field.name == name) {
- return &field;
- }
+// Path of the data file this reader scans, used to match position-delete rows.
+// Prefers `original_file_path` from the Iceberg params when it is set; otherwise
+// falls back to the current task's data file path, which must then exist
+// (enforced with DORIS_CHECK rather than returned as an error).
+std::string IcebergTableReader::_data_file_path() const {
+ if (_iceberg_params != nullptr &&
_iceberg_params->__isset.original_file_path) {
+ return _iceberg_params->original_file_path;
}
- return nullptr;
+ DORIS_CHECK(_current_task != nullptr);
+ DORIS_CHECK(_current_task->data_file != nullptr);
+ return _current_task->data_file->path;
}
Status
IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest*
request) {
@@ -273,8 +278,6 @@ Status
IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe
for (const auto& filter : _equality_delete_filters) {
auto delete_predicate =
std::make_shared<EqualityDeletePredicate>(filter.delete_block,
filter.field_ids);
- reader::FileExpressionFilter expression_filter;
- expression_filter.delete_conjunct =
VExprContext::create_shared(delete_predicate);
DCHECK_EQ(filter.field_ids.size(), filter.key_types.size());
for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) {
const int field_id = filter.field_ids[idx];
@@ -301,22 +304,13 @@ Status
IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRe
cast_expr->add_child(std::move(slot));
delete_predicate->add_child(std::move(cast_expr));
}
- expression_filter.file_column_ids.push_back(field_it->id);
}
- request->expression_filters.push_back(std::move(expression_filter));
+ request->delete_conjuncts.push_back(
+ VExprContext::create_shared(std::move(delete_predicate)));
}
return Status::OK();
}
-std::string IcebergTableReader::_data_file_path() const {
- if (_iceberg_params != nullptr &&
_iceberg_params->__isset.original_file_path) {
- return _iceberg_params->original_file_path;
- }
- DORIS_CHECK(_current_task != nullptr);
- DORIS_CHECK(_current_task->data_file != nullptr);
- return _current_task->data_file->path;
-}
-
Status IcebergTableReader::_read_parquet_position_delete_file(
const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams&
scan_params,
IcebergDeleteFileIOContext* delete_io_ctx,
PositionDeleteRowsCollector* collector) {
@@ -344,8 +338,15 @@ Status
IcebergTableReader::_read_parquet_position_delete_file(
std::vector<reader::SchemaField> schema;
RETURN_IF_ERROR(reader.get_schema(&schema));
- const auto* file_path_field = _find_delete_field(schema,
ICEBERG_FILE_PATH);
- const auto* pos_field = _find_delete_field(schema, ICEBERG_ROW_POS);
+ reader::SchemaField* file_path_field = nullptr;
+ reader::SchemaField* pos_field = nullptr;
+ for (auto& field : schema) {
+ if (field.name == ICEBERG_FILE_PATH) {
+ file_path_field = &field;
+ } else if (field.name == ICEBERG_ROW_POS) {
+ pos_field = &field;
+ }
+ }
if (file_path_field == nullptr || pos_field == nullptr) {
return Status::InternalError("Position delete parquet file is missing
required columns");
}
@@ -381,9 +382,8 @@ Status IcebergTableReader::_init_position_delete_rows(
TFileScanRangeParams delete_scan_params =
_scan_params == nullptr ? TFileScanRangeParams() : *_scan_params;
reader::DeleteRows position_delete_rows;
- const auto data_file_path = _data_file_path();
IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
- PositionDeleteRowsCollector collector(data_file_path,
&position_delete_rows);
+ PositionDeleteRowsCollector collector(_data_file_path(),
&position_delete_rows);
for (const auto& delete_file : delete_files) {
RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file,
delete_scan_params,
&delete_io_ctx,
&collector));
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index 497a989289a..a543ae0797d 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -53,6 +53,8 @@ protected:
Status customize_file_scan_request(reader::FileScanRequest* file_request)
override;
+ bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const
override;
+
Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
bool* has_delete_file) override;
@@ -93,18 +95,17 @@ private:
static std::unique_ptr<io::FileDescription> _delete_file_description(
const TFileRangeDesc& range);
- static const reader::SchemaField* _find_delete_field(
- const std::vector<reader::SchemaField>& schema, const std::string&
name);
+ std::string _data_file_path() const;
+ // Append row position column to file scan request for position delete
handling.
Status _append_row_position_output_column(reader::FileScanRequest*
request);
-
+ // Append equality delete predicates to file scan request based on the
delete files in iceberg
+ // params. DeleteVector and position delete files use the common
DeleteRows path in TableReader.
Status _append_equality_delete_predicates(reader::FileScanRequest*
request);
Status _init_equality_delete_predicates(
const std::vector<TIcebergDeleteFileDesc>& delete_files);
- std::string _data_file_path() const;
-
// Read equality/position delete files.
Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc&
delete_file,
const TFileScanRangeParams&
scan_params,
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 255ad574a26..6d0156af9ce 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -485,9 +485,7 @@ TEST_F(NewParquetReaderTest,
ReadPredicateAndNonPredicateColumnsWithSelection) {
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0};
request->non_predicate_columns = {1};
- reader::FileExpressionFilter expression_filter;
- expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- request->expression_filters.push_back(std::move(expression_filter));
+ request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
reader::FileColumnPredicateFilter column_filter;
column_filter.file_column_id = 0;
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
@@ -532,9 +530,7 @@ TEST_F(NewParquetReaderTest,
ReadMultiPredicateColumnsBeforeExpressionFilter) {
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0, 1};
request->non_predicate_columns = {};
- reader::FileExpressionFilter expression_filter;
- expression_filter.conjunct = create_int32_sum_greater_than_conjunct(0, 1,
7);
- request->expression_filters.push_back(std::move(expression_filter));
+ request->conjuncts.push_back(create_int32_sum_greater_than_conjunct(0, 1,
7));
ASSERT_TRUE(reader->open(request).ok());
size_t rows = 0;
@@ -567,9 +563,7 @@ TEST_F(NewParquetReaderTest,
PredicateFiltersRowGroupsByStatistics) {
auto request = std::make_unique<reader::FileScanRequest>();
request->predicate_columns = {0};
request->non_predicate_columns = {1};
- reader::FileExpressionFilter expression_filter;
- expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- request->expression_filters.push_back(std::move(expression_filter));
+ request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
reader::FileColumnPredicateFilter column_filter;
column_filter.file_column_id = 0;
column_filter.predicates.push_back(create_comparison_predicate<PredicateType::GT>(
@@ -756,9 +750,7 @@ TEST_F(NewParquetReaderTest,
RowPositionReaderKeepsPositionsAfterSelection) {
{0, 0},
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
};
- reader::FileExpressionFilter expression_filter;
- expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- request->expression_filters.push_back(std::move(expression_filter));
+ request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
ASSERT_TRUE(reader->open(request).ok());
size_t rows = 0;
@@ -800,11 +792,7 @@ TEST_F(NewParquetReaderTest,
DeletePredicateFiltersRowPositions) {
{0, 0},
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
};
- reader::FileExpressionFilter delete_filter;
- delete_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
- delete_filter.file_column_ids.push_back(
- parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
- request->expression_filters.push_back(std::move(delete_filter));
+
request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate)));
ASSERT_TRUE(reader->open(request).ok());
size_t rows = 0;
@@ -846,13 +834,8 @@ TEST_F(NewParquetReaderTest,
QueryPredicateAndDeletePredicateFilterRowPositions)
{0, 0},
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
};
- reader::FileExpressionFilter expression_filter;
- expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
- expression_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
- expression_filter.file_column_ids.push_back(0);
- expression_filter.file_column_ids.push_back(
- parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
- request->expression_filters.push_back(std::move(expression_filter));
+ request->conjuncts.push_back(create_int32_greater_than_conjunct(0, 2));
+
request->delete_conjuncts.push_back(VExprContext::create_shared(std::move(delete_predicate)));
ASSERT_TRUE(reader->open(request).ok());
size_t rows = 0;
diff --git a/be/test/format/reader/expr/cast_test.cpp
b/be/test/format/reader/expr/cast_test.cpp
index a236d327a1f..93858dbf53e 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -238,6 +238,26 @@ TEST_F(CastTest,
ColumnMapperBuildsCastProjectionForTypeMismatch) {
mapping.projection->close();
}
+// A table column and a file column with the same type (INT32 on both sides)
+// must produce exactly one mapping, and that mapping must be flagged
+// `is_trivial` (no type conversion between file and table type).
+TEST_F(CastTest, ColumnMapperTreatsEquivalentTypesAsTrivial) {
+ reader::TableColumnMapper mapper;
+ reader::TableColumn table_column;
+ table_column.id = 7;
+ table_column.name = "value";
+ table_column.type = std::make_shared<DataTypeInt32>();
+ std::vector<reader::TableColumn> projected_columns {table_column};
+
+ reader::SchemaField file_field;
+ file_field.id = 0;
+ file_field.name = "value";
+ file_field.type = std::make_shared<DataTypeInt32>();
+ std::vector<reader::SchemaField> file_schema {file_field};
+
+ auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+ ASSERT_TRUE(status.ok()) << status;
+ ASSERT_EQ(mapper.mappings().size(), 1);
+ EXPECT_TRUE(mapper.mappings()[0].is_trivial);
+}
+
TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
reader::TableColumnMapper mapper;
reader::TableColumn table_column;
@@ -264,9 +284,9 @@ TEST_F(CastTest,
ColumnMapperBuildsCastFilterForTypeMismatch) {
reader::FileScanRequest file_request;
ASSERT_TRUE(
mapper.create_scan_request({table_filter}, {}, projected_columns,
&file_request).ok());
- ASSERT_EQ(file_request.expression_filters.size(), 1);
+ ASSERT_EQ(file_request.conjuncts.size(), 1);
ASSERT_EQ(file_request.predicate_columns,
std::vector<reader::ColumnId>({0}));
- const auto& localized_expr =
file_request.expression_filters[0].conjunct->root();
+ const auto& localized_expr = file_request.conjuncts[0]->root();
ASSERT_EQ(localized_expr->get_num_children(), 1);
const auto& localized_child = localized_expr->children()[0];
ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
@@ -279,7 +299,7 @@ TEST_F(CastTest,
ColumnMapperBuildsCastFilterForTypeMismatch) {
Block block;
block.insert(ColumnHelper::create_column_with_name<DataTypeInt32>({11,
22}));
- auto* conjunct = file_request.expression_filters[0].conjunct.get();
+ auto* conjunct = file_request.conjuncts[0].get();
status = conjunct->prepare(&state, RowDescriptor());
ASSERT_TRUE(status.ok()) << status;
status = conjunct->open(&state);
@@ -293,7 +313,172 @@ TEST_F(CastTest,
ColumnMapperBuildsCastFilterForTypeMismatch) {
EXPECT_EQ(filter[0], 0);
EXPECT_EQ(filter[1], 1);
- file_request.expression_filters[0].conjunct->close();
+ file_request.conjuncts[0]->close();
+}
+
+// Localizes the same BIGINT table filter twice with one mapper against an
+// INT32 file column. The second localization must produce exactly one Cast
+// whose single child is the file slot (column id 0); it must not wrap the Cast
+// produced by the first request in another Cast.
+TEST_F(CastTest, ColumnMapperDoesNotNestCastFilterAcrossScanRequests) {
+ reader::TableColumnMapper mapper;
+ reader::TableColumn table_column;
+ table_column.id = 7;
+ table_column.name = "value";
+ table_column.type = std::make_shared<DataTypeInt64>();
+ std::vector<reader::TableColumn> projected_columns {table_column};
+
+ reader::SchemaField file_field;
+ file_field.id = 0;
+ file_field.name = "value";
+ file_field.type = std::make_shared<DataTypeInt32>();
+ std::vector<reader::SchemaField> file_schema {file_field};
+
+ auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+ ASSERT_TRUE(status.ok()) << status;
+
+ auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+ predicate->add_child(TableSlotRef::create_shared(7, 7, -1,
table_column.type, "value"));
+ reader::TableFilter table_filter;
+ table_filter.conjunct = VExprContext::create_shared(predicate);
+ table_filter.slot_ids = {7};
+
+ // Two consecutive scan requests built from the same shared table filter.
+ reader::FileScanRequest first_request;
+ ASSERT_TRUE(
+ mapper.create_scan_request({table_filter}, {}, projected_columns,
&first_request).ok());
+ reader::FileScanRequest second_request;
+ ASSERT_TRUE(mapper.create_scan_request({table_filter}, {},
projected_columns, &second_request)
+ .ok());
+
+ // Expected shape: predicate -> Cast -> TableSlotRef(column 0), one level of Cast only.
+ ASSERT_EQ(second_request.conjuncts.size(), 1);
+ const auto& localized_expr = second_request.conjuncts[0]->root();
+ ASSERT_EQ(localized_expr->get_num_children(), 1);
+ const auto& localized_child = localized_expr->children()[0];
+ ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
+ ASSERT_EQ(localized_child->get_num_children(), 1);
+ const auto* localized_slot =
+ assert_cast<const
TableSlotRef*>(localized_child->children()[0].get());
+ EXPECT_EQ(localized_slot->column_id(), 0);
+}
+
+// Localizes one BIGINT table filter against two file schemas with separate
+// mappers. Against an INT32 file column the localized filter must wrap the file
+// slot in a Cast; against a BIGINT file column it must reference the file slot
+// directly (column id 0, file type BIGINT) with no Cast. The BIGINT-localized
+// conjunct is then evaluated on {11, 22} and must keep only 22 (> 15).
+TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) {
+ reader::TableColumn table_column;
+ table_column.id = 7;
+ table_column.name = "value";
+ table_column.type = std::make_shared<DataTypeInt64>();
+ std::vector<reader::TableColumn> projected_columns {table_column};
+
+ auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+ predicate->add_child(TableSlotRef::create_shared(7, 7, -1,
table_column.type, "value"));
+ reader::TableFilter table_filter;
+ table_filter.conjunct = VExprContext::create_shared(predicate);
+ table_filter.slot_ids = {7};
+
+ reader::SchemaField int_file_field;
+ int_file_field.id = 0;
+ int_file_field.name = "value";
+ int_file_field.type = std::make_shared<DataTypeInt32>();
+
+ reader::TableColumnMapper int_mapper;
+ ASSERT_TRUE(int_mapper.create_mapping(projected_columns, {},
{int_file_field}).ok());
+ reader::FileScanRequest int_request;
+ ASSERT_TRUE(int_mapper.create_scan_request({table_filter}, {},
projected_columns, &int_request)
+ .ok());
+
+ // Guard the conjuncts[0] access below; an empty request would index out of range.
+ ASSERT_EQ(int_request.conjuncts.size(), 1);
+ const auto& int_localized_expr = int_request.conjuncts[0]->root();
+ ASSERT_EQ(int_localized_expr->get_num_children(), 1);
+ ASSERT_NE(dynamic_cast<const
Cast*>(int_localized_expr->children()[0].get()), nullptr);
+
+ reader::SchemaField bigint_file_field;
+ bigint_file_field.id = 0;
+ bigint_file_field.name = "value";
+ bigint_file_field.type = std::make_shared<DataTypeInt64>();
+
+ reader::TableColumnMapper bigint_mapper;
+ ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {},
{bigint_file_field}).ok());
+ reader::FileScanRequest bigint_request;
+ ASSERT_TRUE(bigint_mapper
+ .create_scan_request({table_filter}, {},
projected_columns, &bigint_request)
+ .ok());
+
+ // Same guard for the BIGINT request before indexing its first conjunct.
+ ASSERT_EQ(bigint_request.conjuncts.size(), 1);
+ const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root();
+ ASSERT_EQ(bigint_localized_expr->get_num_children(), 1);
+ const auto& bigint_localized_child = bigint_localized_expr->children()[0];
+ const auto* localized_slot = assert_cast<const
TableSlotRef*>(bigint_localized_child.get());
+ EXPECT_EQ(localized_slot->column_id(), 0);
+ EXPECT_TRUE(localized_slot->data_type()->equals(*bigint_file_field.type));
+
+ Block block;
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({11,
22}));
+ auto* conjunct = bigint_request.conjuncts[0].get();
+ auto status = conjunct->prepare(&state, RowDescriptor());
+ ASSERT_TRUE(status.ok()) << status;
+ status = conjunct->open(&state);
+ ASSERT_TRUE(status.ok()) << status;
+ IColumn::Filter filter(block.rows(), 1);
+ bool can_filter_all = false;
+ status = conjunct->execute_filter(&block, filter.data(), block.rows(),
false, &can_filter_all);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_FALSE(can_filter_all);
+ ASSERT_EQ(filter.size(), 2);
+ EXPECT_EQ(filter[0], 0);
+ EXPECT_EQ(filter[1], 1);
+ conjunct->close();
+}
+
+// One mapper localizes the same filter (table slot id 7, file column id 10)
+// into two requests. In an empty request the localized slot gets column id 0.
+// In a request whose column_positions place file column 9 at position 0 and
+// file column 10 at position 1, the slot gets column id 1. In both cases the
+// slot id stays 7. The second conjunct is then evaluated against a two-column
+// block and must read column 1 ({11, 22}), keeping only 22 (> 15).
+TEST_F(CastTest, ColumnMapperKeepsTableSlotIdWhenFileBlockPositionChanges) {
+ reader::TableColumn table_column;
+ table_column.id = 7;
+ table_column.name = "value";
+ table_column.type = std::make_shared<DataTypeInt64>();
+ std::vector<reader::TableColumn> projected_columns {table_column};
+
+ reader::SchemaField file_field;
+ file_field.id = 10;
+ file_field.name = "value";
+ file_field.type = std::make_shared<DataTypeInt64>();
+
+ reader::TableColumnMapper mapper;
+ ASSERT_TRUE(mapper.create_mapping(projected_columns, {},
{file_field}).ok());
+
+ auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+ predicate->add_child(TableSlotRef::create_shared(7, 7, -1,
table_column.type, "value"));
+ reader::TableFilter table_filter;
+ table_filter.conjunct = VExprContext::create_shared(predicate);
+ table_filter.slot_ids = {7};
+
+ reader::FileScanRequest first_request;
+ ASSERT_TRUE(mapper.localize_filters({table_filter}, {},
&first_request).ok());
+ ASSERT_EQ(first_request.conjuncts.size(), 1);
+ // Guard children()[0] below, matching the shape checks in the sibling tests.
+ ASSERT_EQ(first_request.conjuncts[0]->root()->get_num_children(), 1);
+ const auto* first_slot = assert_cast<const TableSlotRef*>(
+ first_request.conjuncts[0]->root()->children()[0].get());
+ EXPECT_EQ(first_slot->slot_id(), 7);
+ EXPECT_EQ(first_slot->column_id(), 0);
+
+ // File column 10 now sits behind column 9 in the request's block layout.
+ reader::FileScanRequest second_request;
+ second_request.column_positions.emplace(9, 0);
+ second_request.column_positions.emplace(10, 1);
+ second_request.non_predicate_columns.push_back(9);
+ ASSERT_TRUE(mapper.localize_filters({table_filter}, {},
&second_request).ok());
+ ASSERT_EQ(second_request.conjuncts.size(), 1);
+ ASSERT_EQ(second_request.conjuncts[0]->root()->get_num_children(), 1);
+ const auto* second_slot = assert_cast<const TableSlotRef*>(
+ second_request.conjuncts[0]->root()->children()[0].get());
+ EXPECT_EQ(second_slot->slot_id(), 7);
+ EXPECT_EQ(second_slot->column_id(), 1);
+
+ Block block;
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({100,
100}));
+ block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({11,
22}));
+ auto* conjunct = second_request.conjuncts[0].get();
+ auto status = conjunct->prepare(&state, RowDescriptor());
+ ASSERT_TRUE(status.ok()) << status;
+ status = conjunct->open(&state);
+ ASSERT_TRUE(status.ok()) << status;
+ IColumn::Filter filter(block.rows(), 1);
+ bool can_filter_all = false;
+ status = conjunct->execute_filter(&block, filter.data(), block.rows(),
false, &can_filter_all);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_FALSE(can_filter_all);
+ ASSERT_EQ(filter.size(), 2);
+ EXPECT_EQ(filter[0], 0);
+ EXPECT_EQ(filter[1], 1);
+ conjunct->close();
}
} // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index 8a72937002d..1bb6eaf26be 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -268,14 +268,38 @@ void write_parquet_file(const std::string& file_path,
int32_t id, const std::str
builder.build()));
}
+// Writes a one-row Parquet file with a single non-nullable INT32 column "id"
+// holding `value`. The column carries the `PARQUET:field_id` metadata key set to
+// `field_id`, so the file can serve as an Iceberg equality delete file keyed by
+// that field id. Written as Parquet 2.6 with V2 data pages, uncompressed.
+// NOTE(review): ASSERT_TRUE inside this void helper only returns from the
+// helper on failure; callers continue unless they check HasFatalFailure().
+void write_iceberg_equality_delete_parquet_file(const std::string& file_path,
int32_t field_id,
+ int32_t value) {
+ const auto metadata =
+ arrow::key_value_metadata({"PARQUET:field_id"},
{std::to_string(field_id)});
+ auto schema = arrow::schema({
+ arrow::field("id", arrow::int32(), false)->WithMetadata(metadata),
+ });
+ auto table = arrow::Table::Make(schema, {build_int32_array({value})});
+
+ auto file_result = arrow::io::FileOutputStream::Open(file_path);
+ ASSERT_TRUE(file_result.ok()) << file_result.status();
+ std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+ ::parquet::WriterProperties::Builder builder;
+ builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+ builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+ builder.compression(::parquet::Compression::UNCOMPRESSED);
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out, 1,
+ builder.build()));
+}
+
void write_int_pair_parquet_file(const std::string& file_path, const
std::vector<int32_t>& ids,
const std::vector<int32_t>& scores,
const std::vector<std::string>& values,
int64_t row_group_size = -1) {
+ const auto id_metadata = arrow::key_value_metadata({"PARQUET:field_id"},
{"0"});
+ const auto score_metadata =
arrow::key_value_metadata({"PARQUET:field_id"}, {"1"});
+ const auto value_metadata =
arrow::key_value_metadata({"PARQUET:field_id"}, {"2"});
auto schema = arrow::schema({
- arrow::field("id", arrow::int32(), false),
- arrow::field("score", arrow::int32(), false),
- arrow::field("value", arrow::utf8(), false),
+ arrow::field("id", arrow::int32(),
false)->WithMetadata(id_metadata),
+ arrow::field("score", arrow::int32(),
false)->WithMetadata(score_metadata),
+ arrow::field("value", arrow::utf8(),
false)->WithMetadata(value_metadata),
});
auto table = arrow::Table::Make(schema, {build_int32_array(ids),
build_int32_array(scores),
build_string_array(values)});
@@ -398,6 +422,16 @@ TIcebergDeleteFileDesc
make_iceberg_position_delete_file(const std::string& path
return delete_file;
}
+// Builds a Thrift descriptor for a Parquet Iceberg equality delete file at
+// `path`, whose delete keys are the given Iceberg field ids.
+TIcebergDeleteFileDesc make_iceberg_equality_delete_file(const std::string&
path,
+ const
std::vector<int32_t>& field_ids) {
+ TIcebergDeleteFileDesc delete_file;
+ // Iceberg file content id: 0 = data, 1 = position deletes, 2 = equality deletes.
+ delete_file.__set_content(2);
+ delete_file.__set_path(path);
+ delete_file.__set_field_ids(field_ids);
+ delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET);
+ return delete_file;
+}
+
TFileScanRangeParams make_local_parquet_scan_params() {
TFileScanRangeParams scan_params;
scan_params.__set_file_type(TFileType::FILE_LOCAL);
@@ -557,6 +591,268 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
std::filesystem::remove_all(test_dir);
}
+// COUNT pushdown with no predicates or conjuncts: a 5-row file written in row
+// groups of 2 rows must yield one block with 5 rows on the first get_block.
+// Only the row count is asserted; column contents are not checked.
+TEST(TableReaderTest, PushDownCountFromNewParquetReader) {
+ const auto test_dir = std::filesystem::temp_directory_path() /
"doris_table_reader_count_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40,
50},
+ {"one", "two", "three", "four", "five"}, 2);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // The whole file's row count is expected in the first block.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 5);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// MINMAX pushdown over two INT32 columns written in row groups of 2 rows
+// (ids {3, 1 | 5, 2}). The reader must return a 2-row block per projected column
+// with row 0 = min and row 1 = max across all row groups: id 1/5, score 10/50.
+TEST(TableReaderTest, PushDownMinMaxFromNewParquetReader) {
+ const auto test_dir = std::filesystem::temp_directory_path() /
"doris_table_reader_minmax_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20},
+ {"three", "one", "five", "two"}, 2);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+ projected_columns.push_back(make_table_column(1, "score",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::MINMAX,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // Min comes from the first row group (id 1) and max from the second (id 5).
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 2);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& score_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ EXPECT_EQ(id_column.get_element(0), 1);
+ EXPECT_EQ(id_column.get_element(1), 5);
+ EXPECT_EQ(score_column.get_element(0), 10);
+ EXPECT_EQ(score_column.get_element(1), 50);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// MINMAX pushdown where the file stores "id" as INT32 but the table projects
+// it as BIGINT. The min/max rows must come back converted to the table type
+// (a ColumnInt64) with values 1 and 5.
+TEST(TableReaderTest, PushDownMinMaxCastsFileValueToTableType) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_minmax_cast_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {3, 1, 5, 2}, {30, 10, 50, 20},
+ {"three", "one", "five", "two"}, 2);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt64>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::MINMAX,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 2);
+ // The assert_cast fails if the result column was left in the file's INT32 type.
+ const auto& id_column = assert_cast<const
ColumnInt64&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 1);
+ EXPECT_EQ(id_column.get_element(1), 5);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// COUNT pushdown requested together with a table-level conjunct (presumably
+// `id > 2`). The reader must not answer from metadata; it must return the
+// filtered row itself: one row with id 3.
+TEST(TableReaderTest, PushDownCountFallsBackWithTableConjunct) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_count_conjunct_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 2)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // A metadata count would report 3 rows; the fallback scan keeps only id 3.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 1);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// COUNT pushdown requested together with a column predicate `id > 2` on column 0,
+// with the file written as one row per row group. The reader must fall back to
+// reading rows and return the single matching row (id 3) instead of a count.
+TEST(TableReaderTest, PushDownCountFallsBackWithColumnPredicate) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_count_predicate_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"}, 1);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ TableColumnPredicates column_predicates;
+
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
Field::create_field<TYPE_INT>(2), false));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates =
std::move(column_predicates),
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 1);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// MINMAX pushdown where the only projected column (id 99, "missing_id") has no
+// counterpart in the file. There are no statistics to use, so the reader must
+// fall back to a regular read. It returns one row (presumably one per file row
+// written by write_parquet_file), whose value reads back as 0 via get_int.
+// NOTE(review): whether 0 comes from a NULL or a default fill is not visible here.
+TEST(TableReaderTest, PushDownMinMaxFallsBackWithoutDirectFileMapping) {
+ const auto test_dir = std::filesystem::temp_directory_path() /
+ "doris_table_reader_minmax_missing_mapping_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_parquet_file(file_path, 1, "one");
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(99, "missing_id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::MINMAX,
+ .profile = nullptr,
+ })
+ .ok());
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // A min/max answer would produce 2 rows; the fallback produces 1.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ ASSERT_EQ(block.rows(), 1);
+ EXPECT_EQ(block.get_by_position(0).column->get_int(0), 0);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_conjunct_filter_test";
@@ -644,7 +940,61 @@ TEST(TableReaderTest,
OpenReaderBuildsColumnPredicateFilters) {
write_int_pair_parquet_file(file_path, {1, 2, 3}, {1, 5, 8}, {"one",
"two", "three"}, 1);
std::vector<TableColumn> projected_columns;
- projected_columns.push_back(make_table_column(2, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(2, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ TableColumnPredicates column_predicates;
+
column_predicates[0].push_back(create_comparison_predicate<PredicateType::GT>(
+ 0, "id", std::make_shared<DataTypeInt32>(),
Field::create_field<TYPE_INT>(2), false));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates =
std::move(column_predicates),
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+
+ const auto& value_column = assert_cast<const
ColumnString&>(*block.get_by_position(0).column);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ ASSERT_EQ(id_column.size(), 1);
+ ASSERT_EQ(value_column.size(), 1);
+ EXPECT_EQ(id_column.get_element(0), 3);
+ EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, ColumnPredicateSurvivesReopenSplit) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_predicate_reopen_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const std::vector<std::string> file_paths = {
+ (test_dir / "split_1.parquet").string(),
+ (test_dir / "split_2.parquet").string(),
+ };
+ write_int_pair_parquet_file(file_paths[0], {1, 3}, {10, 30}, {"one",
"three"}, 1);
+ write_int_pair_parquet_file(file_paths[1], {2, 4}, {20, 40}, {"two",
"four"}, 1);
+
+ std::vector<TableColumn> projected_columns;
projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
TableColumnPredicates column_predicates;
@@ -667,21 +1017,22 @@ TEST(TableReaderTest,
OpenReaderBuildsColumnPredicateFilters) {
})
.ok());
- ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+ std::vector<int32_t> ids;
+ for (const auto& file_path : file_paths) {
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
- Block block = build_table_block(projected_columns);
- bool eos = false;
- ASSERT_TRUE(reader.get_block(&block, &eos).ok());
- ASSERT_FALSE(eos);
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ ASSERT_EQ(id_column.size(), 1);
+ ids.push_back(id_column.get_element(0));
- const auto& value_column = assert_cast<const
ColumnString&>(*block.get_by_position(0).column);
- const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
- ASSERT_EQ(id_column.size(), 1);
- ASSERT_EQ(value_column.size(), 1);
- EXPECT_EQ(id_column.get_element(0), 3);
- EXPECT_EQ(value_column.get_data_at(0).to_string(), "three");
+ ASSERT_TRUE(reader.close().ok());
+ }
- ASSERT_TRUE(reader.close().ok());
+ EXPECT_EQ(ids, std::vector<int32_t>({3, 4}));
std::filesystem::remove_all(test_dir);
}
@@ -763,6 +1114,51 @@ TEST(TableReaderTest,
CreateScanRequestDeduplicatesSharedPredicateColumns) {
}
}
+// Two projected INT columns, "id" (0) and "score" (1), map one-to-one onto a
+// two-field file schema. A table filter whose slot_ids reference only column 0
+// is passed to create_scan_request. The test checks that column 0 is placed in
+// predicate_columns and column 1 in non_predicate_columns, and it pins the
+// resulting column_positions entries (0 -> 1, 1 -> 0).
+// NOTE(review): the exact meaning of column_positions is defined by
+// FileScanRequest (not shown); this test only fixes the observed values.
+TEST(TableReaderTest,
CreateScanRequestPromotesProjectedColumnToPredicateColumn) {
+ const auto int_type = std::make_shared<DataTypeInt32>();
+ const std::vector<TableColumn> projected_columns = {
+ make_table_column(0, "id", int_type),
+ make_table_column(1, "score", int_type),
+ };
+ // File schema mirrors the projection: same ids, names and types.
+ const std::vector<SchemaField> file_schema = {
+ {.id = 0,
+ .name = "id",
+ .type = int_type,
+ .children = {},
+ .file_path = {0},
+ .field_id_path = {0},
+ .name_path = {"id"},
+ .column_type = DATA_COLUMN},
+ {.id = 1,
+ .name = "score",
+ .type = int_type,
+ .children = {},
+ .file_path = {1},
+ .field_id_path = {1},
+ .name_path = {"score"},
+ .column_type = DATA_COLUMN},
+ };
+
+ TableColumnMapper mapper;
+ ASSERT_TRUE(mapper.create_mapping(projected_columns, {},
file_schema).ok());
+
+ // Filter referencing only slot 0 ("id"). The TableInt32GreaterThanExpr
+ // constructor arguments are defined elsewhere in this file — confirm their meaning.
+ TableFilter table_filter {
+ .conjunct = VExprContext::create_shared(
+ std::make_shared<TableInt32GreaterThanExpr>(0, 0, 1)),
+ .slot_ids = {0},
+ };
+
+ FileScanRequest file_request;
+ ASSERT_TRUE(
+ mapper.create_scan_request({table_filter}, {}, projected_columns,
&file_request).ok());
+
+ // "id" is filtered on, so it becomes a predicate column; "score" does not.
+ EXPECT_EQ(file_request.predicate_columns, std::vector<ColumnId>({0}));
+ EXPECT_EQ(file_request.non_predicate_columns, std::vector<ColumnId>({1}));
+ ASSERT_EQ(file_request.column_positions.size(), 2);
+ EXPECT_EQ(file_request.column_positions.at(0), 1);
+ EXPECT_EQ(file_request.column_positions.at(1), 0);
+}
+
TEST(TableReaderTest, OpenReaderPushesMultiColumnConjunctToParquetReader) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_multi_conjunct_test";
@@ -1194,6 +1590,7 @@ TEST(TableReaderTest,
IcebergTableReaderAppliesDeletionVectorFile) {
.runtime_state = &state,
.scanner_profile = &profile,
.allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
.profile =
make_table_read_profile(&profile),
})
.ok());
@@ -1210,6 +1607,226 @@ TEST(TableReaderTest,
IcebergTableReaderAppliesDeletionVectorFile) {
std::filesystem::remove_all(test_dir);
}
+// Iceberg split with a deletion vector while COUNT pushdown is requested.
+// The data file holds ids {1, 2, 3}, and the deletion vector marks row
+// position 0 as deleted. The test expects the reader to return the two
+// surviving rows (ids 2 and 3) with their real values.
+// NOTE(review): per the test name, aggregate pushdown must be disabled when
+// the split has delete files; confirm against IcebergTableReader.
+TEST(TableReaderTest, IcebergTableReaderDoesNotPushDownAggregateWithDeletes) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_aggregate_delete_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto dv_path = (test_dir / "delete-vector.bin").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});
+ // Deletion vector for row position 0; the returned size is passed to
+ // make_iceberg_deletion_vector below.
+ const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ // COUNT pushdown is requested up front; the split's deletes come later.
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ // Real row data with position 0 (id 1) removed by the deletion vector.
+ ASSERT_EQ(block.rows(), 2);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 2);
+ EXPECT_EQ(id_column.get_element(1), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// Iceberg split with a position-delete parquet file while COUNT pushdown is
+// requested. The data file holds ids {1, 2, 3}, and the delete file targets
+// row position 1 of that file. The test expects the surviving rows (ids 1
+// and 3) to be read back.
+// NOTE(review): per the test name, aggregate pushdown must be disabled when
+// position deletes are present; confirm against IcebergTableReader.
+TEST(TableReaderTest,
IcebergTableReaderDoesNotPushDownAggregateWithPositionDelete) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_aggregate_position_delete_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto delete_file_path = (test_dir /
"position-delete.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});
+ // Presumably parallel lists of (data file path, row position): delete row 1
+ // of file_path — confirm helper signature.
+ write_position_delete_parquet_file(delete_file_path, {file_path}, {1});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_position_delete_file(delete_file_path)}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ // Real row data with position 1 (id 2) removed by the position delete.
+ ASSERT_EQ(block.rows(), 2);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 1);
+ EXPECT_EQ(id_column.get_element(1), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// Position deletes must still be applied when the Iceberg file descriptor is
+// built by hand with only a format version (2) and the delete file, rather
+// than via make_iceberg_table_format_desc(file_path, ...) as in the sibling
+// tests. No aggregate pushdown is requested. The test expects ids {1, 3}.
+// NOTE(review): per the test name, the delete file's entries are presumably
+// matched against the split's own path when no data file path is supplied in
+// the Iceberg params — confirm against IcebergTableReader.
+TEST(TableReaderTest, IcebergPositionDeleteFallsBackToSplitPath) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_position_delete_path_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto delete_file_path = (test_dir /
"position-delete.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});
+ write_position_delete_parquet_file(delete_file_path, {file_path}, {1});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+ // Hand-built descriptor: format version and delete file only; the data
+ // file path is not passed into the Iceberg params here.
+ TTableFormatFileDesc table_format_params;
+ TIcebergFileDesc iceberg_params;
+ iceberg_params.__set_format_version(2);
+
iceberg_params.__set_delete_files({make_iceberg_position_delete_file(delete_file_path)});
+ table_format_params.__set_iceberg_params(iceberg_params);
+ split_options.current_range.__set_table_format_params(table_format_params);
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ // Row position 1 (id 2) is removed.
+ EXPECT_EQ(read_iceberg_ids(&reader, projected_columns),
std::vector<int32_t>({1, 3}));
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+// Iceberg split with an equality-delete parquet file while COUNT pushdown is
+// requested. The data file holds ids {1, 2, 3}. The test expects the reader
+// to return ids 1 and 3, so the row with id 2 is removed by the equality
+// delete.
+// NOTE(review): per the test name, aggregate pushdown must be disabled when
+// equality deletes are present; confirm against IcebergTableReader.
+TEST(TableReaderTest,
IcebergTableReaderDoesNotPushDownAggregateWithEqualityDelete) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_aggregate_equality_delete_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto delete_file_path = (test_dir /
"equality-delete.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3}, {10, 20, 30}, {"one",
"two", "three"});
+ // Presumably an equality delete on field id 0 with value 2 (consistent with
+ // id 2 missing from the expected result) — confirm helper signature.
+ write_iceberg_equality_delete_parquet_file(delete_file_path, 0, 2);
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .push_down_agg_type =
TPushAggOp::type::COUNT,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_equality_delete_file(delete_file_path,
{0})}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ // Real row data with the id == 2 row removed by the equality delete.
+ ASSERT_EQ(block.rows(), 2);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ EXPECT_EQ(id_column.get_element(0), 1);
+ EXPECT_EQ(id_column.get_element(1), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_iceberg_position_delete_file_test";
@@ -1332,8 +1949,9 @@ TEST(TableReaderTest,
RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColum
EXPECT_EQ(request.non_predicate_columns, std::vector<ColumnId>({0}));
ASSERT_TRUE(request.column_positions.contains(row_position_column_id));
EXPECT_EQ(request.column_positions.at(row_position_column_id), 1);
- ASSERT_EQ(request.expression_filters.size(), 1);
- EXPECT_NE(request.expression_filters[0].delete_conjunct, nullptr);
+ ASSERT_TRUE(request.conjuncts.empty());
+ ASSERT_EQ(request.delete_conjuncts.size(), 1);
+ EXPECT_NE(request.delete_conjuncts[0], nullptr);
}
TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]