Hi all,

Thanks for all the feedback and suggestions so far. If there are no
further comments, we will open the voting thread tomorrow, Aug 6, 2024.

Best,
Xia

Lincoln Lee <lincoln.8...@gmail.com> wrote on Thu, Aug 1, 2024 at 22:18:

> Hi Xia,
>
> Thanks for your updates! Looks good to me.
>
> Best,
> Lincoln Lee
>
>
> Xia Sun <xingbe...@gmail.com> wrote on Thu, Aug 1, 2024 at 11:15:
>
> > Hi Lincoln,
> >
> > Thanks for your detailed explanation. I understand your concern.
> > Introducing configuration options with redundant semantics can indeed
> > confuse users, and the engine should minimize users' exposure to these
> > details. Based on this premise, while also ensuring that users can
> > choose to enable the broadcast hash join optimization at either compile
> > time or runtime, I think we can introduce a new configuration option,
> > `table.optimizer.adaptive-broadcast-join.strategy`, and reuse the
> > existing option `table.optimizer.join.broadcast-threshold` as a unified
> > threshold for deciding on the broadcast hash join optimization. The
> > `table.optimizer.adaptive-broadcast-join.strategy` option would be an
> > enumeration with three values:
> >
> > AUTO: Flink will autonomously select the optimal timing for the
> > optimization.
> > RUNTIME_ONLY: The broadcast hash join optimization will only be performed
> > at runtime.
> > NONE: The broadcast hash join optimization will only be performed at
> > compile time.
> > AUTO will be the default option.
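
The three proposed values, combined with the unified threshold, can be read as a small decision table. The following is a hypothetical Python illustration, not Flink code: the enum mirrors the values named above, while the helper function names, the assumption that -1 disables broadcast hash join in both phases, and the "actual size <= threshold" runtime check are assumptions made only for this sketch.

```python
from enum import Enum


class AdaptiveBroadcastJoinStrategy(Enum):
    # Values proposed for table.optimizer.adaptive-broadcast-join.strategy.
    AUTO = "AUTO"                  # Flink chooses when to optimize (default)
    RUNTIME_ONLY = "RUNTIME_ONLY"  # broadcast decision made only at runtime
    NONE = "NONE"                  # broadcast decision made only at compile time


DISABLED = -1  # assumption: -1 turns broadcast hash join off in both phases


def compile_time_allowed(strategy: AdaptiveBroadcastJoinStrategy, threshold: int) -> bool:
    """Hypothetical: may the planner choose broadcast hash join statically?"""
    return threshold != DISABLED and strategy is not AdaptiveBroadcastJoinStrategy.RUNTIME_ONLY


def runtime_allowed(strategy: AdaptiveBroadcastJoinStrategy, threshold: int) -> bool:
    """Hypothetical: may the join be switched to broadcast hash join at runtime?"""
    return threshold != DISABLED and strategy is not AdaptiveBroadcastJoinStrategy.NONE


def broadcast_at_runtime(strategy: AdaptiveBroadcastJoinStrategy,
                         threshold: int, actual_input_bytes: int) -> bool:
    """Hypothetical runtime check of the observed input size against the threshold."""
    return runtime_allowed(strategy, threshold) and actual_input_bytes <= threshold
```

Under AUTO both phases remain candidates; how Flink picks between them is not modeled here.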
> >
> > I have also updated the FLIP with this information, PTAL.
> >
> > Best,
> > Xia
> >
> > Lincoln Lee <lincoln.8...@gmail.com> wrote on Tue, Jul 30, 2024 at 23:39:
> >
> > > Thanks Xia for your explanation!
> > >
> > > I can understand your concern, but considering the design of this FLIP,
> > > which already covers correcting inaccurate compile-time optimization
> > > through runtime de-optimization, is it necessary to make users manually
> > > turn off 'table.optimizer.join.broadcast-threshold' and then set the new
> > > 'table.optimizer.adaptive.join.broadcast-threshold'? Another option is
> > > that users only need to focus on the existing broadcast size threshold,
> > > accept the reality that 100% accurate optimization cannot be done at
> > > compile time, and adopt the new capability of dynamic optimization at
> > > runtime. Ultimately, users will trust that Flink always optimizes
> > > accurately. From this point of view, I would prefer a generic parameter,
> > > 'table.optimizer.adaptive-optimization.enabled', which would allow for
> > > more dynamic optimization in the future, not limited to broadcast join
> > > scenarios, and would not keep bringing in more new options. WDYT?
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Xia Sun <xingbe...@gmail.com> wrote on Tue, Jul 30, 2024 at 11:27:
> > >
> > > > Hi Lincoln,
> > > >
> > > > Thank you for your input and participation in the discussion!
> > > >
> > > > Compared with the 'table.optimizer.adaptive-join.enabled' option,
> > > > introducing 'table.optimizer.adaptive.join.broadcast-threshold' can also
> > > > cover the need to disable static broadcast optimization while enabling
> > > > only dynamic broadcast optimization. From this perspective, introducing
> > > > a new threshold configuration might be more appropriate. What do you
> > > > think?
> > > >
> > > > Best,
> > > > Xia
> > > >
> > > > Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, Jul 29, 2024 at 23:12:
> > > >
> > > > > +1 for this useful optimization!
> > > > >
> > > > > I have a question about the new option: do we really need two
> > > > > broadcast join thresholds? IIUC, this adaptive broadcast join is a
> > > > > complement to compile-time optimization, so there is no need for the
> > > > > user to configure two different thresholds (apart from -1, which
> > > > > represents "off"). Since we just want to control the adaptive
> > > > > optimization itself, should we provide a configuration option like
> > > > > 'table.optimizer.adaptive-join.enabled', or a more general one,
> > > > > 'table.optimizer.adaptive-optimization.enabled', for such related
> > > > > optimizations?
> > > > >
> > > > >
> > > > > Best,
> > > > > Lincoln Lee
> > > > >
> > > > >
> > > > > Ron Liu <ron9....@gmail.com> wrote on Fri, Jul 26, 2024 at 11:59:
> > > > >
> > > > > > Hi, Xia
> > > > > >
> > > > > > Thanks for your reply. It looks good to me.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > > Ron
> > > > > >
> > > > > > Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:
> > > > > >
> > > > > > > Hi Ron,
> > > > > > >
> > > > > > > Thanks for your feedback!
> > > > > > >
> > > > > > > -> creation of the join operators until runtime
> > > > > > >
> > > > > > >
> > > > > > > That means that when creating the AdaptiveJoinOperatorFactory, we
> > > > > > > will not immediately create the JoinOperator. Instead, we only pass
> > > > > > > in the parameters necessary for creating the JoinOperator. The
> > > > > > > appropriate JoinOperator will be created during the
> > > > > > > StreamGraphOptimizationStrategy optimization phase.
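
The deferred-creation idea can be sketched as a factory that captures only the construction parameters and builds the concrete operator once the runtime strategy has made its decision. In this hypothetical Python sketch, `JoinParams`, `finalize`, and the operator descriptions returned as strings are illustrative stand-ins, not the actual AdaptiveJoinOperatorFactory API; in Flink the final step would involve codegen and instantiating a Java operator instead.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class JoinParams:
    # Hypothetical subset of what the planner would hand to the factory.
    join_type: str
    left_keys: tuple
    right_keys: tuple


class AdaptiveJoinOperatorFactory:
    """Holds join parameters; the concrete operator is chosen later."""

    def __init__(self, params: JoinParams):
        self._params = params
        self._build_side: Optional[str] = None
        self._finalized = False

    def finalize(self, broadcast_side: Optional[str]) -> None:
        # Called by a (hypothetical) optimization strategy once runtime input
        # sizes are known; None means "keep the originally planned join".
        if self._finalized:
            raise RuntimeError("join already finalized")
        self._build_side = broadcast_side
        self._finalized = True

    def create_operator(self) -> str:
        # Stand-in for codegen plus operator instantiation.
        if not self._finalized:
            raise RuntimeError("operator requested before the join was finalized")
        if self._build_side is None:
            return f"OriginalJoin({self._params.join_type})"  # hypothetical fallback label
        return f"BroadcastHashJoin({self._params.join_type}, build={self._build_side})"
```

Raising on early `create_operator()` calls makes the "no operator until runtime" contract explicit.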
> > > > > > >
> > > > > > > As you mentioned, the fact that the runtime is not aware of the
> > > > > > > table planner is indeed an issue. It involves two aspects:
> > > > > > > (1) We plan to place the implementations of both the
> > > > > > > AdaptiveBroadcastJoinOptimizationStrategy and the
> > > > > > > AdaptiveJoinOperatorFactory in the table layer. During the runtime
> > > > > > > phase, we will obtain the AdaptiveBroadcastJoinOptimizationStrategy
> > > > > > > through class loading. Therefore, flink-runtime does not need to be
> > > > > > > aware of the table layer's implementation.
> > > > > > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory
> > > > > > > needs to be aware of the table planner, we will consider placing the
> > > > > > > AdaptiveJoinOperatorFactory in the table planner module as well.
> > > > > > >
> > > > > > >
> > > > > > > -> When did you configure these optimization strategies uniformly into
> > > > > > > -> `execution.batch.adaptive.stream-graph-optimization.strategies`
> > > > > > >
> > > > > > >
> > > > > > > Thank you for pointing out this issue. When there are multiple
> > > > > > > StreamGraphOptimizationStrategies, the optimization order at runtime
> > > > > > > will strictly follow the order specified in the configuration option
> > > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > > > > > Therefore, a unified configuration step is needed during the SQL
> > > > > > > planner phase to ensure the correct optimization order. Currently, we
> > > > > > > are considering performing this unified configuration in
> > > > > > > BatchPlanner#afterTranslation().
> > > > > > >
> > > > > > > For simplicity, as long as the adaptive broadcast join/skewed join
> > > > > > > optimization features are enabled (e.g.,
> > > > > > > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > > > > > > corresponding strategy will be configured. This configuration step
> > > > > > > is independent of the specific SQL query, so for some queries it
> > > > > > > might not produce any actual effect.
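
The ordering rule above amounts to emitting the enabled strategies in one fixed, canonical order, regardless of the query. A minimal sketch, assuming a hypothetical canonical order, a hypothetical name for the skewed-join strategy, simple names instead of fully qualified class names, and ';' as the list separator in the option's string form (none of these are taken from the FLIP):

```python
# Hypothetical canonical order; the real order would be fixed by the planner.
CANONICAL_ORDER = (
    "AdaptiveBroadcastJoinOptimizationStrategy",
    "AdaptiveSkewedJoinOptimizationStrategy",  # hypothetical name
)

BROADCAST_DISABLED = -1  # value of the broadcast threshold that means "off"


def stream_graph_optimization_strategies(broadcast_threshold: int,
                                         skewed_join_enabled: bool) -> str:
    """Build a value for execution.batch.adaptive.stream-graph-optimization.strategies."""
    enabled = {
        CANONICAL_ORDER[0]: broadcast_threshold != BROADCAST_DISABLED,
        CANONICAL_ORDER[1]: skewed_join_enabled,
    }
    # Iterate over the canonical tuple so the emitted order never depends on
    # which feature was enabled first or on the SQL query itself.
    return ";".join(name for name in CANONICAL_ORDER if enabled[name])
```

Because only feature flags are inspected, a strategy may be registered for a query it cannot improve, matching the trade-off described above.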
> > > > > > >
> > > > > > > Best,
> > > > > > > Xia
> > > > > > >
> > > > > > > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
> > > > > > >
> > > > > > > > Hi, Xia
> > > > > > > >
> > > > > > > > This FLIP looks good to me, +1.
> > > > > > > >
> > > > > > > > I've two questions:
> > > > > > > >
> > > > > > > > 1.
> > > > > > > > >> Accordingly, in terms of implementation, we will delay the codegen and
> > > > > > > > >> creation of the join operators until runtime.
> > > > > > > >
> > > > > > > > How are you delaying codegen to runtime? The current runtime is not SQL
> > > > > > > > planner aware. In other words, how should I understand this sentence?
> > > > > > > >
> > > > > > > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to runtime
> > > > > > > > via the option
> > > > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`. In the
> > > > > > > > SQL planner, you may have multiple different optimization strategies,
> > > > > > > > such as broadcast join, skew join, etc. When did you configure these
> > > > > > > > optimization strategies uniformly into
> > > > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> > > > > > > >
> > > > > > > >
> > > > > > > > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> > > > > > > >
> > > > > > > > > +1 for the FLIP
> > > > > > > > >
> > > > > > > > > It's a good start to adaptively optimize the logical execution plan
> > > > > > > > > with runtime information.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Zhu
> > > > > > > > >
> > > > > > > > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > Junrui Lee, Lei Yang, and I would like to initiate a discussion
> > > > > > > > > > about FLIP-470: Support Adaptive Broadcast Join[1].
> > > > > > > > > >
> > > > > > > > > > In general, Broadcast Hash Join is currently the most efficient
> > > > > > > > > > join strategy available in Flink. However, it requires the input
> > > > > > > > > > data on one side to be sufficiently small; otherwise, it may lead
> > > > > > > > > > to excessive memory usage or other issues. Currently, due to the
> > > > > > > > > > lack of precise statistics, it is difficult to make accurate
> > > > > > > > > > estimations during the Flink SQL planning phase. For example, when
> > > > > > > > > > an upstream Filter operator is present, it is easy to overestimate
> > > > > > > > > > the size of the table, whereas with an expansion operator, the
> > > > > > > > > > table size tends to be underestimated. Moreover, once the join
> > > > > > > > > > operator is determined, it cannot be modified at runtime.
> > > > > > > > > >
> > > > > > > > > > To address this issue, we plan to introduce the Adaptive Broadcast
> > > > > > > > > > Join capability based on FLIP-468: Introducing StreamGraph-Based
> > > > > > > > > > Job Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > > > > > > > > StreamGraph[3]. This will allow the join operator to be
> > > > > > > > > > dynamically optimized to a Broadcast Join based on the actual
> > > > > > > > > > input data volume at runtime, and to fall back when the
> > > > > > > > > > optimization conditions are not met.
> > > > > > > > > >
> > > > > > > > > > For more details, please refer to FLIP-470[1]. We look forward to
> > > > > > > > > > your feedback.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Junrui Lee, Lei Yang and Xia Sun
> > > > > > > > > >
> > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > > > > > > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
