Hi Hequn, However is it semantically correct? because the sql result is not equal to the bounded table.
> 在 2018年8月20日,下午8:34,Hequn Cheng <chenghe...@gmail.com> 写道: > > Hi Henry, > > Both sql output incrementally. > > However there are some problems if you use retract sink. You have to pay > attention to the timestamp field since each time the value is different. > For example, if the value is updated from 1 to 2, > previous row: add (a, 1, 2018-08-20 20:18:10.286) > retract row: delete (a, 1, 2018-08-20 20:18:10.386) > new row: add (a, 2, 2018-08-20 20:18:10.486) > The retract row is different from the previous row because of the time field. > > Of course, this problem should be fixed later. > > Best, Hequn > > On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <happydexu...@gmail.com > <mailto:happydexu...@gmail.com>> wrote: > Hi All, > Like the following code,If I use retract stream, I think Flink is able > to know which item is modified( if praise has 10000 items now, when one item > comes to the stream, only very small amount of data is write to sink) > var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU > FROM praise group by article_id” ) > tableEnv.registerTable("finalTable", praiseAggr) > tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from > finalTable") > > But if I use the following sql, by adding a dynamic timestamp field: > var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) > as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” ) > Is the whole table flush to the sink? Or only the incremental value > will flush to the sink? Why? > > Thanks, > Henry > >