alamb commented on code in PR #16424:
URL: https://github.com/apache/datafusion/pull/16424#discussion_r2150916597
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +511,91 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+ predicate: Arc<dyn PhysicalExpr>,
+ pruning_schema: Arc<Schema>,
Review Comment:
Could we maybe add some comments about what a `pruning_schema` is, and how it relates to `partition_fields`?
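As an illustration of the relationship being asked about: in the diff below, the pruning schema is simply the logical file fields with the partition fields appended at the end. A minimal sketch of that ordering, using plain strings as stand-ins for Arrow `FieldRef`s (names here are hypothetical, not DataFusion API):

```rust
/// Hypothetical sketch: the pruning schema is the file's fields followed by
/// the partition fields, in that order (partition fields always come last).
fn pruning_schema_fields(file_fields: &[&str], partition_fields: &[&str]) -> Vec<String> {
    file_fields
        .iter()
        .chain(partition_fields.iter())
        .map(|s| s.to_string())
        .collect()
}
```

This mirrors the `.chain(partition_fields.iter().cloned())` call in the PR, which is why predicates over partition columns can be evaluated against the same schema as file-statistics predicates.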
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +511,91 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+ predicate: Arc<dyn PhysicalExpr>,
+ pruning_schema: Arc<Schema>,
+ file: PartitionedFile,
+ partition_fields: Vec<FieldRef>,
+ predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+ pub fn new_opt(
Review Comment:
Could we document under what circumstances it returns `None`? I think it is
when there are no dynamic predicates.
It would also be good to document **why** it would return `None` (in this
case because we assume that files have already been pruned using any
non-dynamic predicates, so additional pruning may happen ONLY when new dynamic
predicates are available??)
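A sketch of the `None` contract being requested here, under the assumption stated in the comment (static predicates were already applied when listing files, so a pruner is only useful when the predicate can still change). The function and names are illustrative, not the actual `FilePruner::new_opt` signature:

```rust
/// Hypothetical sketch of the requested documentation, as code:
/// return `None` when the predicate is not dynamic, because static
/// predicates are assumed to have already pruned files at planning time,
/// so there is nothing new this pruner could learn.
fn pruner_for(predicate_is_dynamic: bool) -> Option<&'static str> {
    if !predicate_is_dynamic {
        // Nothing will change mid-scan; re-checking would be redundant.
        return None;
    }
    // Only dynamic predicates (e.g. TopK thresholds) can tighten later.
    Some("FilePruner")
}
```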
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -384,6 +353,24 @@ impl FileOpener for ParquetOpener {
.map(move |maybe_batch| {
maybe_batch
.and_then(|b|
schema_mapping.map_batch(b).map_err(Into::into))
+ })
+ .take_while(move |_| {
+ if let Some(file_pruner) = file_pruner.as_ref() {
+ match file_pruner.should_prune() {
Review Comment:
This is basically applying the filter on each record batch, right?
I think once we can actually push the filters into the parquet scan (which I
realize I have been talking about for months...) this could become entirely
redundant.
On the other hand, it also stops the input immediately if we find out the
file can be pruned 🤔
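The early-termination behavior described above can be sketched with plain iterators: `take_while` stops pulling from the input the first time the pruning check fires, so no further batches are produced. This is a stand-in sketch (integers for record batches, a counter for `should_prune`), not the actual stream combinator code:

```rust
/// Hypothetical sketch: consume "batches" until a pruning check fires.
/// `take_while` ends the stream immediately on the first `false`,
/// mirroring how the diff stops the input once the file should be pruned.
fn consume_until_pruned(batches: Vec<i32>, prune_after_batches: usize) -> Vec<i32> {
    let mut seen = 0;
    batches
        .into_iter()
        .take_while(|_| {
            seen += 1;
            // Keep going while the (simulated) pruner has not fired.
            seen <= prune_after_batches
        })
        .collect()
}
```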
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,6 +512,91 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+/// Prune based on partition values and file-level statistics.
+pub struct FilePruner {
+ predicate: Arc<dyn PhysicalExpr>,
+ pruning_schema: Arc<Schema>,
+ file: PartitionedFile,
+ partition_fields: Vec<FieldRef>,
+ predicate_creation_errors: Count,
+}
+
+impl FilePruner {
+ pub fn new_opt(
+ predicate: Arc<dyn PhysicalExpr>,
+ logical_file_schema: &SchemaRef,
+ partition_fields: Vec<FieldRef>,
+ file: PartitionedFile,
+ predicate_creation_errors: Count,
+ ) -> Result<Option<Self>> {
+ // If there is no dynamic predicate, we don't need to prune
+ if !is_dynamic_physical_expr(Arc::clone(&predicate))? {
+ return Ok(None);
+ }
+ // Build a pruning schema that combines the file fields and partition fields.
+ // Partition fields are always at the end.
+ let pruning_schema = Arc::new(
+ Schema::new(
+ logical_file_schema
+ .fields()
+ .iter()
+ .cloned()
+ .chain(partition_fields.iter().cloned())
+ .collect_vec(),
+ )
+ .with_metadata(logical_file_schema.metadata().clone()),
+ );
+ Ok(Some(Self {
+ predicate,
+ pruning_schema,
+ file,
+ partition_fields,
+ predicate_creation_errors,
+ }))
+ }
+
+ pub fn should_prune(&self) -> Result<bool> {
+ let pruning_predicate = build_pruning_predicate(
+ Arc::clone(&self.predicate),
+ &self.pruning_schema,
+ &self.predicate_creation_errors,
+ );
Review Comment:
it happens once per file, right?
If so I agree that doing it as a follow-on optimization sounds good.
However, I recommend we file a ticket while this is all in our heads / we
have the full context, otherwise we'll forget what to do
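One possible shape for the follow-on optimization being discussed: cache the built pruning predicate so repeated `should_prune()` calls reuse it instead of rebuilding. This is a heavily simplified, hypothetical sketch (strings stand in for the predicate, and with truly dynamic filters the cache would likely need invalidation whenever the predicate updates, which this sketch does not handle):

```rust
use std::cell::{Cell, OnceCell};

/// Hypothetical sketch, not the actual FilePruner: build the pruning
/// predicate lazily and at most once, reusing it on later calls.
struct FilePrunerSketch {
    builds: Cell<u32>,          // counts how often the expensive build ran
    cached: OnceCell<String>,   // stand-in for the built pruning predicate
}

impl FilePrunerSketch {
    fn new() -> Self {
        Self {
            builds: Cell::new(0),
            cached: OnceCell::new(),
        }
    }

    fn pruning_predicate(&self) -> &str {
        self.cached.get_or_init(|| {
            // The expensive build happens at most once per pruner (per file).
            self.builds.set(self.builds.get() + 1);
            "pruning predicate".to_string()
        })
    }
}
```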
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]