[ https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young closed FLINK-8918.
-----------------------------
Resolution: Unresolved
Fix Version/s: (was: 1.8.0)

Introduce Runtime Filter Join
-----------------------------

Key: FLINK-8918
URL: https://issues.apache.org/jira/browse/FLINK-8918
Project: Flink
Issue Type: Improvement
Components: Table API & SQL
Reporter: Sihua Zhou
Assignee: Sihua Zhou
Priority: Major

In general, stream join is one of the most performance-costly tasks. For every record from either side, we need to query the state of the other side, which leads to poor performance when the state is huge. So in production we always need to spend a lot of slots on stream joins. However, we can improve this by exploiting a phenomenon that is common in production: the _joined ratio_ of a stream join is often very low. For example:
- Stream join in promotion analysis: the job joins the promotion log with the action (click, view, payment, collection, retweet) log on `promotion_id` to analyze the effect of the promotion.
- Stream join in AD (advertising) attribution: the job joins the AD click log with the item payment log on `click_id` to find which click on which AD brought the payment, for attribution.
- Stream join in click-log analysis of docs: the job joins the view log (docs viewed by users) with the click log (docs clicked by users) to analyze the reasons for clicks and the properties of the users.
- ... and so on.

All these cases share one property: the _joined ratio_ is very low. For example, suppose we have 10000 records from the left stream and 10000 records from the right stream, and we execute _select * from leftStream l join rightStream r on l.id = r.id_, but get only 100 records in the result. That is a low _joined ratio_. This example is an inner join, but the same applies to left and right outer joins.
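The 10000-by-10000 example can be made concrete with a small simulation that counts state queries. It is a sketch under stated assumptions: keys are unique within each stream, the two streams interleave one record at a time, each arriving record probes the other side's state exactly once, and plain `HashSet`s stand in for Flink state. `JoinedRatioExample` and `countQueries` are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: counts state queries of a symmetric streaming inner join. */
final class JoinedRatioExample {
    /** Returns {stateQueries, queriesThatFoundAMatch}. */
    static long[] countQueries(int recordsPerSide, int matchingKeys) {
        Set<Integer> leftState = new HashSet<>();  // stand-in for the left side's keyed state
        Set<Integer> rightState = new HashSet<>(); // stand-in for the right side's keyed state
        long queries = 0;
        long hits = 0;
        for (int i = 0; i < recordsPerSide; i++) {
            // Left record i has key i. Right record i shares that key only for the
            // first matchingKeys records; otherwise it gets a key no left record has.
            int leftKey = i;
            int rightKey = i < matchingKeys ? i : recordsPerSide + i;

            // A left record arrives: probe the right state, then store itself.
            queries++;
            if (rightState.contains(leftKey)) {
                hits++;
            }
            leftState.add(leftKey);

            // A right record arrives: probe the left state, then store itself.
            queries++;
            if (leftState.contains(rightKey)) {
                hits++;
            }
            rightState.add(rightKey);
        }
        return new long[] {queries, hits};
    }
}
```

With these assumptions, `countQueries(10_000, 100)` returns `{20000, 100}`: 20000 state queries, of which only 100 find a match (0.5%). Each matching pair is found exactly once, by whichever of its two records arrives second.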
There are more examples of a low _joined ratio_ I could list, but the most important point is that a low _joined ratio_ is a very common phenomenon for stream joins in production (perhaps the most common case in some companies; it is at least in ours).

*Then how to improve it?*

In the example above, joining 10000 records with 10000 records produced only 100 results. That means we queried the state 20000 times (10000 times for the left stream and 10000 times for the right stream), but only 100 of those queries were meaningful! If we can reduce the useless queries, we can definitely improve the performance of the stream join.

The approach we use is the _Runtime Filter Join_. The main idea is to build a _filter_ for the state on each side (left stream and right stream). Before querying the state on one side, we first check that side's _filter_ to see whether the _key_ can possibly be in the state. If the _filter_ says "no, it is definitely not in the state", we skip the state query; if it says "it may be in the state", we query the state. The best choice for the _filter_ is a _Bloom filter_, which has all the properties we need: _extremely good performance_ and _no false negatives_.

*The simplest pseudo code for _Runtime Filter Join_ (the inline comments assume the RocksDB state backend)*
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
    Iterator<Record> rightIterator = rightStreamState.iterator();
    // Performs a seek() on RocksDB and then iterates entry by entry;
    // this is expensive, especially when the key cannot be found in RocksDB.
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        // ... join recordFromLeftStream with recordFromRightState ...
    }
}

void performRuntimeFilterJoin(Record recordFromLeftStream) {
    Iterator<Record> rightIterator = Collections.emptyIterator();
    if (rightStreamFilter.containsCurrentKey()) {
        rightIterator = rightStreamState.iterator();
    }
    // The seek() is performed only when containsCurrentKey() returns true.
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        // ... join recordFromLeftStream with recordFromRightState ...
    }

    // Add the current key to the left stream's filter.
    leftStreamFilter.addCurrentKey();
}
{code}

A description of Runtime Filter Join for batch joins can be found [here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]. Although it was not originally designed for stream joins, the idea carries over to stream joins easily.
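To make the pseudo code concrete, here is a self-contained sketch in plain Java. It is not Flink's API and not necessarily how the proposal would be implemented: `IntBloomFilter`, `JoinSide` and `RuntimeFilterJoin` are hypothetical names, an in-memory `HashMap` stands in for the RocksDB-backed keyed state, keys are assumed to be `int`s, and the filter derives its bit positions by double hashing over the murmur3 32-bit finalizer, which is just one reasonable choice. Each side keeps its state plus a Bloom filter over its keys; an arriving record consults the other side's filter and only touches that side's state when the filter answers "maybe present".

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal Bloom filter over int keys (hypothetical helper, not a Flink class). */
final class IntBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    IntBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // murmur3 32-bit finalizer (fmix32), used as the base hash.
    private static int mix(int x) {
        x ^= x >>> 16;
        x *= 0x85ebca6b;
        x ^= x >>> 13;
        x *= 0xc2b2ae35;
        x ^= x >>> 16;
        return x;
    }

    // Double hashing: the i-th bit index is (h1 + i * h2) mod numBits.
    private int index(int key, int i) {
        int h1 = mix(key);
        int h2 = mix(h1 ^ 0x5bd1e995) | 1; // force odd so h2 is never zero
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(int key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    /** false means "definitely absent"; true means "maybe present". */
    boolean mightContain(int key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }
}

/** One input of the join: keyed state plus a Bloom filter over the keys in that state. */
final class JoinSide {
    final Map<Integer, List<String>> state = new HashMap<>();
    final IntBloomFilter filter;

    JoinSide(int numBits, int numHashes) {
        this.filter = new IntBloomFilter(numBits, numHashes);
    }
}

/** Streaming inner join that checks the other side's filter before touching its state. */
final class RuntimeFilterJoin {
    final JoinSide left;
    final JoinSide right;
    final List<String> output = new ArrayList<>();
    long stateLookups = 0;   // probes that actually reached the other side's state
    long skippedLookups = 0; // probes the filter answered with "definitely absent"

    RuntimeFilterJoin(int numBits, int numHashes) {
        this.left = new JoinSide(numBits, numHashes);
        this.right = new JoinSide(numBits, numHashes);
    }

    void onLeft(int key, String value) { process(key, value, left, right, true); }

    void onRight(int key, String value) { process(key, value, right, left, false); }

    private void process(int key, String value, JoinSide own, JoinSide other, boolean fromLeft) {
        if (other.filter.mightContain(key)) {
            stateLookups++;
            for (String match : other.state.getOrDefault(key, Collections.emptyList())) {
                output.add(fromLeft ? value + "|" + match : match + "|" + value);
            }
        } else {
            skippedLookups++;
        }
        // Store the record in its own side's state and register its key in that side's filter.
        own.state.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        own.filter.add(key);
    }
}
```

Because a Bloom filter has no false negatives, skipping a probe never loses a match; a false positive only costs one unnecessary state lookup. A plain Bloom filter cannot remove keys, so if state entries expire, the filter keeps answering "maybe" for them, which again costs extra lookups but never produces wrong results.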