Hi Hequn,
        You can't use window or other bounded operators after non-window join. 
The time attribute fields can not be passed through because of semantic 
conflict.
       Why does Flink have this limitation?
       I have a temp view
        
        var finalTable = tableEnv.sqlQuery(s"select * from
                A join B on xxxx
                join C on xxxx  " )
        tableEnv.registerTable("finalTable", finalTable)
     
      And I want to window this table because I want it to output 1 minute per 
second, however obviously I can not do this now, may I ask how can I make a 
“final table” to output 1 minute per second? And if a table is a retract 
stream, will the item added to the window be retracted either?

      Thanks a lot.     
        

Best
Henry   



> 在 2018年8月22日,上午10:30,Hequn Cheng <chenghe...@gmail.com> 写道:
> 
> Hi Hery,
> 
> As for choise1:
> The state size of join depends on it's input table size, not the result 
> table, so the state size of join of choise1 depends on how many article id, 
> praise id and response_id. 
> Also non-window join will merge same rows in it's state, i.e, <Row, RowCnt>, 
> so the state size won't grows if you keep pouring same article id. I think 
> the problem here is you need a distinct before join, so that a praise id 
> won't join multi same article ids, and this will influence the correctness of 
> the result.
> I think you need do aggregate before join to make sure the correctness of the 
> result. Because there are duplicated article id after article join praise and 
> this will influence the value of count(r.response_id).
> You can't use window or other bounded operators after non-window join. The 
> time attribute fields can not be passed through because of semantic conflict.
> Hop window with large fixed duration and small hop interval should be 
> avoided. Data will be redundant in various windows. For example, a hopping 
> window of 15 minutes size and 5 minute hop interval assigns each row to 3 
> different windows of 15 minute size.
> As for choice2:
> I think you need another filed(for example, HOP_START) when join the three 
> tables. Only join records in same window.
> To solve your problem, I think we can do non-window group by first and then 
> join three result tables. Furthermore, state retention time can be set to 
> keep state from growing larger.
> 
> Best, Hequn
> 
> On Tue, Aug 21, 2018 at 10:07 PM 徐涛 <happydexu...@gmail.com 
> <mailto:happydexu...@gmail.com>> wrote:
> Hi Fabian,
>       So maybe I can not join a table that generate from a window, because 
> the table is getting larger and larger as the time goes, maybe the system 
> will crash one day. 
> 
>       I am working on a system that calculate the “score" of article, which 
> is consist of the count of article praise, the count of article response, etc
>       Because I can not use flink to save all the article, I decide to update 
> the score of the article that created in 3 days.
> 
>       I have two choises,
>       1. join the article table and praise table, response table then window
>               select a.article_id, count(p.praise_id) as pCount, 
> count(r.response_id) as rCount
>               from
>                       article a
>               left join
>                       praise p on a.article_id = p.article_id
>               left join
>                       response r on a.article_id = r.article_id
>               group by hop(updated_time, interval '1' minute,interval '3' 
> day) , article_id
>       2. window the article table, window the priase table, window the 
> response table ,then join them together
>               select aAggr.article_id, pAggr.pCount, rAggr.rCount
>               (select article_id from article group by hop(updated_time, 
> interval '1' minute,interval '3' day) , article_id) aAggr
>               left join
>               (select article_id,count(praise_id) as pCount from praise group 
> by hop(updated_time, interval '1' minute,interval '3' day) , article_id) 
> pAggr on aAggr.article_id=pAggr.article_id
>               left join
>               (select article_id,count(response_id) as rCount from response 
> group by hop(updated_time, interval '1' minute,interval '3' day) , 
> article_id) rAggr on aAggr.article_id=rAggr.article_id
> 
>       Maybe I should choose 1,   join then window, but not window then join.  
>  
>       Please correct me if I am wrong.
> 
>       I have some worries when choose 1,
>       I do not know how Flink works internally, it seems that in the sql , 
> table article ,table praise, table response is growing as the time goes by, 
> will it introduce performance issue? 
> 
> Best,
> Henry
> 
>> 在 2018年8月21日,下午9:29,Hequn Cheng <chenghe...@gmail.com 
>> <mailto:chenghe...@gmail.com>> 写道:
>> 
>> Hi Henry,
>> 
>> praiseAggr is an append table, so it contains "100,101,102,100,101,103,100".
>> 1. if you change your sql to s"SELECT article_id FROM praise GROUP BY 
>> article_id", the answer is "101,102,103"
>> 2. if you change your sql to s"SELECT last_value(article_id) FROM praise", 
>> the answer is "100" 
>> 
>> Best, Hequn
>> 
>> On Tue, Aug 21, 2018 at 8:52 PM, 徐涛 <happydexu...@gmail.com 
>> <mailto:happydexu...@gmail.com>> wrote:
>> Hi Fabian,
>>      Thanks for your response. This question puzzles me for quite a long 
>> time.
>>      If the praiseAggr has the following value:
>>      window-1     100,101,102
>>      window-2            100,101,103
>>      window-3            100
>> 
>>      the last time the article table joins praiseAggr, which of the 
>> following value does praiseAggr table has?
>>      1—      100,101,102,100,101,103,100           collect all the element 
>> of all the window
>>      2—  100                                                    the element 
>> of the latest window
>>      3—  101,102,103                                    the distinct value 
>> of all the window
>> 
>> 
>> Best,
>> Henry
>> 
>> 
>>> 在 2018年8月21日,下午8:02,Fabian Hueske <fhue...@gmail.com 
>>> <mailto:fhue...@gmail.com>> 写道:
>>> 
>>> Hi,
>>> 
>>> The semantics of a query do not depend on the way that it is used.
>>> praiseAggr is a table that grows by one row per second and article_id. If 
>>> you use that table in a join, the join will fully materialize the table.
>>> This is a special case because the same row is added multiple times, so the 
>>> state won't grow that quickly, but the performance will decrease because 
>>> for each row from article will join with multiple (a growing number) of 
>>> rows from praiseAggr.
>>> 
>>> Best, Fabian
>>> 
>>> 2018-08-21 12:19 GMT+02:00 徐涛 <happydexu...@gmail.com 
>>> <mailto:happydexu...@gmail.com>>:
>>> Hi All,
>>>     var praiseAggr = tableEnv.sqlQuery(s"SELECT article_id FROM praise 
>>> GROUP BY HOP(updated_time, INTERVAL '1' SECOND,INTERVAL '3' MINUTE) , 
>>> article_id" )
>>>     tableEnv.registerTable("praiseAggr", praiseAggr)
>>>     var finalTable = tableEnv.sqlQuery(s”SELECT 1 FROM article a join 
>>> praiseAggr p on a.article_id=p.article_id" )
>>>     tableEnv.registerTable("finalTable", finalTable)
>>>      I know that praiseAggr, if written to sink, is append mode , so if a 
>>> table joins praiseAggr, what the table “see”, is a table contains the 
>>> latest value, or a table that grows larger and larger? If it is the later, 
>>> will it introduce performance problem?
>>>      Thanks a lot.
>>> 
>>> 
>>> Best, 
>>> Henry
>>> 
>> 
>> 
> 

Reply via email to