Hi Henry,

If you upsert by the key 'article_id', the result is correct, i.e., the final
row is (a, 2, 2018-08-20 20:18:10.486). What do you think?
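
To make the upsert semantics concrete, here is a minimal sketch in plain
Scala (no Flink APIs; the rows are the ones from the example below). An
upsert sink keyed by article_id simply overwrites the old row for that key,
so the mismatched timestamp in the retract row never matters:

    import scala.collection.mutable

    // changelog keyed by article_id: each update replaces the previous row
    val updates = Seq(
      ("a", (1, "2018-08-20 20:18:10.286")),
      ("a", (2, "2018-08-20 20:18:10.486"))
    )

    val state = mutable.Map[String, (Int, String)]()
    updates.foreach { case (key, row) => state(key) = row }

    println(state("a"))  // (2, 2018-08-20 20:18:10.486) -- the correct result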

Best, Hequn



On Tue, Aug 21, 2018 at 9:44 AM, 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> However, is it semantically correct? The streaming result is not equal to
> the result the same query would produce on a bounded table.
>
>
> On Aug 20, 2018, at 8:34 PM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Henry,
>
> Both SQL queries output their results incrementally.
>
> However, there are some problems if you use a retract sink: you have to pay
> attention to the timestamp field, since its value is different on every
> emission. For example, if the value is updated from 1 to 2,
>
> previous row:  add (a, 1, 2018-08-20 20:18:10.286)
> retract row: delete (a, 1, 2018-08-20 20:18:10.386)
> new row: add (a, 2, 2018-08-20 20:18:10.486)
>
> The retract row is different from the previous row because of the timestamp
> field, so the delete cannot be matched against the previously emitted row.
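>
> A retract sink sees these changes as a stream of (Boolean, Row) pairs. A
> minimal sketch with the Scala Table API (assuming the tableEnv and
> praiseAggr from your snippet below):
>
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.table.api.scala._
>     import org.apache.flink.types.Row
>
>     // true = add (accumulate), false = delete (retract)
>     val retractStream: DataStream[(Boolean, Row)] =
>       tableEnv.toRetractStream[Row](praiseAggr)
>
>     // (true,  a, 1, 2018-08-20 20:18:10.286)   previous row
>     // (false, a, 1, 2018-08-20 20:18:10.386)   retract row -- fresh timestamp!
>     // (true,  a, 2, 2018-08-20 20:18:10.486)   new row
>     retractStream.print()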
>
> Of course, this problem should be fixed later.
>
> Best, Hequn
>
> On Mon, Aug 20, 2018 at 6:43 PM, 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi All,
>> Like the following code: if I use a retract stream, I think Flink is able
>> to know which row was modified (if praise has 10,000 rows now, when one
>> record arrives on the stream, only a very small amount of data is written
>> to the sink):
>>
>>     var praiseAggr = tableEnv.sqlQuery(
>>       s"SELECT article_id, hll(uid) AS PU FROM praise GROUP BY article_id")
>>
>>     tableEnv.registerTable("finalTable", praiseAggr)
>>
>>     tableEnv.sqlUpdate(s"INSERT INTO sinkTableName SELECT * FROM finalTable")
>>
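>>         (For reference, sinkTableName has to be registered before the
>> INSERT INTO runs. A rough sketch of that registration, where MyRetractSink
>> stands for whatever RetractStreamTableSink[Row] implementation is in use,
>> and PU is assumed to be a LONG:)
>>
>>     import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
>>
>>     tableEnv.registerTableSink(
>>       "sinkTableName",
>>       Array("article_id", "PU"),
>>       Array[TypeInformation[_]](Types.STRING, Types.LONG),
>>       new MyRetractSink)  // hypothetical retract sink, not a Flink built-in
>>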
>>
>>         But if I use the following SQL, which adds a dynamic timestamp
>> field:
>>
>>     var praiseAggr = tableEnv.sqlQuery(
>>       s"SELECT article_id, hll(uid) AS PU, LOCALTIMESTAMP AS update_timestamp " +
>>       s"FROM praise GROUP BY article_id")
>>       Will the whole table be flushed to the sink, or only the incremental
>> changes? And why?
>>
>> Thanks,
>> Henry
>>
>>
>
>
