Hi Henry,

You can increase the retention time to make sure all data you want won't be
expired.
As for incremental, I think we can sink results into a kv storage, say
hbase. The hbase table contains a total and latest data set you want so
that you don't need to flush again. Would it be satisfy your scenario?

Best, Hequn


On Wed, Aug 22, 2018 at 2:51 PM 徐涛 <happydexu...@gmail.com> wrote:

> Hi Hequn,
> Thanks a lot four your response! This helps me understand the mechanism
> more clearly.
>
> I have another question:
> *How do I use flink to accoplish time attenuation?*
> If a use join plus retention time solution, I can only get the increment
> data. But some other data may need to be recomputed because the time
> attenuation. Then how do I flush them?
>
> Best,
> Henry,
>
> 在 2018年8月22日,上午10:30,Hequn Cheng <chenghe...@gmail.com> 写道:
>
> Hi Hery,
>
> As for choise1:
>
>    - The state size of join depends on it's input table size, not the
>    result table, so the state size of join of choise1 depends on how many
>    article id, praise id and response_id.
>    - Also non-window join will merge same rows in it's state, i.e, <Row,
>    RowCnt>, so the state size won't grows if you keep pouring same article id.
>    I think the problem here is you need a distinct before join, so that a
>    praise id won't join multi same article ids, and this will influence the
>    correctness of the result.
>    - I think you need do aggregate before join to make sure the
>    correctness of the result. Because there are duplicated article id after
>    article join praise and this will influence the value
>    of count(r.response_id).
>    - You can't use window or other bounded operators after non-window
>    join. The time attribute fields can not be passed through because of
>    semantic conflict.
>    - Hop window with large fixed duration and small hop interval should
>    be avoided. Data will be redundant in various windows. For example, a
>    hopping window of 15 minutes size and 5 minute hop interval assigns each
>    row to 3 different windows of 15 minute size.
>
> As for choice2:
>
>    - I think you need another filed(for example, HOP_START) when join the
>    three tables. Only join records in same window.
>
> To solve your problem, I think we can do non-window group by first and
> then join three result tables. Furthermore, state retention time can be set
> to keep state from growing larger.
>
> Best, Hequn
>
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com> wrote:
>
>> Hi Fabian,
>> So maybe I can not join a table that generate from a window, because the
>> table is getting larger and larger as the time goes, maybe the system will
>> crash one day.
>>
>> I am working on a system that calculate the “score" of article, which is
>> consist of the count of article praise, the count of article response, etc
>> Because I can not use flink to save all the article, I decide to update
>> the score of the article that created in 3 days.
>>
>> I have two choises,
>> 1. join the article table and praise table, response table then window
>> select a.article_id, count(p.praise_id) as pCount, count(r.response_id)
>> as rCount
>> from
>> article a
>> left join
>> praise p on a.article_id = p.article_id
>> left join
>> response r on a.article_id = r.article_id
>> group by hop(updated_time, interval '1' minute,interval '3' day) ,
>> article_id
>> 2. window the article table, window the priase table, window the response
>> table ,then join them together
>> select aAggr.article_id, pAggr.pCount, rAggr.rCount
>> (select article_id from article group by hop(updated_time, interval '1'
>> minute,interval '3' day) , article_id) aAggr
>> left join
>> (select article_id,count(praise_id) as pCount from praise group by hop(
>> updated_time, interval '1' minute,interval '3' day) , article_id) pAggr
>> on aAggr.article_id=pAggr.article_id
>> left join
>> (select article_id,count(response_id) as rCount from response group by
>> hop(updated_time, interval '1' minute,interval '3' day) , article_id)
>> rAggr on aAggr.article_id=rAggr.article_id
>>
>> Maybe I should choose 1,   join then window, but not window then join.
>> Please correct me if I am wrong.
>>
>> I have some worries when choose 1,
>> I do not know how Flink works internally, it seems that in the sql ,
>> table article ,table praise, table response is growing as the time goes by,
>> will it introduce performance issue?
>>
>> Best,
>> Henry
>>
>> 在 2018年8月21日,下午9:29,Hequn Cheng <chenghe...@gmail.com> 写道:
>>
>> Hi Henry,
>>
>> praiseAggr is an append table, so it contains
>> "100,101,102,100,101,103,100".
>> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY
>> article_id", the answer is "101,102,103"
>> 2. if you change your sql to s"SELECT last_value(article_id) FROM
>> praise", the answer is "100"
>>
>> Best, Hequn
>>
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>> Thanks for your response. This question puzzles me for quite a long time.
>>> If the praiseAggr has the following value:
>>> window-1     100,101,102
>>> window-2     100,101,103
>>> window-3     100
>>>
>>> the last time the article table joins praiseAggr, which of the following
>>> value does praiseAggr table has?
>>> 1— 100,101,102,100,101,103,100           collect all the element of all
>>> the window
>>> 2—  100    the element of the latest window
>>> 3—  101,102,103    the distinct value of all the window
>>>
>>>
>>> Best,
>>> Henry
>>>
>>>
>>> 在 2018年8月21日,下午8:02,Fabian Hueske <fhue...@gmail.com> 写道:
>>>
>>> Hi,
>>>
>>> The semantics of a query do not depend on the way that it is used.
>>> praiseAggr is a table that grows by one row per second and article_id.
>>> If you use that table in a join, the join will fully materialize the table.
>>> This is a special case because the same row is added multiple times, so
>>> the state won't grow that quickly, but the performance will decrease
>>> because for each row from article will join with multiple (a growing
>>> number) of rows from praiseAggr.
>>>
>>> Best, Fabian
>>>
>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com>:
>>>
>>>> Hi All,
>>>> var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise
>>>> GROUP BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) ,
>>>> article_id" )
>>>> tableEnv.registerTable("praiseAggr", praiseAggr)
>>>>
>>>>     var finalTable = tableEnv.sqlQuery(*s**”**SELECT 1 FROM article a join 
>>>> praiseAggr p on a.article_id=p.article_id" *)
>>>>     tableEnv.registerTable("finalTable", finalTable)
>>>>
>>>>  I know that praiseAggr, if written to sink, is append mode , so if a
>>>> table joins praiseAggr, what the table “see”, is a table contains the
>>>> latest value, or a table that grows larger and larger? If it is the later,
>>>> will it introduce performance problem?
>>>>  Thanks a lot.
>>>>
>>>>
>>>> Best,
>>>> Henry
>>>>
>>>
>>>
>>>
>>
>>
>

Reply via email to