darmie commented on code in PR #20417:
URL: https://github.com/apache/datafusion/pull/20417#discussion_r2829358824
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -460,26 +463,80 @@ impl FileOpener for ParquetOpener {
// ---------------------------------------------------------------------
// Filter pushdown: evaluate predicates during scan
- if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
- let row_filter = row_filter::build_row_filter(
- &predicate,
- &physical_file_schema,
- builder.metadata(),
- reorder_predicates,
- &file_metrics,
- );
+ //
+ // When predicate columns exactly match the projection columns
+ // and there is at most one static conjunct (excluding dynamic
+ // filters from TopK / joins), the RowFilter (late
+ // materialization) path provides no benefit: every projected
+ // column is a predicate column, so there are no extra columns
+ // whose decode could be skipped for non-matching rows, and
+ // with a single static conjunct there is no incremental
+ // evaluation advantage. Apply the predicate as a batch-level
+ // filter after decoding instead, avoiding the overhead of
+ // CachedArrayReader / ReadPlanBuilder / try_next_batch.
+ //
+ // When there are non-predicate projection columns (e.g.
+ // SELECT * WHERE col = X), RowFilter is valuable because it
+ // skips decoding those extra columns for non-matching rows.
+ //
+ // Multi-conjunct static predicates are left on the RowFilter
+ // path because the RowFilter evaluates conjuncts incrementally
+ // — a selective first conjunct can avoid decoding expensive
+ // later columns for non-matching rows.
+ let batch_filter_predicate = if let Some(predicate) =
+ pushdown_filters.then_some(predicate).flatten()
+ {
+ let predicate_col_indices: HashSet<usize> =
+ collect_columns(&predicate)
+ .iter()
+ .map(|c| c.index())
+ .collect();
+ let projection_col_indices: HashSet<usize> =
+ projection.column_indices().into_iter().collect();
+
+ // Count only static conjuncts — dynamic filters (e.g.
+ // from TopK or join pushdown) are runtime-generated and
+ // reference the same projected columns, so they don't
+ // benefit from RowFilter's incremental evaluation.
+ let static_conjunct_count = split_conjunction(&predicate)
+ .iter()
+ .filter(|c| !is_dynamic_physical_expr(c))
+ .count();
+ let skip_row_filter = !predicate_col_indices.is_empty()
+ && predicate_col_indices == projection_col_indices
+ && static_conjunct_count <= 1;
+
+ if skip_row_filter {
Review Comment:
@Dandandan let's check again
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]