[ https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394046#comment-16394046 ]

Rong Rong edited comment on FLINK-8690 at 3/10/18 5:52 AM:
-----------------------------------------------------------

That should partially resolve our problem. The real reason I introduced 
another node before the logical plan is that 
*AggregateExpandDistinctAggregatesRule* is a Calcite rule that will be applied 
globally, regardless of whether the query targets DataSet or DataStream, and 
that is exactly the rule we want to avoid applying in the DataStream API.

Roughly, for an input table T, it rewrites

{code:sql}
SELECT COUNT(DISTINCT f1) FROM T
{code}

into

{code:sql}
SELECT COUNT(DIST_f1)
FROM (SELECT f1 AS DIST_f1 FROM T GROUP BY f1)
{code}
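As a sanity check on why this rewrite is valid, the sketch below evaluates both shapes on an in-memory column in plain Java. No Flink or Calcite code is involved, and the class and method names are made up for illustration. The one subtle point is NULL handling: the inner GROUP BY keeps NULL as its own group, and the outer COUNT is what drops it.

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: evaluates both query shapes on a list standing in for column f1.
class DistinctExpansionSketch {

    // One-step form, COUNT(DISTINCT f1): NULLs are dropped, then duplicates.
    static long countDistinct(List<String> f1) {
        Set<String> distinct = new HashSet<>();
        for (String v : f1) {
            if (v != null) {
                distinct.add(v);
            }
        }
        return distinct.size();
    }

    // Two-step form: the inner GROUP BY f1 yields one row per distinct value
    // (NULL forms its own group); the outer COUNT(DIST_f1) then skips NULL.
    static long countOverGroupBy(List<String> f1) {
        Set<String> groups = new LinkedHashSet<>(f1); // rows of the subquery, i.e. DIST_f1
        long count = 0;
        for (String distF1 : groups) {
            if (distF1 != null) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> f1 = Arrays.asList("a", "b", "a", null, "c", "b", null);
        System.out.println(countDistinct(f1));    // 3
        System.out.println(countOverGroupBy(f1)); // 3
    }
}
{code}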

We could always merge these two operators back together in 
FlinkLogicalAggregateConverter, but it seems counter-intuitive to split them 
first and then merge them again.
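The split-then-merge round trip can be sketched with a toy plan model (Java 16+ records). Everything here is hypothetical: Scan, Aggregate, expandDistinct (a stand-in for what AggregateExpandDistinctAggregatesRule does to a single global COUNT(DISTINCT)) and mergeDistinct (a stand-in for merging logic that could live in FlinkLogicalAggregateConverter) are not Flink or Calcite classes, and column renaming such as DIST_f1 is ignored.

{code:java}
// Toy logical plan; these types are illustrative, not Flink or Calcite classes.
class SplitThenMergeSketch {

    interface Node {}

    record Scan(String column) implements Node {}

    // groupKey == null means a global aggregate; countArg == null means the
    // node only groups (deduplicates) and computes no COUNT.
    record Aggregate(Node input, String groupKey, String countArg, boolean distinct)
            implements Node {}

    // Split: global COUNT(DISTINCT x) becomes COUNT(x) over (GROUP BY x).
    static Node expandDistinct(Node node) {
        if (node instanceof Aggregate agg && agg.distinct() && agg.groupKey() == null) {
            Node dedup = new Aggregate(agg.input(), agg.countArg(), null, false);
            return new Aggregate(dedup, null, agg.countArg(), false);
        }
        return node;
    }

    // Merge: recognize exactly the shape produced above and fold it back.
    static Node mergeDistinct(Node node) {
        if (node instanceof Aggregate outer
                && !outer.distinct()
                && outer.groupKey() == null
                && outer.countArg() != null
                && outer.input() instanceof Aggregate inner
                && !inner.distinct()
                && inner.countArg() == null
                && outer.countArg().equals(inner.groupKey())) {
            return new Aggregate(inner.input(), null, outer.countArg(), true);
        }
        return node;
    }

    public static void main(String[] args) {
        Node original = new Aggregate(new Scan("f1"), null, "f1", true);
        Node expanded = expandDistinct(original);
        Node merged = mergeDistinct(expanded);
        System.out.println(expanded);
        System.out.println(merged.equals(original)); // true
    }
}
{code}

The merge step exists only to undo the expansion and has to pattern-match the exact shape the expansion produced, which is the counter-intuitive part.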



> Update logical rule set to generate FlinkLogicalAggregate explicitly allow 
> distinct agg on DataStream
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8690
>                 URL: https://issues.apache.org/jira/browse/FLINK-8690
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* do not 
> allow distinct aggregates.
> We propose to reuse the distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate* to support unbounded distinct aggregation on 
> DataStream as well.
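For unbounded distinct aggregation the distinct state has to be maintained incrementally rather than recomputed. A minimal sketch, assuming a per-value occurrence count (one common approach, which also makes retraction possible), is below; DistinctCountAccumulator and its methods are hypothetical names, and this is not the FlinkLogicalOverAggregate codegen itself.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical incremental COUNT(DISTINCT) accumulator for an unbounded stream.
// It keeps, per distinct value, how many occurrences are currently present, so a
// retraction only changes the result when the last occurrence is removed.
class DistinctCountAccumulator<T> {
    private final Map<T, Long> occurrences = new HashMap<>();

    // Adds one occurrence; returns true if the value was not present before.
    boolean accumulate(T value) {
        if (value == null) {
            return false; // COUNT(DISTINCT) ignores NULLs
        }
        return occurrences.merge(value, 1L, Long::sum) == 1L;
    }

    // Removes one occurrence; returns true if this was the last occurrence.
    boolean retract(T value) {
        if (value == null) {
            return false;
        }
        Long current = occurrences.get(value);
        if (current == null) {
            return false; // nothing to retract
        }
        if (current == 1L) {
            occurrences.remove(value);
            return true;
        }
        occurrences.put(value, current - 1);
        return false;
    }

    // Current COUNT(DISTINCT) result: the number of values still present.
    long getValue() {
        return occurrences.size();
    }

    public static void main(String[] args) {
        DistinctCountAccumulator<String> acc = new DistinctCountAccumulator<>();
        for (String v : new String[] {"a", "b", "a", "c"}) {
            acc.accumulate(v);
            System.out.println(acc.getValue()); // 1, 2, 2, 3
        }
        acc.retract("a");
        System.out.println(acc.getValue()); // 3: one "a" is still present
        acc.retract("a");
        System.out.println(acc.getValue()); // 2
    }
}
{code}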



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
