Hi Hequn,
        Maybe I did not express myself clearly. I mean that it is not enough 
if only the update_timestamp of the incremental data is updated. The SQL 
expresses the idea that "all the timestamps in the table are the same", but 
in fact each row in the table may have a different one. That is a bit weird.
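
To make this concrete, here is a minimal sketch in plain Scala with no Flink dependency. It only imitates a sink that upserts by article_id. The Row case class, the upsert helper, and the second article "b" are hypothetical names and data made up for illustration, not Flink APIs or real output.

```scala
import java.time.LocalDateTime

// Hypothetical row type mirroring (article_id, PU, update_timestamp) from the query.
final case class Row(articleId: String, pu: Long, updateTimestamp: LocalDateTime)

object UpsertSketch {
  // Upsert by key: only the row whose key appears in the change is replaced.
  def upsert(table: Map[String, Row], change: Row): Map[String, Row] =
    table + (change.articleId -> change)

  def main(args: Array[String]): Unit = {
    val t0 = LocalDateTime.parse("2018-08-20T20:18:10.286")
    val t1 = LocalDateTime.parse("2018-08-20T20:18:10.486")

    // Initial results for two articles, both computed at t0 (made-up data).
    val initial = List(Row("a", 1L, t0), Row("b", 5L, t0))
      .foldLeft(Map.empty[String, Row])(upsert)

    // A new praise event arrives for article "a" only, at t1.
    val updated = upsert(initial, Row("a", 2L, t1))

    // Row "a" now carries t1 while row "b" still carries t0.
    updated.values.toList.sortBy(_.articleId).foreach(println)
  }
}
```

After the update, the two rows hold different timestamps, whereas running the same query once over the bounded table would give every row the same LOCALTIMESTAMP.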

Best, Henry



> On Aug 21, 2018, at 10:09 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
> 
> Hi Henry,
> 
> If you upsert by the key 'article_id', the result is correct, i.e., the 
> result is (a, 2, 2018-08-20 20:18:10.486). What do you think?
> 
> Best, Hequn
> 
> 
> 
> On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 <happydexu...@gmail.com> wrote:
> Hi Hequn,
>       However, is it semantically correct? The SQL result is not equal to 
> that of the bounded table.
>       
> 
>> On Aug 20, 2018, at 8:34 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>> 
>> Hi Henry,
>> 
>> Both SQL queries produce output incrementally.
>> 
>> However, there are some problems if you use a retract sink. You have to pay 
>> attention to the timestamp field, since its value is different each time. 
>> For example, if the value is updated from 1 to 2:
>> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
>> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
>> new row: add (a, 2, 2018-08-20 20:18:10.486)
>> The retract row differs from the previous row because of the time field, so 
>> the delete may not match the row that was added earlier.
>> 
>> Of course, this problem should be fixed later.
>> 
>> Best, Hequn
>> 
>> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <happydexu...@gmail.com> wrote:
>> Hi All,
>>      As in the following code, if I use a retract stream, I think Flink is 
>> able to know which item is modified (if praise has 10000 items now and one 
>> new item arrives on the stream, only a very small amount of data is written 
>> to the sink):
>>      var praiseAggr = tableEnv.sqlQuery(
>>        s"SELECT article_id,hll(uid) as PU FROM praise group by article_id")
>>      tableEnv.registerTable("finalTable", praiseAggr)
>>      tableEnv.sqlUpdate(s"insert into sinkTableName SELECT * from finalTable")
>> 
>>      But what if I use the following SQL, which adds a dynamic timestamp field?
>>      var praiseAggr = tableEnv.sqlQuery(
>>        s"SELECT article_id,hll(uid) as PU,LOCALTIMESTAMP as update_timestamp FROM praise group by article_id")
>>      Is the whole table flushed to the sink, or only the incremental 
>> values? Why?
>> 
>> Thanks,
>> Henry
>> 
>> 
> 
> 
