Hi Chang,

Thanks for working on this.
Could you please explain how your proposal can be extended to file-based data sources? At least half of the Spark community uses file-based data sources, so I think any design should take them into account as well. I work on both SQL-based and file-based data sources, and I understand that they are very different. It’s challenging to come up with a design that works for both, but the current filter push down and column pruning were designed to fit both sides nicely. I think we should follow the same approach so that Aggregate push down works for both too.

I am currently collaborating with the Apple Spark team and the Facebook Spark team to push down Aggregate to file-based data sources. We have ongoing work to push down Max/Min/Count to Parquet, and later to ORC, to utilize the statistics information there (https://github.com/apache/spark/pull/32049).

Please correct me if I am wrong: it seems to me that your proposal doesn't consider file-based data sources at all and will stop us from continuing our work.

Could we schedule a meeting to discuss this?

Thanks,
Huaxin

On Wed, Apr 7, 2021 at 1:32 AM Chang Chen <baibaic...@gmail.com> wrote:

> Hi Huaxin,
>
> Please review https://github.com/apache/spark/pull/32061
>
> As for adding a *trait PrunedFilteredAggregateScan* for V1 JDBC, I deleted
> the trait, since the V1 DataSource doesn't need to support aggregation push
> down.
>
> Chang Chen <baibaic...@gmail.com> wrote on Mon, Apr 5, 2021 at 10:02 PM:
>
>> Hi Huaxin,
>>
>> What I am concerned about is abstraction:
>>
>> 1. How to extend sources.Aggregation. Because Catalyst Expression
>>    is recursive, it is very bad to define a new hierarchy. I think
>>    ScanBuilder must convert pushed expressions to its own format.
>> 2. The optimization rule is also an extension point. I didn't see any
>>    consideration of join push down. I also think
>>    SupportsPushDownRequiredColumns and SupportsPushDownFilters are
>>    problematic.
>>
>> Obviously, file-based sources and SQL-based sources are quite different in
>> push down capabilities. I am not sure they can be consolidated into one API.
>>
>> I will push my PR tomorrow, and after that, could we schedule a meeting
>> to discuss the API?
>>
>> huaxin gao <huaxin.ga...@gmail.com> wrote on Mon, Apr 5, 2021 at 2:24 AM:
>>
>>> Hello Chang,
>>>
>>> Thanks for proposing the SPIP and initiating the discussion. However, I
>>> think the problem with your proposal is that you haven’t taken into
>>> consideration file-based data sources such as Parquet, ORC, etc. As far as
>>> I know, most Spark users use file-based data sources. As a matter
>>> of fact, I have customers waiting for Aggregate push down for Parquet.
>>> That’s the reason I have my current implementation, which has a unified
>>> Aggregate push down approach for both the file-based data sources and JDBC.
>>>
>>> I recently discussed this with several members of the Spark community, and
>>> we have agreed to break down the Aggregate push down work into the
>>> following steps:
>>>
>>> 1. Implement Max, Min and Count push down in Parquet.
>>> 2. Add a new physical plan rewrite rule to remove the partial aggregate. We
>>>    can optimize one more step to remove ShuffleExchange if the group by
>>>    column and the partition column are the same.
>>> 3. Implement Max, Min and Count push down in JDBC.
>>> 4. Implement Sum and Avg push down in JDBC.
>>>
>>> I plan to implement Aggregate push down for Parquet first for now. The
>>> reasons are:
>>>
>>> 1. It’s relatively easier to implement Parquet Aggregate push down than
>>>    JDBC.
>>>
>>>    1. We only need to implement Max, Min and Count.
>>>    2. There is no need to deal with the differences between Spark and other
>>>       databases. For example, aggregating decimal values behaves
>>>       differently across database implementations.
>>>
>>>    The main point is that we want to keep the PR minimal and support the
>>>    basic infrastructure for Aggregate push down first. Actually, the PR for
>>>    implementing Parquet Aggregate push down is already very big. We don’t
>>>    want to have a huge PR to solve all the problems. It’s too hard to review.
>>>
>>> 2. I think it’s too early to implement the JDBC Aggregate push down for
>>>    now. Underneath, V2 DS JDBC still calls the V1 DS JDBC path. If we
>>>    implement JDBC Aggregate push down now, we still need to add a *trait
>>>    PrunedFilteredAggregateScan* for V1 JDBC. One of the major
>>>    motivations for having DS V2 is that we want to improve the
>>>    flexibility of implementing new operator push down by avoiding adding a
>>>    new push down trait. If we still add a new push down trait in V1 DS
>>>    JDBC, I feel we are defeating the purpose of having DS V2. So I want to
>>>    wait until we fully migrate to DS V2 JDBC, and then implement Aggregate
>>>    push down for JDBC.
>>>
>>> I have submitted the Parquet Aggregate push down PR. Here is the link:
>>>
>>> https://github.com/apache/spark/pull/32049
>>>
>>> Thanks,
>>>
>>> Huaxin
>>>
>>> On Fri, Apr 2, 2021 at 1:04 AM Chang Chen <baibaic...@gmail.com> wrote:
>>>
>>>> The link is broken, so I am posting a PDF version.
>>>>
>>>> Chang Chen <baibaic...@gmail.com> wrote on Fri, Apr 2, 2021 at 3:57 PM:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We would like to post a SPIP of Datasource V2 SQL PushDown in Spark.
>>>>> Here is the document link:
>>>>>
>>>>> https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9
>>>>>
>>>>> This SPIP aims to make push down more extensible.
>>>>>
>>>>> I would like to thank huaxin gao; my prototype is based on her PR. I
>>>>> will submit a PR ASAP.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chang