Hi Chang,

Thanks for working on this.

Could you please explain how your proposal can be extended to file-based
data sources? Since at least half of the Spark community uses file-based
data sources, I think any design should consider them as well. I work on
both SQL-based and file-based data sources, and I understand that they are
very different. It’s challenging to come up with a design that works for
both, but the current filter push down and column pruning have been designed
nicely to fit both sides. I think we should follow the same approach to make
Aggregate push down work for both too.
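
To illustrate the kind of shared design I mean, here is a minimal Python
sketch. It is not Spark's actual API, and every class and method name in it
is hypothetical. Each source opts in to aggregate push down through a
mix-in, much as sources opt in to filter push down today, and the planner
falls back to Spark when a source does not opt in or cannot handle every
requested function.

```python
from typing import List, Set


class ScanBuilder:
    """Hypothetical base class: a source that only knows how to scan."""


class SupportsAggregatePushDown:
    """Hypothetical opt-in mix-in, modelled loosely on the filter push down mix-ins."""

    supported: Set[str] = set()

    def push_aggregation(self, funcs: List[str]) -> bool:
        # Accept the aggregation only if the source can evaluate every function.
        return all(f in self.supported for f in funcs)


class ParquetScanBuilder(ScanBuilder, SupportsAggregatePushDown):
    # File-based source: only what footer statistics can answer.
    supported = {"min", "max", "count"}


class JdbcScanBuilder(ScanBuilder, SupportsAggregatePushDown):
    # SQL-based source: aggregates are translated into the remote query.
    supported = {"min", "max", "count", "sum", "avg"}


class PlainScanBuilder(ScanBuilder):
    # A source that does not opt in to aggregate push down at all.
    pass


def plan(builder: ScanBuilder, funcs: List[str]) -> str:
    """Say where the aggregation runs: in the 'source' or in 'spark'."""
    if isinstance(builder, SupportsAggregatePushDown) and builder.push_aggregation(funcs):
        return "source"
    return "spark"


print(plan(ParquetScanBuilder(), ["min", "count"]))
print(plan(ParquetScanBuilder(), ["min", "sum"]))
print(plan(JdbcScanBuilder(), ["sum", "avg"]))
```

The point of the sketch is that the planner only talks to the mix-in, so
file-based and SQL-based sources differ in what they accept, not in how they
are asked.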

I am currently collaborating with the Apple Spark team and the Facebook Spark
team to push down Aggregate to file-based data sources. We are working on
pushing down Max/Min/Count to Parquet, and later to ORC, to make use of the
statistics stored there (https://github.com/apache/spark/pull/32049). Please
correct me if I am wrong, but it seems to me that your proposal doesn't
consider file-based data sources at all and would stop us from continuing
our work.
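
For anyone unfamiliar with the idea, here is a minimal sketch of how
Max/Min/Count can be answered from statistics alone. It is plain Python
rather than the code in the PR. The `RowGroupStats` class is a hypothetical
stand-in for the per-row-group column statistics kept in a Parquet footer,
and the sketch assumes the statistics are present and that no row-level
filter needs to be applied.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RowGroupStats:
    """Hypothetical stand-in for one column's statistics in one row group."""
    num_rows: int
    null_count: int
    min_value: Optional[int]  # None when every value in the row group is null
    max_value: Optional[int]


def aggregate_from_stats(stats: List[RowGroupStats]) -> dict:
    """Combine per-row-group statistics without reading any data pages."""
    mins = [s.min_value for s in stats if s.min_value is not None]
    maxs = [s.max_value for s in stats if s.max_value is not None]
    return {
        "min": min(mins) if mins else None,
        "max": max(maxs) if maxs else None,
        # COUNT(*) counts rows; COUNT(col) skips nulls.
        "count_star": sum(s.num_rows for s in stats),
        "count_col": sum(s.num_rows - s.null_count for s in stats),
    }


footer = [
    RowGroupStats(num_rows=100, null_count=5, min_value=3, max_value=42),
    RowGroupStats(num_rows=50, null_count=0, min_value=-7, max_value=19),
    RowGroupStats(num_rows=10, null_count=10, min_value=None, max_value=None),
]
result = aggregate_from_stats(footer)
print(result)
```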

Could we schedule a meeting to discuss this?

Thanks,

Huaxin



On Wed, Apr 7, 2021 at 1:32 AM Chang Chen <baibaic...@gmail.com> wrote:

> hi huaxin
>
> please review https://github.com/apache/spark/pull/32061
>
> As for adding a *trait PrunedFilteredAggregateScan* for V1 JDBC: I deleted
> the trait, since the V1 DataSource doesn't need to support aggregation push
> down.
>
> Chang Chen <baibaic...@gmail.com> wrote on Mon, Apr 5, 2021 at 10:02 PM:
>
>> Hi huaxin
>>
>> What I am concerned about is abstraction
>>
>>    1. How to extend sources.Aggregation. Because Catalyst Expressions are
>>    recursive, defining a new hierarchy is a bad idea. I think the
>>    ScanBuilder must convert pushed expressions to its own format.
>>    2. The optimization rule is also an extension point, and I didn't see
>>    any consideration of join push down. I also think
>>    SupportsPushDownRequiredColumns and SupportsPushDownFilters are
>>    problematic.
>>
>> Obviously, file-based sources and SQL-based sources are quite different in
>> their push down capabilities. I am not sure they can be consolidated into
>> one API.
>>
>> I will push my PR tomorrow, and after that, could we schedule a meeting
>> to discuss the API?
>>
>> huaxin gao <huaxin.ga...@gmail.com> wrote on Mon, Apr 5, 2021 at 2:24 AM:
>>
>>> Hello Chang,
>>>
>>> Thanks for proposing the SPIP and initiating the discussion. However, I
>>> think the problem with your proposal is that you haven’t taken into
>>> consideration file-based data sources such as parquet, ORC, etc. As far as
>>> I know, most of the Spark users have file-based data sources.  As a matter
>>> of fact, I have customers waiting for Aggregate push down for Parquet.
>>> That’s the reason I have my current implementation, which has a unified
>>> Aggregate push down approach for both the file-based data sources and JDBC.
>>>
>>> I discussed this with several members of the Spark community recently,
>>> and we agreed to break down the Aggregate push down work into the
>>> following steps:
>>>
>>>    1. Implement Max, Min and Count push down in Parquet.
>>>    2. Add a new physical plan rewrite rule to remove the partial
>>>    aggregate. We can go one step further and remove the ShuffleExchange
>>>    if the group-by column and the partition column are the same.
>>>    3. Implement Max, Min and Count push down in JDBC.
>>>    4. Implement Sum and Avg push down in JDBC.
>>>
>>>
>>> I plan to implement Aggregate push down for Parquet first. The reasons
>>> are:
>>>
>>>    1. Parquet Aggregate push down is relatively easier to implement than
>>>    JDBC Aggregate push down:
>>>       1. We only need to implement Max, Min and Count.
>>>       2. We don't need to deal with the differences between Spark and
>>>       other databases. For example, aggregating decimal values behaves
>>>       differently across database implementations.
>>>
>>> The main point is that we want to keep the PR minimal and support the
>>> basic infrastructure for Aggregate push down first. Actually, the PR for
>>> implementing Parquet Aggregate push down is already very big. We don’t
>>> want a huge PR that tries to solve all the problems; it would be too hard
>>> to review.
>>>
>>>
>>>    2. I think it’s too early to implement JDBC Aggregate push down.
>>>    Underneath, V2 DS JDBC still calls the V1 DS JDBC path. If we implement
>>>    JDBC Aggregate push down now, we still need to add a *trait
>>>    PrunedFilteredAggregateScan* for V1 JDBC. One of the major motivations
>>>    for DS V2 is to make it more flexible to implement new operator push
>>>    down without adding a new push down trait. If we still add a new push
>>>    down trait in V1 DS JDBC, I feel we are defeating the purpose of having
>>>    DS V2. So I want to wait until we fully migrate to DS V2 JDBC, and then
>>>    implement Aggregate push down for JDBC.
>>>
>>>
>>> I have submitted the Parquet Aggregate push down PR. Here is the link:
>>>
>>> https://github.com/apache/spark/pull/32049
>>>
>>>
>>> Thanks,
>>>
>>> Huaxin
>>>
>>>
>>> On Fri, Apr 2, 2021 at 1:04 AM Chang Chen <baibaic...@gmail.com> wrote:
>>>
>>>> The link is broken, so I am posting a PDF version.
>>>>
>>>> Chang Chen <baibaic...@gmail.com> wrote on Fri, Apr 2, 2021 at 3:57 PM:
>>>>
>>>>> Hi All
>>>>>
>>>>> We would like to post an SPIP for Datasource V2 SQL PushDown in Spark.
>>>>> Here is the document link:
>>>>>
>>>>>
>>>>> https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9
>>>>>
>>>>> This SPIP aims to make push down more extensible.
>>>>>
>>>>> I would like to thank huaxin gao; my prototype is based on her PR. I
>>>>> will submit a PR ASAP.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Chang.
>>>>>
>>>>
