Hi all,

My main concern is the absence of a comprehensive vision for how we could
make this work for both Batch and Streaming. Right now, it feels like the
proposal is solely centered around very specific batch optimizations.

> I’m inclined to support submitting StreamGraph because it could already
> provide the actual logical plan of the job at runtime. To be honest, I’m
> not sure what additional benefits submitting Transformations would bring
> compared to StreamGraph. I would appreciate any insights that you might
> offer on this matter.


This FLIP actually proposes a new job representation, so it would be great
to learn from the mistakes of the JobGraph. The main drawback of the
JobGraph is its very tight coupling with a particular Flink version: even a
small patch-version difference between client and server can render the
JobGraph invalid, due to the nature of Java serialization. StreamGraph
exposes an even bigger surface area, with more internal data structures
that could make this problem even more visible.

In general, it would be highly valuable to make the APIs used for RPCs as
version-agnostic as possible. If you were to write a custom serializer for
the StreamGraph, you’d actually find that you don’t need more than a list
of transformations and a configuration: passing those to the
StreamGraphGenerator produces a deterministic result. In that case, you’ve
limited the surface area of what needs to be Java-serialized to the
user-provided transformations, which is a pretty nice property.
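
Roughly, something like the sketch below would do on the receiving side
(this is just an illustration, not code from the FLIP; the
StreamGraphRegenerator / regenerate names are made up, and it assumes the
existing StreamGraphGenerator constructor quoted further down this thread
plus the configure() helpers on ExecutionConfig and CheckpointConfig):

    import java.util.List;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.graph.StreamGraph;
    import org.apache.flink.streaming.api.graph.StreamGraphGenerator;

    /** Sketch: rebuild the StreamGraph from the minimal submission payload. */
    final class StreamGraphRegenerator {

        /** Same transformations + configuration should yield the same StreamGraph. */
        static StreamGraph regenerate(
                List<Transformation<?>> transformations, Configuration configuration) {
            // Derive the legacy config objects from the unified Configuration, so
            // only the transformations and the configuration cross the wire.
            ExecutionConfig executionConfig = new ExecutionConfig();
            executionConfig.configure(
                    configuration, Thread.currentThread().getContextClassLoader());

            CheckpointConfig checkpointConfig = new CheckpointConfig();
            checkpointConfig.configure(configuration);

            return new StreamGraphGenerator(
                            transformations, executionConfig, checkpointConfig, configuration)
                    .generate();
        }
    }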

A list of transformations plus a configuration is also the smallest common
denominator across the APIs Flink offers (DataStream, Table, SQL).
Additionally, it would be beneficial to make the submission data structures
immutable. Immutability would guarantee that a job definition cannot change
after it has been submitted, ruling out a whole class of inadvertent
modifications and making the system more robust.
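
To make that concrete, the whole submission payload could be as small as
something like this (purely hypothetical class and method names, not a
concrete proposal):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.configuration.Configuration;

    /** Hypothetical immutable payload: transformations + configuration only. */
    final class JobSubmissionPayload {

        private final List<Transformation<?>> transformations;
        private final Configuration configuration;

        JobSubmissionPayload(
                List<Transformation<?>> transformations, Configuration configuration) {
            // Defensive copies keep the payload stable after construction.
            this.transformations =
                    Collections.unmodifiableList(new ArrayList<>(transformations));
            this.configuration = new Configuration(configuration);
        }

        List<Transformation<?>> getTransformations() {
            return transformations;
        }

        Configuration getConfiguration() {
            // Hand out a copy so callers cannot mutate the stored configuration.
            return new Configuration(configuration);
        }
    }

Of course, the Transformation instances themselves are still mutable
today, so this only gets us part of the way, but it would at least narrow
down where mutation can happen.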

Best,
D.

On Sat, Jul 27, 2024 at 4:16 AM Yu Chen <yuchen.e...@gmail.com> wrote:

> Hi Junrui,
>
> Thanks for driving this. It’s really helpful for users to explore the
> jobs they submitted.
>
> It's an important cornerstone of FLINK-33230, since no stream-graph
> information is serialized.
> I'd like to build on this FLIP to expose operator-level metrics and the
> presentation of stream graphs in the web UI to enable fine-grained
> observations for users, as mentioned in Future Work.
>
>
> Best,
> Yu Chen
>
>
> > On Jul 24, 2024, at 4:03 PM, Junrui Lee <jrlee....@gmail.com> wrote:
> >
> > Hi all,
> >
> > Thank you for all the feedback and suggestions so far. If there are no
> > further comments, we will open the voting thread on Friday, July 26,
> 2024.
> >
> > Best regards,
> > Junrui
> >
> >> On Wed, Jul 24, 2024 at 1:43 PM Ron Liu <ron9....@gmail.com> wrote:
> >
> >> Hi, Junrui
> >>
> >> Thanks for your detailed reply.
> >>
> >> After reading the updated FLIP-468 & FLIP-470, I see that the design
> looks
> >> good.
> >>
> >>
> >> Best.
> >> Ron
> >>
> >> On Thu, Jul 18, 2024 at 2:26 PM Junrui Lee <jrlee....@gmail.com> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to follow up on my previous email regarding your feedback.
> >>> Below
> >>> is a concise summary of my main points:
> >>>
> >>> 1. Compiled Plan:
> >>> IIUC, the compiled plan is primarily for ensuring execution plan
> >>> compatibility across job versions (e.g., during upgrades). Eventually,
> it
> >>> needs to be converted to StreamGraph for submission. Persisting
> >>> StreamGraph/JobGraph is for supporting high availability in jm failover
> >>> scenarios.
> >>>
> >>> 2. JobGraph vs StreamGraph Submission:
> >>> For users, whether to submit JobGraph or StreamGraph is mostly
> >> transparent.
> >>> In the client mode, this detail is hidden from users. The REST API
> >> (/jobs)
> >>> was originally serving the internal JobGraph class, intended for the
> >> Flink
> >>> client, not as a strict public API.
> >>>
> >>> 3. Runtime SQL Optimization:
> >>> To facilitate runtime SQL-related optimization, we will introduce a
> >>> strategy mechanism called StreamGraphOptimizationStrategy. SQL can
> >>> implement specific strategies, such as
> >>> AdaptiveBroadcastJoinOptimizationStrategy. More details are available
> in
> >>> FLIP-469 [1].
> >>>
> >>> 4. Persistence Strategy:
> >>> Initially, we can retain the JobGraph persistence method to ensure it
> >> does
> >>> not affect stream jobs. After the new method is validated in batch
> >>> processing scenarios over several versions, we can unify the stream
> jobs
> >> to
> >>> also use StreamGraph persistence.
> >>>
> >>> Your thoughts and insights on these points would be highly appreciated.
> >>>
> >>> [1]
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> >>>
> >>> Best regards,
> >>>
> >>> Junrui
> >>>
> >>> On Fri, Jul 12, 2024 at 8:29 PM Junrui Lee <jrlee....@gmail.com> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> Thanks for your feedback. Below are my thoughts on the questions
> you've
> >>>> raised
> >>>>
> >>>> @Fabian
> >>>>
> >>>> - What is the future plan for job submissions in Flink? With the
> >> current
> >>>>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
> >>>>> submissions? It might be confusing for users and complicate the
> >> existing
> >>>>> job submission logic significantly.
> >>>>
> >>>>
> >>>> In my view, Flink should support submissions via StreamGraph and,
> >>>> eventually, remove support for JobGraph submissions. As for the
> >> compiled
> >>>> plan, I consider it to be a concept related to table/sql, which also
> >>> needs
> >>>> to be compiled into the corresponding StreamGraph (a more generalized
> >>>> concept, applicable as well to DataStream API jobs) upon submission.
> >>>> In this FLIP, to provide a smoother migration for users, and
> >>>> recognizing that a vast number of Flink UT/IT cases currently depend
> >>>> on the submission of a specialized JobGraph to achieve test
> >>>> objectives, transitioning these test cases to StreamGraph would be a
> >>>> highly risky and complicated undertaking. Therefore, I am inclined
> >>>> not to eliminate the pathway of submitting JobGraph within this FLIP.
> >>>>
> >>>> How do we plan to connect the optimizer with Flink's runtime?
> >>>>
> >>>> @Ron
> >>>>
> >>>>> For batch scenario, if we want to better support dynamic plan tuning
> >>>>> strategies, the fundamental solution is still to put SQL Optimizer to
> >>>>> flink-runtime.
> >>>>
> >>>>
> >>>> Our current solution is to abstract an interface called
> >>>> StreamGraphOptimizationStrategy. For table-related optimization
> >>> strategies,
> >>>> taking the Broadcast Join concept as an example, we can implement an
> >>>> AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When
> this
> >>>> strategy is needed, during SQL compilation, we will set the
> >> configuration
> >>>> item execution.batch.adaptive.stream-graph-optimization.strategies.
> >> Then,
> >>>> the runtime layer will load the corresponding Strategy class based on
> >>> this
> >>>> configuration item to be used at runtime.
> >>>>
> >>>> We will describe this part in the StreamGraphOptimizer section of
> >>>> FLIP-469, and the specific introduction of
> >>>> AdaptiveBroadcastJoinOptimizationStrategy will be discussed in
> >> subsequent
> >>>> FLIPs, which is expected to happen within a few days.
> >>>>
> >>>> @David
> >>>>
> >>>>> 1. Transformations in StreamGraphGenerator:
> >>>>>
> >>>>
> >>>> I'm inclined to support submitting StreamGraph because it could
> already
> >>>> provide the actual logical plan of the job at runtime. To be honest,
> >> I'm
> >>>> not sure what additional benefits submitting Transformations would
> >> bring
> >>>> compared to StreamGraph. I would appreciate any insights that you
> might
> >>>> offer on this matter.
> >>>>
> >>>> 2. StreamGraph for Recovery Purposes:
> >>>>
> >>>>
> >>>> In fact, this conflicts with our desire to apply adaptive
> >>>> optimizations to the StreamGraph at runtime; in such scenarios, the
> >>>> StreamGraph, not the JobGraph, is the complete expression of a job's
> >>>> logic. More details can be found in FLIP-469.
> >>>>
> >>>> I have reviewed the code related to the JobGraphStore and discovered
> >> that
> >>>> it can be extended to store both StreamGraph and JobGraph
> >> simultaneously.
> >>>> As for your concerns, can we consider the following: in batch mode, we
> >>> use
> >>>> Job Recovery based on StreamGraph, whereas for stream mode, we
> continue
> >>> to
> >>>> use the original JobGraph recovery and the StreamGraph would be
> >> converted
> >>>> to a JobGraph right at the beginning.
> >>>>
> >>>> 3. Moving Away from Java Serializables:
> >>>>
> >>>>
> >>>> Are you suggesting that Java serialization has limitations, and that
> we
> >>>> should explore alternative serialization approaches? I agree that this
> >>> is a
> >>>> valuable consideration for the future. Do you think this should be
> >>> included
> >>>> in this FLIP? I would prefer to address it as a separate FLIP.
> >>>>
> >>>> Best,
> >>>> Junrui
> >>>>
> >>>> On Fri, Jul 12, 2024 at 2:47 PM David Morávek <david.mora...@gmail.com> wrote:
> >>>>
> >>>>>>
> >>>>>> For batch scenario, if we want to better support dynamic plan tuning
> >>>>>> strategies, the fundamental solution is still to put SQL Optimizer
> >> to
> >>>>>> flink-runtime.
> >>>>>
> >>>>>
> >>>>> One accompanying question is: how do you envision this to work for
> >>>>> streaming where you need to ensure state compatibility after the plan
> >>>>> change? FLIP-469 seems to only focus on batch.
> >>>>>
> >>>>> Best,
> >>>>> D.
> >>>>>
> >>>>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
> >>>>>
> >>>>>> Hi, Junrui
> >>>>>>
> >>>>>> The FLIP proposal looks good to me.
> >>>>>>
> >>>>>> I have the same question as Fabian:
> >>>>>>
> >>>>>>> For join strategies, they are only
> >>>>>> applicable when using an optimizer (that's currently not part of
> >>> Flink's
> >>>>>> runtime) with the Table API or Flink SQL. How do we plan to connect
> >>> the
> >>>>>> optimizer with Flink's runtime?
> >>>>>>
> >>>>>> For batch scenario, if we want to better support dynamic plan tuning
> >>>>>> strategies, the fundamental solution is still to put SQL Optimizer
> >> to
> >>>>>> flink-runtime.
> >>>>>>
> >>>>>> Best,
> >>>>>> Ron
> >>>>>>
> >>>>>> On Thu, Jul 11, 2024 at 7:17 PM David Morávek <d...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Junrui,
> >>>>>>>
> >>>>>>> Thank you for drafting the FLIP. I really appreciate the direction
> >>>>> it’s
> >>>>>>> taking. We’ve discussed similar approaches multiple times, and
> >> it’s
> >>>>> great
> >>>>>>> to see this progress.
> >>>>>>>
> >>>>>>> I have a few questions and thoughts:
> >>>>>>>
> >>>>>>>
> >>>>>>> * 1. Transformations in StreamGraphGenerator:*
> >>>>>>> Should we consider taking this a step further by working on a list
> >>> of
> >>>>>>> transformations (inputs of StreamGraphGenerator)?
> >>>>>>>
> >>>>>>>    public StreamGraphGenerator(
> >>>>>>>            List<Transformation<?>> transformations,
> >>>>>>>            ExecutionConfig executionConfig,
> >>>>>>>            CheckpointConfig checkpointConfig,
> >>>>>>>            ReadableConfig configuration) {
> >>>>>>>
> >>>>>>> We could potentially merge ExecutionConfig and CheckpointConfig
> >> into
> >>>>>>> ReadableConfig. This approach might offer us even more
> >> flexibility.
> >>>>>>>
> >>>>>>>
> >>>>>>> *2. StreamGraph for Recovery Purposes:*
> >>>>>>> Should we avoid using StreamGraph for recovery purposes? The
> >>> existing
> >>>>>>> JG-based recovery code paths took years to perfect, and it doesn’t
> >>>>> seem
> >>>>>>> necessary to replace them. We only need SG for cases where we want
> >>> to
> >>>>>>> regenerate the JG.
> >>>>>>> Additionally, translating SG into JG before persisting it in HA
> >>> could
> >>>>> be
> >>>>>>> beneficial, as it allows us to catch potential issues early on.
> >>>>>>>
> >>>>>>>
> >>>>>>> * 3. Moving Away from Java Serializables:*
> >>>>>>> It would be great to start moving away from Java Serializables as
> >>>>> much as
> >>>>>>> possible. Could we instead define proper versioned serializers,
> >>>>> possibly
> >>>>>>> based on a well-defined protobuf blueprint? This change could help
> >>> us
> >>>>>> avoid
> >>>>>>> ongoing issues associated with Serializables.
> >>>>>>>
> >>>>>>> Looking forward to your thoughts.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> D.
> >>>>>>>
> >>>>>>> On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for drafting this FLIP. I really like the idea of
> >>>>> introducing a
> >>>>>>>> concept in Flink that is close to a logical plan submission.
> >>>>>>>>
> >>>>>>>> I have a few questions about the proposal and its future
> >>>>> evolvability.
> >>>>>>>>
> >>>>>>>> - What is the future plan for job submissions in Flink? With the
> >>>>>> current
> >>>>>>>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
> >>>>>>>> submissions? It might be confusing for users and complicate the
> >>>>>> existing
> >>>>>>>> job submission logic significantly.
> >>>>>>>> - The FLIP mentions multiple areas of optimization, first
> >> operator
> >>>>>>> chaining
> >>>>>>>> and second dynamic switches between join strategies. I think
> >> from
> >>> a
> >>>>>> Flink
> >>>>>>>> perspective, these are, at the moment, separate concerns.  For
> >>>>> operator
> >>>>>>>> chaining, I agree with the current proposal, which is a concept
> >>> that
> >>>>>>>> applies generally to Flink's runtime. For join strategies, they
> >>> are
> >>>>>> only
> >>>>>>>> applicable when using an optimizer (that's currently not part of
> >>>>>> Flink's
> >>>>>>>> runtime) with the Table API or Flink SQL. How do we plan to
> >>> connect
> >>>>> the
> >>>>>>>> optimizer with Flink's runtime?
> >>>>>>>> - With table/SQL API we already expose a compiled plan to
> >> support
> >>>>>> stable
> >>>>>>>> version upgrades. It would be great to explore a joined plan to
> >>> also
> >>>>>>> offer
> >>>>>>>> stable version upgrades with a potentially persistent
> >> streamgraph.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Fabian
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
