Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049226852

## datafusion/physical-optimizer/src/push_down_filter.rs ##

@@ -0,0 +1,535 @@

Attempts to recursively push given filters from the top of the tree into the leaves.

# Default Implementation

The default implementation in `ExecutionPlan::try_pushdown_filters` is a no-op that assumes:

* Parent filters can't be passed on to children.
* This node has no filters to contribute.

# Example: Push filter into a `DataSourceExec`

For example, consider the following plan:

```text
CoalesceBatchesExec
  └── FilterExec [filters = [id = 1]]
        └── DataSourceExec [projection = *]
```

Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node. If this filter is selective, pushing it into the scan can avoid massive amounts of data being read from the source (the projection is `*`, so all matching columns are read). The new plan looks like:

```text
CoalesceBatchesExec
  └── DataSourceExec [projection = *, filters = [id = 1]]
```

# Example: Push filters with `ProjectionExec`

Let's consider a more complex example involving a `ProjectionExec` node in between the `FilterExec` and `DataSourceExec` nodes that creates a new column that the filter depends on:

```text
CoalesceBatchesExec
  └── FilterExec [filters = [cost > 50, id = 1]]
        └── ProjectionExec [cost = price * 1.2]
              └── DataSourceExec [projection = *]
```

We want to push down the filter `id = 1` to the `DataSourceExec` node, but we can't push down `cost > 50` because it requires the `ProjectionExec` node to be executed first. A simple thing to do would be to split the filter into two separate filters and push down only the first one:

```text
CoalesceBatchesExec
  └── FilterExec [filters = [cost > 50]]
        └── ProjectionExec [cost = price * 1.2]
              └── DataSourceExec [projection = *, filters = [id = 1]]
```

We can actually do better, however, by pushing down `price * 1.2 > 50` instead.
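To make the idea at the end of that excerpt concrete, here is a minimal, self-contained sketch of rewriting a predicate through a projection. It uses a toy expression type rather than DataFusion's `PhysicalExpr`, so the names and structure are illustrative only: substituting the projection's definition of `cost` into `cost > 50` yields `price * 1.2 > 50`, which references only source columns and can therefore be pushed below the `ProjectionExec`.

```rust
// Toy expression type standing in for a physical expression tree.
#[derive(Clone, Debug)]
enum Expr {
    Column(String),
    Literal(f64),
    Mul(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
}

/// Replace references to a projected column with the expression that defines it.
fn rewrite_through_projection(pred: &Expr, name: &str, definition: &Expr) -> Expr {
    match pred {
        Expr::Column(c) if c == name => definition.clone(),
        Expr::Column(_) | Expr::Literal(_) => pred.clone(),
        Expr::Mul(l, r) => Expr::Mul(
            Box::new(rewrite_through_projection(l, name, definition)),
            Box::new(rewrite_through_projection(r, name, definition)),
        ),
        Expr::Gt(l, r) => Expr::Gt(
            Box::new(rewrite_through_projection(l, name, definition)),
            Box::new(rewrite_through_projection(r, name, definition)),
        ),
    }
}

fn main() {
    // Predicate `cost > 50`, where the projection defines cost = price * 1.2.
    let pred = Expr::Gt(
        Box::new(Expr::Column("cost".into())),
        Box::new(Expr::Literal(50.0)),
    );
    let cost_def = Expr::Mul(
        Box::new(Expr::Column("price".into())),
        Box::new(Expr::Literal(1.2)),
    );
    // `price * 1.2 > 50`: references only source columns, so it can be pushed
    // below the projection together with `id = 1`.
    let pushed = rewrite_through_projection(&pred, "cost", &cost_def);
    println!("{pushed:?}");
}
```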
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049205041 ## datafusion/physical-optimizer/src/push_down_filter.rs ## (same doc-comment excerpt as above)
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813216975 @berkaysynnada if CI passes I think this is ready to merge and we can tweak later as we implement in more places 😄
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812885176 Ok sounds good to me for now. We'll measure in our production system and if there's overhead from planning time we can come back and edit the API.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812853791

> * Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid those 2 defects at the same time (via another trait, or other APIs). I'm open to discussing this.

* While I am worried about the `ExecutionPlan`s we have in DataFusion, even more concerning to me are the ones we don't see: custom user plans, which might be quite expensive to clone. And even if it's not a big impact for this rule, if we add more and more rules, each with multiple recursions, it's going to be a non-trivial overhead. Especially since DataFusion doesn't support prepared statements or anything like that, it has to re-plan every time a query is run.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada merged PR #15566: URL: https://github.com/apache/datafusion/pull/15566
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813394021

> Great collaboration @adriangb, thank you. I hope more will come.

Me too, great stuff!
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2813382081 Great collaboration @adriangb, thank you. I hope more will come.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2049207122 ## datafusion/physical-optimizer/src/push_down_filter.rs ## (same doc-comment excerpt as above)
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812731132 I'm done with my turn for now. The only to-dos are updating the docs and reverting the test changes. I haven't gone into those yet as we might still have some changes after we meet.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812795783 I have also updated the tests. Now we don't break any existing behavior, but we can safely push down filters into sources.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812881155

> > * Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid those 2 defects at the same time (via another trait, or other APIs). I'm open to discussing this.
>
> While I am worried about the `ExecutionPlan`s we have in DataFusion, even more concerning to me are the ones we don't see: custom user plans, which might be quite expensive to clone. And even if it's not a big impact for this rule, if we add more and more rules, each with multiple recursions, it's going to be a non-trivial overhead. Especially since DataFusion doesn't support prepared statements or anything like that, it has to re-plan every time a query is run.

If I have to pick one option, I honestly prefer the simple API and accept the cloning cost, as it's not a big deal for now at least. Let's hear some other people's opinions on this. As for the multiple-recursion issue: if we stick to dropping plans in which the filters don't reach the source, O(N^2) is unavoidable.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812624874

> I'll be done with my changes in an hour or less. Would you like to discuss the code after that?

Sure, sounds good. Should we set up a call to move this along faster?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812623615

> Let's say you have a plan like A(root) <- B <- C <- D <- E <- F <- G (scan).
> Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filter cannot be pushed down over F. Knowing all this, can we say that all other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is that a safe assumption to make?

If `F` does not allow filter pushdown then anything upstream of it will not be able to push down filters into the scan (G). However, there may still be filter pushdown, e.g. from `B` to `E`: `TopK` <- `FilterExec` <- `F` <- `Scan`. In this case `TopK` may push down into `FilterExec`, and `FilterExec` will try to push down into `Scan` but be blocked by `F`, so `FilterExec` stays where it is with its current filters but also absorbs any filters from `TopK`.
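An illustrative before/after sketch of that example (hand-drawn, not DataFusion output; `x = 5` and `d` are placeholder filters):

```text
Before the rule runs:                    After the rule runs:

TopK (offers filter d)                   TopK
└── FilterExec [x = 5]                   └── FilterExec [x = 5, d]   <- absorbs d, stays put
    └── F (blocks pushdown)                  └── F (blocks pushdown)
        └── Scan                                 └── Scan (unchanged)
```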
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812621880

> > * I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be resolved automatically once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have similar workarounds.
>
> I still don't understand why any sort of "retry" or "revisit" is necessary. I feel like if we just add a second method to ExecutionPlan and are maybe less eager about popping out `FilterExec`s this won't be needed.

I'll be done with my changes in an hour or less. Would you like to discuss the code after that?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812616744

> * I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be resolved automatically once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have similar workarounds.

I still don't understand why any sort of "retry" or "revisit" is necessary. I feel like if we just add a second method to ExecutionPlan and are maybe less eager about popping out `FilterExec`s this won't be needed.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812614956

> Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExecs and CoalesceBatches

I'm not sure. The flow before was:

1. Recurse down with filters from parents, e.g. `x = 5`.
2. Hit a FilterExec. It says it's okay to push down `x = 5`.
3. Pass through CoalesceBatchesExec.
4. Hit a scan that says it can't absorb the filters and returns `Unsupported`.
5. Recurse back up to the `FilterExec`.
6. Ask the `FilterExec` if it can handle `x = 5` itself.
7. FilterExec says yes; it replaces itself with a new `FilterExec`.

The story is the same if there are no filters from parents, except that the last two steps are skipped and the plan is unmodified.
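A toy model of that flow (plain Rust, not the actual optimizer rule or the `ExecutionPlan` API; all types and the `pushdown` helper are made up for illustration):

```rust
// Toy plan nodes: a scan that may or may not accept pushdown, a transparent
// CoalesceBatches-like node, and a FilterExec-like node holding predicates.
#[derive(Debug)]
enum Plan {
    Scan { accepts_pushdown: bool, filters: Vec<String> },
    Coalesce(Box<Plan>),
    Filter { filters: Vec<String>, input: Box<Plan> },
}

/// Walk down offering `offered` filters; return the rewritten subtree and
/// whether the offered filters were absorbed somewhere below.
fn pushdown(plan: Plan, offered: Vec<String>) -> (Plan, bool) {
    match plan {
        Plan::Scan { accepts_pushdown, mut filters } => {
            if accepts_pushdown {
                filters.extend(offered);
                (Plan::Scan { accepts_pushdown, filters }, true)
            } else {
                // Step 4: the scan cannot absorb the filters ("Unsupported").
                (Plan::Scan { accepts_pushdown, filters }, false)
            }
        }
        // Step 3: CoalesceBatches is transparent and just forwards the filters.
        Plan::Coalesce(input) => {
            let (child, absorbed) = pushdown(*input, offered);
            (Plan::Coalesce(Box::new(child)), absorbed)
        }
        Plan::Filter { mut filters, input } => {
            // Step 2: offer our own predicates (plus the parent's) to the child.
            let mut to_offer = filters.clone();
            to_offer.extend(offered.iter().cloned());
            let (child, absorbed) = pushdown(*input, to_offer);
            if absorbed {
                // Everything was pushed into the scan: this node can disappear.
                (child, true)
            } else {
                // Steps 5-7: the scan refused, so this FilterExec keeps its own
                // predicates, absorbs the parent's, and reports them as handled.
                filters.extend(offered);
                (Plan::Filter { filters, input: Box::new(child) }, true)
            }
        }
    }
}

fn main() {
    // FilterExec [x = 5] -> CoalesceBatches -> scan that rejects pushdown.
    let plan = Plan::Filter {
        filters: vec!["x = 5".into()],
        input: Box::new(Plan::Coalesce(Box::new(Plan::Scan {
            accepts_pushdown: false,
            filters: vec![],
        }))),
    };
    let (rewritten, _) = pushdown(plan, vec![]);
    // The plan stays effectively unmodified: the FilterExec remains above the scan.
    println!("{rewritten:?}");
}
```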
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812573049

> Related to that, I have one question: Let's say you have a plan like A(root) <- B <- C <- D <- E <- F <- G (scan). Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filter cannot be pushed down over F. Knowing all this, can we say that all other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is that a safe assumption to make?

I'm enabling this selective passing feature.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2812469566

Current status:

1) Having 2 parameters for plans seems very strange. On the other hand, removing it forces us to make deep copies. However, when I look at the copied structs, it doesn't seem to be a big deal as it's done only one time. Perhaps we can figure out a way to avoid those 2 defects at the same time (via another trait, or other APIs). I'm open to discussing this.

2) I've renamed retry as revisit. Of course it'll be better to remove it now if we can find a better way, but that will be resolved automatically once TreeNodeRecursion supports a revisit mechanism. There are a few other places where it's needed that have similar workarounds.

3) I'm currently on this:

> Do you agree that we should keep the plan that emerges after the FilterPushdown rule runs only if the filters reach the sources? If a filter cannot reach the source and stops at some intermediate point, then we don't accept that pushed-down version and keep the original plan?

Related to that, I have one question: Let's say you have a plan like A(root) <- B <- C <- D <- E <- F <- G (scan). Assume E is an operator doing some filtering, and F is another operator which does not allow filter pushdown. So, E's filter cannot be pushed down over F. Knowing all this, can we say that all other operators (A, B, C, or D -- possibly having some filters) also cannot be pushed down over F? Is that a safe assumption to make?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811993429

> Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExecs and CoalesceBatches

Do you agree that we should keep the plan that emerges after the FilterPushdown rule runs only if the filters reach the sources? If a filter cannot reach the source and stops at some intermediate point, then we don't accept that pushed-down version and keep the original plan?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811986744

> @berkaysynnada I updated the SLT tests. I'll have another review tomorrow but things I'd like to point out now:
>
> 1. We should still think about the `retry` parameter. Ideally we can get rid of it somehow.

I'll try to improve those points now. I'll give an update then.

> 2. The SLT plans show a lot of updates where the position of the `FilterExec` was swapped with other operators. Unless we can unequivocally prove that the new position is better we should probably minimize the risk for this PR by minimizing the changes, meaning the FilterExec doesn't move in all of these plans.

Yep, I've noticed that too, but how did those plans not change before? We were trying to push down filters over RepartitionExecs and CoalesceBatches.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2811168067

@berkaysynnada I updated the SLT tests. I'll have another review tomorrow but things I'd like to point out now:

1. We should still think about the `retry` parameter. Ideally we can get rid of it somehow.
2. The SLT plans show a lot of updates where the position of the `FilterExec` was swapped with other operators. Unless we can unequivocally prove that the new position is better we should probably minimize the risk for this PR by minimizing the changes, meaning the FilterExec doesn't move in all of these plans.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809592722

> I also have another question. When parquet_options.pushdown_filters is true, we can push down filters and remove the FilterExecs from the plan. When parquet_options.pushdown_filters is false, we can still push down filters but cannot remove the FilterExecs from the plan.
>
> Is this our motivation?

It's a bit more complicated than that, and it may be hard to match the current behavior exactly. For now, for this PR, I'd say what we want is:

- If `parquet_options.pushdown_filters = true` then `ParquetSource` accepts filter pushdown as exact
- If `parquet_options.pushdown_filters = false` then `ParquetSource` does not accept filter pushdown at all *

* In theory there's little harm in accepting pushdown only into stats pruning, but I think that might be hard to get right / require new APIs. Realistically this is mostly a temporary issue because once #3463 gets resolved the default will be `parquet_options.pushdown_filters = true`, so if `false` is suboptimal it's not a big deal.
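For reference, this is roughly how that option is toggled from Rust today. This is a sketch, not code from the PR: the table name `t` and file `data.parquet` are placeholders, and depending on the DataFusion version the constructor may be `SessionContext::with_config` rather than `new_with_config`.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // `datafusion.execution.parquet.pushdown_filters` controls whether Parquet
    // scans evaluate pushed-down predicates while decoding (i.e. "exact" pushdown).
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // With the option enabled, a predicate like `id = 1` can be absorbed by the
    // Parquet scan instead of being kept in a separate FilterExec.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    let df = ctx.sql("SELECT * FROM t WHERE id = 1").await?;
    df.show().await?;
    Ok(())
}
```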
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809586017

> > @adriangb one point isn't obvious to me. I've erased a comment saying that "filter predicates reflect the output schema after applying the projection", and some tests are written based on that. Is that really the case? I'm seeing that in the with_projection() API of FilterExec the predicates are kept as they are, so they are not updated according to the projection. What am I missing?
>
> Yeah I saw that you got rid of `reassign_predicate_columns`. Maybe I misunderstood how it's supposed to work, but I thought that if you have a `FilterExec { projection: [1, 0], predicate: Column { name: "b", index: 0 } }` the predicate is referencing the indexes after `FilterExec` does its projection. So if you want to push that predicate down to children (e.g. to the scan) you need to invert the projection / match the column with the input schema (i.e. `Column { name: "b", index: 1 }`).

Okay, there is no problem with my understanding then. I'll implement the projection case as well now.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809577101

> @adriangb one point isn't obvious to me. I've erased a comment saying that "filter predicates reflect the output schema after applying the projection", and some tests are written based on that. Is that really the case? I'm seeing that in the with_projection() API of FilterExec the predicates are kept as they are, so they are not updated according to the projection. What am I missing?

Yeah I saw that you got rid of `reassign_predicate_columns`. The point is that if you have a `FilterExec { projection: [1, 0], predicate: Column { name: "b", index: 0 } }` the predicate is referencing the indexes after `FilterExec` does its projection. So if you want to push that predicate down to children (e.g. to the scan) you need to invert the projection / match the column with the input schema (i.e. `Column { name: "b", index: 1 }`).
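A small illustration of that remapping, using simplified stand-in types (this is not DataFusion's `Column` nor the real `reassign_predicate_columns` helper):

```rust
// A column reference as it appears inside a predicate: name plus schema index.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

/// `projection[i]` is the input-schema index that appears at output position `i`,
/// so translating an output-schema reference back to the input schema is a lookup.
fn to_input_index(col: &Column, projection: &[usize]) -> Column {
    Column { name: col.name.clone(), index: projection[col.index] }
}

fn main() {
    // FilterExec has projection [1, 0]; the predicate references output column 0.
    let predicate_col = Column { name: "b".into(), index: 0 };
    let remapped = to_input_index(&predicate_col, &[1, 0]);
    // Against the input schema, column "b" lives at index 1.
    assert_eq!(remapped, Column { name: "b".into(), index: 1 });
}
```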
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809563180 I also have another question. When parquet_options.pushdown_filters is true, we can push down filters and remove the FilterExecs from the plan. When parquet_options.pushdown_filters is false, we can still push down filters but cannot remove the FilterExecs from the plan. Is this our motivation?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809547292

> @adriangb one point isn't obvious to me. I've erased a comment saying that "filter predicates reflect the output schema after applying the projection", and some tests are written based on that. Is that really the case? I'm seeing that in the with_projection() API of FilterExec the predicates are kept as they are, so they are not updated according to the projection. What am I missing?

Also, in the SLT tests, predicate indices are set according to the input schema (the original filter schema), not the output schema of a filter that has a projection. Are you trying to emphasize something different from what I understand?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809540717 @adriangb one point isn't obvious to me. I've erased a comment saying that "filter predicates reflect the output schema after applying the projection", and some tests are written based on that. Is that really the case? I'm seeing that in the with_projection() API of FilterExec the predicates are kept as they are, so they are not updated according to the projection. What am I missing?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809515533 Great! Btw I gave you write access to our fork so you should be able to push to this branch directly.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809507083

> @berkaysynnada I merged your change. Still have some failing tests. Also as I said in [pydantic#26 (comment)](https://github.com/pydantic/datafusion/pull/26#discussion_r2046768875) the `retry` flag needs either a better name or we should consider how we can replace it, even if we need 2 methods on `ExecutionPlan`.

Yep, I saw your comments, and I'll address/reply to them all once it passes all tests. I'm still working on it.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2809500404 @berkaysynnada I merged your change. Still have some failing tests. Also as I said in https://github.com/pydantic/datafusion/pull/26#discussion_r2046768875 the `retry` flag needs either a better name or we should consider how we can replace it, even if we need 2 methods on `ExecutionPlan`.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2808569749

> @berkaysynnada any luck? I can take a look tomorrow but don't want to duplicate effort

I was interrupted, sorry. I'll send my part in a few hours and ping you.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2807089857

> I'm working on the failures now

@berkaysynnada any luck? I can take a look tomorrow but don't want to duplicate effort.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801793059 I'm working on the failures now.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801746776

> I've made an attempt: [pydantic@2dfa8b8](https://github.com/pydantic/datafusion/commit/2dfa8b803f2103c6ff81cfa483dbb70150feeb67)
>
> I hope things become clearer now. I just tried to show the design idea, and I'm sure the final state will be much better after some polish.

Thank you @berkaysynnada! I merged your commit and tried to clean up the merge a bit. There are some failing tests; at least one seems like a real bug with the `swap_remove` in https://github.com/pydantic/datafusion/blob/cda6e8d6b74e860e9978abf988c00d5a00d1ac07/datafusion/physical-optimizer/src/filter_pushdown.rs#L424. I have to run for a bit but if you can take a look that'd be great 🙏🏻 I think other than that the main thing we need is adding / better docs for the new APIs (in particular on `ExecutionPlan`).
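The exact bug isn't spelled out in the thread, but for context, the classic pitfall with `Vec::swap_remove` in an index-based loop looks like this (illustrative only, unrelated to the actual code at that line):

```rust
fn main() {
    let mut filters = vec!["a", "b", "c", "d"];
    let mut i = 0;
    while i < filters.len() {
        if filters[i] == "b" {
            // swap_remove moves the *last* element ("d") into slot `i`...
            filters.swap_remove(i);
            // ...so the index must NOT be advanced here, or "d" is never examined.
        } else {
            i += 1;
        }
    }
    assert_eq!(filters, vec!["a", "d", "c"]);
}
```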
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2801349750 I've made an attempt: https://github.com/pydantic/datafusion/pull/25/commits/2dfa8b803f2103c6ff81cfa483dbb70150feeb67 I hope things become clearer now. I just tried to show the design idea, and I'm sure the final state will be much better after some polish.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797543695 Thank you as well!
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
ozankabak commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797532385 All right -- we will submit a PR early next week and get it merged ASAP to enable you to carry on. We will also keep on collaborating with you for subsequent PRs as this functionality is important to us. Thanks for the awesome collaboration.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796969760

> I don't think dynamic vs. static is the right distinction to make here.

I did it since your examples were on dynamic filters. I just wanted to show that the dynamic filter case will not be a problem at all.

> IMO what really needs to happen is that each node gets to decide what to do with the remaining filters, and be able to distinguish between the filters it offered up and the ones that came down from parents.

We can do that without any difficulty with the proposed design.

> So for example a `SortExec` will "handle" the filter it injected by doing nothing with it and will transmit up the chain that it could not handle any of the filters that were passed down.

These are all trivial to implement. If it would make things clearer, I can clone this branch, update the optimizer and ExecutionPlan API according to the proposal, and submit an alternative PR to address the questions in your mind.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796862617

> Could you help clarify when the FilterExec nodes get inserted? Maybe some examples with DataSourceExecs that do not accept any filters would help.

You can look at the Notion doc for the case where the current node is an AggregateExec; remaining_filters is the indicator of a FilterExec requirement.

I've also thought about the dynamic filter case. Actually, dynamic filters are more flexible than normal filters. Normal filters are pulled off the operator itself and pushed down as far as possible; when that's not possible, a FilterExec is inserted accordingly. Dynamic filters, since they are derived from normal filters, do not have the actual filter pulled off. So, like normal filters, dynamic filters are also pushed down along the plan, and when they can't go any further, a normal FilterExec can again be inserted for the dynamic filters as well.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2797315179

> @adriangb perhaps we can work on creating a new PR (stacked on this one) that hooks everything up for dynamic filter pushdown. That way we can have things ready to go once we get an implementation of `PhysicalExpr` pushdown

I think I'd rather wait for @berkaysynnada to make a PR to this PR, or however the new proposal gets introduced, and we merge that; otherwise my stacked PR will immediately be out of date.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796865438

> One more question: it seems like in all cases we end up eagerly cloning every node, e.g. `Join::try_new`. If I understand correctly this may even happen twice per node as we do the passes.
>
> Is this acceptable performance-wise? I get that most things are Arc'ed but that's still a lot of copying.
>
> In my implementation I took a lot of care to make sure that if there were no filters being pushed, etc., there was zero copying / node replacement, but maybe I overestimated the impact.

You don't need to worry about that. I'm sure we can figure out a way of avoiding redundant clones.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796986491 If you're able to submit a PR with your proposal I think the best course of action would be to merge that ASAP and iterate from there by looking at challenges that may or may not arise from concrete implementations.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796949025

> I've also thought on Dynamic filter case. Actually they are more flexible than normal filters. Normal filters are pulled off from the operator's itself, and pushed down as far as possible, and when it's not possible, a FilterExec is put accordingly. For the dynamic filters, as they are derived from normal filters, the actual filter's itself is not pulled off. So, like normal filters, dynamic filters also are pushed down along the plan, and when it's not possible to do further, again a normal FilterExec can be inserted for the Dynamic filters as well

I don't think dynamic vs. static is the right distinction to make here. IMO what really needs to happen is that each node gets to decide what to do with the remaining filters, and be able to distinguish between the filters it offered up and the ones that came down from parents. So for example a `SortExec` will "handle" the filter it injected by doing nothing with it and will transmit up the chain that it could not handle any of the filters that were passed down.
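A toy sketch of that distinction (hypothetical names, not the PR's API): the node reports what happened to the parent's filters separately from the filters it injected itself.

```rust
#[derive(Debug, PartialEq)]
struct NodePushdown {
    // Filters this node offers to its children (e.g. a TopK/Sort dynamic filter).
    self_filters: Vec<String>,
    // For each filter the parent pushed down: true = absorbed below,
    // false = the parent must still apply it.
    parent_filter_handled: Vec<bool>,
}

// What a SortExec-like node might report: it injects one dynamic filter of its
// own (and "handles" it by doing nothing if it bounces back), while declaring
// that none of the parent's filters were handled here.
fn sort_like(parent_filters: &[String]) -> NodePushdown {
    NodePushdown {
        self_filters: vec!["topk_threshold(dynamic)".to_string()],
        parent_filter_handled: vec![false; parent_filters.len()],
    }
}

fn main() {
    let report = sort_like(&["id = 1".to_string()]);
    assert_eq!(report.parent_filter_handled, vec![false]);
    assert_eq!(report.self_filters.len(), 1);
}
```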
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796684310

One more question: it seems like in all cases we end up eagerly cloning every node: `Join::try_new`. If I understand correctly this may even happen twice per node as we do the passes.

Is this acceptable performance wise? I get that most things are Arc'ed but that's still a lot of copying.

In my implementation I took a lot of care to make sure that if there were no filters being pushed, etc there was zero copying / node replacement, but maybe I overestimated the impact.
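The "zero copying when nothing was pushed" idea can be illustrated with a small sketch; the `Transformed` type here is a toy stand-in (not datafusion_common's actual struct), and the point is only that a pass can hand back the original `Arc` untouched when there is no work to do.

```rust
use std::sync::Arc;

struct Node {
    name: String,
}

enum Transformed<T> {
    Yes(T), // a replacement node was built
    No(T),  // the original node is returned as-is
}

fn maybe_push(node: Arc<Node>, filters: &[String]) -> Transformed<Arc<Node>> {
    if filters.is_empty() {
        // No filters to push: return the same Arc, no clone of the node itself.
        Transformed::No(node)
    } else {
        // Only build a replacement node when something actually changed.
        Transformed::Yes(Arc::new(Node {
            name: format!("{} [pushed {} filters]", node.name, filters.len()),
        }))
    }
}

fn main() {
    let scan = Arc::new(Node { name: "scan".to_string() });
    if let Transformed::No(same) = maybe_push(Arc::clone(&scan), &[]) {
        assert!(Arc::ptr_eq(&scan, &same)); // pointer-equal: nothing was copied
    }
}
```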
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796659881

Could you help clarify when the FilterExec nodes get inserted? Maybe some examples with DataSourceExecs that do not accept any filters would help.

In particular I'm interested in these cases:

- TopK generates dynamic filter -> Coalesce -> Repartition -> DataSourceExec that rejects all filters
- TopK generates dynamic filter -> Existing FilterExec -> Coalesce -> Repartition -> DataSourceExec that rejects all filters
- TopK generates dynamic filter -> Coalesce -> Repartition -> Existing FilterExec -> DataSourceExec that rejects all filters
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796648147

> So, joins are like this (this example for joins which prefer keeping all filter expressions inside themselves)

If the join prefers keeping a FilterExec rather than adding the predicates to its internal filter, you can just return them as the remaining filters instead of rewriting and updating the join itself.
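A minimal sketch of that alternative (toy types and String predicates; the field names mirror the proposal but are not the final API): the join hands non-pushable predicates back as "remaining" and leaves itself unchanged, and the caller decides whether to wrap it in a FilterExec.

```rust
struct PushdownResult {
    child_filters: Vec<Vec<String>>, // per child: what can travel further down
    remaining_filters: Vec<String>,  // what the caller must still apply on top
}

fn join_pushdown(parent_filters: Vec<String>) -> PushdownResult {
    let mut left = Vec::new();
    let mut remaining = Vec::new();
    for f in parent_filters {
        // Toy classification: a predicate can go to the left child only if it
        // does not mention any right-side column.
        if !f.contains("right.") {
            left.push(f);
        } else {
            // The join stays unchanged and simply reports these back.
            remaining.push(f);
        }
    }
    PushdownResult { child_filters: vec![left, Vec::new()], remaining_filters: remaining }
}

fn main() {
    let r = join_pushdown(vec!["left.id = 1".into(), "left.a > right.b".into()]);
    assert_eq!(r.child_filters[0], vec!["left.id = 1".to_string()]);
    assert_eq!(r.remaining_filters, vec!["left.a > right.b".to_string()]);
}
```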
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796643229

So, joins are like this (this example is for joins which prefer keeping all filter expressions inside themselves):

```
fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
    // collect all filters
    let mut all_filters = fd;
    all_filters.extend(self.filter);

    let mut self_filters = vec![];
    let mut left_filters = vec![];
    let mut right_filters = vec![];

    // extract child filters
    for filter in all_filters {
        if all_left_columns(filter) {
            left_filters.push(filter);
        } else if all_right_columns(filter) {
            right_filters.push(filter);
        } else {
            self_filters.push(filter);
        }
    }

    // get the new join with updated filter expressions:
    let new_self = Join::try_new(self.left_input, self.right_input, ..., filter: self_filters);

    Ok(FilterPushdownResult {
        child_filters: vec![left_filters, right_filters],
        remaining_filters: vec![],
        operator: new_self,
    })
}
```
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796620968

Here is a join example, assuming this initial plan:

```
- FilterExec: [a@0 = foo, a@0 > x@1]
  - Join: [schema: {left: a@0, right: x@0}, filter: {x@0 is even AND a@0 < x@0 + 10}]
    - DataSource: [a]
    - DataSource: [x]
```

The rule is shaped like this, from my pseudo docs:

```rust
context.transform_down(|node| {
    let FilterPushdownResult { child_filters, remaining_filters, operator } =
        node.plan.try_pushdown_filters(node.data)?;
    if remaining_filters.is_empty() {
        for child_index in 0..self.children.len() {
            node.children[child_index].data = child_filters[child_index];
        }
        return Ok(Transformed::yes(node));
    } else {
        node.plan = FilterExec::try_new(predicate: remaining_filters, input: node.plan);
        node.children = [node.clone().clear_node_data()];
        node.data = Default::default();
    }
})
```

1st iteration on FilterExec:

```
child_filters: [[a@0 = foo, a@0 > x@1]]
remaining_filters: []
operator: Join…
if condition comes up true ⇒ Join node has [a@0 = foo, a@0 > x@1] as the node data
```

2nd iteration on JoinExec:

```
child_filters: [[a@0 = foo], [x@0 is even]]
remaining_filters: []
operator: Join: [schema: {left: a@0, right: x@0}, filter: {a@0 > x@1 AND a@0 < x@0 + 10}]
if condition comes up true ⇒ LeftDataSource node has [a@0 = foo], RightDataSource node has [x@0 is even] as the node data
```

3rd iteration on LeftDataSource:

```
child_filters: [[]]
remaining_filters: []
operator: LeftDataSource: filter: [a@0 = foo]
```

4th iteration on RightDataSource:

```
child_filters: [[]]
remaining_filters: []
operator: RightDataSource: filter: [x@0 is even]
```

PS. I’m not considering dynamic filters in this example
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796558633

Sounds good -- thanks @ozankabak. We can wait a few days and see what the status is.

@adriangb perhaps we can work on creating a new PR (stacked on this one) that hooks everything up for dynamic filter pushdown. That way we can have things ready to go once we get an implementation of `PhysicalExpr` pushdown
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796560380

> I think the new design is sound, much simpler, and more in line with how almost all other plan alterations in DF work. We can add a join example to the doc quickly, and if everybody agrees we can proceed with it.

In my opinion everyone is aligned on the proposed API changes, and there is no reason to hold off trying to implement them.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796545275 There's no specific urgency, I think we're all just excited to get on and build on top of this but more importantly test out designs against real implementation to flesh out any unknowns. If the timeline is 1-2 days I think we can just wait to merge this PR.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
ozankabak commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2796149727

@alamb and @adriangb: Is there a specific urgency to get this merged ASAP? If so, we can do it and then replace the API. However, unless there is a good reason, I think it is a bad idea to have API churn and increase the total amount of work just to avoid the extra 1-2 days of work on this PR. Once this PR merges and the API starts to get used, it becomes hard to replace it (I'm going through this in the equivalence code these days).

I think the new design is sound and much simpler -- we can add a join example to the doc too, and if everybody agrees we can proceed with it.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787642658 Sure, sounds good. Thank you for reviewing!
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036146656

## datafusion/datasource/src/source.rs:
```
@@ -254,3 +284,13 @@ impl DataSourceExec {
         })
     }
 }
+
+/// Create a new `DataSourceExec` from a `DataSource`
```

Review Comment: I don't feel strongly either way. I must have missed the existing function
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2789854759 @alamb could you chime in on the proposal of moving recursion back into the optimizer? Happy to re-evaluate if you think that makes sense. If you decide to leave it as is I would love to move this to merge 🙏🏻.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787637557

> > I think we should leave this open for a while to let anyone else who wants time to review (such as @berkaysynnada ) but I think it is mergeable
>
> @berkaysynnada would love to get your input here, hoping we can confirm these APIs will work for the JOIN use cases 🙏🏻

I'm sorry, I couldn't finish the review yet (I'm looking at https://github.com/apache/datafusion/pull/15568 as well). I understand what you are trying to do, and both PRs seem very good, but I am taking notes on some points to discuss. After finalizing these two, I'll share them with you (hopefully tomorrow morning). Thank you for your patience 😞
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031897452 ## datafusion/physical-optimizer/src/filter_pushdown.rs: ## @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{config::ConfigOptions, DataFusionError, Result}; +use datafusion_physical_plan::{ +execution_plan::ExecutionPlanFilterPushdownResult, ExecutionPlan, +}; + +use crate::PhysicalOptimizerRule; + +/// A physical optimizer rule that pushes down filters in the execution plan. +/// See [`ExecutionPlan::try_pushdown_filters`] for a detailed description of the algorithm. +#[derive(Debug)] +pub struct PushdownFilter {} Review Comment: this rule is a thing of beauty ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,508 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::{FileSource, FileSourceFilterPushdownResult}, +file_scan_config::FileScanConfig, +file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, filter_pushdown::FilterPushdownSupport, +metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031970050 ## datafusion/physical-plan/src/filter_pushdown.rs: ## @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of trying to push down fitlers to a child plan. +/// This is used by [`FilterPushdownResult`] to indicate whether the filter was +/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not +/// ([`FilterPushdownSupport::Unsupported`]). +/// If the filter was not absorbed, the parent plan must apply the filter +/// itself, or return to the caller that it was not pushed down. +/// If the filter was absorbed, the parent plan can drop the filter or +/// tell the caller that it was pushed down by forwarding on the [`FilterPushdownSupport::Exact`] +/// information. +#[derive(Debug, Clone, Copy)] +pub enum FilterPushdownSupport { +/// Filter may not have been pushed down to the child plan, or the child plan +/// can only partially apply the filter but may have false positives (but not false negatives). +/// In this case the parent **must** behave as if the filter was not pushed down +/// and must apply the filter itself. +Unsupported, Review Comment: Okay point taken: I will re-use the same three variant enum and names and alias the existing one to the new one to eliminate duplication. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2032067603 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,508 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::{FileSource, FileSourceFilterPushdownResult}, +file_scan_config::FileScanConfig, +file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, filter_pushdown::FilterPushdownSupport, +metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> &ExecutionPlanMetricsSet { 
+todo!("should not be called") +} + +fn statistics(&self) -> Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { +DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn try_pushdown_filters( +&self, +filters: &[PhysicalExprRef], +
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794719805

> I tried to summarize our proposal with @ozankabak
> https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73

Thank you @berkaysynnada -- I think this design sounds very reasonable to me and I would be happy to see it in DataFusion. However, I don't think it fundamentally changes the filter pushdown that is possible, so it is not clear to me that implementing a different API should hold up this PR (which is ready to go with comments and tests).

Unless there are some fundamental technical objections that I have missed, what I suggest is:

1. We merge this PR as is to unblock work on https://github.com/apache/datafusion/issues/15037
2. In parallel you work on the [API in the proposal](https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73) and when it is ready we can review it / merge it quickly

If we are going to change the implementation anyway, I don't think doing so as a follow-on PR is any more work than doing it in this PR, and I expect the downstream impacts to be minimal.

The only potential concern I can see with this plan is API churn -- specifically changing a public API (especially as DF 47 nears release). I think in this case we could add a comment like the following as a hint to any downstream users that we plan to update the API shortly:

```rust
/// NOTE this API is subject to change in the near future
```
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794727022

> The rule part is more or less is like that. Now, let’s examine the try_pushdown_filters() impls for some operators

I think it might also help to think through how it would work for a Join where a predicate could be pushed into one input but not the other, and where the columns had to be remapped correctly.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794723836

BTW I don't seem to be able to leave comments on https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f so I will leave some here

```rust
/// Result of try_pushdown_filters(), storing the necessary information about a filter pushdown trial
struct FilterPushdownResult {
    // Filter predicates which can be pushed down through the operator.
    // NOTE that these are not placed into any operator.
    // NEW: NOTE these filters may be different than passed down if the operator changes schema (like projection)
    // * and may be different for different children (like for join)
    child_filters: Vec<FilterDescription>,
    // Filters which cannot be pushed down through the operator.
    // NOTE that caller of try_pushdown_filters() should handle these remaining predicates,
    // possibly introducing a FilterExec on top of this operator.
    remaining_filters: FilterDescription,
    // Possibly updated new operator
    operator: Arc<dyn ExecutionPlan>,
}
```
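The "NEW" note above (filters handed to a child may differ from what the parent passed down when the operator changes the schema) can be illustrated with a small toy, not DataFusion expressions: a projection that reorders columns has to remap a column index before pushing a predicate to its input.

```rust
#[derive(Debug, PartialEq)]
struct ColEq {
    column_index: usize, // index in the schema seen by whoever evaluates the filter
    value: i64,
}

// projection_mapping[i] = which input column output column i comes from.
fn remap_for_child(filter: &ColEq, projection_mapping: &[usize]) -> ColEq {
    ColEq {
        column_index: projection_mapping[filter.column_index],
        value: filter.value,
    }
}

fn main() {
    // The parent sees the projected schema (the filtered column is output column 1),
    // but in the child's schema the same column sits at index 3.
    let parent_filter = ColEq { column_index: 1, value: 42 };
    let pushed = remap_for_child(&parent_filter, &[0, 3, 5]);
    assert_eq!(pushed, ColEq { column_index: 3, value: 42 });
}
```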
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794616294 Thanks folks! I was not aware of `PlanContext`, seems like a nifty little helper. I'm actually talking with @alamb in a few minutes to try to push this forward. We'll try to grok your proposal together and maybe that will allow me to move forward with it myself, it would be great if you can join! https://meet.google.com/poy-aenf-snh
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2792203825

> The TLDR is that because each filter may be allowed or not, and maybe transformed, to be pushed down into each child we end up with a matrix of filters x children that we need to ask the node for. For example, imagine joins, each of its children needs to be treated differently and only some filters can be pushed into some children. That meant that as you suggest it had to be at least 2 methods on ExecutionPlan (2 with complex APIs or 3 simpler ones).

You can properly (and IMO more easily) apply the pushdown logic in the rule recursion as well. There are many rules doing that.

> The recursion is also a bit wonky because you need to:
>
> 1. Recurse down and on your way up transmit not only the new nodes but also the filter support result.

That's exactly the intended usage of PlanContext.

> 2. There's jumps, eg when you hit a node that doesn't support pushdown but still want to recurse into sub-trees.

That's not a problem either in transform_down(), when you say Recursion::Continue and Transformed::No.

> 3. You're carrying around non-trivial context as you go down.

That's not a problem either with PlanContext.

> Having implemented it both ways I've come to the conclusion that trying to generalize the recursion into the optimizer rules makes things harder for both the simple cases (FilterExec, RepartitionExec) and the complex cases (HashJoinExec, ProjectionExec). Doing it this way makes the simple implementations trivial and the complex ones will be complex but will be self contained and not have to fit into some rigid APIs.

You would still keep the operator-specific parts in the ExecutionPlan API. For example, to give an example of the simplest case:

```rust
pub fn try_pushdown_filters_to_input(
    plan: &Arc<dyn ExecutionPlan>,
    input: &Arc<dyn ExecutionPlan>,
    parent_filters: &[PhysicalExprRef],
    config: &ConfigOptions,
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
    match input.try_pushdown_filters(input, parent_filters, config)? {
        FilterPushdownResult::NotPushed => {
            // No pushdown possible, keep this child as is
            Ok(FilterPushdownResult::NotPushed)
        }
        FilterPushdownResult::Pushed { updated: inner, support } => {
            // We have a child that has pushed down some filters
            let new_inner = with_new_children_if_necessary(Arc::clone(plan), vec![inner])?;
            Ok(FilterPushdownResult::Pushed { updated: new_inner, support })
        }
    }
}
```

Here, you would not need to do the recursive call or the last with_new_children...() part of the logic. As you've mentioned, maybe a single API wouldn't be enough, but perhaps having 2 or 3 is what we really need to comply with separation of concerns.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793342766 Sounds good to me
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
ozankabak commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793246314 I think we can work on this PR, a new PR may not be necessary. We will share a design document with you describing why we think a variation of the structure @alamb proposed is likely sufficient.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2794211343 I tried to summarize our proposal with @ozankabak https://synnada.notion.site/FilterPushdown-API-Proposal-1d1f46d2dab1802e80a7e1bccec2604f?pvs=73 @adriangb @alamb I hope the design is clear enough and captures everyone's input. Let me know if anything feels off or if you'd like to iterate further
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793196830

> We will get this over the finish line in a few days.

So you're going to make a PR to replace this one? Please do consider my example below. There's a lot more complexity than the simple API that @alamb proposed. I really think that:

> Ultimately the issue in my mind is that how the recursion proceeds is in great part determined by each ExecutionPlan and that makes it hard to generalize into a set of standard APIs.

We can try to come up with a set of APIs to describe all behaviors but:

1. It's going to be a complex API that relies on multiple methods being kept in sync / indexes mapping correctly.
2. I wouldn't be confident that it captures all behavior, e.g. in writing the original version I kept realizing that I'd missed edge cases with projections, joins, etc.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2037441578 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> &ExecutionPlanMetricsSet { 
+todo!("should not be called") +} + +fn statistics(&self) -> Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { +DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn try_pushdown_filters( +&self, +filters: &[PhysicalExprRef], +
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
ozankabak commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2793101507

> Maybe it is possible to move the recursion into the optimizer rule but still keep a `ExecutionPlan` method by making a complex call signature, maybe something like this:
>
> ```rust
> struct FilterPushdownArgs {
>     // expressions
>     filters: Vec<Arc<dyn PhysicalExpr>>,
> }
>
> /// return from trying to push the filters down
> struct FilterPushdownResult {
>     // filters that could potentially be pushed down to each child
>     child_filters: Vec<Arc<dyn PhysicalExpr>>,
>     // filters that could not be handled by this node internally
>     remaining_filters: Vec<Arc<dyn PhysicalExpr>>,
>     // new_self
>     new_self: Arc<dyn ExecutionPlan>,
> }
>
> impl ExecutionPlan {
>     fn try_pushdown_filters(&self, args: FilterPushdownArgs) -> Result<FilterPushdownResult> {
>         ..
>     }
> }
> ```

This sort of thing is exactly what I had in mind. This enables us to do a clean separation between the API and the rule. @berkaysynnada and I discussed this in detail and we will circle back with a write-up including a more fleshed-out version of this. We will get this over the finish line in a few days. Thanks for the awesome collaboration.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2792250700

@adriangb I don’t want you to go back and forth excessively. Since you’ve spent a lot of time on this and are supporting the current version as the easiest and most understandable way to extend, I’m fine with following your decision. It seems there are pros/cons in both approaches (though I’m still in favor of rule recursion and API separation), but we need to move forward by picking one. I don’t think switching to the other mechanism would require much effort, so I might try it out and see how it ends up (but I can’t spend time on that right now unfortunately).

I don’t know if this makes sense to you, but here’s one last design suggestion: What if we define a new trait like `DynamicFilterPlan: ExecutionPlan` and add all the required APIs to it? Then the main driver logic in the optimizer could check whether a plan is just an ExecutionPlan (handled by the default logic, possibly configurable for transparent operators too), or if it’s a DynamicFilterPlan, which has all the necessary APIs implemented to allow continued pushdown.

TLDR, I’ve made one last design suggestion above. If it doesn’t make sense or isn’t applicable, feel free to ignore it. We can merge this as is. This is a great PR, and what we’ve been discussing isn’t a major issue either way. Thank you again
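A rough, self-contained sketch of that trait idea (the accessor method, the `ExecutionPlanLike` stand-in, and the String-based filters are invented for illustration; only the `DynamicFilterPlan: ExecutionPlan` supertrait shape comes from the suggestion above): the driver uses default logic for plain plans and only nodes that opt in through the extra trait participate in pushdown.

```rust
use std::sync::Arc;

trait ExecutionPlanLike {
    fn name(&self) -> &str;
    // Default: the node does not expose the pushdown-specific API.
    fn as_dynamic_filter_plan(&self) -> Option<&dyn DynamicFilterPlan> {
        None
    }
}

trait DynamicFilterPlan: ExecutionPlanLike {
    // Which of the parent's filters this node lets through, per child.
    fn filters_for_children(&self, parent_filters: &[String]) -> Vec<Vec<String>>;
}

struct Repartition;

impl ExecutionPlanLike for Repartition {
    fn name(&self) -> &str {
        "RepartitionExec"
    }
    fn as_dynamic_filter_plan(&self) -> Option<&dyn DynamicFilterPlan> {
        Some(self)
    }
}

impl DynamicFilterPlan for Repartition {
    fn filters_for_children(&self, parent_filters: &[String]) -> Vec<Vec<String>> {
        // Transparent operator: everything is forwarded to its single child.
        vec![parent_filters.to_vec()]
    }
}

// The main driver logic: default handling unless the node opts in.
fn drive(node: Arc<dyn ExecutionPlanLike>, filters: &[String]) {
    match node.as_dynamic_filter_plan() {
        Some(p) => println!("{} forwards {:?}", node.name(), p.filters_for_children(filters)),
        None => println!("{} handled by the default logic", node.name()),
    }
}

fn main() {
    drive(Arc::new(Repartition), &["id = 1".to_string()]);
}
```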
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036861732 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> 
&ExecutionPlanMetricsSet { +todo!("should not be called") +} + +fn statistics(&self) -> Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { +DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn try_pushdown_filters( +&self, +filters: &[PhysicalExprRef], +
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036813659 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, Review Comment: yes, that also works -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791076569 > Maybe it is possible to move the recursion into the optimizer rule but still keep an `ExecutionPlan` method by making a complex call signature, maybe something like this:
>
> I think my original suggestions were around trying to simplify the changes to the ExecutionPlan trait. As I recall the original draft PR had at least three new methods on `ExecutionPlan` that had to be kept in sync:
>
> 1. Could the node itself handle any filters internally?
> 2. Could the node push filters past itself (the equivalent of what `try_swap_with_projection` does)?
> 3. Did the node produce any dynamic filters?
>
> Maybe it is possible to move the recursion into the optimizer rule but still keep an `ExecutionPlan` method by making a complex call signature, maybe something like this:

Yes, precisely. I think we can collapse (1) & (2) at the cost of a more complex API for the collapsed method. But I think we'll always need two methods because of how the recursion works: the current node can't know which "remainder" filters it has the option to handle until we've pushed down into its children, but to push down into its children we first need to ask it which filters it has / allows to be pushed down to each child. I can try to rework it to go back to something like that, but I expect the APIs to be much more complex for _all_ cases (not just the actually complicated ones), and the diff is going to be much larger. Are there concrete things that the current approach falls short on that moving the recursion into the OptimizerRule would solve?
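The point about needing two interactions with each node is easier to see in code. The sketch below is purely illustrative: the trait and method names (`PushdownNode`, `filters_for_children`, `handle_remainder`) are made up and do not match the PR's actual API; it only shows why a driver has to call into a node once before recursing into its children and once after they have answered.

```rust
use std::sync::Arc;

/// Stand-in for a physical predicate such as `id = 1`.
type Filter = Arc<str>;

/// Hypothetical two-call protocol between an optimizer rule and a plan node.
trait PushdownNode {
    /// Before recursing: which of the parent's filters (plus any the node adds,
    /// e.g. a FilterExec's own predicate) may be offered to each child?
    /// Returned as one Vec per child.
    fn filters_for_children(&self, parent_filters: &[Filter]) -> Vec<Vec<Filter>>;

    /// After recursing: told which of those filters each child actually absorbed,
    /// the node decides what remainder it must still handle itself.
    fn handle_remainder(&self, absorbed_per_child: &[Vec<bool>]) -> Vec<Filter>;
}
```

Whatever the final shape, the second call can only happen after the children have been visited, which is why collapsing everything into a single method effectively forces that method to own the recursion.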
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791020940 Maybe it is possible to move the recursion into the optimizer rule but still keep an `ExecutionPlan` method by making a complex call signature, maybe something like this:

```rust
struct FilterPushdownArgs {
    // expressions
    filters: Vec<Arc<dyn PhysicalExpr>>,
}

/// return from trying to push the filters down
struct FilterPushdownResult {
    // filters that could potentially be pushed down to each child
    child_filters: Vec<Vec<Arc<dyn PhysicalExpr>>>,
    // filters that could not be handled by this node internally
    remaining_filters: Vec<Arc<dyn PhysicalExpr>>,
    // new_self
    new_self: Arc<dyn ExecutionPlan>,
}

impl ExecutionPlan {
    fn try_pushdown_filters(&self, args: FilterPushdownArgs) -> Result<FilterPushdownResult> {
        ..
    }
}
```
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2791020563 > It was @alamb that suggested we do it this way, unless I misunderstood his suggestion.
>
> I think it's possible to do the recursion as an optimizer rule but making the APIs flexible enough to handle all of the cases with joins, etc. gets very complicated especially because the recursion needs to probe and jump, transmit information back up, etc.

I think my original suggestions were around trying to simplify the changes to the ExecutionPlan trait. As I recall the original draft PR had at least three new methods on `ExecutionPlan` that had to be kept in sync:

1. Could the node itself handle any filters internally?
2. Could the node push filters past itself (the equivalent of what `try_swap_with_projection` does)?
3. Did the node produce any dynamic filters?

By combining the operations into a single method that did all three at once the ExecutionPlan changes were much smaller. I don't remember any opinions about the recursion, but I may not have made that clear.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036145638 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> &ExecutionPlanMetricsSet { 
+todo!("should not be called") +} + +fn statistics(&self) -> Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { +DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn try_pushdown_filters( +&self, +filters: &[PhysicalExprRef], +
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036084527 ## datafusion/expr/src/filter_pushdown.rs: ## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of attempting to push down a filter/predicate expression. +/// +/// This is used by: +/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level +/// (e.g. pushing down dynamic fitlers from a hash join into scans). +/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning partitions). +/// +/// There are three possible outcomes of a filter pushdown: +/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is not understood. +/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may not be exact. Review Comment: Makes sense. The variants were chosen to match the existing `TableProviderFilterPushdown`, which @alamb requested specifically. IMO we could have 2 variants here: `Unsupported` and `Exact` and it would be clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
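For readers without the full file in front of them: the practical difference between the variants being discussed is only whether the operator above the pushdown target may drop its own copy of the predicate. Below is a minimal sketch; the enum mirrors the three variants quoted above, while `keep_filter_above` is a made-up helper for illustration and not part of the PR.

```rust
// Mirrors the variants quoted above; everything else here is illustrative only.
enum FilterPushdown {
    Unsupported,
    Inexact,
    Exact,
}

/// Decide whether the parent (e.g. a FilterExec) must still apply the predicate
/// itself after attempting to push it into a child.
fn keep_filter_above(result: FilterPushdown) -> bool {
    match result {
        // The child guarantees no false positives: the filter can be dropped above.
        FilterPushdown::Exact => false,
        // The child may let false positives through (or did nothing at all):
        // the parent must re-apply the filter, so Inexact behaves like Unsupported.
        FilterPushdown::Inexact | FilterPushdown::Unsupported => true,
    }
}
```

Renaming `Inexact` (e.g. to `Partial`, as suggested above) or collapsing it into `Unsupported` would not change this caller-side logic, only how clearly it reads.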
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
geoffreyclaude commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2036042823 ## datafusion/expr/src/filter_pushdown.rs: ## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of attempting to push down a filter/predicate expression. +/// +/// This is used by: +/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level +/// (e.g. pushing down dynamic fitlers from a hash join into scans). Review Comment: ```suggestion /// (e.g. pushing down dynamic filters from a hash join into scans). ``` ## datafusion/expr/src/filter_pushdown.rs: ## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of attempting to push down a filter/predicate expression. +/// +/// This is used by: +/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level +/// (e.g. pushing down dynamic fitlers from a hash join into scans). +/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning partitions). +/// +/// There are three possible outcomes of a filter pushdown: +/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is not understood. +/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may not be exact. Review Comment: `Inexact` kind of hints that the filter might prune too much. How about `Partial` or some other word that makes it clearer only false positives (rows that should have been filtered but weren't) are possible? In this context it's obvious of course, but without the context a bit less. Eg, someone implementing a custom exec node. ## datafusion/expr/src/filter_pushdown.rs: ## @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of attempting to push down a filter/predicate expression. +/// +/// This is used by: +/// * `FilterPushdownResult` in `ExecutionPlan` to do predicate pushdown at the physical plan level +/// (e.g. pushing down dynamic fitlers from a hash join into scans). +/// * `TableProvider` to do predicate pushdown at planning time (e.g. pruning partitions). +/// +/// There are three possible outcomes of a filter pushdown: +/// * [`FilterPushdown::Unsupported`] - the filter could not be applied / is not understood. +/// * [`FilterPushdown::Inexact`] - the filter could be applied, but it may not be exact. +/// The caller should treat this the same as [`FilterPushdown::Unsupported`] for the most part +/// and must not assume that any pruning / filter was done since there may be false positive
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
ozankabak commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790048758 This is a great contribution and is very close to merge. However, let's make sure the design is right to avoid API churn. Why do you think `try_swap_with_projection` is similar? I don't see any recursion in it -- an optimizer rule handles the recursion if I'm not missing something.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790824023 For comparison, here is roughly what I had before doing the recursion as part of the `OptimizerRule`: https://github.com/pydantic/datafusion/blob/fbf93a2bdd0a5c1532336026dfa71ac7305c1655/datafusion/physical-optimizer/src/filter_pushdown.rs As you can see it ends up splitting the flow up into multiple steps:

1. Ask the current node for any filters it wants to push down and which filters can be pushed down (possibly with modification) into which children.
2. Recurse into the children.
3. Carry the result of the pushdown for each filter back up.
4. Combine results for each child together to get an overall pushed/not pushed for each filter.
5. Ask the current node if it can handle any of the remaining filters.
6. Return to the caller.

This all gets pretty complicated and tracking the state is convoluted. And it doesn't even handle certain edge cases that I think are easyish to handle with the new APIs.
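To make those six steps concrete, here is a loose, self-contained sketch of the control flow they imply. Everything in it (the `Node` type, `filters_for_child`, `can_handle`, string-valued filters) is invented for illustration; the linked `filter_pushdown.rs` is the authoritative version, and real nodes also rewrite expressions and rebuild plan nodes rather than just returning booleans.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Pushed {
    Yes,
    No,
}

struct Node {
    children: Vec<Node>,
}

impl Node {
    /// Which filters (by index into `filters`) this node allows into the given child.
    fn filters_for_child(&self, _child: usize, filters: &[String]) -> Vec<usize> {
        (0..filters.len()).collect() // pretend everything may be offered
    }

    /// Whether this node itself can absorb a filter its children did not.
    fn can_handle(&self, _filter: &str) -> bool {
        false
    }
}

/// Steps 1-6 from the comment above, driven entirely by the optimizer rule.
fn pushdown(node: &Node, filters: &[String]) -> Vec<Pushed> {
    let mut offered = vec![0usize; filters.len()];
    let mut absorbed = vec![0usize; filters.len()];
    for (i, child) in node.children.iter().enumerate() {
        // 1. ask the node which filters may be offered to this child
        let allowed = node.filters_for_child(i, filters);
        let child_filters: Vec<String> =
            allowed.iter().map(|&j| filters[j].clone()).collect();
        // 2. recurse into the child; 3. carry each filter's outcome back up
        let child_result = pushdown(child, &child_filters);
        for (k, &j) in allowed.iter().enumerate() {
            offered[j] += 1;
            if child_result[k] == Pushed::Yes {
                absorbed[j] += 1;
            }
        }
    }
    // 4. combine per-child outcomes: a filter only counts as pushed if every
    //    child it was offered to absorbed it (and it was offered at all)
    let mut result: Vec<Pushed> = offered
        .iter()
        .zip(&absorbed)
        .map(|(&o, &a)| if o > 0 && o == a { Pushed::Yes } else { Pushed::No })
        .collect();
    // 5. ask the node itself to absorb anything still unhandled
    for (j, r) in result.iter_mut().enumerate() {
        if *r == Pushed::No && node.can_handle(&filters[j]) {
            *r = Pushed::Yes;
        }
    }
    // 6. hand the per-filter outcome back to the caller
    result
}
```

Even in this stripped-down form the rule has to juggle per-child index mappings and per-filter tallies, which is the state-tracking burden the comment refers to.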
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2790778558 Some implementations do recurse I think, for similar reasons to our recursion here: https://github.com/pydantic/datafusion/blob/f8a6384bdf21b2eeb7bcfe3f08e52712735bb285/datafusion/physical-plan/src/projection.rs#L534-L535 It was @alamb that suggested we do it this way, unless I misunderstood his suggestion. I think it's possible to do the recursion as an optimizer rule, but making the APIs flexible enough to handle all of the cases with joins, etc. gets very complicated, especially because the recursion needs to probe and jump, transmit information back up, etc.
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035424292 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: Option) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> &ExecutionPlanMetricsSet { 
+todo!("should not be called") +} + +fn statistics(&self) -> Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { +DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn try_pushdown_filters( +&self, +filters: &[PhysicalExprRef], +
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035397116 ## datafusion/datasource/src/source.rs: ## @@ -254,3 +284,13 @@ impl DataSourceExec { }) } } + +/// Create a new `DataSourceExec` from a `DataSource` Review Comment: @alamb sneaked this bit in, wdyt Andrew? ## datafusion/physical-expr/src/utils/mod.rs: ## @@ -47,6 +47,31 @@ pub fn split_conjunction( split_impl(Operator::And, predicate, vec![]) } +/// Create a conjunction of the given predicates. +/// If the input is empty, return a literal true. +/// If the input contains a single predicate, return the predicate. +/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`). +pub fn conjunction( +predicates: impl IntoIterator>, +) -> Arc { +conjunction_opt(predicates).unwrap_or_else(|| crate::expressions::lit(true)) +} + +/// Create a conjunction of the given predicates. Review Comment: There's actually several places where it's useful to know if the result is `lit(true)` because you handle that differently. E.g. `FilterExec` drops itself out of the plan. And it's only a small bit more code to have here to avoid having to match `lit(true)` in other places. ## datafusion/physical-plan/src/filter_pushdown.rs: ## @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub use datafusion_expr::FilterPushdown; +use datafusion_physical_expr::PhysicalExprRef; + +/// The combined result of a filter pushdown operation. +/// This includes: +/// * The inner plan that was produced by the pushdown operation. +/// * The support for each filter that was pushed down. +pub enum FilterPushdownResult { Review Comment: It can also be a FileSource, etc. I will try to tweak the comments to use the terminology "node" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
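As an aside for readers, the `conjunction` / `conjunction_opt` behaviour being discussed can be illustrated with a toy, string-based stand-in. This is not the real `PhysicalExpr`-based implementation from the PR; it only mirrors the documented semantics: an empty input collapses to a literal `true`, a single predicate is returned unchanged, and multiple predicates are folded into a chain of ANDs.

```rust
// Toy stand-in: predicates are plain strings instead of Arc<dyn PhysicalExpr>.
fn conjunction_opt(predicates: impl IntoIterator<Item = String>) -> Option<String> {
    predicates
        .into_iter()
        .reduce(|acc, p| format!("({acc}) AND ({p})"))
}

fn conjunction(predicates: impl IntoIterator<Item = String>) -> String {
    conjunction_opt(predicates).unwrap_or_else(|| "true".to_string())
}

fn main() {
    // Empty input collapses to a literal `true`.
    assert_eq!(conjunction(Vec::<String>::new()), "true");
    // A single predicate is returned unchanged.
    assert_eq!(conjunction(vec!["a = 1".to_string()]), "a = 1");
    // Multiple predicates are folded into a chain of ANDs.
    assert_eq!(
        conjunction(vec!["a = 1".to_string(), "b = 2".to_string()]),
        "(a = 1) AND (b = 2)"
    );
    // The Option-returning variant lets callers detect the "no predicates"
    // case explicitly instead of matching on a literal `true`.
    assert!(conjunction_opt(Vec::<String>::new()).is_none());
}
```

The `Option`-returning variant is what lets a caller such as `FilterExec` notice the "no predicates left" case and drop itself from the plan, which is the point made in the reply above.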
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035176129 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, Review Comment: If I don't misunderstand, DataSource's or ExecutionPlan's don't store or keep this FilterPushdown in their internal states. That info just passes through the operators during the pushdown attempt. So, maybe we can remove this "support" field, and instead define 2 different TestSource one with support and one without support (or simply define a generic boolean). I suggest this to not misguide people considering this test as an example usage. ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction,
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2035387395 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,529 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_common::{internal_err, Result}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, +}; +use datafusion_expr::test::function_stub::count_udaf; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::{ +aggregate::AggregateExprBuilder, conjunction, Partitioning, PhysicalExprRef, +}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter_pushdown::{FilterPushdown, FilterPushdownResult}; +use datafusion_physical_plan::{ +aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, +coalesce_batches::CoalesceBatchesExec, +filter::FilterExec, +repartition::RepartitionExec, +}; +use datafusion_physical_plan::{ +displayable, metrics::ExecutionPlanMetricsSet, DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone, Default)] +struct TestSource { +support: Option, Review Comment: I think that would be quite a bit more code, but I'll add a comment saying this is not meant to be an example and that folks should look at ParquetSource instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2789742021 @berkaysynnada the original implementation had recursion in the optimizer rule. It was @alamb's suggestion to do it this way, which mirrors the existing `try_swap_with_projection` method. The TLDR is that because each filter may be allowed or not, and maybe transformed, to be pushed down into each child, we end up with a matrix of filters x children that we need to ask the node for. For example, imagine a join: each of its children needs to be treated differently, and only some filters can be pushed into some children. That meant that, as you suggest, it had to be at least 2 methods on ExecutionPlan (2 with complex APIs or 3 simpler ones). The recursion is also a bit wonky because you need to:

1. Recurse down and on your way up transmit not only the new nodes but also the filter support result.
2. There are jumps, e.g. when you hit a node that doesn't support pushdown but you still want to recurse into sub-trees.
3. You're carrying around non-trivial context as you go down.

Having implemented it both ways I've come to the conclusion that trying to generalize the recursion into the optimizer rule makes things harder for both the simple cases (FilterExec, RepartitionExec) and the complex cases (HashJoinExec, ProjectionExec). Doing it this way makes the simple implementations trivial; the complex ones will be complex, but they will be self-contained and not have to fit into some rigid APIs. All said and done, since the implementation is a lot simpler this way, there is precedent for it in `try_swap_with_projection`, and I'm more confident in this API being flexible enough to handle joins, etc., I would prefer to keep it as is.
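The "matrix of filters x children" point is easiest to see for a join: each parent filter has to be routed (and possibly rewritten) per child, so a single flat "can this node accept these filters?" question is not enough. The sketch below uses made-up types (`Filter`, `Side`, `route_filters_for_join`), not the PR's API, and ignores outer-join subtleties.

```rust
#[derive(Clone, Copy)]
enum Side {
    Left,
    Right,
    Both,
}

#[derive(Clone)]
struct Filter {
    referenced_side: Side, // which input the predicate's columns come from
    expr: String,
}

/// For a two-child join, decide which filters may be offered to which child.
/// Returns one Vec per child: [filters for left, filters for right].
fn route_filters_for_join(parent_filters: &[Filter]) -> [Vec<Filter>; 2] {
    let mut left = Vec::new();
    let mut right = Vec::new();
    for f in parent_filters {
        match f.referenced_side {
            // Only references left-side columns: may be offered to the left child.
            Side::Left => left.push(f.clone()),
            // Only references right-side columns: may be offered to the right child.
            Side::Right => right.push(f.clone()),
            // References both sides: stays above the join in this sketch
            // (a rewrite might still be possible for some join types).
            Side::Both => {}
        }
    }
    [left, right]
}
```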
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2031961859 ## datafusion/physical-plan/src/execution_plan.rs: ## @@ -467,6 +468,383 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result>> { Ok(None) } + +/// A physical optimizer rule that pushes down filters in the execution plan. Review Comment: Here are some documentation suggestions: - Suggestions for docs: https://github.com/pydantic/datafusion/pull/23
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787124497 done!
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787081064 👍 will do asap
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2033630457 ## datafusion/physical-plan/src/execution_plan.rs: ## @@ -467,6 +468,439 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result>> { Ok(None) } + +/// Attempts to recursively push given filters into this `ExecutionPlan` or +/// its children and push any filters from this node into its children. +/// +/// This is used to implement filter pushdown in the physical plan. Note +/// that DataFusion also implements filter pushdown in the logical plan, +/// which is a different code path. This method is here to support +/// additional optimizations that may be only be possible in the physical +/// plan such as dynamic filtering (see below). +/// +/// See [`try_pushdown_filters_to_input`] for a simple implementation +/// +/// # Arguments +/// * `plan`: `Arc`d instance of self +/// * `parent_filters`: A vector of [`PhysicalExpr`]s from the parent of this node +///to try and push down Review Comment: ```suggestion /// to try and push down ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2787071875 Can we get the one final clippy thing fixed here so we can merge this PR in?
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2786441314 > I think we should leave this open for a while to let anyone else who wants time to review (such as @berkaysynnada ) but I think it is mergable

@berkaysynnada would love to get your input here, hoping we can confirm these APIs will work for the JOIN use cases 🙏🏻
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
alamb commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2784577003 Here is a proposal to simplify the API: https://github.com/pydantic/datafusion/pull/24
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2032103422 ## datafusion/physical-plan/src/filter_pushdown.rs: ## @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Result of trying to push down fitlers to a child plan. +/// This is used by [`FilterPushdownResult`] to indicate whether the filter was +/// "absorbed" by the child ([`FilterPushdownSupport::Exact`]) or not +/// ([`FilterPushdownSupport::Unsupported`]). +/// If the filter was not absorbed, the parent plan must apply the filter +/// itself, or return to the caller that it was not pushed down. +/// If the filter was absorbed, the parent plan can drop the filter or +/// tell the caller that it was pushed down by forwarding on the [`FilterPushdownSupport::Exact`] +/// information. +#[derive(Debug, Clone, Copy)] +pub enum FilterPushdownSupport { +/// Filter may not have been pushed down to the child plan, or the child plan +/// can only partially apply the filter but may have false positives (but not false negatives). +/// In this case the parent **must** behave as if the filter was not pushed down +/// and must apply the filter itself. +Unsupported, Review Comment: Done: e3c7343 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972339 ## datafusion/physical-plan/src/execution_plan.rs: ## @@ -467,6 +468,353 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result>> { Ok(None) } + +/// A physical optimizer rule that pushes down filters in the execution plan. +/// For example, consider the following plan: +/// +/// ```text +/// ┌──┐ +/// │ CoalesceBatchesExec │ +/// └──┘ +/// │ +/// ▼ +/// ┌──┐ +/// │ FilterExec │ +/// │ filters = [ id=1] │ +/// └──┘ +/// │ +/// ▼ +/// ┌──┐ +/// │DataSourceExec│ +/// │projection = *│ +/// └──┘ +/// ``` +/// +/// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node. +/// If this filter is selective it can avoid massive amounts of data being read from the source (the projection is `*` so all matching columns are read). +/// In this simple case we: +/// 1. Enter the recursion with no filters. +/// 2. We find the `FilterExec` node and it tells us that it has a filter (see [`ExecutionPlan::filters_for_pushdown`] and `datafusion::physical_plan::filter::FilterExec`). +/// 3. We recurse down into it's children (the `DataSourceExec` node) now carrying the filters `[id = 1]`. +/// 4. The `DataSourceExec` node tells us that it can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]). +/// 5. Since the `DataSourceExec` node has no children we recurse back up the tree. +/// 6. We now tell the `FilterExec` node that it has a child that can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]). +///The `FilterExec` node can now return a new execution plan, either a copy of itself without that filter or if has no work left to do it can even return the child node directly. +/// 7. We recurse back up to `CoalesceBatchesExec` and do nothing there since it had no filters to push down. +/// +/// The new plan looks like: +/// +/// ```text +/// ┌──┐ +/// │ CoalesceBatchesExec │ +/// └──┘ +/// │ +/// ▼ +/// ┌──┐ +/// │DataSourceExec│ +// │projection = *│ +// │ filters = [ id=1] │ +/// └──┘ +/// ``` +/// +/// Let's consider a more complex example involving a `ProjectionExec` node in betweeen the `FilterExec` and `DataSourceExec` nodes that creates a new column that the filter depends on. +/// +/// ```text +// ┌──┐ +// │ CoalesceBatchesExec │ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │ FilterExec │ +// │filters = │ +// │ [cost>50,id=1] │ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │ProjectionExec│ +// │ cost = price * 1.2 │ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │DataSourceExec│ +// │projection = *│ +// └──┘ +/// ``` +/// +/// We want to push down the filters [id=1] to the `DataSourceExec` node, but can't push down `cost>50` because it requires the `ProjectionExec` node to be executed first: +/// +/// ```text +// ┌──┐ +// │ CoalesceBatchesExec │ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │ FilterExec │ +// │filters = │ +// │ [cost>50]│ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │ProjectionExec│ +// │ cost = price * 1.2 │ +// └──┘ +// │ +// ▼ +// ┌──┐ +// │DataSourceExec│ +// │projection = *│ +// │ filters = [ id=1] │ +// └──┘ +/// ``` +/// +/// There are also cases where we may be able to push down filters within a subtree but not the entire tree. +/// A good exmaple of this is aggreagation nodes: +/// +/// ```text +/// ┌──┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──┘ +/// │ +/// ▼ +/// ┌──┐ +/// │ F
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
berkaysynnada commented on PR #15566: URL: https://github.com/apache/datafusion/pull/15566#issuecomment-2781418886 I'll take a look at this as well asap
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029972523 ## datafusion/core/tests/physical_optimizer/filter_pushdown.rs: ## @@ -0,0 +1,322 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::{ +datasource::object_store::ObjectStoreUrl, +logical_expr::Operator, +physical_plan::{ +expressions::{BinaryExpr, Column, Literal}, +PhysicalExpr, +}, +scalar::ScalarValue, +}; +use datafusion_common::internal_err; +use datafusion_common::{config::ConfigOptions, Statistics}; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource::{ +file::{FileSource, FileSourceFilterPushdownResult}, +file_scan_config::FileScanConfig, +file_stream::FileOpener, +}; +use datafusion_physical_expr::{conjunction, PhysicalExprRef}; +use datafusion_physical_expr_common::physical_expr::fmt_sql; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::{ +displayable, execution_plan::FilterSupport, metrics::ExecutionPlanMetricsSet, +DisplayFormatType, ExecutionPlan, +}; +use object_store::ObjectStore; +use std::sync::{Arc, OnceLock}; +use std::{ +any::Any, +fmt::{Display, Formatter}, +}; + +/// A placeholder data source that accepts filter pushdown +#[derive(Clone)] +struct TestSource { +support: FilterSupport, +predicate: Option, +statistics: Option, +} + +impl TestSource { +fn new(support: FilterSupport) -> Self { +Self { +support, +predicate: None, +statistics: None, +} +} +} + +impl FileSource for TestSource { +fn create_file_opener( +&self, +_object_store: Arc, +_base_config: &FileScanConfig, +_partition: usize, +) -> Arc { +todo!("should not be called") +} + +fn as_any(&self) -> &dyn Any { +todo!("should not be called") +} + +fn with_batch_size(&self, _batch_size: usize) -> Arc { +todo!("should not be called") +} + +fn with_schema(&self, _schema: SchemaRef) -> Arc { +todo!("should not be called") +} + +fn with_projection(&self, _config: &FileScanConfig) -> Arc { +todo!("should not be called") +} + +fn with_statistics(&self, statistics: Statistics) -> Arc { +Arc::new(TestSource { +statistics: Some(statistics), +..self.clone() +}) +} + +fn metrics(&self) -> &ExecutionPlanMetricsSet { +todo!("should not be called") +} + +fn statistics(&self) -> datafusion_common::Result { +Ok(self +.statistics +.as_ref() +.expect("statistics not set") +.clone()) +} + +fn file_type(&self) -> &str { +"test" +} + +fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { +match t { 
+DisplayFormatType::Default | DisplayFormatType::Verbose => { +let predicate_string = self +.predicate +.as_ref() +.map(|p| format!(", predicate={p}")) +.unwrap_or_default(); + +write!(f, "{}", predicate_string) +} +DisplayFormatType::TreeRender => { +if let Some(predicate) = &self.predicate { +writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; +} +Ok(()) +} +} +} + +fn push_down_filters( +&self, +filters: &[PhysicalExprRef], +) -> datafusion_common::Result> { +let new = Arc::new(TestSource { +support: self.support, +predicate: Some(conjunction(filters.iter().map(Arc::clone))), +statistics: self.statistics.clone(), +}); +Ok(Some(FileSourceFilterPushdownResult::new( +n
Re: [PR] ExecutionPlan: add APIs for filter pushdown & optimizer rule to apply them [datafusion]
adriangb commented on code in PR #15566: URL: https://github.com/apache/datafusion/pull/15566#discussion_r2029911089 ## datafusion/physical-plan/src/execution_plan.rs: ## @@ -467,8 +467,106 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { ) -> Result>> { Ok(None) } + +/// Returns a set of filters that this operator owns but would like to be pushed down. +/// For example, a `TopK` operator may produce dynamic filters that reference it's currrent state, +/// while a `FilterExec` will just hand of the filters it has as is. +/// The default implementation returns an empty vector. +/// These filters are applied row-by row and any that return `false` or `NULL` will be +/// filtered out and any that return `true` will be kept. +/// The expressions returned **must** always return `true` or `false`; +/// other truthy or falsy values are not allowed (e.g. `0`, `1`). +/// +/// # Returns +/// A vector of filters that this operator would like to push down. +/// These should be treated as the split conjunction of a `WHERE` clause. +/// That is, a query such as `WHERE a = 1 AND b = 2` would return two +/// filters: `a = 1` and `b = 2`. +/// They can always be assembled into a single filter using +/// [`split_conjunction`][datafusion_physical_expr::split_conjunction]. +fn filters_for_pushdown(&self) -> Result>> { +Ok(Vec::new()) +} + +/// Checks which filters this node allows to be pushed down through it from a parent to a child. +/// For example, a `ProjectionExec` node can allow filters that only refernece +/// columns it did not create through but filters that reference columns it is creating cannot be pushed down any further. +/// That is, it only allows some filters through because it changes the schema of the data. +/// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data. +/// RepartitionExec nodes allow all filters to be pushed down as they don't change the schema or cardinality. +fn filter_pushdown_request( Review Comment: Playing around with it a little bit I'm not sure that merging `filters_for_pushdown` and `filter_pushdown_request` is any better. Some operators will care about implementing one and not the other, and merging them pushes complexity into the implementer which is now handled in the optimizer rule. I'll let you give it a try and see if you agree. I might push a fix for https://github.com/apache/datafusion/pull/15566/files#r2029910723 which will complicate things even more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
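To illustrate why keeping the two hooks separate can keep simple operators simple, here is a rough sketch with made-up, simplified types; the in-flight PR API differs in names, signatures, and return types. The idea is that a pass-through node such as `RepartitionExec` only cares about letting parent filters through, while a `FilterExec`-like node mainly cares about contributing its own predicate.

```rust
// Illustrative only: string filters and a stripped-down pair of hooks.
type Filter = String;

enum PushdownRequest {
    Allow,    // the filter may be pushed through this node to its children
    Disallow, // the filter must stay above this node
}

trait PushdownHooks {
    /// Filters this node itself would like pushed down (e.g. FilterExec's predicate).
    fn filters_for_pushdown(&self) -> Vec<Filter> {
        Vec::new()
    }

    /// For each parent filter, may it be pushed through this node?
    fn filter_pushdown_request(&self, parent_filters: &[Filter]) -> Vec<PushdownRequest> {
        // Conservative default: nothing is allowed through.
        parent_filters.iter().map(|_| PushdownRequest::Disallow).collect()
    }
}

/// A pass-through node: contributes nothing, lets everything through untouched.
struct RepartitionLike;
impl PushdownHooks for RepartitionLike {
    fn filter_pushdown_request(&self, parent_filters: &[Filter]) -> Vec<PushdownRequest> {
        parent_filters.iter().map(|_| PushdownRequest::Allow).collect()
    }
}

/// A FilterExec-like node: its main job is to hand its own predicate down,
/// and it also lets parent filters through.
struct FilterLike {
    predicate: Filter,
}
impl PushdownHooks for FilterLike {
    fn filters_for_pushdown(&self) -> Vec<Filter> {
        vec![self.predicate.clone()]
    }
    fn filter_pushdown_request(&self, parent_filters: &[Filter]) -> Vec<PushdownRequest> {
        parent_filters.iter().map(|_| PushdownRequest::Allow).collect()
    }
}
```

Each node overrides only the hook it cares about, which is the point made above about merging the two methods pushing complexity onto every implementer instead of keeping it in the optimizer rule.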