[ 
https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492600#comment-16492600
 ] 

Sihua Zhou commented on FLINK-8601:
-----------------------------------

Hi [~aljoscha], could you please have a look at this? I have updated the doc to 
the latest version.

> Introduce ElasticBloomFilter for Approximate calculation and other situations 
> of performance optimization
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8601
>                 URL: https://issues.apache.org/jira/browse/FLINK-8601
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> h2. *Motivation*
> There are several scenarios that drive us to introduce this ElasticBloomFilter: 
> one is Stream Join, another is Data Deduplication, plus some special user 
> cases. It has given us great results; for example, we implemented the Runtime 
> Filter Join based on it, which sets it apart from a "normal" stream join and 
> gives us a great performance improvement while cutting resource consumption 
> roughly in half.
> I will describe the two most typical use cases optimized by the 
> ElasticBloomFilter: "Runtime Filter Join" in detail, and "Data Deduplication" 
> in brief.
> *Scenario 1: Runtime Filter Join*
> In general, stream join is one of the most performance-costly tasks. For every 
> record from either side, we need to query the state of the other side, which 
> leads to poor performance when the state is huge. So, in production, we often 
> have to spend a lot of slots to handle a stream join. But we can improve this, 
> because there is a phenomenon of stream joins that can be observed in 
> production: the "joined ratio" of a stream join is often very low. For 
> example:
> - stream join in promotion analysis: the job needs to join the promotion log 
> with the action (click, view, buy) log on the promotion_id to analyze the 
> effect of the promotion.
> - stream join in AD (advertising) attribution: the job needs to join the AD 
> click log with the item payment log on the click_id, to find which click of 
> which AD brought the payment, for attribution.
> - stream join in click-log analysis of docs: the job needs to join the view 
> log (docs viewed by users) with the click log (docs clicked by users) to 
> analyze the reason for the click and the properties of the users.
> - ...and so on.
> All these cases share one common property: the joined ratio is very low. Here 
> is an example: we have 10000 records from the left stream and 10000 records 
> from the right stream, and we execute {{select * from leftStream l join 
> rightStream r on l.id = r.id}}, but we only get 100 records in the result. 
> That is a case of a low joined ratio. This is an inner-join example, but the 
> same applies to left and right joins.
> There are more low-joined-ratio examples I could come up with, but the point 
> I want to raise is that a low joined ratio is a very common phenomenon for 
> stream joins in production (maybe even the most common one in some companies; 
> at least, that is the case in our company).
> *How to improve this?*
> As we can see from the above case, joining 10000 records with 10000 records 
> yields only 100 results. That means we query the state 20000 times (10000 for 
> the left stream and 10000 for the right stream), but only 100 of those queries 
> are meaningful! If we could reduce the number of useless queries, we could 
> definitely improve the performance of the stream join.
> The way we improve this is to introduce the Runtime Filter Join. The main 
> idea is that we build a filter for the state on each side (left stream & 
> right stream). When we need to query the state on one side, we first check 
> the corresponding filter for whether the key can possibly be in the state: if 
> the filter says "no, it cannot be in the state", we skip querying the state; 
> if it says "hmm, it may be in the state", we query the state. As you can see, 
> the best choice of filter is a Bloom filter, which has all the features we 
> want: extremely good performance and no false negatives.
> The simplest pseudo code for the Runtime Filter Join (the comments assume the 
> RocksDB backend):
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>     // Performs a `seek()` on RocksDB and iterates entry by entry; this is an
>     // expensive operation, especially when the key cannot be found in RocksDB.
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     for (Record recordFromRightState : rightIterator) {
>         ......
>     }
> }
>  
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = EMPTY_ITERATOR;
>     // Perform the `seek()` only when filter.containsCurrentKey() returns true.
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     for (Record recordFromRightState : rightIterator) {
>         ......
>     }
>     // Add the current key into the filter of the left stream.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
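> As a side note, the absence of false negatives is what makes skipping the 
> state query safe: the filter can only err by sending us to the state when the 
> key is absent, never by hiding a key that is present. Below is a minimal, 
> hypothetical Bloom filter sketch illustrating that property (the class and 
> method names are illustrative, not the proposed ElasticBloomFilter API):

```java
import java.util.BitSet;

// A minimal Bloom filter sketch (illustrative, not the proposed
// ElasticBloomFilter API). Each key maps to k probe positions in a fixed bit
// set; add() sets those bits and mightContain() tests them. A lookup can
// return a false positive, but never a false negative.
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Derive k probe positions from two hash values (Kirsch-Mitzenmacher style).
    private int probe(Object key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(Object key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(probe(key, i));
        }
    }

    boolean mightContain(Object key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(probe(key, i))) {
                return false; // definitely not added: no false negatives
            }
        }
        return true; // possibly added: false positives are allowed
    }
}
```

> In the pseudo code above, {{rightStreamFilter.containsCurrentKey()}} plays the 
> role of {{mightContain()}} and {{leftStreamFilter.addCurrentKey()}} the role 
> of {{add()}}.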
> *Scenario 2:  Data Deduplication*
> We have implemented two general functions based on the ElasticBloomFilter: 
> {{count(distinct x)}} and {{select distinct x, y, z from table}}. Unlike the 
> Runtime Filter Join, the results of these two functions are approximate, not 
> exact. They are used in scenarios where we don't need a 100% accurate result, 
> for example counting the number of visiting users of each online store. In 
> general we don't need a 100% accurate result in that case (indeed, we can't 
> give one, because there can already be errors when collecting the user_id 
> from different devices); if we can get a 98% accurate result with only half 
> the resources, that is very nice.
> {code:java}
> void countDistinctNormally(Key key, Iterator<Record> records) {
>     // One state query.
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // records.size() state queries.
>     for (Record record : records) {
>         if (mapState.get(record) == null) {
>             ++val;
>             mapState.put(record, Boolean.TRUE);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
>  
> void countDistinctBF(Key key, Iterator<Record> records) {
>     // One state query.
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // records.size() in-memory filter probes instead of state queries.
>     for (Record record : records) {
>         if (!bfState.contains(record)) {
>             ++val;
>             bfState.add(record);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
> {code}
> I believe there will be more use cases in the streaming world that can be 
> optimized by the Bloom filter (as has already been done in the batch world)...
> *Required features and challenges*
> There are a few challenges with using a Bloom filter in Flink. First, it 
> needs to be held as operator state, because it has to support 1) fault 
> tolerance as well as 2) rescaling. Because of rescaling, we need to create a 
> Bloom filter per key group to store the data that falls into it, so another 
> challenge is how to 3) handle data skew (the amount of data that falls into 
> different key groups can be very different). Imagine that we create a BF on 
> each key group for the incoming data: even if we can estimate the total 
> amount of data, what size should the BF on each key group have? It is tricky, 
> even impossible, to estimate the amount of data in each key group. Finally, 
> because the Bloom filter needs to live in memory to get its extreme 
> performance, we need a 4) TTL policy to recycle memory, otherwise we will 
> eventually run out of memory. As a brief summary, we need to fulfill at least 
> the following features:
> - Fault tolerant(checkpoint & restoring)
> - Rescaling
> - Handle data skew
> - TTL policy
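> To make the last two requirements concrete, here is a small, hypothetical 
> sketch (class and constant names are mine, not the proposed API) of how a 
> per-key-group filter could grow elastically under data skew and recycle 
> memory via TTL: each key group owns a list of small fixed-size filter nodes, 
> a new node is appended once the current one has absorbed its target number of 
> keys (so skewed key groups simply grow more nodes and no per-group size 
> estimate is needed), and each node records its creation time so that whole 
> expired nodes can be dropped.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

// Hypothetical per-key-group filter (illustrative names, not the proposed
// ElasticBloomFilter API). A deque of small fixed-size Bloom filter nodes:
// skewed key groups grow more nodes instead of needing an up-front size
// estimate, and a TTL sweep drops whole expired nodes from the front.
class KeyGroupFilter {
    private static final int NODE_BITS = 1 << 16;  // bits per node
    private static final int NODE_CAPACITY = 4096; // keys absorbed per node
    private static final int NUM_HASHES = 3;

    private static final class Node {
        final BitSet bits = new BitSet(NODE_BITS);
        final long createdAt;
        int count;
        Node(long createdAt) { this.createdAt = createdAt; }
    }

    private final Deque<Node> nodes = new ArrayDeque<>();
    private final long ttlMillis;

    KeyGroupFilter(long ttlMillis) { this.ttlMillis = ttlMillis; }

    private static int probe(Object key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, NODE_BITS);
    }

    void add(Object key, long now) {
        Node head = nodes.peekLast();
        if (head == null || head.count >= NODE_CAPACITY) {
            head = new Node(now); // grow elastically instead of resizing
            nodes.addLast(head);
        }
        for (int i = 0; i < NUM_HASHES; i++) {
            head.bits.set(probe(key, i));
        }
        head.count++;
    }

    boolean mightContain(Object key, long now) {
        expire(now);
        for (Node node : nodes) {
            boolean hit = true;
            for (int i = 0; i < NUM_HASHES; i++) {
                if (!node.bits.get(probe(key, i))) { hit = false; break; }
            }
            if (hit) return true; // possibly present in this node
        }
        return false;             // definitely absent: no false negatives
    }

    private void expire(long now) {
        // Oldest nodes sit at the front; drop them once past the TTL.
        while (!nodes.isEmpty() && now - nodes.peekFirst().createdAt > ttlMillis) {
            nodes.removeFirst();
        }
    }
}
```

> Fault tolerance and rescaling would then come down to snapshotting the nodes 
> per key group, which fits Flink's key-group-based state redistribution.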
> Design doc:  [design 
> doc|https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
