[ https://issues.apache.org/jira/browse/FLINK-8601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492600#comment-16492600 ]
Sihua Zhou commented on FLINK-8601:
-----------------------------------
Hi [~aljoscha] could you please have a look at this? I updated the doc to the latest version.

> Introduce ElasticBloomFilter for Approximate calculation and other situations of performance optimization
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8601
>                 URL: https://issues.apache.org/jira/browse/FLINK-8601
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, State Backends, Checkpointing
>    Affects Versions: 1.5.0
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>            Priority: Major
>
> h2. *Motivation*
> Several scenarios drove us to introduce this ElasticBloomFilter: one is stream join, another is data deduplication, plus some special user cases. It has served us very well; for example, we implemented the Runtime Filter Join based on it, and compared with a "normal stream join" it let us improve performance while cutting resource consumption roughly in half.
> I will describe the two most typical use cases optimized by the ElasticBloomFilter: "Runtime Filter Join" in detail, and "Data Deduplication" in brief.
>
> *Scenario 1: Runtime Filter Join*
> In general, stream join is one of the most expensive tasks. For every record from either side, we need to query the state of the other side, which leads to poor performance when the state is huge. So, in production, we often need to spend a lot of slots to handle stream joins. But we can improve this somewhat, because there is a phenomenon of stream joins observable in production: the "joined ratio" is often very low. For example:
> - Stream join in promotion analysis: the job joins the promotion log with the action (click, view, buy) log on promotion_id to analyze the effect of the promotion.
> - Stream join in AD (advertising) attribution: the job joins the AD click log with the item payment log on click_id to find which click of which AD brought the payment, for attribution.
> - Stream join in click log analysis of docs: the job joins the viewed log (docs viewed by users) with the click log (docs clicked by users) to analyze the reason for the click and the properties of the users.
> - ...and so on.
> All these cases share one property: the joined ratio is very low. Here is an example: we have 10000 records from the left stream and 10000 records from the right stream, and we execute select * from leftStream l join rightStream r on l.id = r.id, yet we only get 100 records in the result. That is a case of low joined ratio. This is an inner-join example, but it also applies to left and right joins. There are more low-joined-ratio examples I could come up with, but the point I want to raise is that a low joined ratio in production stream joins is a very common phenomenon (maybe even the most common one in some companies; at least in our company that is the case).
>
> *How to improve this?*
> As we can see from the above case, joining 10000 records with 10000 records yields only 100 results. That means we query the state 20000 times (10000 for the left stream and 10000 for the right stream), but only 100 of those queries are meaningful! If we could reduce the number of useless queries, we could definitely improve the performance of stream join. The way we improve this is to introduce the Runtime Filter Join, whose main idea is to build a filter for the state on each side (left stream & right stream).
> When we need to query the state on one side, we first check the corresponding filter to see whether the key could be in the state. If the filter says "no, it cannot be in the state", we skip querying the state; if it says "hmm, it may be in the state", we query the state. As you can see, the best choice of filter is the Bloom filter: it has all the features we want, namely extremely good performance and no false negatives.
> The simplest pseudo code for the Runtime Filter Join (the comments assume RocksDBBackend):
> {code:java}
> void performJoinNormally(Record recordFromLeftStream) {
>     // Performs a `seek()` on RocksDB and iterates the entries one by one;
>     // this is expensive, especially when the key can't be found in RocksDB.
>     Iterator<Record> rightIterator = rightStreamState.iterator();
>     for (Record recordFromRightState : rightIterator) {
>         ...
>     }
> }
>
> void performRuntimeFilterJoin(Record recordFromLeftStream) {
>     Iterator<Record> rightIterator = EMPTY_ITERATOR;
>     // Perform the `seek()` only when the filter says the key may exist.
>     if (rightStreamFilter.containsCurrentKey()) {
>         rightIterator = rightStreamState.iterator();
>     }
>     for (Record recordFromRightState : rightIterator) {
>         ...
>     }
>     // Add the current key into the filter of the left stream.
>     leftStreamFilter.addCurrentKey();
> }
> {code}
>
> *Scenario 2: Data Deduplication*
> We have implemented two general functions based on the ElasticBloomFilter: count(distinct x) and select distinct x, y, z from table. Unlike the Runtime Filter Join, the results of these two functions are approximate, not exact. They are used in scenarios where we don't need a 100% accurate result, for example counting the number of visiting users in each online store.
> In general, we don't need a 100% accurate result in this case (indeed, we can't give one, because there can be errors when collecting user_id from different devices). If we can get a 98% accurate result with only half the resources, that is very attractive.
> {code:java}
> void countDistinctNormally(Key key, Iterator<Record> records) {
>     // 1 state query
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // records.size() state queries
>     for (Record record : records) {
>         if (mapState.get(record) == null) {
>             ++val;
>             mapState.put(record, true);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
>
> void countDistinctBF(Key key, Iterator<Record> records) {
>     // 1 state query
>     final long oldVal = valState.get();
>     long val = oldVal;
>     // the Bloom filter lives in memory, so no state queries here
>     for (Record record : records) {
>         if (!bfState.contains(record)) {
>             ++val;
>             bfState.add(record);
>         }
>     }
>     if (val != oldVal) {
>         valState.update(val);
>     }
> }
> {code}
> I believe there are more use cases in the streaming world that could be optimized by the Bloom filter (as it has done in the big data world)...
>
> *Required features and challenges*
> There are a few challenges to using a Bloom filter in Flink. First, it needs to be held as operator state, because it needs to support 1) fault tolerance as well as 2) rescaling. Furthermore, because we need to support rescaling, we need to create a Bloom filter for each key group to store the data that falls into it; so another challenge is how to 3) handle data skew (the amount of data that falls into different key groups can be very different). Imagine that we create a BF on each key group for the incoming data and we are able to estimate the total amount of data; what size should we then choose for the BF on each key group? It is tricky, even impossible, to estimate the amount of data per key group.
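> One way to sidestep per-key-group sizing is to let each key group's filter grow with its data: chain fixed-size Bloom filter segments and open a new segment whenever the current one fills up, so skewed key groups simply accumulate more segments. A minimal stand-alone sketch of that idea (the class name, segment size, and double-hashing scheme below are illustrative assumptions, not the proposed Flink API):

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

// Illustrative sketch only: chain fixed-size Bloom filter segments so the
// structure can grow with the data instead of being sized up front.
public class ChainedBloomFilter {
    static final int BITS_PER_SEGMENT = 4096;
    static final int HASHES = 3;
    static final int MAX_KEYS_PER_SEGMENT = 500;

    // Newest segment first; we only ever insert into the head segment.
    private final Deque<BitSet> segments = new ArrayDeque<>();
    private int keysInHead = 0;

    public void add(Object key) {
        if (segments.isEmpty() || keysInHead >= MAX_KEYS_PER_SEGMENT) {
            segments.addFirst(new BitSet(BITS_PER_SEGMENT)); // grow elastically
            keysInHead = 0;
        }
        BitSet head = segments.peekFirst();
        for (int i = 0; i < HASHES; i++) {
            head.set(index(key, i));
        }
        keysInHead++;
    }

    public boolean mightContain(Object key) {
        segmentLoop:
        for (BitSet segment : segments) {
            for (int i = 0; i < HASHES; i++) {
                if (!segment.get(index(key, i))) {
                    continue segmentLoop; // definitely not in this segment
                }
            }
            return true; // possibly present (false positives possible)
        }
        return false; // definitely absent: no false negatives
    }

    public int segmentCount() {
        return segments.size();
    }

    // Double hashing: derive HASHES bit positions from two base hashes.
    private static int index(Object key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, BITS_PER_SEGMENT);
    }
}
```

> A miss in every segment is a definite absence, which is exactly the property the Runtime Filter Join relies on; in a real implementation the segment size and hash count would be derived from a configured false-positive rate rather than hard-coded.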
> On top of that, because the Bloom filter needs to live in memory to achieve its extreme performance, we need a 4) TTL policy to recycle memory, otherwise we will eventually run out of memory (OOM). So, as a brief summary, we need to fulfill at least the following features:
> - Fault tolerance (checkpoint & restore)
> - Rescaling
> - Handling data skew
> - TTL policy
> Design doc: [design doc|https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)