Hi Lincoln,

Thanks for your detailed explanation. I understand your concern.
Introducing configuration options with redundant semantics can indeed
confuse users, and the engine should minimize user exposure to these
details. On this premise, while still letting users choose whether the
broadcast hash join optimization is applied at compile time or at runtime,
I think we can introduce a new configuration option
`table.optimizer.adaptive-broadcast-join.strategy` and reuse the existing
option `table.optimizer.join.broadcast-threshold` as the unified
threshold for deciding on the broadcast hash join optimization. The
`table.optimizer.adaptive-broadcast-join.strategy` option would be
an enumeration type with three values:

- AUTO: Flink will autonomously select the optimal timing for the
  optimization.
- RUNTIME_ONLY: The broadcast hash join optimization will only be performed
  at runtime.
- NONE: The broadcast hash join optimization will only be performed at
  compile time.

AUTO will be the default option.
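
As a rough illustration of how the three values could gate the two phases
while sharing one threshold, here is a minimal sketch. It is not the FLIP's
implementation: the class and method names are hypothetical, it assumes that
AUTO permits both phases, that any negative threshold (the thread mentions
-1) means "off", and that an input qualifies when its size does not exceed
the threshold.

```java
// Hypothetical sketch: these names are illustrative and are not Flink classes.
final class AdaptiveBroadcastJoinSketch {

    /** The three values proposed for table.optimizer.adaptive-broadcast-join.strategy. */
    enum Strategy { AUTO, RUNTIME_ONLY, NONE }

    /** Assumption: a negative threshold (the thread mentions -1) turns broadcast join off. */
    static boolean thresholdEnabled(long thresholdBytes) {
        return thresholdBytes >= 0;
    }

    /** Compile-time broadcast selection is skipped only under RUNTIME_ONLY. */
    static boolean compileTimeAllowed(Strategy strategy, long thresholdBytes) {
        return thresholdEnabled(thresholdBytes) && strategy != Strategy.RUNTIME_ONLY;
    }

    /** Runtime (adaptive) broadcast selection is skipped only under NONE. */
    static boolean runtimeAllowed(Strategy strategy, long thresholdBytes) {
        return thresholdEnabled(thresholdBytes) && strategy != Strategy.NONE;
    }

    /** Assumption: an input qualifies when its size does not exceed the shared threshold. */
    static boolean qualifies(long inputBytes, long thresholdBytes) {
        return thresholdEnabled(thresholdBytes) && inputBytes <= thresholdBytes;
    }
}
```

With this shape, a user only ever tunes the single existing threshold, and
the strategy value decides in which phase that threshold is consulted.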

I have also updated the FLIP with this information, PTAL.

Best,
Xia

Lincoln Lee <lincoln.8...@gmail.com> wrote on Tue, Jul 30, 2024 at 23:39:

> Thanks Xia for your explanation!
>
> I can understand your concern, but this FLIP's design already covers
> the case where an inaccurate compile-time optimization is de-optimized at
> runtime. Is it then necessary to make the user manually turn off
> 'table.optimizer.join.broadcast-threshold' and set the new
> 'table.optimizer.adaptive.join.broadcast-threshold' again? Another option
> is that users only need to focus on the existing broadcast size threshold,
> accept that 100% accurate optimization cannot be done at compile time,
> and adopt the new capability of dynamic optimization at runtime, so that
> ultimately they can trust Flink to optimize accurately. From this point
> of view, I would prefer a generic parameter
> 'table.optimizer.adaptive-optimization.enabled', which would allow for
> more dynamic optimizations in the future, not limited to broadcast join
> scenarios, and would not keep introducing new options. WDYT?
>
>
> Best,
> Lincoln Lee
>
>
> Xia Sun <xingbe...@gmail.com> wrote on Tue, Jul 30, 2024 at 11:27:
>
> > Hi Lincoln,
> >
> > Thank you for your input and participation in the discussion!
> >
> > Compared to introducing the 'table.optimizer.adaptive-join.enabled'
> > option, introducing "table.optimizer.adaptive.join.broadcast-threshold"
> > can also cover the need to disable static broadcast optimization while
> > only enabling dynamic broadcast optimization. From this perspective,
> > introducing a new threshold configuration might be more appropriate.
> > What do you think?
> >
> > Best,
> > Xia
> >
> > Lincoln Lee <lincoln.8...@gmail.com> wrote on Mon, Jul 29, 2024 at 23:12:
> >
> > > +1 for this useful optimization!
> > >
> > > I have a question about the new option: do we really need two broadcast
> > > join thresholds? IIUC, this adaptive broadcast join is a complement to
> > > compile-time optimization, so there is no need for the user to configure
> > > two different thresholds (apart from the "off" value, -1). If we just
> > > want to control the adaptive optimization itself, should we provide a
> > > configuration option like 'table.optimizer.adaptive-join.enabled' or a
> > > more general one, 'table.optimizer.adaptive-optimization.enabled', for
> > > such related optimizations?
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Ron Liu <ron9....@gmail.com> wrote on Fri, Jul 26, 2024 at 11:59:
> > >
> > > > Hi, Xia
> > > >
> > > > Thanks for your reply. It looks good to me.
> > > >
> > > >
> > > > Best,
> > > > Ron
> > > >
> > > > Xia Sun <xingbe...@gmail.com> wrote on Fri, Jul 26, 2024 at 10:49:
> > > >
> > > > > Hi Ron,
> > > > >
> > > > > Thanks for your feedback!
> > > > >
> > > > > -> creation of the join operators until runtime
> > > > >
> > > > >
> > > > > > That means when creating the AdaptiveJoinOperatorFactory, we will
> > > > > > not immediately create the JoinOperator. Instead, we only pass in
> > > > > > the necessary parameters for creating the JoinOperator. The
> > > > > > appropriate JoinOperator will be created during the
> > > > > > StreamGraphOptimizationStrategy optimization phase.
> > > > >
> > > > > > As you mentioned, the runtime's visibility into the table planner
> > > > > > is indeed an issue. It involves two aspects:
> > > > > > (1) We plan to place the implementations of both
> > > > > > AdaptiveBroadcastJoinOptimizationStrategy and
> > > > > > AdaptiveJoinOperatorFactory in the table layer. During the runtime
> > > > > > phase, we will obtain the AdaptiveBroadcastJoinOptimizationStrategy
> > > > > > through class loading. Therefore, flink-runtime does not need to
> > > > > > be aware of the table layer's implementation.
> > > > > > (2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory
> > > > > > needs to be aware of the table planner, we will consider placing
> > > > > > the AdaptiveJoinOperatorFactory in the table planner module as
> > > > > > well.
> > > > >
> > > > >
> > > > > > -> When did you configure these optimization strategies uniformly
> > > > > > into `execution.batch.adaptive.stream-graph-optimization.strategies`
> > > > >
> > > > >
> > > > > > Thank you for pointing out this issue. When there are multiple
> > > > > > StreamGraphOptimizationStrategies, the optimization order in the
> > > > > > runtime phase will strictly follow the order specified in the
> > > > > > configuration option
> > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > > > > Therefore, it is necessary to have a unified configuration during
> > > > > > the SQL planner phase to ensure the correct optimization order.
> > > > > > Currently, we are considering performing this unified
> > > > > > configuration in BatchPlanner#afterTranslation().
> > > > >
> > > > > > For simplicity, as long as the adaptive broadcast join/skewed join
> > > > > > optimization features are enabled (e.g.,
> > > > > > `table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
> > > > > > corresponding strategy will be configured. The strategy is
> > > > > > configured independently of the specific SQL query, even though it
> > > > > > might not produce any actual effect for that query.
> > > > >
> > > > > Best,
> > > > > Xia
> > > > >
> > > > > > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:
> > > > >
> > > > > > Hi, Xia
> > > > > >
> > > > > > This FLIP looks good to me, +1.
> > > > > >
> > > > > > I've two questions:
> > > > > >
> > > > > > 1.
> > > > > > >> Accordingly, in terms of implementation, we will delay the
> > codegen
> > > > and
> > > > > > creation of the join operators until runtime.
> > > > > >
> > > > > > How are you delaying codegen to runtime, the current runtime is
> not
> > > SQL
> > > > > > planner aware. in other words, how do I understand this sentence?
> > > > > >
> > > > > > > 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to
> > > > > > > the runtime via the option
> > > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`.
> > > > > > > If the SQL planner has multiple different optimization
> > > > > > > strategies, like broadcast join, skew join, etc., when do you
> > > > > > > configure these optimization strategies uniformly into
> > > > > > > `execution.batch.adaptive.stream-graph-optimization.strategies`?
> > > > > >
> > > > > >
> > > > > >
> > > > > > > Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
> > > > > >
> > > > > > > +1 for the FLIP
> > > > > > >
> > > > > > > > It's a good start to adaptively optimize the logical execution
> > > > > > > > plan with runtime information.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Zhu
> > > > > > >
> > > > > > > > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > > Junrui Lee, Lei Yang, and I would like to initiate a
> > > > > > > > > discussion about FLIP-470: Support Adaptive Broadcast Join[1].
> > > > > > > > >
> > > > > > > > > In general, Broadcast Hash Join is currently the most
> > > > > > > > > efficient join strategy available in Flink. However, its
> > > > > > > > > prerequisite is that the input data on one side must be
> > > > > > > > > sufficiently small; otherwise, it may lead to memory overuse
> > > > > > > > > or other issues. Currently, due to the lack of precise
> > > > > > > > > statistics, it is difficult to make accurate estimations
> > > > > > > > > during the Flink SQL Planning phase. For example, when an
> > > > > > > > > upstream Filter operator is present, it is easy to
> > > > > > > > > overestimate the size of the table, whereas with an expansion
> > > > > > > > > operator, the table size tends to be underestimated.
> > > > > > > > > Moreover, once the join operator is determined, it cannot be
> > > > > > > > > modified at runtime.
> > > > > > > > >
> > > > > > > > > To address this issue, we plan to introduce Adaptive
> > > > > > > > > Broadcast Join capability based on FLIP-468: Introducing
> > > > > > > > > StreamGraph-Based Job Submission[2] and FLIP-469: Supports
> > > > > > > > > Adaptive Optimization of StreamGraph[3]. This will allow the
> > > > > > > > > join operator to be dynamically optimized to Broadcast Join
> > > > > > > > > based on the actual input data volume at runtime and to fall
> > > > > > > > > back when the optimization conditions are not met.
> > > > > > > > >
> > > > > > > > > For more details, please refer to FLIP-470[1]. We look
> > > > > > > > > forward to your feedback.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Junrui Lee, Lei Yang and Xia Sun
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > > > > > > [2]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > > > > > > [3]
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
