Hi Jincheng,
> #1) No, watermarks solve the issue of late events. Here, the performance
> problem is caused by the update emit mode, i.e., when the current calculation
> result is output, the previous calculation result needs to be retracted.
Hmm, yes, I missed this. For time-windowed cases (some aggregate/flatAggregate
cases), emitting only on watermark should solve the problem. For
non-time-windowed cases it would reduce the number of retractions, right? Or am
I still missing something?
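
A minimal sketch of the difference, in plain Scala with no Flink dependency.
Everything in it is hypothetical and only for illustration: the Msg types, the
top-2 aggregate and the simulated window are stand-ins, not the proposed API. It
counts the messages sent downstream when every record retracts the previous
result, versus emitting the result once when a watermark closes the window.

```scala
// Hypothetical model, not Flink code: counts downstream messages produced by a
// top-2 table aggregate under two emit strategies.
object RetractionSketch {
  sealed trait Msg
  final case class Add(value: Int) extends Msg
  final case class Retract(value: Int) extends Msg

  // Current result of the aggregate: the two largest values seen so far.
  def top2(seen: Seq[Int]): Seq[Int] =
    seen.sorted(Ordering[Int].reverse).take(2)

  // Update emit mode: after every record, retract the whole previous result
  // and emit the whole new result (even if some rows did not change).
  def emitPerRecord(input: Seq[Int]): Seq[Msg] = {
    // Results after 0, 1, 2, ... records; the first entry is the empty result.
    val results = input.scanLeft(Seq.empty[Int])(_ :+ _).map(top2)
    results.zip(results.tail).flatMap { case (prev, cur) =>
      prev.map(v => Retract(v): Msg) ++ cur.map(v => Add(v): Msg)
    }
  }

  // Emit only when the (simulated) watermark closes the window:
  // one final result and no retractions.
  def emitOnWatermark(input: Seq[Int]): Seq[Msg] =
    top2(input).map(v => Add(v): Msg)

  def main(args: Array[String]): Unit = {
    val window = Seq(3, 1, 4, 1, 5)
    println(s"per record:   ${emitPerRecord(window).size} messages")
    println(s"on watermark: ${emitOnWatermark(window).size} messages")
  }
}
```

For the non-windowed case there is no final watermark, so under this model
emitting on watermark would presumably only batch the updates that arrive
between two watermarks: fewer retractions, but not zero.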
> #3) I still hope to keep the simplicity that select only supports projected
> scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b,
> flatmap('d)).
table.select(F('a).unnest(), 'b, G('c).unnest())
could be rejected during some validation phase. On the other hand:
table.select(F('a).unnest(), 'b, scalarG('c))
or
table.flatMap(F('a), 'b, scalarG('c))
could work and be more or less syntactic sugar for cross apply.
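
To illustrate the cross apply reading, here is a small sketch in plain Scala
collections (again no Flink dependency). The Row type and the bodies of f and
scalarG are made up for the example; only the shape matters: one table function
produces zero or more rows per input row, and the scalar columns are repeated
for each of them.

```scala
// Hypothetical model of flatMap(F('a), 'b, scalarG('c)) as a cross apply.
object CrossApplySketch {
  final case class Row(a: String, b: Int, c: Int)

  // Stand-in for the table function F: zero or more output rows per input value.
  def f(a: String): Seq[String] = a.split(",").toSeq.filter(_.nonEmpty)

  // Stand-in for the scalar function scalarG: exactly one value per input row.
  def scalarG(c: Int): Int = c * 10

  // Each output row of F is paired with the scalar columns of its input row.
  // Input rows for which F returns nothing are dropped (cross apply, not outer apply).
  def flatMapRows(rows: Seq[Row]): Seq[(String, Int, Int)] =
    for {
      row <- rows
      out <- f(row.a)
    } yield (out, row.b, scalarG(row.c))

  def main(args: Array[String]): Unit =
    flatMapRows(Seq(Row("x,y", 1, 2), Row("", 3, 4))).foreach(println)
}
```

With two table functions in the same call there is no such obvious pairing
(cartesian product or positional zip?), which is why that variant could simply
be rejected during validation.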
Piotrek
> On 21 Nov 2018, at 12:16, jincheng sun <[email protected]> wrote:
>
> Hi shaoxuan & Hequn,
>
> Thanks for your suggestions, I'll file the JIRAs later.
> We can prepare PRs while continuing to move forward the ongoing discussion.
>
> Regards,
> Jincheng
>
> jincheng sun <[email protected]> wrote on Wed, Nov 21, 2018 at 7:07 PM:
>
>> Hi Piotrek,
>> Thanks for your feedback, and thanks for sharing your thoughts!
>>
>> #1) No, watermarks solve the issue of late events. Here, the performance
>> problem is caused by the update emit mode, i.e., when the current calculation
>> result is output, the previous calculation result needs to be retracted.
>> #2) As I mentioned above we should continue the discussion until we solve
>> the problems raised by Xiaowei and Fabian.
>> #3) I still hope to keep the simplicity that select only supports projected
>> scalars; we can hardly tell the semantics of tab.select(flatmap('a), 'b,
>> flatmap('d)).
>>
>> Thanks,
>> Jincheng
>>
>> Piotr Nowojski <[email protected]> wrote on Wed, Nov 21, 2018 at 5:24 PM:
>>
>>> Hi,
>>>
>>> 1.
>>>
>>>> In fact, in addition to the design of APIs, there will be various
>>>> performance optimization details, such as: table Aggregate function
>>>> emitValue will generate multiple calculation results, in extreme cases,
>>>> each record will trigger a large number of retract messages, this will
>>> have
>>>> poor performance
>>>
>>> Can this be solved/mitigated by emitting the results only on watermarks?
>>> I think that was the path that we decided to take both for Temporal Joins
>>> and upsert stream conversion. I know that this increases the latency and
>>> there is a place for a future global setting/user preference “emit the data
>>> ASAP mode”, but emitting only on watermarks seems to me as a better/more
>>> sane default.
>>>
>>> 2.
>>>
>>> With respect to the API discussion and implicit columns. The problem for
>>> me so far is I’m not sure if I like the additional complexity of the
>>> `append()` solution, while implicit columns are definitely not in the
>>> spirit of SQL. Neither joins nor aggregations add extra unexpected columns
>>> to the result without asking. This definitely can be confusing for the
>>> users since it breaks the convention. Thus I would lean towards Fabian’s
>>> proposal of multi-argument `map(Expression*)` from those 3 options.
>>>
>>> 3.
>>>
>>> Another topic is that I’m not 100% convinced that we should be adding new
>>> api functions for `map`,`aggregate`,`flatMap` and `flatAggregate`. I think
>>> the same could be achieved by changing
>>>
>>> table.map(F('x))
>>>
>>> into
>>>
>>> table.select(F('x)).unnest()
>>> or
>>> table.select(F('x).unnest())
>>>
>>> Where `unnest()` means unnest row/tuple type into a columnar table.
>>>
>>> table.flatMap(F('x))
>>>
>>> Could be on the other hand also handled by
>>>
>>> table.select(F('x))
>>>
>>> By correctly deducing that F(x) is a multi row output function
>>>
>>> Same might apply to `aggregate(F('x))`, but this maybe could be replaced
>>> by:
>>>
>>> table.groupBy(…).select(F('x).unnest())
>>>
>>> Adding scalar functions should also be possible:
>>>
>>> table.groupBy('k).select(F('x).unnest(), 'k)
>>>
>>> Maybe such approach would allow us to implement the same features in the
>>> SQL as well?
>>>
>>> Piotrek
>>>
>>>> On 21 Nov 2018, at 09:43, Hequn Cheng <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Thank you all for the great proposal and discussion!
>>>> I also prefer to move on to the next step, so +1 for opening the JIRAs to
>>>> start the work.
>>>> We can have more detailed discussion there. Btw, we can start with JIRAs
>>>> which we have agreed on.
>>>>
>>>> Best,
>>>> Hequn
>>>>
>>>> On Tue, Nov 20, 2018 at 11:38 PM Shaoxuan Wang <[email protected]>
>>> wrote:
>>>>
>>>>> +1. I agree that we should open the JIRAs to start the work. We may
>>>>> have better ideas on the flavor of the interface when we implement/review
>>>>> the code.
>>>>>
>>>>> Regards,
>>>>> shaoxuan
>>>>>
>>>>>
>>>>> On 11/20/18, jincheng sun <[email protected]> wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks all for the feedback.
>>>>>>
>>>>>> @Piotr About not using abbreviations in names: +1, I like your
>>>>>> proposal! Currently both the DataSet and DataStream APIs use
>>>>>> `aggregate`. BTW, I find that other languages, such as R, also avoid
>>>>>> abbreviated names.
>>>>>>
>>>>>> Sometimes it is really difficult to perfect the interface of an API. We
>>>>>> need to spend a lot of time thinking, gather feedback from a large number
>>>>>> of users, and improve constantly. But because of backward compatibility, we
>>>>>> have to adopt the most conservative approach when designing the API (of
>>>>>> course, I am more in favor of developing richer features once we have
>>>>>> discussed them clearly). Therefore, I propose to split the implementation
>>>>>> of map/flatMap/agg/flatAgg into JIRAs for the basic functions and JIRAs
>>>>>> that support time attributes and groupKeys. We can develop the features
>>>>>> whose design we have already agreed on, and continue to discuss the
>>>>>> uncertain parts of the design.
>>>>>>
>>>>>> In fact, in addition to the design of the APIs, there will be various
>>>>>> performance optimization details. For example, the emitValue method of a
>>>>>> table aggregate function generates multiple calculation results; in
>>>>>> extreme cases, each record will trigger a large number of retract
>>>>>> messages, which performs poorly. So we will also optimize the interface
>>>>>> design, for example by adding the emitWithRetractValue interface (I have
>>>>>> updated the google doc) to allow the user to optionally perform
>>>>>> incremental calculations, thus avoiding a large number of retractions.
>>>>>> Details like this are difficult to fully discuss on the mailing list, so I
>>>>>> recommend creating JIRAs/FLIP first: we develop the designs that have been
>>>>>> agreed upon and continue to discuss the undecided ones! What do you
>>>>>> think? @Fabian & Piotr & XiaoWei
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>> Xiaowei Jiang <[email protected]> wrote on Mon, Nov 19, 2018 at 12:07 AM:
>>>>>>
>>>>>>> Hi Fabian & Piotr, thanks for the feedback!
>>>>>>>
>>>>>>> I appreciate your concerns, both on timestamp attributes and on
>>>>>>> implicit group keys. At the same time, I'm also concerned about the
>>>>>>> proposed approach of allowing Expression* as parameters, especially for
>>>>>>> flatMap/flatAgg. So far, we have never allowed a scalar expression to
>>>>>>> appear together with table expressions. With the Expression* approach,
>>>>>>> this will happen for the parameters to flatMap/flatAgg. I'm a bit
>>>>>>> concerned about whether we fully understand the consequences when we try
>>>>>>> to extend our system in the future. I would be extra cautious in doing
>>>>>>> this. To avoid this, I think an implicit group key for flatAgg is safer.
>>>>>>> For flatMap, if users want to keep the rowtime column, they can use
>>>>>>> crossApply/join instead. So we are not losing any real functionality
>>>>>>> here.
>>>>>>>
>>>>>>> Also a clarification on the following example:
>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>> .flatAgg(tableAgg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>> If we did not have the select clause in this example, we would have 'w
>>>>>>> as a regular column in the output. It should not magically disappear.
>>>>>>>
>>>>>>> The concern is not as strong for Table.map/Table.agg because we are not
>>>>>>> mixing scalar and table expressions. But we also want to be a bit
>>>>>>> consistent with these methods. If we used implicit group keys for
>>>>>>> Table.flatAgg, we probably should do the same for Table.agg. Now we only
>>>>>>> have to choose what to do with Table.map. I can see good arguments from
>>>>>>> both sides. But starting with a single Expression seems safer because
>>>>>>> we can always extend to Expression* in the future.
>>>>>>>
>>>>>>> While thinking about this problem, it appears that we may need more work
>>>>>>> in our handling of watermarks for SQL/Table API. Our current way of
>>>>>>> propagating the watermarks from source all the way to sink might not be
>>>>>>> optimal. For example, after a tumbling window, the watermark can
>>>>>>> actually be advanced to just before the expiry of the next window. I
>>>>>>> think that in general, each operator may need to generate new watermarks
>>>>>>> instead of simply propagating them. Once we accept that watermarks may
>>>>>>> change during the execution, it appears that the timestamp columns may
>>>>>>> also change, as long as we have some way to associate a watermark with
>>>>>>> them. My intuition is that once we have a thorough solution for the
>>>>>>> watermark issue, we may be able to solve the problem we encountered for
>>>>>>> Table.map in a cleaner way. But this is a complex issue which deserves a
>>>>>>> discussion of its own.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Xiaowei
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Nov 16, 2018 at 12:34 AM Piotr Nowojski <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Isn’t the problem of multiple expressions limited only to `flat***`
>>>>>>>> functions and, to be more specific, only to having two (or more)
>>>>>>>> different table functions passed as expressions?
>>>>>>>> `.flatAgg(TableAggA('a), scalarFunction1('b), scalarFunction2('c))`
>>>>>>>> seems to be well defined (duplicate the result of every scalar function
>>>>>>>> to every record). Or am I missing something?
>>>>>>>>
>>>>>>>> Another remark, I would be in favour of not using abbreviations and
>>>>>>>> naming `agg` -> `aggregate`, `flatAgg` -> `flatAggregate`.
>>>>>>>>
>>>>>>>> Piotrek
>>>>>>>>
>>>>>>>>> On 15 Nov 2018, at 14:15, Fabian Hueske <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Jincheng,
>>>>>>>>>
>>>>>>>>> I said before that I think that the append() method is better than
>>>>>>>>> implicitly forwarding keys, but still, I believe it adds unnecessary
>>>>>>>>> boilerplate code.
>>>>>>>>>
>>>>>>>>> Moreover, I haven't seen a convincing argument why map(Expression*)
>>>>>>>>> is worse than map(Expression). In either case we need to do all kinds
>>>>>>>>> of checks to prevent invalid use of functions.
>>>>>>>>> If the method is not correctly used, we can emit a good error message,
>>>>>>>>> and documenting map(Expression*) will be easier than
>>>>>>>>> map(append(Expression*)), in my opinion.
>>>>>>>>> I think we should not add unnecessary syntax unless there is a good
>>>>>>>>> reason and, to be honest, I haven't seen this reason yet.
>>>>>>>>>
>>>>>>>>> Regarding the groupBy.agg() method, I think it should behave just
>>>>>>>>> like any other method, i.e., not do any implicit forwarding.
>>>>>>>>> Let's take the example of the windowed group by that you posted
>>>>>>>>> before.
>>>>>>>>>
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>
>>>>>>>>> What happens if 'w.rowtime is not selected? What is the data type of
>>>>>>>>> the field 'w in the resulting Table? Is it a regular field at all or
>>>>>>>>> just a system field that disappears if it is not selected?
>>>>>>>>>
>>>>>>>>> IMO, the following syntax is shorter, more explicit, and better
>>>>>>>>> aligned with the regular window.groupBy.select aggregations that are
>>>>>>>>> supported today.
>>>>>>>>>
>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>> .agg('w.rowtime as 'rtime, 'k1, 'k2, agg('a))
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> On Wed, 14 Nov 2018 at 08:37, jincheng sun <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Fabian/Xiaowei,
>>>>>>>>>>
>>>>>>>>>> I am very sorry for my late reply! Glad to see your reply, and it
>>>>>>>>>> sounds pretty good!
>>>>>>>>>> I agree that the approach with append(), which Fabian mentioned and
>>>>>>>>>> which clearly defines the result schema, is better.
>>>>>>>>>> In addition, append() can also contain non-time attributes, e.g.:
>>>>>>>>>>
>>>>>>>>>> tab('name, 'age, 'address, 'rowtime)
>>>>>>>>>> tab.map(append(udf('name), 'address, 'rowtime)).as('col1, 'col2,
>>>>>>>>>> 'address, 'rowtime)
>>>>>>>>>> .window(Tumble over 5.millis on 'rowtime as 'w)
>>>>>>>>>> .groupBy('w, 'address)
>>>>>>>>>>
>>>>>>>>>> In this way append() is very useful, and the behavior is very
>>>>>>>>>> similar to withForwardedFields() in DataSet.
>>>>>>>>>> So +1 to using the append() approach for map() & flatMap()!
>>>>>>>>>>
>>>>>>>>>> But how about agg() and flatAgg()? In the agg/flatAgg case I agree
>>>>>>>>>> with Xiaowei's approach of defining the keys to be implied in the
>>>>>>>>>> result table and to appear at the beginning, for example as follows:
>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>> .agg(agg('a)).as('w, 'k1, 'k2, 'col1, 'col2)
>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>
>>>>>>>>>> What do you think? @Fabian @Xiaowei
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Jincheng
>>>>>>>>>>
>>>>>>>>>> Fabian Hueske <[email protected]> wrote on Fri, Nov 9, 2018 at 6:35 PM:
>>>>>>>>>>
>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the summary!
>>>>>>>>>>> I like the approach with append() better than the implicit
>>>>>>>>>>> forwarding as it clearly indicates which fields are forwarded.
>>>>>>>>>>> However, I don't see much benefit over the flatMap(Expression*)
>>>>>>>>>>> variant, as we would still need to analyze the full expression tree
>>>>>>>>>>> to ensure that at most (or exactly?) one Scalar / TableFunction is
>>>>>>>>>>> used.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Fabian
>>>>>>>>>>>
>>>>>>>>>>> On Thu, 8 Nov 2018 at 19:25, jincheng sun <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> We are discussing very detailed content about this proposal. We
>>>>>>>>>>>> are trying to design the API in many aspects (functionality,
>>>>>>>>>>>> compatibility, ease of use, etc.). I think this is a very good
>>>>>>>>>>>> process. Only with such a detailed discussion can we develop the PRs
>>>>>>>>>>>> more clearly and smoothly at a later stage. I am very grateful to
>>>>>>>>>>>> @Fabian and @Xiaowei for sharing a lot of good ideas.
>>>>>>>>>>>> About the definition of method signatures, I want to share my
>>>>>>>>>>>> points here, which I am discussing with Fabian in the google doc
>>>>>>>>>>>> (not yet completed), as follows:
>>>>>>>>>>>>
>>>>>>>>>>>> Assume we have a table:
>>>>>>>>>>>> val tab = util.addTable[(Long, String)]("MyTable", 'long, 'string,
>>>>>>>>>>>> 'proctime.proctime)
>>>>>>>>>>>>
>>>>>>>>>>>> Approach 1:
>>>>>>>>>>>> case1: Map follows Source Table
>>>>>>>>>>>> val result =
>>>>>>>>>>>> tab.map(udf('string)).as('proctime, 'col1, 'col2) // proctime implied
>>>>>>>>>>>> // in the output
>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>
>>>>>>>>>>>> case2: FlatAgg follows Window (Fabian mentioned above)
>>>>>>>>>>>> val result =
>>>>>>>>>>>> tab.window(Tumble ... as 'w)
>>>>>>>>>>>> .groupBy('w, 'k1, 'k2) // 'w should be a group key.
>>>>>>>>>>>> .flatAgg(tabAgg('a)).as('k1, 'k2, 'w, 'col1, 'col2)
>>>>>>>>>>>> .select('k1, 'col1, 'w.rowtime as 'rtime)
>>>>>>>>>>>>
>>>>>>>>>>>> Approach 2: Similar to Fabian’s approach, in which the result schema
>>>>>>>>>>>> would be clearly defined, but with an added built-in append UDF. That
>>>>>>>>>>>> makes the map/flatMap/agg/flatAgg interfaces accept only one
>>>>>>>>>>>> Expression.
>>>>>>>>>>>> val result =
>>>>>>>>>>>> tab.map(append(udf('string), 'long, 'proctime)) as ('col1, 'col2,
>>>>>>>>>>>> 'long, 'proctime)
>>>>>>>>>>>> .window(Tumble over 5.millis on 'proctime as 'w)
>>>>>>>>>>>>
>>>>>>>>>>>> Note: append is a special built-in UDF that can pass through any
>>>>>>>>>>>> column.
>>>>>>>>>>>>
>>>>>>>>>>>> So, maybe we can define table.map(Expression) first and, if
>>>>>>>>>>>> necessary, extend it to table.map(Expression*) in the future? Of
>>>>>>>>>>>> course, I also hope that we can further improve this proposal
>>>>>>>>>>>> through discussion.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jincheng
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Xiaowei Jiang <[email protected]> wrote on Wed, Nov 7, 2018 at 11:45 PM:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Fabian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that the key question you raised is whether we allow extra
>>>>>>>>>>>>> parameters in the methods map/flatMap/agg/flatAgg. I can see why
>>>>>>>>>>>>> allowing that may appear more convenient in some cases. However, it
>>>>>>>>>>>>> might also cause some confusion if we do that. For example, do we
>>>>>>>>>>>>> allow multiple UDFs in these expressions? If we do, the semantics
>>>>>>>>>>>>> may be weird to define, e.g. what does
>>>>>>>>>>>>> table.groupBy('k).flatAgg(TableAggA('a), TableAggB('b)) mean? Even
>>>>>>>>>>>>> though not allowing it may appear less powerful, it can make things
>>>>>>>>>>>>> more intuitive too. In the case of agg/flatAgg, we can define the
>>>>>>>>>>>>> keys to be implied in the result table and to appear at the
>>>>>>>>>>>>> beginning. You can use a select method if you want to modify this
>>>>>>>>>>>>> behavior. I think that eventually we will have some API which allows
>>>>>>>>>>>>> other expressions as additional parameters, but I think it's better
>>>>>>>>>>>>> to do that after we introduce the concept of nested tables. A lot of
>>>>>>>>>>>>> things we suggested here can be considered as special cases of that.
>>>>>>>>>>>>> But things are much simpler if we leave that to later.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 7, 2018 at 5:18 PM Fabian Hueske <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re emit:
>>>>>>>>>>>>>> I think we should start with a well understood semantics of full
>>>>>>>>>>>>>> replacement. This is how the other agg functions work.
>>>>>>>>>>>>>> As was said before, there are open questions regarding an append
>>>>>>>>>>>>>> mode (checkpointing, whether to support retractions or not and, if
>>>>>>>>>>>>>> yes, how to declare them, ...).
>>>>>>>>>>>>>> Since this seems to be an optimization, I'd postpone it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re grouping keys:
>>>>>>>>>>>>>> I don't think we should automatically add them because the
>>>>>>>>>>>>>> result schema would not be intuitive.
>>>>>>>>>>>>>> Would they be added at the beginning of the tuple or at the end?
>>>>>>>>>>>>>> What metadata fields of windows would be added? In which order
>>>>>>>>>>>>>> would they be added?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However, we could support syntax like this:
>>>>>>>>>>>>>> val t: Table = ???
>>>>>>>>>>>>>> t
>>>>>>>>>>>>>> .window(Tumble ... as 'w)
>>>>>>>>>>>>>> .groupBy('a, 'b)
>>>>>>>>>>>>>> .flatAgg('b, 'a, myAgg(row('*)), 'w.end as 'wend, 'w.rowtime as
>>>>>>>>>>>>>> 'rtime)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The result schema would be clearly defined as [b, a, f1, f2, ...,
>>>>>>>>>>>>>> fn, wend, rtime]. (f1, f2, ..., fn) are the result attributes of
>>>>>>>>>>>>>> the UDF.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * Re Multi-staged evaluation:
>>>>>>>>>>>>>> I think this should be an optimization that can be applied if
>>>>>>>>>>>>>> the UDF implements the merge() method.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, 7 Nov 2018 at 08:01, Shaoxuan Wang <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Xiaowei,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, I agree with you that the semantics of TableAggregateFunction
>>>>>>>>>>>>>>> emit is much more complex than that of AggregateFunction. The
>>>>>>>>>>>>>>> fundamental difference is that TableAggregateFunction emits a
>>>>>>>>>>>>>>> "table" while AggregateFunction outputs (a column of) a "row".
>>>>>>>>>>>>>>> AggregateFunction has only one mode, which is “replacing”
>>>>>>>>>>>>>>> (complete update). But for TableAggregateFunction, it could be an
>>>>>>>>>>>>>>> incremental update (only emit the newly updated results) or a
>>>>>>>>>>>>>>> complete update (always emit the entire table when “emit” is
>>>>>>>>>>>>>>> triggered). From the performance perspective, we might want to use
>>>>>>>>>>>>>>> incremental update. But we need to review and design this
>>>>>>>>>>>>>>> carefully, especially taking into account failover (instead of
>>>>>>>>>>>>>>> just backing up the ACC, it may also need to remember the emit
>>>>>>>>>>>>>>> offset) and retractions, as the semantics of
>>>>>>>>>>>>>>> TableAggregateFunction emit are different from those of other
>>>>>>>>>>>>>>> UDFs. TableFunction also emits a table, but it does not need to
>>>>>>>>>>>>>>> worry about this because it is stateless.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Shaoxuan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 7:16 PM Xiaowei Jiang <[email protected]>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Jincheng,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for adding the public interfaces! I think that it's a
>>>>>>>>>>>>>>>> very good start. There are a few points that we need to discuss
>>>>>>>>>>>>>>>> further.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - TableAggregateFunction - this is a very complex beast,
>>>>>>>>>>>>>>>> definitely the most complex user-defined object we have
>>>>>>>>>>>>>>>> introduced so far. I think there are quite a few interesting
>>>>>>>>>>>>>>>> questions here. For example, do we allow multi-staged
>>>>>>>>>>>>>>>> TableAggregate in this case? What are the semantics of emit? Is
>>>>>>>>>>>>>>>> it an amendment to the previous output, or does it replace it? I
>>>>>>>>>>>>>>>> think that this subject itself is worth a discussion to make
>>>>>>>>>>>>>>>> sure we get the details right.
>>>>>>>>>>>>>>>> - GroupedTable.agg - do the group keys automatically appear in
>>>>>>>>>>>>>>>> the output? How about the case of windowed aggregation?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>> Xiaowei
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Nov 6, 2018 at 6:25 PM jincheng sun <[email protected]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi, Xiaowei,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for bringing up the discussion of the Table API
>>>>>>>>>>>>>>>>> Enhancement Outline!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I quickly looked at the overall content; it is a good expression
>>>>>>>>>>>>>>>>> of our offline discussions. But from my point of view, we should
>>>>>>>>>>>>>>>>> add the usage of the public interfaces that we will introduce in
>>>>>>>>>>>>>>>>> this proposal. So I added the following usage description of the
>>>>>>>>>>>>>>>>> interfaces and operators to the google doc:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Map Operator
>>>>>>>>>>>>>>>>> Map is a new operator of Table. It can apply a scalar function
>>>>>>>>>>>>>>>>> and can return multiple columns. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>> .map(fun: ScalarFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>> .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. FlatMap Operator
>>>>>>>>>>>>>>>>> FlatMap is a new operator of Table. It can apply a table
>>>>>>>>>>>>>>>>> function and can return multiple rows. The usage is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>> .flatMap(fun: TableFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>> .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. Agg Operator
>>>>>>>>>>>>>>>>> Agg is a new operator of Table/GroupedTable. It can apply an
>>>>>>>>>>>>>>>>> aggregate function and can return multiple columns. The usage is
>>>>>>>>>>>>>>>>> as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>> .groupBy('a) // leave the groupBy clause out to define global
>>>>>>>>>>>>>>>>> // aggregates
>>>>>>>>>>>>>>>>> .agg(fun: AggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>> .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 4. FlatAgg Operator
>>>>>>>>>>>>>>>>> FlatAgg is a new operator of Table/GroupedTable. It can apply a
>>>>>>>>>>>>>>>>> table aggregate function and can return multiple rows. The usage
>>>>>>>>>>>>>>>>> is as follows:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> val res = tab
>>>>>>>>>>>>>>>>> .groupBy('a) // leave the groupBy clause out to define global
>>>>>>>>>>>>>>>>> // table aggregates
>>>>>>>>>>>>>>>>> .flatAgg(fun: TableAggregateFunction).as('a, 'b, 'c)
>>>>>>>>>>>>>>>>> .select('a, 'c)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 5. TableAggregateFunction
>>>>>>>>>>>>>>>>> The behavior of table aggregates is most like that of
>>>>>>>>>>>>>>>>> GroupReduceFunction, which is computed over a group of elements
>>>>>>>>>>>>>>>>> and outputs a group of elements. The TableAggregateFunction can
>>>>>>>>>>>>>>>>> be applied on GroupedTable.flatAgg(). The interface of
>>>>>>>>>>>>>>>>> TableAggregateFunction has a lot of content, so I don't copy it
>>>>>>>>>>>>>>>>> here. Please look at the details in the google doc:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://docs.google.com/document/d/19rVeyqveGtV33UZt72GV-DP2rLyNlfs0QNGG0xWjayY/edit
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I will be very appreciate to anyone for reviewing and
>>>>>>>>>>> commenting.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Jincheng
>>>>>
>>>>> --
>>>>>
>>>>>
>>> -----------------------------------------------------------------------------------
>>>>>
>>>>> *Rome was not built in one day*
>>>>>
>>>>>
>>>>>
>>> -----------------------------------------------------------------------------------
>>>>>
>>>
>>>