Hi Stefan,

>> bypassing the dataflow

I believe it's a possible solution, but it may require more coordination and extra conditions (such as DFS), so I think it should be excluded from the first version. I'll put it in Future+Improvements as a potential improvement.
Thanks again for your quick reply :)

Best,
Lijie

Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
>
> Hi Lijie,
>
> I think you understood me correctly. But I would not consider this a true
> cyclic dependency in the dataflow, because I would not suggest sending the
> filter through an edge in the job graph from join to scan. I'd rather
> bypass the stream graph for bringing the filter to the scan. For example,
> the join could report the filter after the build phase, e.g. to the JM or
> a predefined DFS folder. And when the probe scan is scheduled, the JM
> provides the filter information to the scan when it gets scheduled for
> execution, or the scan looks in DFS to see whether it can find any filter
> that it can use as part of initialization. I'm not suggesting to do it
> exactly in those ways, but just to show what I mean by "bypassing the
> dataflow".
>
> Anyways, I'm fine with excluding this optimization from the current FLIP
> if you believe it would be hard to implement in Flink.
>
> Best,
> Stefan
>
>
> > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >
> > Hi Stefan,
> >
> > If I understand correctly (I hope so), the hash join operator needs to
> > send the bloom filter to the probe scan, and the probe scan also needs
> > to send the filtered data to the hash join operator. This means there
> > will be a cycle in the data flow, and it would be hard for current Flink
> > to schedule this kind of graph. I admit we can find a way to do this,
> > but that's probably a bit outside the scope of this FLIP. So let's do
> > these complex optimizations later, WDYT?
> >
> > Best,
> > Lijie
> >
> > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
> >
> >> Hi Lijie,
> >>
> >> Exactly, my proposal was to build the bloom filter in the hash operator.
> >> I don't know all the details of the implementation of Flink's join
> >> operator, but I'd assume that even if the join is a two-input operator
> >> it gets scheduled as 2 different pipelines: first the build phase with
> >> the scan of the dimension table, and after that's completed the probe
> >> phase with the scan of the fact table. I'm not proposing to use the
> >> bloom filter only in the join operator, but rather to send the bloom
> >> filter to the probe scan before starting the probe. I assume this would
> >> require some form of side channel to transport the filter and
> >> coordination to tell the sources that such a filter is available. I
> >> cannot answer how hard those would be to implement, but the idea
> >> doesn't seem impossible to me.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>
> >>> Hi Stefan,
> >>>
> >>> Now I know what you mean about point 1. But currently it is unfeasible
> >>> for Flink, because the building of the hash table is inside the hash
> >>> join operator. The hash join operator has two inputs: it will first
> >>> process the data of the build-input to build a hash table, and then
> >>> use the hash table to process the data of the probe-input. If we want
> >>> to use the built hash table to deduplicate data for the bloom filter,
> >>> we must put the bloom filter inside the hash join operator. However,
> >>> in this way, the data reaching the join operator cannot be reduced
> >>> (the shuffle/network overhead cannot be reduced), which is not what we
> >>> expected.
> >>>
> >>> Regarding the filter type, I agree with you, more types of filters can
> >>> get further optimization, and it is in our future plan (we described
> >>> it in the section Future+Improvements#More+underlying+implementations).
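For reference, Stefan's idea of reusing the hash join's build-side hash table (so each distinct key is inserted into the bloom filter only once, and the keyset gives an exact per-subtask NDV) can be sketched roughly as follows. This is a toy Python sketch, not Flink's actual operator code; the `BloomFilter` class, its sizes, and the double-hashing scheme are illustrative assumptions:

```python
import hashlib

def hash_positions(key: bytes, k: int, m: int):
    """Derive k bit positions for a key via a simple double-hashing scheme."""
    d = hashlib.sha256(key).digest()
    h1 = int.from_bytes(d[:8], "big")
    h2 = int.from_bytes(d[8:16], "big") | 1
    return [(h1 + i * h2) % m for i in range(k)]

class BloomFilter:
    def __init__(self, m=1024, k=3):
        self.m, self.k = m, k
        self.bits = bytearray(m)  # one byte per bit, for clarity

    def add(self, key: bytes):
        for i in hash_positions(key, self.k, self.m):
            self.bits[i] = 1

    def might_contain(self, key: bytes) -> bool:
        return all(self.bits[i] for i in hash_positions(key, self.k, self.m))

# Build side of the hash join: rows keyed by join key. Duplicate keys
# collapse into one hash-table entry, exactly as in a hash join build.
build_rows = [(b"k1", "a"), (b"k2", "b"), (b"k1", "c")]
hash_table = {}
for key, payload in build_rows:
    hash_table.setdefault(key, []).append(payload)

# Deduplicated bloom-filter build: len(hash_table) insertions instead of
# len(build_rows), and len(hash_table) is the exact per-subtask NDV.
bf = BloomFilter()
for key in hash_table:
    bf.add(key)
```

The saving is exactly the duplicate insertions avoided: here 2 inserts instead of 3, which matters once the filter no longer fits in cache.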
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> >>>
> >>>>
> >>>> Hi Lijie,
> >>>>
> >>>> thanks for your response, I agree with what you said about points 2
> >>>> and 3. Let me explain a bit more about point 1. This would not apply
> >>>> to all types of joins, and my suggestion is also *not* to build a
> >>>> hash table only for the purpose of building the bloom filter.
> >>>> I was thinking about the scenario of a hash join, where you would
> >>>> build the hash table as part of the join algorithm anyways, and then
> >>>> use the keyset of that hash table to 1) have better insights about
> >>>> NDV and 2) be able to construct the bloom filter without duplicates
> >>>> and therefore faster. So the preconditions where I would use this are
> >>>> if you are building a hash table as part of the join and you know you
> >>>> are not building on a key column (because there would be no
> >>>> duplicates to eliminate). Then your bloom filter construction could
> >>>> already benefit from the deduplication work that was done for
> >>>> building the hash table.
> >>>>
> >>>> I also wanted to point out that besides the bloom filter and the IN
> >>>> filter, you could also think of other types of filters that can
> >>>> become interesting for certain distributions and metadata. For
> >>>> example, if you have min/max information about columns and
> >>>> partitions, you could have a bit vector represent equally-sized
> >>>> ranges of the key space between min and max, have the bits represent
> >>>> which parts of the range are present, and push that information down
> >>>> to the scan.
> >>>>
> >>>> Best,
> >>>> Stefan
> >>>>
> >>>>
> >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Stefan,
> >>>>>
> >>>>> Thanks for your feedback. Let me briefly summarize the optimization
> >>>>> points you mentioned above (please correct me if I'm wrong):
> >>>>>
> >>>>> 1. Build an extra hash table for deduplication before building the
> >>>>> bloom filter.
> >>>>> 2. Use the two-phase approach to build the bloom filter (first
> >>>>> local, then OR-combine).
> >>>>> 3. Use blocked bloom filters to improve cache efficiency.
> >>>>>
> >>>>> For the above 3 points, I have the following questions or opinions:
> >>>>>
> >>>>> For point 1, it seems that building a hash table also requires
> >>>>> traversing all build-side data, and the overhead seems to be the
> >>>>> same as building a bloom filter directly? In addition, the hash
> >>>>> table will take up more space when the amount of data is large,
> >>>>> which is why we chose to use a bloom filter instead of a hash table.
> >>>>>
> >>>>> For point 2, I think it's a good idea to use the two-phase approach
> >>>>> to build the bloom filter. But rather than directly broadcasting the
> >>>>> local bloom filter to the probe side, I prefer to introduce a global
> >>>>> node for the OR-combine (like two-phase-agg[1]), then broadcast the
> >>>>> combined bloom filter to the probe side. The latter can reduce the
> >>>>> amount of data transferred over the network. I will change the FLIP
> >>>>> like this.
> >>>>>
> >>>>> For point 3, I think it's a nice optimization, but I prefer to put
> >>>>> it into the future improvements. There is already an implementation
> >>>>> of the bloom filter in Flink, and we can simply reuse it.
> >>>>> Introducing a new bloom filter implementation introduces some
> >>>>> complexity (we need to implement it, test it, etc.), and it is not
> >>>>> the focus of this FLIP.
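The two-phase build preferred above (local filters per build subtask, then a global node that OR-combines them and broadcasts one combined filter to the probe side) works because equally-sized bloom filters can be combined with a bitwise OR without losing any keys: a key inserted into any local filter still has all its bits set in the combined filter. A minimal sketch, with toy filter sizes and the `or_combine` helper as an assumption:

```python
def or_combine(local_filters):
    """Global OR-combine of equally-sized local bloom-filter bit arrays."""
    assert len({len(f) for f in local_filters}) == 1, "filters must share size"
    combined = bytearray(len(local_filters[0]))
    for f in local_filters:
        for i, bit in enumerate(f):
            combined[i] |= bit
    return combined

# Three build subtasks, each reporting a local 8-bit filter (toy size).
local = [bytearray([1, 0, 0, 0, 0, 0, 0, 0]),
         bytearray([0, 0, 1, 0, 0, 0, 0, 0]),
         bytearray([1, 0, 0, 0, 0, 1, 0, 0])]

# The global node combines once; only this single filter is broadcast to
# the probe side, instead of one filter per build subtask.
combined = or_combine(local)
```

The network saving is the point Lijie makes: with parallelism p, the probe side receives 1 filter instead of p.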
> >>>>>
> >>>>> [1]
> >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> >>>>>
> >>>>> Best,
> >>>>> Lijie
> >>>>>
> >>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for the proposal of this feature! I have a question about
> >>>>>> the filter build and some suggestions for potential improvements.
> >>>>>> First, I wonder why you suggest to run the filter builder as a
> >>>>>> separate operator with parallelism 1. I'd suggest to integrate the
> >>>>>> distributed filter build with the hash table build phase as follows:
> >>>>>>
> >>>>>> 1. Build the hash table completely in each subtask.
> >>>>>> 2. The keyset of the hash table gives us a precise NDV count for
> >>>>>> every subtask.
> >>>>>> 3. Build a filter from the subtask hash table. For low-cardinality
> >>>>>> tables, I'd go with the suggested optimization of the IN-filter.
> >>>>>> 4. Each build subtask transfers the local bloom filter to all probe
> >>>>>> operators.
> >>>>>> 5. On the probe operator we can either probe against the individual
> >>>>>> filters, or we OR-combine all subtask filters into an aggregated
> >>>>>> bloom filter.
> >>>>>>
> >>>>>> I'm suggesting this because inserting into a (larger) bloom filter
> >>>>>> can be costly, especially once the filter exceeds cache sizes, and
> >>>>>> is therefore better parallelized. First inserting into the hash
> >>>>>> table also deduplicates the keys, so we avoid inserting records
> >>>>>> twice into the bloom filter. If we want to improve cache efficiency
> >>>>>> for the build of larger filters, we could structure them as blocked
> >>>>>> bloom filters, where the filter is separated into blocks and all
> >>>>>> bits of one key go only into one block. That allows us to apply
> >>>>>> software-managed buffering: first group keys that go into the same
> >>>>>> partition (ideally fitting into cache) and then bulk-load
> >>>>>> partitions once we have collected enough keys for one round of
> >>>>>> loading.
> >>>>>>
> >>>>>> Best,
> >>>>>> Stefan
> >>>>>>
> >>>>>> Stefan Richter
> >>>>>> Principal Engineer II, Confluent
> >>>>>>
> >>>>>>
> >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >>>>>>>
> >>>>>>> Hi, Benchao and Aitozi,
> >>>>>>>
> >>>>>>> Thanks for your feedback about this FLIP.
> >>>>>>>
> >>>>>>> @Benchao
> >>>>>>>
> >>>>>>>> I think it would be reasonable to also support "pipeline
> >>>>>>>> shuffle" if possible.
> >>>>>>>
> >>>>>>> As I said above, the runtime filter can work well with all shuffle
> >>>>>>> modes, including pipeline shuffle.
> >>>>>>>
> >>>>>>>> if the RuntimeFilterBuilder could be done more quickly than the
> >>>>>>>> RuntimeFilter operator, it can still filter out additional data
> >>>>>>>> afterwards.
> >>>>>>>
> >>>>>>> I think the main purpose of the runtime filter is to reduce the
> >>>>>>> shuffle data and the data arriving at the join. Although eagerly
> >>>>>>> running the large table side can process data in advance, most of
> >>>>>>> the data may be irrelevant, causing huge shuffle overhead and
> >>>>>>> slowing down the join. In addition, if the join is a hash join,
> >>>>>>> the probe side of the hash join also needs to wait for its build
> >>>>>>> side to complete, so the large table side is likely to be
> >>>>>>> back-pressured. Besides, I don't tend to add too many
> >>>>>>> configuration options in the first version, which may make it more
> >>>>>>> difficult to use (users need to understand a lot of internal
> >>>>>>> implementation details).
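Stefan's blocked bloom filter idea above (all k bits of one key land in a single cache-line-sized block, so both insert and lookup touch only one block) can be sketched like this. It is a toy Python sketch under assumed sizes (`BLOCK_BITS`, `num_blocks`, k=3); the software-managed buffering and bulk-loading step he mentions is omitted for brevity:

```python
import hashlib

BLOCK_BITS = 512  # e.g. one 64-byte cache line per block
K = 3             # bits set per key

def _hashes(key: bytes):
    """Four independent 64-bit hashes derived from one sha256 digest."""
    d = hashlib.sha256(key).digest()
    return [int.from_bytes(d[8 * i:8 * i + 8], "big") for i in range(4)]

class BlockedBloomFilter:
    def __init__(self, num_blocks=64):
        self.num_blocks = num_blocks
        self.blocks = [0] * num_blocks  # each block is one int bitset

    def _positions(self, key: bytes):
        h = _hashes(key)
        block = h[0] % self.num_blocks              # one hash picks the block,
        bits = [h[1 + i] % BLOCK_BITS for i in range(K)]  # the rest pick bits in it
        return block, bits

    def add(self, key: bytes):
        block, bits = self._positions(key)
        for b in bits:
            self.blocks[block] |= 1 << b

    def might_contain(self, key: bytes) -> bool:
        block, bits = self._positions(key)
        return all(self.blocks[block] & (1 << b) for b in bits)

bbf = BlockedBloomFilter()
for key in (b"k1", b"k2", b"k3"):
    bbf.add(key)
```

Because a key touches exactly one block, keys can first be grouped by block and then applied block by block, which is what makes the buffered bulk load cache-friendly.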
> >>>>>>> Maybe it could be a future improvement (if it's worthwhile)?
> >>>>>>>
> >>>>>>>
> >>>>>>> @Aitozi
> >>>>>>>
> >>>>>>>> IMO, in the current implementation two source table operators
> >>>>>>>> will be executed simultaneously.
> >>>>>>>
> >>>>>>> The example in the FLIP uses blocking shuffle (I will add this
> >>>>>>> point to the FLIP). The runtime filter is generally chained with
> >>>>>>> the large table side to reduce the shuffle data (as shown in
> >>>>>>> Figure 2 of the FLIP). The job vertices should be scheduled in
> >>>>>>> topological order, so the large table side can only be scheduled
> >>>>>>> after the RuntimeFilterBuilder finishes.
> >>>>>>>
> >>>>>>>> Are there some tests to show the default value of
> >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >>>>>>>> default value.
> >>>>>>>
> >>>>>>> It's not tested yet, but it will be done before merging the code.
> >>>>>>> The current value refers to systems such as Spark and Hive. Before
> >>>>>>> code merging, we will test on TPC-DS 10T to find an optimal set of
> >>>>>>> values. If you have relevant experience with it, welcome to give
> >>>>>>> some suggestions.
> >>>>>>>
> >>>>>>>> What's the representation of the runtime filter node in the
> >>>>>>>> planner?
> >>>>>>>
> >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new
> >>>>>>> physical nodes, RuntimeFilterBuilder and RuntimeFilter.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Lijie
> >>>>>>>
> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> >>>>>>>
> >>>>>>>> Hi Lijie,
> >>>>>>>>
> >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have
> >>>>>>>> some questions below:
> >>>>>>>>
> >>>>>>>>> Schedule the TableSource(dim) first.
> >>>>>>>>
> >>>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in
> >>>>>>>> the current implementation two source table operators will be
> >>>>>>>> executed simultaneously.
> >>>>>>>>
> >>>>>>>>> If the data volume on the probe side is too small, the overhead
> >>>>>>>>> of building the runtime filter is not worth it.
> >>>>>>>>
> >>>>>>>> Are there some tests to show the default value of
> >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >>>>>>>> default value? The same for
> >>>>>>>> table.optimizer.runtime-filter.max-build-data-size.
> >>>>>>>>
> >>>>>>>>> the runtime filter can be pushed down along the probe side, as
> >>>>>>>>> close to data sources as possible
> >>>>>>>>
> >>>>>>>> What's the representation of the runtime filter node in the
> >>>>>>>> planner? Is it a Filter node?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> >>>>>>>>
> >>>>>>>>> Hi Lijie,
> >>>>>>>>>
> >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> >>>>>>>>> also support "pipeline shuffle" if possible.
> >>>>>>>>>
> >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing. Although
> >>>>>>>>> this has not been much exposed to users for now, I know a few
> >>>>>>>>> companies that use Flink as an MPP computing engine, and there
> >>>>>>>>> is an ongoing effort[1] to make this usage more powerful.
> >>>>>>>>>
> >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> >>>>>>>>> running before the RuntimeFilterBuilder finished, it will not
> >>>>>>>>> process any data and will occupy resources": whether it benefits
> >>>>>>>>> us depends on the scale of data. If the RuntimeFilterBuilder
> >>>>>>>>> could be done more quickly than the RuntimeFilter operator, it
> >>>>>>>>> can still filter out additional data afterwards. Hence in my
> >>>>>>>>> opinion, we do not need to make the edge between
> >>>>>>>>> RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least
> >>>>>>>>> it could be configurable.
> >>>>>>>>>
> >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
> >>>>>>>>>
> >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> >>>>>>>>>
> >>>>>>>>>> Hi Yuxia,
> >>>>>>>>>>
> >>>>>>>>>> I made a mistake in the above response.
> >>>>>>>>>>
> >>>>>>>>>> The runtime filter can work well with all shuffle modes.
> >>>>>>>>>> However, hybrid shuffle and blocking shuffle are currently
> >>>>>>>>>> recommended for batch jobs (pipeline shuffle is not
> >>>>>>>>>> recommended).
> >>>>>>>>>>
> >>>>>>>>>> One more thing to mention here is that we will force the edge
> >>>>>>>>>> between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING
> >>>>>>>>>> (regardless of which BatchShuffleMode is set), because the
> >>>>>>>>>> RuntimeFilter really doesn't need to run before the
> >>>>>>>>>> RuntimeFilterBuilder finishes. Even if the RuntimeFilter
> >>>>>>>>>> started running before the RuntimeFilterBuilder finished, it
> >>>>>>>>>> would not process any data and would only occupy resources.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Lijie
> >>>>>>>>>>
> >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Yuxia,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your feedback. The answers to your questions are as
> >>>>>>>>>>> follows:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. Yes, the row count comes from the statistics of the
> >>>>>>>>>>> underlying table (or is estimated based on the statistics of
> >>>>>>>>>>> the underlying table, if the build side or probe side is not a
> >>>>>>>>>>> TableScan). If the statistics are unavailable, we will not
> >>>>>>>>>>> inject a runtime filter (as you said, we can hardly evaluate
> >>>>>>>>>>> the benefits).
> >>>>>>>>>>> Besides, AFAIK, the estimated data size of the build side is
> >>>>>>>>>>> also based on the row count statistics; that is, if the
> >>>>>>>>>>> statistics are unavailable, the requirement
> >>>>>>>>>>> "table.optimizer.runtime-filter.max-build-data-size" cannot be
> >>>>>>>>>>> evaluated either. I'll add this point to the FLIP.
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Estimated data size does not meet the requirement (in planner
> >>>>>>>>>>> optimization phase) -> No filter
> >>>>>>>>>>> Estimated data size meets the requirement (in planner
> >>>>>>>>>>> optimization phase), but the real data size does not meet the
> >>>>>>>>>>> requirement (in execution phase) -> Fake filter
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking
> >>>>>>>>>>> shuffle.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Lijie
> >>>>>>>>>>>
> >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see the
> >>>>>>>>>>>> runtime filter being implemented in Flink.
> >>>>>>>>>>>> I have a few questions about it:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row
> >>>>>>>>>>>> count instead`. So, does the row count come from the
> >>>>>>>>>>>> statistics of the underlying table?
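The no-filter vs. fake-filter distinction above can be illustrated with a small sketch. Note this is hypothetical illustration code, not the FLIP's implementation: the real check is on the byte size of the build side (table.optimizer.runtime-filter.max-build-data-size) and the real filter is a bloom filter, whereas here a key-count limit and an exact key set stand in for both:

```python
class AlwaysTrueFilter:
    """Fake filter: passes everything, so the probe side scans all its data."""
    def might_contain(self, key) -> bool:
        return True

class ExactFilter:
    """Stand-in for the real bloom filter built from the build-side keys."""
    def __init__(self, keys):
        self.keys = frozenset(keys)

    def might_contain(self, key) -> bool:
        return key in self.keys

class RuntimeFilterBuilder:
    """Sketch of the execution-phase fallback: once the build side exceeds
    the configured limit, stop tracking keys and emit a fake filter."""
    def __init__(self, max_build_keys: int):
        self.max_build_keys = max_build_keys
        self.keys = set()
        self.overflowed = False

    def add(self, key):
        if self.overflowed:
            return
        self.keys.add(key)
        if len(self.keys) > self.max_build_keys:
            self.overflowed = True
            self.keys = None  # release the tracked keys

    def build(self):
        return AlwaysTrueFilter() if self.overflowed else ExactFilter(self.keys)
```

The planner-phase "no filter" case never instantiates a builder at all; the execution-phase fallback degrades to a pass-through filter, so correctness never depends on the estimate being right.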
> >>>>>>>>>>>> What if the statistics are also unavailable, considering
> >>>>>>>>>>>> users may not always remember to generate statistics in
> >>>>>>>>>>>> production?
> >>>>>>>>>>>> I'm wondering whether it makes sense to just disable the
> >>>>>>>>>>>> runtime filter if statistics are unavailable, since in that
> >>>>>>>>>>>> case we can hardly evaluate the benefits of the runtime
> >>>>>>>>>>>> filter.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if
> >>>>>>>>>>>> the following requirements are met: xxx", but it also said,
> >>>>>>>>>>>> "Once this limit is exceeded, it will output a fake filter
> >>>>>>>>>>>> (which always returns true)" in the
> >>>>>>>>>>>> `RuntimeFilterBuilderOperator` part. They seem contradictory,
> >>>>>>>>>>>> so I'm wondering what's the real behavior: will no filter be
> >>>>>>>>>>>> injected, or a fake filter?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3: Does it also mean the runtime filter can only take effect
> >>>>>>>>>>>> in blocking shuffle?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best regards,
> >>>>>>>>>>>> Yuxia
> >>>>>>>>>>>>
> >>>>>>>>>>>> ----- Original Message -----
> >>>>>>>>>>>> From: "ron9 liu" <ron9....@gmail.com>
> >>>>>>>>>>>> To: "dev" <dev@flink.apache.org>
> >>>>>>>>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> >>>>>>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for
> >>>>>>>>>>>> Flink Batch Jobs
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks Lijie for starting this discussion.
> >>>>>>>>>>>> Runtime Filter is a common optimization to improve join
> >>>>>>>>>>>> performance that has been adopted by many computing engines
> >>>>>>>>>>>> such as Spark, Doris, etc. Flink is a unified streaming and
> >>>>>>>>>>>> batch computing engine, and we are continuously optimizing
> >>>>>>>>>>>> batch performance. Runtime filter is a general performance
> >>>>>>>>>>>> optimization technique that can improve the performance of
> >>>>>>>>>>>> Flink batch jobs, so we are introducing it on batch as well.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to all feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Ron
> >>>>>>>>>>>>
> >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi devs,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion
> >>>>>>>>>>>>> about FLIP-324: Introduce Runtime Filter for Flink Batch
> >>>>>>>>>>>>> Jobs[1].
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Runtime Filter is a common optimization to improve join
> >>>>>>>>>>>>> performance. It is designed to dynamically generate filter
> >>>>>>>>>>>>> conditions for certain Join queries at runtime to reduce the
> >>>>>>>>>>>>> amount of scanned or shuffled data, avoid unnecessary I/O
> >>>>>>>>>>>>> and network transmission, and speed up the query. Its
> >>>>>>>>>>>>> working principle is to first build a filter (e.g. a bloom
> >>>>>>>>>>>>> filter) based on the data on the small table side (build
> >>>>>>>>>>>>> side), then pass this filter to the large table side (probe
> >>>>>>>>>>>>> side) to filter out the irrelevant data there; this can
> >>>>>>>>>>>>> reduce the data reaching the join and improve performance.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You can find more details in FLIP-324[1]. Looking forward to
> >>>>>>>>>>>>> your feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Ron & Gen & Lijie
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best,
> >>>>>>>>> Benchao Li
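The working principle described in the announcement (build a filter on the small build side, then apply it on the probe side so irrelevant rows are dropped before reaching the join) condenses to a few lines. In this sketch an exact key set stands in for the bloom filter, and all names and the toy data are illustrative:

```python
def runtime_filtered_probe(build_keys, probe_rows, key_of):
    """Build a filter from the small (build) side and use it to drop
    probe-side rows that cannot possibly find a join partner."""
    keyset = set(build_keys)  # stand-in for the bloom filter
    return [row for row in probe_rows if key_of(row) in keyset]

# Small dimension table (build side) and large fact table (probe side).
build_keys = ["de", "fr"]
probe_rows = [("de", 1), ("us", 2), ("fr", 3), ("cn", 4)]

# Only rows whose key appears on the build side reach the join.
filtered = runtime_filtered_probe(build_keys, probe_rows,
                                  key_of=lambda row: row[0])
```

With a real bloom filter, the filtered output may additionally contain a few false-positive rows, which is safe: the join itself still discards them, so the filter only ever reduces work, never changes the result.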