Hi all,

Thanks for all the feedback about this FLIP. If there are no other questions or concerns, I will start the vote tomorrow (Friday, June 23rd).

Best,
Lijie

Jing Ge <j...@ververica.com.invalid> wrote on Wed, Jun 21, 2023 at 17:14:

> Hi Ron,
>
> Thanks for sharing your thoughts! It makes sense. It would be helpful if these references to Hive, PolarDB, etc. could be added to the FLIP.
>
> Best regards,
> Jing
>
> On Tue, Jun 20, 2023 at 5:41 PM liu ron <ron9....@gmail.com> wrote:
>
> > Hi, Jing
> >
> > The default value for this ratio follows other systems, such as Hive. As long as the runtime filter can filter out more than half of the data, we can benefit from it. Of course, normally, as long as statistics are available, ndv is present, so rowCount should rarely be needed; I therefore think the formula is valid in most cases. We also borrowed this formula from other systems, such as PolarDB on AliCloud. Your concern is valuable for this FLIP, but currently we do not know how to adjust the formula reasonably, and making it too complex may leave users unable to understand it. So I think we should follow the simple way first and optimize gradually afterwards. The first step may be to verify the reasonableness of the current formula with the TPC-DS cases.
> >
> > Best,
> > Ron
> >
> > Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 19:46:
> >
> > > Hi Ron,
> > >
> > > Thanks for the clarification. That answered my questions.
> > >
> > > Regarding the ratio, my gut feeling is that any value less than 0.8 or 0.9 won't help too much (I might be wrong). I was thinking of adapting the formula to somehow map the current 0.9-1 to 0-1, i.e. if the user configures 0.5, it will be mapped to e.g. 0.95 (or e.g. 0.85; the real number needs more calculation) for the current formula described in the FLIP. But I am not sure it is a feasible solution. It deserves more discussion. Maybe some real performance tests could give us some hints.
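
A minimal sketch of the ratio discussion above, not the FLIP's implementation. It assumes the filter ratio is estimated as 1 - buildNdv / probeNdv, which is how the numbers quoted in this thread work out (row counts would be passed in instead when ndv is unavailable). The function names and the 0.9 lower bound of the remapping are hypothetical and only illustrate the idea of stretching a user-facing value in [0, 1] onto a narrower internal range.

```python
def estimated_filter_ratio(build_ndv: float, probe_ndv: float) -> float:
    """Estimated fraction of probe-side data removed by the filter.

    Assumption: ratio = 1 - build_ndv / probe_ndv, clamped at 0.
    """
    if probe_ndv <= 0:
        return 0.0
    return max(0.0, 1.0 - build_ndv / probe_ndv)


def remap_user_ratio(user_ratio: float, lower: float = 0.9, upper: float = 1.0) -> float:
    """Map a user-facing value in [0, 1] linearly onto [lower, upper].

    The bounds are illustrative; the thread leaves the real numbers open.
    """
    if not 0.0 <= user_ratio <= 1.0:
        raise ValueError("user_ratio must be within [0, 1]")
    return lower + user_ratio * (upper - lower)


def should_inject(build_ndv: float, probe_ndv: float, min_filter_ratio: float) -> bool:
    """Inject the runtime filter only if the estimated ratio reaches the threshold."""
    return estimated_filter_ratio(build_ndv, probe_ndv) >= min_filter_ratio


# A probe side with 10x the build ndv passes the plain 0.5 threshold ...
plain = should_inject(build_ndv=1, probe_ndv=10, min_filter_ratio=0.5)
# ... but not a user value of 0.5 remapped onto the assumed [0.9, 1.0] range.
remapped = should_inject(build_ndv=1, probe_ndv=10, min_filter_ratio=remap_user_ratio(0.5))
```

With the remapping, the user-facing option would spread over the range where the decision actually changes, at the cost of a less direct meaning of the configured number.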
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jun 20, 2023 at 5:19 AM liu ron <ron9....@gmail.com> wrote:
> > >
> > > > Hi, Jing
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > > Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's take an extreme example: the small table (build side) has one row and the large table (probe side) contains tens of billions of rows. This would be the ideal use case for the runtime filter, and the improvement would be significant. Is this correct?
> > > >
> > > > Yes, you are right.
> > > >
> > > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the values of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size settings, is it possible to inject or not inject runtime filters by adjusting the parallelism?
> > > >
> > > > First, let me clarify two points. The first is that whether to inject the RuntimeFilter is decided in the optimization phase, but we currently do not consider operator parallelism in the SQL optimization phase; parallelism is set at the ExecNode level. The second is that in batch mode, the default AdaptiveBatchScheduler[1] is now used, which derives the parallelism of the downstream operator from the amount of data produced by the upstream operator; that is, the parallelism is determined adaptively at runtime. Given the above, we cannot decide whether to inject the BloomFilter in the optimization stage based on parallelism.
> > > > A more important point is that the purpose of the runtime filter is to reduce the amount of data shuffled, and thus the amount of data processed by the downstream join operator.
> > > > Therefore, my understanding is that regardless of the parallelism of the probe side, the amount of shuffled data must be reduced after inserting the runtime filter, which benefits the join operator. So whether to insert the RuntimeFilter does not depend on the parallelism.
> > > >
> > > > > Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?
> > > >
> > > > Row count is considered only when ndv does not exist. When the size options use their default values and ndv cannot be obtained, it is true that this condition may always hold, but this does not seem to affect anything, and users are also likely to change the size values. One question: how do you think we should make it easier for users to control the filter injection?
> > > >
> > > > [1]: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 07:11:
> > > >
> > > > > Hi Lijie,
> > > > >
> > > > > Thanks for your proposal. It is a really nice feature. I'd like to ask a few questions to understand your thoughts.
> > > > >
> > > > > Afaiu, the runtime filter will only be injected when the gap between the build data size and probe data size is big enough. Let's take an extreme example: the small table (build side) has one row and the large table (probe side) contains tens of billions of rows. This would be the ideal use case for the runtime filter, and the improvement would be significant. Is this correct?
> > > > >
> > > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP, will the values of max-build-data-size and min-probe-data-size depend on the parallelism config? I.e. with the same data-size settings, is it possible to inject or not inject runtime filters by adjusting the parallelism?
> > > > >
> > > > > In the FLIP, there are default values for the new configuration parameters that will be used to check the injection condition. If ndv cannot be estimated, row count will be used. Given that max-build-data-size is 10MB and min-probe-data-size is 10GB, in the worst case the min-filter-ratio will be 0.999, i.e. the probeNdv is 1000 times the buildNdv. If we consider duplication and the fact that the large table might have more columns than the small table, the probeNdv should still be 100 or 10 times the buildNdv, which ends up with a min-filter-ratio of 0.99 or 0.9. Both are bigger than the default value of 0.5 in the FLIP. If I am not mistaken, commonly, a min-filter-ratio less than 0.99 will always allow injecting the runtime filter. Does it make sense to reconsider the formula of ratio calculation to help users easily control the filter injection?
> > > > >
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > >
> > > > > > Hi Stefan,
> > > > > >
> > > > > > >> bypassing the dataflow
> > > > > > I believe it's a possible solution, but it may require more coordination and extra conditions (such as a DFS), so I do think it should be excluded from the first version. I'll put it in Future+Improvements as a potential improvement.
> > > > > >
> > > > > > Thanks again for your quick reply :)
> > > > > >
> > > > > > Best,
> > > > > > Lijie
> > > > > >
> > > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
> > > > > >
> > > > > > > Hi Lijie,
> > > > > > >
> > > > > > > I think you understood me correctly. But I would not consider this a true cyclic dependency in the dataflow, because I would not suggest sending the filter through an edge in the job graph from join to scan. I’d rather bypass the stream graph for bringing the filter to the scan. For example, the join could report the filter after the build phase, e.g. to the JM or a predefined DFS folder. Then, when the probe scan is scheduled for execution, the JM provides the filter information to the scan, or the scan looks in the DFS during initialization for any filter it can use. I’m not suggesting doing it exactly in those ways, but just to show what I mean by "bypassing the dataflow".
> > > > > > >
> > > > > > > Anyways, I’m fine with excluding this optimization from the current FLIP if you believe it would be hard to implement in Flink.
> > > > > > >
> > > > > > > Best,
> > > > > > > Stefan
> > > > > > >
> > > > > > >
> > > > > > > > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Stefan,
> > > > > > > >
> > > > > > > > If I understand correctly (I hope so), the hash join operator needs to send the bloom filter to the probe scan, and the probe scan also needs to send the filtered data to the hash join operator.
> > > > > > > > This means there will be a cycle in the data flow, and it will be hard for current Flink to schedule this kind of graph. I admit we can find a way to do this, but that's probably a bit outside the scope of this FLIP. So let's do these complex optimizations later, WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
> > > > > > > >
> > > > > > > >> Hi Lijie,
> > > > > > > >>
> > > > > > > >> Exactly, my proposal was to build the bloom filter in the hash operator. I don’t know all the details of the implementation of Flink’s join operator, but I’d assume that even if the join is a two-input operator, it gets scheduled for 2 different pipelines: first the build phase with the scan from the dimension table, and after that’s completed, the probe phase with the scan of the fact table. I’m not proposing to use the bloom filter only in the join operator, but rather to send the bloom filter to the probe scan before starting the probe. I assume this would require some form of side channel to transport the filter and coordination to tell the sources that such a filter is available. I cannot answer how hard those would be to implement, but the idea doesn’t seem impossible to me.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Stefan
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>> On 19.
Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>> Hi Stefan,
> > > > > > > >>>
> > > > > > > >>> Now I know what you mean about point 1. But currently it is infeasible for Flink, because the hash table is built inside the hash join operator. The hash join operator has two inputs: it first processes the data of the build input to build a hash table, and then uses the hash table to process the data of the probe input. If we want to use the built hash table to deduplicate data for the bloom filter, we must put the bloom filter inside the hash join operator. However, in this way, the data reaching the join operator cannot be reduced (the shuffle/network overhead cannot be reduced), which is not what we expect.
> > > > > > > >>>
> > > > > > > >>> Regarding the filter type, I agree with you: more types of filters can enable further optimization, and it is in our future plan (we described it in the section Future+Improvements#More+underlying+implementations).
> > > > > > > >>>
> > > > > > > >>> Best,
> > > > > > > >>> Lijie
> > > > > > > >>>
> > > > > > > >>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> > > > > > > >>>
> > > > > > > >>>>
> > > > > > > >>>> Hi Lijie,
> > > > > > > >>>>
> > > > > > > >>>> Thanks for your response. I agree with what you said about points 2 and 3. Let me explain a bit more about point 1. This would not apply to all types of joins, and my suggestion is also *not* to build a hash table only for the purpose of building the bloom filter.
> > > > > > > >>>> I was thinking about the scenario of a hash join, where you would build the hash table as part of the join algorithm anyways and then use the keyset of that hash table to 1) have better insights about NDV and 2) be able to construct the bloom filter without duplicates and therefore faster. So the preconditions under which I would use this are that you are building a hash table as part of the join and you know you are not building for a key column (because there would be no duplicates to eliminate). Then your bloom filter construction could already benefit from the deduplication work that was done for building the hash table.
> > > > > > > >>>>
> > > > > > > >>>> I also wanted to point out that besides bloom filter and IN filter, you could also think of other types of filters that can become interesting for certain distributions and metadata. For example, if you have min/max information about columns and partitions, you could have a bit vector represent equal-sized ranges of the key space between min and max, have the bits represent which parts of the range are present, and push that information down to the scan.
> > > > > > > >>>>
> > > > > > > >>>> Best,
> > > > > > > >>>> Stefan
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>> Hi Stefan,
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks for your feedback. Let me briefly summarize the optimization points you mentioned above (please correct me if I'm wrong):
> > > > > > > >>>>>
> > > > > > > >>>>> 1. Build an extra hash table for deduplication before building the bloom filter.
> > > > > > > >>>>> 2. Use the two-phase approach to build the bloom filter (first local, then OR-combine).
> > > > > > > >>>>> 3. Use blocked bloom filters to improve the cache efficiency.
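
Point 2 in the list above (local build first, then OR-combine) can be sketched as follows. This is an illustration under stated assumptions, not Flink's bloom filter: the `BloomFilter` class, its sizes, the SHA-256-based hashing and the helper names are all hypothetical. The only property it relies on is that filters with identical size and hash functions can be merged with a bitwise OR.

```python
import hashlib


class BloomFilter:
    """Tiny illustrative bloom filter backed by a Python int used as a bit set."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key):
        # Derive num_hashes bit positions from salted SHA-256 digests.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

    def merge(self, other: "BloomFilter") -> None:
        # OR-combine is only valid for filters with identical parameters.
        if (self.num_bits, self.num_hashes) != (other.num_bits, other.num_hashes):
            raise ValueError("cannot merge bloom filters with different parameters")
        self.bits |= other.bits


def build_local(keys) -> BloomFilter:
    """Phase 1: each build subtask builds a filter over its own keys."""
    local = BloomFilter()
    for key in keys:
        local.add(key)
    return local


def combine(filters) -> BloomFilter:
    """Phase 2: a single global step OR-combines all local filters."""
    combined = BloomFilter()
    for local in filters:
        combined.merge(local)
    return combined


subtask_keys = [[1, 2, 3], [3, 4, 5]]          # keys seen by two build subtasks
global_filter = combine(build_local(keys) for keys in subtask_keys)
```

Only `global_filter` would then need to be sent to the probe side, instead of one filter per build subtask.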
> > > > > > > >>>>>
> > > > > > > >>>>> For the above 3 points, I have the following questions or opinions:
> > > > > > > >>>>>
> > > > > > > >>>>> For point 1, it seems that building a hash table also requires traversing all build-side data, so the overhead seems to be the same as building a bloom filter directly? In addition, the hash table will take up more space when the amount of data is large, which is why we choose to use a bloom filter instead of a hash table.
> > > > > > > >>>>>
> > > > > > > >>>>> For point 2, I think it's a good idea to use the two-phase approach to build the bloom filter. But rather than directly broadcasting the local bloom filters to the probe side, I prefer to introduce a global node for the OR-combine (like two-phase-agg[1]), then broadcast the combined bloom filter to the probe side. The latter can reduce the amount of data transferred over the network. I will change the FLIP accordingly.
> > > > > > > >>>>>
> > > > > > > >>>>> For point 3, I think it's a nice optimization, but I prefer to leave it to future improvements. There is already a bloom filter implementation in Flink that we can simply reuse. Introducing a new bloom filter implementation adds some complexity (we need to implement it, test it, etc.) and is not the focus of this FLIP.
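
For readers unfamiliar with the blocked bloom filters mentioned in point 3, here is a minimal sketch of the idea. It is not Flink's existing bloom filter and not part of the FLIP. The class name, the block size of 512 bits (chosen to resemble a 64-byte cache line) and the hashing scheme are assumptions made for illustration. The defining property is that one hash selects a block and all bits of a key are set inside that single block.

```python
import hashlib


class BlockedBloomFilter:
    """Illustrative blocked bloom filter: each key touches exactly one block."""

    def __init__(self, num_blocks: int = 64, block_bits: int = 512, num_hashes: int = 4):
        if not 1 <= num_hashes <= 6:
            # A 32-byte digest gives 8 bytes for the block index plus 6 x 4 bytes.
            raise ValueError("num_hashes must be between 1 and 6 in this sketch")
        self.num_blocks = num_blocks
        self.block_bits = block_bits
        self.num_hashes = num_hashes
        self.blocks = [0] * num_blocks  # one Python int per block, used as a bit set

    def _locate(self, key):
        digest = hashlib.sha256(str(key).encode()).digest()
        block = int.from_bytes(digest[:8], "big") % self.num_blocks
        offsets = [
            int.from_bytes(digest[8 + 4 * i:12 + 4 * i], "big") % self.block_bits
            for i in range(self.num_hashes)
        ]
        return block, offsets

    def add(self, key) -> None:
        block, offsets = self._locate(key)
        for offset in offsets:
            self.blocks[block] |= 1 << offset

    def might_contain(self, key) -> bool:
        block, offsets = self._locate(key)
        return all((self.blocks[block] >> offset) & 1 for offset in offsets)


single = BlockedBloomFilter()
single.add("order-42")
touched_blocks = sum(1 for bits in single.blocks if bits != 0)

many = BlockedBloomFilter()
for k in range(100):
    many.add(k)
```

Because a lookup or insert reads and writes one block only, keys can also be grouped by block index before insertion, which is the buffering idea raised elsewhere in this thread.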
> > > > > > > >>>>>
> > > > > > > >>>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> > > > > > > >>>>>
> > > > > > > >>>>> Best,
> > > > > > > >>>>> Lijie
> > > > > > > >>>>>
> > > > > > > >>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
> > > > > > > >>>>>
> > > > > > > >>>>>> Hi,
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thanks for the proposal of this feature! I have a question about the filter build and some suggestions for potential improvements. First, I wonder why you suggest running the filter builder as a separate operator with parallelism 1. I’d suggest integrating the distributed filter build with the hash table build phase as follows:
> > > > > > > >>>>>>
> > > > > > > >>>>>> 1. Build the hash table completely in each subtask.
> > > > > > > >>>>>> 2.
The keyset of the hash table gives us a precise NDV count for every subtask.
> > > > > > > >>>>>> 3. Build a filter from the subtask hash table. For low-cardinality tables, I’d go with the suggested optimization of the IN-filter.
> > > > > > > >>>>>> 4. Each build subtask transfers the local bloom filter to all probe operators.
> > > > > > > >>>>>> 5. On the probe operator we can either probe against the individual filters, or we OR-combine all subtask filters into an aggregated bloom filter.
> > > > > > > >>>>>>
> > > > > > > >>>>>> I’m suggesting this because inserting into a (larger) bloom filter can be costly, especially once the filter exceeds cache sizes, and is therefore better parallelized. First inserting into the hash table also deduplicates the keys, so we avoid inserting records twice into the bloom filter. If we want to improve cache efficiency for the build of larger filters, we could structure them as blocked bloom filters, where the filter is separated into blocks and all bits of one key go into only one block.
> > > > > > > >>>>>> That allows us to apply software-managed buffering to first group keys that go into the same partition (ideally fitting into cache) and then bulk-load partitions once we have collected enough keys for one round of loading.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Best,
> > > > > > > >>>>>> Stefan
> > > > > > > >>>>>>
> > > > > > > >>>>>> Stefan Richter
> > > > > > > >>>>>> Principal Engineer II
> > > > > > > >>>>>>
> > > > > > > >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Hi, Benchao and Aitozi,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks for your feedback about this FLIP.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> @Benchao
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> I think it would be reasonable to also support "pipeline shuffle" if possible.
> > > > > > > >>>>>>> As I said above, the runtime filter can work well with all shuffle modes, including pipeline shuffle.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> if the RuntimeFilterBuilder finishes more quickly than the RuntimeFilter operator, it can still filter out additional data afterwards.
> > > > > > > >>>>>>> I think the main purpose of the runtime filter is to reduce the shuffle data and the data arriving at the join. Although eagerly running the large table side can process data in advance, most of the data may be irrelevant, causing huge shuffle overhead and slowing the join.
> > > > > > > >>>>>>> In addition, if the join is a hash join, the probe side of the hash join also needs to wait for its build side to complete, so the large table side is likely to be back-pressured.
> > > > > > > >>>>>>> Moreover, I prefer not to add too many configuration options in the first version, which may make it more difficult to use (users would need to understand a lot of internal implementation details). Maybe it could be a future improvement (if it's worthwhile)?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> @Aitozi
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> IMO, in the current implementation two source table operators will be executed simultaneously.
> > > > > > > >>>>>>> The example in the FLIP uses blocking shuffle (I will add this point to the FLIP). The runtime filter is generally chained with the large table side to reduce the shuffle data (as shown in Figure 2 of the FLIP). The job vertices should be scheduled in topological order, so the large table side can only be scheduled after the RuntimeFilterBuilder finishes.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> Are there some tests to show the default value of table.optimizer.runtime-filter.min-probe-data-size 10G is a good default value.
> > > > > > > >>>>>>> It's not tested yet, but it will be done before merging the code. The current value follows systems such as Spark and Hive. Before merging, we will test on TPC-DS 10 TB to find an optimal set of values. If you have relevant experience, suggestions are welcome.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> What's the representation of the runtime filter node in planner ?
> > > > > > > >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new physical nodes, RuntimeFilterBuilder and RuntimeFilter.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Best,
> > > > > > > >>>>>>> Lijie
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Hi Lijie,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have some questions below:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in the current implementation two source table operators will be executed simultaneously.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> If the data volume on the probe side is too small, the overhead of building runtime filter is not worth it.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Are there any tests showing that 10G is a good default value for table.optimizer.runtime-filter.min-probe-data-size? The same goes for table.optimizer.runtime-filter.max-build-data-size.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> the runtime filter can be pushed down along the probe side, as close to data sources as possible
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> What's the representation of the runtime filter node in the planner? Is it a Filter node?
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Best,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Aitozi.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Hi Lijie,
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also support "pipeline shuffle" if possible.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing. Although this has not been much exposed to users for now, I know a few companies that use Flink as an MPP computing engine, and there is an ongoing effort[1] to make this usage more powerful.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finished, it will not process any data and will occupy resources": whether it benefits us depends on the scale of the data. If the RuntimeFilterBuilder finishes more quickly than the RuntimeFilter operator, it can still filter out additional data afterwards. Hence, in my opinion, we do not need to make the edge between RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least it can be made configurable.

[1] https://issues.apache.org/jira/browse/FLINK-25318

Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:

Hi Yuxia,

I made a mistake in the above response.

The runtime filter can work well with all shuffle modes. However, hybrid shuffle and blocking shuffle are currently recommended for batch jobs (pipeline shuffle is not recommended).

One more thing to mention here is that we will force the edge between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which BatchShuffleMode is set), because the RuntimeFilter really doesn't need to run before the RuntimeFilterBuilder has finished. Even if the RuntimeFilter becomes running before the RuntimeFilterBuilder finished, it will not process any data and will occupy resources.

Best,
Lijie

Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:

Hi Yuxia,

Thanks for your feedback. The answers to your questions are as follows:

1. Yes, the row count comes from the statistics of the underlying table (or is estimated based on the statistics of the underlying table, if the build side or probe side is not a TableScan). If the statistics are unavailable, we will not inject a runtime filter (as you said, we can hardly evaluate the benefits). Besides, AFAIK, the estimated data size of the build side is also based on the row count statistics; that is, if the statistics are unavailable, the requirement "table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated either. I'll add this point to the FLIP.

2. Estimated data size does not meet the requirement (in the planner optimization phase) -> No filter
   Estimated data size meets the requirement (in the planner optimization phase), but the real data size does not meet the requirement (in the execution phase) -> Fake filter

3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
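
The two cases in point 2 can be sketched roughly as follows. This is only an illustration under stated assumptions, not the FLIP's implementation: the function names, the FAKE_FILTER sentinel and the threshold values are hypothetical, and a plain Python set stands in for the real filter.

```python
from typing import Iterable, Optional, Set

# Hypothetical thresholds for illustration only; not the FLIP's actual defaults.
MAX_BUILD_DATA_SIZE = 10 * 1024 * 1024          # bytes
MIN_PROBE_DATA_SIZE = 10 * 1024 * 1024 * 1024   # bytes

FAKE_FILTER = None  # sentinel for a filter that lets every row through


def should_inject(est_build_size: Optional[int],
                  est_probe_size: Optional[int]) -> bool:
    """Planner phase: decide from estimates whether to inject a filter."""
    # Without statistics the benefit cannot be evaluated: no filter.
    if est_build_size is None or est_probe_size is None:
        return False
    return (est_build_size <= MAX_BUILD_DATA_SIZE
            and est_probe_size >= MIN_PROBE_DATA_SIZE)


def build_filter(keys: Iterable[int], row_size: int,
                 max_build_size: int) -> Optional[Set[int]]:
    """Execution phase: build the filter, or fall back to the fake filter."""
    seen: Set[int] = set()
    total = 0
    for key in keys:
        total += row_size
        if total > max_build_size:
            # The real build side turned out larger than the limit.
            return FAKE_FILTER
        seen.add(key)
    return seen


def passes(flt: Optional[Set[int]], key: int) -> bool:
    """Probe side: the fake filter always returns true."""
    return True if flt is FAKE_FILTER else key in flt
```

The planner-phase check decides whether a filter exists in the plan at all; the fake filter only appears when a filter was injected and the real build side then exceeds the limit at execution time.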

Best,
Lijie

yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:

Thanks Lijie for starting this discussion. I'm excited to see the runtime filter being implemented in Flink.
I have a few questions about it:

1: As the FLIP said, `if the ndv cannot be estimated, use row count instead`. So, does the row count come from the statistics of the underlying table? What if the statistics are also unavailable, considering that users may not always remember to generate statistics in production?
I'm wondering whether it makes sense to just disable the runtime filter if the statistics are unavailable, since in that case we can hardly evaluate the benefits of the runtime filter.

2: The FLIP said: "We will inject the runtime filters only if the following requirements are met:xxx", but it also said, "Once this limit is exceeded, it will output a fake filter(which always returns true)" in the `RuntimeFilterBuilderOperator` part. These seem contradictory, so I'm wondering what the real behavior is: will no filter be injected, or a fake filter?

3: Does it also mean the runtime filter can only take effect with blocking shuffle?

Best regards,
Yuxia

----- Original Message -----
From: "ron9 liu" <ron9....@gmail.com>
To: "dev" <dev@flink.apache.org>
Sent: Wednesday, June 14, 2023, 5:29:28 PM
Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

Thanks Lijie for starting this discussion. Runtime Filter is a common optimization for improving join performance that has been adopted by many computing engines such as Spark, Doris, etc. Flink is a unified streaming and batch computing engine, and we are continuously optimizing its batch performance. Runtime filter is a general optimization technique that can improve the performance of Flink batch jobs, so we are introducing it for batch as well.

Looking forward to all feedback.

Best,
Ron

Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:

Hi devs,

Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1]

Runtime Filter is a common optimization to improve join performance. It is designed to dynamically generate filter conditions for certain Join queries at runtime to reduce the amount of scanned or shuffled data, avoid unnecessary I/O and network transmission, and speed up the query. Its working principle is to first build a filter (e.g. a bloom filter) based on the data on the small table side (build side), then pass this filter to the large table side (probe side) to filter out the irrelevant data there. This reduces the data reaching the join and improves performance.

You can find more details in FLIP-324[1]. Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs

Best,
Ron & Gen & Lijie
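
The build-then-probe principle described above can be illustrated with a small sketch. It assumes a toy Bloom filter written from scratch; the class and function names and the sizing parameters are hypothetical, and none of this reflects Flink's actual operators.

```python
import hashlib
from typing import Iterable, List, Tuple


class BloomFilter:
    """Toy Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for clarity

    def _positions(self, key: str) -> List[int]:
        # Derive num_hashes positions by salting the key with the hash index.
        positions = []
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            positions.append(int.from_bytes(digest[:8], "big") % self.num_bits)
        return positions

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos] for pos in self._positions(key))


def build_runtime_filter(build_keys: Iterable[str]) -> BloomFilter:
    """Build side (small table): collect the join keys into a filter."""
    flt = BloomFilter()
    for key in build_keys:
        flt.add(key)
    return flt


def filter_probe_side(rows: Iterable[Tuple[str, int]],
                      flt: BloomFilter) -> List[Tuple[str, int]]:
    """Probe side (large table): drop rows whose key cannot match."""
    return [row for row in rows if flt.might_contain(row[0])]
```

Because a Bloom filter never yields false negatives, every probe row that has a join partner survives the filter, so the join result is unchanged; only rows that cannot match are dropped early.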

--

Best,
Benchao Li