[ 
https://issues.apache.org/jira/browse/FLINK-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16204554#comment-16204554
 ] 

Xingcan Cui edited comment on FLINK-7799 at 10/15/17 8:11 AM:
--------------------------------------------------------------

Hi [~fhueske], I've reconsidered this problem and found a drawback of the time 
block solution. 

Specifically, if we merge all the rows belonging to the same time block in an 
entry (the value of which is either a {{Map}} or a {{List}}) of the 
{{MapState}}, the minimum operating unit of the state becomes a collection. 
That means every time we store or remove a single row, all the data in the same 
block must also be rewritten, which adds overhead that grows with the number of 
rows in the block. Maybe we could use two states for that (one for the actual 
data and the other for the block index)?
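
A minimal sketch of the two-state idea, in plain Java rather than against the 
Flink state API. The {{HashMap}} and {{TreeSet}} fields are stand-ins for keyed 
state, and the class name, method names and String rows are all hypothetical. 
Rows stay in one entry per timestamp, and a second map records which 
timestamps fall into each block, so adding a row never rewrites the other rows 
of its block.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical stand-in for the two-state layout; not the Flink API.
final class TwoStateRowCache {
    private final long blockSizeMs;
    // "Data" state: one entry per timestamp, so a row write touches one entry.
    private final Map<Long, List<String>> rowsByTimestamp = new HashMap<>();
    // "Index" state: block start -> timestamps in that block that hold rows.
    private final Map<Long, TreeSet<Long>> timestampsByBlock = new HashMap<>();

    TwoStateRowCache(long blockSizeMs) {
        this.blockSizeMs = blockSizeMs;
    }

    // Start of the block that contains the given timestamp.
    static long blockStart(long timestamp, long blockSizeMs) {
        return Math.floorDiv(timestamp, blockSizeMs) * blockSizeMs;
    }

    void add(long timestamp, String row) {
        rowsByTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(row);
        // The index entry only changes when a new timestamp appears in the
        // block, and it holds timestamps only, never the rows themselves.
        timestampsByBlock
            .computeIfAbsent(blockStart(timestamp, blockSizeMs), b -> new TreeSet<>())
            .add(timestamp);
    }

    // Rows with from <= timestamp <= to; assumes from <= to.
    List<String> rowsBetween(long from, long to) {
        List<String> result = new ArrayList<>();
        // Only the blocks overlapping [from, to] are looked up.
        for (long b = blockStart(from, blockSizeMs); b <= to; b += blockSizeMs) {
            TreeSet<Long> timestamps = timestampsByBlock.get(b);
            if (timestamps == null) {
                continue;
            }
            for (long t : timestamps.subSet(from, true, to, true)) {
                result.addAll(rowsByTimestamp.get(t));
            }
        }
        return result;
    }
}
```

The index entry is still a collection, so the trade-off does not disappear 
entirely. It just moves the rewrite from the row data to a much smaller set of 
timestamps.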

Anyway, since the RocksDB backend is likely to be widely used in real 
applications and the {{MapState}} entries are ordered in it, I wonder if 
something like a hint mechanism could be provided in the state API, so that the 
join function can be aware of the ordering.
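
To illustrate what such a hint could enable, here is a rough sketch in plain 
Java. A {{TreeMap}} stands in for a backend whose entries are ordered by 
timestamp, which is an assumption taken from the comment above. No such hint 
exists in the state API, and the class and method names are hypothetical. 
Without ordering knowledge the join has to visit every entry; with it, the 
join can restrict itself to the relevant key range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical comparison of the two access patterns; not the Flink API.
final class OrderedScanSketch {

    // No ordering guarantee: every entry must be visited and filtered.
    static List<String> fullScan(Map<Long, List<String>> state, long from, long to) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : state.entrySet()) {
            long timestamp = entry.getKey();
            if (timestamp >= from && timestamp <= to) {
                result.addAll(entry.getValue());
            }
        }
        return result;
    }

    // Known key ordering: only entries within [from, to] are visited.
    static List<String> rangeScan(NavigableMap<Long, List<String>> state, long from, long to) {
        List<String> result = new ArrayList<>();
        for (List<String> rows : state.subMap(from, true, to, true).values()) {
            result.addAll(rows);
        }
        return result;
    }
}
```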


was (Author: xccui):
Hi [~fhueske], I've reconsidered this problem and found a drawback of the time 
block solution. 

Specifically, if we merge all the rows belonging to the same time block in an 
entry (the value of which is either a {{Map}} or a {{List}}) of the 
{{MapState}}, the minimum operating unit of the state becomes a collection. 
That means every time we store/remove a single row, all the data in the same 
block must also be rewritten, which will definitely bring a lot of extra cost.

If that drawback cannot be eliminated, I wonder if we could improve the join 
performance from another point of view. Since the RocksDB backend should be 
widely used in real applications and the {{MapState}} entries are ordered in 
it, can we provide something like a hint mechanism in the state API, so that 
the join function can be aware of the ordering?

> Improve performance of windowed joins
> -------------------------------------
>
>                 Key: FLINK-7799
>                 URL: https://issues.apache.org/jira/browse/FLINK-7799
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>            Priority: Critical
>
> The performance of windowed joins can be improved by changing the state 
> access patterns.
> Right now, rows are inserted into a MapState with their timestamp as key. 
> Since we use a time resolution of 1ms, this means that the full key space of 
> the state must be iterated and many map entries must be accessed when joining 
> or evicting rows. 
> A better strategy would be to block the time into larger intervals and 
> register the rows in their respective interval. Another benefit would be that 
> we can directly access the state entries because we know exactly which 
> timestamps to look up. Hence, we can limit the state access to the relevant 
> section during joining and state eviction. 
> A good interval size still needs to be identified and might depend on the 
> size of the window.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)