Hi Ron,

Thanks for your feedback!

-> creation of the join operators until runtime


That means when creating the AdaptiveJoinOperatorFactory, we will not
immediately create the JoinOperator. Instead, we only pass in the necessary
parameters for creating the JoinOperator. The appropriate JoinOperator will
be created during the StreamGraphOptimizationStrategy optimization phase.
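
To make the idea concrete, here is a minimal sketch of the deferred-creation pattern. It is only an illustration: `DeferredJoinSketch`, `JoinKind`, `JoinOperator`, `AdaptiveJoinFactory` and the size-based decision rule are hypothetical stand-ins, not the actual FLIP-470 classes or Flink APIs.

```java
import java.util.Objects;

// Illustrative sketch only: none of these types are Flink classes.
final class DeferredJoinSketch {

    enum JoinKind { SHUFFLE_HASH, BROADCAST_HASH }

    /** Hypothetical stand-in for a code-generated join operator. */
    static final class JoinOperator {
        final JoinKind kind;
        final String joinCondition;

        JoinOperator(JoinKind kind, String joinCondition) {
            this.kind = kind;
            this.joinCondition = joinCondition;
        }
    }

    /** Stores only the parameters needed to build the operator later. */
    static final class AdaptiveJoinFactory {
        private final String joinCondition;
        private JoinOperator operator; // stays null until the optimization phase decides

        AdaptiveJoinFactory(String joinCondition) {
            this.joinCondition = Objects.requireNonNull(joinCondition);
        }

        boolean isOperatorCreated() {
            return operator != null;
        }

        /** Invoked during optimization, once the actual input size is known (assumed rule). */
        JoinOperator createOperator(long smallerInputBytes, long broadcastThresholdBytes) {
            if (operator == null) {
                JoinKind kind = smallerInputBytes <= broadcastThresholdBytes
                        ? JoinKind.BROADCAST_HASH
                        : JoinKind.SHUFFLE_HASH;
                operator = new JoinOperator(kind, joinCondition);
            }
            return operator;
        }
    }

    public static void main(String[] args) {
        AdaptiveJoinFactory factory = new AdaptiveJoinFactory("a.id = b.id");
        System.out.println("created at construction: " + factory.isOperatorCreated());
        JoinOperator op = factory.createOperator(8L << 20, 64L << 20);
        System.out.println("chosen at optimization: " + op.kind);
    }
}
```

The point of the sketch is only that constructing the factory has no side effect on the operator; the decision and the creation happen in one later step.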

As you mentioned, the runtime's lack of visibility into the table planner
is indeed an issue. There are two aspects:
(1) We plan to place the implementations of both
AdaptiveBroadcastJoinOptimizationStrategy and AdaptiveJoinOperatorFactory
in the table layer. During the runtime phase, we will obtain the
AdaptiveBroadcastJoinOptimizationStrategy through class loading. Therefore,
the flink-runtime does not need to be aware of the table layer's
implementation.
(2) Since the dynamic codegen in the AdaptiveJoinOperatorFactory needs to
be aware of the table planner, we will consider placing the
AdaptiveJoinOperatorFactory in the table planner module as well.
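
As a rough sketch of point (1), the runtime side can depend only on an interface and instantiate the implementation from its class name. The types below (`StrategyLoadingSketch`, `OptimizationStrategy`, `BroadcastJoinStrategy`) are hypothetical and only illustrate the class-loading approach; they are not the real Flink interfaces.

```java
// Illustrative sketch only: these are hypothetical types, not Flink classes.
final class StrategyLoadingSketch {

    /** The only type the "runtime" side needs to know about. */
    interface OptimizationStrategy {
        String describe();
    }

    /** Lives in the "table layer"; the runtime never references it by type. */
    static final class BroadcastJoinStrategy implements OptimizationStrategy {
        public BroadcastJoinStrategy() {}

        @Override
        public String describe() {
            return "adaptive-broadcast-join";
        }
    }

    /** Runtime side: instantiate a strategy from its configured class name. */
    static OptimizationStrategy load(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            return clazz.asSubclass(OptimizationStrategy.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load strategy " + className, e);
        }
    }

    public static void main(String[] args) {
        // In a real setup the name would come from configuration.
        String configured = BroadcastJoinStrategy.class.getName();
        OptimizationStrategy strategy =
                load(configured, StrategyLoadingSketch.class.getClassLoader());
        System.out.println(strategy.describe());
    }
}
```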


-> When did you configure these optimization strategies uniformly into
`execution.batch.adaptive.stream-graph-optimization.strategies`


Thank you for pointing out this issue. When there are multiple
StreamGraphOptimizationStrategies, the optimization order at the runtime
phase will strictly follow the order specified in the configuration option
`execution.batch.adaptive.stream-graph-optimization.strategies`. Therefore,
it is necessary to have a unified configuration during the SQL planner
phase to ensure the correct optimization order. Currently, we are
considering performing this unified configuration in
BatchPlanner#afterTranslation().
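
The ordering guarantee can be sketched as follows. This is an illustration under assumptions: the `;` separator, the `org.example` class names and the `StrategyOrderSketch` helper are hypothetical; only the option key comes from FLIP-469.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: helper and class names are hypothetical.
final class StrategyOrderSketch {

    static final String STRATEGIES_KEY =
            "execution.batch.adaptive.stream-graph-optimization.strategies";

    /** Planner side: serialize the strategies once, in the intended order. */
    static String toConfigValue(List<String> orderedClassNames) {
        return String.join(";", orderedClassNames); // separator is an assumption
    }

    /** Runtime side: parse the value back, preserving the configured order. */
    static List<String> parse(String configValue) {
        List<String> names = new ArrayList<>();
        for (String part : configValue.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> ordered = Arrays.asList(
                "org.example.AdaptiveBroadcastJoinOptimizationStrategy",
                "org.example.AdaptiveSkewedJoinOptimizationStrategy");
        String value = toConfigValue(ordered);
        System.out.println(STRATEGIES_KEY + ": " + value);
        for (String name : parse(value)) {
            System.out.println("apply " + name); // applied strictly in list order
        }
    }
}
```

Because the runtime iterates the parsed list as-is, whoever writes the option decides the order, which is why a single place in the planner should assemble it.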

For simplicity, as long as the adaptive broadcast join/skewed join
optimization features are enabled (e.g.,
`table.optimizer.adaptive.join.broadcast-threshold` is not -1), the
corresponding strategy will be configured. This configuration is independent
of the specific SQL query, so for some queries a configured strategy may
have no actual effect.
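
A minimal sketch of this rule, assuming the threshold is given as a plain number of bytes with -1 as the default and meaning "disabled". The skewed-join key and the `StrategySelectionSketch` helper are hypothetical, introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: not the actual planner code.
final class StrategySelectionSketch {

    static final String BROADCAST_THRESHOLD_KEY =
            "table.optimizer.adaptive.join.broadcast-threshold";
    // Hypothetical key, used here only to show a second feature switch.
    static final String SKEWED_JOIN_ENABLED_KEY = "example.adaptive.skewed-join.enabled";

    /** Decide which strategies to configure from the options alone, not from the query. */
    static List<String> selectStrategies(Map<String, String> options) {
        List<String> strategies = new ArrayList<>();
        // Assumption: the value is a plain byte count and -1 disables the feature.
        long threshold = Long.parseLong(options.getOrDefault(BROADCAST_THRESHOLD_KEY, "-1"));
        if (threshold != -1L) {
            strategies.add("AdaptiveBroadcastJoinOptimizationStrategy");
        }
        if (Boolean.parseBoolean(options.getOrDefault(SKEWED_JOIN_ENABLED_KEY, "false"))) {
            strategies.add("AdaptiveSkewedJoinOptimizationStrategy");
        }
        return strategies;
    }

    public static void main(String[] args) {
        System.out.println(selectStrategies(Map.of()));
        System.out.println(selectStrategies(Map.of(BROADCAST_THRESHOLD_KEY, "10485760")));
    }
}
```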

Best,
Xia

Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 14:10:

> Hi, Xia
>
> This FLIP looks good to me, +1.
>
> I have two questions:
>
> 1.
> >> Accordingly, in terms of implementation, we will delay the codegen and
> creation of the join operators until runtime.
>
> How are you delaying codegen to runtime? The current runtime is not SQL
> planner aware. In other words, how should I understand this sentence?
>
> 2. FLIP-469 mentions passing StreamGraphOptimizationStrategy to runtime via
> option `execution.batch.adaptive.stream-graph-optimization.strategies`. In
> SQL planner if you have multiple different optimization strategies like
> broadcast join, skew join, etc...  When did you configure these
> optimization strategies uniformly into
> `execution.batch.adaptive.stream-graph-optimization.strategies`?
>
>
>
> Zhu Zhu <reed...@gmail.com> wrote on Fri, Jul 19, 2024 at 17:41:
>
> > +1 for the FLIP
> >
> > It's a good start to adaptively optimize the logical execution plan with
> > runtime information.
> >
> > Thanks,
> > Zhu
> >
> > Xia Sun <xingbe...@gmail.com> wrote on Thu, Jul 18, 2024 at 18:23:
> >
> > > Hi devs,
> > >
> > > Junrui Lee, Lei Yang, and I would like to initiate a discussion about
> > > FLIP-470: Support Adaptive Broadcast Join[1].
> > >
> > > In general, Broadcast Hash Join is currently the most efficient join
> > > strategy available in Flink. However, its prerequisite is that the input
> > > data on one side must be sufficiently small; otherwise, it may lead to
> > > memory overuse or other issues. Currently, due to the lack of precise
> > > statistics, it is difficult to make accurate estimations during the Flink
> > > SQL Planning phase. For example, when an upstream Filter operator is
> > > present, it is easy to overestimate the size of the table, whereas with
> > > an expansion operator, the table size tends to be underestimated. Moreover,
> > > once the join operator is determined, it cannot be modified at runtime.
> > >
> > > To address this issue, we plan to introduce Adaptive Broadcast Join
> > > capability based on FLIP-468: Introducing StreamGraph-Based Job
> > > Submission[2] and FLIP-469: Supports Adaptive Optimization of
> > > StreamGraph[3]. This will allow the join operator to be dynamically
> > > optimized to Broadcast Join based on the actual input data volume at
> > > runtime, and to fall back when the optimization conditions are not met.
> > >
> > > For more details, please refer to FLIP-470[1]. We look forward to your
> > > feedback.
> > >
> > > Best,
> > > Junrui Lee, Lei Yang and Xia Sun
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-470%3A+Support+Adaptive+Broadcast+Join
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
> > > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > >
> >
>
