Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-22 Thread Lijie Wang
ose of runtime filter is to reduce
> > the
> > > > > > shuffle
> > > > > > > >>>> data
> > > > > > > >>>>>>> and the data arriving at join. Although eagerly running
> > the
> > > > > large
> > > > > > > > >>>>>>> table side can process data in advance, most of the
> data
> > > may
> > > > > be
> > > > > > > >>>>>>> irrelevant, causing huge shuffle overhead and slowing
> the
> > > > join.
> > > > > > In
> > > > > > > >>>>>>> addition, if the join is a hash-join, the probe side of
> > the
> > > > > > > hash-join
> > > > > > > >>>>>> also
> > > > > > > >>>>>>> needs to wait for its build side to complete, so the
> > large
> > > > > table
> > > > > > > side
> > > > > > > >>>> is
> > > > > > > > >>>>>>> likely to be back-pressured.
> > > > > > > >>>>>>> In addition, I don't tend to add too many configuration
> > > > options
> > > > > > in
> > > > > > > >> the
> > > > > > > >>>>>>> first version, which may make it more difficult to use
> > > (users
> > > > > > need
> > > > > > > to
> > > > > > > >>>>>>> understand a lot of internal implementation details).
> > Maybe
> > > > it
> > > > > > > could
> > > > > > > >>>> be a
> > > > > > > >>>>>>> future improvement (if it's worthwhile)?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> @Aitozi
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> IMO, In the current implementation two source table
> > > > operators
> > > > > > > will
> > > > > > > >> be
> > > > > > > >>>>>>> executed simultaneously.
> > > > > > > > >>>>>>> The example in FLIP uses blocking shuffle (I will add
> this
> > > > point
> > > > > > to
> > > > > > > >>>> FLIP).
> > > > > > > >>>>>>> The runtime filter is generally chained with the large
> > > table
> > > > > side
> > > > > > > to
> > > > > > > >>>>>> reduce
> > > > > > > >>>>>>> the shuffle data (as shown in Figure 2 of FLIP). The
> job
> > > > > vertices
> > > > > > > >>>> should
> > > > > > > >>>>>> be
> > > > > > > >>>>>>> scheduled in topological order, so the large table side
> > can
> > > > > only
> > > > > > be
> > > > > > > >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
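As a rough illustration of the scheduling argument above, here is a minimal Python sketch of topological-order scheduling over a small job graph. The vertex names are hypothetical labels loosely modeled on the FLIP's figures, and the sketch assumes blocking edges, so that a vertex can only start once everything upstream of it has finished. It is not Flink's scheduler.

```python
from graphlib import TopologicalSorter

# Hypothetical vertex names (assumptions, not Flink identifiers).
# Each key maps to the set of vertices that must finish before it can start.
upstream = {
    "RuntimeFilterBuilder": {"TableSource(dim)"},
    "TableSource(fact)+RuntimeFilter": {"RuntimeFilterBuilder"},
    "HashJoin": {"TableSource(dim)", "TableSource(fact)+RuntimeFilter"},
}


def schedule_order(graph):
    """Return one valid topological order of the job vertices."""
    return list(TopologicalSorter(graph).static_order())


order = schedule_order(upstream)
print(order)
```

Because the large-table vertex is chained with the RuntimeFilter and depends on the RuntimeFilterBuilder, any valid order places it after the builder, which matches the claim that the two sources do not start simultaneously in this setup.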
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>>> Are there some tests to show the default value of
> > > > > > > >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G
> > is a
> > > > > good
> > > > > > > >>>> default
> > > > > > > >>>>>>> value.
> > > > > > > > >>>>>>> It's not tested yet, but it will be done before merging
> the
> > > > code.
> > > > > > The
> > > > > > > >>>>>> current
> > > > > > > > >>>>>>> value refers to systems such as Spark and Hive. Before
> > code
> > > > > > > merging,
> > > > > > > >> we
> > > > > > > >>>>>>> will test on TPC-DS 10 T to find an optimal set of
> > values.
> > > If
> > > > > you
> > > > > > > >> have
> > > > > > >

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-21 Thread Jing Ge
onable to also support
> "pipeline
> > > > > shuffle"
> > > > > > >> if
> > > > > > >>>>>>> possible.
> > > > > > >>>>>>> As I said above, runtime filter can work well with all
> > > shuffle
> > > > > > mode,
> > > > > > >>>>>>> including pipeline shuffle.
> > > > > > >>>>>>>
> > > > > > > >>>>>>>>> if the RuntimeFilterBuilder could be done more quickly than
> > > > > > >> RuntimeFilter
> > > > > > >>>>>>> operator, it can still filter out additional data
> > afterwards.
> > > > > > >>>>>>> I think the main purpose of runtime filter is to reduce
> the
> > > > > shuffle
> > > > > > >>>> data
> > > > > > >>>>>>> and the data arriving at join. Although eagerly running
> the
> > > > large
> > > > > > > >>>>>>> table side can process data in advance, most of the data
> > may
> > > > be
> > > > > > >>>>>>> irrelevant, causing huge shuffle overhead and slowing the
> > > join.
> > > > > In
> > > > > > >>>>>>> addition, if the join is a hash-join, the probe side of
> the
> > > > > > hash-join
> > > > > > >>>>>> also
> > > > > > >>>>>>> needs to wait for its build side to complete, so the
> large
> > > > table
> > > > > > side
> > > > > > >>>> is
> > > > > > > >>>>>>> likely to be back-pressured.
> > > > > > >>>>>>> In addition, I don't tend to add too many configuration
> > > options
> > > > > in
> > > > > > >> the
> > > > > > >>>>>>> first version, which may make it more difficult to use
> > (users
> > > > > need
> > > > > > to
> > > > > > >>>>>>> understand a lot of internal implementation details).
> Maybe
> > > it
> > > > > > could
> > > > > > >>>> be a
> > > > > > >>>>>>> future improvement (if it's worthwhile)?
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> @Aitozi
> > > > > > >>>>>>>
> > > > > > >>>>>>>>> IMO, In the current implementation two source table
> > > operators
> > > > > > will
> > > > > > >> be
> > > > > > >>>>>>> executed simultaneously.
> > > > > > > >>>>>>> The example in FLIP uses blocking shuffle (I will add this
> > > point
> > > > > to
> > > > > > >>>> FLIP).
> > > > > > >>>>>>> The runtime filter is generally chained with the large
> > table
> > > > side
> > > > > > to
> > > > > > >>>>>> reduce
> > > > > > >>>>>>> the shuffle data (as shown in Figure 2 of FLIP). The job
> > > > vertices
> > > > > > >>>> should
> > > > > > >>>>>> be
> > > > > > >>>>>>> scheduled in topological order, so the large table side
> can
> > > > only
> > > > > be
> > > > > > >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
> > > > > > >>>>>>>
> > > > > > >>>>>>>>> Are there some tests to show the default value of
> > > > > > >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G
> is a
> > > > good
> > > > > > >>>> default
> > > > > > >>>>>>> value.
> > > > > > > >>>>>>> It's not tested yet, but it will be done before merging the
> > > code.
> > > > > The
> > > > > > >>>>>> curr

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-20 Thread liu ron
> > >>>>>> be
> > > > > >>>>>>> scheduled in topological order, so the large table side can
> > > only
> > > > be
> > > > > >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
> > > > > >>>>>>>
> > > > > >>>>>>>>> Are there some tests to show the default value of
> > > > > >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> > > good
> > > > > >>>> default
> > > > > >>>>>>> value.
> > > > > > >>>>>>> It's not tested yet, but it will be done before merging the
> > code.
> > > > The
> > > > > >>>>>> current
> > > > > > >>>>>>> value refers to systems such as Spark and Hive. Before code
> > > > > merging,
> > > > > >> we
> > > > > >>>>>>> will test on TPC-DS 10 T to find an optimal set of values.
> If
> > > you
> > > > > >> have
> > > > > >>>>>>> relevant experience on it, welcome to give some
> suggestions.
> > > > > >>>>>>>
> > > > > >>>>>>>>> What's the representation of the runtime filter node in
> > > > planner ?
> > > > > >>>>>>> As shown in Figure 1 of FLIP, we intend to add two new
> > physical
> > > > > >> nodes,
> > > > > >>>>>>> RuntimeFilterBuilder and RuntimeFilter.
> > > > > >>>>>>>
> > > > > >>>>>>> Best,
> > > > > >>>>>>> Lijie
> > > > > >>>>>>>
> > > > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Lijie,
> > > > > >>>>>>>>
> > > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP
> I
> > > have
> > > > > >>>> some
> > > > > >>>>>>>> questions below:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > > >>>>>>>>
> > > > > >>>>>>>> How does it know to schedule the TableSource(dim) first ?
> > IMO,
> > > > In
> > > > > >> the
> > > > > >>>>>>>> current implementation two source table operators will be
> > > > executed
> > > > > >>>>>>>> simultaneously.
> > > > > >>>>>>>>
> > > > > >>>>>>>>> If the data volume on the probe side is too small, the
> > > overhead
> > > > > of
> > > > > >>>>>>>> building runtime filter is not worth it.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Are there some tests to show the default value of
> > > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is
> a
> > > good
> > > > > >>>> default
> > > > > >>>>>>>> value. The same to
> > > > > >> table.optimizer.runtime-filter.max-build-data-size
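To make the role of the two thresholds concrete, here is a hedged Python sketch of the kind of check they imply. The 10G value for min-probe-data-size is the default quoted in this thread. The max-build-data-size value below is only a placeholder, since the thread does not state its default. The function name and the inclusive comparisons are assumptions for illustration, not the planner's actual logic.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2

# 10G is the min-probe-data-size default quoted in the thread.
MIN_PROBE_DATA_SIZE = 10 * GIB
# Placeholder only: the thread does not give a default for max-build-data-size.
MAX_BUILD_DATA_SIZE = 10 * MIB


def should_inject_runtime_filter(build_bytes, probe_bytes,
                                 max_build=MAX_BUILD_DATA_SIZE,
                                 min_probe=MIN_PROBE_DATA_SIZE):
    """Hypothetical check: inject only when the build side is small enough
    and the probe side is large enough for the filter to pay off."""
    return build_bytes <= max_build and probe_bytes >= min_probe


print(should_inject_runtime_filter(5 * MIB, 50 * GIB))  # small build, large probe
print(should_inject_runtime_filter(5 * MIB, 1 * GIB))   # probe side too small
```

Tuning either threshold shifts which joins qualify, which is why the question of how the defaults were validated matters.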
> > > > > >>>>>>>>
> > > > > >>>>>>>>> the runtime filter can be pushed down along the probe
> side,
> > > as
> > > > > >> close
> > > > > >>>> to
> > > > > >>>>>>>> data sources as possible
> > > >

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-20 Thread Jing Ge
 > >>>>>>>>> IMO, In the current implementation two source table
> operators
> > > > will
> > > > >> be
> > > > >>>>>>> executed simultaneously.
> > > > > >>>>>>> The example in FLIP uses blocking shuffle (I will add this
> point
> > > to
> > > > >>>> FLIP).
> > > > >>>>>>> The runtime filter is generally chained with the large table
> > side
> > > > to
> > > > >>>>>> reduce
> > > > >>>>>>> the shuffle data (as shown in Figure 2 of FLIP). The job
> > vertices
> > > > >>>> should
> > > > >>>>>> be
> > > > >>>>>>> scheduled in topological order, so the large table side can
> > only
> > > be
> > > > >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
> > > > >>>>>>>
> > > > >>>>>>>>> Are there some tests to show the default value of
> > > > >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> > good
> > > > >>>> default
> > > > >>>>>>> value.
> > > > > >>>>>>> It's not tested yet, but it will be done before merging the
> code.
> > > The
> > > > >>>>>> current
> > > > > >>>>>>> value refers to systems such as Spark and Hive. Before code
> > > > merging,
> > > > >> we
> > > > >>>>>>> will test on TPC-DS 10 T to find an optimal set of values. If
> > you
> > > > >> have
> > > > >>>>>>> relevant experience on it, welcome to give some suggestions.
> > > > >>>>>>>
> > > > >>>>>>>>> What's the representation of the runtime filter node in
> > > planner ?
> > > > >>>>>>> As shown in Figure 1 of FLIP, we intend to add two new
> physical
> > > > >> nodes,
> > > > >>>>>>> RuntimeFilterBuilder and RuntimeFilter.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Lijie
> > > > >>>>>>>
> > > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Lijie,
> > > > >>>>>>>>
> > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP I
> > have
> > > > >>>> some
> > > > >>>>>>>> questions below:
> > > > >>>>>>>>
> > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > >>>>>>>>
> > > > >>>>>>>> How does it know to schedule the TableSource(dim) first ?
> IMO,
> > > In
> > > > >> the
> > > > >>>>>>>> current implementation two source table operators will be
> > > executed
> > > > >>>>>>>> simultaneously.
> > > > >>>>>>>>
> > > > >>>>>>>>> If the data volume on the probe side is too small, the
> > overhead
> > > > of
> > > > >>>>>>>> building runtime filter is not worth it.
> > > > >>>>>>>>
> > > > >>>>>>>> Are there some tests to show the default value of
> > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> > good
> > > > >>>> default
> > > > >>>>>>>> value. The same to
> > > > >> table.optimizer.runtime-filter.max-build-data-size
> > > > >>>>>>>>

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-20 Thread Lijie Wang
l add this
> point
> > > to
> > > > >>>> FLIP).
> > > > >>>>>>> The runtime filter is generally chained with the large table
> > side
> > > > to
> > > > >>>>>> reduce
> > > > >>>>>>> the shuffle data (as shown in Figure 2 of FLIP). The job
> > vertices
> > > > >>>> should
> > > > >>>>>> be
> > > > >>>>>>> scheduled in topological order, so the large table side can
> > only
> > > be
> > > > >>>>>>> scheduled after the RuntimeFilterBuilder finishes.
> > > > >>>>>>>
> > > > >>>>>>>>> Are there some tests to show the default value of
> > > > >>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> > good
> > > > >>>> default
> > > > >>>>>>> value.
> > > > > >>>>>>> It's not tested yet, but it will be done before merging the
> code.
> > > The
> > > > >>>>>> current
> > > > > >>>>>>> value refers to systems such as Spark and Hive. Before code
> > > > merging,
> > > > >> we
> > > > >>>>>>> will test on TPC-DS 10 T to find an optimal set of values. If
> > you
> > > > >> have
> > > > >>>>>>> relevant experience on it, welcome to give some suggestions.
> > > > >>>>>>>
> > > > >>>>>>>>> What's the representation of the runtime filter node in
> > > planner ?
> > > > >>>>>>> As shown in Figure 1 of FLIP, we intend to add two new
> physical
> > > > >> nodes,
> > > > >>>>>>> RuntimeFilterBuilder and RuntimeFilter.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> Lijie
> > > > >>>>>>>
> > > > > >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Lijie,
> > > > >>>>>>>>
> > > > >>>>>>>> Nice to see this valuable feature. After reading the FLIP I
> > have
> > > > >>>> some
> > > > >>>>>>>> questions below:
> > > > >>>>>>>>
> > > > >>>>>>>>> Schedule the TableSource(dim) first.
> > > > >>>>>>>>
> > > > >>>>>>>> How does it know to schedule the TableSource(dim) first ?
> IMO,
> > > In
> > > > >> the
> > > > >>>>>>>> current implementation two source table operators will be
> > > executed
> > > > >>>>>>>> simultaneously.
> > > > >>>>>>>>
> > > > >>>>>>>>> If the data volume on the probe side is too small, the
> > overhead
> > > > of
> > > > >>>>>>>> building runtime filter is not worth it.
> > > > >>>>>>>>
> > > > >>>>>>>> Are there some tests to show the default value of
> > > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> > good
> > > > >>>> default
> > > > >>>>>>>> value. The same to
> > > > >> table.optimizer.runtime-filter.max-build-data-size
> > > > >>>>>>>>
> > > > >>>>>>>>> the runtime filter can be pushed down along the probe side,
> > as
> > > > >> close
> > > > >>>> to
> > > > >>>>>>>> data sources as possible
> > > > >>>>>>>>

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread liu ron
hedule the TableSource(dim) first ? IMO,
> > In
> > > >> the
> > > >>>>>>>> current implementation two source table operators will be
> > executed
> > > >>>>>>>> simultaneously.
> > > >>>>>>>>
> > > >>>>>>>>> If the data volume on the probe side is too small, the
> overhead
> > > of
> > > >>>>>>>> building runtime filter is not worth it.
> > > >>>>>>>>
> > > >>>>>>>> Are there some tests to show the default value of
> > > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a
> good
> > > >>>> default
> > > >>>>>>>> value. The same to
> > > >> table.optimizer.runtime-filter.max-build-data-size
> > > >>>>>>>>
> > > >>>>>>>>> the runtime filter can be pushed down along the probe side,
> as
> > > >> close
> > > >>>> to
> > > >>>>>>>> data sources as possible
> > > >>>>>>>>
> > > >>>>>>>> What's the representation of the runtime filter node in
> planner
> > ?
> > > Is
> > > >>>> it
> > > >>>>>> a
> > > >>>>>>>> Filternode
> > > >>>>>>>>
> > > >>>>>>>> Best,
> > > >>>>>>>>
> > > >>>>>>>> Aitozi.
> > > >>>>>>>>
> > > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > > >>>>>>>>
> > > >>>>>>>>> Hi Lijie,
> > > >>>>>>>>>
> > > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> > > also
> > > >>>>>>>> support
> > > >>>>>>>>> "pipeline shuffle" if possible.
> > > >>>>>>>>>
> > > > >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing,
> > > although
> > > >>>> this
> > > >>>>>>>> has
> > > >>>>>>>>> not been much exposed to users for now, I know a few
> companies
> > > that
> > > >>>>>> use
> > > > >>>>>>>>> Flink as an MPP computing engine, and there is an ongoing
> > > effort[1]
> > > >> to
> > > >>>>>>>> make
> > > >>>>>>>>> this usage more powerful.
> > > >>>>>>>>>
> > > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> > > >> running
> > > >>>>>>>> before
> > > >>>>>>>>> the RuntimeFilterBuilder finished, it will not process any
> data
> > > and
> > > >>>>>> will
> > > >>>>>>>>> occupy resources", whether it benefits us depends on the
> scale
> > of
> > > >>>> data,
> > > >>>>>>>> if
> > > > >>>>>>>>> the RuntimeFilterBuilder could be done more quickly than
> > RuntimeFilter
> > > >>>>>>>> operator,
> > > >>>>>>>>> it can still filter out additional data afterwards. Hence in
> my
> > > >>>>>> opinion,
> > > >>>>>>>> we
> > > >>>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
> > > >>>>>>>> RuntimeFilter
> > > >>>>>>>>> BLOCKING only, at least it can be configured.
> > > >>>>>>>>>
> > > >>>>>>>>> [1]
> > > >>>>>>
> > > >>>>
> > > >>
> > >
> >

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Jing Ge
able operators will be
> executed
> > >>>>>>>> simultaneously.
> > >>>>>>>>
> > >>>>>>>>> If the data volume on the probe side is too small, the overhead
> > of
> > >>>>>>>> building runtime filter is not worth it.
> > >>>>>>>>
> > >>>>>>>> Are there some tests to show the default value of
> > >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> > >>>> default
> > >>>>>>>> value. The same to
> > >> table.optimizer.runtime-filter.max-build-data-size
> > >>>>>>>>
> > >>>>>>>>> the runtime filter can be pushed down along the probe side, as
> > >> close
> > >>>> to
> > >>>>>>>> data sources as possible
> > >>>>>>>>
> > >>>>>>>> What's the representation of the runtime filter node in planner
> ?
> > Is
> > >>>> it
> > >>>>>> a
> > >>>>>>>> Filternode
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>>
> > >>>>>>>> Aitozi.
> > >>>>>>>>
> > > >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> > >>>>>>>>
> > >>>>>>>>> Hi Lijie,
> > >>>>>>>>>
> > >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> > also
> > >>>>>>>> support
> > >>>>>>>>> "pipeline shuffle" if possible.
> > >>>>>>>>>
> > > >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing,
> > although
> > >>>> this
> > >>>>>>>> has
> > >>>>>>>>> not been much exposed to users for now, I know a few companies
> > that
> > > >>>>>> use
> > > >>>>>>>>> Flink as an MPP computing engine, and there is an ongoing
> > effort[1]
> > >> to
> > >>>>>>>> make
> > >>>>>>>>> this usage more powerful.
> > >>>>>>>>>
> > >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> > >> running
> > >>>>>>>> before
> > >>>>>>>>> the RuntimeFilterBuilder finished, it will not process any data
> > and
> > >>>>>> will
> > >>>>>>>>> occupy resources", whether it benefits us depends on the scale
> of
> > >>>> data,
> > >>>>>>>> if
> > > >>>>>>>>> the RuntimeFilterBuilder could be done more quickly than
> RuntimeFilter
> > >>>>>>>> operator,
> > >>>>>>>>> it can still filter out additional data afterwards. Hence in my
> > >>>>>> opinion,
> > >>>>>>>> we
> > >>>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
> > >>>>>>>> RuntimeFilter
> > >>>>>>>>> BLOCKING only, at least it can be configured.
> > >>>>>>>>>
> > >>>>>>>>> [1]
> > >>>>>>
> > >>>>
> > >>
> >
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.google.com/url?q%25253Dhttps://issues.apache.org/jira/browse/FLINK-25318%252526source%25253Dgmail-imap%252526ust%25253D168743377600%252526usg%25253DAOvVaw3GqdpuiCqegqRLDv1PjMiL%2526source%253Dgmail-imap%2526ust%253D168776080400%2526usg%253DAOvVaw1oNzOlNn0UCDtz1M9jAw1x%26source%3Dgmail-imap%26ust%3D168777340700%26usg%3DAOvVaw3Zt14Wvxs_b8ghD0dIgPfH=gmail-imap=168778132600=AOvVaw0HsmkkqPeZGZOBvFiA8NOA
> > >>>>>>>>>
> > >>>>>>>>> Lijie Wang  > wangdachui9...@gmail.com>  > >> wangdachui9...@gma

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
> >>>>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> >>>>>>>
> >>>>>>>> Hi Lijie,
> >>>>>>>>
> >>>>>>>> Nice to see this valuable feature. After reading the FLIP I have
> >>>> some
> >>>>>>>> questions below:
> >>>>>>>>
> >>>>>>>>> Schedule the TableSource(dim) first.
> >>>>>>>>
> >>>>>>>> How does it know to schedule the TableSource(dim) first ? IMO, In
> >> the
> >>>>>>>> current implementation two source table operators will be executed
> >>>>>>>> simultaneously.
> >>>>>>>>
> >>>>>>>>> If the data volume on the probe side is too small, the overhead
> of
> >>>>>>>> building runtime filter is not worth it.
> >>>>>>>>
> >>>>>>>> Are there some tests to show the default value of
> >>>>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >>>> default
> >>>>>>>> value. The same to
> >> table.optimizer.runtime-filter.max-build-data-size
> >>>>>>>>
> >>>>>>>>> the runtime filter can be pushed down along the probe side, as
> >> close
> >>>> to
> >>>>>>>> data sources as possible
> >>>>>>>>
> >>>>>>>> What's the representation of the runtime filter node in planner ?
> Is
> >>>> it
> >>>>>> a
> >>>>>>>> Filternode
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>>
> >>>>>>>> Aitozi.
> >>>>>>>>
> >>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> >>>>>>>>
> >>>>>>>>> Hi Lijie,
> >>>>>>>>>
> >>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to
> also
> >>>>>>>> support
> >>>>>>>>> "pipeline shuffle" if possible.
> >>>>>>>>>
> >>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing,
> although
> >>>> this
> >>>>>>>> has
> >>>>>>>>> not been much exposed to users for now, I know a few companies
> that
> >>>>>> use
> >>>>>>>>> Flink as an MPP computing engine, and there is an ongoing
> effort[1]
> >> to
> >>>>>>>> make
> >>>>>>>>> this usage more powerful.
> >>>>>>>>>
> >>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> >> running
> >>>>>>>> before
> >>>>>>>>> the RuntimeFilterBuilder finished, it will not process any data
> and
> >>>>>> will
> >>>>>>>>> occupy resources", whether it benefits us depends on the scale of
> >>>> data,
> >>>>>>>> if
> >>>>>>>>> the RuntimeFilterBuilder could be done more quickly than RuntimeFilter
> >>>>>>>> operator,
> >>>>>>>>> it can still filter out additional data afterwards. Hence in my
> >>>>>> opinion,
> >>>>>>>> we
> >>>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
> >>>>>>>> RuntimeFilter
> >>>>>>>>> BLOCKING only, at least it can be configured.
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>
> >>>>
> >>
> https://www.google.com/url?q=https://www.google.com/url?q%3Dhttps://www.google.com/url?q%253Dhttps://www.

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter
>>>> it
>>>>>> a
>>>>>>>> Filternode
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> 
>>>>>>>> Aitozi.
>>>>>>>> 
>>>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
>>>>>>>> 
>>>>>>>>> Hi Lijie,
>>>>>>>>> 
>>>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also
>>>>>>>> support
>>>>>>>>> "pipeline shuffle" if possible.
>>>>>>>>> 
>>>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing, although
>>>> this
>>>>>>>> has
>>>>>>>>> not been much exposed to users for now, I know a few companies that
>>>>>> use
>>>>>>>>> Flink as an MPP computing engine, and there is an ongoing effort[1]
>> to
>>>>>>>> make
>>>>>>>>> this usage more powerful.
>>>>>>>>> 
>>>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
>> running
>>>>>>>> before
>>>>>>>>> the RuntimeFilterBuilder finished, it will not process any data and
>>>>>> will
>>>>>>>>> occupy resources", whether it benefits us depends on the scale of
>>>> data,
>>>>>>>> if
>>>>>>>>> the RuntimeFilterBuilder could be done more quickly than RuntimeFilter
>>>>>>>> operator,
>>>>>>>>> it can still filter out additional data afterwards. Hence in my
>>>>>> opinion,
>>>>>>>> we
>>>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
>>>>>>>> RuntimeFilter
>>>>>>>>> BLOCKING only, at least it can be configured.
>>>>>>>>> 
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
>>>>>>>>> 
>>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>>>>>>>> 
>>>>>>>>>> Hi Yuxia,
>>>>>>>>>> 
>>>>>>>>>> I made a mistake in the above response.
>>>>>>>>>> 
>>>>>>>>>> The runtime filter can work well with all shuffle modes. However,
>>>>>> hybrid
>>>>>>>>>> shuffle and blocking shuffle are currently recommended for batch
>>>> jobs
>>>>>>>>>> (pipeline shuffle is not recommended).
>>>>>>>>>> 
>>>>>>>>>> One more thing to mention here is that we will force the edge
>>>> between
>>>>>>>>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless
>> of
>>>>>>>> which
>>>>>>>>>> BatchShuffleMode is set). Because the RuntimeFilter really doesn’t
>>>>>> need
>>>>>>>>> to
>>>>>>>>>> run before the RuntimeFilterBuilder finished. Even if the
>>>>>> RuntimeFilter
>>>>>>>>>> becomes running before the RuntimeFilterBuilder finished, it will
>>>> not

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
> >>>>> Aitozi <gjying1...@gmail.com> wrote on Thu, Jun 15, 2023 at 15:52:
> >>>>>
> >>>>>> Hi Lijie,
> >>>>>>
> >>>>>>  Nice to see this valuable feature. After reading the FLIP I have
> >> some
> >>>>>> questions below:
> >>>>>>
> >>>>>>> Schedule the TableSource(dim) first.
> >>>>>>
> >>>>>> How does it know to schedule the TableSource(dim) first ? IMO, In
> the
> >>>>>> current implementation two source table operators will be executed
> >>>>>> simultaneously.
> >>>>>>
> >>>>>>> If the data volume on the probe side is too small, the overhead of
> >>>>>> building runtime filter is not worth it.
> >>>>>>
> >>>>>> Are there some tests to show the default value of
> >>>>>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good
> >> default
> >>>>>> value. The same to
> table.optimizer.runtime-filter.max-build-data-size
> >>>>>>
> >>>>>>> the runtime filter can be pushed down along the probe side, as
> close
> >> to
> >>>>>> data sources as possible
> >>>>>>
> >>>>>> What's the representation of the runtime filter node in planner ? Is
> >> it
> >>>> a
> >>>>>> Filternode
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Aitozi.
> >>>>>>
> >>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
> >>>>>>
> >>>>>>> Hi Lijie,
> >>>>>>>
> >>>>>>> Regarding the shuffle mode, I think it would be reasonable to also
> >>>>>> support
> >>>>>>> "pipeline shuffle" if possible.
> >>>>>>>
> >>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing, although
> >> this
> >>>>>> has
> >>>>>>> not been much exposed to users for now, I know a few companies that
> >>>> use
> >>>>>>> Flink as an MPP computing engine, and there is an ongoing effort[1]
> to
> >>>>>> make
> >>>>>>> this usage more powerful.
> >>>>>>>
> >>>>>>> Back to your concern that "Even if the RuntimeFilter becomes
> running
> >>>>>> before
> >>>>>>> the RuntimeFilterBuilder finished, it will not process any data and
> >>>> will
> >>>>>>> occupy resources", whether it benefits us depends on the scale of
> >> data,
> >>>>>> if
> >>>>>>> the RuntimeFilterBuilder could be done more quickly than RuntimeFilter
> >>>>>> operator,
> >>>>>>> it can still filter out additional data afterwards. Hence in my
> >>>> opinion,
> >>>>>> we
> >>>>>>> do not need to make the edge between RuntimeFilterBuilder and
> >>>>>> RuntimeFilter
> >>>>>>> BLOCKING only, at least it can be configured.
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
> >>>>>>>
> >>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
> >>>>>>>
> >>>>>>>> Hi Yuxia,
> >>>>>>>>
> >>>>>>>> I made a mistake in the above response.
> >>>>>>>>
> >>

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter
>>>>>> 
>>>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
>>>>>> 
>>>>>>> Hi Lijie,
>>>>>>> 
>>>>>>> Regarding the shuffle mode, I think it would be reasonable to also
>>>>>> support
>>>>>>> "pipeline shuffle" if possible.
>>>>>>> 
>>>>>>> "pipeline shuffle" is essential for OLAP/MPP computing, although
>> this
>>>>>> has
>>>>>>> not been much exposed to users for now, I know a few companies that
>>>> use
>>>>>>> Flink as an MPP computing engine, and there is an ongoing effort[1] to
>>>>>> make
>>>>>>> this usage more powerful.
>>>>>>> 
>>>>>>> Back to your concern that "Even if the RuntimeFilter becomes running
>>>>>> before
>>>>>>> the RuntimeFilterBuilder finished, it will not process any data and
>>>> will
>>>>>>> occupy resources", whether it benefits us depends on the scale of
>> data,
>>>>>> if
>>>>>>> the RuntimeFilterBuilder could be done more quickly than RuntimeFilter
>>>>>> operator,
>>>>>>> it can still filter out additional data afterwards. Hence in my
>>>> opinion,
>>>>>> we
>>>>>>> do not need to make the edge between RuntimeFilterBuilder and
>>>>>> RuntimeFilter
>>>>>>> BLOCKING only, at least it can be configured.
>>>>>>> 
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
>>>>>>> 
>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>>>>>> 
>>>>>>>> Hi Yuxia,
>>>>>>>> 
>>>>>>>> I made a mistake in the above response.
>>>>>>>> 
>>>>>>>> The runtime filter works well with all shuffle modes. However,
>>>>>>>> hybrid shuffle and blocking shuffle are currently recommended for
>>>>>>>> batch jobs (pipeline shuffle is not recommended).
>>>>>>>>
>>>>>>>> One more thing to mention here is that we will force the edge
>>>>>>>> between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING
>>>>>>>> (regardless of which BatchShuffleMode is set), because the
>>>>>>>> RuntimeFilter really doesn’t need to run before the
>>>>>>>> RuntimeFilterBuilder has finished. Even if the RuntimeFilter
>>>>>>>> starts running before the RuntimeFilterBuilder has finished, it
>>>>>>>> will not process any data and will only occupy resources.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Lijie
>>>>>>>> 
>>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
>>>>>>>> 
>>>>>>>>> Hi Yuxia,
>>>>>>>>> 
>>>>>>>>> Thanks for your feedback. The answers to your questions are as
>>>>>>>>> follows:
>>>>>>>>>
>>>>>>>>> 1. Yes, the row count comes from the statistics of the underlying
>>>>>>>>> table (or is estimated based on those statistics, if the build
>>>>>>>>> side or probe side is not a TableScan).  If the sta

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
jobs
> >>>>>> (pipeline shuffle is not recommended).
> >>>>>>
> >>>>>> One more thing to mention here is that we will force the edge
> >>>>>> between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING
> >>>>>> (regardless of which BatchShuffleMode is set), because the
> >>>>>> RuntimeFilter really doesn’t need to run before the
> >>>>>> RuntimeFilterBuilder has finished. Even if the RuntimeFilter
> >>>>>> starts running before the RuntimeFilterBuilder has finished, it
> >>>>>> will not process any data and will only occupy resources.
> >>>>>>
> >>>>>> Best,
> >>>>>> Lijie
> >>>>>>
> >>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> >>>>>>
> >>>>>>> Hi Yuxia,
> >>>>>>>
> >>>>>>> Thanks for your feedback. The answers to your questions are as
> >>>>>>> follows:
> >>>>>>>
> >>>>>>> 1. Yes, the row count comes from the statistics of the underlying
> >>>>>>> table (or is estimated based on those statistics, if the build
> >>>>>>> side or probe side is not a TableScan). If the statistics are
> >>>>>>> unavailable, we will not inject a runtime filter (as you said, we
> >>>>>>> can hardly evaluate the benefits). Besides, AFAIK, the estimated
> >>>>>>> data size of the build side is also based on the row count
> >>>>>>> statistics; that is, if the statistics are unavailable, the
> >>>>>>> requirement "table.optimizer.runtime-filter.max-build-data-size"
> >>>>>>> cannot be evaluated either. I'll add this point to the FLIP.
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> Estimated data size does not meet the requirement (in the planner
> >>>>>>> optimization phase) -> No filter
> >>>>>>> Estimated data size meets the requirement (in the planner
> >>>>>>> optimization phase), but the real data size does not meet the
> >>>>>>> requirement (in the execution phase) -> Fake filter
> >>>>>>>
> >>>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Lijie
> >>>>>>>
> >>>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> >>>>>>>
> >>>>>>>> Thanks Lijie for starting this discussion. Excited to see that
> >>>>>>>> runtime filter is going to be implemented in Flink.
> >>>>>>>> I have a few questions about it:
> >>>>>>>>
> >>>>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row
> >>>>>>>> count instead`. So, does the row count come from the statistics
> >>>>>>>> of the underlying table? What if the statistics are also
> >>>>>>>> unavailable, considering that users may not always remember to
> >>>>>>>> generate statistics in production?
> >>>>>>>> I'm wondering whether it makes sense to just disable the runtime
> >>>>>>>> filter if the statistics are unavailable, since in that case we
> >>>>>>>> can hardly evaluate the benefits of the runtime filter.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 2: The FLIP said: "We will inject the runtime filters only if the
> >>>>>>>> following requirements are met:xxx", but it also said, "Once this
> >>>>>>>> limit is exceeded, it will output a fake filter(which always
> >>>>>>>> returns true)" in the `RuntimeFilterBuilderOperator` part. These
> >>>>>>>> seem contradictory, so I'm wondering what the real behavior is:
> >>>>>>>> will no filter be injected, or a fake filter?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 3: Does it also mean the runtime filter can only take effect with
> >>>>>>>> blocking shuffle?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Yuxia
> >>>>>>>>
> >>>>>>>> - Original Message -
> >>>>>>>> From: "ron9 liu" <ron9@gmail.com>
> >>>>>>>> To: "dev" <dev@flink.apache.org>
> >>>>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> >>>>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
> >>>>>>>> Batch Jobs
> >>>>>>>>
> >>>>>>>> Thanks Lijie for starting this discussion. Runtime filter is a
> >>>>>>>> common optimization for improving join performance that has been
> >>>>>>>> adopted by many computing engines such as Spark, Doris, etc.
> >>>>>>>> Flink is a unified streaming and batch computing engine, and we
> >>>>>>>> are continuously optimizing its batch performance. Runtime filter
> >>>>>>>> is a general performance optimization technique that can improve
> >>>>>>>> the performance of Flink batch jobs, so we are introducing it for
> >>>>>>>> batch as well.
> >>>>>>>>
> >>>>>>>> Looking forward to all feedback.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Ron
> >>>>>>>>
> >>>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> >>>>>>>>
> >>>>>>>>> Hi devs,
> >>>>>>>>>
> >>>>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about
> >>>>>>>>> FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1].
> >>>>>>>>>
> >>>>>>>>> Runtime filter is a common optimization to improve join
> >>>>>>>>> performance. It is designed to dynamically generate filter
> >>>>>>>>> conditions for certain join queries at runtime to reduce the
> >>>>>>>>> amount of scanned or shuffled data, avoid unnecessary I/O and
> >>>>>>>>> network transmission, and speed up the query. Its working
> >>>>>>>>> principle is to first build a filter (e.g. a bloom filter) based
> >>>>>>>>> on the data on the small table side (build side), then pass this
> >>>>>>>>> filter to the large table side (probe side) to filter out the
> >>>>>>>>> irrelevant data there. This reduces the data reaching the join
> >>>>>>>>> and improves performance.
> >>>>>>>>>
> >>>>>>>>> You can find more details in FLIP-324[1]. Looking forward to your
> >>>>>>>>> feedback.
> >>>>>>>>>
> >>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Ron & Gen & Lijie
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Best,
> >>>>> Benchao Li
>
>
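
The working principle described in the quoted announcement above (build a
bloom filter on the small side, then use it to drop probe-side rows before
they reach the join) can be sketched roughly as follows. This is only an
illustration in plain Python, not Flink code: `BloomFilter`,
`build_runtime_filter` and `apply_runtime_filter` are hypothetical names, and
the bit-array size and hashing scheme are assumptions chosen for brevity.

```python
import hashlib


class BloomFilter:
    """Tiny bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for simplicity

    def _positions(self, key):
        # Derive num_hashes bit positions from salted SHA-256 digests.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))


def build_runtime_filter(build_rows, key_fn):
    """Build side (small table): collect the join keys into a filter."""
    bloom = BloomFilter()
    for row in build_rows:
        bloom.add(key_fn(row))
    return bloom


def apply_runtime_filter(probe_rows, key_fn, bloom):
    """Probe side (large table): keep only rows whose key might match."""
    return [row for row in probe_rows if bloom.might_contain(key_fn(row))]
```

Because a bloom filter can only produce false positives, rows that survive
the filter still have to go through the join; the filter merely reduces how
many of them are shuffled.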


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Stefan Richter
-filter.max-build-data-size
>>>> 
>>>>> the runtime filter can be pushed down along the probe side, as close to
>>>> data sources as possible
>>>> 
>>>> What's the representation of the runtime filter node in the planner?
>>>> Is it a Filternode?
>>>> 
>>>> Best,
>>>> 
>>>> Aitozi.
>>>> 
>>>> Benchao Li <libenc...@apache.org> wrote on Thu, Jun 15, 2023 at 14:30:
>>>> 
>>>>> Hi Lijie,
>>>>> 
>>>>> Regarding the shuffle mode, I think it would be reasonable to also
>>>>> support "pipeline shuffle" if possible.
>>>>>
>>>>> "Pipeline shuffle" is essential for OLAP/MPP computing. Although
>>>>> it has not been much exposed to users so far, I know a few
>>>>> companies that use Flink as an MPP computing engine, and there is
>>>>> an ongoing effort[1] to make this usage more powerful.
>>>>>
>>>>> Back to your concern that "Even if the RuntimeFilter becomes
>>>>> running before the RuntimeFilterBuilder finished, it will not
>>>>> process any data and will occupy resources": whether it benefits
>>>>> us depends on the scale of the data. If the RuntimeFilterBuilder
>>>>> finishes before the RuntimeFilter operator does, the RuntimeFilter
>>>>> can still filter out additional data afterwards. Hence, in my
>>>>> opinion, we do not need to force the edge between
>>>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING; at the
>>>>> least, it should be configurable.
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
>>>>> 
>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>>>> 
>>>>>> Hi Yuxia,
>>>>>> 
>>>>>> I made a mistake in the above response.
>>>>>> 
>>>>>> The runtime filter works well with all shuffle modes. However,
>>>>>> hybrid shuffle and blocking shuffle are currently recommended for
>>>>>> batch jobs (pipeline shuffle is not recommended).
>>>>>>
>>>>>> One more thing to mention here is that we will force the edge
>>>>>> between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING
>>>>>> (regardless of which BatchShuffleMode is set), because the
>>>>>> RuntimeFilter really doesn’t need to run before the
>>>>>> RuntimeFilterBuilder has finished. Even if the RuntimeFilter
>>>>>> starts running before the RuntimeFilterBuilder has finished, it
>>>>>> will not process any data and will only occupy resources.
>>>>>> 
>>>>>> Best,
>>>>>> Lijie
>>>>>> 
>>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
>>>>>> 
>>>>>>> Hi Yuxia,
>>>>>>> 
>>>>>>> Thanks for your feedback. The answers to your questions are as
>>>>>>> follows:
>>>>>>>
>>>>>>> 1. Yes, the row count comes from the statistics of the underlying
>>>>>>> table (or is estimated based on those statistics, if the build
>>>>>>> side or probe side is not a TableScan). If the statistics are
>>>>>>> unavailable, we will not inject a runtime filter (as you said, we
>>>>>>> can hardly evaluate the benefits). Besides, AFAIK, the estimated
>>>>>>> data size of the build side is also based on the row count
>>>>>>> statistics; that is, if the statistics are unavailable, the
>>>>>>> requirement "table.optimizer.runtime-filter.max-build-data-size"
>>>>>>> cannot be eval

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-19 Thread Lijie Wang
> >>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of
> >>>> which BatchShuffleMode is set), because the RuntimeFilter really
> >>>> doesn’t need to run before the RuntimeFilterBuilder has finished.
> >>>> Even if the RuntimeFilter starts running before the
> >>>> RuntimeFilterBuilder has finished, it will not process any data and
> >>>> will only occupy resources.
> >>>>
> >>>> Best,
> >>>> Lijie
> >>>>
> >>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
> >>>>
> >>>>> Hi Yuxia,
> >>>>>
> >>>>> Thanks for your feedback. The answers to your questions are as
> >>>>> follows:
> >>>>>
> >>>>> 1. Yes, the row count comes from the statistics of the underlying
> >>>>> table (or is estimated based on those statistics, if the build
> >>>>> side or probe side is not a TableScan). If the statistics are
> >>>>> unavailable, we will not inject a runtime filter (as you said, we
> >>>>> can hardly evaluate the benefits). Besides, AFAIK, the estimated
> >>>>> data size of the build side is also based on the row count
> >>>>> statistics; that is, if the statistics are unavailable, the
> >>>>> requirement "table.optimizer.runtime-filter.max-build-data-size"
> >>>>> cannot be evaluated either. I'll add this point to the FLIP.
> >>>>>
> >>>>> 2.
> >>>>> Estimated data size does not meet the requirement (in the planner
> >>>>> optimization phase) -> No filter
> >>>>> Estimated data size meets the requirement (in the planner
> >>>>> optimization phase), but the real data size does not meet the
> >>>>> requirement (in the execution phase) -> Fake filter
> >>>>>
> >>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> >>>>>
> >>>>> Best,
> >>>>> Lijie
> >>>>>
> >>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
> >>>>>
> >>>>>> Thanks Lijie for starting this discussion. Excited to see that
> >>>>>> runtime filter is going to be implemented in Flink.
> >>>>>> I have a few questions about it:
> >>>>>>
> >>>>>> 1: As the FLIP said, `if the ndv cannot be estimated, use row
> >>>>>> count instead`. So, does the row count come from the statistics
> >>>>>> of the underlying table? What if the statistics are also
> >>>>>> unavailable, considering that users may not always remember to
> >>>>>> generate statistics in production?
> >>>>>> I'm wondering whether it makes sense to just disable the runtime
> >>>>>> filter if the statistics are unavailable, since in that case we
> >>>>>> can hardly evaluate the benefits of the runtime filter.
> >>>>>>
> >>>>>>
> >>>>>> 2: The FLIP said: "We will inject the runtime filters only if the
> >>>>>> following requirements are met:xxx", but it also said, "Once this
> >>>>>> limit is exceeded, it will output a fake filter(which always
> >>>>>> returns true)" in the `RuntimeFilterBuilderOperator` part. These
> >>>>>> seem contradictory, so I'm wondering what the real behavior is:
> >>>>>> will no filter be injected, or a fake filter?
> >>>>>>
> >>>>>>
> >>>>>> 3: Does it also mean the runtime filter can only take effect with
> >>>>>> blocking shuffle?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Best regards,
> >>>>>> Yuxia
> >>>>>>
> >>>>>> - Original Message -
> >>>>>> From: "ron9 liu" <ron9@gmail.com>
> >>>>>> To: "dev" <dev@flink.apache.org>
> >>>>>> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> >>>>>> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
> >>>>>> Batch Jobs
> >>>>>>
> >>>>>> Thanks Lijie for starting this discussion. Runtime filter is a
> >>>>>> common optimization for improving join performance that has been
> >>>>>> adopted by many computing engines such as Spark, Doris, etc.
> >>>>>> Flink is a unified streaming and batch computing engine, and we
> >>>>>> are continuously optimizing its batch performance. Runtime filter
> >>>>>> is a general performance optimization technique that can improve
> >>>>>> the performance of Flink batch jobs, so we are introducing it for
> >>>>>> batch as well.
> >>>>>>
> >>>>>> Looking forward to all feedback.
> >>>>>>
> >>>>>> Best,
> >>>>>> Ron
> >>>>>>
> >>>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Wed, Jun 14, 2023 at 17:17:
> >>>>>>
> >>>>>>> Hi devs,
> >>>>>>>
> >>>>>>> Ron Liu, Gen Luo and I would like to start a discussion about
> >>>>>>> FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1].
> >>>>>>>
> >>>>>>> Runtime filter is a common optimization to improve join
> >>>>>>> performance. It is designed to dynamically generate filter
> >>>>>>> conditions for certain join queries at runtime to reduce the
> >>>>>>> amount of scanned or shuffled data, avoid unnecessary I/O and
> >>>>>>> network transmission, and speed up the query. Its working
> >>>>>>> principle is to first build a filter (e.g. a bloom filter) based
> >>>>>>> on the data on the small table side (build side), then pass this
> >>>>>>> filter to the large table side (probe side) to filter out the
> >>>>>>> irrelevant data there. This reduces the data reaching the join
> >>>>>>> and improves performance.
> >>>>>>>
> >>>>>>> You can find more details in FLIP-324[1]. Looking forward to your
> >>>>>>> feedback.
> >>>>>>>
> >>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Ron & Gen & Lijie
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>>
> >>> Best,
> >>> Benchao Li
>
>
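
The two-stage behaviour from answer 2 in the quoted reply above ("No filter"
at planning time versus "Fake filter" at execution time) can be written down
as a small decision function. This is a hedged sketch in plain Python, not
the planner's actual logic: all names are hypothetical, "meets the
requirement" is assumed to mean "build-side size does not exceed
table.optimizer.runtime-filter.max-build-data-size", and the other injection
requirements (such as the minimum probe-side size) are left out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BuildSideEstimate:
    # None models "table statistics are unavailable" (assumption of this sketch).
    estimated_bytes: Optional[int]


def planner_injects_filter(estimate, max_build_data_size):
    """Planner optimization phase: decide whether a filter is injected at all."""
    if estimate.estimated_bytes is None:
        return False  # no statistics, so the benefit cannot be evaluated
    return estimate.estimated_bytes <= max_build_data_size


def runtime_filter_kind(actual_bytes, max_build_data_size):
    """Execution phase: an oversized build side yields a pass-through filter."""
    return "real" if actual_bytes <= max_build_data_size else "fake"


def decide(estimate, actual_bytes, max_build_data_size):
    """Combine both phases into one of: no filter, fake filter, real filter."""
    if not planner_injects_filter(estimate, max_build_data_size):
        return "no filter"
    return runtime_filter_kind(actual_bytes, max_build_data_size) + " filter"
```

For example, under these assumptions `decide(BuildSideEstimate(5), 20, 10)`
gives "fake filter": the estimate passed the planner check, but the real
build side turned out to be larger than the limit.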


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-16 Thread Stefan Richter
ome tests to show the default value of
>> table.optimizer.runtime-filter.min-probe-data-size 10G is a good default
>> value. The same to table.optimizer.runtime-filter.max-build-data-size
>> 
>>> the runtime filter can be pushed down along the probe side, as close to
>> data sources as possible
>> 
>> What's the representation of the runtime filter node in the planner? Is
>> it a Filternode?
>> 
>> Best,
>> 
>> Aitozi.
>> 
>> Benchao Li wrote on Thu, Jun 15, 2023 at 14:30:
>> 
>>> Hi Lijie,
>>> 
>>> Regarding the shuffle mode, I think it would be reasonable to also
>>> support "pipeline shuffle" if possible.
>>>
>>> "Pipeline shuffle" is essential for OLAP/MPP computing. Although
>>> it has not been much exposed to users so far, I know a few
>>> companies that use Flink as an MPP computing engine, and there is
>>> an ongoing effort[1] to make this usage more powerful.
>>>
>>> Back to your concern that "Even if the RuntimeFilter becomes
>>> running before the RuntimeFilterBuilder finished, it will not
>>> process any data and will occupy resources": whether it benefits
>>> us depends on the scale of the data. If the RuntimeFilterBuilder
>>> finishes before the RuntimeFilter operator does, the RuntimeFilter
>>> can still filter out additional data afterwards. Hence, in my
>>> opinion, we do not need to force the edge between
>>> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING; at the
>>> least, it should be configurable.
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-25318
>>> 
>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 14:18:
>>> 
>>>> Hi Yuxia,
>>>> 
>>>> I made a mistake in the above response.
>>>> 
>>>> The runtime filter works well with all shuffle modes. However,
>>>> hybrid shuffle and blocking shuffle are currently recommended for
>>>> batch jobs (pipeline shuffle is not recommended).
>>>>
>>>> One more thing to mention here is that we will force the edge
>>>> between RuntimeFilterBuilder and RuntimeFilter to be BLOCKING
>>>> (regardless of which BatchShuffleMode is set), because the
>>>> RuntimeFilter really doesn’t need to run before the
>>>> RuntimeFilterBuilder has finished. Even if the RuntimeFilter
>>>> starts running before the RuntimeFilterBuilder has finished, it
>>>> will not process any data and will only occupy resources.
>>>> 
>>>> Best,
>>>> Lijie
>>>> 
>>>> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Jun 15, 2023 at 09:48:
>>>> 
>>>>> Hi Yuxia,
>>>>> 
>>>>> Thanks for your feedback. The answers to your questions are as
>>>>> follows:
>>>>>
>>>>> 1. Yes, the row count comes from the statistics of the underlying
>>>>> table (or is estimated based on those statistics, if the build
>>>>> side or probe side is not a TableScan). If the statistics are
>>>>> unavailable, we will not inject a runtime filter (as you said, we
>>>>> can hardly evaluate the benefits). Besides, AFAIK, the estimated
>>>>> data size of the build side is also based on the row count
>>>>> statistics; that is, if the statistics are unavailable, the
>>>>> requirement "table.optimizer.runtime-filter.max-build-data-size"
>>>>> cannot be evaluated either. I'll add this point to the FLIP.
>>>>>
>>>>> 2.
>>>>> Estimated data size does not meet the requirement (in the planner
>>>>> optimization phase) -> No filter
>>>>> Estimated data size meets the requirement (in the planner
>>>>> optimization phase), but the real data size does not meet the
>>>>> requirement (in the execution phase) -> Fake filter
>>>>>
>>>>> 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
>>>>> 
>>>>> Best,
>>>>> Lijie
>>>>> 
>>>>> yuxia <luoyu...@alumni.sjtu.edu.cn> wrote on Wed, Jun 14, 2023 at 20:37:
>>>

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Lijie Wang
> > > BatchShuffleMode is set), because the RuntimeFilter really doesn’t need
> > > to run before the RuntimeFilterBuilder has finished. Even if the
> > > RuntimeFilter starts running before the RuntimeFilterBuilder has
> > > finished, it will not process any data and will only occupy resources.
> > >
> > > Best,
> > > Lijie
> > >
> > > Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
> > >
> > > > Hi Yuxia,
> > > >
> > > > Thanks for your feedback. The answers to your questions are as
> > > > follows:
> > > >
> > > > 1. Yes, the row count comes from the statistics of the underlying
> > > > table (or is estimated based on those statistics, if the build
> > > > side or probe side is not a TableScan). If the statistics are
> > > > unavailable, we will not inject a runtime filter (as you said, we
> > > > can hardly evaluate the benefits). Besides, AFAIK, the estimated
> > > > data size of the build side is also based on the row count
> > > > statistics; that is, if the statistics are unavailable, the
> > > > requirement "table.optimizer.runtime-filter.max-build-data-size"
> > > > cannot be evaluated either. I'll add this point to the FLIP.
> > > >
> > > > 2.
> > > > Estimated data size does not meet the requirement (in the planner
> > > > optimization phase) -> No filter
> > > > Estimated data size meets the requirement (in the planner
> > > > optimization phase), but the real data size does not meet the
> > > > requirement (in the execution phase) -> Fake filter
> > > >
> > > > 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> > > >
> > > > Best,
> > > > Lijie
> > > >
> > > > yuxia wrote on Wed, Jun 14, 2023 at 20:37:
> > > >
> > > >> Thanks Lijie for starting this discussion. Excited to see that
> > > >> runtime filter is going to be implemented in Flink.
> > > >> I have a few questions about it:
> > > >>
> > > >> 1: As the FLIP said, `if the ndv cannot be estimated, use row
> > > >> count instead`. So, does the row count come from the statistics
> > > >> of the underlying table? What if the statistics are also
> > > >> unavailable, considering that users may not always remember to
> > > >> generate statistics in production?
> > > >> I'm wondering whether it makes sense to just disable the runtime
> > > >> filter if the statistics are unavailable, since in that case we
> > > >> can hardly evaluate the benefits of the runtime filter.
> > > >>
> > > >>
> > > >> 2: The FLIP said: "We will inject the runtime filters only if the
> > > >> following requirements are met:xxx", but it also said, "Once this
> > > >> limit is exceeded, it will output a fake filter(which always
> > > >> returns true)" in the `RuntimeFilterBuilderOperator` part. These
> > > >> seem contradictory, so I'm wondering what the real behavior is:
> > > >> will no filter be injected, or a fake filter?
> > > >>
> > > >>
> > > >> 3: Does it also mean the runtime filter can only take effect with
> > > >> blocking shuffle?
> > > >>
> > > >>
> > > >>
> > > >> Best regards,
> > > >> Yuxia
> > > >>
> > > >> - Original Message -
> > > >> From: "ron9 liu"
> > > >> To: "dev"
> > > >> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> > > >> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
> > > >> Batch Jobs
> > > >>
> > > >> Thanks Lijie for starting this discussion. Runtime filter is a
> > > >> common optimization for improving join performance that has been
> > > >> adopted by many computing engines such as Spark, Doris, etc.
> > > >> Flink is a unified streaming and batch computing engine, and we
> > > >> are continuously optimizing its batch performance. Runtime filter
> > > >> is a general performance optimization technique that can improve
> > > >> the performance of Flink batch jobs, so we are introducing it for
> > > >> batch as well.
> > > >>
> > > >> Looking forward to all feedback.
> > > >>
> > > >> Best,
> > > >> Ron
> > > >>
> > > >> Lijie Wang wrote on Wed, Jun 14, 2023 at 17:17:
> > > >>
> > > >> > Hi devs,
> > > >> >
> > > >> > Ron Liu, Gen Luo and I would like to start a discussion about
> > > >> > FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1].
> > > >> >
> > > >> > Runtime filter is a common optimization to improve join
> > > >> > performance. It is designed to dynamically generate filter
> > > >> > conditions for certain join queries at runtime to reduce the
> > > >> > amount of scanned or shuffled data, avoid unnecessary I/O and
> > > >> > network transmission, and speed up the query. Its working
> > > >> > principle is to first build a filter (e.g. a bloom filter) based
> > > >> > on the data on the small table side (build side), then pass this
> > > >> > filter to the large table side (probe side) to filter out the
> > > >> > irrelevant data there. This reduces the data reaching the join
> > > >> > and improves performance.
> > > >> >
> > > >> > You can find more details in FLIP-324[1]. Looking forward to your
> > > >> > feedback.
> > > >> >
> > > >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > > >> >
> > > >> > Best,
> > > >> > Ron & Gen & Lijie
> > > >> >
> > > >>
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Aitozi
> > >> statistics are unavailable, since in that case we can hardly evaluate
> > >> the benefits of the runtime filter.
> > >>
> > >>
> > >> 2: The FLIP said: "We will inject the runtime filters only if the
> > >> following requirements are met:xxx", but it also said, "Once this
> > >> limit is exceeded, it will output a fake filter(which always
> > >> returns true)" in the `RuntimeFilterBuilderOperator` part. These
> > >> seem contradictory, so I'm wondering what the real behavior is:
> > >> will no filter be injected, or a fake filter?
> > >>
> > >>
> > >> 3: Does it also mean the runtime filter can only take effect with
> > >> blocking shuffle?
> > >>
> > >>
> > >>
> > >> Best regards,
> > >> Yuxia
> > >>
> > >> - Original Message -
> > >> From: "ron9 liu"
> > >> To: "dev"
> > >> Sent: Wednesday, June 14, 2023, 5:29:28 PM
> > >> Subject: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink
> > >> Batch Jobs
> > >>
> > >> Thanks Lijie for starting this discussion. Runtime filter is a
> > >> common optimization for improving join performance that has been
> > >> adopted by many computing engines such as Spark, Doris, etc.
> > >> Flink is a unified streaming and batch computing engine, and we
> > >> are continuously optimizing its batch performance. Runtime filter
> > >> is a general performance optimization technique that can improve
> > >> the performance of Flink batch jobs, so we are introducing it for
> > >> batch as well.
> > >>
> > >> Looking forward to all feedback.
> > >>
> > >> Best,
> > >> Ron
> > >>
> > >> Lijie Wang wrote on Wed, Jun 14, 2023 at 17:17:
> > >>
> > >> > Hi devs,
> > >> >
> > >> > Ron Liu, Gen Luo and I would like to start a discussion about
> > >> > FLIP-324: Introduce Runtime Filter for Flink Batch Jobs[1].
> > >> >
> > >> > Runtime filter is a common optimization to improve join
> > >> > performance. It is designed to dynamically generate filter
> > >> > conditions for certain join queries at runtime to reduce the
> > >> > amount of scanned or shuffled data, avoid unnecessary I/O and
> > >> > network transmission, and speed up the query. Its working
> > >> > principle is to first build a filter (e.g. a bloom filter) based
> > >> > on the data on the small table side (build side), then pass this
> > >> > filter to the large table side (probe side) to filter out the
> > >> > irrelevant data there. This reduces the data reaching the join
> > >> > and improves performance.
> > >> >
> > >> > You can find more details in FLIP-324[1]. Looking forward to your
> > >> > feedback.
> > >> >
> > >> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
> > >> >
> > >> > Best,
> > >> > Ron & Gen & Lijie
> > >> >
> > >>
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Benchao Li
Hi Lijie,

Regarding the shuffle mode, I think it would be reasonable to also support
"pipeline shuffle" if possible.

"Pipeline shuffle" is essential for OLAP/MPP computing. Although it has not
been much exposed to users so far, I know a few companies that use Flink as
an MPP computing engine, and there is an ongoing effort[1] to make this
usage more powerful.

Back to your concern that "Even if the RuntimeFilter becomes running before
the RuntimeFilterBuilder finished, it will not process any data and will
occupy resources": whether it benefits us depends on the scale of the data.
If the RuntimeFilterBuilder finishes before the RuntimeFilter operator
does, the RuntimeFilter can still filter out additional data afterwards.
Hence, in my opinion, we do not need to force the edge between
RuntimeFilterBuilder and RuntimeFilter to be BLOCKING; at the least, it
should be configurable.

[1] https://issues.apache.org/jira/browse/FLINK-25318
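
A minimal sketch of the non-blocking variant argued for above, assuming a
hypothetical `PipelinedRuntimeFilter` that forwards every row until the
filter arrives and only starts dropping rows afterwards. None of these names
come from Flink or the FLIP, and a plain Python set stands in for the bloom
filter.

```python
class PipelinedRuntimeFilter:
    """Pass rows through until the filter is ready, then start filtering."""

    def __init__(self):
        self._allowed_keys = None  # stands in for the bloom filter

    def on_filter_ready(self, allowed_keys):
        # Called once the (hypothetical) builder has finished.
        self._allowed_keys = set(allowed_keys)

    def process(self, key):
        """Return True if the row should be forwarded downstream."""
        if self._allowed_keys is None:
            return True  # filter not built yet: behave like a fake filter
        return key in self._allowed_keys


def run(events):
    """Replay ("row", key) and ("filter", keys) events; return forwarded keys."""
    op = PipelinedRuntimeFilter()
    forwarded = []
    for kind, payload in events:
        if kind == "filter":
            op.on_filter_ready(payload)
        elif op.process(payload):
            forwarded.append(payload)
    return forwarded
```

Rows that arrive before the filter are forwarded unfiltered, which does not
affect correctness because the join discards them anyway; how much is saved
depends on how early the filter becomes available relative to the probe-side
data.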

Lijie Wang wrote on Thu, Jun 15, 2023 at 14:18:

> Hi Yuxia,
>
> I made a mistake in the above response.
>
> The runtime filter works well with all shuffle modes. However, hybrid
> shuffle and blocking shuffle are currently recommended for batch jobs
> (pipeline shuffle is not recommended).
>
> One more thing to mention here is that we will force the edge between
> RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which
> BatchShuffleMode is set), because the RuntimeFilter really doesn’t need to
> run before the RuntimeFilterBuilder has finished. Even if the RuntimeFilter
> starts running before the RuntimeFilterBuilder has finished, it will not
> process any data and will only occupy resources.
>
> Best,
> Lijie
>
> Lijie Wang wrote on Thu, Jun 15, 2023 at 09:48:
>
> > Hi Yuxia,
> >
> > Thanks for your feedback. The answers to your questions are as follows:
> >
> > 1. Yes, the row count comes from the statistics of the underlying
> > table (or is estimated based on those statistics, if the build
> > side or probe side is not a TableScan). If the statistics are
> > unavailable, we will not inject a runtime filter (as you said, we
> > can hardly evaluate the benefits). Besides, AFAIK, the estimated
> > data size of the build side is also based on the row count
> > statistics; that is, if the statistics are unavailable, the
> > requirement "table.optimizer.runtime-filter.max-build-data-size"
> > cannot be evaluated either. I'll add this point to the FLIP.
> >
> > 2.
> > Estimated data size does not meet the requirement (in the planner
> > optimization phase) -> No filter
> > Estimated data size meets the requirement (in the planner
> > optimization phase), but the real data size does not meet the
> > requirement (in the execution phase) -> Fake filter
> >
> > 3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
> >
> > Best,
> > Lijie
> >
> > yuxia  于2023年6月14日周三 20:37写道:
> >
> >> Thanks Lijie for starting this discussion. Excited to see runtime filter
> >> is to be implemented in Flink.
> >> I have few questions about it:
> >>
> >> 1: As the FLIP said, `if the ndv cannot be estimated, use row count
> >> instead`. So, does row count comes from the statistic from underlying
> >> table? What if the the statistic is also unavailable considering users
> >> maynot always remember to generate statistic in production.
> >> I'm wondering whether it make senese that just disable runtime filter if
> >> statistic is unavailable since in that case, we can hardly evaluate the
> >> benefits of runtime-filter.
> >>
> >>
> >> 2: The FLIP said: "We will inject the runtime filters only if the
> >> following requirements are met:xxx", but it also said, "Once this limit
> is
> >> exceeded, it will output a fake filter(which always returns true)" in
> >> `RuntimeFilterBuilderOperator` part; Seems they are contradictory, so
> i'm
> >> wondering what's the real behavior, no filter will be injected or fake
> >> filter?
> >>
> >>
> >> 3: Does it also mean runtime-filter can only take effect in blocking
> >> shuffle?
> >>
> >>
> >>
> >> Best regards,
> >> Yuxia
> >>
> >> - 原始邮件 -
> >> 发件人: "ron9 liu" 
> >> 收件人: "dev" 
> >> 发送时间: 星期三, 2023年 6 月 14日 下午 5:29:28
> >> 主题: Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch
> Jobs
> >>
> >> Thanks Lijie start this discussion. Runtime Filter is a common
> >> optimization
> >> to improve the join performance that has bee

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-15 Thread Lijie Wang
Hi Yuxia,

I made a mistake in the above response.

The runtime filter works well with all shuffle modes. However, hybrid
shuffle and blocking shuffle are currently the recommended modes for batch
jobs (pipelined shuffle is not recommended).

One more thing to mention here is that we will force the edge between
RuntimeFilterBuilder and RuntimeFilter to be BLOCKING (regardless of which
BatchShuffleMode is set), because the RuntimeFilter does not need to run
before the RuntimeFilterBuilder has finished. Even if the RuntimeFilter
starts running before the RuntimeFilterBuilder has finished, it will not
process any data and will occupy resources.
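
The rule above can be sketched roughly as follows. This is only an
illustration under assumptions: the enums and the edgeType helper are
hypothetical names invented for this sketch, not Flink's actual classes or
scheduler code.

```java
// Hypothetical stand-ins for the configured batch shuffle mode and the
// resulting edge type; these are NOT Flink's real enums.
enum ShuffleModeSketch { PIPELINED, BLOCKING, HYBRID }

enum EdgeTypeSketch { PIPELINED, BLOCKING, HYBRID }

final class RuntimeFilterEdgeSketch {
    static final String BUILDER = "RuntimeFilterBuilder";
    static final String FILTER = "RuntimeFilter";

    /**
     * Chooses the edge type between two operators. The edge from the
     * RuntimeFilterBuilder to the RuntimeFilter is always BLOCKING,
     * regardless of the configured shuffle mode; every other edge simply
     * follows the configured mode.
     */
    static EdgeTypeSketch edgeType(String producer, String consumer, ShuffleModeSketch mode) {
        if (BUILDER.equals(producer) && FILTER.equals(consumer)) {
            return EdgeTypeSketch.BLOCKING;
        }
        switch (mode) {
            case PIPELINED:
                return EdgeTypeSketch.PIPELINED;
            case HYBRID:
                return EdgeTypeSketch.HYBRID;
            default:
                return EdgeTypeSketch.BLOCKING;
        }
    }
}
```

With this shape, the filter task is only scheduled once the complete filter
is available, so it never sits idle holding a slot.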

Best,
Lijie



Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread Lijie Wang
Hi Yuxia,

Thanks for your feedback. The answers to your questions are as follows:

1. Yes, the row count comes from the statistics of the underlying table (or
is estimated from those statistics if the build side or probe side is not a
TableScan). If the statistics are unavailable, we will not inject a runtime
filter (as you said, we can hardly evaluate the benefits). Besides, AFAIK,
the estimated data size of the build side is also based on the row count
statistics; that is, if the statistics are unavailable, the requirement
"table.optimizer.runtime-filter.max-build-data-size" cannot be evaluated
either. I'll add this point to the FLIP.

2.
Estimated data size does not meet the requirement (in the planner
optimization phase) -> No filter
Estimated data size meets the requirement (in the planner optimization
phase), but the real data size does not meet the requirement (in the
execution phase) -> Fake filter

3. Yes, the runtime filter is only for batch jobs/blocking shuffle.
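
As a rough sketch of answers 1 and 2, the two-phase decision could look like
the code below. The class, enum and method names are hypothetical, and
"meets the requirement" is assumed here to mean "size <= the configured
max-build-data-size"; the real planner and operator logic may differ.

```java
import java.util.OptionalLong;

// Possible outcomes for one join (hypothetical names for this sketch).
enum FilterDecisionSketch { NO_FILTER, REAL_FILTER, FAKE_FILTER }

final class RuntimeFilterDecisionSketch {

    /**
     * Planner optimization phase: inject a runtime filter only if statistics
     * are available AND the estimated build-side size is within the limit.
     * An empty estimate models "statistics unavailable".
     */
    static boolean shouldInject(OptionalLong estimatedBuildBytes, long maxBuildDataSize) {
        return estimatedBuildBytes.isPresent()
                && estimatedBuildBytes.getAsLong() <= maxBuildDataSize;
    }

    /**
     * Combines both phases. If the planner did not inject a filter, there is
     * no filter at all. If it did, the builder compares the real build-side
     * size with the limit at execution time and falls back to a fake filter
     * (one that always returns true) when the limit is exceeded.
     */
    static FilterDecisionSketch decide(
            OptionalLong estimatedBuildBytes, long actualBuildBytes, long maxBuildDataSize) {
        if (!shouldInject(estimatedBuildBytes, maxBuildDataSize)) {
            return FilterDecisionSketch.NO_FILTER;
        }
        return actualBuildBytes <= maxBuildDataSize
                ? FilterDecisionSketch.REAL_FILTER
                : FilterDecisionSketch.FAKE_FILTER;
    }
}
```

For example, with a limit of 100 bytes, an estimate of 50 and a real size of
500 yields FAKE_FILTER, while a missing estimate always yields NO_FILTER.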

Best,
Lijie



Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread yuxia
Thanks Lijie for starting this discussion. Excited to see that runtime filter
is going to be implemented in Flink.
I have a few questions about it:

1: As the FLIP says, `if the ndv cannot be estimated, use row count instead`.
So, does the row count come from the statistics of the underlying table? What
if the statistics are also unavailable, considering that users may not always
remember to generate statistics in production?
I'm wondering whether it makes sense to just disable the runtime filter if
statistics are unavailable, since in that case we can hardly evaluate the
benefits of the runtime filter.


2: The FLIP says: "We will inject the runtime filters only if the following
requirements are met:xxx", but it also says, "Once this limit is exceeded, it
will output a fake filter(which always returns true)" in the
`RuntimeFilterBuilderOperator` part. These seem contradictory, so I'm
wondering what the real behavior is: will no filter be injected, or a fake
filter?


3: Does it also mean that the runtime filter can only take effect with
blocking shuffle?



Best regards,
Yuxia



Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-14 Thread liu ron
Thanks Lijie for starting this discussion. Runtime filter is a common
optimization for improving join performance that has been adopted by many
computing engines such as Spark, Doris, etc. Flink is a unified stream and
batch computing engine, and we are continuously optimizing its batch
performance. Runtime filter is a general performance optimization technique
that can improve the performance of Flink batch jobs, so we are introducing
it for batch as well.

Looking forward to all feedback.

Best,
Ron

Lijie Wang wrote on Wed, Jun 14, 2023 at 17:17:

> Hi devs
>
> Ron Liu, Gen Luo and I would like to start a discussion about FLIP-324:
> Introduce Runtime Filter for Flink Batch Jobs[1]
>
> Runtime Filter is a common optimization to improve join performance. It is
> designed to dynamically generate filter conditions for certain join queries
> at runtime to reduce the amount of scanned or shuffled data, avoid
> unnecessary I/O and network transmission, and speed up the query. Its
> working principle is to first build a filter (e.g. a bloom filter) from the
> data on the small table side (build side), then pass this filter to the
> large table side (probe side) to filter out the irrelevant data there. This
> reduces the data reaching the join and improves performance.
>
> You can find more details in the FLIP-324[1]. Looking forward to your
> feedback.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-324%3A+Introduce+Runtime+Filter+for+Flink+Batch+Jobs
>
> Best,
> Ron & Gen & Lijie
>
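
To make the working principle in the announcement concrete, here is a
minimal, self-contained sketch. It assumes join keys are plain long values
and uses a small hand-rolled bloom filter; the class name, the bit-array
size and the hash mixing are illustrative choices for this sketch, not
Flink's implementation.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

final class BloomRuntimeFilterSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    BloomRuntimeFilterSketch(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // 64-bit mixing function (splitmix64 finalizer) used to derive hashes.
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    // i-th bit position for a key, via double hashing: h1 + i * h2.
    private int index(long key, int i) {
        long h1 = mix(key);
        long h2 = mix(h1 ^ 0x9E3779B97F4A7C15L);
        return (int) Math.floorMod(h1 + (long) i * h2, (long) numBits);
    }

    void add(long key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(key, i));
        }
    }

    // May return true for absent keys (false positive), never false for added keys.
    boolean mightContain(long key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(key, i))) {
                return false;
            }
        }
        return true;
    }

    /** Builds the filter from the small (build) side, then prunes the large (probe) side. */
    static List<Long> filterProbeSide(List<Long> buildKeys, List<Long> probeKeys) {
        BloomRuntimeFilterSketch filter = new BloomRuntimeFilterSketch(1 << 16, 3);
        for (long key : buildKeys) {
            filter.add(key);
        }
        List<Long> kept = new ArrayList<>();
        for (long key : probeKeys) {
            if (filter.mightContain(key)) {
                kept.add(key);
            }
        }
        return kept;
    }
}
```

Because a bloom filter has no false negatives, every probe row that has a
match on the build side survives; a few non-matching rows may also pass and
are then discarded by the join itself.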