Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-18 Thread via GitHub


berkaysynnada commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049226852


##
datafusion/physical-optimizer/src/push_down_filter.rs:
##
@@ -0,0 +1,535 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{config::ConfigOptions, Result};
+use datafusion_physical_expr::conjunction;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::filter_pushdown::{
+    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+};
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::ExecutionPlan;
+
+/// Attempts to recursively push given filters from the top of the tree into leafs.
+///
+/// # Default Implementation
+///
+/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a no-op
+/// that assumes that:
+///
+/// * Parent filters can't be passed onto children.
+/// * This node has no filters to contribute.
+///
+/// # Example: Push filter into a `DataSourceExec`
+///
+/// For example, consider the following plan:
+///
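+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```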
+///
+/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
+///
+/// If this filter is selective pushing it into the scan can avoid massive
+/// amounts of data being read from the source (the projection is `*` so all
+/// matching columns are read).
+///
+/// The new plan looks like:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// # Example: Push filters with `ProjectionExec`
+///
+/// Let's consider a more complex example involving a [`ProjectionExec`]
+/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
+/// creates a new column that the filter depends on.
+///
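+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │      filters =       │
+/// │     [cost>50,id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```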
+///
+/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
+/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
+/// node to be executed first. A simple thing to do would be to split up the
+/// filter into two separate filters and push down the first one:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │      filters =       │
+/// │      [cost>50]       │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// We can actually however do better by pushing down `price * 1.2 > 50`
+//

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-18 Thread via GitHub


berkaysynnada commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049205041


##
datafusion/physical-optimizer/src/push_down_filter.rs:
##

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813216975

   @berkaysynnada if CI passes I think this is ready to merge and we can tweak 
later as we implement in more places 😄 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org



Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812885176

   Ok sounds good to me for now. We'll measure in our production system and if 
there's overhead from planning time we can come back and edit the API.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812853791

   > * Having 2 parameters for plans seems very strange. On the other hand,
   > removing it forces us to make deep copies. However, when I look at the copied
   > structs, it doesn't seem to be a big deal as it's done only one time. Perhaps
   > we can figure out a way of avoiding those 2 defects at the same time (via
   > another trait, or other APIs). I'm open to discussing this.

   I am worried about the `ExecutionPlan`s we have in DataFusion, but even
   more concerning to me are the ones we don't see: custom user plans, which might
   be quite expensive to clone. And even if it's not a big impact for this rule, if
   we add more and more rules, each with multiple recursions, it's going to be a
   non-trivial overhead. Especially since DataFusion doesn't support prepared
   statements or anything like that, it has to re-plan every time a query is run.
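
To make the cloning concern concrete, here is a minimal sketch using only the Rust standard library. `CustomPlan`, `share` and `deep_copy` are hypothetical names for illustration and are not part of DataFusion; the sketch only contrasts sharing a node behind an `Arc` with copying the node's state.

```rust
use std::sync::Arc;

/// Hypothetical custom plan node that carries a lot of state.
#[derive(Clone)]
struct CustomPlan {
    partitions: Vec<String>,
}

/// Sharing: bumps a reference count, the node's state is not copied.
fn share(plan: &Arc<CustomPlan>) -> Arc<CustomPlan> {
    Arc::clone(plan)
}

/// Deep copy: allocates a new node and copies every field.
fn deep_copy(plan: &Arc<CustomPlan>) -> Arc<CustomPlan> {
    Arc::new((**plan).clone())
}
```

With a plan holding many partitions, `share` returns a pointer to the same allocation (`Arc::ptr_eq` is true), while `deep_copy` does work proportional to the size of the node's state. How expensive that is for a real user-defined plan depends entirely on what the plan stores.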





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada merged PR #15566:
URL: https://github.com/apache/datafusion/pull/15566





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813394021

   > Great collaboration @adriangb, thank you. I hope more will come.
   
   Me too, great stuff!





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813382081

   Great collaboration @adriangb, thank you. I hope more will come.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049207122


##
datafusion/physical-optimizer/src/push_down_filter.rs:
##

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812731132

   I'm done with my turn for now. The only TODOs are updating the docs and
   reverting the test changes. I haven't gone into those yet, as we might still
   have some changes after the meeting.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812795783

   I have also updated the tests. Now we don't break any existing behavior,
   but we can safely push down filters into sources.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812881155

   > > * Having 2 parameters for plans seems very strange. On the other hand,
   > > removing it forces us to make deep copies. However, when I look at the copied
   > > structs, it doesn't seem to be a big deal as it's done only one time. Perhaps
   > > we can figure out a way of avoiding those 2 defects at the same time (via
   > > another trait, or other APIs). I'm open to discussing this.
   >
   > I am worried about the `ExecutionPlan`s we have in DataFusion, but even
   > more concerning to me are the ones we don't see: custom user plans, which
   > might be quite expensive to clone. And even if it's not a big impact for this
   > rule, if we add more and more rules, each with multiple recursions, it's going
   > to be a non-trivial overhead. Especially since DataFusion doesn't support
   > prepared statements or anything like that, it has to re-plan every time a
   > query is run.

   If I have to select one option, I honestly prefer the simple API and would
   accept the cloning cost, as it's not a big deal, for now at least. Shall we
   hear some other people's opinions on this?

   For the multiple recursion issue, if we stick to discarding plans in which
   the filter doesn't reach the source, O(N^2) is unavoidable.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812624874

   > I'll be done with my changes in an hour or less. Would you like to discuss 
over the code after that?
   
   Sure sounds good. Should we set up a call to move this along faster?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812623615

   > Let's say you have a plan like A (root) <- B <- C <- D <- E <- F <- G (scan).
   > Assume E is an operator doing some filtering, and F is another operator
   > which does not allow filter pushdown. So, E cannot be pushed down over F.
   > Knowing all this, can we say that none of the other operators (A, B, C or D,
   > possibly having some filters) can be pushed down over F either? Is it a safe
   > assumption to make?

   If `F` does not allow filter pushdown, then nothing upstream of it will
   be able to push down filters into the scan (`G`).
   However, there may still be filter pushdown, e.g. from `B` to `E`: `TopK` <-
   `FilterExec` <- `F` <- `Scan`. In this case `TopK` may push down into
   `FilterExec`, and `FilterExec` will try to push down into `Scan` but be blocked
   by `F`, so `FilterExec` stays where it is with its current filters but also
   absorbs any filters from `TopK`.
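
The blocking behaviour described above can be sketched with a toy model. Everything here (`Op`, `push_down`, the filter strings) is hypothetical and far simpler than DataFusion's `ExecutionPlan` API; it assumes a linear plan listed root first, with filters represented as plain strings.

```rust
/// Hypothetical, simplified operators of a linear plan (root first, scan last).
#[derive(Debug, Clone, PartialEq)]
enum Op {
    /// Contributes filters for its children but holds none itself (a TopK-like node).
    Producer(Vec<String>),
    /// Holds filters and can absorb more from its parents.
    Filter(Vec<String>),
    /// Does not let any filter pass through (the `F` of the discussion).
    Blocker,
    /// Leaf that absorbs every filter that reaches it.
    Scan(Vec<String>),
}

/// Walks the plan from root to leaf, carrying the filters that are still travelling down.
fn push_down(mut plan: Vec<Op>) -> Vec<Op> {
    let mut pending: Vec<String> = Vec::new();
    // Index of the `Filter` that currently owns the pending filters, if any.
    let mut holder: Option<usize> = None;

    for i in 0..plan.len() {
        match plan[i].clone() {
            Op::Producer(filters) => pending.extend(filters),
            Op::Filter(own) => {
                // A filter higher up hands everything over to this one.
                if let Some(h) = holder {
                    plan[h] = Op::Filter(Vec::new());
                }
                // Absorb what came from above, then try to push it all further down.
                let mut all = own;
                all.append(&mut pending);
                pending = all.clone();
                plan[i] = Op::Filter(all);
                holder = Some(i);
            }
            Op::Blocker => {
                // Nothing passes: filters stay at the last holder, if there is one.
                pending.clear();
                holder = None;
            }
            Op::Scan(mut filters) => {
                // The scan takes over; the filter that held them is emptied.
                filters.append(&mut pending);
                plan[i] = Op::Scan(filters);
                if let Some(h) = holder.take() {
                    plan[h] = Op::Filter(Vec::new());
                }
            }
        }
    }
    // A `Filter` left without predicates is redundant and is dropped.
    plan.retain(|op| !matches!(op, Op::Filter(f) if f.is_empty()));
    plan
}
```

For `TopK` <- `FilterExec` <- `F` <- `Scan`, modelled as `[Producer, Filter, Blocker, Scan]`, the `Filter` ends up holding both its own predicate and the one from the producer, and the scan receives nothing. Without the `Blocker`, both predicates land in the scan and the `Filter` disappears.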





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812621880

   > > * I've renamed retry to revisit. Of course it would be better to remove it
   > > now if we can find a better way, but that will be resolved automatically once
   > > `TreeNodeRecursion` supports a revisit mechanism. There are a few other places
   > > where it's needed and that have workarounds like this.
   >
   > I still don't understand why any sort of "retry" or "revisit" is
   > necessary. I feel like if we just add a second method to `ExecutionPlan` and are
   > maybe less eager about popping out `FilterExec`s this won't be needed.

   I'll be done with my changes in an hour or less. Would you like to go over
   the code together after that?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812616744

   > * I've renamed retry to revisit. Of course it would be better to remove it
   > now if we can find a better way, but that will be resolved automatically once
   > `TreeNodeRecursion` supports a revisit mechanism. There are a few other places
   > where it's needed and that have workarounds like this.
   
   I still don't understand why any sort of "retry" or "revisit" is necessary. 
I feel like if we just add a second method to ExecutionPlan and are maybe less 
eager about popping out `FilterExec`s this won't be needed.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812614956

   > Yep, I've noticed that too, but how did those plans not change before? We
   > were trying to push down filters over `RepartitionExec`s and `CoalesceBatchesExec`s.

   I'm not sure. The flow before was:
   1. Recurse down with filters from parents, e.g. `x = 5`.
   2. Hit a `FilterExec`. It says it's okay to push down `x = 5`.
   3. Pass through `CoalesceBatchesExec`.
   4. Hit a scan that says it can't absorb the filters and returns `Unsupported`.
   5. Recurse back up to the `FilterExec`.
   6. Ask the `FilterExec` if it can handle `x = 5` itself.
   7. `FilterExec` says yes and replaces itself with a new `FilterExec`.

   The story is the same if there are no filters from parents, except that the
   last two steps are skipped and the plan is unmodified.
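
The numbered flow above can be sketched as a small recursive function. `Plan`, `Pushdown` and `try_push` are hypothetical names for illustration, loosely modelled on the discussion rather than on the PR's actual `FilterPushdownResult` and `FilterPushdownSupport` types.

```rust
/// Toy plan node; the names mirror the discussion but this is not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// A filter node with its predicates and a single child.
    Filter(Vec<String>, Box<Plan>),
    /// A node that lets filters pass through unchanged (e.g. `CoalesceBatchesExec`).
    PassThrough(Box<Plan>),
    /// A scan; `accepts` says whether it can absorb filters.
    Scan { accepts: bool, filters: Vec<String> },
}

/// Outcome of trying to push filters into a subtree.
enum Pushdown {
    /// The filters were handled; this is the rewritten subtree.
    Supported(Plan),
    /// Nobody could take the filters; the subtree is returned unchanged.
    Unsupported(Plan),
}

fn try_push(plan: Plan, filters: Vec<String>) -> Pushdown {
    match plan {
        Plan::Scan { accepts: true, filters: mut own } => {
            own.extend(filters);
            Pushdown::Supported(Plan::Scan { accepts: true, filters: own })
        }
        // Step 4: the scan cannot absorb the filters.
        scan @ Plan::Scan { .. } => Pushdown::Unsupported(scan),
        // Step 3: forward the filters and rewrap whatever comes back.
        Plan::PassThrough(child) => match try_push(*child, filters) {
            Pushdown::Supported(c) => Pushdown::Supported(Plan::PassThrough(Box::new(c))),
            Pushdown::Unsupported(c) => Pushdown::Unsupported(Plan::PassThrough(Box::new(c))),
        },
        Plan::Filter(own, child) => {
            // Steps 1-2: offer our own filters plus the parent's to the child.
            let mut all = own.clone();
            all.extend(filters.clone());
            match try_push(*child, all.clone()) {
                // Everything was absorbed below, so this node is no longer needed.
                Pushdown::Supported(c) => Pushdown::Supported(c),
                Pushdown::Unsupported(c) => {
                    if filters.is_empty() {
                        // No parent filters: the plan stays unmodified.
                        Pushdown::Unsupported(Plan::Filter(own, Box::new(c)))
                    } else {
                        // Steps 5-7: absorb the parent's filters in a new filter node.
                        Pushdown::Supported(Plan::Filter(all, Box::new(c)))
                    }
                }
            }
        }
    }
}
```

Calling `try_push` on `Filter` <- `PassThrough` <- `Scan { accepts: false }` with the parent filter `x = 5` returns `Supported` with a new `Filter` that holds both predicates. Calling it with no parent filters returns `Unsupported` with the original plan.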





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812573049

   > Related to that, I have one question: let's say you have a plan like
   > A (root) <- B <- C <- D <- E <- F <- G (scan). Assume E is an operator doing
   > some filtering, and F is another operator which does not allow filter
   > pushdown. So, E cannot be pushed down over F. Knowing all this, can we say
   > that none of the other operators (A, B, C or D, possibly having some filters)
   > can be pushed down over F either? Is it a safe assumption to make?
   
   I'm enabling this selective passing feature





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812469566

   Current Status:
   
   1) Having 2 parameters for plans seems very strange. On the other hand,
   removing it forces us to make deep copies. However, when I look at the copied
   structs, it doesn't seem to be a big deal as it's done only one time. Perhaps
   we can figure out a way of avoiding those 2 defects at the same time (via
   another trait, or other APIs). I'm open to discussing this.

   2) I've renamed retry to revisit. Of course it would be better to remove it
   now if we can find a better way, but that will be resolved automatically once
   `TreeNodeRecursion` supports a revisit mechanism. There are a few other places
   where it's needed and that have workarounds like this.

   3) I'm currently on this:
   > Do you agree that we should accept the plan produced by the FilterPushdown
   > rule only if the filters reach the sources? If a filter cannot reach the
   > source and stops at an intermediate point, we reject that pushed-down
   > version and keep the original plan?

   Related to that, I have one question: let's say you have a plan like
   A (root) <- B <- C <- D <- E <- F <- G (scan).
   Assume E is an operator doing some filtering, and F is another operator
   which does not allow filter pushdown. So, E cannot be pushed down over F.
   Knowing all this, can we say that none of the other operators (A, B, C or D,
   possibly having some filters) can be pushed down over F either? Is it a safe
   assumption to make?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811993429

   > Yep, I've noticed that too, but how did those plans not change before? We
   > were trying to push down filters over `RepartitionExec`s and `CoalesceBatchesExec`s.

   Do you agree that we should accept the plan produced by the FilterPushdown
   rule only if the filters reach the sources? If a filter cannot reach the
   source and stops at an intermediate point, we reject that pushed-down
   version and keep the original plan?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811986744

   > @berkaysynnada I updated the SLT tests. I'll have another review tomorrow 
but things I'd like to point out now:
   > 
   > 1. We should still think about the `retry` parameter. Ideally we can get 
rid of it somehow.
   
   I'll try to improve those points now and give an update afterwards.
   
   > 2. The SLT plans show a lot of updates where the position of the 
`FilterExec` was swapped with other operators. Unless we can unequivocally 
prove that the new position is better we should probably minimize the risk for 
this PR by minimizing the changes, meaning the FilterExec doesn't move in all 
of these plans.
   
   Yep, I've noticed that too, but how come those plans didn't change before? 
We were trying to push down filters over RepartitionExecs and CoalesceBatches
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811168067

   @berkaysynnada I updated the SLT tests. I'll have another review tomorrow 
but things I'd like to point out now:
   
   1. We should still think about the `retry` parameter. Ideally we can get rid 
of it somehow.
   2. The SLT plans show a lot of updates where the position of the 
`FilterExec` was swapped with other operators. Unless we can unequivocally 
prove that the new position is better we should probably minimize the risk for 
this PR by minimizing the changes, meaning the FilterExec doesn't move in all 
of these plans.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809592722

   > I also have another question. When parquet_options.pushdown_filters is 
true, we can push down filters and remove the FilterExecs from the plan. When 
parquet_options.pushdown_filters is false, we can still push down filters but 
cannot remove the FilterExecs from the plan.
   > 
   > Is this our motivation?
   
   It's a bit more complicated than that, and it may be hard to match the 
current behavior exactly. For now for this PR I'd say what we want is:
   - If `parquet_options.pushdown_filters = true` then `ParquetSource` accepts 
filter pushdown as exact
   - If `parquet_options.pushdown_filters = false` then `ParquetSource` does 
not accept filter pushdown at all \*
   
   \* In theory there's little harm in accepting pushdown only into stats 
pruning, but I think that might be hard to get right / require new APIs, and 
realistically this is mostly a temporary issue: once #3463 gets resolved 
the default will be `parquet_options.pushdown_filters = true`, so if `false` is 
suboptimal it's not a big deal.
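   
   As a rough sketch of the two rules above, the decision could be modelled 
as below. Every name here (`PushdownAnswer`, `ParquetOptions`, 
`parquet_source_answer`, `keeps_filter_exec`) is a hypothetical stand-in for 
illustration and is not the actual DataFusion API.

```rust
/// Hypothetical stand-in for how a source answers a pushdown request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushdownAnswer {
    /// The source applies the filter exactly, so the parent `FilterExec` can be dropped.
    Exact,
    /// The source does not accept the filter, so the parent `FilterExec` must stay.
    Unsupported,
}

/// Hypothetical mirror of the `parquet_options.pushdown_filters` setting.
struct ParquetOptions {
    pushdown_filters: bool,
}

/// Assumed rule from the comment above: `true` means exact, `false` means no pushdown at all.
fn parquet_source_answer(options: &ParquetOptions) -> PushdownAnswer {
    if options.pushdown_filters {
        PushdownAnswer::Exact
    } else {
        PushdownAnswer::Unsupported
    }
}

/// The `FilterExec` may only be removed when the source handles the filter exactly.
fn keeps_filter_exec(answer: PushdownAnswer) -> bool {
    answer != PushdownAnswer::Exact
}
```

   With this shape there is no in-between "inexact" answer, which matches the 
simplification proposed for this PR; pushdown into stats pruning only would 
need a third variant.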





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809586017

   > > @adriangb one point isn't obvious to me. I've erased a comment 
saying that "filter predicates reflect the output schema after applying the 
projection", and some tests are written based on that. Is that really the case? 
I see that in the with_projection() API of FilterExec the predicates are kept 
as they are, so they are not updated according to the projection. What am I 
missing?
   > 
   > Yeah I saw that you got rid of `reassign_predicate_columns`. Maybe I 
misunderstood how it's supposed to work but I thought that if you have a 
`FilterExec { projection: [1, 0], predicate: Column { name: "b", index: 0 } }` 
the predicate is referencing the indexes after `FilterExec` does its 
projection. So if you want to push that predicate down to children (e.g. to the 
scan) you need to invert the projection / match the column with the input 
schema (i.e. `Column { name: "b", index: 1 }`).
   
   Okay, there is no problem with my understanding then. I'll implement the 
projection case as well now.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809577101

   > @adriangb one point isn't obvious to me. I've erased a comment 
saying that "filter predicates reflect the output schema after applying the 
projection", and some tests are written based on that. Is that really the case? 
I see that in the with_projection() API of FilterExec the predicates are kept 
as they are, so they are not updated according to the projection. What am I 
missing?
   
   Yeah I saw that you got rid of `reassign_predicate_columns`. The point is 
that if you have a `FilterExec { projection: [1, 0], predicate: Column { name: 
"b", index: 0 } }` the predicate is referencing the indexes after `FilterExec` 
does its projection. So if you want to push that predicate down to children 
(e.g. to the scan) you need to invert the projection / match the column with 
the input schema (i.e. `Column { name: "b", index: 1 }`).
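   
   To make the index remapping concrete, here is a minimal sketch. The 
`Column` struct and the `remap_to_input` helper are simplified, hypothetical 
stand-ins (not the real `datafusion_physical_expr` types); the only assumption 
is that `projection[i]` holds the input index of output column `i`.

```rust
/// Simplified stand-in for a physical column reference (name plus schema index).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Column {
    name: String,
    index: usize,
}

/// Maps a column that refers to the filter's *output* schema back to the
/// filter's *input* schema, where `projection[i]` is the input index of
/// output column `i`. Returns `None` if the index is out of range.
fn remap_to_input(column: &Column, projection: &[usize]) -> Option<Column> {
    projection.get(column.index).map(|&input_index| Column {
        name: column.name.clone(),
        index: input_index,
    })
}
```

   For the example above, remapping `Column { name: "b", index: 0 }` through 
the projection `[1, 0]` gives `Column { name: "b", index: 1 }`, which is the 
form that can be handed to the child.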





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809563180

   I also have another question. 
   When parquet_options.pushdown_filters is true, we can push down filters and 
remove the FilterExecs from the plan. 
   When parquet_options.pushdown_filters is false, we can still push down 
filters but cannot remove the FilterExecs from the plan.
   
   Is this our motivation? 





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809547292

   > @adriangb one point isn't obvious to me. I've erased a comment 
saying that "filter predicates reflect the output schema after applying the 
projection", and some tests are written based on that. Is that really the case? 
I see that in the with_projection() API of FilterExec the predicates are kept 
as they are, so they are not updated according to the projection. What am I 
missing?
   
   Also, in the SLT tests, predicate indices are set according to the input 
schema (the original filter schema), not the output schema of a filter that has 
a projection. Are you trying to emphasize something different from what I 
understand?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809540717

   @adriangb one point isn't obvious to me. I've erased a comment saying 
that "filter predicates reflect the output schema after applying the 
projection", and some tests are written based on that. Is that really the case? 
I see that in the with_projection() API of FilterExec the predicates are kept 
as they are, so they are not updated according to the projection. What am I 
missing?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809515533

   Great! Btw I gave you write access to our fork so you should be able to push 
to this branch directly





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809507083

   > @berkaysynnada I merged your change. Still have some failing tests. Also 
as I said in [pydantic#26 
(comment)](https://github.com/pydantic/datafusion/pull/26#discussion_r2046768875)
 the `retry` flag needs either a better name or we should consider how we can 
replace it, even if we need 2 methods on `ExecutionPlan`.
   
   Yep, I saw your comments, and I'll address/reply to them all once it passes 
all tests. I'm still working on it.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-16 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809500404

   @berkaysynnada I merged your change. Still have some failing tests. Also as 
I said in https://github.com/pydantic/datafusion/pull/26#discussion_r2046768875 
the `retry` flag needs either a better name or we should consider how we can 
replace it, even if we need 2 methods on `ExecutionPlan`.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-15 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2808569749

   > @berkaysynnada any luck? I can take a look tomorrow but don't want to 
duplicate effort
   
   I was interrupted, sorry. I'll send my part in a few hours, and ping you
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-15 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2807089857

   > I'm working on the failures now
   
   @berkaysynnada any luck? I can take a look tomorrow but don't want to 
duplicate effort





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-14 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801793059

   I'm working on the failures now





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-14 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801746776

   > I've made an attempt 
[pydantic@2dfa8b8](https://github.com/pydantic/datafusion/commit/2dfa8b803f2103c6ff81cfa483dbb70150feeb67)
   > 
   > I hope things become clearer now. I just tried to show the design 
idea, and I'm sure the final state will be much better after some polish
   
   Thank you @berkaysynnada!
   
   I merged your commit and tried to clean up the merge a bit.
   
   There are some failing tests, one at least seems like a real bug with the 
`swap_remove` in 
https://github.com/pydantic/datafusion/blob/cda6e8d6b74e860e9978abf988c00d5a00d1ac07/datafusion/physical-optimizer/src/filter_pushdown.rs#L424.
 
   
   I have to run for a bit but if you can take a look that'd be great 🙏🏻 
   
   I think other than that the main thing we need is adding / better docs for 
the new APIs (in particular on `ExecutionPlan`)





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-14 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801349750

   I've made an attempt 
https://github.com/pydantic/datafusion/pull/25/commits/2dfa8b803f2103c6ff81cfa483dbb70150feeb67
   
   I hope things become clearer now. I just tried to show the design idea, 
and I'm sure the final state will be much better after some polish





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797543695

   Thank you as well!





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


ozankabak commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797532385

   All right -- we will submit a PR early next week and get it merged ASAP to 
enable you to carry on. We will also keep on collaborating with you for 
subsequent PRs as this functionality is important to us. Thanks for the awesome 
collaboration





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796969760

   > I don't think dynamic vs. static is the right distinction to make here.
   
   I did so because your examples were about dynamic filters. I just wanted to 
show that the dynamic filter case will not be a problem at all.
   
   > IMO what really needs to happen is that each node gets to decide what to 
do with the remaining filters, and be able to distinguish between the filters 
it offered up and the ones that came down from parents. 
   
   We can do that without any difficulty with the proposed design
   
   > So for example a `SortExec` will "handle" the filter it injected by doing 
nothing with it and will transmit up the chain that it could not handle any of 
the filters that were passed down.
   
   These are all trivial to implement. If it would make things clearer, I 
can clone this branch, update the optimizer and execution plan API 
according to the proposal, and submit an alternative PR to address the 
questions you have.
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796862617

   > Could you help clarify when the FilterExec nodes get inserted? Maybe some 
examples with DataSourceExecs that do not accept any filters would help.
   
   You can look at the case in the Notion doc where the current node is 
AggregateExec. `remaining_filters` indicates whether a FilterExec is required.
   
   I've also thought about the dynamic filter case. Actually, they are more 
flexible than normal filters. Normal filters are pulled off the operator itself 
and pushed down as far as possible, and when they can't go any further, a 
FilterExec is inserted at that point. Dynamic filters are derived from normal 
filters, so the actual filter itself is not pulled off. So, like normal 
filters, dynamic filters are also pushed down along the plan, and when they 
can't go any further, a normal FilterExec can be inserted for the dynamic 
filters as well.
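   
   A toy model of "push down as far as possible, then leave a FilterExec where 
it stops" might look like the sketch below. It assumes a linear plan listed 
root first, and the `Op` enum and `push_filter` function are hypothetical 
illustrations rather than the optimizer rule in this PR.

```rust
/// Toy operators for a linear plan (root first, source last).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Op {
    /// Lets a filter pass through to its child (e.g. a repartition).
    Transparent(&'static str),
    /// Does not let a filter pass.
    Barrier(&'static str),
    /// Leaf that either absorbs the filter or rejects it.
    Source { name: &'static str, accepts: bool },
    /// A filter left in the plan at the point where pushdown stopped.
    Filter(&'static str),
}

/// Pushes `predicate` from above `plan[0]` towards the source and returns the new plan.
fn push_filter(plan: &[Op], predicate: &'static str) -> Vec<Op> {
    let mut out = Vec::with_capacity(plan.len() + 1);
    let mut pending = Some(predicate);
    for op in plan {
        match op {
            // The filter keeps travelling down.
            Op::Transparent(_) => {}
            // The source absorbs the filter, so no filter node is needed.
            Op::Source { accepts: true, .. } => {
                pending = None;
            }
            // Anything else stops the filter: leave it directly above this operator.
            _ => {
                if let Some(p) = pending.take() {
                    out.push(Op::Filter(p));
                }
            }
        }
        out.push(op.clone());
    }
    // No source was reached at all: keep the filter at its original position.
    if let Some(p) = pending {
        out.insert(0, Op::Filter(p));
    }
    out
}
```

   For a plan of two transparent operators over a source that rejects all 
filters, the filter ends up directly above the source. Whether that plan should 
be accepted over the original one is a separate policy question.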





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797315179

   > @adriangb perhaps we can work on creating a new PR (stacked on this one) 
that hooks everything up for dynamic filter pushdown. That way we can have 
things ready to go once we get an implementation `PhysicalExpr` pushdown
   
   I think I'd rather wait for @berkaysynnada to make a PR to this PR or 
however the new proposal gets introduced and we merge that, otherwise my 
stacked PR will immediately be out of date.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796865438

   > One more question: it seems like in all cases we end up eagerly cloning 
every node: `Join::try_new`. If I understand correctly this may even happen 
twice per node as we do the passes.
   > 
   > Is this acceptable performance wise? I get that most things are Arc'ed but 
that's still a lot of copying.
   > 
   > In my implementation I took a lot of care to make sure that if there were 
no filters being pushed, etc there was zero copying / node replacement, but 
maybe I overestimated the impact.
   
   You don't need to worry about that. I'm sure we can figure out a way of 
avoiding redundant clones





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796986491

   If you're able to submit a PR with your proposal I think the best course of 
action would be to merge that ASAP and iterate from there by looking at 
challenges that may or may not arise from concrete implementations 





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796949025

   > I've also thought about the dynamic filter case. Actually, they are more 
flexible than normal filters. Normal filters are pulled off the operator itself 
and pushed down as far as possible, and when they can't go any further, a 
FilterExec is inserted at that point. Dynamic filters are derived from normal 
filters, so the actual filter itself is not pulled off. So, like normal 
filters, dynamic filters are also pushed down along the plan, and when they 
can't go any further, a normal FilterExec can be inserted for the dynamic 
filters as well.
   
   I don't think dynamic vs. static is the right distinction to make here. IMO 
what really needs to happen is that each node gets to decide what to do with 
the remaining filters, and be able to distinguish between the filters it 
offered up and the ones that came down from parents. So for example a 
`SortExec` will "handle" the filter it injected by doing nothing with it and 
will transmit up the chain that it could not handle any of the filters that 
were passed down.
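   
   A small sketch of that bookkeeping, assuming filters are identified by 
plain strings. The `Remaining` struct and the `split_remaining` and 
`sort_like_report` functions are hypothetical names for illustration, not part 
of the proposed API.

```rust
/// Filters the child could not handle, split by where they came from.
#[derive(Debug, PartialEq, Eq)]
struct Remaining<'a> {
    /// Unhandled filters that were passed down by the parent.
    from_parent: Vec<&'a str>,
    /// Unhandled filters that this node offered up itself.
    own: Vec<&'a str>,
}

/// Keeps only the candidates that the child reported as rejected.
fn still_pending<'a>(candidates: &[&'a str], rejected: &[&'a str]) -> Vec<&'a str> {
    candidates
        .iter()
        .copied()
        .filter(|f| rejected.contains(f))
        .collect()
}

/// Separates the rejected filters into "came from above" and "mine".
fn split_remaining<'a>(
    parent: &[&'a str],
    own: &[&'a str],
    rejected: &[&'a str],
) -> Remaining<'a> {
    Remaining {
        from_parent: still_pending(parent, rejected),
        own: still_pending(own, rejected),
    }
}

/// What a sort-like node reports upward: it does nothing with its own
/// unhandled filter and passes the parent's unhandled filters back up.
fn sort_like_report<'a>(remaining: Remaining<'a>) -> Vec<&'a str> {
    remaining.from_parent
}
```

   The point of keeping the two lists apart is that the node's own filter never 
leaks to its parent as something the parent must apply.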





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796684310

   One more question: it seems like in all cases we end up eagerly cloning 
every node: `Join::try_new`. If I understand correctly this may even happen 
twice per node as we do the passes.
   
   Is this acceptable performance wise? I get that most things are Arc'ed but 
that's still a lot of copying.
   
   In my implementation I took a lot of care to make sure that if there were no 
filters being pushed, etc there was zero copying / node replacement, but maybe 
I overestimated the impact.
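   
   For reference, the "zero copying when nothing changed" idea can be sketched 
roughly as below. `PlanNode` and `with_children_if_changed` are hypothetical 
simplifications and not code from either implementation; the only real API 
relied on is `Arc::ptr_eq` and `Arc::clone` from the standard library.

```rust
use std::sync::Arc;

/// Simplified plan node: a name plus shared children.
#[derive(Debug)]
struct PlanNode {
    name: String,
    children: Vec<Arc<PlanNode>>,
}

/// Rebuilds the node only if at least one child pointer actually changed;
/// otherwise returns the same `Arc` (a refcount bump, not a node copy).
fn with_children_if_changed(
    node: &Arc<PlanNode>,
    new_children: Vec<Arc<PlanNode>>,
) -> Arc<PlanNode> {
    let unchanged = node.children.len() == new_children.len()
        && node
            .children
            .iter()
            .zip(&new_children)
            .all(|(old, new)| Arc::ptr_eq(old, new));
    if unchanged {
        Arc::clone(node)
    } else {
        Arc::new(PlanNode {
            name: node.name.clone(),
            children: new_children,
        })
    }
}
```

   The pointer comparison is cheap, so the check costs little even on plans 
where no filter is pushed at all.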





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796659881

   Could you help clarify when the FilterExec nodes get inserted? Maybe some 
examples with DataSourceExecs that do not accept any filters would help.
   
   In particular I'm interested in the case of:
   TopK generates dynamic filter -> Coalesce -> Repartition -> DataSourceExec 
that rejects all filters
   
   As well as
   
   TopK generates dynamic filter -> Existing FilterExec -> Coalesce -> 
Repartition -> DataSourceExec that rejects all filters
   
   TopK generates dynamic filter -> Coalesce -> Repartition -> Existing 
FilterExec -> DataSourceExec that rejects all filters
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796648147

   > So, joins are like this (this example for joins which prefer keeping all 
filter expressions inside themselves)
   
   If the join prefers to keep a FilterExec rather than add the predicates to 
its internal filter, then you can just return them as the remaining filters, 
instead of rewriting and updating the join itself.
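
A minimal sketch of that variant, assuming an inner join and a toy `Filter` type that already knows which side(s) its columns come from. `Filter`, `Split` and `split_parent_filters` are hypothetical names for illustration only. One-sided filters go to the matching child and everything else is handed back as remaining, so the join itself is not rewritten.

```rust
/// Toy filter: a label plus the join sides its columns come from.
#[derive(Debug, Clone, PartialEq)]
struct Filter {
    expr: &'static str,
    uses_left: bool,
    uses_right: bool,
}

/// Outcome of the split (hypothetical; mirrors the fields named in the thread).
#[derive(Debug, Default, PartialEq)]
struct Split {
    left: Vec<Filter>,
    right: Vec<Filter>,
    remaining: Vec<Filter>,
}

fn split_parent_filters(parent: Vec<Filter>) -> Split {
    let mut out = Split::default();
    for f in parent {
        match (f.uses_left, f.uses_right) {
            (true, false) => out.left.push(f),
            (false, true) => out.right.push(f),
            // Mixed-side (or column-free) filters are handed back to the
            // caller, which may wrap the join in a FilterExec.
            _ => out.remaining.push(f),
        }
    }
    out
}
```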





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796643229

   So, joins are like this (this example for joins which prefer keeping all 
filter expressions inside themselves)
   ```
   fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
       // collect all filters
       let mut all_filters = fd;
       all_filters.extend(self.filter);

       let mut self_filters = vec![];
       let mut left_filters = vec![];
       let mut right_filters = vec![];

       // extract child filters
       for filter in all_filters {
           if all_left_columns(filter) {
               left_filters.push(filter);
           } else if all_right_columns(filter) {
               right_filters.push(filter);
           } else {
               self_filters.push(filter);
           }
       }

       // get the new join with updated filter expressions:
       let new_self = Join::try_new(self.left_input, self.right_input, ..., filter: self_filters);

       Ok(FilterPushdownResult {
           child_filters: vec![left_filters, right_filters],
           remaining_filters: vec![],
           operator: new_self,
       })
   }
   ```





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796620968

   Here is a join example: assuming this initial plan:
   
   ```
   - FilterExec: [a@0 = foo, a@0 > x@1]
   -   Join: [schema: {left: a@0, right: x@0}, filter: {x@0 is even AND a@0 < x@0 + 10}]
   -     DataSource: [a]
   -     DataSource: [x]
   
   ```
   
   The rule, as sketched in my pseudo docs, is shaped like this:
   
   ```rust
   context.transform_down(|mut node| {
       let FilterPushdownResult {
           child_filters, remaining_filters, operator
       } = node.plan.try_pushdown_filters(node.data)?;

       if remaining_filters.is_empty() {
           // nothing is left over: hand each child its share of the filters
           for child_index in 0..node.children.len() {
               node.children[child_index].data = child_filters[child_index];
           }
       } else {
           // keep the leftover predicates in a new FilterExec above this operator
           node.plan = FilterExec::try_new(predicate: remaining_filters, input: node.plan);
           node.children = [node.clone().clear_node_data()];
           node.data = Default::default();
       }
       Ok(Transformed::yes(node))
   })
   
   ```
   
   1st iteration on FilterExec:
   ```
   child_filters: [[a@0 = foo, a@0 > x@1]]
   
   remaining_filters: []
   
   operator: Join…
   
   if condition comes up true ⇒ Join node has [a@0 = foo, a@0 > x@1] as the 
node data
   ```
   
   2nd iteration on JoinExec:
   ```
   child_filters: [[a@0=foo], [x@0 is even]]
   
   remaining_filters: []
   
   operator: Join: [schema: {left: a@0, right: x@0}, filter: {a@0 > x@1 AND a@0 
< x@0 + 10}]
   
   if condition comes up true ⇒ LeftDataSource node has [a@0 = foo], 
RightDataSource node has [x@0 is even] as the node data
   ```
   
   3rd iteration on LeftDataSource
   ```
   child_filters: [[]]
   
   remaining_filters: []
   
   operator: LeftDataSource: filter: [a@0 = foo]
   ```
   
   4th iteration on RightDataSource
   ```
   child_filters: [[]]
   
   remaining_filters: []
   
   operator: RightDataSource: filter: [x@0 is even]
   ```
   PS. I’m not considering dynamic filters in this example
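
The four iterations above can be reproduced with a small toy model. This is only a sketch under stated assumptions: `Plan`, `Pred`, `Side` and `push_down` are hypothetical and not DataFusion types, every source absorbs every filter it is offered, predicates are pre-tagged with the join side they reference, and column-index remapping is ignored. Plain recursion stands in for `transform_down` over a `PlanContext`.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Side {
    Left,
    Right,
    Both,
}

/// Toy predicate tagged with the join side(s) it references.
#[derive(Debug, Clone, PartialEq)]
struct Pred {
    expr: &'static str,
    side: Side,
}

#[derive(Debug, PartialEq)]
enum Plan {
    /// Leaf that absorbs every filter it is offered.
    Source { name: &'static str, filters: Vec<Pred> },
    Join { filter: Vec<Pred>, left: Box<Plan>, right: Box<Plan> },
    Filter { predicates: Vec<Pred>, input: Box<Plan> },
}

/// Pushes `incoming` (filters handed down by the parent) as deep as possible.
fn push_down(plan: Plan, incoming: Vec<Pred>) -> Plan {
    match plan {
        // A filter node dissolves and forwards everything to its child.
        Plan::Filter { predicates, input } => {
            let mut all = incoming;
            all.extend(predicates);
            push_down(*input, all)
        }
        // A join routes one-sided predicates to that child and keeps the rest.
        Plan::Join { filter, left, right } => {
            let (mut l, mut r, mut own) = (Vec::new(), Vec::new(), Vec::new());
            for p in incoming.into_iter().chain(filter) {
                match p.side {
                    Side::Left => l.push(p),
                    Side::Right => r.push(p),
                    Side::Both => own.push(p),
                }
            }
            Plan::Join {
                filter: own,
                left: Box::new(push_down(*left, l)),
                right: Box::new(push_down(*right, r)),
            }
        }
        Plan::Source { name, mut filters } => {
            filters.extend(incoming);
            Plan::Source { name, filters }
        }
    }
}
```

Feeding in the initial plan from this comment yields a join that keeps `a@0 > x@1` and `a@0 < x@0 + 10`, a left source holding `a@0 = foo`, and a right source holding `x@0 is even`, which matches the walk-through.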





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796558633

   Sounds good -- thanks @ozankabak . We can wait a few days and see what the 
status is.
   
   @adriangb  perhaps we can work on creating a new PR (stacked on this one) 
that hooks everything up for dynamic filter pushdown. That way we can have 
things ready to go once we get an implementation of `PhysicalExpr` pushdown.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796560380

   > I think the new design is sound, much simpler, and more inline with how 
almost all other plan alterations in DF work. We can add a join example to the 
doc quickly, and if everybody agrees we can proceed with it.
   
   In my opinion everyone is aligned on the proposed API changes, and there is 
no reason to hold off trying to implement them





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796545275

   There's no specific urgency, I think we're all just excited to get on and 
build on top of this but more importantly test out designs against real 
implementation to flesh out any unknowns.
   
   If the timeline is 1-2 days I think we can just wait to merge this PR.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-11 Thread via GitHub


ozankabak commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796149727

   @alamb and @adriangb: Is there a specific urgency to get this merged ASAP? 
If so, we can do it and then replace the API. However, unless there is a good 
reason, I think it is a bad idea to have API churn, and increase the total 
amount of work just to avoid the extra 1-2 days work on this PR. Once this PR 
merges and the API starts to get used, it becomes hard to replace it (I'm going 
through this in equivalence code these days).
   
   I think the new design is sound and much simpler -- we can add a join 
example to the doc too, and if everybody agrees we can proceed with it.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787642658

   Sure sounds good thank you for reviewing!





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036146656


##
datafusion/datasource/src/source.rs:
##
@@ -254,3 +284,13 @@ impl DataSourceExec {
 })
 }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`

Review Comment:
   I don't feel strongly either way. I must have missed the existing function






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2789854759

   @alamb could you chime in on the proposal of moving recursion back into the 
optimizer? Happy to re-evaluate if you think that makes sense. If you decide to 
leave it as is I would love to move this to merge 🙏🏻.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787637557

   > > I think we should leave this open for a while to let anyone else who 
wants time to review (such as @berkaysynnada ) but I think it is mergable
   > 
   > @berkaysynnada would love to get your input here, hoping we can confirm 
these APIs will work for the JOIN use cases 🙏🏻
   
   I'm sorry, I cannot finish the review yet (I'm also looking at 
https://github.com/apache/datafusion/pull/15568). I understand what you are 
trying to do, and both PRs seem very good, but I am taking notes on some 
points to discuss. After finalizing these two, I'll share them with you 
(hopefully tomorrow morning). Thank you for your patience 😞 





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031897452


##
datafusion/physical-optimizer/src/filter_pushdown.rs:
##
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{config::ConfigOptions, DataFusionError, Result};
+use datafusion_physical_plan::{
+execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan,
+};
+
+use crate::PhysicalOptimizerRule;
+
+/// A physical optimizer rule that pushes down filters in the execution plan.
+/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of the algorithm.
+#[derive(Debug)]
+pub struct PushdownFilter {}

Review Comment:
   this rule is a thing of beauty




Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031970050


##
datafusion/physical-plan/src/filter_pushdown.rs:
##
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of trying to push down fitlers to a child plan.
+/// This is used by [`FilterPushdownResult`] to indicate whether the filter was
+/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not
+/// ([`FilterPushdownSupport::Unsupported`]).
+/// If the filter was not absorbed, the parent plan must apply the filter
+/// itself, or return to the caller that it was not pushed down.
+/// If the filter was absorbed, the parent plan can drop the filter or
+/// tell the caller that it was pushed down by forwarding on the [`FilterPushdownSupport::Exact`]
+/// information.
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownSupport {
+/// Filter may not have been pushed down to the child plan, or the child plan
+/// can only partially apply the filter but may have false positives (but not false negatives).
+/// In this case the parent **must** behave as if the filter was not pushed down
+/// and must apply the filter itself.
+Unsupported,

Review Comment:
   Okay point taken: I will re-use the same three variant enum and names and 
alias the existing one to the new one to eliminate duplication.
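
A minimal sketch of what that could look like, assuming the three variant names follow the existing `TableProviderFilterPushDown` enum (`Unsupported`, `Inexact`, `Exact`). Where the alias would live and the `parent_must_filter` helper are assumptions for illustration, not part of the PR.

```rust
/// Single shared enum; variant names mirror the existing table-provider enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownSupport {
    /// The filter was not pushed down at all.
    Unsupported,
    /// The child applies the filter but may let false positives through.
    Inexact,
    /// The child guarantees the filter is fully applied.
    Exact,
}

/// The old name stays available as an alias, so existing code keeps compiling.
pub type TableProviderFilterPushDown = FilterPushdownSupport;

/// Hypothetical helper: only `Exact` lets the parent drop its own filter.
pub fn parent_must_filter(support: FilterPushdownSupport) -> bool {
    !matches!(support, FilterPushdownSupport::Exact)
}
```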






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2032067603


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::{FileSource, FileSourceFilterPushdownResult},
+file_scan_config::FileScanConfig,
+file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, filter_pushdown::FilterPushdownSupport,
+metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option<FilterPushdownSupport>,
+predicate: Option<PhysicalExprRef>,
+statistics: Option<Statistics>,
+}
+
+impl TestSource {
+fn new(support: Option<FilterPushdownSupport>) -> Self {
+Self {
+support,
+predicate: None,
+statistics: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn create_file_opener(
+&self,
+_object_store: Arc<dyn ObjectStore>,
+_base_config: &FileScanConfig,
+_partition: usize,
+) -> Arc<dyn FileOpener> {
+todo!("should not be called")
+}
+
+fn as_any(&self) -> &dyn Any {
+todo!("should not be called")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
+todo!("should not be called")
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
+todo!("should not be called")
+}
+
+fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+todo!("should not be called")
+}
+
+fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+Arc::new(TestSource {
+statistics: Some(statistics),
+..self.clone()
+})
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+todo!("should not be called")
+}
+
+fn statistics(&self) -> Result<Statistics> {
+Ok(self
+.statistics
+.as_ref()
+.expect("statistics not set")
+.clone())
+}
+
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+let predicate_string = self
+.predicate
+.as_ref()
+.map(|p| format!(", predicate={p}"))
+.unwrap_or_default();
+
+write!(f, "{}", predicate_string)
+}
+DisplayFormatType::TreeRender => {
+if let Some(predicate) = &self.predicate {
+writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+}
+Ok(())
+}
+}
+}
+
+fn try_pushdown_filters(
+&self,
+filters: &[PhysicalExprRef],
+   

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794719805

   > I tried to summarize our proposal with @ozankabak
   > 
https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73
   
   Thank you @berkaysynnada  -- I think this design sounds very reasonable to 
me and I would be happy to see it in DataFusion. However I don't think it 
fundamentally changes the filter pushdown that is possible so it is not clear 
to me that implementing a different API should hold up this PR (which is ready 
to go with comments and tests). 
   
   Unless there are some fundamental technical objections that I have missed, 
what I suggest is:
   1. We merge this PR as is to unblock work on 
https://github.com/apache/datafusion/issues/15037
   2. In parallel you work on the [API in the 
proposal](https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73)
 and when it is ready we can review it / merge it quickly
   
   If we are going to change the implementation anyway, I don't think doing so 
as a follow-on PR is any more work than doing it in this PR, and I expect the 
downstream impacts to be minimal.
   
   The only potential concern I can see with this plan is API churn -- 
specifically changing a public API (especially as DF 47 nears release). I think 
in this case we could add a comment like the following as a hint to any 
downstream users that we plan to update the API shortly
   
   ```rust
   /// NOTE this API is subject to change in the near future
   ```
   
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794727022

   > The rule part is more or less is like that. Now, let’s examine the 
try_pushdown_filters() impls for some operators
   
   I think it might also help to think through how it would work for a Join 
where a predicate could be pushed into one input but not the other and that had 
to remap the columns correctly
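
As a starting point for that discussion, here is a sketch of the index remapping alone. It assumes the join's output schema is the left columns followed by the right columns, with no projection, and it ignores join-type restrictions on which side may legally receive a filter. `Target` and `remap_for_join` are hypothetical names.

```rust
/// Where a parent filter can go, with its column indices rewritten
/// for the schema of the child that receives it.
#[derive(Debug, PartialEq)]
enum Target {
    Left(Vec<usize>),
    Right(Vec<usize>),
    /// References both inputs (or no columns): stays at or above the join.
    Stay,
}

/// `columns` are indices into the join's output schema;
/// `left_len` is the number of columns contributed by the left input.
fn remap_for_join(columns: &[usize], left_len: usize) -> Target {
    if columns.is_empty() {
        return Target::Stay;
    }
    if columns.iter().all(|&c| c < left_len) {
        // Left indices are unchanged.
        Target::Left(columns.to_vec())
    } else if columns.iter().all(|&c| c >= left_len) {
        // Right indices shift down by the width of the left input.
        Target::Right(columns.iter().map(|&c| c - left_len).collect())
    } else {
        Target::Stay
    }
}
```

For a join of `a` (one column) with `x` (one column), a parent filter on column 1 becomes a filter on column 0 of the right input, while a filter touching columns 0 and 1 cannot be pushed into either side.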





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794723836

   BTW I don't seem to be able to leave comments on 
https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f
 so I will leave some here
   
   ```rust
   /// Result of try_pushdown_filters(), storing the necessary information about a filter pushdown trial
   struct FilterPushdownResult {
     // Filter predicates which can be pushed down through the operator.
     // NOTE that these are not placed into any operator.
     // NEW: NOTE these filters may be different than passed down if the operator changes schema (like projection)
     // and may be different for different children (like for join)
     child_filters: Vec<FilterDescription>,
     // Filters which cannot be pushed down through the operator.
     // NOTE that caller of try_pushdown_filters() should handle these remaining predicates,
     // possibly introducing a FilterExec on top of this operator.
     remaining_filters: FilterDescription,
     // Possibly updated new operator
     operator: Arc<dyn ExecutionPlan>,
   }
   ```





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794616294

   Thanks folks!
   
   I was not aware of `PlanContext`, seems like a nifty little helper.
   
   I'm actually talking with @alamb in a few minutes to try to push this 
forward. We'll try to grok your proposal together and maybe that will allow me 
to move forward with it myself, it would be great if you can join! 
https://meet.google.com/poy-aenf-snh





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2792203825

> The TLDR is that because each filter may be allowed or not, and maybe 
transformed, to be pushed down into each child we end up with a matrix of 
filters x children that we need to ask the node for. For example, imagine 
joins, each of its children needs to be treated differently and only some 
filters can be pushed into some children. That meant that as you suggest it had 
to be at least 2 methods on ExecutionPlan (2 with complex APIs or 3 simpler 
ones). 
   
   You can properly (and IMO more easily) apply the pushdown logic in the rule 
recursion as well. There are many rules doing that.
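
   To make the quoted "filters x children" matrix concrete, here is a minimal sketch. It is not DataFusion code: `ToyFilter`, `pushdown_matrix` and the column-set "schemas" are assumptions made up for illustration, and the only rule applied is "a filter may go to a child if that child produces every column the filter reads".

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a physical filter expression: a label plus
/// the names of the columns it reads.
#[derive(Debug, Clone)]
pub struct ToyFilter {
    pub label: &'static str,
    pub columns: Vec<&'static str>,
}

/// Builds the filters x children matrix: one row per filter, one entry per
/// child. An entry is `true` when the child produces every column the
/// filter reads (the only criterion this sketch assumes).
pub fn pushdown_matrix(
    filters: &[ToyFilter],
    child_schemas: &[HashSet<&'static str>],
) -> Vec<Vec<bool>> {
    filters
        .iter()
        .map(|filter| {
            child_schemas
                .iter()
                .map(|schema| filter.columns.iter().all(|c| schema.contains(c)))
                .collect()
        })
        .collect()
}
```

   For a join whose left child produces `a, b` and whose right child produces `c`, a filter on `a` would map only to the left child, a filter on `c` only to the right, and a filter on both `a` and `c` to neither.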
   
   > The recursion is also a bit wonky because you need to:
   > 
   > 1. Recurse down and on your way up transmit not only the new nodes but 
also the filter support result.
   
   That's exactly the intended usage of PlanContext
   
   > 2. There's jumps, eg when you hit a node that doesn't support pushdown but 
still want to recurse into sub-trees.
   
   That's not a problem either in `transform_down()`: you return 
`TreeNodeRecursion::Continue` together with `Transformed::no`.
   
   > 3. You're carrying around non-trivial context as you go down.
   
   That's not a problem either with PlanContext.
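
   The three points above can be sketched with a toy tree. This uses plain recursion as a stand-in and does not call DataFusion's `PlanContext` or `transform_down()`; the `Node` enum, `push_down` and `wrap` are hypothetical names. The `pending` argument plays the role of the context carried down, the returned `Vec<String>` is the information sent back up, and `Barrier` shows the "jump" where pushdown stops but the subtree is still visited.

```rust
/// Hypothetical toy plan node; not DataFusion's `ExecutionPlan`.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    /// Leaf that absorbs every filter it is offered.
    Scan { absorbed: Vec<String> },
    /// Holds a predicate and a single input.
    Filter { predicate: String, input: Box<Node> },
    /// Lets filters pass through unchanged.
    Transparent { input: Box<Node> },
    /// Blocks pushdown, but its subtree is still visited.
    Barrier { input: Box<Node> },
}

/// Rewrites `node` top-down. Returns the rewritten node and the filters
/// that no descendant absorbed.
pub fn push_down(node: Node, mut pending: Vec<String>) -> (Node, Vec<String>) {
    match node {
        Node::Scan { mut absorbed } => {
            absorbed.extend(pending);
            (Node::Scan { absorbed }, Vec::new())
        }
        Node::Transparent { input } => {
            let (input, unhandled) = push_down(*input, pending);
            (Node::Transparent { input: Box::new(input) }, unhandled)
        }
        Node::Barrier { input } => {
            // Nothing crosses the barrier; the subtree restarts with an
            // empty context, and the parent's filters are handed back.
            let (input, below) = push_down(*input, Vec::new());
            (Node::Barrier { input: Box::new(wrap(input, below)) }, pending)
        }
        Node::Filter { predicate, input } => {
            // The filter contributes its own predicate, then keeps only
            // what the subtree could not absorb.
            pending.push(predicate);
            let (input, unhandled) = push_down(*input, pending);
            (wrap(input, unhandled), Vec::new())
        }
    }
}

/// Re-introduces a `Filter` on top of `input` if anything was left over.
fn wrap(input: Node, unhandled: Vec<String>) -> Node {
    if unhandled.is_empty() {
        input
    } else {
        Node::Filter {
            predicate: unhandled.join(" AND "),
            input: Box::new(input),
        }
    }
}
```

   In this sketch a `Filter` whose predicate is fully absorbed below simply disappears, while a `Filter` sitting above a `Barrier` stays where it is.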
   
   > Having implemented it both ways I've come to the conclusion that trying to 
generalize the recursion into the optimizer rules makes things harder for both 
the simple cases (FilterExec, RepartitionExec) and the complex cases 
(HashJoinExec, ProjectionExec). Doing it this way makes the simple 
implementations trivial and the complex ones will be complex but will be self 
contained and not have to fit into some rigid APIs.
   
   
   You already keep the operator-specific parts in the ExecutionPlan API. Take 
the simplest case as an example:
   ```rust
   pub fn try_pushdown_filters_to_input(
       plan: &Arc<dyn ExecutionPlan>,
       input: &Arc<dyn ExecutionPlan>,
       parent_filters: &[PhysicalExprRef],
       config: &ConfigOptions,
   ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
       match input.try_pushdown_filters(input, parent_filters, config)? {
           FilterPushdownResult::NotPushed => {
               // No pushdown possible, keep this child as is
               Ok(FilterPushdownResult::NotPushed)
           }
           FilterPushdownResult::Pushed {
               updated: inner,
               support,
           } => {
               // We have a child that has pushed down some filters
               let new_inner =
                   with_new_children_if_necessary(Arc::clone(plan), vec![inner])?;
               Ok(FilterPushdownResult::Pushed {
                   updated: new_inner,
                   support,
               })
           }
       }
   }
   ```
   
   Here, you would not need the recursive call or the final 
`with_new_children...()` logic. As you've mentioned, a single API may not 
be enough, but perhaps having 2 or 3 is what we really need to achieve 
separation of concerns.
   
   
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793342766

   Sounds good to me





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


ozankabak commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793246314

   I think we can work on this PR; a new PR may not be necessary. We will share 
a design document with you describing why we think a variation of the structure 
@alamb proposed is likely sufficient.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794211343

   I tried to summarize our proposal with @ozankabak
   
   
https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73
   
   @adriangb @alamb I hope the design is clear enough and captures everyone's 
input. Let me know if anything feels off or if you'd like to iterate further





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793196830

   > We will get this over the finish line in a few days.
   
   So you're going to make a PR to replace this one?
   
   Please do consider my example below. There's a lot more complexity than the 
simple API that @alamb proposed. I really think that:
   
   > Ultimately the issue in my mind is that how the recursion proceeds is in 
great part determined by each ExecutionPlan and that makes it hard to 
generalize into a set of standard APIs.
   
   We can try to come up with a set of APIs to describe all behaviors but:
   
   1. It's going to be a complex API that relies on multiple methods being kept 
in sync / indexes mapping correctly.
   2. I wouldn't be confident that it captures all behavior, e.g. in writing 
the original version I kept realizing that I'd missed edge cases with 
projections, joins, etc.
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2037441578


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, 
FilterPushdownResult};
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option,
+predicate: Option,
+statistics: Option,
+}
+
+impl TestSource {
+fn new(support: Option) -> Self {
+Self {
+support,
+predicate: None,
+statistics: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn create_file_opener(
+&self,
+_object_store: Arc,
+_base_config: &FileScanConfig,
+_partition: usize,
+) -> Arc {
+todo!("should not be called")
+}
+
+fn as_any(&self) -> &dyn Any {
+todo!("should not be called")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc {
+todo!("should not be called")
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc {
+todo!("should not be called")
+}
+
+fn with_projection(&self, _config: &FileScanConfig) -> Arc 
{
+todo!("should not be called")
+}
+
+fn with_statistics(&self, statistics: Statistics) -> Arc {
+Arc::new(TestSource {
+statistics: Some(statistics),
+..self.clone()
+})
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+todo!("should not be called")
+}
+
+fn statistics(&self) -> Result {
+Ok(self
+.statistics
+.as_ref()
+.expect("statistics not set")
+.clone())
+}
+
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+let predicate_string = self
+.predicate
+.as_ref()
+.map(|p| format!(", predicate={p}"))
+.unwrap_or_default();
+
+write!(f, "{}", predicate_string)
+}
+DisplayFormatType::TreeRender => {
+if let Some(predicate) = &self.predicate {
+writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+}
+Ok(())
+}
+}
+}
+
+fn try_pushdown_filters(
+&self,
+filters: &[PhysicalExprRef],
+ 

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


ozankabak commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793101507

   > Maybe it is possible to move the recursion into the optimizer rule but 
still keep a `ExecutionPlan` method by making a complex call signature, maybe 
something like this:
   > 
   > ```rust
   > struct FilterPushdownArgs {
   >   // expressions
   >   filters: Vec<Arc<dyn PhysicalExpr>>,
   > }
   > 
   > /// return from trying to push the filters down
   > struct FilterPushdownResult {
   >   // filters that could potentially be pushed down to each child
   >   child_filters: Vec<Arc<dyn PhysicalExpr>>,
   >   // filters that could not be handled by this node internally
   >   remaining_filters: Vec<Arc<dyn PhysicalExpr>>,
   >   // new_self
   >   new_self: Arc<dyn ExecutionPlan>,
   > }
   > 
   > impl ExecutionPlan {
   >   fn try_pushdown_filters(&self, args: FilterPushdownArgs) -> Result<FilterPushdownResult> {
   >     ..
   >   }
   > }
   > ```
   
   This sort of thing is exactly what I had in mind. This enables us to do a 
clean separation between the API and the rule. @berkaysynnada and I discussed 
this in detail and we will circle back with a write-up including a more-fleshed 
out version of this.
   
   We will get this over the finish line in a few days. Thanks for the awesome 
collaboration.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2792250700

   @adriangb I don’t want you to go back and forth excessively. Since you’ve 
spent a lot of time on this and are supporting the current version as the 
easiest and most understandable way to extend, I’m fine with following your 
decision. It seems there are pros/cons in both approaches (though I’m still in 
favor of rule recursion and API separation), but we need to move forward by 
picking one. I don’t think switching to the other mechanism would require much 
effort, so I might try it out and see how it ends up (but I can’t spend time on 
that right now unfortunately).
   
   I don’t know if this makes sense to you, but here’s one last design 
suggestion: What if we define a new trait like `DynamicFilterPlan: 
ExecutionPlan` and add all the required APIs to it? Then the main driver logic 
in the optimizer could check whether a plan is just an ExecutionPlan (handled 
by the default logic, possibly configurable for transparent operators too), or 
if it’s a DynamicFilterPlan, which has all the necessary APIs implemented to 
allow continued pushdown.
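
   A minimal sketch of how such a sub-trait could be wired up, under stated assumptions: `Plan` stands in for `ExecutionPlan`, and `as_dynamic_filter_plan`, `accepted_filters` and `filters_accepted_by` are hypothetical names, not existing DataFusion APIs. One design point it surfaces is that the driver cannot downcast a `dyn Plan` to `dyn DynamicFilterPlan` on its own, so the base trait here exposes an opt-in accessor with a `None` default.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ExecutionPlan`.
pub trait Plan {
    fn name(&self) -> &str;

    /// Opt-in hook so the optimizer can discover the richer trait.
    /// Plans that do not take part in pushdown keep the default.
    fn as_dynamic_filter_plan(&self) -> Option<&dyn DynamicFilterPlan> {
        None
    }
}

/// Hypothetical extension trait carrying the pushdown-specific API.
pub trait DynamicFilterPlan: Plan {
    /// Returns the parent filters this node accepts.
    fn accepted_filters(&self, parent_filters: &[String]) -> Vec<String>;
}

/// A plan that only implements the base trait.
pub struct PlainNode;

impl Plan for PlainNode {
    fn name(&self) -> &str {
        "PlainNode"
    }
}

/// A plan that opts in to pushdown.
pub struct PushdownNode;

impl Plan for PushdownNode {
    fn name(&self) -> &str {
        "PushdownNode"
    }

    fn as_dynamic_filter_plan(&self) -> Option<&dyn DynamicFilterPlan> {
        Some(self)
    }
}

impl DynamicFilterPlan for PushdownNode {
    fn accepted_filters(&self, parent_filters: &[String]) -> Vec<String> {
        // This toy node accepts everything it is offered.
        parent_filters.to_vec()
    }
}

/// Driver logic: default handling for plain plans, the richer path for
/// plans that expose `DynamicFilterPlan`.
pub fn filters_accepted_by(plan: &Arc<dyn Plan>, filters: &[String]) -> Vec<String> {
    match plan.as_dynamic_filter_plan() {
        Some(dynamic) => dynamic.accepted_filters(filters),
        None => Vec::new(),
    }
}
```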
   
   TLDR, I’ve made one last design suggestion above. If it doesn’t make sense 
or isn’t applicable, feel free to ignore it. We can merge this as is. This is a 
great PR, and what we’ve been discussing isn’t a major issue either way. Thank 
you again





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036861732


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-10 Thread via GitHub


berkaysynnada commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036813659


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, 
FilterPushdownResult};
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option,

Review Comment:
   yes, that also works 






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791076569

   > Maybe it is possible to move the recursion into the optimizer rule but 
still keep a `ExecutionPlan` method by making a complex call signature, maybe 
something like this:
   
   
   > I think my original suggestions were around trying to simplify the changes 
to ExecutionPlan trait. As I recall the original draft PR had at least three 
new methods on `ExecutionPlan` that had to be kept in sync:
   > 
   > 1. Could the node itself handle any filters internally?
   > 2. Could the node push filters past itself (the equivalent of what 
`try_swap_with_projection` does)?
   > 3. Did the node produce any dynamic filters?
   > 
   > Maybe it is possible to move the recursion into the optimizer rule but 
still keep a `ExecutionPlan` method by making a complex call signature, maybe 
something like this:
   
   Yes precisely.
   I think we can collapse (1) & (2) at the cost of a more complex API for the 
collapsed method.
   But I think we'll always need 2 methods because of how the recursion works: 
the current node can't know what "remainder" filters it has the option to 
handle until we've pushed down into its children, but to push down into its 
children we first need to ask it which filters it has / allows to be pushed 
down to each child.
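
   The ordering constraint described above can be sketched as a two-step protocol on a toy plan. Everything here is an assumption for illustration (the `Plan` enum, `filters_for_child`, `handle_remainder`, `push`); filters are plain strings and every node has at most one child.

```rust
/// Hypothetical toy plan; not DataFusion's `ExecutionPlan`.
#[derive(Debug, PartialEq)]
pub enum Plan {
    /// Leaf that can apply only the filters listed in `supported`.
    Scan { supported: Vec<String>, applied: Vec<String> },
    /// Filter node with a single input.
    Filter { predicates: Vec<String>, input: Box<Plan> },
}

impl Plan {
    /// Step 1: which filters (the parent's plus this node's own) may be
    /// offered to the child?
    pub fn filters_for_child(&self, parent_filters: &[String]) -> Vec<String> {
        match self {
            Plan::Scan { .. } => Vec::new(),
            Plan::Filter { predicates, .. } => parent_filters
                .iter()
                .chain(predicates.iter())
                .cloned()
                .collect(),
        }
    }

    /// Step 2: given what was left unhandled, take on what this node can
    /// and return whatever still remains for the parent.
    pub fn handle_remainder(&mut self, unhandled: Vec<String>) -> Vec<String> {
        match self {
            Plan::Scan { supported, applied } => {
                let (take, rest): (Vec<String>, Vec<String>) =
                    unhandled.into_iter().partition(|f| supported.contains(f));
                applied.extend(take);
                rest
            }
            Plan::Filter { predicates, .. } => {
                // Keep only what the subtree could not absorb.
                *predicates = unhandled;
                Vec::new()
            }
        }
    }
}

/// Driver: ask the node (step 1), recurse into the child, then come back
/// to the same node with the remainder (step 2).
pub fn push(plan: &mut Plan, parent_filters: Vec<String>) -> Vec<String> {
    let offered = plan.filters_for_child(&parent_filters);
    let unhandled = match &mut *plan {
        Plan::Filter { input, .. } => push(input, offered),
        // A leaf has no child, so everything offered is its remainder.
        Plan::Scan { .. } => parent_filters,
    };
    plan.handle_remainder(unhandled)
}
```

   The second call cannot be folded into the first in this sketch because its argument only exists once the recursion into the child has returned.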
   
   I can try to rework to go back to something like that, but I expect the APIs 
to be much more complex for _all_ cases (not just for the actually complicated 
ones) and the diff is going to be much larger.
   
   Are there concrete things that the current approach falls short on that 
moving the recursion into the OptimizerRule would solve?
   
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791020940

   Maybe it is possible to move the recursion into the optimizer rule but still 
keep a `ExecutionPlan` method by making a complex call signature, maybe 
something like this:
   
   ```rust
   struct FilterPushdownArgs {
     // expressions
     filters: Vec<Arc<dyn PhysicalExpr>>,
   }
   
   /// return from trying to push the filters down
   struct FilterPushdownResult {
     // filters that could potentially be pushed down to each child
     child_filters: Vec<Arc<dyn PhysicalExpr>>,
     // filters that could not be handled by this node internally
     remaining_filters: Vec<Arc<dyn PhysicalExpr>>,
     // new_self
     new_self: Arc<dyn ExecutionPlan>,
   }
   
   impl ExecutionPlan {
     fn try_pushdown_filters(&self, args: FilterPushdownArgs) -> Result<FilterPushdownResult> {
       ..
     }
   }
   ```
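
   As a rough illustration of how an optimizer rule might drive the recursion around a method of this shape, here is a self-contained toy. It is a sketch under assumptions, not the proposed API: `Expr` is a plain `String`, `Node` is a made-up enum, `child_filters` is taken to hold one list per child, and filters are routed by a naive `l.` / `r.` prefix on single-column predicates.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `Arc<dyn PhysicalExpr>`.
pub type Expr = String;

/// Same shape as the result struct above, with toy types.
pub struct PushdownResult {
    /// One list of filters per child (an assumption of this sketch).
    pub child_filters: Vec<Vec<Expr>>,
    pub remaining_filters: Vec<Expr>,
    pub new_self: Arc<Node>,
}

#[derive(Debug, PartialEq)]
pub enum Node {
    Scan { table: &'static str, filters: Vec<Expr> },
    Join { left: Arc<Node>, right: Arc<Node> },
}

impl Node {
    /// The node only answers for itself; it never recurses.
    pub fn try_pushdown_filters(&self, filters: Vec<Expr>) -> PushdownResult {
        match self {
            Node::Scan { table, filters: existing } => {
                // The toy scan absorbs everything it is offered.
                let mut all = existing.clone();
                all.extend(filters);
                PushdownResult {
                    child_filters: Vec::new(),
                    remaining_filters: Vec::new(),
                    new_self: Arc::new(Node::Scan { table: *table, filters: all }),
                }
            }
            Node::Join { left, right } => {
                let (mut l, mut r, mut rest) = (Vec::new(), Vec::new(), Vec::new());
                for f in filters {
                    if f.starts_with("l.") {
                        l.push(f);
                    } else if f.starts_with("r.") {
                        r.push(f);
                    } else {
                        rest.push(f);
                    }
                }
                PushdownResult {
                    child_filters: vec![l, r],
                    remaining_filters: rest,
                    new_self: Arc::new(Node::Join {
                        left: Arc::clone(left),
                        right: Arc::clone(right),
                    }),
                }
            }
        }
    }
}

/// The rule owns the recursion and the rebuilding of parents.
pub fn pushdown(node: &Node, filters: Vec<Expr>) -> (Arc<Node>, Vec<Expr>) {
    let PushdownResult { child_filters, mut remaining_filters, new_self } =
        node.try_pushdown_filters(filters);
    if let Node::Join { left, right } = &*new_self {
        let mut per_child = child_filters.into_iter();
        let (left, left_rest) = pushdown(left, per_child.next().unwrap_or_default());
        let (right, right_rest) = pushdown(right, per_child.next().unwrap_or_default());
        remaining_filters.extend(left_rest);
        remaining_filters.extend(right_rest);
        return (Arc::new(Node::Join { left, right }), remaining_filters);
    }
    (new_self, remaining_filters)
}
```

   The caller of `pushdown` would be responsible for whatever comes back in the second tuple element, for example by placing a filter node above the returned plan.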





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791020563

   > It was @alamb that suggested we do it this way, unless I misunderstood his 
suggestion.
   > 
   > I think it's possible to do the recursion as an optimizer rule but making 
the APIs flexible enough to handle all of the cases with joins, etc. gets very 
complicated especially because the recursion needs to probe and jump, transmit 
information back up, etc.
   
   I think my original suggestions were around trying to simplify the changes 
to ExecutionPlan trait. As I recall the original draft PR had at least three 
new methods on `ExecutionPlan` that had to be kept in sync:
   1. Could the node itself handle any filters internally?
   2. Could the node push filters past itself (the equivalent of what 
`try_swap_with_projection` does)?
   3. Did the node produce any dynamic filters?
   
   By combining the operations into a single method that did all three at once, 
the ExecutionPlan changes were much smaller. I don't remember having any 
opinions about the recursion, but I may not have made that clear.
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036145638


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036084527


##
datafusion/expr/src/filter_pushdown.rs:
##
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at 
the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).
+/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning 
partitions).
+///
+/// There are three possible outcomes of a filter pushdown:
+/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is 
not understood.
+/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may 
not be exact.

Review Comment:
   Makes sense. The variants were chosen to match the existing 
`TableProviderFilterPushdown`, which @alamb requested specifically.
   
   IMO we could have 2 variants here: `Unsupported` and `Exact` and it would be 
clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org



Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


geoffreyclaude commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036042823


##
datafusion/expr/src/filter_pushdown.rs:
##
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at 
the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).

Review Comment:
   ```suggestion
   ///   (e.g. pushing down dynamic filters from a hash join into scans).
   ```
   



##
datafusion/expr/src/filter_pushdown.rs:
##
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at 
the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).
+/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning 
partitions).
+///
+/// There are three possible outcomes of a filter pushdown:
+/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is 
not understood.
+/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may 
not be exact.

Review Comment:
   `Inexact` kind of hints that the filter might prune too much. How about 
`Partial` or some other word that makes it clearer that only false positives 
(rows that should have been filtered out but weren't) are possible? In this 
context it's obvious, of course, but less so without it, e.g. for someone 
implementing a custom exec node.
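
A minimal sketch of the "false positives only" reading discussed in this comment. It is an illustration, not DataFusion code: `Support`, `caller_must_reapply`, and `inexact_scan` are hypothetical names, and the chunk-level max check only stands in for the kind of coarse pruning a scan might do.

```rust
/// Toy mirror of the three outcomes named in the doc comment (not the real enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Support {
    Unsupported,
    Inexact,
    Exact,
}

/// Only `Exact` lets the caller drop its own copy of the filter.
fn caller_must_reapply(support: Support) -> bool {
    !matches!(support, Support::Exact)
}

/// Hypothetical inexact scan for the predicate `x > threshold`.
/// It prunes whole chunks by their maximum value, so every matching row
/// survives, but non-matching rows from kept chunks survive too.
fn inexact_scan(chunks: &[Vec<i32>], threshold: i32) -> Vec<i32> {
    chunks
        .iter()
        .filter(|chunk| chunk.iter().max().map_or(false, |m| *m > threshold))
        .flatten()
        .copied()
        .collect()
}
```

Under these assumptions the scan never drops a matching row, which is why the caller has to keep applying the filter after an `Inexact` result just as it would after `Unsupported`.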



##
datafusion/expr/src/filter_pushdown.rs:
##
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of attempting to push down a filter/predicate expression.
+///
+/// This is used by:
+/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at 
the physical plan level
+///   (e.g. pushing down dynamic fitlers from a hash join into scans).
+/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning 
partitions).
+///
+/// There are three possible outcomes of a filter pushdown:
+/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is 
not understood.
+/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may 
not be exact.
+///   The caller should treat this the same as [`FilterPushdown::Unsupported`] 
for the most part
+///   and must not assume that any pruning / filter was done since there may 
be false positive

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


ozankabak commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790048758

   This is a great contribution and is very close to merge. However, let's make 
sure the design is right to avoid API churn. Why do you think 
`try_swap_with_projection` is similar? I don't see any recursion in it -- an 
optimizer rule handles the recursion if I'm not missing something.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790824023

   For comparison, here is roughly what I had before doing the recursion as 
part of the `OptimizerRule`: 
https://github.com/pydantic/datafusion/blob/fbf93a2bdd0a5c1532336026dfa71ac7305c1655/datafusion/physical-optimizer/src/filter_pushdown.rs
   
   As you can see it ends up splitting the flow up into multiple steps:
   1. Ask the current node for any filters it wants to push down and which 
filters can be pushed down (possibly with modification) into which children.
   2. Recurse into the children.
   3. Carry the result of the pushdown for each filter back up.
   4. Combine results for each child together to get an overall pushed/not 
pushed for each filter.
   5. Ask the current node if it can handle any of the remaining filters.
   6. Return to the caller.
   
   This all gets pretty complicated and tracking the state is convoluted. And 
it doesn't even handle certain edge cases that I think are easyish to handle 
with the new APIs.
   
   





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790778558

   Some implementations do recurse I think, for similar reasons to our 
recursion here:
   
https://github.com/pydantic/datafusion/blob/f8a6384bdf21b2eeb7bcfe3f08e52712735bb285/datafusion/physical-plan/src/projection.rs#L534-L535
   
   It was @alamb that suggested we do it this way, unless I misunderstood his 
suggestion.
   
   I think it's possible to do the recursion as an optimizer rule but making 
the APIs flexible enough to handle all of the cases with joins, etc. gets very 
complicated especially because the recursion needs to probe and jump, transmit 
information back up, etc.





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035424292


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, 
FilterPushdownResult};
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option,
+predicate: Option,
+statistics: Option,
+}
+
+impl TestSource {
+fn new(support: Option) -> Self {
+Self {
+support,
+predicate: None,
+statistics: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn create_file_opener(
+&self,
+_object_store: Arc,
+_base_config: &FileScanConfig,
+_partition: usize,
+) -> Arc {
+todo!("should not be called")
+}
+
+fn as_any(&self) -> &dyn Any {
+todo!("should not be called")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc {
+todo!("should not be called")
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc {
+todo!("should not be called")
+}
+
+fn with_projection(&self, _config: &FileScanConfig) -> Arc 
{
+todo!("should not be called")
+}
+
+fn with_statistics(&self, statistics: Statistics) -> Arc {
+Arc::new(TestSource {
+statistics: Some(statistics),
+..self.clone()
+})
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+todo!("should not be called")
+}
+
+fn statistics(&self) -> Result {
+Ok(self
+.statistics
+.as_ref()
+.expect("statistics not set")
+.clone())
+}
+
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+let predicate_string = self
+.predicate
+.as_ref()
+.map(|p| format!(", predicate={p}"))
+.unwrap_or_default();
+
+write!(f, "{}", predicate_string)
+}
+DisplayFormatType::TreeRender => {
+if let Some(predicate) = &self.predicate {
+writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+}
+Ok(())
+}
+}
+}
+
+fn try_pushdown_filters(
+&self,
+filters: &[PhysicalExprRef],
+ 

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035397116


##
datafusion/datasource/src/source.rs:
##
@@ -254,3 +284,13 @@ impl DataSourceExec {
 })
 }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`

Review Comment:
   @alamb sneaked this bit in, wdyt Andrew?



##
datafusion/physical-expr/src/utils/mod.rs:
##
@@ -47,6 +47,31 @@ pub fn split_conjunction(
 split_impl(Operator::And, predicate, vec![])
 }
 
+/// Create a conjunction of the given predicates.
+/// If the input is empty, return a literal true.
+/// If the input contains a single predicate, return the predicate.
+/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`).
+pub fn conjunction(
+predicates: impl IntoIterator<Item = Arc<dyn PhysicalExpr>>,
+) -> Arc<dyn PhysicalExpr> {
+conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true))
+}
+
+/// Create a conjunction of the given predicates.

Review Comment:
   There are actually several places where it's useful to know whether the 
result is `lit(true)` because you handle that differently. E.g. `FilterExec` 
drops itself out of the plan. And it's only a little more code here, which 
avoids having to match on `lit(true)` in other places.
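
A rough sketch of the split described in this comment, using a toy `Expr` enum instead of DataFusion's `Arc<dyn PhysicalExpr>`. The `Expr` type and the `needs_filter_node` helper are assumptions made for illustration. Only the `conjunction` / `conjunction_opt` names come from the diff.

```rust
/// Toy stand-in for a physical expression (hypothetical, not `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Lit(bool),
    Col(String),
    And(Box<Expr>, Box<Expr>),
}

/// Returns `None` for empty input, so callers can tell "no predicate left"
/// apart from a real predicate without matching on `Lit(true)`.
fn conjunction_opt(predicates: impl IntoIterator<Item = Expr>) -> Option<Expr> {
    predicates
        .into_iter()
        .reduce(|acc, p| Expr::And(Box::new(acc), Box::new(p)))
}

/// Convenience wrapper: empty input collapses to a literal `true`.
fn conjunction(predicates: impl IntoIterator<Item = Expr>) -> Expr {
    conjunction_opt(predicates).unwrap_or(Expr::Lit(true))
}

/// Hypothetical caller: a filter node is only worth keeping if something remains.
fn needs_filter_node(remaining: Vec<Expr>) -> bool {
    conjunction_opt(remaining).is_some()
}
```

With this shape, a node that wants to remove itself when nothing is left can branch on the `Option` directly, while callers that just need an expression use `conjunction`.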



##
datafusion/physical-plan/src/filter_pushdown.rs:
##
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub use datafusion_expr::FilterPushdown;
+use datafusion_physical_expr::PhysicalExprRef;
+
+/// The combined result of a filter pushdown operation.
+/// This includes:
+/// * The inner plan that was produced by the pushdown operation.
+/// * The support for each filter that was pushed down.
+pub enum FilterPushdownResult {

Review Comment:
   It can also be a FileSource, etc. I will try to tweak the comments to use 
the terminology "node"






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


berkaysynnada commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035176129


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, 
FilterPushdownResult};
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option,

Review Comment:
   If I understand correctly, `DataSource`s and `ExecutionPlan`s don't store 
this `FilterPushdown` in their internal state; that info just passes through 
the operators during the pushdown attempt. So maybe we can remove this 
`support` field and instead define two different `TestSource`s, one with 
support and one without (or simply define a generic boolean). I suggest this so 
that people who treat this test as example usage aren't misled.
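
One way the "generic boolean" idea could look, as a sketch only. This `TestSource` is a simplified stand-in with a string predicate, `SUPPORTS_PUSHDOWN` and `try_pushdown` are hypothetical names, and none of it reflects the real `FileSource` trait.

```rust
/// Hypothetical test source whose pushdown support is fixed at the type level
/// by a const generic instead of being stored as a runtime field.
#[derive(Debug, Clone, Default, PartialEq)]
struct TestSource<const SUPPORTS_PUSHDOWN: bool> {
    predicate: Option<String>,
}

impl<const SUPPORTS_PUSHDOWN: bool> TestSource<SUPPORTS_PUSHDOWN> {
    /// Returns a new source holding the filter when pushdown is supported,
    /// or `None` when this source type does not accept filters.
    fn try_pushdown(&self, filter: &str) -> Option<Self> {
        if SUPPORTS_PUSHDOWN {
            Some(Self {
                predicate: Some(filter.to_string()),
            })
        } else {
            None
        }
    }
}
```

A test would then pick `TestSource::<true>` or `TestSource::<false>`, so the source itself carries no support state that a reader might mistake for part of the API.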



##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, 

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035387395


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_common::{internal_err, Result};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener,
+};
+use datafusion_expr::test::function_stub::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::{
+aggregate::AggregateExprBuilder, conjunction, Partitioning, 
PhysicalExprRef,
+};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::{FilterPushdown, 
FilterPushdownResult};
+use datafusion_physical_plan::{
+aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
+coalesce_batches::CoalesceBatchesExec,
+filter::FilterExec,
+repartition::RepartitionExec,
+};
+use datafusion_physical_plan::{
+displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, 
ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone, Default)]
+struct TestSource {
+support: Option,

Review Comment:
   I think that would be quite a bit more code, but I'll add a comment saying 
this is not meant to be an example and that folks should look at ParquetSource 
instead?






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-09 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2789742021

   @berkaysynnada the original implementation had the recursion in the 
optimizer rule. It was @alamb's suggestion to do it this way, which mirrors the 
existing `try_swap_with_projection` method.
   
   The TL;DR is that each filter may or may not be allowed to be pushed down 
into each child, and may need to be transformed first, so we end up with a 
matrix of filters x children that we need to ask the node for. For example, 
with a join each child needs to be treated differently and only some filters 
can be pushed into some children. That meant that doing it as you suggest 
required at least 2 methods on ExecutionPlan (2 with complex APIs or 3 simpler 
ones). The recursion is also a bit wonky because:
   1. You need to recurse down and, on your way back up, transmit not only the 
new nodes but also the filter support result.
   2. There are jumps, e.g. when you hit a node that doesn't support pushdown 
but you still want to recurse into its sub-trees.
   3. You're carrying around non-trivial context as you go down.
   
   Having implemented it both ways, I've come to the conclusion that trying to 
generalize the recursion into the optimizer rule makes things harder for both 
the simple cases (FilterExec, RepartitionExec) and the complex cases 
(HashJoinExec, ProjectionExec). Doing it this way makes the simple 
implementations trivial; the complex ones will still be complex, but they will 
be self-contained and won't have to fit into some rigid API.
   
   All said and done, since the implementation is a lot simpler this way, 
there is precedent for it in `try_swap_with_projection`, and I'm more confident 
in this API being flexible enough to handle joins etc., I would prefer to keep 
it as is.
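
The "recursion lives in the node" approach argued for above can be sketched with a toy plan tree. Everything here is hypothetical and much simpler than the PR: `Plan`, `try_pushdown`, string predicates, and a plain `bool` per filter in place of the real support enum.

```rust
/// Toy plan node (hypothetical, far simpler than `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf that absorbs filters when `accepts` is true.
    Scan { accepts: bool, pushed: Vec<String> },
    /// Holds its own predicates above a single child.
    Filter { predicates: Vec<String>, input: Box<Plan> },
    /// Transparent node: forwards parent filters to its child unchanged.
    Repartition { input: Box<Plan> },
}

/// Returns the rewritten node plus, for each incoming filter, whether a
/// descendant handled it. Each variant decides for itself how to recurse.
fn try_pushdown(plan: &Plan, filters: &[String]) -> (Plan, Vec<bool>) {
    match plan {
        Plan::Scan { accepts, pushed } => {
            let mut pushed = pushed.clone();
            if *accepts {
                pushed.extend_from_slice(filters);
            }
            let node = Plan::Scan { accepts: *accepts, pushed };
            (node, vec![*accepts; filters.len()])
        }
        Plan::Repartition { input } => {
            let (child, handled) = try_pushdown(input, filters);
            (Plan::Repartition { input: Box::new(child) }, handled)
        }
        Plan::Filter { predicates, input } => {
            // Offer the parent's filters and our own predicates together.
            let mut all = filters.to_vec();
            all.extend_from_slice(predicates);
            let (child, handled) = try_pushdown(input, &all);
            // Keep only the predicates of ours that the child did not handle.
            let remaining: Vec<String> = predicates
                .iter()
                .zip(&handled[filters.len()..])
                .filter(|(_, done)| !**done)
                .map(|(p, _)| p.clone())
                .collect();
            let parent_result = handled[..filters.len()].to_vec();
            // A filter with nothing left to do drops out of the plan.
            let node = if remaining.is_empty() {
                child
            } else {
                Plan::Filter { predicates: remaining, input: Box::new(child) }
            };
            (node, parent_result)
        }
    }
}
```

In this toy version the simple nodes stay a few lines each, and a join-like node could add whatever per-child logic it needs inside its own match arm without changing the signature.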





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


alamb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031961859


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -467,6 +468,383 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 ) -> Result>> {
 Ok(None)
 }
+
+/// A physical optimizer rule that pushes down filters in the execution 
plan.

Review Comment:
   Here are some documentation suggestions:
   - Suggestions for docs: https://github.com/pydantic/datafusion/pull/23






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787124497

   done!





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787081064

   👍 will do asap





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2033630457


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -467,6 +468,439 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 ) -> Result>> {
 Ok(None)
 }
+
+/// Attempts to recursively push given filters into this `ExecutionPlan` or
+/// its children and push any filters from this node into its children.
+///
+/// This is used to implement filter pushdown in the physical plan. Note
+/// that DataFusion also implements filter pushdown in the logical plan,
+/// which is a different code path. This method is here to support
+/// additional optimizations that may only be possible in the physical
+/// plan such as dynamic filtering (see below).
+///
+/// See [`try_pushdown_filters_to_input`] for a simple implementation
+///
+/// # Arguments
+/// * `plan`: `Arc`d instance of self
+/// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node
+///to try and push down

Review Comment:
   ```suggestion
   ///   to try and push down
   ```
   






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787071875

   Can we get the one final clippy thing fixed here so we can merge this PR in?





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-08 Thread via GitHub


adriangb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2786441314

   > I think we should leave this open for a while to let anyone else who wants time to review (such as @berkaysynnada) but I think it is mergeable
   
   @berkaysynnada would love to get your input here, hoping we can confirm these APIs will work for the JOIN use cases 🙏🏻





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-07 Thread via GitHub


alamb commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2784577003

   Here is a proposal to simplify the API: 
https://github.com/pydantic/datafusion/pull/24





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-07 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2032103422


##
datafusion/physical-plan/src/filter_pushdown.rs:
##
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Result of trying to push down filters to a child plan.
+/// This is used by [`FilterPushdownResult`] to indicate whether the filter was
+/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not
+/// ([`FilterPushdownSupport::Unsupported`]).
+/// If the filter was not absorbed, the parent plan must apply the filter
+/// itself, or return to the caller that it was not pushed down.
+/// If the filter was absorbed, the parent plan can drop the filter or
+/// tell the caller that it was pushed down by forwarding on the [`FilterPushdownSupport::Exact`]
+/// information.
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownSupport {
+/// Filter may not have been pushed down to the child plan, or the child plan
+/// can only partially apply the filter but may have false positives (but not false negatives).
+/// In this case the parent **must** behave as if the filter was not pushed down
+/// and must apply the filter itself.
+Unsupported,

Review Comment:
   Done: e3c7343
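
   For illustration, here is a rough sketch of how a parent could act on these
   two variants. It assumes one support value is reported per filter, uses a
   local copy of the enum rather than the DataFusion type, and `filters_to_keep`
   is a hypothetical helper that does not exist in the PR.

   ```rust
   /// Local toy copy of the two variants quoted above (not the DataFusion type).
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum FilterPushdownSupport {
       /// The child absorbed the filter; the parent may drop it.
       Exact,
       /// The child may not apply the filter exactly; the parent must apply it.
       Unsupported,
   }

   /// Returns the filters the parent must still apply itself, assuming
   /// `support[i]` describes what the child did with `filters[i]`.
   fn filters_to_keep<'a>(
       filters: &[&'a str],
       support: &[FilterPushdownSupport],
   ) -> Vec<&'a str> {
       filters
           .iter()
           .zip(support)
           .filter(|(_, s)| **s == FilterPushdownSupport::Unsupported)
           .map(|(f, _)| *f)
           .collect()
   }
   ```

   If the returned vector is empty, the parent has nothing left to apply and
   can report the filters as pushed down to its own caller.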






Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-06 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972339


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -467,6 +468,353 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 ) -> Result>> {
 Ok(None)
 }
+
+/// A physical optimizer rule that pushes down filters in the execution plan.
+/// For example, consider the following plan:
+///
+/// ```text
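+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘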
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node.
+/// If this filter is selective it can avoid massive amounts of data being read from the source (the projection is `*` so all matching columns are read).
+/// In this simple case we:
+/// 1. Enter the recursion with no filters.
+/// 2. We find the `FilterExec` node and it tells us that it has a filter (see [`ExecutionPlan::filters_for_pushdown`] and `datafusion::physical_plan::filter::FilterExec`).
+/// 3. We recurse down into its children (the `DataSourceExec` node) now carrying the filters `[id = 1]`.
+/// 4. The `DataSourceExec` node tells us that it can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+/// 5. Since the `DataSourceExec` node has no children we recurse back up the tree.
+/// 6. We now tell the `FilterExec` node that it has a child that can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+///    The `FilterExec` node can now return a new execution plan, either a copy of itself without that filter or, if it has no work left to do, it can even return the child node directly.
+/// 7. We recurse back up to `CoalesceBatchesExec` and do nothing there since it had no filters to push down.
+///
+/// The new plan looks like:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘
+/// ```
+///
+/// Let's consider a more complex example involving a `ProjectionExec` node in between the `FilterExec` and `DataSourceExec` nodes that creates a new column that the filter depends on.
+///
+/// ```text
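+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │    filters =         │
+/// │     [cost>50,id=1]   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘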
+/// ```
+///
+/// We want to push down the filter `id=1` to the `DataSourceExec` node, but can't push down `cost>50` because it requires the `ProjectionExec` node to be executed first:
+///
+/// ```text
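+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │    filters =         │
+/// │     [cost>50]        │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    ProjectionExec    │
+/// │ cost = price * 1.2   │
+/// └──────────────────────┘
+///             │
+///             ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// │   filters = [ id=1]  │
+/// └──────────────────────┘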
+/// ```
+///
+/// There are also cases where we may be able to push down filters within a subtree but not the entire tree.
+/// A good example of this is aggregation nodes:
+///
+/// ```text
+/// ┌──┐
+/// │ ProjectionExec   │
+/// │ projection = *   │
+/// └──┘
+///   │
+///   ▼
+/// ┌──┐
+/// │ F
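
The `ProjectionExec` case in the doc comment quoted above (push `id=1` down, keep `cost>50` above the projection) can be sketched roughly as follows. This is a toy model under stated assumptions, not code from the PR: `Filter`, its `columns` field and `split_at_projection` are hypothetical names, and a filter is reduced to its text plus the column names it references.

```rust
/// Toy filter: its display text plus the columns it references (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct Filter {
    expr: &'static str,
    columns: Vec<&'static str>,
}

/// Splits `filters` into (can pass below the projection, must stay above it).
/// A filter may pass only if it references no column the projection creates.
fn split_at_projection(
    filters: Vec<Filter>,
    created_columns: &[&str],
) -> (Vec<Filter>, Vec<Filter>) {
    filters
        .into_iter()
        .partition(|f| f.columns.iter().all(|c| !created_columns.contains(c)))
}
```

With `created_columns = ["cost"]`, the filter `cost > 50` stays in the `FilterExec` while `id = 1` continues down towards the `DataSourceExec`.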

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-06 Thread via GitHub


berkaysynnada commented on PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2781418886

   I'll take a look at this as well asap





Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-05 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972523


##
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::{
+datasource::object_store::ObjectStoreUrl,
+logical_expr::Operator,
+physical_plan::{
+expressions::{BinaryExpr, Column, Literal},
+PhysicalExpr,
+},
+scalar::ScalarValue,
+};
+use datafusion_common::internal_err;
+use datafusion_common::{config::ConfigOptions, Statistics};
+use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+use datafusion_datasource::source::DataSourceExec;
+use datafusion_datasource::{
+file::{FileSource, FileSourceFilterPushdownResult},
+file_scan_config::FileScanConfig,
+file_stream::FileOpener,
+};
+use datafusion_physical_expr::{conjunction, PhysicalExprRef};
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+use datafusion_physical_optimizer::filter_pushdown::PushdownFilter;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::{
+displayable, execution_plan::FilterSupport, metrics::ExecutionPlanMetricsSet,
+DisplayFormatType, ExecutionPlan,
+};
+use object_store::ObjectStore;
+use std::sync::{Arc, OnceLock};
+use std::{
+any::Any,
+fmt::{Display, Formatter},
+};
+
+/// A placeholder data source that accepts filter pushdown
+#[derive(Clone)]
+struct TestSource {
+support: FilterSupport,
+predicate: Option,
+statistics: Option,
+}
+
+impl TestSource {
+fn new(support: FilterSupport) -> Self {
+Self {
+support,
+predicate: None,
+statistics: None,
+}
+}
+}
+
+impl FileSource for TestSource {
+fn create_file_opener(
+&self,
+_object_store: Arc,
+_base_config: &FileScanConfig,
+_partition: usize,
+) -> Arc {
+todo!("should not be called")
+}
+
+fn as_any(&self) -> &dyn Any {
+todo!("should not be called")
+}
+
+fn with_batch_size(&self, _batch_size: usize) -> Arc {
+todo!("should not be called")
+}
+
+fn with_schema(&self, _schema: SchemaRef) -> Arc {
+todo!("should not be called")
+}
+
+fn with_projection(&self, _config: &FileScanConfig) -> Arc {
+todo!("should not be called")
+}
+
+fn with_statistics(&self, statistics: Statistics) -> Arc {
+Arc::new(TestSource {
+statistics: Some(statistics),
+..self.clone()
+})
+}
+
+fn metrics(&self) -> &ExecutionPlanMetricsSet {
+todo!("should not be called")
+}
+
+fn statistics(&self) -> datafusion_common::Result {
+Ok(self
+.statistics
+.as_ref()
+.expect("statistics not set")
+.clone())
+}
+
+fn file_type(&self) -> &str {
+"test"
+}
+
+fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+let predicate_string = self
+.predicate
+.as_ref()
+.map(|p| format!(", predicate={p}"))
+.unwrap_or_default();
+
+write!(f, "{}", predicate_string)
+}
+DisplayFormatType::TreeRender => {
+if let Some(predicate) = &self.predicate {
+writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
+}
+Ok(())
+}
+}
+}
+
+fn push_down_filters(
+&self,
+filters: &[PhysicalExprRef],
+) -> datafusion_common::Result> {
+let new = Arc::new(TestSource {
+support: self.support,
+predicate: Some(conjunction(filters.iter().map(Arc::clone))),
+statistics: self.statistics.clone(),
+});
+Ok(Some(FileSourceFilterPushdownResult::new(
+n

Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]

2025-04-05 Thread via GitHub


adriangb commented on code in PR #15566:
URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029911089


##
datafusion/physical-plan/src/execution_plan.rs:
##
@@ -467,8 +467,106 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
 ) -> Result>> {
 Ok(None)
 }
+
+/// Returns a set of filters that this operator owns but would like to be pushed down.
+/// For example, a `TopK` operator may produce dynamic filters that reference its current state,
+/// while a `FilterExec` will just hand off the filters it has as is.
+/// The default implementation returns an empty vector.
+/// These filters are applied row by row: rows for which any filter returns `false` or
+/// `NULL` are filtered out, and rows for which every filter returns `true` are kept.
+/// The expressions returned **must** always return `true` or `false`;
+/// other truthy or falsy values are not allowed (e.g. `0`, `1`).
+///
+/// # Returns
+/// A vector of filters that this operator would like to push down.
+/// These should be treated as the split conjunction of a `WHERE` clause.
+/// That is, a query such as `WHERE a = 1 AND b = 2` would return two
+/// filters: `a = 1` and `b = 2`.
+/// They can always be assembled into a single filter using
+/// [`conjunction`][datafusion_physical_expr::conjunction].
+fn filters_for_pushdown(&self) -> Result>> {
+Ok(Vec::new())
+}
+
+/// Checks which filters this node allows to be pushed down through it from a parent to a child.
+/// For example, a `ProjectionExec` node can let through filters that only reference
+/// columns it did not create, but filters that reference columns it is creating cannot be pushed down any further.
+/// That is, it only allows some filters through because it changes the schema of the data.
+/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
+/// `RepartitionExec` nodes allow all filters to be pushed down as they don't change the schema or cardinality.
+fn filter_pushdown_request(

Review Comment:
   Playing around with it a little, I'm not sure that merging
   `filters_for_pushdown` and `filter_pushdown_request` is any better. Some
   operators will care about implementing one and not the other, and merging them
   pushes complexity onto the implementer that is currently handled in the
   optimizer rule.
   
   I'll let you give it a try and see if you agree.
   
   I might push a fix for 
https://github.com/apache/datafusion/pull/15566/files#r2029910723 which will 
complicate things even more.
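
   As a side note, the row-level semantics described in the
   `filters_for_pushdown` docs above (a split conjunction where `false` and
   `NULL` both drop the row) can be sketched as below. This is only an
   illustration: `Row`, `Predicate`, `a_eq_1`, `b_eq_2` and `keep_row` are
   hypothetical names, and SQL `NULL` is modelled as `None`.

   ```rust
   /// Toy row with two nullable integer columns (hypothetical type).
   #[derive(Debug, Clone, Copy)]
   struct Row {
       a: Option<i64>,
       b: Option<i64>,
   }

   /// A predicate yields `Some(true)`, `Some(false)`, or `None` for SQL `NULL`.
   type Predicate = fn(&Row) -> Option<bool>;

   /// `a = 1`; comparing a NULL column yields NULL.
   fn a_eq_1(row: &Row) -> Option<bool> {
       row.a.map(|a| a == 1)
   }

   /// `b = 2`; comparing a NULL column yields NULL.
   fn b_eq_2(row: &Row) -> Option<bool> {
       row.b.map(|b| b == 2)
   }

   /// Treats `filters` as the split conjunction of a `WHERE` clause:
   /// the row is kept only if every filter evaluates to `true`.
   fn keep_row(filters: &[Predicate], row: &Row) -> bool {
       filters.iter().all(|f| f(row) == Some(true))
   }
   ```

   Here `WHERE a = 1 AND b = 2` corresponds to the slice `[a_eq_1, b_eq_2]`,
   which is why the individual filters can travel down the plan independently.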





