Hi Jincheng,

I said before that I think the append() method is better than implicitly
forwarding keys, but I still believe it adds unnecessary boilerplate code.

Moreover, I haven't seen a convincing argument for why map(Expression*) is
worse than map(Expression). In either case we need to do all kinds of
checks to prevent invalid use of functions.
If the method is not used correctly, we can emit a good error message, and
documenting map(Expression*) will be easier than documenting
map(append(Expression*)), in my opinion.
I think we should not add unnecessary syntax unless there is a good reason,
and to be honest, I haven't seen such a reason yet.
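
To make the point concrete, here is a minimal sketch in plain Scala (not Flink code). Expr, Field, UdfCall, Append and MapArgCheck are hypothetical names for a simplified expression tree, and the sketch assumes the rule is "exactly one UDF call per map()":

```scala
// Hypothetical, simplified expression tree; these are NOT Flink's classes.
sealed trait Expr
final case class Field(name: String) extends Expr
final case class UdfCall(name: String, args: List[Expr]) extends Expr
final case class Append(args: List[Expr]) extends Expr

object MapArgCheck {
  // Counts UDF calls anywhere in the tree, including nested ones.
  def udfCalls(e: Expr): Int = e match {
    case Field(_)         => 0
    case UdfCall(_, args) => 1 + args.map(udfCalls).sum
    case Append(args)     => args.map(udfCalls).sum
  }

  // Check for a map(Expression*) signature (assumed rule: exactly one UDF).
  def validVarargs(args: Seq[Expr]): Boolean = args.map(udfCalls).sum == 1

  // Check for map(append(...)): the same traversal, applied to the wrapper.
  def validAppend(arg: Expr): Boolean = udfCalls(arg) == 1
}
```

Both variants walk the whole tree with the same rule; append() only adds a wrapper node that the check has to look through.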

Regarding the groupBy.agg() method, I think it should behave just like any
other method, i.e., not do any implicit forwarding.
Let's take the example of the windowed group by that you posted before.

tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
    .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
    .select('k1, 'col1, 'w.rowtime as 'rtime)

What happens if 'w.rowtime is not selected? What is the data type of the
field 'w in the resulting Table? Is it a regular field at all or just a
system field that disappears if it is not selected?

IMO, the following syntax is shorter, more explicit, and better aligned
with the regular window.groupBy.select aggregations that are supported
today.

tab.window(Tumble ... as 'w)
    .groupBy('w, 'k1, 'k2) // 'w should be a group key.
    .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
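
With this syntax the result schema can be read directly off the argument list. A minimal sketch of that rule in plain Scala, assuming agg('a) produces two result fields named col1 and col2 (as in the .as(...) of your example); OutItem, Named, AggCall and ExplicitAggSchema are hypothetical names, not Flink API:

```scala
// Hypothetical model of the items passed to agg(...); NOT Flink's classes.
sealed trait OutItem
// A plain field or alias, e.g. 'k1 or 'w.rowtime as 'rtime.
final case class Named(name: String) extends OutItem
// An aggregate call whose result fields are expanded in place.
final case class AggCall(resultFields: List[String]) extends OutItem

object ExplicitAggSchema {
  // The result schema is exactly the listed items, in the listed order;
  // nothing (group keys, window properties) is added implicitly.
  def resultSchema(items: List[OutItem]): List[String] = items.flatMap {
    case Named(n)        => List(n)
    case AggCall(fields) => fields
  }
}
```

For the call above this gives [rtime, k1, k2, col1, col2]; window properties show up only if they are listed explicitly, so the questions about 'w do not arise.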


Best, Fabian

On Wed, Nov 14, 2018 at 08:37, jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi Fabian/Xiaowei,
>
> I am very sorry for my late reply! I'm glad to see your replies; they sound
> pretty good!
> I agree with Fabian that the append() approach, which clearly defines the
> result schema, is better.
> In addition, append() can also forward non-time attributes, e.g.:
>
>     // tab has the fields 'name, 'age, 'address, 'rowtime
>     tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2, 'address, 'rowtime)
>     .window(Tumble over 5.millis on 'rowtime as 'w)
>     .groupBy('w, 'address)
>
> In this way append() is very useful, and its behavior is very similar
> to withForwardedFields() in DataSet.
> So +1 to using the append() approach for map() and flatMap()!
>
> But what about agg() and flatAgg()? For agg/flatAgg I agree with
> Xiaowei's approach of implying the keys in the result table, placed at the
> beginning, for example:
>   tab.window(Tumble ... as 'w)
>     .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>     .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>     .select('k1, 'col1, 'w.rowtime as 'rtime)
>
> What do you think? @Fabian @Xiaowei
>
> Thanks,
> Jincheng
>
> On Fri, Nov 9, 2018 at 6:35 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Jincheng,
> >
> > Thanks for the summary!
> > I like the approach with append() better than the implicit forwarding, as
> > it clearly indicates which fields are forwarded.
> > However, I don't see much benefit over the flatMap(Expression*) variant, as
> > we would still need to analyze the full expression tree to ensure that at
> > most (or exactly?) one Scalar / TableFunction is used.
> >
> > Best,
> > Fabian
> >
> > On Thu, Nov 8, 2018 at 19:25, jincheng sun <sunjincheng...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > We are discussing this proposal in great detail and trying to design the
> > > API from many angles (functionality, compatibility, ease of use, etc.). I
> > > think this is a very good process: only such a detailed discussion will
> > > let us develop the PR clearly and smoothly in the later stage. I am very
> > > grateful to @Fabian and @Xiaowei for sharing a lot of good ideas.
> > > Regarding the definition of the method signatures, I want to share the
> > > points I am discussing with Fabian in the Google doc (not yet completed):
> > >
> > > Assume we have a table:
> > > val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string, 'proctime.proctime)
> > >
> > > Approach 1:
> > > case1: Map follows Source Table
> > > val result =
> > >   tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied in the output
> > >   .window(Tumble over 5.millis on 'proctime as 'w)
> > >
> > > case2: FlatAgg follows Window (as Fabian mentioned above)
> > > val result =
> > >     tab.window(Tumble ... as 'w)
> > >        .groupBy('w, 'k1, 'k2) // 'w should be a group key.
> > >        .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
> > >        .select('k1, 'col1, 'w.rowtime as 'rtime)
> > >
> > > Approach 2: Similar to Fabian's approach, in that the result schema is
> > > clearly defined, but with a built-in append UDF added. This makes the
> > > map/flatMap/agg/flatAgg interfaces accept only one Expression.
> > > val result =
> > >     tab.map(append(udf('string), 'long, 'proctime)).as('col1, 'col2, 'long, 'proctime)
> > >      .window(Tumble over 5.millis on 'proctime as 'w)
> > >
> > > Note: append is a special built-in UDF that can pass through any
> > > column.
> > >
> > > So, maybe we can define table.map(Expression) first and, if necessary,
> > > extend it to table.map(Expression*) in the future? Of course, I also hope
> > > that we can refine this proposal further through discussion.
> > >
> > > Thanks,
> > > Jincheng
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Nov 7, 2018 at 11:45 PM, Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > >
> > > > Hi Fabian,
> > > >
> > > > I think that the key question you raised is whether we allow extra
> > > > parameters in the methods map/flatMap/agg/flatAgg. I can see why allowing
> > > > that may appear more convenient in some cases. However, it might also
> > > > cause some confusion. For example, do we allow multiple UDFs in these
> > > > expressions? If we do, the semantics may be weird to define, e.g. what
> > > > does table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Not
> > > > allowing it may appear less powerful, but it can also make things more
> > > > intuitive. In the case of agg/flatAgg, we can define the keys to be
> > > > implied in the result table and to appear at the beginning. You can use a
> > > > select method if you want to modify this behavior. I think that
> > > > eventually we will have some API which allows other expressions as
> > > > additional parameters, but I think it's better to do that after we
> > > > introduce the concept of nested tables. A lot of things we suggested here
> > > > can be considered as special cases of that. But things are much simpler
> > > > if we leave that to later.
> > > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > > On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > * Re emit:
> > > > > I think we should start with the well-understood semantics of full
> > > > > replacement. This is how the other agg functions work.
> > > > > As was said before, there are open questions regarding an append mode
> > > > > (checkpointing, whether to support retractions and, if so, how to
> > > > > declare them, ...).
> > > > > Since this seems to be an optimization, I'd postpone it.
> > > > >
> > > > > * Re grouping keys:
> > > > > I don't think we should automatically add them because the result schema
> > > > > would not be intuitive.
> > > > > Would they be added at the beginning of the tuple or at the end? What
> > > > > metadata fields of windows would be added? In which order would they be
> > > > > added?
> > > > >
> > > > > However, we could support syntax like this:
> > > > > val t: Table = ???
> > > > > t
> > > > >   .window(Tumble ... as 'w)
> > > > >   .groupBy('a, 'b)
> > > > >   .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as 'rtime)
> > > > >
> > > > > The result schema would be clearly defined as [b, a, f1, f2, ..., fn,
> > > > > wend, rtime]. (f1, f2, ..., fn) are the result attributes of the UDF.
> > > > >
> > > > > * Re Multi-staged evaluation:
> > > > > I think this should be an optimization that can be applied if the UDF
> > > > > implements the merge() method.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > On Wed, Nov 7, 2018 at 08:01, Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > > >
> > > > > > Hi Xiaowei,
> > > > > >
> > > > > > Yes, I agree with you that the semantics of TableAggregateFunction emit
> > > > > > are much more complex than those of AggregateFunction. The fundamental
> > > > > > difference is that TableAggregateFunction emits a "table" while
> > > > > > AggregateFunction outputs (a column of) a "row". AggregateFunction has
> > > > > > only one mode, which is "replacing" (complete update). But for
> > > > > > TableAggregateFunction, the update could be incremental (only emit the
> > > > > > newly updated results) or complete (always emit the entire table when
> > > > > > "emit" is triggered). From the performance perspective, we might want to
> > > > > > use incremental update. But we need to review and design this carefully,
> > > > > > especially taking into account failover (instead of just backing up the
> > > > > > ACC, it may also need to remember the emit offset) and retractions, as
> > > > > > the semantics of TableAggregateFunction emit are different from other
> > > > > > UDFs. TableFunction also emits a table, but it does not need to worry
> > > > > > about this because it is stateless.
> > > > > >
> > > > > > Regards,
> > > > > > Shaoxuan
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <xiaow...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Jincheng,
> > > > > > >
> > > > > > > Thanks for adding the public interfaces! I think that it's a very good
> > > > > > > start. There are a few points that need more discussion.
> > > > > > >
> > > > > > >    - TableAggregateFunction - this is a very complex beast, definitely
> > > > > > >    the most complex user-defined object we have introduced so far. I
> > > > > > >    think there are quite some interesting questions here. For example,
> > > > > > >    do we allow multi-staged TableAggregate in this case? What are the
> > > > > > >    semantics of emit? Is it an amendment to the previous output, or
> > > > > > >    does it replace it? I think that this subject itself is worth a
> > > > > > >    discussion to make sure we get the details right.
> > > > > > >    - GroupedTable.agg - do the group keys automatically appear in the
> > > > > > >    output? What about the case of windowed aggregation?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Xiaowei
> > > > > > >
> > > > > > > On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Xiaowei,
> > > > > > > >
> > > > > > > > Thanks for bringing up the discussion of the Table API Enhancement
> > > > > > > > Outline!
> > > > > > > >
> > > > > > > > I quickly looked at the overall content; it captures our offline
> > > > > > > > discussions well. But from my point of view, we should add the usage of
> > > > > > > > the public interfaces that we will introduce in this proposal. So, I
> > > > > > > > added the following usage description of the interfaces and operators
> > > > > > > > to the Google doc:
> > > > > > > >
> > > > > > > > 1. Map Operator
> > > > > > > >     Map is a new operator of Table. It applies a scalar function and
> > > > > > > > can return multiple columns. The usage is as follows:
> > > > > > > >
> > > > > > > >   val res = tab
> > > > > > > >      .map(fun: ScalarFunction).as('a, 'b, 'c)
> > > > > > > >      .select('a, 'c)
> > > > > > > >
> > > > > > > > 2. FlatMap Operator
> > > > > > > >     FlatMap is a new operator of Table. It applies a table function
> > > > > > > > and can return multiple rows. The usage is as follows:
> > > > > > > >
> > > > > > > >   val res = tab
> > > > > > > >       .flatMap(fun: TableFunction).as('a, 'b, 'c)
> > > > > > > >       .select('a, 'c)
> > > > > > > >
> > > > > > > > 3. Agg Operator
> > > > > > > >     Agg is a new operator of Table/GroupedTable. It applies an
> > > > > > > > aggregate function and can return multiple columns. The usage is as
> > > > > > > > follows:
> > > > > > > >
> > > > > > > >    val res = tab
> > > > > > > >       .groupBy('a) // leave the groupBy clause out to define global aggregates
> > > > > > > >       .agg(fun: AggregateFunction).as('a, 'b, 'c)
> > > > > > > >       .select('a, 'c)
> > > > > > > >
> > > > > > > > 4.  FlatAgg Operator
> > > > > > > >     FlatAgg is a new operator of Table/GroupedTable. It applies a
> > > > > > > > table aggregate function and can return multiple rows. The usage is
> > > > > > > > as follows:
> > > > > > > >
> > > > > > > >     val res = tab
> > > > > > > >        .groupBy('a) // leave the groupBy clause out to define global table aggregates
> > > > > > > >        .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
> > > > > > > >        .select('a, 'c)
> > > > > > > >
> > > > > > > > 5. TableAggregateFunction
> > > > > > > >     Table aggregates behave most like a GroupReduceFunction: they are
> > > > > > > > computed over a group of elements and output a group of elements.
> > > > > > > > A TableAggregateFunction can be applied with GroupedTable.flatAgg().
> > > > > > > > The interface of TableAggregateFunction has a lot of content, so I
> > > > > > > > don't copy it here. Please look at the details in the Google doc:
> > > > > > > >
> > > > > > > > https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
> > > > > > > >
> > > > > > > > I would very much appreciate any reviews and comments.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jincheng
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
