Hi Henry,

If you upsert by key 'article_id', the result is correct, i.e., the result is (a, 2, 2018-08-20 20:18:10.486). What do you think?
Best,
Hequn

On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 <happydexu...@gmail.com> wrote:
> Hi Hequn,
> However, is it semantically correct? Because the SQL result is not equal to
> the bounded table.
>
>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>> Hi Henry,
>>
>> Both SQL queries output incrementally.
>>
>> However, there are some problems if you use a retract sink. You have to pay
>> attention to the timestamp field, since its value is different each time.
>> For example, if the value is updated from 1 to 2:
>>
>> previous row: add (a, 1, 2018-08-20 20:18:10.286)
>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>>
>> The retract row is different from the previous row because of the time
>> field.
>>
>> Of course, this problem should be fixed later.
>>
>> Best,
>> Hequn
>>
>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>> Hi All,
>>> With the following code, if I use a retract stream, I think Flink is able
>>> to know which item is modified (if praise has 10000 items now, when one
>>> item comes into the stream, only a very small amount of data is written
>>> to the sink):
>>>
>>> var praiseAggr = tableEnv.sqlQuery(
>>>   s"SELECT article_id, hll(uid) AS PU FROM praise GROUP BY article_id")
>>>
>>> tableEnv.registerTable("finalTable", praiseAggr)
>>>
>>> tableEnv.sqlUpdate(
>>>   s"INSERT INTO sinkTableName SELECT * FROM finalTable")
>>>
>>> But if I use the following SQL, adding a dynamic timestamp field:
>>>
>>> var praiseAggr = tableEnv.sqlQuery(
>>>   s"SELECT article_id, hll(uid) AS PU, LOCALTIMESTAMP AS update_timestamp
>>>    FROM praise GROUP BY article_id")
>>>
>>> will the whole table be flushed to the sink, or will only the incremental
>>> value be flushed to the sink? Why?
>>>
>>> Thanks,
>>> Henry
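As an aside, the retract-vs-upsert difference discussed above can be sketched with a toy simulation. This is plain Python, not the Flink API; `apply_retract`, `apply_upsert`, and the changelog tuples are illustrative assumptions, not Flink internals:

```python
def apply_retract(table, changes):
    """Apply a retract changelog: ('add', row) appends a row, ('delete', row)
    removes an exactly matching row. If the timestamp in the delete differs
    from the stored row, the delete matches nothing and the stale row is
    left behind -- the problem described in the thread."""
    for op, row in changes:
        if op == "add":
            table.append(row)
        elif op == "delete" and row in table:
            table.remove(row)
    return table

def apply_upsert(table, changes, key=0):
    """Apply the same changelog as upserts keyed on article_id: an add
    replaces any existing row with the same key, a delete removes by key,
    so the differing timestamp does no harm."""
    for op, row in changes:
        if op == "add":
            table[row[key]] = row
        elif op == "delete":
            table.pop(row[key], None)
    return table

# The three changelog messages from the example (note the retract row
# carries a different timestamp than the previous row).
changes = [
    ("add",    ("a", 1, "2018-08-20 20:18:10.286")),  # previous row
    ("delete", ("a", 1, "2018-08-20 20:18:10.386")),  # retract row
    ("add",    ("a", 2, "2018-08-20 20:18:10.486")),  # new row
]

retracted = apply_retract([], changes)
upserted = apply_upsert({}, changes)

print(retracted)                # the stale (a, 1, ...) row survives next to (a, 2, ...)
print(list(upserted.values()))  # only (a, 2, 2018-08-20 20:18:10.486) remains
```

Under this sketch, the retract path ends up with two rows for `a` because the delete never matches, while keying on `article_id` collapses everything to the single correct final row, which is what upserting by key fixes.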