Hi all,

We are discussing this proposal in great detail and trying to design the
API from many angles (functionality, compatibility, ease of use, etc.). I
think this is a very good process. Only a discussion this detailed will let
us develop the PRs clearly and smoothly in the later stage. I am very
grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
Regarding the definition of the method signatures, I want to share my
points here, which I am also discussing with Fabian in the Google doc (not
yet completed), as follows:

Assume we have a table:
val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)

Approach 1:
case1: Map follows Source Table
val result =
  tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
  .window(Tumble over 5.millis on 'proctime as 'w)

case2: FlatAgg follows Window (Fabian mentioned above)
val result =
    tab.window(Tumble ... as 'w)
       .groupBy('w, 'k1, 'k2) // 'w should be a group key.
       .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
       .select('k1, 'col1, 'w.rowtime as 'rtime)

Approach 2: Similar to Fabian's approach, in which the result schema is
clearly defined, but with an added built-in append UDF. That makes the
map/flatMap/agg/flatAgg interfaces accept only one Expression.
val result =
    tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2, 'long, 'proctime)
     .window(Tumble over 5.millis on 'proctime as 'w)

Note: append is a special built-in UDF that can pass through any column.
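
To make the schema rule in Approach 2 concrete, here is a minimal Scala
sketch. It is a toy model of column names only, not the Flink Table API:
Expr, UdfCall, Append and mapSchema are hypothetical names made up for
illustration, and placing the UDF result columns before the pass-through
columns simply mirrors the as(...) order in the example above.

```scala
// Toy model of result schemas only; this is NOT the Flink Table API.
// All names below are hypothetical and exist just for this illustration.
object AppendSchemaSketch {
  // A column is only a name here; a real planner would also track types.
  type Schema = Vector[String]

  // Stand-in for the single expression passed to map():
  // it knows which output columns it produces.
  sealed trait Expr { def outputColumns: Schema }

  // A UDF call that yields the given result columns.
  final case class UdfCall(resultColumns: Schema) extends Expr {
    def outputColumns: Schema = resultColumns
  }

  // Assumed behavior of the built-in append: UDF result columns first,
  // followed by the pass-through columns in the order they were listed.
  final case class Append(udf: UdfCall, passThrough: Schema) extends Expr {
    def outputColumns: Schema = udf.outputColumns ++ passThrough
  }

  // map(Expression): the result schema is exactly what the one expression yields.
  def mapSchema(expr: Expr): Schema = expr.outputColumns
}
```

With this model, Append(UdfCall(Vector("col1", "col2")), Vector("long",
"proctime")) yields the schema [col1, col2, long, proctime], so map only
ever needs to accept a single Expression.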

So maybe we can define table.map(Expression) first and, if necessary,
extend it to table.map(Expression*) in the future? Of course, I also hope
that we can further refine this proposal through discussion.

Thanks,
Jincheng





Xiaowei Jiang <xiaow...@gmail.com> wrote on Wed, Nov 7, 2018 at 11:45 PM:

> Hi Fabian,
>
> I think that the key question you raised is whether we allow extra
> parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> that may appear more convenient in some cases. However, it might also cause
> some confusion if we do that. For example, do we allow multiple UDFs in
> these expressions? If we do, the semantics may be weird to define, e.g.
> what does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean?
> Although not allowing it may appear less powerful, it can make things more
> intuitive too. In the case of agg/flatAgg, we can define the keys to be
> implied in the result table and to appear at the beginning. You can use a
> select method if you want to modify this behavior. I think that eventually
> we will have some API which allows other expressions as additional
> parameters, but I think it's better to do that after we introduce the
> concept of nested tables. A lot of things we suggested here can be
> considered as special cases of that. But things are much simpler if we
> leave that to later.
>
> Regards,
> Xiaowei
>
> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > * Re emit:
> > I think we should start with the well-understood semantics of full
> > replacement. This is how the other agg functions work.
> > As was said before, there are open questions regarding an append mode
> > (checkpointing, whether to support retractions and, if so, how to
> > declare them, ...).
> > Since this seems to be an optimization, I'd postpone it.
> >
> > * Re grouping keys:
> > I don't think we should automatically add them because the result schema
> > would not be intuitive.
> > Would they be added at the beginning of the tuple or at the end? What
> > metadata fields of windows would be added? In which order would they be
> > added?
> >
> > However, we could support syntax like this:
> > val t: Table = ???
> > t
> >   .window(Tumble ... as 'w)
> >   .groupBy('a, 'b)
> >   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> >
> > The result schema would be clearly defined as [b, a, f1, f2, ..., fn,
> > wend, rtime]. (f1, f2, ..., fn) are the result attributes of the UDF.
> >
> > * Re Multi-staged evaluation:
> > I think this should be an optimization that can be applied if the UDF
> > implements the merge() method.
> >
> > Best, Fabian
> >
> > On Wed, Nov 7, 2018 at 8:01 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> >
> > > Hi Xiaowei,
> > >
> > > Yes, I agree with you that the semantics of TableAggregateFunction emit
> > > are much more complex than those of AggregateFunction. The fundamental
> > > difference is that TableAggregateFunction emits a "table" while
> > > AggregateFunction outputs (a column of) a "row". AggregateFunction has
> > > only one mode, which is "replacing" (complete update). But for
> > > TableAggregateFunction, it could be an incremental update (only emit
> > > the newly updated results) or a complete update (always emit the entire
> > > table when "emit" is triggered). From the performance perspective, we
> > > might want to use incremental update. But we need to review and design
> > > this carefully, especially taking into account failover (instead of
> > > just backing up the ACC, it may also need to remember the emit offset)
> > > and retractions, as the semantics of TableAggregateFunction emit are
> > > different from those of other UDFs. TableFunction also emits a table,
> > > but it does not need to worry about this because it is stateless.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >
> > > > Hi Jincheng,
> > > >
> > > > Thanks for adding the public interfaces! I think that it's a very
> > > > good start. There are a few points that we need to discuss further.
> > > >
> > > >    - TableAggregateFunction - this is a very complex beast, definitely
> > > >    the most complex user-defined object we have introduced so far. I
> > > >    think there are quite a few interesting questions here. For
> > > >    example, do we allow multi-staged TableAggregate in this case? What
> > > >    are the semantics of emit? Is it an amendment to the previous
> > > >    output, or does it replace it? I think that this subject itself is
> > > >    worth a discussion to make sure we get the details right.
> > > >    - GroupedTable.agg - do the group keys automatically appear in the
> > > >    output? How about the case of windowing aggregation?
> > > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >
> > > > > Hi, Xiaowei,
> > > > >
> > > > > Thanks for bringing up the discussion of the Table API Enhancement
> > > > > Outline!
> > > > >
> > > > > I quickly looked at the overall content; it expresses our offline
> > > > > discussions well. But from my point of view, we should add the
> > > > > usage of the public interfaces that we will introduce in this
> > > > > proposal. So I added the following usage description of the
> > > > > interfaces and operators to the Google doc:
> > > > >
> > > > > 1. Map Operator
> > > > >     Map is a new operator of Table. It can apply a scalar function
> > > > > and can return multiple columns. The usage is as follows:
> > > > >
> > > > >   val res = tab
> > > > >      .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > >      .select('a, 'c)
> > > > >
> > > > > 2. FlatMap Operator
> > > > >     FlatMap is a new operator of Table. It can apply a table
> > > > > function and can return multiple rows. The usage is as follows:
> > > > >
> > > > >   val res = tab
> > > > >       .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > >       .select('a, 'c)
> > > > >
> > > > > 3. Agg Operator
> > > > >     Agg is a new operator of Table/GroupedTable. It can apply an
> > > > > aggregate function and can return multiple columns. The usage is as
> > > > > follows:
> > > > >
> > > > >    val res = tab
> > > > >       .groupBy('a) // leave out the groupBy clause to define global aggregates
> > > > >       .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > >       .select('a, 'c)
> > > > >
> > > > > 4. FlatAgg Operator
> > > > >     FlatAgg is a new operator of Table/GroupedTable. It can apply a
> > > > > table aggregate function and can return multiple rows. The usage is
> > > > > as follows:
> > > > >
> > > > >     val res = tab
> > > > >        .groupBy('a) // leave out the groupBy clause to define global table aggregates
> > > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > >        .select('a, 'c)
> > > > >
> > > > >   5. TableAggregateFunction
> > > > >      The behavior of table aggregates is most similar to that of
> > > > > GroupReduceFunction, which is computed for a group of elements and
> > > > > outputs a group of elements. The TableAggregateFunction can be
> > > > > applied on GroupedTable.flatAgg(). The interface of
> > > > > TableAggregateFunction has a lot of content, so I don't copy it
> > > > > here. Please look at the details in the Google doc:
> > > > >
> > > > >
> > > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > >
> > > > > I would very much appreciate anyone reviewing and commenting.
> > > > >
> > > > > Best,
> > > > > Jincheng
> > > > >
> > > >
> > >
> >
>
