Thanks for the proposal Sihua!

Let me try to summarize the motivation / scope of this proposal.

You are proposing to add support for a special Bloom Filter state per
KeyGroup and reduce the number of key accesses by checking the Bloom Filter
first.

This would be a rather generic feature that could be interesting for
various applications, including joins and deduplication as you described.

IMO, such a feature would be very interesting. However, my concern with
Bloom Filters is that they are insert-only data structures, i.e., it is not
possible to remove keys once they have been added. This might render the
filter useless over time.
In a different thread (see discussion in FLINK-8918 [1]), you mentioned
that the Bloom Filters would be growing.
If we keep them in memory, how can we prevent them from exceeding memory
boundaries over time?

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8918

2018-05-23 9:56 GMT+02:00 sihua zhou <summerle...@163.com>:

> Hi Devs!
> I propose to introduce an "Elastic Bloom Filter" for Flink. The reason I
> am making this proposal is that it has helped us a lot in production: it
> lets us improve performance while reducing resource consumption. Here is a
> brief description of the motivation and why it is so powerful; more
> detailed information can be found at
> https://issues.apache.org/jira/browse/FLINK-8601 , and the design doc at
> https://docs.google.com/document/d/17UY5RZ1mq--hPzFx-LfBjCAw_kkoIrI9KHovXWkxNYY/edit?usp=sharing
>
> ------------------------------------
> *Motivation*
>
> Several scenarios drove us to introduce this ElasticBloomFilter: one is
> Stream Join, another is Data Deduplication, and there are some special use
> cases as well. It has served us very well; for example, we implemented the
> Runtime Filter Join based on it, and it gives us a great performance
> improvement. Compared to the "normal stream join", it allows us to improve
> performance while reducing resource consumption by about half!
> I will present the two most typical use cases optimized by the
> ElasticBloomFilter: "*Runtime Filter Join*" in detail, and "*Data
> Deduplication*" in brief.
>
> *Scenario 1: Runtime Filter Join*
>
> In general, stream join is one of the most performance-costly tasks. For
> every record from either side, we need to query the state of the other
> side, which leads to poor performance when the state size is huge. So, in
> production, we always need to spend a lot of slots to handle stream joins.
> But we can indeed improve this somewhat: there is a phenomenon of stream
> joins that can be observed in production, namely that the "joined ratio"
> of a stream join is often very low. For example:
>
>    - stream join in promotion analysis: the job needs to join the
>    promotion log with the action (click, view, buy) log on the
>    promotion_id to analyze the effect of the promotion.
>    - stream join in AD (advertising) attribution: the job needs to join
>    the AD click log with the item payment log on the click_id to find
>    which click of which AD brought the payment, in order to do attribution.
>    - stream join in click log analysis of docs: the job needs to join the
>    view log (docs viewed by users) with the click log (docs clicked by
>    users) to analyze the reasons for clicks and the properties of the users.
>    - ... and so on
>
> All these cases have one common property: the joined ratio is very low.
> Here is an example to illustrate it: we have 10000 records from the left
> stream and 10000 records from the right stream, and we execute *select *
> from leftStream l join rightStream r on l.id = r.id* , but we only get
> 100 records in the result. That is a case of a low joined ratio. This is
> an example for an inner join, but it also applies to left & right joins.
>
> There are more examples of low joined ratios I could come up with, but
> the point I want to raise is that a low joined ratio for stream joins in
> production is a very common phenomenon (maybe even the most common one in
> some companies; at least in our company that is the case).
>
> *How to improve this?*
>
> We can see from the above case that joining 10000 records with 10000
> records yields only 100 results. That means we query the state 20000
> times (10000 for the left stream and 10000 for the right stream), but
> only 100 of those queries are meaningful! If we could reduce the number
> of useless queries, we could definitely improve the performance of
> stream join.
> The way we improved this was to introduce the Runtime Filter Join. The
> main idea is that we build a filter for the state on each side (left
> stream & right stream). When we need to query the state on one side, we
> first check the corresponding filter to see whether the key could be in
> the state. If the filter says "no, it cannot be in the state", we skip
> querying the state; if it says "hmm, it may be in the state", then we
> query the state. As you can see, the best choice for the filter is a
> Bloom Filter: it has all the features we want, extremely good performance
> and no false negatives.
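To make the check-before-query pattern concrete, here is a minimal, self-contained sketch (not the actual ElasticBloomFilter implementation; the names `BloomFilter`, `insert`, and `lookup` are illustrative): a plain dict stands in for the keyed state of one join side, and a lookup consults the filter first so that keys the filter rules out never reach the state at all.

```python
import hashlib

class BloomFilter:
    """A minimal Bloom filter: k hash probes into a fixed-size bit array."""
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, key):
        # Derive k independent bit positions from salted SHA-256 digests.
        for i in range(self.num_hashes):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # No false negatives: a key that was added always returns True.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))

state = {}           # stand-in for the keyed join state on one side
bloom = BloomFilter()
state_queries = 0    # counts how often we actually hit the state

def insert(key, value):
    state[key] = value
    bloom.add(key)   # keep the filter in sync with the state

def lookup(key):
    global state_queries
    if not bloom.might_contain(key):
        return None              # definitely absent: skip the state query
    state_queries += 1
    return state.get(key)        # maybe present: pay for the real query
```

With a low joined ratio, most probes are for absent keys, and only the filter's false positives fall through to an actual state query; present keys are never missed.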
>
> *Scenario 2:  Data Deduplication*
>
> We have implemented two general functions based on the
> ElasticBloomFilter: count(distinct x) and select distinct x, y, z from
> table. Unlike the Runtime Filter Join, the results of these two functions
> are approximate, not exact. They are used in scenarios where we don't
> need a 100% accurate result, for example, counting the number of visiting
> users of each online store. In general, we don't need a 100% accurate
> result in this case (indeed, we can't give a 100% accurate result anyway,
> because there can be errors when collecting user_ids from different
> devices); if we can get a 98% accurate result with only half the
> resources, that is very nice.
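As a sketch of how such an approximate count(distinct x) can work (the class name `ApproxDistinctCounter` and its sizing parameters are illustrative assumptions, not the proposal's actual API): a key increments the counter only when the Bloom filter has not seen it before, so duplicates are suppressed, at the cost of slight undercounting from false positives.

```python
import hashlib

class ApproxDistinctCounter:
    """Approximate count(distinct x) backed by a Bloom filter: a key
    increments the counter only if the filter had not seen it yet.
    False positives cause slight undercounting, never overcounting."""
    def __init__(self, num_bits=1 << 16, num_hashes=4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)
        self.count = 0

    def _positions(self, key):
        for i in range(self.num_hashes):
            h = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(h[:8], "big") % self.num_bits

    def add(self, key):
        seen = True
        for pos in self._positions(key):
            byte, bit = pos // 8, 1 << (pos % 8)
            if not self.bits[byte] & bit:
                seen = False          # at least one fresh bit: a new key
                self.bits[byte] |= bit
        if not seen:
            self.count += 1           # all bits were set: treat as duplicate
```

Memory stays fixed at the bit-array size regardless of how many duplicates stream in, which is where the resource savings over an exact distinct-state come from.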
>
> I believe there are more use cases in the streaming world that could be
> optimized by the Bloom Filter (as it has already done in the batch
> world)...
>
> I would appreciate it very much if someone could have a look at the JIRA
> or the Google doc and give some comments!
>
> Thanks, Sihua
>
>
