Hi Stefano,

I don't think we should integrate this with LogicalWindowAggregate, which is
meant for GroupBy windows and not for Over windows.
Moreover, LogicalWindowAggregate is on the logical plan level, but we need
to implement a physical operator, i.e., a DataStreamRel.
Calcite already parses the SQL query into a logical representation. The
windowing semantics are captured in the LogicalProject / LogicalCalc.
Radu pointed out [1] that it makes sense to extract the window semantics
into a LogicalWindow with a Calcite optimization rule.

From there, we should add a DataStreamRel that creates the required DataStream
transformations and functions, and the corresponding translation rule that
converts LogicalWindow / LogicalCalc into the DataStreamRel.

Btw, it would be good to move this discussion to the JIRA issue. Replying
to the CREATE mail does not mirror the discussion on JIRA.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-5654

2017-02-06 15:26 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:

> Hi Fabian,
>
> After working on the rule, I am moving on to the implementation of
> the aggregation function.
>
> I started by extending DataStreamRel (for which I created a Java
> version). However, I noticed that LogicalWindowAggregate provides the list
> of aggregate calls and other parameters, so it might be a better idea to
> extend that one instead. I may not be aware of all the implications of
> this choice, though. What do you think?
>
> My first idea was to implement a WindowAggregateUtil class with some
> methods to extract and possibly interpret window parameters (e.g.,
> boundaries, aggregate calls, parameter pointers, etc.).
>
> Dr. Stefano Bortoli
> Senior Research Engineer - Big Data and Semantic Technology Expert
> IT R&D Division
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhue...@gmail.com]
> Sent: Thursday, February 02, 2017 1:48 PM
> To: dev@flink.apache.org
> Subject: Re: [jira] [Created] (FLINK-5656) Add processing time OVER ROWS
> BETWEEN UNBOUNDED PRECEDING aggregation to SQL
>
> Sounds good to me Stefano!
>
> Best, Fabian
>
> 2017-02-01 13:43 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:
>
> > Hi all,
> >
> > I was thinking of opening a JIRA issue for the procTime() function so
> > that it could be merged first and others could use it as well. What do
> > you think?
> >
> > Regards,
> > Stefano
> >
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > Sent: Friday, January 27, 2017 10:34 AM
> > To: dev@flink.apache.org
> > Subject: Re: [jira] [Created] (FLINK-5656) Add processing time OVER
> > ROWS BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> >
> > Hi Stefano,
> >
> > I can assign the issue to you if you want to.
> > Just drop a comment in JIRA.
> >
> > Best, Fabian
> >
> > 2017-01-27 9:39 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:
> >
> > > Hi Fabian,
> > >
> > > In the next few days, I will start working on this issue. As soon as I
> > > have a proposal, I will share it for discussion.
> > >
> > > Regards,
> > > Dr. Stefano Bortoli
> > >
> > > -----Original Message-----
> > > From: Fabian Hueske (JIRA) [mailto:j...@apache.org]
> > > Sent: Thursday, January 26, 2017 2:49 PM
> > > To: dev@flink.apache.org
> > > Subject: [jira] [Created] (FLINK-5656) Add processing time OVER ROWS
> > > BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> > >
> > > Fabian Hueske created FLINK-5656:
> > > ------------------------------------
> > >
> > >              Summary: Add processing time OVER ROWS BETWEEN
> > > UNBOUNDED PRECEDING aggregation to SQL
> > >                  Key: FLINK-5656
> > >                  URL: https://issues.apache.org/jira/browse/FLINK-5656
> > >              Project: Flink
> > >           Issue Type: Sub-task
> > >           Components: Table API & SQL
> > >             Reporter: Fabian Hueske
> > >
> > >
> > > The goal of this issue is to add support for OVER ROWS aggregations
> > > on processing-time streams to the SQL interface.
> > >
> > > Queries similar to the following should be supported:
> > > {code}
> > > SELECT
> > >   a,
> > >   SUM(b) OVER (PARTITION BY c ORDER BY procTime()
> > >     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
> > >   MIN(b) OVER (PARTITION BY c ORDER BY procTime()
> > >     ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
> > > FROM myStream
> > > {code}
> > >
> > > The following restrictions should initially apply:
> > > - All OVER clauses in the same SELECT clause must be exactly the same.
> > > - The PARTITION BY clause is optional (no partitioning results in
> > > single-threaded execution).
> > > - The ORDER BY clause may only have procTime() as parameter.
> > > procTime() is a parameterless scalar function that just indicates
> > > processing-time mode.
> > > - x PRECEDING is not supported (see FLINK-5653).
> > > - FOLLOWING is not supported.
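To make the intended semantics of the query and restrictions concrete, here is a minimal, framework-free Java sketch. It is not Flink code and not a proposed DataStreamRel; the class `UnboundedOverSketch` and its methods are hypothetical. It assumes processing-time order is simply the order in which rows arrive, keeps running SUM/MIN state per partition key `c`, and emits exactly one result row per input row, as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW requires. State backends, parallelism, and fault tolerance are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free sketch (not Flink API) of the semantics of
//   SUM(b) / MIN(b) OVER (PARTITION BY c ORDER BY procTime()
//                         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
// Assumption: processing-time order is the order in which rows reach process().
public class UnboundedOverSketch {

    // One input row (a, b, c) of myStream.
    public static final class Row {
        public final String a;
        public final long b;
        public final String c;

        public Row(String a, long b, String c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }
    }

    // One output row (a, sumB, minB).
    public static final class Result {
        public final String a;
        public final long sumB;
        public final long minB;

        public Result(String a, long sumB, long minB) {
            this.a = a;
            this.sumB = sumB;
            this.minB = minB;
        }

        @Override
        public String toString() {
            return a + ", sumB=" + sumB + ", minB=" + minB;
        }
    }

    // Running aggregate state of one partition (one value of c).
    private static final class Acc {
        long sum = 0;
        long min = Long.MAX_VALUE;
    }

    // State is never evicted because the frame starts at UNBOUNDED PRECEDING.
    private final Map<String, Acc> stateByPartition = new HashMap<>();

    // Emits exactly one result per arriving row, aggregating every row of the
    // same partition seen so far, including the current one.
    public Result process(Row row) {
        Acc acc = stateByPartition.computeIfAbsent(row.c, k -> new Acc());
        acc.sum += row.b;
        acc.min = Math.min(acc.min, row.b);
        return new Result(row.a, acc.sum, acc.min);
    }

    public static void main(String[] args) {
        UnboundedOverSketch op = new UnboundedOverSketch();
        List<Row> input = List.of(
            new Row("r1", 5, "x"),
            new Row("r2", 3, "y"),
            new Row("r3", 2, "x"),
            new Row("r4", 7, "x"));
        List<Result> out = new ArrayList<>();
        for (Row r : input) {
            out.add(op.process(r));
        }
        out.forEach(System.out::println);
    }
}
```

Running `main` prints `r3, sumB=7, minB=2` for the third row: partition x has accumulated 5 and 2, while row r2 of partition y does not affect it. Without PARTITION BY, every row would share a single state entry, which is why that case implies single-threaded execution.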
> > >
> > > The restrictions will be resolved in follow-up issues. If we find
> > > that some of the restrictions are trivial to address, we can add the
> > > functionality in this issue as well.
> > >
> > > This issue includes:
> > > - Design of the DataStream operator to compute OVER ROWS aggregates
> > > - Translation from Calcite's RelNode representation (LogicalProject
> > > with RexOver expressions).
> > >
> > >
> >
>
