Hi Hequn,
        However is it semantically correct? because the sql result is not equal 
to the bounded table.
        

> 在 2018年8月20日,下午8:34,Hequn Cheng <chenghe...@gmail.com> 写道:
> 
> Hi Henry,
> 
> Both sql output incrementally. 
> 
> However there are some problems if you use retract sink. You have to pay 
> attention to the timestamp field since each time the value is different.  
> For example, if the value is updated from 1 to 2, 
> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
> The retract row is different from the previous row because of the time field.
> 
> Of course, this problem should be fixed later.
> 
> Best, Hequn
> 
> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <happydexu...@gmail.com 
> <mailto:happydexu...@gmail.com>> wrote:
> Hi All,
>       Like the following code,If I use retract stream, I think Flink is able 
> to know which item is modified( if praise has 10000 items now, when one item 
> comes to the stream, only very small amount of data is write to sink) 
>       var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU 
> FROM praise group by article_id” )
>         tableEnv.registerTable("finalTable", praiseAggr)
>       tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from 
> finalTable")
> 
>         But if I use the following sql, by adding a dynamic timestamp field:
>               var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) 
> as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id” )
>       Is the whole table flush to the sink? Or only the incremental value 
> will flush to the sink? Why?
> 
> Thanks,
> Henry
> 
> 

Reply via email to