I think filter expressions and grouping sets are semantic arguments instead
of utilities.
If we want to push them into sources, the connector developers should be
aware of them.
Wrapping them in a context implicitly is error-prone that the existing
connector will produce wrong results
 when upgrading to new Flink versions (as we are pushing
grouping_sets/filter_args, but connector ignores it).
I think for these cases, providing a new default method to override might
be a better choice.

Best,
Jark

On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d >
> 1)). Can we push it down? I'm not sure that a single call expression can
> express it, and how we should embody it and convey it to users.
>
> Best,
> Jingsong
>
> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> I don't want to limit this interface to LocalAgg Push down. Actually,
>> sometimes, we can push whole aggregation to source too.
>>
>> So, this rule can do something more advanced. For example, we can push
>> down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1,
>> f2)". Then, we need to add more information to push down.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>
>>> I think this may be over designed. We should have confidence in the
>>> interface we design, the interface should be stable.
>>> Wrapping things in a big context has a cost of losing user convenience.
>>> Foremost, we don't see any parameters to add in the future. Do you know
>>> any potential parameters?
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Hi Sebastian,
>>>>
>>>> Well, I mean:
>>>>
>>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>> aggregateExpressions, DataType producedDataType);`
>>>> VS
>>>> ```
>>>> boolean applyAggregates(Aggregation agg);
>>>>
>>>> interface Aggregation {
>>>>   int[] groupingFields();
>>>>   List<CallExpression> aggregateExpressions();
>>>>   DataType producedDataType();
>>>> }
>>>> ```
>>>>
>>>> Maybe I've over considered it, but I think Aggregation is a complicated
>>>> thing. Maybe we need to extend its parameters in the future, so make the
>>>> parameters interface, which is conducive to the future expansion without
>>>> destroying the compatibility of user implementation. If it is the way
>>>> before, users need to modify the code.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jinsong,
>>>>>
>>>>> Thx a lot for your suggestion. These points really need to be clear in
>>>>> the proposal.
>>>>>
>>>>> For the semantic problem, I think the main point is the different
>>>>> returned data types
>>>>> for the target aggregate function and the row format returned by the
>>>>> underlying storage.
>>>>> That's why we provide the producedDataType in the
>>>>> SupportsAggregatePushDown interface.
>>>>> Need to let developers know that we need to handle the semantic
>>>>> differences between
>>>>> the underlying storage system and Flink in related connectors.
>>>>> [Supplemented in proposal]
>>>>>
>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule,
>>>>> it's also a key point.
>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set,
>>>>> and better to put it
>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>
>>>>> For the scalability of the interface, actually I don't exactly
>>>>> understand your suggestion. Is it to add
>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>> interface, and holds the
>>>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>>>> DataType producedDataType`
>>>>> fields?
>>>>>
>>>>> Looking forward to your further feedback or guidance.
>>>>>
>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月5日周二 下午2:44写道:
>>>>>
>>>>>> Thanks for your proposal! Sebastian.
>>>>>>
>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>>>>> solved
>>>>>> many of my concerns.
>>>>>>
>>>>>> ## Semantic problems
>>>>>>
>>>>>> We may need to add some mechanisms or comments, because as far as I
>>>>>> know,
>>>>>> the semantics of each database is actually different, which may need
>>>>>> to be
>>>>>> reflected in your specific implementation.
>>>>>>
>>>>>> For example, the AVG output types of various databases may be
>>>>>> different.
>>>>>> For example, MySQL outputs double, this is different from Flink. What
>>>>>> should we do? (Lucky, avg will be splitted into sum and count, But we
>>>>>> also
>>>>>> need care about decimal and others)
>>>>>>
>>>>>> ## The phase of push-down rule
>>>>>>
>>>>>> I strongly recommend that you do not put it in the Volcano phase,
>>>>>> which may
>>>>>> make the cost calculation very troublesome.
>>>>>> So in PHYSICAL_REWRITE?
>>>>>>
>>>>>> ## About interface
>>>>>>
>>>>>> For scalability, I slightly recommend that we introduce an `Aggregate`
>>>>>> interface, it contains `List<CallExpression> aggregateExpressions,
>>>>>> int[]
>>>>>> groupingFields, DataType producedDataType` fields. In this way, we
>>>>>> can add
>>>>>> fields easily without breaking compatibility.
>>>>>>
>>>>>> I think the current design is very good, just put forward some ideas.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Jark,
>>>>>> >
>>>>>> > Thx for your further feedback and help. The interface of
>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>> >
>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>> TableSource can
>>>>>> > handle all of the aggregates.
>>>>>> > It's better to just return a boolean type to indicate whether all of
>>>>>> > aggregates push down was successful or not. [Resolved in proposal]
>>>>>> >
>>>>>> > For (2) Agree: The aggOutputDataType represent the produced data
>>>>>> type of
>>>>>> > the new table source to make sure that the new table source can
>>>>>> > connect with the related exchange node. The format of this
>>>>>> > aggOutputDataType is groupedFields's type + agg function's return
>>>>>> type.
>>>>>> > The reason for adding this parameter in this function is also to
>>>>>> facilitate
>>>>>> > the user to build the final output type. I have changed this
>>>>>> parameter
>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>> >
>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed
>>>>>> to use
>>>>>> > groupingFields. [Resolved in proposal]
>>>>>> >
>>>>>> > Thx again for the suggestion, looking for the further discussion.
>>>>>> >
>>>>>> > Jark Wu <imj...@gmail.com> 于2021年1月5日周二 下午12:05写道:
>>>>>> >
>>>>>> > > I'm also +1 for idea#2.
>>>>>> > >
>>>>>> > > Regarding to the updated interface,
>>>>>> > >
>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>> > >
>>>>>> > > final class Result {
>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>> > > }
>>>>>> > >
>>>>>> > > I have following comments:
>>>>>> > >
>>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>>> return type
>>>>>> > > enough?
>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>> accepted,
>>>>>> > > otherwise the pushdown should fail.
>>>>>> > >     Therefore, users don't need to distinguish which aggregates
>>>>>> are
>>>>>> > > "accepted".
>>>>>> > >
>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type
>>>>>> of the
>>>>>> > > new source, or just the return type of all the agg functions?
>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users to
>>>>>> concat a
>>>>>> > final
>>>>>> > > output type.
>>>>>> > >     The return type of each agg function can be obtained from the
>>>>>> > > `CallExpression`.
>>>>>> > >
>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>>> > > `groupedFields` ?
>>>>>> > >     The `groupSet` may confuse users that it relates to "grouping
>>>>>> sets".
>>>>>> > >
>>>>>> > >
>>>>>> > > What do you think?
>>>>>> > >
>>>>>> > > Best,
>>>>>> > > Jark
>>>>>> > >
>>>>>> > >
>>>>>> > >
>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:
>>>>>> > >
>>>>>> > >> Sorry for the typo -_-!
>>>>>> > >> I meant idea #2.
>>>>>> > >>
>>>>>> > >> Best,
>>>>>> > >> Kurt
>>>>>> > >>
>>>>>> > >>
>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>> liuyang0...@gmail.com>
>>>>>> > >> wrote:
>>>>>> > >>
>>>>>> > >>> Hi Kurt,
>>>>>> > >>>
>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a
>>>>>> > >>> physical operator rather than logical
>>>>>> > >>> operator, I think your suggestion should be idea #2 which
>>>>>> handle all in
>>>>>> > >>> the physical optimization phase?
>>>>>> > >>>
>>>>>> > >>> Looking forward for the further discussion.
>>>>>> > >>>
>>>>>> > >>>
>>>>>> > >>> Kurt Young <ykt...@gmail.com> 于2021年1月5日周二 上午9:52写道:
>>>>>> > >>>
>>>>>> > >>>> Local aggregation is more like a physical operator rather than
>>>>>> logical
>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>> > >>>>
>>>>>> > >>>> Best,
>>>>>> > >>>> Kurt
>>>>>> > >>>>
>>>>>> > >>>>
>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>> liuyang0...@gmail.com>
>>>>>> > >>>> wrote:
>>>>>> > >>>>
>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>> suggestions.
>>>>>> > >>>> > For (1): Agree: Since we are in the period of upgrading the
>>>>>> new
>>>>>> > table
>>>>>> > >>>> > source api,
>>>>>> > >>>> > we really should consider the new interface for the new
>>>>>> optimize
>>>>>> > >>>> rule. If
>>>>>> > >>>> > the new rule
>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>>>>>> later. I
>>>>>> > >>>> have
>>>>>> > >>>> > change to use
>>>>>> > >>>> > the ability interface for the SupportsAggregatePushDown
>>>>>> definition
>>>>>> > in
>>>>>> > >>>> above
>>>>>> > >>>> > proposal.
>>>>>> > >>>> >
>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a better
>>>>>> choice, and
>>>>>> > >>>> have
>>>>>> > >>>> > resolved this
>>>>>> > >>>> > comment in the proposal.
>>>>>> > >>>> >
>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as
>>>>>> we don't
>>>>>> > >>>> have
>>>>>> > >>>> > Druid connector
>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>> > >>>> >
>>>>>> > >>>> > But perhaps the biggest question may be whether we should
>>>>>> use idea 1
>>>>>> > >>>> or
>>>>>> > >>>> > idea 2 in proposal.
>>>>>> > >>>> >
>>>>>> > >>>> > What do you think?  After we reach the agreement on the
>>>>>> proposal,
>>>>>> > our
>>>>>> > >>>> team
>>>>>> > >>>> > can drive to
>>>>>> > >>>> > complete this feature.
>>>>>> > >>>> >
>>>>>> > >>>> > Jark Wu <imj...@gmail.com> 于2020年12月29日周二 下午2:58写道:
>>>>>> > >>>> >
>>>>>> > >>>> > > Hi Sebastian,
>>>>>> > >>>> > >
>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>> improvement for
>>>>>> > >>>> Flink
>>>>>> > >>>> > SQL.
>>>>>> > >>>> > > I went through the design doc and have following thoughts:
>>>>>> > >>>> > >
>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>> > proposed
>>>>>> > >>>> a new
>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you update
>>>>>> your
>>>>>> > >>>> proposal to
>>>>>> > >>>> > > use the new interfaces?
>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>> > >>>> > >
>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>> > >>>> representation
>>>>>> > >>>> > than
>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it would
>>>>>> be
>>>>>> > easier
>>>>>> > >>>> to
>>>>>> > >>>> > know
>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>> > >>>> > >
>>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>>> supported
>>>>>> > in
>>>>>> > >>>> the
>>>>>> > >>>> > > plan?
>>>>>> > >>>> > >
>>>>>> > >>>> > > Best,
>>>>>> > >>>> > > Jark
>>>>>> > >>>> > >
>>>>>> > >>>> > >
>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>> > liuyang0...@gmail.com>
>>>>>> > >>>> > wrote:
>>>>>> > >>>> > >
>>>>>> > >>>> > > > Hi all,
>>>>>> > >>>> > > >
>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done
>>>>>> at Flink
>>>>>> > >>>> layer.
>>>>>> > >>>> > > > With the developing of storage, many downstream storage
>>>>>> of Flink
>>>>>> > >>>> SQL
>>>>>> > >>>> > has
>>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>>>>>> > >>>> performance
>>>>>> > >>>> > from
>>>>>> > >>>> > > > the perspective of the network IO and computation
>>>>>> overhead.
>>>>>> > >>>> > > >
>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>> > >>>> > > >
>>>>>> > >>>> > > >
>>>>>> > >>>> > >
>>>>>> > >>>> >
>>>>>> > >>>>
>>>>>> >
>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>> > >>>> > > >
>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>> > >>>> > > >
>>>>>> > >>>> > > > --
>>>>>> > >>>> > > >
>>>>>> > >>>> > > > *With kind regards
>>>>>> > >>>> > > >
>>>>>> ------------------------------------------------------------
>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of
>>>>>> Science
>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>> > >>>> > > > QQ: 3239559*
>>>>>> > >>>> > > >
>>>>>> > >>>> > >
>>>>>> > >>>> >
>>>>>> > >>>> >
>>>>>> > >>>> > --
>>>>>> > >>>> >
>>>>>> > >>>> > *With kind regards
>>>>>> > >>>> > ------------------------------------------------------------
>>>>>> > >>>> > Sebastian Liu 刘洋
>>>>>> > >>>> > Institute of Computing Technology, Chinese Academy of Science
>>>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>>>> > >>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>> > >>>> > QQ: 3239559*
>>>>>> > >>>> >
>>>>>> > >>>>
>>>>>> > >>>
>>>>>> > >>>
>>>>>> > >>> --
>>>>>> > >>>
>>>>>> > >>> *With kind regards
>>>>>> > >>> ------------------------------------------------------------
>>>>>> > >>> Sebastian Liu 刘洋
>>>>>> > >>> Institute of Computing Technology, Chinese Academy of Science
>>>>>> > >>> Mobile\WeChat: +86—15201613655
>>>>>> > >>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>> > >>> QQ: 3239559*
>>>>>> > >>>
>>>>>> > >>>
>>>>>> >
>>>>>> > --
>>>>>> >
>>>>>> > *With kind regards
>>>>>> > ------------------------------------------------------------
>>>>>> > Sebastian Liu 刘洋
>>>>>> > Institute of Computing Technology, Chinese Academy of Science
>>>>>> > Mobile\WeChat: +86—15201613655
>>>>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>> > QQ: 3239559*
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *With kind regards
>>>>> ------------------------------------------------------------
>>>>> Sebastian Liu 刘洋
>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>> Mobile\WeChat: +86—15201613655
>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>> QQ: 3239559*
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>

Reply via email to