[ https://issues.apache.org/jira/browse/FLINK-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954739#comment-15954739 ]

radu commented on FLINK-6260:
-----------------------------

[~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] 
[~stefano.bortoli]

As per [~fhueske]'s suggestion in FLINK-6249, I have created a separate JIRA for 
supporting DISTINCT over GROUP BY.

> Distinct Aggregates for Group By Windows
> ----------------------------------------
>
>                 Key: FLINK-6260
>                 URL: https://issues.apache.org/jira/browse/FLINK-6260
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>
> Time target: ProcTime/EventTime
> Targeted SQL query examples:
> ------------
> Q1. Boundaries are expressed in the GROUP BY clause, and DISTINCT is applied to 
> the elements of the aggregate(s):
> `SELECT MIN(DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`
> Q2. DISTINCT is applied to the collection of outputs to be selected:
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`
> => The DISTINCT operation makes sense only within the context of windows or 
> other bounded structures. Otherwise, the operation would have to keep an 
> unbounded amount of data to ensure uniqueness, and certain functions (e.g., 
> aggregates) would never be triggered.
> => We can follow the same design/implementation as in FLINK-6249 (supporting 
> Distinct Aggregates for OVER Windows).
> => The implementation of DISTINCT for SELECT clauses can be handled as a 
> sub-issue of this JIRA.
> => Aggregations over distinct elements without any boundary (i.e., within the 
> SELECT clause alone) do not make sense, just as aggregations do not make sense 
> without groupings or windows.
> If DISTINCT is applied to the group elements, as in the Q1 example, then we 
> either define a new implementation (if the selection is general) or extend the 
> current implementation of grouped aggregates with distinct group aggregates.
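The following is a minimal Java sketch of the second option, a distinct group aggregate that wraps an existing one. `SimpleAggregate`, `CountAggregate` and `DistinctAggregate` are hypothetical names used only for illustration. They are not Flink's `AggregateFunction` API. The wrapper forwards only the first occurrence of each value to the delegate, and it forgets all seen values when the group scope closes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal aggregate interface, for illustration only
// (this is not Flink's AggregateFunction API).
interface SimpleAggregate<T, R> {
    void accumulate(T value);
    R getValue();
    void reset();
}

// Plain COUNT, used as the wrapped (delegate) aggregate.
class CountAggregate<T> implements SimpleAggregate<T, Long> {
    private long count = 0;

    public void accumulate(T value) { count++; }
    public Long getValue() { return count; }
    public void reset() { count = 0; }
}

// Turns any aggregate into its DISTINCT variant: only the first occurrence
// of each value within the current group scope reaches the delegate.
class DistinctAggregate<T, R> implements SimpleAggregate<T, R> {
    private final SimpleAggregate<T, R> delegate;
    private final Set<T> seen = new HashSet<>();

    DistinctAggregate(SimpleAggregate<T, R> delegate) {
        this.delegate = delegate;
    }

    public void accumulate(T value) {
        // Set.add returns false if the value was already present.
        if (seen.add(value)) {
            delegate.accumulate(value);
        }
    }

    public R getValue() { return delegate.getValue(); }

    // Called when the GROUP BY window closes, so distinctness is per window.
    public void reset() {
        seen.clear();
        delegate.reset();
    }
}
```

With this approach, each DISTINCT aggregate keeps its own copy of the seen values.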
> If DISTINCT is applied to all selected elements, as in the Q2 example, then a 
> new implementation needs to be defined. It would work over a specific 
> window/ProcessFunction, and the uniqueness of the results would be enforced 
> within the processing function. This happens for each partition. The data 
> structure used to track distinct elements is reset with each new window 
> (i.e., with each GROUP BY scope).
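Below is a rough Java sketch of the per-partition state described above. It assumes values are buffered until the window fires. `DistinctSelectSketch`, `onElement` and `onWindowEnd` are hypothetical names, not Flink's `ProcessFunction` API. Each partition key gets its own set of seen values, and that set is dropped when the partition's window ends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical operator sketch for SELECT DISTINCT ... GROUP BY <window>.
// K is the partition key, V the selected value.
class DistinctSelectSketch<K, V> {
    // Distinct values seen in the current window, kept separately per partition.
    private final Map<K, Set<V>> seenPerPartition = new HashMap<>();

    // Records a value; duplicates within the same window are ignored by the set.
    void onElement(K partitionKey, V value) {
        seenPerPartition
            .computeIfAbsent(partitionKey, k -> new LinkedHashSet<>())
            .add(value);
    }

    // Emits the distinct values of the closing window for one partition and
    // drops its state, so the next window starts from an empty set.
    List<V> onWindowEnd(K partitionKey) {
        Set<V> seen = seenPerPartition.remove(partitionKey);
        if (seen == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(seen);
    }
}
```

`LinkedHashSet` keeps the emitted order equal to the first-arrival order. A plain `HashSet` would enforce uniqueness just as well.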
>       
> Examples
> ------------
> `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
> ||Proctime/IngestionTime(Event)||Stream1||Q1||Q2||
> |10:00:01|(ab,1)| | |
> |10:05:00|(aa,2)| | |
> |11:00:00| |ab,aa|2|
> |11:03:00|(aa,2)| | |
> |11:09:00|(aa,2)| | |
> |12:00:00| |aa|1|
> |...| | | |
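The table can be reproduced with a small Java simulation. The simulation assumes that the first field of each Stream1 tuple is the `b` column; neither query uses the second field. Events are bucketed by `FLOOR(PROCTIME() TO HOUR)`, approximated here with `LocalTime.truncatedTo(ChronoUnit.HOURS)`. Q1 is the distinct set per window and Q2 is its size, and both are emitted at the window end. The class and method names are hypothetical.

```java
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class HourlyDistinctExample {
    // One Stream1 row: its processing time and the b value (first tuple field).
    static final class Event {
        final LocalTime procTime;
        final String b;

        Event(String procTime, String b) {
            this.procTime = LocalTime.parse(procTime);
            this.b = b;
        }
    }

    // Buckets events by FLOOR(procTime TO HOUR) and keeps the distinct b values
    // of each bucket in first-arrival order. Keys are window start times.
    static Map<LocalTime, Set<String>> distinctPerHour(List<Event> events) {
        Map<LocalTime, Set<String>> windows = new TreeMap<>();
        for (Event e : events) {
            LocalTime windowStart = e.procTime.truncatedTo(ChronoUnit.HOURS);
            windows.computeIfAbsent(windowStart, k -> new LinkedHashSet<>()).add(e.b);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Event> stream1 = Arrays.asList(
            new Event("10:00:01", "ab"),
            new Event("10:05:00", "aa"),
            new Event("11:03:00", "aa"),
            new Event("11:09:00", "aa"));

        // Results are emitted when each hourly window ends.
        for (Map.Entry<LocalTime, Set<String>> w : distinctPerHour(stream1).entrySet()) {
            LocalTime windowEnd = w.getKey().plusHours(1);
            // Q1 = distinct b values, Q2 = COUNT(DISTINCT b)
            System.out.println(windowEnd + " Q1=" + w.getValue() + " Q2=" + w.getValue().size());
        }
        // prints:
        // 11:00 Q1=[ab, aa] Q2=2
        // 12:00 Q1=[aa] Q2=1
    }
}
```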
> Implementation option
> ---------------------
> Considering that the behavior is similar to the one implemented for OVER 
> windows (with the difference that distinctness is reset for each group scope), 
> the implementation will reuse the existing implementation of the OVER window 
> functions. Distinction can be achieved either within the aggregate itself or 
> within the window/ProcessFunction logic that computes the aggregates. Because 
> multiple aggregates that require distinction can be computed at the same time, 
> the preferred option is to handle distinction within the process logic. For 
> selecting distinct outputs (i.e., not aggregates), we can follow the same 
> design: support distinction in the aggregation and then emit only one output 
> per distinct element seen (instead of calling the aggregate method of the 
> aggregates).
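Here is a minimal Java sketch of handling distinction in the process logic rather than in each aggregate. It assumes rows arrive as `long[]` and that only COUNT and SUM are supported. `Kind`, `AggCall` and `WindowProcessLogic` are hypothetical names. The sketch illustrates one possible benefit of the preferred option: when several DISTINCT aggregates read the same field, the process logic can check first occurrence once per field and share that answer. Otherwise, every aggregate would keep its own copy of the seen values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical aggregate kinds, kept minimal for the sketch.
enum Kind { COUNT, SUM }

// Hypothetical description of one aggregate call in the SELECT list.
final class AggCall {
    final Kind kind;
    final int field;          // index of the input field the aggregate reads
    final boolean distinct;   // true for e.g. COUNT(DISTINCT b)
    long acc = 0;             // running result for the current window

    AggCall(Kind kind, int field, boolean distinct) {
        this.kind = kind;
        this.field = field;
        this.distinct = distinct;
    }

    void accumulate(long value) {
        acc += (kind == Kind.COUNT) ? 1 : value;
    }
}

// Window/process logic that owns the distinct state instead of the aggregates.
final class WindowProcessLogic {
    private final List<AggCall> calls;
    // One set of seen values per input field, shared by all DISTINCT
    // aggregates that read that field.
    private final Map<Integer, Set<Long>> seenPerField = new HashMap<>();

    WindowProcessLogic(List<AggCall> calls) {
        this.calls = new ArrayList<>(calls);
    }

    void processRow(long[] row) {
        // Per-row cache: is this row's value new for a given field?
        Map<Integer, Boolean> isNewForField = new HashMap<>();
        for (AggCall c : calls) {
            long v = row[c.field];
            if (c.distinct) {
                boolean isNew = isNewForField.computeIfAbsent(c.field,
                    f -> seenPerField.computeIfAbsent(f, k -> new HashSet<>()).add(v));
                if (!isNew) {
                    continue; // value already seen in this window for this field
                }
            }
            c.accumulate(v);
        }
    }

    // Emits one result per aggregate and resets all state for the next window.
    long[] fireWindow() {
        long[] results = new long[calls.size()];
        for (int i = 0; i < calls.size(); i++) {
            results[i] = calls.get(i).acc;
            calls.get(i).acc = 0;
        }
        seenPerField.clear();
        return results;
    }
}
```

As an example, take `COUNT(DISTINCT b)`, `SUM(DISTINCT b)`, `SUM(DISTINCT a)` and `COUNT(b)` over the rows (a=1, b=10), (a=1, b=20), (a=2, b=10). One window of this sketch yields 2, 30, 3 and 3.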



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
