Hi Stefan,

>> bypassing the dataflow
I believe it's a possible solution, but it may require more coordination
and extra conditions (such as DFS), I do think it should be excluded from
the first version. I'll put it in Future+Improvements as a potential
improvement.

Thanks again for your quick reply :)

Best,
Lijie

Stefan Richter <srich...@confluent.io.invalid> 于2023年6月19日周一 20:51写道:

>
> Hi Lijie,
>
> I think you understood me correctly. But I would not consider this a true
> cyclic dependency in the dataflow because I would not suggest to send the
> filter through an edge in the job graph from join to scan. I’d rather
> bypass the stream graph for exchanging bringing the filter to the scan. For
> example, the join could report the filter after the build phase, e.g. to
> the JM or a predefined DFS folder. And when the probe scan is scheduled,
> the JM provides the filter information to the scan when it gets scheduled
> for execution or the scan looks in DFS if it can find any filter that it
> can use as part of initialization. I’m not suggesting to do it exactly in
> those ways, but just to show what I mean by "bypassing the dataflow".
>
> Anyways, I’m fine with excluding this optimization from the current FLIP
> if you believe it would be hard to implement in Flink.
>
> Best,
> Stefan
>
>
> > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> >
> > Hi Stefan,
> >
> > If I understand correctly(I hope so), the hash join operator needs to
> send
> > the bloom filter to probe scan, and probe scan also needs to send the
> > filtered data to the hash join operator. This means there will be a cycle
> > in the data flow, it will be hard for current Flink to schedule this kind
> > of graph. I admit we can find a way to do this, but that's probably a
> > bit outside the scope of this FLIP.  So let's do these complex
> > optimizations later, WDYT?
> >
> > Best,
> > Lijie
> >
> > Stefan Richter <srich...@confluent.io.invalid <mailto:
> srich...@confluent.io.invalid>> 于2023年6月19日周一 18:15写道:
> >
> >> Hi Lijie,
> >>
> >> Exactly, my proposal was to build the bloom filter in the hash
> operator. I
> >> don’t know about all the details about the implementation of Flink’s
> join
> >> operator, but I’d assume that even if the join is a two input operator
> it
> >> gets scheduled for 2 different pipelines. First the build phase with the
> >> scan from the dimension table and after that’s completed the probe phase
> >> with the scan of the fact table. I’m not proposing the use the bloom
> filter
> >> only in the join operator, but rather send the bloom filter to the probe
> >> scan before starting the probe. I assume this would require some form of
> >> side channel to transport the filter and coordination to tell the
> sources
> >> that such a filter is available. I cannot answer how hard those would
> be to
> >> implement, but the idea doesn’t seem impossible to me.
> >>
> >> Best,
> >> Stefan
> >>
> >>
> >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com>
> wrote:
> >>>
> >>> Hi Stefan,
> >>>
> >>> Now I know what you mean about point 1. But currently it is unfeasible
> >> for
> >>> Flink, because the building of the hash table is inside the hash join
> >>> operator. The hash join operator has two inputs, it will first process
> >> the
> >>> data of the build-input to build a hash table, and then use the hash
> >> table
> >>> to process the data of the probe-input. If we want to use the built
> hash
> >>> table to deduplicate data for bloom filter, we must put the bloom
> filter
> >>> inside the hash join operator.  However, in this way, the data reaching
> >> the
> >>> join operator cannot be reduced (the shuffle/network overhead cannot be
> >>> reduced), which is not what we expected.
> >>>
> >>> Regarding the filter type, I agree with you, more types of filters can
> >>> get further
> >>> optimization,  and it is in our future plan (We described it in the
> >> section
> >>> Future+Improvements#More+underlying+implementations).
> >>>
> >>> Best,
> >>> Lijie
> >>>
> >>> Stefan Richter <srich...@confluent.io.invalid <mailto:
> srich...@confluent.io.invalid> <mailto:
> >> srich...@confluent.io.invalid <mailto:srich...@confluent.io.invalid>>>
> 于2023年6月19日周一 15:58写道:
> >>>
> >>>>
> >>>> Hi Lijie,
> >>>>
> >>>> thanks for your response, I agree with what you said about points 2
> and
> >> 3.
> >>>> Let me explain a bit more about point 1. This would not apply to all
> >> types
> >>>> of joins and my suggestion is also *not* to build a hash table only
> for
> >> the
> >>>> purpose to build the bloom filter.
> >>>> I was thinking about the scenario of a hash join, where you would
> build
> >>>> the hash table as part of the join algorithm anyways and then use the
> >>>> keyset of that hash table to 1) have better insights on about NDV and
> >> 2) be
> >>>> able to construct the bloom filter without duplicates and therefore
> >> faster.
> >>>> So the preconditions where I would use this is if you are building a
> >> hash
> >>>> table as part of the join and you know you are not building for a key
> >>>> column (because there would be no duplicates to eliminate). Then your
> >> bloom
> >>>> filter construction could benefit already from the deduplication work
> >> that
> >>>> was done for building the hash table.
> >>>>
> >>>> I also wanted to point out that besides bloom filter and IN filter you
> >>>> could also think of other types of filter that can become interesting
> >> for
> >>>> certain distributions and meta data. For example, if you have min/max
> >>>> information about columns and partitions you could have a bit vector
> >>>> represent equilibrium-sized ranges of the key space between min and
> max
> >> and
> >>>> have the bits represent what part of the range is present and push
> that
> >>>> information down to the scan.
> >>>>
> >>>> Best,
> >>>> Stefan
> >>>>
> >>>>
> >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com
> <mailto:wangdachui9...@gmail.com>>
> >> wrote:
> >>>>>
> >>>>> Hi Stefan,
> >>>>>
> >>>>> Thanks for your feedback. Let me briefly summarize the optimization
> >>>> points
> >>>>> you mentioned above (Please correct me if I'm wrong):
> >>>>>
> >>>>> 1. Build an extra hash table for deduplication before building the
> >> bloom
> >>>>> filter.
> >>>>> 2. Use the two-phase approach to build the bloom filter(first local,
> >> then
> >>>>> OR-combine).
> >>>>> 3. Use blocked bloom filters to improve the cache efficiency.
> >>>>>
> >>>>> For the above 3 points, I have the following questions or opinions:
> >>>>>
> >>>>> For point 1, it seems that building a hash table also requires
> >> traversing
> >>>>> all build side data, and the overhead seems to be the same as
> building
> >> a
> >>>>> bloom filter directly? In addition, the hash table will take up more
> >>>> space
> >>>>> when the amount of data is large, which is why we choose to use bloom
> >>>>> filter instead of hash table.
> >>>>>
> >>>>> For point 2, I think it's a good idea to use the two-phase approach
> to
> >>>>> build the bloom filter. But rather than directly broadcasting the
> local
> >>>>> bloom filter to the probe side, I prefer to introduce a global node
> for
> >>>> the
> >>>>> OR-combine(like two-phase-agg[1]), then broadcast the combined bloom
> >>>> filter
> >>>>> to the probe side. The latter can reduce the amount of data
> transferred
> >>>> by
> >>>>> the network. I will change the FLIP like this.
> >>>>>
> >>>>> For point 3, I think it's a nice optimization, but I prefer to put it
> >> to
> >>>>> the future improvements. There is already an implementation of bloom
> >>>> filter
> >>>>> in flink, we can simply reuse it. Introducing a new bloom filter
> >>>>> implementation introduces some complexity  (we need to implement it,
> >> test
> >>>>> it, etc), and is not the focus of this FLIP.
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/%252523local-global-aggregation%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2eoXknGWmG4TSiznxtHFWG%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw3V4Sv1o119cpU4xfP0ifkj&source=gmail-imap&ust=1687781326000000&usg=AOvVaw033xxrkenJpx27XzCVKsda
> >>>>>
> >>>>> Best,
> >>>>> Lijie
> >>>>>
> >>>>> Stefan Richter <srich...@confluent.io.invalid <mailto:
> srich...@confluent.io.invalid> <mailto:
> >> srich...@confluent.io.invalid <mailto:srich...@confluent.io.invalid>>
> <mailto:
> >>>> srich...@confluent.io.invalid <mailto:srich...@confluent.io.invalid>
> <mailto:srich...@confluent.io.invalid>>>
> >> 于2023年6月16日周五 16:45写道:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for the proposal of this feature! I have a question about the
> >>>>>> filter build and a some suggestions for potential improvements.
> >> First, I
> >>>>>> wonder why you suggest to run the filter builder as separate
> operator
> >>>> with
> >>>>>> parallelism 1. I’d suggest to integrate the filter distributed build
> >>>> with
> >>>>>> the hash table build phase as follows:
> >>>>>>
> >>>>>> 1. Build the hash table completely in each subtask.
> >>>>>> 2. The keyset of the hash table is giving us a precise NDV count for
> >>>> every
> >>>>>> subtask.
> >>>>>> 3. Build a filter from the subtask hash table. For low cardinality
> >>>> tables,
> >>>>>> I’d go with the suggested optimization of IN-filter.
> >>>>>> 4. Each build subtask transfers the local bloom filter to all probe
> >>>>>> operators.
> >>>>>> 5. On the probe operator we can either probe against the individual
> >>>>>> filters, or we OR-combine all subtask filters into aggregated bloom
> >>>> filter.
> >>>>>>
> >>>>>> I’m suggesting this because building inserting into a (larger) bloom
> >>>>>> filter can be costly, especially once the filter exceeds cache sizes
> >>>> and is
> >>>>>> therefor better parallelized. First inserting into the hash table
> also
> >>>>>> deduplicates the keys and we avoid inserting records twice into the
> >>>> bloom
> >>>>>> filter. If we want to improve cache efficiency for the build of
> larger
> >>>>>> filters, we could structure them as blocked bloom filters, where the
> >>>> filter
> >>>>>> is separated into blocks and all bits of one key go only into one
> >> block.
> >>>>>> That allows us to apply software managed buffering to first group
> keys
> >>>> that
> >>>>>> go into the same partition (ideally fitting into cache) and then
> bulk
> >>>> load
> >>>>>> partitions once we collected enough keys for one round of loading.
> >>>>>>
> >>>>>> Best,
> >>>>>> Stefan
> >>>>>>
> >>>>>>
> >>>>>> <
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.confluent.io/%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw3p0tBjuVsWz3SLYyPQukfL%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw1THgA9fFMrOd7QpGpwiRx6&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1f-3D9-2lZDGsvFBjeFlvn
> >>>>>
> >>>>>> Stefan Richter
> >>>>>> Principal Engineer II
> >>>>>>
> >>>>>> Follow us:  <
> >>>>>>
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.confluent.io/blog?utm_source%25253Dfooter%252526utm_medium%25253Demail%252526utm_campaign%25253Dch.email-signature_type.community_content.blog%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2VU_JTYB24Wp4bF2JshdU7%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw37ghBlQPqP0tTXCfNJCqKv&source=gmail-imap&ust=1687781326000000&usg=AOvVaw20v4QTnSyAz_HAHbMyVY7J
> >>>>>
> >>>>>> <
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://twitter.com/ConfluentInc%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2irnDxUAhXR0N8FUk2orze%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw0ItT553mEuA5KaeJWSH36D&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1mNvHaIwjIKU_gqOuDYLDK
> >>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com
> <mailto:wangdachui9...@gmail.com>
> >> <mailto:wangdachui9...@gmail.com>
> >>>> <mailto:wangdachui9...@gmail.com>> wrote:
> >>>>>>>
> >>>>>>> Hi,  Benchao and Aitozi,
> >>>>>>>
> >>>>>>> Thanks for your feedback about this FLIP.
> >>>>>>>
> >>>>>>> @Benchao
> >>>>>>>
> >>>>>>>>> I think it would be reasonable to also support "pipeline shuffle"
> >> if
> >>>>>>> possible.
> >>>>>>> As I said above, runtime filter can work well with all shuffle
> mode,
> >>>>>>> including pipeline shuffle.
> >>>>>>>
> >>>>>>>>> if the RuntimeFIlterBuilder could be done quickly than
> >> RuntimeFilter
> >>>>>>> operator, it can still filter out additional data afterwards.
> >>>>>>> I think the main purpose of runtime filter is to reduce the shuffle
> >>>> data
> >>>>>>> and the data arriving at join. Although eagerly running the large
> >>>>>>> table side can process datas in advance, most of the data may be
> >>>>>>> irrelevant, causing huge shuffle overhead and slowing the join. In
> >>>>>>> addition, if the join is a hash-join, the probe side of the
> hash-join
> >>>>>> also
> >>>>>>> needs to wait for its build side to complete, so the large table
> side
> >>>> is
> >>>>>>> likely to be back-pressed.
> >>>>>>> In addition, I don't tend to add too many configuration options in
> >> the
> >>>>>>> first version, which may make it more difficult to use (users need
> to
> >>>>>>> understand a lot of internal implementation details). Maybe it
> could
> >>>> be a
> >>>>>>> future improvement (if it's worthwhile)?
> >>>>>>>
> >>>>>>>
> >>>>>>> @Aitozi
> >>>>>>>
> >>>>>>>>> IMO, In the current implementation two source table operators
> will
> >> be
> >>>>>>> executed simultaneously.
> >>>>>>> The example in FLIP uses blocking shuffle(I will add this point to
> >>>> FLIP).
> >>>>>>> The runtime filter is generally chained with the large table side
> to
> >>>>>> reduce
> >>>>>>> the shuffle data (as shown in Figure 2 of FLIP). The job vertices
> >>>> should
> >>>>>> be
> >>>>>>> scheduled in topological order, so the large table side can only be
> >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
> >>>>>>>
> >>>>>>>>> Are there some tests to show the default value of
> >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >>>> default
> >>>>>>> value.
> >>>>>>> It's not tested yet, but it will be done before merge the code. The
> >>>>>> current
> >>>>>>> value refers to systems such as spark and hive. Before code
> merging,
> >> we
> >>>>>>> will test on TPC-DS 10 T to find an optimal set of values. If you
> >> have
> >>>>>>> relevant experience on it, welcome to give some suggestions.
> >>>>>>>
> >>>>>>>>> What's the representation of the runtime filter node in planner ?
> >>>>>>> As shown in Figure 1 of FLIP, we intend to add two new physical
> >> nodes,
> >>>>>>> RuntimeFilterBuilder and RuntimeFilter.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Lijie
> >>>>>>>
> >>>>>>> Aitozi <gjying1...@gmail.com <mailto:gjying1...@gmail.com>
> <mailto:gjying1...@gmail.com> <mailto:
> >> gjying1...@gmail.com <mailto:gjying1...@gmail.com>> <mailto:
> >>>> gjying1...@gmail.com <mailto:gjying1...@gmail.com> <mailto:
> gjying1...@gmail.com>>>
> >>>>>> 于2023年6月15日周四 15:52写道:
> >>>>>>>
> >>>>>>>> Hi Lijie,
> >>>>>>>>
> >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have
> >>>> some
> >>>>>>>> questions below:
> >>>>>>>>
> >>>>>>>>> Schedule the TableSource(dim) first.
> >>>>>>>>
> >>>>>>>> How does it know to schedule the TableSource(dim) first ? IMO, In
> >> the
> >>>>>>>> current implementation two source table operators will be executed
> >>>>>>>> simultaneously.
> >>>>>>>>
> >>>>>>>>> If the data volume on the probe side is too small, the overhead
> of
> >>>>>>>> building runtime filter is not worth it.
> >>>>>>>>
> >>>>>>>> Are there some tests to show the default value of
> >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >>>> default
> >>>>>>>> value. The same to
> >> table.optimizer.runtime-filter.max-build-data-size
> >>>>>>>>
> >>>>>>>>> the runtime filter can be pushed down along the probe side, as
> >> close
> >>>> to
> >>>>>>>> data sources as possible
> >>>>>>>>
> >>>>>>>> What's the representation of the runtime filter node in planner ?
> Is
> >>>> it
> >>>>>> a
> >>>>>>>> Filternode
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> Benchao Li <libenc...@apache.org <mailto:libenc...@apache.org>
> <mailto:libenc...@apache.org>
> >> <mailto:libenc...@apache.org>>
> >>>> 于2023年6月15日周四 14:30写道:
> >>>>>>>>
> >>>>>>>>> Hi Lijie,
> >>>>>>>>>
> >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> also
> >>>>>>>> support
> >>>>>>>>> "pipeline shuffle" if possible.
> >>>>>>>>>
> >>>>>>>>> "pipeline shuffle" is a essential for OLAP/MPP computing,
> although
> >>>> this
> >>>>>>>> has
> >>>>>>>>> not been much exposed to users for now, I know a few companies
> that
> >>>>>> uses
> >>>>>>>>> Flink as a MPP computing engine, and there is an ongoing
> effort[1]
> >> to
> >>>>>>>> make
> >>>>>>>>> this usage more powerful.
> >>>>>>>>>
> >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> >> running
> >>>>>>>> before
> >>>>>>>>> the RuntimeFilterBuilder finished, it will not process any data
> and
> >>>>>> will
> >>>>>>>>> occupy resources", whether it benefits us depends on the scale of
> >>>> data,
> >>>>>>>> if
> >>>>>>>>> the RuntimeFIlterBuilder could be done quickly than RuntimeFilter
> >>>>>>>> operator,
> >>>>>>>>> it can still filter out additional data afterwards. Hence in my
> >>>>>> opinion,
> >>>>>>>> we
> >>>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
> >>>>>>>> RuntimeFilter
> >>>>>>>>> BLOCKING only, at least it can be configured.
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://issues.apache.org/jira/browse/FLINK-25318%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw3GqdpuiCqegqRLDv1PjMiL%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw1oNzOlNn0UCDtz1M9jAw1x%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw3Zt14Wvxs_b8ghD0dIgPfH&source=gmail-imap&ust=1687781326000000&usg=AOvVaw0HsmkkqPeZGZOBvFiA8NOA
> >>>>>>>>>
> >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com <mailto:
> wangdachui9...@gmail.com> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>> <mailto:
> >>>> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com> <mailto:
> wangdachui9...@gmail.com>> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>>>
> >>>>>> 于2023年6月15日周四 14:18写道:
> >>>>>>>>>
> >>>>>>>>>> Hi Yuxia,
> >>>>>>>>>>
> >>>>>>>>>> I made a mistake in the above response.
> >>>>>>>>>>
> >>>>>>>>>> The runtime filter can work well with all shuffle mode. However,
> >>>>>> hybrid
> >>>>>>>>>> shuffle and blocking shuffle are currently recommended for batch
> >>>> jobs
> >>>>>>>>>> (piepline shuffle is not recommended).
> >>>>>>>>>>
> >>>>>>>>>> One more thing to mention here is that we will force the edge
> >>>> between
> >>>>>>>>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING(regardless
> >> of
> >>>>>>>> which
> >>>>>>>>>> BatchShuffleMode is set). Because the RuntimeFilter really
> doesn’t
> >>>>>> need
> >>>>>>>>> to
> >>>>>>>>>> run before the RuntimeFilterBuilder finished. Even if the
> >>>>>> RuntimeFilter
> >>>>>>>>>> becomes running before the RuntimeFilterBuilder finished, it
> will
> >>>> not
> >>>>>>>>>> process any data and will occupy resources.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Lijie
> >>>>>>>>>>
> >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com <mailto:
> wangdachui9...@gmail.com> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>> <mailto:
> >>>> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com> <mailto:
> wangdachui9...@gmail.com>> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>>>
> >>>>>> 于2023年6月15日周四 09:48写道:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Yuxia,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your feedback. The answers of your questions are as
> >>>>>>>> follows:
> >>>>>>>>>>>
> >>>>>>>>>>> 1. Yes, the row count comes from statistic of underlying
> table(Or
> >>>>>>>>>>> estimated based on the statistic of underlying table, if the
> >> build
> >>>>>>>> side
> >>>>>>>>>> or
> >>>>>>>>>>> probe side is not TableScan).  If the statistic unavailable, we
> >>>> will
> >>>>>>>>> not
> >>>>>>>>>>> inject a runtime filter(As you said, we can hardly evaluate the
> >>>>>>>>>> benefits).
> >>>>>>>>>>> Besides, AFAIK, the estimated data size of build side is also
> >> based
> >>>>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>> row count statistics, that is, if the statistics is
> unavailable,
> >>>> the
> >>>>>>>>>>> requirement
> "table.optimizer.runtime-filter.max-build-data-size"
> >>>>>>>> cannot
> >>>>>>>>>> be
> >>>>>>>>>>> evaluated either. I'll add this point into FLIP.
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> Estimated data size does not meet requirement (in planner
> >>>>>>>> optimization
> >>>>>>>>>>> phase) -> No filter
> >>>>>>>>>>> Estimated data size meets the requirement (in planner
> >> optimization
> >>>>>>>>>> phase),
> >>>>>>>>>>> but the real data size does not meet the requirement(in
> execution
> >>>>>>>>> phase)
> >>>>>>>>>> ->
> >>>>>>>>>>> Fake filter
> >>>>>>>>>>>
> >>>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking
> >> shuffle.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Lijie
> >>>>>>>>>>>
> >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn <mailto:
> luoyu...@alumni.sjtu.edu.cn> <mailto:
> >> luoyu...@alumni.sjtu.edu.cn <mailto:luoyu...@alumni.sjtu.edu.cn>>
> <mailto:
> >>>> luoyu...@alumni.sjtu.edu.cn <mailto:luoyu...@alumni.sjtu.edu.cn>
> <mailto:luoyu...@alumni.sjtu.edu.cn>>
> >> <mailto:
> >>>>>> luoyu...@alumni.sjtu.edu.cn <mailto:luoyu...@alumni.sjtu.edu.cn>
> <mailto:luoyu...@alumni.sjtu.edu.cn>
> >> <mailto:luoyu...@alumni.sjtu.edu.cn>>>
> >>>> 于2023年6月14日周三 20:37写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see
> >> runtime
> >>>>>>>>> filter
> >>>>>>>>>>>> is to be implemented in Flink.
> >>>>>>>>>>>> I have few questions about it:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row
> >>>> count
> >>>>>>>>>>>> instead`. So, does row count comes from the statistic from
> >>>>>>>> underlying
> >>>>>>>>>>>> table? What if the the statistic is also unavailable
> considering
> >>>>>>>> users
> >>>>>>>>>>>> maynot always remember to generate statistic in production.
> >>>>>>>>>>>> I'm wondering whether it make senese that just disable runtime
> >>>>>>>> filter
> >>>>>>>>> if
> >>>>>>>>>>>> statistic is unavailable since in that case, we can hardly
> >>>> evaluate
> >>>>>>>>> the
> >>>>>>>>>>>> benefits of runtime-filter.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if
> >> the
> >>>>>>>>>>>> following requirements are met:xxx", but it also said, "Once
> >> this
> >>>>>>>>> limit
> >>>>>>>>>> is
> >>>>>>>>>>>> exceeded, it will output a fake filter(which always returns
> >> true)"
> >>>>>>>> in
> >>>>>>>>>>>> `RuntimeFilterBuilderOperator` part; Seems they are
> >> contradictory,
> >>>>>>>> so
> >>>>>>>>>> i'm
> >>>>>>>>>>>> wondering what's the real behavior, no filter will be injected
> >> or
> >>>>>>>> fake
> >>>>>>>>>>>> filter?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3: Does it also mean runtime-filter can only take effect in
> >>>> blocking
> >>>>>>>>>>>> shuffle?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best regards,
> >>>>>>>>>>>> Yuxia
> >>>>>>>>>>>>
> >>>>>>>>>>>> ----- 原始邮件 -----
> >>>>>>>>>>>> 发件人: "ron9 liu" <ron9....@gmail.com <mailto:
> ron9....@gmail.com> <mailto:ron9....@gmail.com>
> >> <mailto:ron9....@gmail.com>
> >>>> <mailto:ron9....@gmail.com>>
> >>>>>>>>>>>> 收件人: "dev" <dev@flink.apache.org <mailto:dev@flink.apache.org>
> <mailto:dev@flink.apache.org>
> >> <mailto:dev@flink.apache.org>
> >>>> <mailto:dev@flink.apache.org>>
> >>>>>>>>>>>> 发送时间: 星期三, 2023年 6 月 14日 下午 5:29:28
> >>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
> >>>> Batch
> >>>>>>>>>> Jobs
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks Lijie start this discussion. Runtime Filter is a common
> >>>>>>>>>>>> optimization
> >>>>>>>>>>>> to improve the join performance that has been adopted by many
> >>>>>>>>> computing
> >>>>>>>>>>>> engines such as Spark, Doris, etc... Flink is a streaming
> batch
> >>>>>>>>>> computing
> >>>>>>>>>>>> engine, and we are continuously optimizing the performance of
> >>>>>>>> batches.
> >>>>>>>>>>>> Runtime filter is a general performance optimization technique
> >>>> that
> >>>>>>>>> can
> >>>>>>>>>>>> improve the performance of Flink batch jobs, so we are
> >> introducing
> >>>>>>>> it
> >>>>>>>>> on
> >>>>>>>>>>>> batch as well.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to all feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Ron
> >>>>>>>>>>>>
> >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com <mailto:
> wangdachui9...@gmail.com> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>> <mailto:
> >>>> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com> <mailto:
> wangdachui9...@gmail.com>> <mailto:
> >>>>>> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com> <mailto:
> wangdachui9...@gmail.com> <mailto:
> >> wangdachui9...@gmail.com <mailto:wangdachui9...@gmail.com>>>>
> >>>> 于2023年6月14日周三 17:17写道:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi devs
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about
> >>>>>>>>>> FLIP-324:
> >>>>>>>>>>>>> Introduce Runtime Filter for Flink Batch Jobs[1]
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Runtime Filter is a common optimization to improve join
> >>>>>>>> performance.
> >>>>>>>>>> It
> >>>>>>>>>>>> is
> >>>>>>>>>>>>> designed to dynamically generate filter conditions for
> certain
> >>>>>>>> Join
> >>>>>>>>>>>> queries
> >>>>>>>>>>>>> at runtime to reduce the amount of scanned or shuffled data,
> >>>> avoid
> >>>>>>>>>>>>> unnecessary I/O and network transmission, and speed up the
> >> query.
> >>>>>>>>> Its
> >>>>>>>>>>>>> working principle is building a filter(e.g. bloom filter)
> based
> >>>> on
> >>>>>>>>> the
> >>>>>>>>>>>> data
> >>>>>>>>>>>>> on the small table side(build side) first, then pass this
> >> filter
> >>>>>>>> to
> >>>>>>>>>> the
> >>>>>>>>>>>>> large table side(probe side) to filter the irrelevant data on
> >> it,
> >>>>>>>>> this
> >>>>>>>>>>>> can
> >>>>>>>>>>>>> reduce the data reaching the join and improve performance.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> You can find more details in the FLIP-324[1]. Looking forward
> >> to
> >>>>>>>>> your
> >>>>>>>>>>>>> feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://cwiki.apache.org/confluence/display/FLINK/FLIP-324%252525253A%2525252BIntroduce%2525252BRuntime%2525252BFilter%2525252Bfor%2525252BFlink%2525252BBatch%2525252BJobs%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw0ke1ZHcJ--A1QgsbB84MHA%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw21E3CQyayeBTYztmOnwMcz%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw0xVu0zYYNRmh8u8aq7uSi3&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1LXwtWT177350iKD3sKCEt
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Ron & Gen & Lijie
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Benchao Li
>
>

Reply via email to