Hi all,

Thanks for all the feedback on this FLIP. If there are no other
questions or concerns, I will start the vote tomorrow (Friday, June 23rd).

Best,
Lijie

On Wed, Jun 21, 2023 at 5:14 PM Jing Ge <j...@ververica.com.invalid> wrote:

> Hi Ron,
>
> Thanks for sharing your thoughts! It makes sense. It would be helpful if
> these references to Hive, PolarDB, etc. could be added to the FLIP.
>
> Best regards,
> Jing
>
> On Tue, Jun 20, 2023 at 5:41 PM liu ron <ron9....@gmail.com> wrote:
>
> > Hi, Jing
> >
> > The default value for this ratio follows other systems, such as Hive. As
> > long as the Runtime Filter can filter out more than half of the data, we
> > can benefit from it. Normally, as long as statistics are available, NDV
> > is present, so row count should rarely be used, and I think the formula
> > is valid in most cases. We also borrowed this formula from other systems,
> > such as the PolarDB of AliCloud. Your concern is valuable for this FLIP,
> > but currently we do not know how to adjust it reasonably, and making it
> > too complex may leave users unable to understand it. So I think we should
> > first follow the simple way and optimize it gradually afterwards. The
> > first step may be to verify the reasonableness of the current formula
> > with the TPC-DS cases.
> >
> > Best,
> > Ron
> >
> > On Tue, Jun 20, 2023 at 7:46 PM Jing Ge <j...@ververica.com.invalid> wrote:
> >
> > > Hi Ron,
> > >
> > > Thanks for the clarification. That answered my questions.
> > >
> > > Regarding the ratio, my gut feeling is that any value less than 0.8
> > > or 0.9 won't help much (I might be wrong). I was thinking of adapting
> > > the formula to somehow map the current 0.9-1 to 0-1, i.e. if the user
> > > configures 0.5, it will be mapped to e.g. 0.95 (or e.g. 0.85; the real
> > > number needs more calculation) for the current formula described in the
> > > FLIP. But I am not sure it is a feasible solution. It deserves more
> > > discussion. Maybe some real performance tests could give us some hints.
> > >
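For illustration only, the remapping idea above can be read as a linear rescale of the user-facing option onto a narrower effective range. This is a minimal sketch under that assumption, not something proposed in the FLIP; the function name `rescale_ratio` and the `lower`/`upper` bounds are hypothetical.

```python
def rescale_ratio(user_value: float, lower: float = 0.9, upper: float = 1.0) -> float:
    """Linearly map a user-configured value in [0, 1] onto [lower, upper].

    Hypothetical helper: the default bounds are assumptions taken from the
    "0.9-1" range mentioned in the discussion, not values from the FLIP.
    """
    if not 0.0 <= user_value <= 1.0:
        raise ValueError("user_value must be within [0, 1]")
    if not lower < upper:
        raise ValueError("lower must be smaller than upper")
    return lower + (upper - lower) * user_value


if __name__ == "__main__":
    # The same user setting under two assumed lower bounds.
    print(round(rescale_ratio(0.5), 4))
    print(round(rescale_ratio(0.5, lower=0.7), 4))
```

With a lower bound of 0.9 a user setting of 0.5 maps to about 0.95, and with a lower bound of 0.7 it maps to about 0.85, which matches the two example numbers in the paragraph above.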
> > > Best regards,
> > > Jing
> > >
> > > On Tue, Jun 20, 2023 at 5:19 AM liu ron <ron9....@gmail.com> wrote:
> > >
> > > > Hi, Jing
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > > Afaiu, the runtime filter will only be injected when the gap between
> > > > > the build data size and probe data size is big enough. Let's take an
> > > > > extreme example: the small table (build side) has one row and the
> > > > > large table (probe side) contains tens of billions of rows. This will
> > > > > be the ideal use case for the runtime filter and the improvement will
> > > > > be significant. Is this correct?
> > > >
> > > > Yes, you are right.
> > > >
> > > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > > > > will the value of max-build-data-size and min-probe-data-size depend
> > > > > on the parallelism config? I.e. with the same data-size setting, is it
> > > > > possible to inject or not inject runtime filters by adjusting the
> > > > > parallelism?
> > > >
> > > > First, let me clarify two points. The first is that whether to inject
> > > > a RuntimeFilter is decided in the optimization phase, but we currently
> > > > do not consider operator parallelism in the SQL optimization phase; it
> > > > is set at the ExecNode level. The second is that in batch mode, the
> > > > AdaptiveBatchScheduler[1] is now used by default, which derives the
> > > > parallelism of the downstream operator from the amount of data produced
> > > > by the upstream operator, that is, the parallelism is determined
> > > > adaptively at runtime. Given the above, we cannot decide whether to
> > > > inject the BloomFilter in the optimization stage based on parallelism.
> > > > A more important point is that the purpose of the Runtime Filter is to
> > > > reduce the amount of data for shuffle, and thus the amount of data
> > > > processed by the downstream join operator. Therefore, as I understand
> > > > it, regardless of the parallelism of the probe side, the amount of
> > > > shuffled data will be reduced after inserting the Runtime Filter, which
> > > > benefits the join operator, so whether to insert the RuntimeFilter does
> > > > not depend on the parallelism.
> > > >
> > > > > Does it make sense to reconsider the formula of ratio calculation
> > > > > to help users easily control the filter injection?
> > > >
> > > > Row count is considered only when NDV does not exist. When the size
> > > > options use their default values and NDV cannot be obtained, it is true
> > > > that this condition may always hold, but this does not seem to affect
> > > > anything, and the user is also likely to change the size values. One
> > > > question: how do you think we should make it easier for users to
> > > > control the filter injection?
> > > >
> > > >
> > > > [1]:
> > > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > On Tue, Jun 20, 2023 at 7:11 AM Jing Ge <j...@ververica.com.invalid> wrote:
> > > >
> > > > > Hi Lijie,
> > > > >
> > > > > Thanks for your proposal. It is a really nice feature. I'd like to ask
> > > > > a few questions to understand your thoughts.
> > > > >
> > > > > Afaiu, the runtime filter will only be injected when the gap between
> > > > > the build data size and probe data size is big enough. Let's take an
> > > > > extreme example: the small table (build side) has one row and the
> > > > > large table (probe side) contains tens of billions of rows. This will
> > > > > be the ideal use case for the runtime filter and the improvement will
> > > > > be significant. Is this correct?
> > > > >
> > > > > Speaking of the "Conditions of injecting Runtime Filter" in the FLIP,
> > > > > will the value of max-build-data-size and min-probe-data-size depend
> > > > > on the parallelism config? I.e. with the same data-size setting, is it
> > > > > possible to inject or not inject runtime filters by adjusting the
> > > > > parallelism?
> > > > >
> > > > > In the FLIP, there are default values for the new configuration
> > > > > parameters that will be used to check the injection condition. If NDV
> > > > > cannot be estimated, row count will be used. Given that
> > > > > max-build-data-size is 10MB and min-probe-data-size is 10GB, in the
> > > > > worst case the min-filter-ratio will be 0.999, i.e. the probeNdv is
> > > > > 1000 times the buildNdv. If we consider duplication and the fact that
> > > > > the large table might have more columns than the small table, the
> > > > > probeNdv should still be 100 or 10 times the buildNdv, which ends up
> > > > > with a min-filter-ratio equal to 0.99 or 0.9. Both are bigger than
> > > > > the default value 0.5 in the FLIP. If I am not mistaken, commonly, a
> > > > > min-filter-ratio less than 0.99 will always allow injecting the
> > > > > runtime filter. Does it make sense to reconsider the formula of ratio
> > > > > calculation to help users easily control the filter injection?
> > > > >
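The arithmetic in the paragraph above is consistent with a filter ratio of the form 1 - buildNdv / probeNdv. The sketch below works through those numbers under that assumption; the formula is inferred from the figures in this thread rather than quoted from the FLIP, and the names `filter_ratio` and `should_inject`, as well as the `>=` comparison, are hypothetical.

```python
def filter_ratio(build_ndv: float, probe_ndv: float) -> float:
    """Estimated fraction of probe-side rows that the filter removes.

    Assumed form: 1 - build_ndv / probe_ndv (inferred from the thread).
    """
    if probe_ndv <= 0:
        raise ValueError("probe_ndv must be positive")
    return max(0.0, 1.0 - build_ndv / probe_ndv)


def should_inject(build_ndv: float, probe_ndv: float,
                  min_filter_ratio: float = 0.5) -> bool:
    """Inject only if the estimated ratio reaches the threshold.

    0.5 is the default mentioned in the thread; using >= is an assumption.
    """
    return filter_ratio(build_ndv, probe_ndv) >= min_filter_ratio


if __name__ == "__main__":
    # probeNdv is 1000x, 100x and 10x the buildNdv, as in the discussion.
    for factor in (1000, 100, 10):
        print(factor, round(filter_ratio(1, factor), 3), should_inject(1, factor))
```

Under this reading, a probe side with only 10 times the distinct values of the build side already yields a ratio of about 0.9, well above the 0.5 default, which is the concern raised above.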
> > > > > Best regards,
> > > > > Jing
> > > > >
> > > > > On Mon, Jun 19, 2023 at 4:42 PM Lijie Wang <wangdachui9...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Stefan,
> > > > > >
> > > > > > >> bypassing the dataflow
> > > > > > I believe it's a possible solution, but it may require more
> > > > > > coordination and extra conditions (such as DFS), so I do think it
> > > > > > should be excluded from the first version. I'll put it in
> > > > > > Future+Improvements as a potential improvement.
> > > > > >
> > > > > > Thanks again for your quick reply :)
> > > > > >
> > > > > > Best,
> > > > > > Lijie
> > > > > >
> > > > > > On Mon, Jun 19, 2023 at 8:51 PM Stefan Richter
> > > > > > <srich...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > >
> > > > > > > Hi Lijie,
> > > > > > >
> > > > > > > I think you understood me correctly. But I would not consider this a
> > > > > > > true cyclic dependency in the dataflow because I would not suggest
> > > > > > > sending the filter through an edge in the job graph from join to
> > > > > > > scan. I’d rather bypass the stream graph for bringing the filter to
> > > > > > > the scan. For example, the join could report the filter after the
> > > > > > > build phase, e.g. to the JM or a predefined DFS folder. Then, when
> > > > > > > the probe scan is scheduled for execution, either the JM provides
> > > > > > > the filter information to the scan, or the scan looks in DFS for any
> > > > > > > filter that it can use as part of its initialization. I’m not
> > > > > > > suggesting to do it exactly in those ways, but just to show what I
> > > > > > > mean by "bypassing the dataflow".
> > > > > > >
> > > > > > > Anyways, I’m fine with excluding this optimization from the current
> > > > > > > FLIP if you believe it would be hard to implement in Flink.
> > > > > > >
> > > > > > > Best,
> > > > > > > Stefan
> > > > > > >
> > > > > > >
> > > > > > > > On 19. Jun 2023, at 14:07, Lijie Wang <wangdachui9...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Stefan,
> > > > > > > >
> > > > > > > > If I understand correctly (I hope so), the hash join operator needs
> > > > > > > > to send the bloom filter to the probe scan, and the probe scan also
> > > > > > > > needs to send the filtered data to the hash join operator. This
> > > > > > > > means there will be a cycle in the data flow, and it will be hard
> > > > > > > > for current Flink to schedule this kind of graph. I admit we can
> > > > > > > > find a way to do this, but that's probably a bit outside the scope
> > > > > > > > of this FLIP. So let's do these complex optimizations later, WDYT?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > On Mon, Jun 19, 2023 at 6:15 PM Stefan Richter
> > > > > > > > <srich...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > >> Hi Lijie,
> > > > > > > >>
> > > > > > > >> Exactly, my proposal was to build the bloom filter in the hash
> > > > > > > >> operator. I don’t know all the details of the implementation of
> > > > > > > >> Flink’s join operator, but I’d assume that even if the join is a
> > > > > > > >> two-input operator, it gets scheduled for 2 different pipelines:
> > > > > > > >> first the build phase with the scan from the dimension table and,
> > > > > > > >> after that’s completed, the probe phase with the scan of the fact
> > > > > > > >> table. I’m not proposing to use the bloom filter only in the join
> > > > > > > >> operator, but rather to send the bloom filter to the probe scan
> > > > > > > >> before starting the probe. I assume this would require some form
> > > > > > > >> of side channel to transport the filter and coordination to tell
> > > > > > > >> the sources that such a filter is available. I cannot answer how
> > > > > > > >> hard those would be to implement, but the idea doesn’t seem
> > > > > > > >> impossible to me.
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Stefan
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>> On 19. Jun 2023, at 11:56, Lijie Wang <wangdachui9...@gmail.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>> Hi Stefan,
> > > > > > > >>>
> > > > > > > >>> Now I know what you mean about point 1. But currently it is
> > > > > > > >>> infeasible for Flink, because the building of the hash table
> > > > > > > >>> happens inside the hash join operator. The hash join operator has
> > > > > > > >>> two inputs; it first processes the data of the build input to
> > > > > > > >>> build a hash table, and then uses the hash table to process the
> > > > > > > >>> data of the probe input. If we want to use the built hash table
> > > > > > > >>> to deduplicate data for the bloom filter, we must put the bloom
> > > > > > > >>> filter inside the hash join operator. However, in this way, the
> > > > > > > >>> data reaching the join operator cannot be reduced (the
> > > > > > > >>> shuffle/network overhead cannot be reduced), which is not what we
> > > > > > > >>> expected.
> > > > > > > >>>
> > > > > > > >>> Regarding the filter type, I agree with you: more types of
> > > > > > > >>> filters can enable further optimization, and it is in our future
> > > > > > > >>> plan (we described it in the section
> > > > > > > >>> Future+Improvements#More+underlying+implementations).
> > > > > > > >>>
> > > > > > > >>> Best,
> > > > > > > >>> Lijie
> > > > > > > >>>
> > > > > > > >>> On Mon, Jun 19, 2023 at 3:58 PM Stefan Richter
> > > > > > > >>> <srich...@confluent.io.invalid> wrote:
> > > > > > > >>>
> > > > > > > >>>>
> > > > > > > >>>> Hi Lijie,
> > > > > > > >>>>
> > > > > > > >>>> thanks for your response, I agree with what you said about
> > > > > > > >>>> points 2 and 3. Let me explain a bit more about point 1. This
> > > > > > > >>>> would not apply to all types of joins and my suggestion is also
> > > > > > > >>>> *not* to build a hash table only for the purpose of building the
> > > > > > > >>>> bloom filter.
> > > > > > > >>>> I was thinking about the scenario of a hash join, where you
> > > > > > > >>>> would build the hash table as part of the join algorithm anyways
> > > > > > > >>>> and then use the keyset of that hash table to 1) have better
> > > > > > > >>>> insights about NDV and 2) be able to construct the bloom filter
> > > > > > > >>>> without duplicates and therefore faster. So the preconditions
> > > > > > > >>>> where I would use this are that you are building a hash table as
> > > > > > > >>>> part of the join and you know you are not building for a key
> > > > > > > >>>> column (because there would be no duplicates to eliminate). Then
> > > > > > > >>>> your bloom filter construction could already benefit from the
> > > > > > > >>>> deduplication work that was done for building the hash table.
> > > > > > > >>>>
> > > > > > > >>>> I also wanted to point out that besides bloom filter and IN
> > > > > > > >>>> filter you could also think of other types of filters that can
> > > > > > > >>>> become interesting for certain distributions and metadata. For
> > > > > > > >>>> example, if you have min/max information about columns and
> > > > > > > >>>> partitions, you could have a bit vector represent equally sized
> > > > > > > >>>> ranges of the key space between min and max, have the bits
> > > > > > > >>>> represent which parts of the range are present, and push that
> > > > > > > >>>> information down to the scan.
> > > > > > > >>>>
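As a rough illustration of the min/max range idea above, the sketch below splits an integer key space into equal-width ranges and keeps one bit per range. It is only one possible reading of the suggestion; the class `RangeBitVectorFilter`, its parameters, and the choice of integer keys are all hypothetical.

```python
class RangeBitVectorFilter:
    """One bit per equal-width range of [min_key, max_key] (hypothetical sketch)."""

    def __init__(self, min_key: int, max_key: int, num_ranges: int = 64):
        if min_key > max_key or num_ranges <= 0:
            raise ValueError("invalid bounds or range count")
        self.min_key = min_key
        self.max_key = max_key
        self.num_ranges = num_ranges
        self.bits = 0  # Python int used as an arbitrary-length bit vector

    def _index(self, key: int) -> int:
        # Map the key to its range index in [0, num_ranges).
        width = self.max_key - self.min_key + 1
        return (key - self.min_key) * self.num_ranges // width

    def add(self, key: int) -> None:
        if not self.min_key <= key <= self.max_key:
            raise ValueError("key outside [min_key, max_key]")
        self.bits |= 1 << self._index(key)

    def might_contain(self, key: int) -> bool:
        # Keys outside the known min/max can be rejected immediately.
        if not self.min_key <= key <= self.max_key:
            return False
        return bool((self.bits >> self._index(key)) & 1)


if __name__ == "__main__":
    f = RangeBitVectorFilter(0, 999, num_ranges=10)
    for k in (5, 950):
        f.add(k)
    print(f.might_contain(7), f.might_contain(500), f.might_contain(1000))
```

Like a bloom filter, such a structure can report false positives (any key in a populated range passes) but no false negatives for keys that were added.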
> > > > > > > >>>> Best,
> > > > > > > >>>> Stefan
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>> On 19. Jun 2023, at 08:26, Lijie Wang <wangdachui9...@gmail.com>
> > > > > > > >>>>> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>> Hi Stefan,
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks for your feedback. Let me briefly summarize the
> > > > > > > >>>>> optimization points you mentioned above (please correct me if
> > > > > > > >>>>> I'm wrong):
> > > > > > > >>>>>
> > > > > > > >>>>> 1. Build an extra hash table for deduplication before building
> > > > > > > >>>>> the bloom filter.
> > > > > > > >>>>> 2. Use the two-phase approach to build the bloom filter (first
> > > > > > > >>>>> local, then OR-combine).
> > > > > > > >>>>> 3. Use blocked bloom filters to improve the cache efficiency.
> > > > > > > >>>>>
> > > > > > > >>>>> For the above 3 points, I have the following questions or
> > > > > > > >>>>> opinions:
> > > > > > > >>>>>
> > > > > > > >>>>> For point 1, it seems that building a hash table also requires
> > > > > > > >>>>> traversing all build side data, and the overhead seems to be
> > > > > > > >>>>> the same as building a bloom filter directly? In addition, the
> > > > > > > >>>>> hash table will take up more space when the amount of data is
> > > > > > > >>>>> large, which is why we choose to use a bloom filter instead of
> > > > > > > >>>>> a hash table.
> > > > > > > >>>>>
> > > > > > > >>>>> For point 2, I think it's a good idea to use the two-phase
> > > > > > > >>>>> approach to build the bloom filter. But rather than directly
> > > > > > > >>>>> broadcasting the local bloom filters to the probe side, I
> > > > > > > >>>>> prefer to introduce a global node for the OR-combine (like
> > > > > > > >>>>> two-phase-agg[1]), then broadcast the combined bloom filter to
> > > > > > > >>>>> the probe side. The latter can reduce the amount of data
> > > > > > > >>>>> transferred over the network. I will change the FLIP like this.
> > > > > > > >>>>>
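The two-phase build described in point 2 can be sketched as follows: each build subtask fills a local bit set, and a single global step ORs them into one filter before it is sent to the probe side. This is a simplified illustration, not Flink's bloom filter implementation; the sizes, the SHA-256 based double hashing, and all function names are assumptions made for the example.

```python
import hashlib

NUM_BITS = 1 << 12   # filter size in bits (assumed; all filters must share it)
NUM_HASHES = 3       # number of bit positions per key (assumed)


def bit_positions(key: str):
    """Derive NUM_HASHES bit positions for a key via double hashing."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big") | 1
    return [(h1 + i * h2) % NUM_BITS for i in range(NUM_HASHES)]


def build_local_filter(keys) -> int:
    """Phase 1: each build subtask sets the bits for its own keys."""
    bits = 0
    for key in keys:
        for pos in bit_positions(key):
            bits |= 1 << pos
    return bits


def or_combine(local_filters) -> int:
    """Phase 2: one global node ORs all local filters into a single filter."""
    combined = 0
    for bits in local_filters:
        combined |= bits
    return combined


def might_contain(bits: int, key: str) -> bool:
    """Probe side: False means the key is definitely not on the build side."""
    return all((bits >> pos) & 1 for pos in bit_positions(key))


if __name__ == "__main__":
    local_filters = [build_local_filter(["a", "b"]), build_local_filter(["c"])]
    combined = or_combine(local_filters)
    print(all(might_contain(combined, k) for k in ("a", "b", "c")))
```

Because OR-ing bit sets of equal size and identical hash functions gives the same result as inserting all keys into one filter, only the single combined filter needs to be broadcast, rather than one filter per build subtask.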
> > > > > > > >>>>> For point 3, I think it's a nice optimization, but I prefer to
> > > > > > > >>>>> leave it to the future improvements. There is already an
> > > > > > > >>>>> implementation of bloom filter in Flink, and we can simply
> > > > > > > >>>>> reuse it. Introducing a new bloom filter implementation adds
> > > > > > > >>>>> some complexity (we need to implement it, test it, etc.), and
> > > > > > > >>>>> is not the focus of this FLIP.
> > > > > > > >>>>>
> > > > > > > >>>>> [1]
> > > > > > > >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
> > > > > > > >>>>>
> > > > > > > >>>>> Best,
> > > > > > > >>>>> Lijie
> > > > > > > >>>>>
> > > > > > > >>>>> On Fri, Jun 16, 2023 at 4:45 PM Stefan Richter
> > > > > > > >>>>> <srich...@confluent.io.invalid> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>>> Hi,
> > > > > > > >>>>>>
> > > > > > > >>>>>> Thanks for the proposal of this feature! I have a question
> > > > > > > >>>>>> about the filter build and some suggestions for potential
> > > > > > > >>>>>> improvements. First, I wonder why you suggest running the
> > > > > > > >>>>>> filter builder as a separate operator with parallelism 1. I’d
> > > > > > > >>>>>> suggest integrating the distributed filter build with the
> > > > > > > >>>>>> hash table build phase as follows:
> > > > > > > >>>>>>
> > > > > > > >>>>>> 1. Build the hash table completely in each subtask.
> > > > > > > >>>>>> 2. The keyset of the hash table gives us a precise NDV count
> > > > > > > >>>>>> for every subtask.
> > > > > > > >>>>>> 3. Build a filter from the subtask hash table. For low
> > > > > > > >>>>>> cardinality tables, I’d go with the suggested optimization of
> > > > > > > >>>>>> IN-filter.
> > > > > > > >>>>>> 4. Each build subtask transfers the local bloom filter to all
> > > > > > > >>>>>> probe operators.
> > > > > > > >>>>>> 5. On the probe operator we can either probe against the
> > > > > > > >>>>>> individual filters, or we OR-combine all subtask filters into
> > > > > > > >>>>>> an aggregated bloom filter.
> > > > > > > >>>>>> I’m suggesting this because inserting into a (larger) bloom
> > > > > > > >>>>>> filter can be costly, especially once the filter exceeds
> > > > > > > >>>>>> cache sizes, and is therefore better parallelized. Inserting
> > > > > > > >>>>>> into the hash table first also deduplicates the keys, so we
> > > > > > > >>>>>> avoid inserting records twice into the bloom filter. If we
> > > > > > > >>>>>> want to improve cache efficiency for the build of larger
> > > > > > > >>>>>> filters, we could structure them as blocked bloom filters,
> > > > > > > >>>>>> where the filter is separated into blocks and all bits of one
> > > > > > > >>>>>> key go into only one block. That allows us to apply
> > > > > > > >>>>>> software-managed buffering to first group keys that go into
> > > > > > > >>>>>> the same partition (ideally fitting into cache) and then bulk
> > > > > > > >>>>>> load partitions once we have collected enough keys for one
> > > > > > > >>>>>> round of loading.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Best,
> > > > > > > >>>>>> Stefan
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> Stefan Richter
> > > > > > > >>>>>> Principal Engineer II
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>> On 15. Jun 2023, at 13:35, Lijie Wang
> > > > > > > >>>>>>> <wangdachui9...@gmail.com> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Hi,  Benchao and Aitozi,
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thanks for your feedback about this FLIP.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> @Benchao
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> I think it would be reasonable to also support "pipeline
> > > > > > > >>>>>>>>> shuffle" if possible.
> > > > > > > >>>>>>> As I said above, the runtime filter can work well with all
> > > > > > > >>>>>>> shuffle modes, including pipeline shuffle.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> if the RuntimeFilterBuilder finishes more quickly than
> > > > > > > >>>>>>>>> the RuntimeFilter operator, it can still filter out
> > > > > > > >>>>>>>>> additional data afterwards.
> > > > > > > >>>>>>> I think the main purpose of the runtime filter is to reduce
> > > > > > > >>>>>>> the shuffle data and the data arriving at the join.
> > > > > > > >>>>>>> Although eagerly running the large table side can process
> > > > > > > >>>>>>> data in advance, most of the data may be irrelevant,
> > > > > > > >>>>>>> causing huge shuffle overhead and slowing the join. In
> > > > > > > >>>>>>> addition, if the join is a hash join, the probe side of the
> > > > > > > >>>>>>> hash join also needs to wait for its build side to
> > > > > > > >>>>>>> complete, so the large table side is likely to be
> > > > > > > >>>>>>> backpressured.
> > > > > > > >>>>>>> In addition, I tend not to add too many configuration
> > > > > > > >>>>>>> options in the first version, which may make it more
> > > > > > > >>>>>>> difficult to use (users need to understand a lot of
> > > > > > > >>>>>>> internal implementation details). Maybe it could be a
> > > > > > > >>>>>>> future improvement (if it's worthwhile)?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> @Aitozi
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> IMO, in the current implementation two source table
> > > > > > > >>>>>>>>> operators will be executed simultaneously.
> > > > > > > >>>>>>> The example in the FLIP uses blocking shuffle (I will add
> > > > > > > >>>>>>> this point to the FLIP). The runtime filter is generally
> > > > > > > >>>>>>> chained with the large table side to reduce the shuffle
> > > > > > > >>>>>>> data (as shown in Figure 2 of the FLIP). The job vertices
> > > > > > > >>>>>>> should be scheduled in topological order, so the large
> > > > > > > >>>>>>> table side can only be scheduled after the
> > > > > > > >>>>>>> RuntimeFilterBuilder finishes.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> Are there some tests to show that the default value of
> > > > > > > >>>>>>>>> table.optimizer.runtime-filter.min-probe-data-size, 10G,
> > > > > > > >>>>>>>>> is a good default value?
> > > > > > > >>>>>>> It's not tested yet, but it will be done before merging the
> > > > > > > >>>>>>> code. The current value refers to systems such as Spark and
> > > > > > > >>>>>>> Hive. Before code merging, we will test on TPC-DS 10 T to
> > > > > > > >>>>>>> find an optimal set of values. If you have relevant
> > > > > > > >>>>>>> experience with it, you are welcome to give some
> > > > > > > >>>>>>> suggestions.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> What's the representation of the runtime filter node in
> > > > > > > >>>>>>>>> the planner?
> > > > > > > >>>>>>> As shown in Figure 1 of the FLIP, we intend to add two new
> > > > > > > >>>>>>> physical nodes, RuntimeFilterBuilder and RuntimeFilter.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Best,
> > > > > > > >>>>>>> Lijie
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Thu, Jun 15, 2023 at 3:52 PM Aitozi
> > > > > > > >>>>>>> <gjying1...@gmail.com> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>> Hi Lijie,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP
> > > > > > > >>>>>>>> I have some questions below:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> How does it know to schedule the TableSource(dim) first?
> > > > > > > >>>>>>>> IMO, in the current implementation two source table
> > > > > > > >>>>>>>> operators will be executed simultaneously.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> If the data volume on the probe side is too small, the
> > > > > > > >>>>>>>>> overhead of building runtime filter is not worth it.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Are there some tests to show that the default value of
> > > > > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size, 10G,
> > > > > > > >>>>>>>> is a good default value? The same goes for
> > > > > > > >>>>>>>> table.optimizer.runtime-filter.max-build-data-size.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> the runtime filter can be pushed down along the probe
> > > > > > > >>>>>>>>> side, as close to data sources as possible
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> What's the representation of the runtime filter node in
> > > > > > > >>>>>>>> the planner? Is it a Filter node?
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Best,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Aitozi.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> On Thu, Jun 15, 2023 at 2:30 PM Benchao Li
> > > > > > > >>>>>>>> <libenc...@apache.org> wrote:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>> Hi Lijie,
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Regarding the shuffle mode, I think it would be
> > > reasonable
> > > > to
> > > > > > > also
> > > > > > > >>>>>>>> support
> > > > > > > >>>>>>>>> "pipeline shuffle" if possible.
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing. Although it
> > > > > > > > >>>>>>>>> has not been exposed much to users so far, I know a few companies that
> > > > > > > > >>>>>>>>> use Flink as an MPP computing engine, and there is an ongoing effort[1]
> > > > > > > > >>>>>>>>> to make this usage more powerful.
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running
> > > > > > > > >>>>>>>>> before the RuntimeFilterBuilder finished, it will not process any data
> > > > > > > > >>>>>>>>> and will occupy resources": whether it benefits us depends on the scale
> > > > > > > > >>>>>>>>> of data. If the RuntimeFilterBuilder finishes sooner than the
> > > > > > > > >>>>>>>>> RuntimeFilter operator, the filter can still remove additional data
> > > > > > > > >>>>>>>>> afterwards. Hence, in my opinion, we do not need to make the edge
> > > > > > > > >>>>>>>>> between RuntimeFilterBuilder and RuntimeFilter BLOCKING only; at least
> > > > > > > > >>>>>>>>> it could be made configurable.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> [1]
> > > > > > > >>>>>>
> > > > > > > >>>>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://issues.apache.org/jira/browse/FLINK-25318%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw3GqdpuiCqegqRLDv1PjMiL%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw1oNzOlNn0UCDtz1M9jAw1x%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw3Zt14Wvxs_b8ghD0dIgPfH&source=gmail-imap&ust=1687781326000000&usg=AOvVaw0HsmkkqPeZGZOBvFiA8NOA
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>> Hi Yuxia,
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> I made a mistake in the above response.
> > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> The runtime filter can work well with all shuffle modes. However,
> > > > > > > > >>>>>>>>>> hybrid shuffle and blocking shuffle are currently recommended for batch
> > > > > > > > >>>>>>>>>> jobs (pipeline shuffle is not recommended).
> > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> One more thing to mention here is that we will force the edge between
> > > > > > > > >>>>>>>>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of
> > > > > > > > >>>>>>>>>> which BatchShuffleMode is set), because the RuntimeFilter really doesn't
> > > > > > > > >>>>>>>>>> need to run before the RuntimeFilterBuilder finished. Even if the
> > > > > > > > >>>>>>>>>> RuntimeFilter becomes running before the RuntimeFilterBuilder finished,
> > > > > > > > >>>>>>>>>> it will not process any data and will occupy resources.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Best,
> > > > > > > >>>>>>>>>> Lijie
> > > > > > > >>>>>>>>>>
> > > > > > > > >>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> Hi Yuxia,
> > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> Thanks for your feedback. The answers to your questions are as follows:
> > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> 1. Yes, the row count comes from the statistics of the underlying table
> > > > > > > > >>>>>>>>>>> (or is estimated based on the statistics of the underlying table, if the
> > > > > > > > >>>>>>>>>>> build side or probe side is not a TableScan). If the statistics are
> > > > > > > > >>>>>>>>>>> unavailable, we will not inject a runtime filter (as you said, we can
> > > > > > > > >>>>>>>>>>> hardly evaluate the benefits). Besides, AFAIK, the estimated data size
> > > > > > > > >>>>>>>>>>> of the build side is also based on the row count statistics; that is, if
> > > > > > > > >>>>>>>>>>> the statistics are unavailable, the requirement
> > > > > > > > >>>>>>>>>>> "table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated
> > > > > > > > >>>>>>>>>>> either. I'll add this point to the FLIP.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> 2.
> > > > > > > >>>>>>>>>>> Estimated data size does not meet requirement (in
> > > planner
> > > > > > > >>>>>>>> optimization
> > > > > > > >>>>>>>>>>> phase) -> No filter
> > > > > > > >>>>>>>>>>> Estimated data size meets the requirement (in
> planner
> > > > > > > >> optimization
> > > > > > > >>>>>>>>>> phase),
> > > > > > > >>>>>>>>>>> but the real data size does not meet the
> > requirement(in
> > > > > > > execution
> > > > > > > >>>>>>>>> phase)
> > > > > > > >>>>>>>>>> ->
> > > > > > > >>>>>>>>>>> Fake filter
> > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Best,
> > > > > > > >>>>>>>>>>> Lijie
> > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> > > > > > > >>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Excited to see that runtime
> > > > > > > > >>>>>>>>>>>> filter is going to be implemented in Flink.
> > > > > > > > >>>>>>>>>>>> I have a few questions about it:
> > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> > > > > > > > >>>>>>>>>>>> instead`. So, does the row count come from the statistics of the
> > > > > > > > >>>>>>>>>>>> underlying table? What if the statistics are also unavailable,
> > > > > > > > >>>>>>>>>>>> considering users may not always remember to generate statistics in
> > > > > > > > >>>>>>>>>>>> production?
> > > > > > > > >>>>>>>>>>>> I'm wondering whether it makes sense to just disable the runtime filter
> > > > > > > > >>>>>>>>>>>> if statistics are unavailable, since in that case we can hardly evaluate
> > > > > > > > >>>>>>>>>>>> the benefits of the runtime filter.
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the
> > > > > > > > >>>>>>>>>>>> following requirements are met:xxx", but it also said, "Once this limit
> > > > > > > > >>>>>>>>>>>> is exceeded, it will output a fake filter(which always returns true)" in
> > > > > > > > >>>>>>>>>>>> the `RuntimeFilterBuilderOperator` part. These seem contradictory, so
> > > > > > > > >>>>>>>>>>>> I'm wondering what the real behavior is: will no filter be injected, or
> > > > > > > > >>>>>>>>>>>> a fake filter?
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> 3: Does it also mean runtime-filter can only take effect in blocking
> > > > > > > > >>>>>>>>>>>> shuffle?
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Best regards,
> > > > > > > >>>>>>>>>>>> Yuxia
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> ----- 原始邮件 -----
> > > > > > > >>>>>>>>>>>> 发件人: "ron9 liu" <ron9....@gmail.com <mailto:
> > > > > > > ron9....@gmail.com> <mailto:ron9....@gmail.com>
> > > > > > > >> <mailto:ron9....@gmail.com>
> > > > > > > >>>> <mailto:ron9....@gmail.com>>
> > > > > > > >>>>>>>>>>>> 收件人: "dev" <dev@flink.apache.org <mailto:
> > > > > > dev@flink.apache.org>
> > > > > > > <mailto:dev@flink.apache.org>
> > > > > > > >> <mailto:dev@flink.apache.org>
> > > > > > > >>>> <mailto:dev@flink.apache.org>>
> > > > > > > >>>>>>>>>>>> 发送时间: 星期三, 2023年 6 月 14日 下午 5:29:28
> > > > > > > >>>>>>>>>>>> 主题: Re: [DISCUSS] FLIP-324: Introduce Runtime
> Filter
> > > for
> > > > > > Flink
> > > > > > > >>>> Batch
> > > > > > > >>>>>>>>>> Jobs
> > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Thanks Lijie for starting this discussion. Runtime Filter is a common
> > > > > > > > >>>>>>>>>>>> optimization to improve join performance that has been adopted by many
> > > > > > > > >>>>>>>>>>>> computing engines such as Spark, Doris, etc. Flink is a unified
> > > > > > > > >>>>>>>>>>>> streaming and batch computing engine, and we are continuously optimizing
> > > > > > > > >>>>>>>>>>>> its batch performance. Runtime filter is a general performance
> > > > > > > > >>>>>>>>>>>> optimization technique that can improve the performance of Flink batch
> > > > > > > > >>>>>>>>>>>> jobs, so we are introducing it for batch as well.
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Looking forward to all feedback.
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Best,
> > > > > > > >>>>>>>>>>>> Ron
> > > > > > > >>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Hi devs
> > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> > > > > > > > >>>>>>>>>>>>> Introduce Runtime Filter for Flink Batch Jobs[1]
> > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> Runtime Filter is a common optimization to improve join performance. It
> > > > > > > > >>>>>>>>>>>>> is designed to dynamically generate filter conditions for certain Join
> > > > > > > > >>>>>>>>>>>>> queries at runtime to reduce the amount of scanned or shuffled data,
> > > > > > > > >>>>>>>>>>>>> avoid unnecessary I/O and network transmission, and speed up the query.
> > > > > > > > >>>>>>>>>>>>> Its working principle is to first build a filter (e.g. a bloom filter)
> > > > > > > > >>>>>>>>>>>>> based on the data on the small table side (build side), then pass this
> > > > > > > > >>>>>>>>>>>>> filter to the large table side (probe side) to filter out the irrelevant
> > > > > > > > >>>>>>>>>>>>> data there. This reduces the data reaching the join and improves
> > > > > > > > >>>>>>>>>>>>> performance.
> > > > > > > >>>>>>>>>>>>>
> > > > > > > > >>>>>>>>>>>>> You can find more details in the FLIP-324[1]. Looking forward to your
> > > > > > > > >>>>>>>>>>>>> feedback.
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> [1]
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://cwiki.apache.org/confluence/display/FLINK/FLIP-324%252525253A%2525252BIntroduce%2525252BRuntime%2525252BFilter%2525252Bfor%2525252BFlink%2525252BBatch%2525252BJobs%252526source%25253Dgmail-imap%252526ust%25253D1687433776000000%252526usg%25253DAOvVaw0ke1ZHcJ--A1QgsbB84MHA%2526source%253Dgmail-imap%2526ust%253D1687760804000000%2526usg%253DAOvVaw21E3CQyayeBTYztmOnwMcz%26source%3Dgmail-imap%26ust%3D1687773407000000%26usg%3DAOvVaw0xVu0zYYNRmh8u8aq7uSi3&source=gmail-imap&ust=1687781326000000&usg=AOvVaw1LXwtWT177350iKD3sKCEt
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Best,
> > > > > > > >>>>>>>>>>>>> Ron & Gen & Lijie
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> --
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Best,
> > > > > > > >>>>>>>>> Benchao Li
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
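For readers skimming the thread, the build-side/probe-side principle described in the original FLIP-324 announcement quoted above can be sketched roughly as follows. This is only an illustrative toy in Python, not Flink code: the `BloomFilter` class, the `runtime_filtered_join` helper, the bit-array size and the hash scheme are all assumptions made for this example.

```python
import hashlib


class BloomFilter:
    """A tiny Bloom filter; sizes are illustrative, not tuned."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key):
        # Derive num_hashes bit positions from seeded SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(key))


def runtime_filtered_join(build_rows, probe_rows, key):
    """Hypothetical helper: join two lists of dicts on `key`."""
    # 1. Build the filter from the small (build) side.
    bloom = BloomFilter()
    for row in build_rows:
        bloom.add(row[key])
    # 2. Drop probe rows that cannot possibly match before the join.
    survivors = [row for row in probe_rows if bloom.might_contain(row[key])]
    # 3. Run an ordinary hash join on what is left.
    index = {}
    for row in build_rows:
        index.setdefault(row[key], []).append(row)
    joined = [(b, p) for p in survivors for b in index.get(p[key], [])]
    return joined, len(survivors)
```

Because a Bloom filter can return false positives but never false negatives, the join result is unchanged; only the number of probe rows that reach the join shrinks.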

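The "no filter" versus "fake filter" distinction from Lijie's answers to Yuxia quoted above can also be sketched. Again this is a hedged Python toy and not the FLIP's implementation: `plan_runtime_filter` and `build_runtime_filter` are hypothetical names, a plain set stands in for the bloom filter, and the per-key size accounting is an assumption.

```python
def plan_runtime_filter(estimated_build_bytes, max_build_bytes):
    """Planner phase: decide whether to inject a runtime filter at all.

    estimated_build_bytes is None when statistics are unavailable,
    in which case no filter is injected.
    """
    if estimated_build_bytes is None:
        return False
    return estimated_build_bytes <= max_build_bytes


def build_runtime_filter(build_keys, key_size_bytes, max_build_bytes):
    """Execution phase: return a predicate over probe-side keys.

    If the real build-side size exceeds the limit, fall back to a
    fake filter that always returns True (nothing is filtered out).
    """
    seen = set()  # stand-in for a bloom filter, for brevity
    total_bytes = 0
    for key in build_keys:
        total_bytes += key_size_bytes
        if total_bytes > max_build_bytes:
            return lambda probe_key: True  # fake filter
        seen.add(key)
    return lambda probe_key: probe_key in seen
```

In this sketch the planner check uses only the estimate, while the builder re-checks against the data it actually sees, which is why both outcomes can occur for the same configured limit.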