Hi,

There is ongoing work [1] to natively support streams like the one you
described (we call them upsert streams or changelogs). Under the hood, though,
it boils down to exactly the same thing you have done: aggregating the records
per key with a `latest` aggregation function. Until this is supported natively,
you can keep using the query that you have written.
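To make the difference between the two queries concrete, here is a minimal
Python sketch (not Flink code) of what each one computes on the example data
from your mail. The in-memory list `rows` and the helper names are
hypothetical, and `latest` is modelled as "the value with the highest Seq for
this ID":

```python
# Hypothetical in-memory copy of table T from the question: (seq, id, money).
rows = [(1, 100, 100), (2, 100, 200), (3, 101, 300)]


def naive_sum(rows):
    """Models: select sum(MONEY) from T (every record is counted)."""
    return sum(money for _, _, money in rows)


def latest_per_key_sum(rows):
    """Models: select sum(MONEY) from
    (select ID, latest(MONEY) as MONEY from T group by ID) as t

    `latest` is assumed to mean "the record with the highest Seq per ID".
    """
    latest = {}
    for seq, key, money in sorted(rows):  # tuples sort by Seq first
        latest[key] = money  # a later record for the same ID overwrites it
    return sum(latest.values())


print(naive_sum(rows))           # 600
print(latest_per_key_sum(rows))  # 500
```

The `latest` dict is the per-ID state the grouped query has to hold.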

Regarding the state size: in most cases there is no way around this issue. A
record overwriting the previous value for a key can arrive at an arbitrary
point in time, and for most operations (like the SUM aggregation in your case,
or filtering) we need to keep the previous value for that key in state, so that
its effect on the result can be retracted when it gets overwritten. Sometimes
it is possible to optimise the query and skip the "latest value aggregation",
if the following SQL operator either does not need to know the previous value
(like a sink or a projection) or already knows the previous value anyway (like
a join).
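To illustrate why the previous value has to stay in state, here is a minimal
Python sketch of an incremental SUM over an upsert stream. It is a simplified
model, not Flink's actual operator, and the class name `UpsertSum` and its
method are hypothetical. The last value of every key is kept in a dict so it
can be subtracted when an overwriting record arrives; that dict grows with the
number of distinct IDs:

```python
class UpsertSum:
    """Hypothetical running SUM over an upsert stream, where a later record
    for a key replaces the earlier one."""

    def __init__(self):
        self.state = {}  # key -> last value seen (the per-key state)
        self.total = 0

    def on_record(self, key, value):
        previous = self.state.get(key)
        if previous is not None:
            self.total -= previous  # retract the overwritten value
        self.state[key] = value
        self.total += value
        return self.total  # updated SUM after this record


agg = UpsertSum()
results = [agg.on_record(k, v) for k, v in [(100, 100), (100, 200), (101, 300)]]
print(results)  # [100, 200, 500]
```

In this model, dropping an entry from `state` (roughly what a state retention
time does for idle keys) means a later update for that key can no longer
retract the old value, so the total would count both values.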

Piotr Nowojski

[1] https://issues.apache.org/jira/browse/FLINK-8545 

> On 21 Mar 2019, at 09:39, 徐涛 <happydexu...@gmail.com> wrote:
> 
> Hi Experts,
>       Assuming there is a stream whose content looks like this:
>       Seq     ID      MONEY
>       1       100     100
>       2       100     200
>       3       101     300
> 
>       The record with Seq 2 updates the record with Seq 1, changing MONEY
> from 100 to 200.
>       If I register the stream as table T and want to sum the money over all
> IDs, counting only the latest value of each ID, writing "select sum(MONEY)
> from T" gives 600 as the result, which is incorrect (it should be
> 200 + 300 = 500).
> 
>       I can write a UDAF, for example `latest`, that computes the latest value
> of each ID; the SQL then looks like this:
>       select sum(MONEY) from
>       (
>               select ID, latest(MONEY) as MONEY from T group by ID
>       ) as t
>       But then I have to keep each ID and its latest value in state, and I am
> worried that the state will grow too large. Currently I use this method and
> set the state retention time to several days, so that old entries are cleaned
> up before the state grows too large. I wonder if there are better ways to do
> this.
> 
>       So what is the best practice in this scenario? Does anyone have a
> suggestion? Thanks a lot.
> 
> 
> Best
> Henry
>       
