Hi Ron,

Thanks for sharing your thoughts! It makes sense. It would be helpful if
the references to Hive, PolarDB, etc. could be added to the FLIP.

Best regards,
Jing

On Tue, Jun 20, 2023 at 5:41 PM liu ron <ron9....@gmail.com> wrote:

> Hi, Jing
>
> The default value for this ratio follows other systems, such as Hive. As
> long as the Runtime Filter can filter out more than half of the data, we
> can benefit from it. Normally, as long as statistics are available, ndv is
> present, so rowCount should rarely be needed, and I think the formula is
> valid in most cases. We also borrowed this formula from other systems,
> such as PolarDB of AliCloud. Your concern is valuable for this FLIP, but
> currently we do not know how to adjust it reasonably, and making it too
> complex may leave users unable to understand it. So I think we should
> first follow the simple way and optimize gradually afterwards. The first
> step may be to verify the reasonableness of the current formula with
> TPC-DS.
>
> Best,
> Ron
>
> Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 19:46:
>
> > Hi Ron,
> >
> > Thanks for the clarification. That answered my questions.
> >
> > Regarding the ratio, my gut feeling is that any value less than 0.8 or
> > 0.9 won't help much (I might be wrong). I was thinking of adapting the
> > formula to somehow map the current 0.9-1 range to 0-1, i.e. if the user
> > configures 0.5, it will be mapped to e.g. 0.95 (or e.g. 0.85, the real
> > number needs more calculation) for the current formula described in the
> > FLIP. But I am not sure it is a feasible solution. It deserves more
> > discussion. Maybe some real performance tests could give us some hints.
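For illustration only, the remapping idea above could be sketched as a linear mapping from the user-facing value in [0, 1] onto a narrower internal range. The function name `remap_ratio` and the bounds 0.9 and 1.0 are assumptions made for this sketch; the thread leaves the actual mapping open and nothing here is part of the FLIP.

```python
def remap_ratio(user_ratio: float, low: float = 0.9, high: float = 1.0) -> float:
    """Linearly map a user-facing value in [0, 1] onto the range [low, high].

    `low` and `high` are hypothetical bounds; the thread only says that
    the real numbers need more calculation.
    """
    if not 0.0 <= user_ratio <= 1.0:
        raise ValueError("user_ratio must be within [0, 1]")
    return low + user_ratio * (high - low)


if __name__ == "__main__":
    # A user setting of 0.5 lands in the middle of the internal range.
    print(remap_ratio(0.5))
    # With a wider internal range, the same setting maps to a lower value.
    print(remap_ratio(0.5, low=0.7))
```

With the default bounds, a user setting of 0.5 maps to roughly 0.95; with `low=0.7` it maps to roughly 0.85, matching the two example values mentioned above.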
> >
> > Best regards,
> > Jing
> >
> > On Tue, Jun 20, 2023 at 5:19 AM liu ron <ron9....@gmail.com> wrote:
> >
> > > Hi, Jing
> > >
> > > Thanks for your feedback.
> > >
> > > > Afaiu, the runtime filter will only be injected when the gap between
> > > > the build data size and probe data size is big enough. Let's take an
> > > > extreme example: the small table (build side) has one row and the
> > > > large table (probe side) contains tens of billions of rows. This
> > > > would be the ideal use case for the runtime filter and the
> > > > improvement would be significant. Is this correct?
> > >
> > > Yes, you are right.
> > >
> > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > > > will the values of max-build-data-size and min-probe-data-size depend
> > > > on the parallelism config? I.e. with the same data-size setting, is it
> > > > possible to inject or not inject runtime filters by adjusting the
> > > > parallelism?
> > >
> > > First, let me clarify two points. The first is that the decision
> > > whether to inject a RuntimeFilter is made in the optimization phase,
> > > but we currently do not consider operator parallelism in the SQL
> > > optimization phase; parallelism is set at the ExecNode level. The
> > > second is that in batch mode the AdaptiveBatchScheduler[1] is now used
> > > by default, which derives the parallelism of the downstream operator
> > > from the amount of data produced by the upstream operator, that is,
> > > the parallelism is determined adaptively at runtime. Given both
> > > points, we cannot decide whether to inject the BloomFilter in the
> > > optimization stage based on parallelism.
> > > A more important point is that the purpose of the Runtime Filter is to
> > > reduce the amount of data for shuffle, and thus the amount of data
> > > processed by the downstream join operator. Therefore, as I understand
> > > it, regardless of the parallelism of the probe side, the amount of
> > > shuffled data is reduced after inserting the Runtime Filter, which
> > > benefits the join operator, so whether to insert the RuntimeFilter
> > > does not depend on the parallelism.
> > >
> > > > Does it make sense to reconsider the formula of ratio calculation to
> > > > help users easily control the filter injection?
> > >
> > > Row count is considered only when ndv does not exist. When the sizes
> > > use the default values and ndv cannot be obtained, it is true that
> > > this condition may always hold, but this does not seem to affect
> > > anything, and the user is also likely to change the size values. One
> > > question: how do you think we should make it easier for users to
> > > control the filter injection?
> > >
> > >
> > > [1]:
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > >
> > > Best,
> > > Ron
> > >
> > > Jing Ge <j...@ververica.com.invalid> wrote on Tue, Jun 20, 2023 at 07:11:
> > >
> > > > Hi Lijie,
> > > >
> > > > Thanks for your proposal. It is a really nice feature. I'd like to ask
> > > > a few questions to understand your thoughts.
> > > >
> > > > Afaiu, the runtime filter will only be injected when the gap between
> > > > the build data size and probe data size is big enough. Let's take an
> > > > extreme example: the small table (build side) has one row and the
> > > > large table (probe side) contains tens of billions of rows. This
> > > > would be the ideal use case for the runtime filter and the
> > > > improvement would be significant. Is this correct?
> > > >
> > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > > > will the values of max-build-data-size and min-probe-data-size depend
> > > > on the parallelism config? I.e. with the same data-size setting, is it
> > > > possible to inject or not inject runtime filters by adjusting the
> > > > parallelism?
> > > >
> > > > In the FLIP, there are default values for the new configuration
> > > > parameters that will be used to check the injection condition. If ndv
> > > > cannot be estimated, row count will be used. Given that
> > > > max-build-data-size is 10MB and min-probe-data-size is 10GB, in the
> > > > worst case the min-filter-ratio will be 0.999, i.e. the probeNdv is
> > > > 1000 times the buildNdv. If we consider duplication and the fact that
> > > > the large table might have more columns than the small table, the
> > > > probeNdv should still be 100 or 10 times the buildNdv, which ends up
> > > > with a min-filter-ratio equal to 0.99 or 0.9. Both are bigger than the
> > > > default value 0.5 in the FLIP. If I am not mistaken, commonly, a
> > > > min-filter-ratio less than 0.99 will always allow injecting the
> > > > runtime filter. Does it make sense to reconsider the formula of ratio
> > > > calculation to help users easily control the filter injection?
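The arithmetic above can be reproduced with a small sketch. It assumes the estimated filter ratio has the form 1 - buildNdv / probeNdv, which is inferred from the numbers in this message (a factor of 1000 giving 0.999); the exact formula in the FLIP is not quoted in this thread, and the function names are made up for illustration.

```python
def estimated_filter_ratio(build_ndv: float, probe_ndv: float) -> float:
    """Assumed estimate: share of probe-side distinct values without a build-side match."""
    if build_ndv < 0 or probe_ndv <= 0:
        raise ValueError("build_ndv must be >= 0 and probe_ndv must be > 0")
    return max(0.0, 1.0 - build_ndv / probe_ndv)


def should_inject(build_ndv: float, probe_ndv: float,
                  min_filter_ratio: float = 0.5) -> bool:
    """Inject only if the estimated ratio reaches the configured minimum."""
    return estimated_filter_ratio(build_ndv, probe_ndv) >= min_filter_ratio


if __name__ == "__main__":
    # probeNdv is 1000, 100, 10 and 2 times buildNdv.
    for factor in (1000, 100, 10, 2):
        ratio = estimated_filter_ratio(1.0, float(factor))
        print(factor, round(ratio, 3), should_inject(1.0, float(factor)))
```

Under this assumption, every factor of 2 or more passes the default threshold of 0.5, which is the effect described above: with the default size limits the ratio check rarely rejects anything.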
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > >
> > > > > Hi Stefan,
> > > > >
> > > > > >> bypassing the dataflow
> > > > > I believe it's a possible solution, but it may require more
> > > > > coordination and extra conditions (such as DFS), so I do think it
> > > > > should be excluded from the first version. I'll put it in
> > > > > Future+Improvements as a potential improvement.
> > > > >
> > > > > Thanks again for your quick reply :)
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 20:51:
> > > > >
> > > > > >
> > > > > > Hi Lijie,
> > > > > >
> > > > > > I think you understood me correctly. But I would not consider this a
> > > > > > true cyclic dependency in the dataflow because I would not suggest
> > > > > > sending the filter through an edge in the job graph from join to
> > > > > > scan. I’d rather bypass the stream graph for bringing the filter to
> > > > > > the scan. For example, the join could report the filter after the
> > > > > > build phase, e.g. to the JM or a predefined DFS folder. Then either
> > > > > > the JM provides the filter information to the probe scan when it
> > > > > > gets scheduled for execution, or the scan looks in DFS during
> > > > > > initialization for any filter that it can use. I’m not suggesting to
> > > > > > do it exactly in those ways, but just to show what I mean by
> > > > > > "bypassing the dataflow".
> > > > > >
> > > > > > Anyways, I’m fine with excluding this optimization from the current
> > > > > > FLIP if you believe it would be hard to implement in Flink.
> > > > > >
> > > > > > Best,
> > > > > > Stefan
> > > > > >
> > > > > >
> > > > > > > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Stefan,
> > > > > > >
> > > > > > > If I understand correctly (I hope so), the hash join operator needs
> > > > > > > to send the bloom filter to the probe scan, and the probe scan also
> > > > > > > needs to send the filtered data to the hash join operator. This
> > > > > > > means there will be a cycle in the data flow, and it will be hard
> > > > > > > for current Flink to schedule this kind of graph. I admit we can
> > > > > > > find a way to do this, but that's probably a bit outside the scope
> > > > > > > of this FLIP. So let's do these complex optimizations later, WDYT?
> > > > > > >
> > > > > > > Best,
> > > > > > > Lijie
> > > > > > >
> > > > > > > Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 18:15:
> > > > > > >
> > > > > > >> Hi Lijie,
> > > > > > >>
> > > > > > > >> Exactly, my proposal was to build the bloom filter in the hash
> > > > > > > >> operator. I don’t know all the details of the implementation of
> > > > > > > >> Flink’s join operator, but I’d assume that even if the join is a
> > > > > > > >> two-input operator, it gets scheduled for 2 different pipelines:
> > > > > > > >> first the build phase with the scan from the dimension table, and
> > > > > > > >> after that’s completed, the probe phase with the scan of the fact
> > > > > > > >> table. I’m not proposing to use the bloom filter only in the join
> > > > > > > >> operator, but rather to send the bloom filter to the probe scan
> > > > > > > >> before starting the probe. I assume this would require some form
> > > > > > > >> of side channel to transport the filter and coordination to tell
> > > > > > > >> the sources that such a filter is available. I cannot answer how
> > > > > > > >> hard those would be to implement, but the idea doesn’t seem
> > > > > > > >> impossible to me.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Stefan
> > > > > > >>
> > > > > > >>
> > > > > > > >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>> Hi Stefan,
> > > > > > >>>
> > > > > > > >>> Now I know what you mean about point 1. But currently it is
> > > > > > > >>> infeasible for Flink, because the hash table is built inside the
> > > > > > > >>> hash join operator. The hash join operator has two inputs: it
> > > > > > > >>> first processes the data of the build input to build a hash
> > > > > > > >>> table, and then uses the hash table to process the data of the
> > > > > > > >>> probe input. If we want to use the built hash table to
> > > > > > > >>> deduplicate data for the bloom filter, we must put the bloom
> > > > > > > >>> filter inside the hash join operator. However, in this way, the
> > > > > > > >>> data reaching the join operator cannot be reduced (the
> > > > > > > >>> shuffle/network overhead cannot be reduced), which is not what
> > > > > > > >>> we expect.
> > > > > > > >>>
> > > > > > > >>> Regarding the filter type, I agree with you: more types of
> > > > > > > >>> filters can enable further optimization, and it is in our future
> > > > > > > >>> plan (we described it in the section
> > > > > > > >>> Future+Improvements#More+underlying+implementations).
> > > > > > >>>
> > > > > > >>> Best,
> > > > > > >>> Lijie
> > > > > > >>>
> > > > > > > >>> Stefan Richter <srich...@confluent.io.invalid> wrote on Mon, Jun 19, 2023 at 15:58:
> > > > > > >>>
> > > > > > >>>>
> > > > > > >>>> Hi Lijie,
> > > > > > >>>>
> > > > > > > >>>> thanks for your response, I agree with what you said about points
> > > > > > > >>>> 2 and 3. Let me explain a bit more about point 1. This would not
> > > > > > > >>>> apply to all types of joins, and my suggestion is also *not* to
> > > > > > > >>>> build a hash table only for the purpose of building the bloom
> > > > > > > >>>> filter.
> > > > > > > >>>> I was thinking about the scenario of a hash join, where you would
> > > > > > > >>>> build the hash table as part of the join algorithm anyways and
> > > > > > > >>>> then use the keyset of that hash table to 1) have better insights
> > > > > > > >>>> about NDV and 2) be able to construct the bloom filter without
> > > > > > > >>>> duplicates and therefore faster. So the preconditions under which
> > > > > > > >>>> I would use this are that you are building a hash table as part
> > > > > > > >>>> of the join and you know you are not building for a key column
> > > > > > > >>>> (because there would be no duplicates to eliminate). Then your
> > > > > > > >>>> bloom filter construction could already benefit from the
> > > > > > > >>>> deduplication work that was done for building the hash table.
> > > > > > >>>>
> > > > > > > >>>> I also wanted to point out that besides bloom filter and IN filter
> > > > > > > >>>> you could also think of other types of filter that can become
> > > > > > > >>>> interesting for certain distributions and metadata. For example,
> > > > > > > >>>> if you have min/max information about columns and partitions, you
> > > > > > > >>>> could have a bit vector represent equal-sized ranges of the key
> > > > > > > >>>> space between min and max, have the bits represent which parts of
> > > > > > > >>>> the range are present, and push that information down to the scan.
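As a rough illustration of the min/max bit vector idea above, the sketch below splits an integer key space into equal-width ranges and sets one bit per range that contains at least one build-side key. The class name `RangeBitVectorFilter` and its parameters are hypothetical; this is not an existing Flink class, only a sketch assuming integer keys and known min/max statistics.

```python
class RangeBitVectorFilter:
    """One bit per equal-width range of the key space [min_key, max_key]."""

    def __init__(self, min_key: int, max_key: int, num_buckets: int = 64):
        if max_key < min_key or num_buckets <= 0:
            raise ValueError("need min_key <= max_key and num_buckets > 0")
        self.min_key = min_key
        self.max_key = max_key
        span = max_key - min_key + 1
        # Ceiling division so that every key in the span maps to a valid bucket.
        self.bucket_width = -(-span // num_buckets)
        self.bits = 0  # Python int used as a bit vector

    def _bucket(self, key: int) -> int:
        return (key - self.min_key) // self.bucket_width

    def add(self, key: int) -> None:
        if not self.min_key <= key <= self.max_key:
            raise ValueError("key outside of the [min_key, max_key] statistics")
        self.bits |= 1 << self._bucket(key)

    def might_contain(self, key: int) -> bool:
        # Keys outside the min/max statistics can never match the build side.
        if not self.min_key <= key <= self.max_key:
            return False
        return bool((self.bits >> self._bucket(key)) & 1)


if __name__ == "__main__":
    f = RangeBitVectorFilter(min_key=0, max_key=999, num_buckets=10)
    for build_key in (5, 950):
        f.add(build_key)
    # 42 shares a range with 5, 500 falls into an empty range.
    print(f.might_contain(42), f.might_contain(500))
```

Like a bloom filter, this can yield false positives (42 passes because it shares a range with 5) but no false negatives, and it works best when the build-side keys are clustered in a few ranges.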
> > > > > > >>>>
> > > > > > >>>> Best,
> > > > > > >>>> Stefan
> > > > > > >>>>
> > > > > > >>>>
> > > > > > > >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > >>>>>
> > > > > > >>>>> Hi Stefan,
> > > > > > >>>>>
> > > > > > > >>>>> Thanks for your feedback. Let me briefly summarize the optimization
> > > > > > > >>>>> points you mentioned above (please correct me if I'm wrong):
> > > > > > > >>>>>
> > > > > > > >>>>> 1. Build an extra hash table for deduplication before building the
> > > > > > > >>>>> bloom filter.
> > > > > > > >>>>> 2. Use the two-phase approach to build the bloom filter (first
> > > > > > > >>>>> local, then OR-combine).
> > > > > > > >>>>> 3. Use blocked bloom filters to improve the cache efficiency.
> > > > > > > >>>>>
> > > > > > > >>>>> For the above 3 points, I have the following questions or opinions:
> > > > > > > >>>>>
> > > > > > > >>>>> For point 1, it seems that building a hash table also requires
> > > > > > > >>>>> traversing all build side data, so the overhead seems to be the
> > > > > > > >>>>> same as building a bloom filter directly? In addition, the hash
> > > > > > > >>>>> table will take up more space when the amount of data is large,
> > > > > > > >>>>> which is why we choose to use a bloom filter instead of a hash
> > > > > > > >>>>> table.
> > > > > > >>>>>
> > > > > > > >>>>> For point 2, I think it's a good idea to use the two-phase approach
> > > > > > > >>>>> to build the bloom filter. But rather than directly broadcasting
> > > > > > > >>>>> the local bloom filters to the probe side, I prefer to introduce a
> > > > > > > >>>>> global node for the OR-combine (like two-phase-agg[1]), then
> > > > > > > >>>>> broadcast the combined bloom filter to the probe side. The latter
> > > > > > > >>>>> can reduce the amount of data transferred over the network. I will
> > > > > > > >>>>> change the FLIP like this.
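The two-phase build described above (local filters first, then a global OR-combine) can be sketched as follows. This is a toy bloom filter written for illustration, not Flink's existing implementation; the class, the hashing scheme and the function `build_global_filter` are all assumptions of this sketch.

```python
import hashlib


class BloomFilter:
    """Toy bloom filter that keeps its bits in a single Python int."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key: str):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(key))

    def merge(self, other: "BloomFilter") -> None:
        # OR-combining is only valid for filters with identical layout.
        if (self.num_bits, self.num_hashes) != (other.num_bits, other.num_hashes):
            raise ValueError("cannot merge filters with different layouts")
        self.bits |= other.bits


def build_global_filter(partitions):
    """Phase 1: one local filter per partition. Phase 2: OR-combine them."""
    local_filters = []
    for keys in partitions:
        local = BloomFilter()
        for key in keys:
            local.add(key)
        local_filters.append(local)
    combined = BloomFilter()
    for local in local_filters:
        combined.merge(local)
    return combined


if __name__ == "__main__":
    combined = build_global_filter([["a", "b"], ["c"]])
    print(all(combined.might_contain(k) for k in ("a", "b", "c")))
```

Because the combine step is a plain bitwise OR, the merged filter is bit-for-bit identical to a filter built from all keys in one place, and only one filter per probe task has to be broadcast instead of one per build subtask.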
> > > > > > >>>>>
> > > > > > > >>>>> For point 3, I think it's a nice optimization, but I prefer to put
> > > > > > > >>>>> it into the future improvements. There is already an implementation
> > > > > > > >>>>> of bloom filter in Flink that we can simply reuse. Introducing a
> > > > > > > >>>>> new bloom filter implementation adds some complexity (we need to
> > > > > > > >>>>> implement it, test it, etc.), and is not the focus of this FLIP.
> > > > > > >>>>>
> > > > > > >>>>> [1]
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/%252523local-global-aggregation%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2eoXknGWmG4TSiznxtHFWG%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw3V4Sv1o119cpU4xfP0ifkj&source=gmail-imap&ust=1687781326000000&usg=AOvVaw033xxrkenJpx27XzCVKsda
> > > > > > >>>>>
> > > > > > >>>>> Best,
> > > > > > >>>>> Lijie
> > > > > > >>>>>
> > > > > > > >>>>> Stefan Richter <srich...@confluent.io.invalid> wrote on Fri, Jun 16, 2023 at 16:45:
> > > > > > >>>>>
> > > > > > >>>>>> Hi,
> > > > > > >>>>>>
> > > > > > > >>>>>> Thanks for the proposal of this feature! I have a question about
> > > > > > > >>>>>> the filter build and some suggestions for potential improvements.
> > > > > > > >>>>>> First, I wonder why you suggest running the filter builder as a
> > > > > > > >>>>>> separate operator with parallelism 1. I’d suggest integrating the
> > > > > > > >>>>>> distributed filter build with the hash table build phase as
> > > > > > > >>>>>> follows:
> > > > > > >>>>>>
> > > > > > > >>>>>> 1. Build the hash table completely in each subtask.
> > > > > > > >>>>>> 2. The keyset of the hash table gives us a precise NDV count for
> > > > > > > >>>>>> every subtask.
> > > > > > > >>>>>> 3. Build a filter from the subtask hash table. For low cardinality
> > > > > > > >>>>>> tables, I’d go with the suggested optimization of IN-filter.
> > > > > > > >>>>>> 4. Each build subtask transfers the local bloom filter to all probe
> > > > > > > >>>>>> operators.
> > > > > > > >>>>>> 5. On the probe operator we can either probe against the individual
> > > > > > > >>>>>> filters, or we OR-combine all subtask filters into an aggregated
> > > > > > > >>>>>> bloom filter.
> > > > > > >>>>>>
> > > > > > > >>>>>> I’m suggesting this because inserting into a (larger) bloom filter
> > > > > > > >>>>>> can be costly, especially once the filter exceeds cache sizes, and
> > > > > > > >>>>>> is therefore better parallelized. First inserting into the hash
> > > > > > > >>>>>> table also deduplicates the keys, so we avoid inserting records
> > > > > > > >>>>>> twice into the bloom filter. If we want to improve cache efficiency
> > > > > > > >>>>>> for the build of larger filters, we could structure them as blocked
> > > > > > > >>>>>> bloom filters, where the filter is separated into blocks and all
> > > > > > > >>>>>> bits of one key go only into one block. That allows us to apply
> > > > > > > >>>>>> software-managed buffering to first group keys that go into the
> > > > > > > >>>>>> same partition (ideally fitting into cache) and then bulk-load
> > > > > > > >>>>>> partitions once we have collected enough keys for one round of
> > > > > > > >>>>>> loading.
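A minimal sketch of the blocked bloom filter idea above: all bits of one key fall into a single block, and keys are first buffered per block so that each block is loaded in one pass. Block count, block size, the SHA-256 based hashing and the class name are assumptions chosen for illustration; real implementations typically size a block to a cache line and use cheaper hash functions.

```python
import hashlib
from collections import defaultdict


class BlockedBloomFilter:
    """Toy blocked bloom filter: each key only touches one block."""

    def __init__(self, num_blocks: int = 64, bits_per_block: int = 512,
                 num_hashes: int = 4):
        # A 32-byte digest supplies 4 bytes for the block index plus
        # 4 bytes per hash, so at most 7 hashes are supported here.
        if not 1 <= num_hashes <= 7:
            raise ValueError("num_hashes must be between 1 and 7")
        self.num_blocks = num_blocks
        self.bits_per_block = bits_per_block
        self.num_hashes = num_hashes
        self.blocks = [0] * num_blocks  # one Python int per block

    def _locate(self, key: str):
        digest = hashlib.sha256(key.encode()).digest()
        block = int.from_bytes(digest[:4], "big") % self.num_blocks
        positions = [
            int.from_bytes(digest[4 + 4 * i: 8 + 4 * i], "big") % self.bits_per_block
            for i in range(self.num_hashes)
        ]
        return block, positions

    def add_all(self, keys) -> None:
        # Step 1: buffer the bit positions of all keys, grouped by target block.
        buffered = defaultdict(list)
        for key in keys:
            block, positions = self._locate(key)
            buffered[block].append(positions)
        # Step 2: bulk-load one block at a time, so each block is touched once.
        for block, position_lists in buffered.items():
            bits = self.blocks[block]
            for positions in position_lists:
                for pos in positions:
                    bits |= 1 << pos
            self.blocks[block] = bits

    def might_contain(self, key: str) -> bool:
        block, positions = self._locate(key)
        bits = self.blocks[block]
        return all((bits >> pos) & 1 for pos in positions)


if __name__ == "__main__":
    f = BlockedBloomFilter()
    f.add_all(f"key-{i}" for i in range(1000))
    print(f.might_contain("key-7"))
```

A lookup reads exactly one block, which is the cache-efficiency benefit mentioned above; the trade-off is a somewhat higher false-positive rate than an unblocked filter of the same total size.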
> > > > > > >>>>>>
> > > > > > >>>>>> Best,
> > > > > > >>>>>> Stefan
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> <
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.confluent.io/%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw3p0tBjuVsWz3SLYyPQukfL%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw1THgA9fFMrOd7QpGpwiRx6&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1f-3D9-2lZDGsvFBjeFlvn
> > > > > > >>>>>
> > > > > > >>>>>> Stefan Richter
> > > > > > >>>>>> Principal Engineer II
> > > > > > >>>>>>
> > > > > > >>>>>> Follow us:  <
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.confluent.io/blog?utm_source%25253Dfooter%252526utm_medium%25253Demail%252526utm_campaign%25253Dch.email-signature_type.community_content.blog%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2VU_JTYB24Wp4bF2JshdU7%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw37ghBlQPqP0tTXCfNJCqKv&source=gmail-imap&ust=1687781326000000&usg=AOvVaw20v4QTnSyAz_HAHbMyVY7J
> > > > > > >>>>>
> > > > > > >>>>>> <
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://twitter.com/ConfluentInc%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw2irnDxUAhXR0N8FUk2orze%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw0ItT553mEuA5KaeJWSH36D&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1mNvHaIwjIKU_gqOuDYLDK
> > > > > > >>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > > >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang <wangdachui9...@gmail.com> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>> Hi,  Benchao and Aitozi,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for your feedback about this FLIP.
> > > > > > >>>>>>>
> > > > > > >>>>>>> @Benchao
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> I think it would be reasonable to also support "pipeline shuffle"
> > > > > > > >>>>>>>>> if possible.
> > > > > > > >>>>>>> As I said above, runtime filter can work well with all shuffle
> > > > > > > >>>>>>> modes, including pipeline shuffle.
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> if the RuntimeFilterBuilder could be done quicker than the
> > > > > > > >>>>>>>>> RuntimeFilter operator, it can still filter out additional data
> > > > > > > >>>>>>>>> afterwards.
> > > > > > > >>>>>>> I think the main purpose of runtime filter is to reduce the
> > > > > > > >>>>>>> shuffle data and the data arriving at the join. Although eagerly
> > > > > > > >>>>>>> running the large table side can process data in advance, most of
> > > > > > > >>>>>>> the data may be irrelevant, causing huge shuffle overhead and
> > > > > > > >>>>>>> slowing the join. In addition, if the join is a hash-join, the
> > > > > > > >>>>>>> probe side of the hash-join also needs to wait for its build side
> > > > > > > >>>>>>> to complete, so the large table side is likely to be
> > > > > > > >>>>>>> back-pressured.
> > > > > > > >>>>>>> In addition, I tend not to add too many configuration options in
> > > > > > > >>>>>>> the first version, which may make it more difficult to use (users
> > > > > > > >>>>>>> need to understand a lot of internal implementation details).
> > > > > > > >>>>>>> Maybe it could be a future improvement (if it's worthwhile)?
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> @Aitozi
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> IMO, In the current implementation two source table operators
> > > > > > > >>>>>>>>> will be executed simultaneously.
> > > > > > > >>>>>>> The example in the FLIP uses blocking shuffle (I will add this
> > > > > > > >>>>>>> point to the FLIP). The runtime filter is generally chained with
> > > > > > > >>>>>>> the large table side to reduce the shuffle data (as shown in
> > > > > > > >>>>>>> Figure 2 of the FLIP). The job vertices should be scheduled in
> > > > > > > >>>>>>> topological order, so the large table side can only be scheduled
> > > > > > > >>>>>>> after the RuntimeFilterBuilder finishes.
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> Are there some tests to show the default value of
> > > > > > > >>>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> > > > > > > >>>>>>>>> default value.
> > > > > > > >>>>>>> It's not tested yet, but it will be done before merging the code.
> > > > > > > >>>>>>> The current value follows systems such as Spark and Hive. Before
> > > > > > > >>>>>>> merging the code, we will test on TPC-DS 10 T to find an optimal
> > > > > > > >>>>>>> set of values. If you have relevant experience with it, you are
> > > > > > > >>>>>>> welcome to give some suggestions.
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> What's the representation of the runtime filter node in planner ?
> > > > > > > >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new physical
> > > > > > > >>>>>>> nodes, RuntimeFilterBuilder and RuntimeFilter.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Best,
> > > > > > >>>>>>> Lijie
> > > > > > >>>>>>>
> > > > > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi Lijie,
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have
> > > > > > > >>>>>>>> some questions below:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>> How does it know to schedule the TableSource(dim) first? IMO, in
> > > > > > > >>>>>>>> the current implementation the two source table operators will be
> > > > > > > >>>>>>>> executed simultaneously.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> If the data volume on the probe side is too small, the overhead
> > > > > > > >>>>>>>>> of building runtime filter is not worth it.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Are there some tests to show that the default value of
> > > > > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size, 10G, is a good
> > > > > > > >>>>>>>> default value? The same applies to
> > > > > > > >>>>>>>> table.optimizer.runtime-filter.max-build-data-size.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> the runtime filter can be pushed down along the probe side, as
> > > > > > > >>>>>>>>> close to data sources as possible
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> What's the representation of the runtime filter node in the
> > > > > > > >>>>>>>> planner? Is it a Filter node?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Best,
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Aitozi.
> > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Hi Lijie,
> > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> > > > > > > >>>>>>>>> also support "pipeline shuffle" if possible.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing. Although
> > > > > > > >>>>>>>>> this has not been much exposed to users for now, I know a few
> > > > > > > >>>>>>>>> companies that use Flink as an MPP computing engine, and there is
> > > > > > > >>>>>>>>> an ongoing effort[1] to make this usage more powerful.
> > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> > > > > > > >>>>>>>>> running before the RuntimeFilterBuilder finished, it will not
> > > > > > > >>>>>>>>> process any data and will occupy resources": whether it benefits
> > > > > > > >>>>>>>>> us depends on the scale of data. If the RuntimeFilterBuilder
> > > > > > > >>>>>>>>> could be done quicker than the RuntimeFilter operator, it can
> > > > > > > >>>>>>>>> still filter out additional data afterwards. Hence, in my
> > > > > > > >>>>>>>>> opinion, we do not need to make the edge between
> > > > > > > >>>>>>>>> RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least
> > > > > > > >>>>>>>>> it can be made configurable.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> [1]
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://issues.apache.org/jira/browse/FLINK-25318%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw3GqdpuiCqegqRLDv1PjMiL%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw1oNzOlNn0UCDtz1M9jAw1x%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw3Zt14Wvxs_b8ghD0dIgPfH&source=gmail-imap&ust=1687781326000000&usg=AOvVaw0HsmkkqPeZGZOBvFiA8NOA
> > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Hi Yuxia,
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> I made a mistake in the above response.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> The runtime filter can work well with all shuffle
> modes.
> > > > > However,
> > > > > > >>>>>> hybrid
> > > > > > >>>>>>>>>> shuffle and blocking shuffle are currently recommended
> > for
> > > > > batch
> > > > > > >>>> jobs
> > > > > > > >>>>>>>>>> (pipelined shuffle is not recommended).
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> One more thing to mention here is that we will force
> the
> > > > edge
> > > > > > >>>> between
> > > > > > >>>>>>>>>> RuntimeFilterBuilder and RuntimeFilter to be
> > > > > BLOCKING(regardless
> > > > > > >> of
> > > > > > >>>>>>>> which
> > > > > > >>>>>>>>>> BatchShuffleMode is set). Because the RuntimeFilter
> > really
> > > > > > doesn’t
> > > > > > >>>>>> need
> > > > > > >>>>>>>>> to
> > > > > > >>>>>>>>>> run before the RuntimeFilterBuilder finished. Even if
> > the
> > > > > > >>>>>> RuntimeFilter
> > > > > > >>>>>>>>>> becomes running before the RuntimeFilterBuilder
> > finished,
> > > it
> > > > > > will
> > > > > > >>>> not
> > > > > > >>>>>>>>>> process any data and will occupy resources.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Best,
> > > > > > >>>>>>>>>> Lijie
> > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>> Hi Yuxia,
> > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Thanks for your feedback. The answers to your
> questions
> > > are
> > > > > as
> > > > > > >>>>>>>> follows:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> 1. Yes, the row count comes from statistic of
> > underlying
> > > > > > table(Or
> > > > > > >>>>>>>>>>> estimated based on the statistic of underlying table,
> > if
> > > > the
> > > > > > >> build
> > > > > > >>>>>>>> side
> > > > > > >>>>>>>>>> or
> > > > > > > >>>>>>>>>>> probe side is not TableScan). If the statistic is
> > > > unavailable,
> > > > > we
> > > > > > >>>> will
> > > > > > >>>>>>>>> not
> > > > > > >>>>>>>>>>> inject a runtime filter(As you said, we can hardly
> > > evaluate
> > > > > the
> > > > > > >>>>>>>>>> benefits).
> > > > > > >>>>>>>>>>> Besides, AFAIK, the estimated data size of build side
> > is
> > > > also
> > > > > > >> based
> > > > > > >>>>>>>> on
> > > > > > >>>>>>>>>> the
> > > > > > > >>>>>>>>>>> row count statistics; that is, if the statistics are
> > > > > > unavailable,
> > > > > > >>>> the
> > > > > > >>>>>>>>>>> requirement
> > > > > > "table.optimizer.runtime-filter.max-build-data-size"
> > > > > > >>>>>>>> cannot
> > > > > > >>>>>>>>>> be
> > > > > > >>>>>>>>>>> evaluated either. I'll add this point into FLIP.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> 2.
> > > > > > >>>>>>>>>>> Estimated data size does not meet requirement (in
> > planner
> > > > > > >>>>>>>> optimization
> > > > > > >>>>>>>>>>> phase) -> No filter
> > > > > > >>>>>>>>>>> Estimated data size meets the requirement (in planner
> > > > > > >> optimization
> > > > > > >>>>>>>>>> phase),
> > > > > > >>>>>>>>>>> but the real data size does not meet the
> requirement(in
> > > > > > execution
> > > > > > >>>>>>>>> phase)
> > > > > > >>>>>>>>>> ->
> > > > > > >>>>>>>>>>> Fake filter
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> 3. Yes, the runtime filter is only for batch
> > > jobs/blocking
> > > > > > >> shuffle.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>> Lijie
> > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited
> to
> > > see
> > > > > > >> runtime
> > > > > > >>>>>>>>> filter
> > > > > > >>>>>>>>>>>> is to be implemented in Flink.
> > > > > > > >>>>>>>>>>>> I have a few questions about it:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be
> estimated,
> > > use
> > > > > row
> > > > > > >>>> count
> > > > > > > >>>>>>>>>>>> instead`. So, does the row count come from the
> statistic
> > > from
> > > > > > > >>>>>>>> underlying
> > > > > > > >>>>>>>>>>>> table? What if the statistic is also unavailable
> > > > > > > considering
> > > > > > > >>>>>>>> users
> > > > > > > >>>>>>>>>>>> may not always remember to generate statistics in
> > > > production.
> > > > > > > >>>>>>>>>>>> I'm wondering whether it makes sense to just
> disable
> > > > > runtime
> > > > > > >>>>>>>> filter
> > > > > > >>>>>>>>> if
> > > > > > >>>>>>>>>>>> statistic is unavailable since in that case, we can
> > > hardly
> > > > > > >>>> evaluate
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>>> benefits of runtime-filter.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime
> filters
> > > only
> > > > > if
> > > > > > >> the
> > > > > > >>>>>>>>>>>> following requirements are met:xxx", but it also
> said,
> > > > "Once
> > > > > > >> this
> > > > > > >>>>>>>>> limit
> > > > > > >>>>>>>>>> is
> > > > > > >>>>>>>>>>>> exceeded, it will output a fake filter(which always
> > > > returns
> > > > > > >> true)"
> > > > > > >>>>>>>> in
> > > > > > >>>>>>>>>>>> `RuntimeFilterBuilderOperator` part; Seems they are
> > > > > > >> contradictory,
> > > > > > >>>>>>>> so
> > > > > > > >>>>>>>>>> I'm
> > > > > > >>>>>>>>>>>> wondering what's the real behavior, no filter will
> be
> > > > > injected
> > > > > > >> or
> > > > > > >>>>>>>> fake
> > > > > > >>>>>>>>>>>> filter?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> 3: Does it also mean runtime-filter can only take
> > effect
> > > > in
> > > > > > >>>> blocking
> > > > > > >>>>>>>>>>>> shuffle?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best regards,
> > > > > > >>>>>>>>>>>> Yuxia
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> ----- 原始邮件 -----
> > > > > > >>>>>>>>>>>> 发件人: "ron9 liu" <ron9....@gmail.com <mailto:
> > > > > > ron9....@gmail.com> <mailto:ron9....@gmail.com>
> > > > > > >> <mailto:ron9....@gmail.com>
> > > > > > >>>> <mailto:ron9....@gmail.com>>
> > > > > > >>>>>>>>>>>> 收件人: "dev" <dev@flink.apache.org <mailto:
> > > > > dev@flink.apache.org>
> > > > > > <mailto:dev@flink.apache.org>
> > > > > > >> <mailto:dev@flink.apache.org>
> > > > > > >>>> <mailto:dev@flink.apache.org>>
> > > > > > >>>>>>>>>>>> 发送时间: 星期三, 2023年 6 月 14日 下午 5:29:28
> > > > > > >>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter
> > for
> > > > > Flink
> > > > > > >>>> Batch
> > > > > > >>>>>>>>>> Jobs
> > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Runtime Filter
> is
> > a
> > > > > common
> > > > > > >>>>>>>>>>>> optimization
> > > > > > >>>>>>>>>>>> to improve the join performance that has been
> adopted
> > by
> > > > > many
> > > > > > >>>>>>>>> computing
> > > > > > >>>>>>>>>>>> engines such as Spark, Doris, etc... Flink is a
> > > streaming
> > > > > > batch
> > > > > > >>>>>>>>>> computing
> > > > > > >>>>>>>>>>>> engine, and we are continuously optimizing the
> > > performance
> > > > > of
> > > > > > >>>>>>>> batches.
> > > > > > >>>>>>>>>>>> Runtime filter is a general performance optimization
> > > > > technique
> > > > > > >>>> that
> > > > > > >>>>>>>>> can
> > > > > > >>>>>>>>>>>> improve the performance of Flink batch jobs, so we
> are
> > > > > > >> introducing
> > > > > > >>>>>>>> it
> > > > > > >>>>>>>>> on
> > > > > > >>>>>>>>>>>> batch as well.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Looking forward to all feedback.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>> Ron
> > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Hi devs
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a
> > discussion
> > > > > about
> > > > > > >>>>>>>>>> FLIP-324:
> > > > > > >>>>>>>>>>>>> Introduce Runtime Filter for Flink Batch Jobs[1]
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Runtime Filter is a common optimization to improve
> > join
> > > > > > >>>>>>>> performance.
> > > > > > >>>>>>>>>> It
> > > > > > >>>>>>>>>>>> is
> > > > > > >>>>>>>>>>>>> designed to dynamically generate filter conditions
> > for
> > > > > > certain
> > > > > > >>>>>>>> Join
> > > > > > >>>>>>>>>>>> queries
> > > > > > >>>>>>>>>>>>> at runtime to reduce the amount of scanned or
> > shuffled
> > > > > data,
> > > > > > >>>> avoid
> > > > > > >>>>>>>>>>>>> unnecessary I/O and network transmission, and speed
> > up
> > > > the
> > > > > > >> query.
> > > > > > >>>>>>>>> Its
> > > > > > >>>>>>>>>>>>> working principle is building a filter(e.g. bloom
> > > filter)
> > > > > > based
> > > > > > >>>> on
> > > > > > >>>>>>>>> the
> > > > > > >>>>>>>>>>>> data
> > > > > > >>>>>>>>>>>>> on the small table side(build side) first, then
> pass
> > > this
> > > > > > >> filter
> > > > > > >>>>>>>> to
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>>>> large table side(probe side) to filter the
> irrelevant
> > > > data
> > > > > on
> > > > > > >> it,
> > > > > > >>>>>>>>> this
> > > > > > >>>>>>>>>>>> can
> > > > > > >>>>>>>>>>>>> reduce the data reaching the join and improve
> > > > performance.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> You can find more details in the FLIP-324[1].
> Looking
> > > > > forward
> > > > > > >> to
> > > > > > >>>>>>>>> your
> > > > > > >>>>>>>>>>>>> feedback.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> [1]
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://cwiki.apache.org/confluence/display/FLINK/FLIP-324%252525253A%2525252BIntroduce%2525252BRuntime%2525252BFilter%2525252Bfor%2525252BFlink%2525252BBatch%2525252BJobs%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw0ke1ZHcJ--A1QgsbB84MHA%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw21E3CQyayeBTYztmOnwMcz%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw0xVu0zYYNRmh8u8aq7uSi3&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1LXwtWT177350iKD3sKCEt
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Best,
> > > > > > >>>>>>>>>>>>> Ron & Gen & Lijie
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> --
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Best,
> > > > > > >>>>>>>>> Benchao Li
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
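
For readers following the thread, here is a minimal sketch of the behavior
discussed above: build a Bloom filter from the build side, apply it to the
probe side, and fall back to a "fake" filter that always returns true when the
build side turns out to be too large at execution time. This is not Flink's
implementation. The class and method names (RuntimeFilterSketch, buildFilter,
probe), the row-count limit used as a stand-in for
"table.optimizer.runtime-filter.max-build-data-size", the bit-array size, and
the hash function are all assumptions made for illustration.

```java
import java.util.BitSet;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class RuntimeFilterSketch {

    /** Tiny Bloom filter over int join keys; size and hashing are illustrative only. */
    static final class BloomFilter {
        private final BitSet bits;
        private final int numBits;

        BloomFilter(int numBits) {
            this.numBits = numBits;
            this.bits = new BitSet(numBits);
        }

        void add(int key) {
            bits.set(index(key, 0x9E3779B9));
            bits.set(index(key, 0x85EBCA6B));
        }

        /** May return true for keys never added (false positive), never false for added keys. */
        boolean mightContain(int key) {
            return bits.get(index(key, 0x9E3779B9)) && bits.get(index(key, 0x85EBCA6B));
        }

        private int index(int key, int seed) {
            int h = (key ^ seed) * 0x27D4EB2D;
            h ^= h >>> 15;
            return Math.floorMod(h, numBits);
        }
    }

    /**
     * Build side. If the actual build input exceeds the limit (hypothetical row-count
     * limit standing in for the data-size limit), return a fake filter that keeps
     * every row; otherwise return a real Bloom filter.
     */
    static Predicate<Integer> buildFilter(List<Integer> buildKeys, int maxBuildRows) {
        if (buildKeys.size() > maxBuildRows) {
            return key -> true; // fake filter: always returns true
        }
        BloomFilter bloom = new BloomFilter(1 << 10);
        for (int key : buildKeys) {
            bloom.add(key);
        }
        return bloom::mightContain;
    }

    /** Probe side: drop rows whose join key cannot match the build side. */
    static List<Integer> probe(List<Integer> probeKeys, Predicate<Integer> filter) {
        return probeKeys.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> build = List.of(3, 7, 42);
        List<Integer> probeSide = List.of(1, 3, 5, 7, 9, 42, 100);
        // Real filter: keeps every matching key, may keep a few non-matching ones.
        System.out.println(probe(probeSide, buildFilter(build, 1000)));
        // Limit exceeded: the fake filter keeps the probe side unchanged.
        System.out.println(probe(probeSide, buildFilter(build, 2)));
    }
}
```

The sketch only covers the execution-phase fallback. The planner-phase case
from the thread (estimated size does not meet the requirement, so no filter is
injected at all) would simply skip the call to buildFilter.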
