Hi All,

After an offline discussion with Ron, I have made the following updates to
FLIP-469:

Introduced the OperatorsFinished class to represent the state of finished
operators, as well as the size and distribution of the data they produced.
The StreamGraphOptimizer strategy will now depend on OperatorsFinished
objects instead of JobEvent. JobEvent will be used only at runtime. When
the AdaptiveExecutionHandler receives a JobVertexFinishedEvent, it will
convert it to an OperatorsFinished object and send it to the
StreamGraphOptimizer to attempt to trigger an optimization of the
StreamGraph.
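To make the flow described above concrete, here is a minimal Java sketch. Only the names OperatorsFinished, JobVertexFinishedEvent, StreamGraphOptimizer and AdaptiveExecutionHandler come from this thread. Every field, constructor and method below, as well as the AdaptiveExecutionHandlerSketch name, is a hypothetical simplification and not Flink's actual API. The sketch also assumes that the event reports the bytes each output-producing operator wrote to each subpartition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the FLIP-469 event flow; not Flink's actual API.
class OperatorsFinishedDemo {
    public static void main(String[] args) {
        // A stand-in optimizer that only reports what it received.
        StreamGraphOptimizer optimizer = finished -> System.out.println(
                "finished operators " + finished.finishedStreamNodeIds
                        + ", node 3 produced " + finished.totalBytes(3) + " bytes");
        AdaptiveExecutionHandlerSketch handler = new AdaptiveExecutionHandlerSketch(optimizer);

        Map<Integer, long[]> output = new LinkedHashMap<>();
        output.put(3, new long[] {10L, 30L}); // node 3 wrote to two subpartitions
        handler.handleJobVertexFinished(
                new JobVertexFinishedEvent("vertex-1", List.of(1, 2, 3), output));
    }
}

/** Runtime-only event: a job vertex finished (hypothetical shape). */
final class JobVertexFinishedEvent {
    final String jobVertexId;
    final List<Integer> chainedStreamNodeIds;        // operators chained into the vertex
    final Map<Integer, long[]> bytesPerSubpartition; // output bytes per operator and subpartition

    JobVertexFinishedEvent(String jobVertexId, List<Integer> chainedStreamNodeIds,
                           Map<Integer, long[]> bytesPerSubpartition) {
        this.jobVertexId = jobVertexId;
        this.chainedStreamNodeIds = chainedStreamNodeIds;
        this.bytesPerSubpartition = bytesPerSubpartition;
    }
}

/** Optimizer-facing view: finished operators plus the size and distribution of their output. */
final class OperatorsFinished {
    final List<Integer> finishedStreamNodeIds;
    final Map<Integer, long[]> bytesPerSubpartition;

    OperatorsFinished(List<Integer> finishedStreamNodeIds, Map<Integer, long[]> bytesPerSubpartition) {
        // Shallow copies: adding or removing entries on the event side does not affect this view.
        this.finishedStreamNodeIds = Collections.unmodifiableList(new ArrayList<>(finishedStreamNodeIds));
        this.bytesPerSubpartition = Collections.unmodifiableMap(new LinkedHashMap<>(bytesPerSubpartition));
    }

    /** Total bytes an operator produced, or 0 if it reported no output. */
    long totalBytes(int streamNodeId) {
        long[] perSubpartition = bytesPerSubpartition.get(streamNodeId);
        return perSubpartition == null ? 0L : Arrays.stream(perSubpartition).sum();
    }
}

/** The optimizer depends only on OperatorsFinished, never on runtime event types. */
interface StreamGraphOptimizer {
    void onOperatorsFinished(OperatorsFinished finished);
}

/** Converts the runtime event and hands it to the optimizer, which decides whether to optimize. */
final class AdaptiveExecutionHandlerSketch {
    private final StreamGraphOptimizer optimizer;

    AdaptiveExecutionHandlerSketch(StreamGraphOptimizer optimizer) {
        this.optimizer = optimizer;
    }

    void handleJobVertexFinished(JobVertexFinishedEvent event) {
        optimizer.onOperatorsFinished(
                new OperatorsFinished(event.chainedStreamNodeIds, event.bytesPerSubpartition));
    }
}
```

The benefit of this split is that optimization strategies only ever see OperatorsFinished, so runtime event types can evolve without touching the optimizer side.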

Best,
Junrui

On Wed, Jul 24, 2024 at 15:46, Junrui Lee <jrlee....@gmail.com> wrote:

> Hi Ron,
>
> Thank you for your questions regarding the
> StreamGraphOptimizationStrategy. Here are my responses:
>
> 1. Is there a better way to pass it to the runtime?
>
> At the moment, we have not thought of a better way.
>
> Since the strategy implementations are classes from the table layer, the
> runtime layer can only see them through class loading. In addition, this
> is a configuration item that should be consolidated into the configuration,
> in line with the goal of the Flink 2.0 configuration refactoring work.
>
> 2. Can the list type always be order-preserving?
>
> Yes, we will ensure that the order in which this config parameter is
> loaded is consistent with the order set by the user. The execution order
> will also follow this sequence.
>
> 3. Are all StreamGraphOptimizationStrategy instances traversed and executed
> once?
>
> Yes, each strategy is traversed and executed only once per event.
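The two answers above (strategies run in the order the user configured, and each runs once per event) can be sketched in Java as follows. The sketch assumes the option value is a semicolon-separated list of class names, the format Flink has used for list-typed options in flat configuration, and it loads each class through a class loader because the strategies live in the table layer. The strategy interface shape, the StrategyLoader helper and the ToyStrategyA/ToyStrategyB classes are hypothetical illustrations, not the FLIP's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: ordered loading and once-per-event execution of strategies.
class OrderedStrategiesDemo {
    public static void main(String[] args) {
        // Assumed flat-config format: semicolon-separated class names.
        String configured = "ToyStrategyB;ToyStrategyA";
        List<StreamGraphOptimizationStrategy> strategies =
                StrategyLoader.load(configured, OrderedStrategiesDemo.class.getClassLoader());
        List<String> trace = new ArrayList<>();
        boolean modified = StrategyLoader.optimizeOnce(strategies, trace);
        System.out.println(trace + " modified=" + modified);
    }
}

/** Hypothetical strategy contract; the FLIP's real interface may differ. */
interface StreamGraphOptimizationStrategy {
    /** Returns true if the strategy changed the pending part of the StreamGraph. */
    boolean onOperatorsFinished(List<String> trace);
}

final class StrategyLoader {
    /** Instantiates the configured classes in exactly the order the user listed them. */
    static List<StreamGraphOptimizationStrategy> load(String configValue, ClassLoader classLoader) {
        List<StreamGraphOptimizationStrategy> strategies = new ArrayList<>();
        for (String entry : configValue.split(";")) {
            String className = entry.trim();
            if (className.isEmpty()) {
                continue;
            }
            try {
                // Class loading is how the runtime reaches strategies defined in the table layer.
                Class<?> clazz = Class.forName(className, true, classLoader);
                strategies.add((StreamGraphOptimizationStrategy) clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate strategy " + className, e);
            }
        }
        return strategies;
    }

    /** Visits every strategy exactly once, in list order, for a single event. */
    static boolean optimizeOnce(List<StreamGraphOptimizationStrategy> strategies, List<String> trace) {
        boolean modified = false;
        for (StreamGraphOptimizationStrategy strategy : strategies) {
            modified |= strategy.onOperatorsFinished(trace); // |= (not ||) so no call is skipped
        }
        return modified;
    }
}

/** Toy strategies that only record that they ran. */
class ToyStrategyA implements StreamGraphOptimizationStrategy {
    public boolean onOperatorsFinished(List<String> trace) {
        trace.add("A");
        return true;
    }
}

class ToyStrategyB implements StreamGraphOptimizationStrategy {
    public boolean onOperatorsFinished(List<String> trace) {
        trace.add("B");
        return false;
    }
}
```

Because the list is parsed left to right and never reordered, a user who needs strategy a to run before b simply lists a first.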
>
> Best regards,
> Junrui
>
> On Wed, Jul 24, 2024 at 14:12, Ron Liu <ron9....@gmail.com> wrote:
>
>> One additional question: for a JobEvent, are all
>> StreamGraphOptimizationStrategy instances traversed and executed once?
>>
>> Best,
>> Ron
>>
>> On Wed, Jul 24, 2024 at 13:59, Ron Liu <ron9....@gmail.com> wrote:
>>
>> > Hi, Junrui
>> >
>> > Thanks for the proposal. This design allows the Flink engine to become
>> > smarter by performing more dynamic optimizations at runtime, so +1 from
>> > my side.
>> >
>> > For your FLIP, I have one minor question.
>> >
>> > Regarding the StreamGraphOptimizationStrategy, you mentioned introducing
>> > the option
>> > `execution.batch.adaptive.stream-graph-optimization.strategies` (List
>> > type) and passing it to the runtime. Is there a better way to pass it to
>> > the runtime than using a configuration parameter? Another thing: suppose
>> > two optimization strategies, a and b, must be executed in order, a first
>> > and then b. How do you make the user aware, when setting the parameter,
>> > that a must come before b? And can the list type always be
>> > order-preserving?
>> >
>> >
>> > Best,
>> > Ron
>> >
>> > On Thu, Jul 18, 2024 at 12:12, Junrui Lee <jrlee....@gmail.com> wrote:
>> >
>> >> Hi, Weijie
>> >>
>> >> Thanks for your feedback!
>> >>
>> >> `StreamGraphOptimizationStrategy` is a reasonable abstraction. Which
>> >> built-in strategy implementations do you have in mind so far?
>> >>
>> >> We will introduce two optimization strategies:
>> >> AdaptiveBroadcastJoinOptimizeStrategy, which dynamically determines
>> >> whether to switch to BroadcastJoin and performs the switch, and
>> >> SkewedJoinOptimizeStrategy, which addresses data skew issues.
>> >>
>> >> For the so-called pending operators, can we show them in different
>> >> colors on the UI?
>> >>
>> >> Yes, we will use different colors (such as green) to display the pending
>> >> operators.
>> >>
>> >> Best regards,
>> >> Junrui
>> >>
>> >> On Wed, Jul 17, 2024 at 20:15, weijie guo <guoweijieres...@gmail.com> wrote:
>> >>
>> >> > Thanks for the proposal!
>> >> >
>> >> > I like this idea, as it gives Flink's adaptive batch processing more
>> >> > room for innovation and optimization.
>> >> >
>> >> > So, +1 from my side.
>> >> >
>> >> > I just have two questions:
>> >> >
>> >> > 1. `StreamGraphOptimizationStrategy` is a reasonable abstraction. Which
>> >> > built-in strategy implementations do you have in mind so far?
>> >> >
>> >> > 2. For the so-called pending operators, can we show them in different
>> >> > colors on the UI?
>> >> >
>> >> >
>> >> > Best regards,
>> >> >
>> >> > Weijie
>> >> >
>> >> >
>> >> > On Wed, Jul 17, 2024 at 16:49, Zhu Zhu <reed...@gmail.com> wrote:
>> >> >
>> >> > > Thanks Junrui for the updates. The proposal looks good to me.
>> >> > > With the stream graph added to the REST API result, I think we are
>> >> > > also quite close to enabling Flink to expand a job vertex to show its
>> >> > > operator-chain topology.
>> >> > >
>> >> > > Thanks,
>> >> > > Zhu
>> >> > >
>> >> > > On Mon, Jul 15, 2024 at 14:58, Junrui Lee <jrlee....@gmail.com> wrote:
>> >> > >
>> >> > > > Hi Zhu,
>> >> > > >
>> >> > > > Thanks for your feedback.
>> >> > > >
>> >> > > > Following your suggestion, I have updated the public interface section
>> >> > > > of the FLIP with the following additions:
>> >> > > >
>> >> > > > 1. UI:
>> >> > > > The job topology will display a hybrid of the current JobGraph and the
>> >> > > > downstream StreamGraph operators that have not yet been converted to
>> >> > > > job vertices. On the topology graph page, there will be a "Show Pending
>> >> > > > Operators" button in the upper right corner that lets users switch back
>> >> > > > to a job topology that only includes JobVertices.
>> >> > > >
>> >> > > > 2. REST API:
>> >> > > > A new field, "stream-graph-plan", will be added to the job details REST
>> >> > > > API to represent the runtime StreamGraph. The field "job-vertex-id" of a
>> >> > > > StreamNode is valid only once that StreamNode has been converted to a
>> >> > > > JobVertex; it then holds the ID of the corresponding JobVertex.
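As an illustration of how a client might consume the new field, the following Java sketch models a StreamNode entry with an optional job vertex ID, then derives the pending operators and the operators chained into each job vertex. Apart from the meaning of "job-vertex-id", everything here (the class names, the fields, and the assumption that each node exposes an ID and a description) is hypothetical; the actual JSON schema is defined in the FLIP's public interface section.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical client-side model of entries in "stream-graph-plan"; not the FLIP's schema.
class StreamGraphPlanDemo {
    public static void main(String[] args) {
        List<StreamNodeView> nodes = List.of(
                new StreamNodeView("1", "Source", Optional.of("vertex-a")),
                new StreamNodeView("2", "Calc", Optional.of("vertex-a")),
                new StreamNodeView("3", "Join", Optional.empty()));
        System.out.println("pending: " + StreamGraphPlanView.pendingOperators(nodes).size());
        System.out.println("by vertex: " + StreamGraphPlanView.operatorsByJobVertex(nodes));
    }
}

/** One StreamNode as a client might see it. */
final class StreamNodeView {
    final String id;
    final String description;
    final Optional<String> jobVertexId; // present only once converted to a JobVertex

    StreamNodeView(String id, String description, Optional<String> jobVertexId) {
        this.id = id;
        this.description = description;
        this.jobVertexId = jobVertexId;
    }
}

final class StreamGraphPlanView {
    /** Operators not yet converted to job vertices, i.e. the "pending operators". */
    static List<StreamNodeView> pendingOperators(List<StreamNodeView> nodes) {
        return nodes.stream()
                .filter((StreamNodeView node) -> !node.jobVertexId.isPresent())
                .collect(Collectors.toList());
    }

    /** Groups converted operators by the job vertex they were chained into. */
    static Map<String, List<String>> operatorsByJobVertex(List<StreamNodeView> nodes) {
        return nodes.stream()
                .filter((StreamNodeView node) -> node.jobVertexId.isPresent())
                .collect(Collectors.groupingBy(
                        (StreamNodeView node) -> node.jobVertexId.get(),
                        LinkedHashMap::new,
                        Collectors.mapping((StreamNodeView node) -> node.id, Collectors.toList())));
    }
}
```

Grouping by "job-vertex-id" is also one way a UI could expand a job vertex into its operator chain.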
>> >> > > >
>> >> > > > For further information, please feel free to review the public interface
>> >> > > > section of FLIP-469.
>> >> > > >
>> >> > > > Best,
>> >> > > > Junrui
>> >> > > >
>> >> > > > On Mon, Jul 15, 2024 at 10:29, Zhu Zhu <reed...@gmail.com> wrote:
>> >> > > >
>> >> > > > > +1 for the FLIP
>> >> > > > >
>> >> > > > > It is useful to adaptively optimize logical execution plans (stream
>> >> > > > > operators and stream edges) for batch jobs.
>> >> > > > >
>> >> > > > > One question:
>> >> > > > > The FLIP already proposes updating the REST API & Web UI to show
>> >> > > > > operators that are not yet converted to job vertices. However, I think
>> >> > > > > it would be better if Flink could display these operators as part of
>> >> > > > > the graph, allowing users to have an overview of the processing logic
>> >> > > > > graph at early stages of the job execution.
>> >> > > > > This change would also involve the public interface, so instead of
>> >> > > > > postponing it to a later FLIP, I would prefer to have a design for it
>> >> > > > > in this FLIP. WDYT?
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > > Zhu
>> >> > > > >
>> >> > > > > On Thu, Jul 11, 2024 at 11:27, Junrui Lee <jrlee....@gmail.com> wrote:
>> >> > > > >
>> >> > > > > > Hi devs,
>> >> > > > > >
>> >> > > > > > Xia Sun, Lei Yang, and I would like to initiate a discussion about
>> >> > > > > > FLIP-469: Supports Adaptive Optimization of StreamGraph.
>> >> > > > > >
>> >> > > > > > This FLIP is the second in the series on adaptive optimization of
>> >> > > > > > StreamGraph and follows up on FLIP-468 [1]. FLIP-468 proposed enabling
>> >> > > > > > the scheduler to recognize and access the StreamGraph; building on
>> >> > > > > > that, this FLIP proposes a mechanism for adaptive optimization of the
>> >> > > > > > StreamGraph, which allows the scheduler to dynamically adjust the
>> >> > > > > > logical execution plan at runtime. This mechanism is the foundation
>> >> > > > > > for adaptive optimization strategies such as adaptive broadcast join
>> >> > > > > > and skewed join optimization.
>> >> > > > > >
>> >> > > > > > Unlike traditional job execution based on a static StreamGraph, this
>> >> > > > > > mechanism progressively determines the StreamGraph at runtime. The
>> >> > > > > > determined part of the StreamGraph is transformed into a concrete
>> >> > > > > > JobGraph, while the undetermined part allows Flink to adjust flexibly
>> >> > > > > > according to the real-time job status and actual input conditions.
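A toy Java model of this progressive determination follows. It assumes, purely for illustration, that a pending StreamNode is converted into the JobGraph as soon as all of its upstream nodes have finished, and that only pending nodes may still be rewritten. The FLIP's actual conversion rules and data structures may differ, and ProgressiveStreamGraph and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of progressive StreamGraph determination; the conversion rule is an assumption.
class ProgressiveConversionDemo {
    public static void main(String[] args) {
        ProgressiveStreamGraph graph = new ProgressiveStreamGraph();
        graph.addNode("source", "scan");
        graph.addNode("join", "hash-join", "source");
        graph.addNode("sink", "sink", "join");

        System.out.println("converted: " + graph.convertReadyNodes());
        graph.markFinished("source");
        // The join is still pending, so runtime information may still reshape it.
        graph.rewriteOperator("join", "broadcast-join");
        System.out.println("converted: " + graph.convertReadyNodes());
    }
}

final class ProgressiveStreamGraph {
    private final Map<String, List<String>> inputs = new LinkedHashMap<>(); // node -> upstream nodes
    private final Map<String, String> operators = new LinkedHashMap<>();    // node -> operator kind
    private final Set<String> converted = new LinkedHashSet<>();             // already in the JobGraph
    private final Set<String> finished = new LinkedHashSet<>();

    void addNode(String id, String operator, String... upstream) {
        inputs.put(id, Arrays.asList(upstream));
        operators.put(id, operator);
    }

    String operatorOf(String id) {
        return operators.get(id);
    }

    void markFinished(String id) {
        finished.add(id);
    }

    /** Only the undetermined (pending) part of the graph may be changed. */
    void rewriteOperator(String id, String newOperator) {
        if (converted.contains(id)) {
            throw new IllegalStateException(id + " is already part of the JobGraph");
        }
        operators.put(id, newOperator);
    }

    /** Assumed rule: a pending node is determined once all of its upstream nodes have finished. */
    List<String> convertReadyNodes() {
        List<String> newlyConverted = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            String id = entry.getKey();
            if (!converted.contains(id) && finished.containsAll(entry.getValue())) {
                converted.add(id);
                newlyConverted.add(id);
            }
        }
        return newlyConverted;
    }
}
```

In this toy run, the join is still pending when the source finishes, so it can be switched from a hash join to a broadcast join before conversion; once converted, further rewrites are rejected.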
>> >> > > > > >
>> >> > > > > > Note that this FLIP focuses on the introduction of the mechanism and
>> >> > > > > > does not introduce any actual optimization strategies; these will be
>> >> > > > > > discussed in subsequent FLIPs.
>> >> > > > > >
>> >> > > > > > For more details, please refer to FLIP-469 [2]. We look forward to
>> >> > > > > > your feedback.
>> >> > > > > >
>> >> > > > > > Best,
>> >> > > > > >
>> >> > > > > > Xia Sun, Lei Yang and Junrui Lee
>> >> > > > > >
>> >> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-468%3A+Introducing+StreamGraph-Based+Job+Submission
>> >> > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
