Thanks for the proposal, Sihua! Let me try to summarize the motivation and scope of this proposal.
You are proposing to add support for a special Bloom Filter state per KeyGroup and to reduce the number of key accesses by checking the Bloom Filter first. This would be a rather generic feature that could be interesting for various applications, including the joins and deduplication you described. IMO, such a feature would be very interesting. However, my concern with Bloom Filters is that they are insert-only data structures, i.e., it is not possible to remove keys once they have been added. This might render the filter useless over time. In a different thread (see the discussion in FLINK-8918 [1]), you mentioned that the Bloom Filters would keep growing. If we keep them in memory, how can we prevent them from exceeding memory boundaries over time?

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8918

2018-05-23 9:56 GMT+02:00 sihua zhou <summerle...@163.com>:

> Hi Devs!
> I propose to introduce an "Elastic Bloom Filter" for Flink. The reason I
> make this proposal is that it has helped us a lot in production: it lets
> us improve performance while reducing resource consumption. Here is a
> brief description of the motivation and why it is so powerful; more
> detailed information can be found at
> https://issues.apache.org/jira/browse/FLINK-8601, and the design doc at
> https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
>
> ------------------------------------
> *Motivation*
>
> There are several scenarios that drove us to introduce this
> ElasticBloomFilter: one is stream join, another is data deduplication,
> plus some special use cases... It has worked very well for us; for
> example, we implemented a Runtime Filter Join based on it, and it gives
> us a great performance improvement. Compared with a "normal stream join",
> it allows us to improve performance while cutting resource consumption by
> about half!
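[Editor's note: a minimal sketch of the Bloom Filter behavior both mails rely on — no false negatives, possible false positives, and insert-only (Fabian's concern). This is illustrative Python under assumed sizing, not Flink's or the proposal's actual implementation; the class name and parameters are hypothetical.]

```python
import hashlib

class BloomFilter:
    """Minimal Bloom Filter sketch; illustrative only, not Flink's code."""

    def __init__(self, num_bits=1 << 16, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for simplicity

    def _positions(self, key):
        # Derive num_hashes deterministic bit positions for the key.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}|{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        # Insert-only: bits can be set but never safely cleared, because
        # another key may map to the very same bit positions; clearing
        # them could create false negatives for that other key.
        for p in self._positions(key):
            self.bits[p] = 1

    def might_contain(self, key):
        # "False" is definitive (no false negatives); "True" may be a
        # false positive.
        return all(self.bits[p] for p in self._positions(key))

# Demo: every added key is always found; an absent key almost never is.
bf = BloomFilter()
for i in range(100):
    bf.add(f"user-{i}")
```

This also makes the memory question visible: the bit array is fixed-size, so the only way to keep the false-positive rate low as keys accumulate is to grow or replace the filter, which is exactly the growth Fabian is asking about.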
> I will list the two most typical use cases optimized by the
> ElasticBloomFilter: one is the *Runtime Filter Join*, in detail, and the
> other is *Data Deduplication*, in brief.
>
> *Scenario 1: Runtime Filter Join*
>
> In general, stream join is one of the most performance-costly tasks. For
> every record from either side, we need to query the state of the other
> side, which leads to poor performance when the state size is huge. So, in
> production, we always need to spend a lot of slots to handle stream
> joins. But we can improve this somewhat, because there is a phenomenon of
> stream joins that can be observed in production: the "joined ratio" of a
> stream join is often very low. For example:
>
> - stream join in promotion analysis: the job needs to join the promotion
> log with the action (click, view, buy) log on the promotion_id to analyze
> the effect of the promotion.
> - stream join in AD (advertising) attribution: the job needs to join the
> AD click log with the item payment log on the click_id to find which
> click of which AD brought the payment, in order to do attribution.
> - stream join in click log analysis of docs: the job needs to join the
> view log (docs viewed by users) with the click log (docs clicked by
> users) to analyze the reason for the click and the properties of the
> users.
> - ... and so on
>
> All these cases have one common property: the joined ratio is very low.
> Here is an example to describe it: we have 10000 records from the left
> stream and 10000 records from the right stream, and we execute *select *
> from leftStream l join rightStream r on l.id = r.id*, but we only get 100
> records in the result. That is a case of a low joined ratio. This is an
> example for an inner join, but it also applies to left & right joins.
> There are more examples of low joined ratios I could come up with... but
> the point I want to raise is that a low joined ratio for stream joins in
> production is a very common phenomenon (maybe even the most common
> phenomenon in some companies; at least in our company that is the case).
>
> *How to improve this?*
>
> We can see from the above case that joining 10000 records with 10000
> records gives us only 100 results. That means we query the state 20000
> times (10000 for the left stream and 10000 for the right stream), but
> only 100 of those queries are meaningful! If we could reduce the number
> of useless queries, we could definitely improve the performance of the
> stream join. The way we improve this is to introduce the Runtime Filter
> Join. The main idea is that we build a filter for the state on each side
> (left stream & right stream). When we need to query the state on one
> side, we first check the corresponding filter to see whether the key
> could possibly be in the state. If the filter says "no, it can't be in
> the state", we skip querying the state; if it says "hmm, it may be in the
> state", we query the state. As you can see, the best choice for the
> filter is a Bloom Filter: it has all the features we want, namely
> extremely good performance and no false negatives.
>
> *Scenario 2: Data Deduplication*
>
> We have implemented two general functions based on the ElasticBloomFilter:
> count(distinct x) and select distinct x, y, z from table. Unlike the
> Runtime Filter Join, the results of these two functions are approximate,
> not exact. They are used in scenarios where we don't need a 100% accurate
> result, for example, counting the number of visiting users of each online
> store.
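[Editor's note: going back to Scenario 1, the "check the filter, then maybe query the state" lookup described above could be sketched roughly as follows. This is hedged, illustrative Python; `BloomFilter`, `lookup_with_filter`, the `right_state` dict, and the driver loop are all assumptions for the example, not Flink APIs or the proposal's actual code.]

```python
import hashlib

class BloomFilter:
    # Same minimal filter idea as in the description above; illustrative only.
    def __init__(self, num_bits=1 << 20, num_hashes=2):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, key):
        for i in range(self.num_hashes):
            d = hashlib.sha256(f"{i}|{key}".encode()).digest()
            yield int.from_bytes(d[:8], "big") % self.num_bits

    def add(self, key):
        for p in self._positions(key):
            self.bits[p] = 1

    def might_contain(self, key):
        return all(self.bits[p] for p in self._positions(key))

def lookup_with_filter(key, bloom, state, stats):
    # No false negatives: a "no" from the filter safely skips the
    # (expensive) state query entirely.
    if not bloom.might_contain(key):
        stats["skipped"] += 1
        return None
    stats["queried"] += 1
    return state.get(key)  # maybe present; fall back to the real lookup

# Low joined ratio: only 100 of the 10000 left-stream keys exist on the right.
right_state = {f"id-{i}": f"row-{i}" for i in range(100)}
bloom = BloomFilter()
for k in right_state:
    bloom.add(k)

stats = {"skipped": 0, "queried": 0}
matches = [r for i in range(10000)
           if (r := lookup_with_filter(f"id-{i}", bloom, right_state, stats))]
```

Under these assumptions the filter turns roughly 9900 of the 10000 lookups into cheap bit checks, while every one of the 100 real matches is still found — which is the claimed source of the performance win.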
> In general, we don't need a 100% accurate result in this case (indeed, we
> can't give a 100% accurate result anyway, because there can be errors
> when collecting the user_id from different devices). If we can get a 98%
> accurate result with only half the resources, that is very nice.
>
> I believe there are more use cases in the streaming world that could be
> optimized by the Bloom Filter (as it has done in the big data world)...
>
> I would appreciate it very much if someone could have a look at the JIRA
> or the Google doc and give some comments!
>
> Thanks, Sihua
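[Editor's note: the approximate count(distinct x) from Scenario 2 could be sketched as below. Illustrative Python under assumed names and sizing — `BloomFilter` and `approx_count_distinct` are hypothetical stand-ins, not the ElasticBloomFilter itself.]

```python
import hashlib

class BloomFilter:
    # Minimal filter for illustration; not the actual ElasticBloomFilter.
    def __init__(self, num_bits=1 << 20, num_hashes=3):
        self.num_bits, self.num_hashes = num_bits, num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, key):
        for i in range(self.num_hashes):
            d = hashlib.sha256(f"{i}|{key}".encode()).digest()
            yield int.from_bytes(d[:8], "big") % self.num_bits

    def add(self, key):
        for p in self._positions(key):
            self.bits[p] = 1

    def might_contain(self, key):
        return all(self.bits[p] for p in self._positions(key))

def approx_count_distinct(stream):
    # A false positive makes us skip a genuinely new element, so the
    # result can slightly undercount: approximate, and never overcounting.
    seen, count = BloomFilter(), 0
    for x in stream:
        if not seen.might_contain(x):
            seen.add(x)
            count += 1
    return count

# Demo: 5000 events from 500 distinct users (e.g. visits to one store).
result = approx_count_distinct(f"user-{i % 500}" for i in range(5000))
```

The design trade-off matches the mail: no per-key state is ever stored, only filter bits, so memory stays a fraction of an exact distinct-count at the cost of a small, bounded undercount.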