Here is a summary of my current understanding; please correct me if it's
wrong.

We currently have an API for submitting a list of transformations and
configurations called CompiledPlan. However, it is currently specific to
the SQL domain.

In reviewing the follow-up FLIPs, it is evident that we are targeting graph
changes exclusively for the SQL landscape. Implementing these changes for
the DataStream API may not be feasible, as we lack the semantic information
needed to understand the underlying pipeline.

There appear to be three main categories of issues:

1. JobGraph Changes: To recompute the JobGraph based on new evidence, we
need to have a clear understanding of the pipeline, which is likely
exclusive to SQL. This requires the use of StreamGraph (SG)/CompiledPlan.
2. Partitioning Changes: These primarily involve key group redistributions
and do not necessitate changes to the JobGraph. Such optimizations can be
applied to both DataStream and SQL.
3. Changes to WebUI: These changes can be addressed at the JobGraph level
by providing additional context to JsonPlan, without requiring
SG/CompiledPlan.
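
To make the second category concrete, here is a minimal, self-contained sketch
of how I understand key-group assignment. It is not Flink's actual code: the
class and method names are made up for illustration, and I use a plain
hashCode() modulo where Flink additionally scrambles the hash. The point is
only that a key's key group depends on the key and the max parallelism, while
the key-group-to-subtask mapping is the part that changes on rescaling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Flink.
public class KeyGroupSketch {

    // Key -> key group. Depends only on the key and maxParallelism,
    // so it is stable across rescaling. (Simplified hash: an assumption.)
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask index. This is the only mapping that moves
    // when the parallelism changes.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // All key groups owned by one subtask under the given parallelism.
    static List<Integer> keyGroupsOf(int subtask, int maxParallelism, int parallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (subtaskFor(kg, maxParallelism, parallelism) == subtask) {
                groups.add(kg);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        for (int parallelism : new int[] {2, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println("parallelism=" + parallelism
                        + " subtask=" + subtask
                        + " keyGroups=" + keyGroupsOf(subtask, maxParallelism, parallelism));
            }
        }
    }
}
```

With maxParallelism = 8, going from parallelism 2 to 4 moves key groups 2 and
3 from subtask 0 to subtask 1, while every key stays in the same key group.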

Given these points, is it accurate to say that this FLIP is strictly
necessary for TableAPI/SQL jobs only?

Best,
D.

On Tue, Jul 30, 2024 at 11:25 AM Zhu Zhu <reed...@gmail.com> wrote:

> Thanks for sharing the thoughts, David!
>
> IIUC, there are two goals to make the APIs used for RPCs as
> versionless as possible?
> 1. no version mismatch happens if no code changes
> 2. supports different versioned Flink clients and clusters
> The first goal can be achieved by explicitly assigning a serialVersionUID
> to serializable classes, which is already required in Flink code style.
> Therefore, it should not be a major problem.
> The second goal requires introducing versioned serializers and rewriting
> classes to serialize. It can also bring benefits like better performance.
> However, it is not limited to the job submission API. And it would require
> a thorough and maybe complex design. Therefore, I think it's better to do
> it in an individual FLIP.
>
> Regarding submitting a StreamGraph or a list of transformations with
> configuration, I think StreamGraph is simple enough. It contains fields
> like operators, partitioners and configuration, which are already
> serialized to JobGraph nowadays, which means there is little work and
> risk to turn it into a serializable class. Directly serializing and
> submitting transformations is also acceptable at first glance, but
> SinkTransformation seems to be a blocker, as Junrui just mentioned.
>
> Thanks,
> Zhu
>
> Junrui Lee <jrlee....@gmail.com> wrote on Tue, Jul 30, 2024 at 12:13:
>
> > Hi David,
> >
> > Thank you very much for your detailed explanation, which is crucial in
> > helping to further improve this FLIP.
> >
> > This FLIP is applicable to both batch and stream processing. For batch
> > processing, it can be used to optimize the StreamGraph (e.g., FLIP-469),
> > while for streaming, we can use the StreamGraph to show a detailed logical
> > plan at runtime (e.g., FLINK-33230) and potentially retain possible logical
> > topology optimizations in the future (e.g., performing intelligent chain
> > breaks while ensuring state compatibility).
> >
> > Overall, whether based on the existing JobGraph or the proposed submission
> > based on StreamGraph, it is invisible to users, and its REST API is
> > internal because the JobGraph and StreamGraph are internal classes.
> > Although it is documented, we could consider removing it from the official
> > documentation.
> >
> > Regarding your mention of "making the RPC APIs as versionless as possible,"
> > I think your viewpoint is correct and highly valuable. I have carefully
> > considered your suggestion of serializing a list of transformations and a
> > configuration. This is indeed a step towards making the APIs used for RPCs
> > as versionless as possible.
> > However, I think this task is much more complex than serializing a
> > StreamGraph, as it requires ensuring that each subclass of transformation
> > and its properties are serializable. This obviously adds a significant
> > amount of complexity. For example, the SinkTransformation includes
> > DataStream properties, which have many unserializable fields, such as the
> > StreamExecutionEnvironment.
> > Moreover, this solution does not completely solve the problem of RPC API
> > versioning.
> >
> > Therefore, as this FLIP does not truly change a public REST API, I think we
> > can narrow down the scope of this FLIP a bit, focusing on how to enable the
> > JM to see and operate on StreamGraph.
> > In my understanding, the current proposal will not complicate future work
> > if Flink tries to make its REST API more versionless, e.g. by directly
> > submitting transformations. Instead, most of the work can be reused, like
> > creating the JobGraph at runtime.
> >
> > WDYT? Looking forward to your feedback
> >
> > Best,
> > Junrui
> >
> > David Morávek <d...@apache.org> wrote on Mon, Jul 29, 2024 at 21:34:
> >
> > > Hi all,
> > >
> > > My main concern is the absence of a comprehensive vision for how we could
> > > make this work for both Batch and Streaming. Right now, it feels like the
> > > proposal is solely centered around very specific batch optimizations.
> > >
> > > > I’m inclined to support submitting StreamGraph because it could already
> > > > provide the actual logical plan of the job at runtime. To be honest, I’m
> > > > not sure what additional benefits submitting Transformations would bring
> > > > compared to StreamGraph. I would appreciate any insights that you might
> > > > offer on this matter.
> > >
> > >
> > > This FLIP actually proposes a new job representation, so it would be great
> > > to learn from the mistakes of the JobGraph. The main drawback of the
> > > JobGraph is its very tight coupling with the particular Flink version. Even
> > > a small patch version difference between client and server can make the
> > > JobGraph invalid due to the nature of Java Serializability. StreamGraph is
> > > exposed to an even bigger surface area with more internal data structures
> > > that have the potential to make this problem more visible.
> > >
> > > In general, it would be highly valuable to make the APIs used for RPCs as
> > > versionless as possible. If you were to write a custom serializer for the
> > > StreamGraph, you’d actually find that you don’t need more than a list of
> > > transformations and a configuration. Passing those to the
> > > StreamGraphGenerator will produce a deterministic result. In this case,
> > > you’ve limited the surface area of what you need to JavaSerialize to
> > > user-provided transformations, which is a pretty nice property.
> > >
> > > A list of transformations plus configuration is also the most minimalistic
> > > common denominator across the APIs Flink offers (DataStream, Table, SQL).
> > > Additionally, it would be beneficial to make the submission data structures
> > > immutable. This immutability would ensure the integrity and consistency of
> > > the job definitions throughout their lifecycle, reducing the risk of
> > > inadvertent modifications and making the system more robust.
> > >
> > > Best,
> > > D.
> > >
> > > On Sat, Jul 27, 2024 at 4:16 AM Yu Chen <yuchen.e...@gmail.com> wrote:
> > >
> > > > Hi Junrui,
> > > >
> > > > Thanks for driving this. It’s really helpful for users to explore the
> > > > jobs they submitted.
> > > >
> > > > It's an important cornerstone of FLINK-33230, since no stream-graph
> > > > information is currently serialized.
> > > > I'd like to build on this FLIP to expose operator-level metrics and the
> > > > presentation of stream graphs on the web to enable fine-grained
> > > > observations for users, as mentioned in Future Work.
> > > >
> > > >
> > > > Best,
> > > > Yu Chen
> > > >
> > > >
> > > > > On Jul 24, 2024, at 16:03, Junrui Lee <jrlee....@gmail.com> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > Thank you for all the feedback and suggestions so far. If there are no
> > > > > further comments, we will open the voting thread on Friday, July 26,
> > > > > 2024.
> > > > >
> > > > > Best regards,
> > > > > Junrui
> > > > >
> > > > > Ron Liu <ron9....@gmail.com> wrote on Wed, Jul 24, 2024 at 13:43:
> > > > >
> > > > >> Hi, Junrui
> > > > >>
> > > > >> Thanks for your detailed reply.
> > > > >>
> > > > >> After reading the updated FLIP-468 & FLIP-470, I see that the design
> > > > >> looks good.
> > > > >>
> > > > >>
> > > > >> Best.
> > > > >> Ron
> > > > >>
> > > > >> Junrui Lee <jrlee....@gmail.com> wrote on Thu, Jul 18, 2024 at 14:26:
> > > > >>
> > > > >>> Hi all,
> > > > >>>
> > > > >>> I would like to follow up on my previous email regarding your feedback.
> > > > >>> Below is a concise summary of my main points:
> > > > >>>
> > > > >>> 1. Compiled Plan:
> > > > >>> IIUC, the compiled plan is primarily for ensuring execution plan
> > > > >>> compatibility across job versions (e.g., during upgrades). Eventually, it
> > > > >>> needs to be converted to StreamGraph for submission. Persisting
> > > > >>> StreamGraph/JobGraph is for supporting high availability in JM failover
> > > > >>> scenarios.
> > > > >>>
> > > > >>> 2. JobGraph vs StreamGraph Submission:
> > > > >>> For users, whether to submit JobGraph or StreamGraph is mostly transparent.
> > > > >>> In the client mode, this detail is hidden from users. The REST API (/jobs)
> > > > >>> was originally serving the internal JobGraph class, intended for the Flink
> > > > >>> client, not as a strict public API.
> > > > >>>
> > > > >>> 3. Runtime SQL Optimization:
> > > > >>> To facilitate runtime SQL-related optimization, we will introduce a
> > > > >>> strategy mechanism called StreamGraphOptimizationStrategy. SQL can
> > > > >>> implement specific strategies, such as
> > > > >>> AdaptiveBroadcastJoinOptimizationStrategy. More details are available in
> > > > >>> FLIP-469 [1].
> > > > >>>
> > > > >>> 4. Persistence Strategy:
> > > > >>> Initially, we can retain the JobGraph persistence method to ensure it does
> > > > >>> not affect stream jobs. After the new method is validated in batch
> > > > >>> processing scenarios over several versions, we can unify the stream jobs to
> > > > >>> also use StreamGraph persistence.
> > > > >>>
> > > > >>> Your thoughts and insights on these points would be highly appreciated.
> > > > >>>
> > > > >>> [1]
> > > > >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-469%3A+Supports+Adaptive+Optimization+of+StreamGraph
> > > > >>>
> > > > >>> Best regards,
> > > > >>>
> > > > >>> Junrui
> > > > >>>
> > > > >>> Junrui Lee <jrlee....@gmail.com> wrote on Fri, Jul 12, 2024 at 20:29:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> Thanks for your feedback. Below are my thoughts on the questions you've
> > > > >>>> raised
> > > > >>>>
> > > > >>>> @Fabian
> > > > >>>>
> > > > >>>>> - What is the future plan for job submissions in Flink? With the current
> > > > >>>>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
> > > > >>>>> submissions? It might be confusing for users and complicate the existing
> > > > >>>>> job submission logic significantly.
> > > > >>>>
> > > > >>>>
> > > > >>>> In my view, Flink should support submissions via StreamGraph and,
> > > > >>>> eventually, remove support for JobGraph submissions. As for the compiled
> > > > >>>> plan, I consider it to be a concept related to table/sql, which also needs
> > > > >>>> to be compiled into the corresponding StreamGraph (a more generalized
> > > > >>>> concept, applicable as well to DataStream API jobs) upon submission.
> > > > >>>> In this FLIP, we want to provide a smoother migration for users. A vast
> > > > >>>> number of Flink UT/IT cases currently depend on submitting a specialized
> > > > >>>> JobGraph to achieve their test objectives, and migrating these test cases
> > > > >>>> to StreamGraph would be highly risky and complicated. Therefore, I am
> > > > >>>> inclined not to eliminate the pathway of submitting JobGraph within this
> > > > >>>> FLIP.
> > > > >>>>
> > > > >>>> How do we plan to connect the optimizer with Flink's runtime?
> > > > >>>>
> > > > >>>> @Ron
> > > > >>>>
> > > > >>>>> For batch scenario, if we want to better support dynamic plan tuning
> > > > >>>>> strategies, the fundamental solution is still to put SQL Optimizer to
> > > > >>>>> flink-runtime.
> > > > >>>>
> > > > >>>>
> > > > >>>> Our current solution is to abstract an interface called
> > > > >>>> StreamGraphOptimizationStrategy. For table-related optimization strategies,
> > > > >>>> taking the Broadcast Join concept as an example, we can implement an
> > > > >>>> AdaptiveBroadcastJoinOptimizationStrategy at the table layer. When this
> > > > >>>> strategy is needed, during SQL compilation, we will set the configuration
> > > > >>>> item execution.batch.adaptive.stream-graph-optimization.strategies. Then,
> > > > >>>> the runtime layer will load the corresponding Strategy class based on this
> > > > >>>> configuration item to be used at runtime.
> > > > >>>>
> > > > >>>> We will describe this part in the StreamGraphOptimizer section of
> > > > >>>> FLIP-469, and the specific introduction of
> > > > >>>> AdaptiveBroadcastJoinOptimizationStrategy will be discussed in subsequent
> > > > >>>> FLIPs, which is expected to happen within a few days.
> > > > >>>>
> > > > >>>> @David
> > > > >>>>
> > > > >>>>> *1. Transformations in StreamGraphGenerator:*
> > > > >>>>>
> > > > >>>>
> > > > >>>> I'm inclined to support submitting StreamGraph because it could already
> > > > >>>> provide the actual logical plan of the job at runtime. To be honest, I'm
> > > > >>>> not sure what additional benefits submitting Transformations would bring
> > > > >>>> compared to StreamGraph. I would appreciate any insights that you might
> > > > >>>> offer on this matter.
> > > > >>>>
> > > > >>>> *2. StreamGraph for Recovery Purposes:*
> > > > >>>>
> > > > >>>>
> > > > >>>> In fact, this conflicts with our desire to apply adaptive optimization to
> > > > >>>> the StreamGraph at runtime, as under such scenarios, the StreamGraph is the
> > > > >>>> complete expression of a job's logic, not the JobGraph. More details can
> > > > >>>> be found in FLIP-469.
> > > > >>>>
> > > > >>>> I have reviewed the code related to the JobGraphStore and discovered that
> > > > >>>> it can be extended to store both StreamGraph and JobGraph simultaneously.
> > > > >>>> As for your concerns, can we consider the following: in batch mode, we use
> > > > >>>> Job Recovery based on StreamGraph, whereas for stream mode, we continue to
> > > > >>>> use the original JobGraph recovery and the StreamGraph would be converted
> > > > >>>> to a JobGraph right at the beginning.
> > > > >>>>
> > > > >>>> 3. Moving Away from Java Serializables:
> > > > >>>>
> > > > >>>>
> > > > >>>> Are you suggesting that Java serialization has limitations, and that we
> > > > >>>> should explore alternative serialization approaches? I agree that this is a
> > > > >>>> valuable consideration for the future. Do you think this should be included
> > > > >>>> in this FLIP? I would prefer to address it as a separate FLIP.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Junrui
> > > > >>>>
> > > > >>>> David Morávek <david.mora...@gmail.com> wrote on Fri, Jul 12, 2024 at 14:47:
> > > > >>>>
> > > > >>>>>>
> > > > >>>>>> For batch scenario, if we want to better support dynamic plan tuning
> > > > >>>>>> strategies, the fundamental solution is still to put SQL Optimizer to
> > > > >>>>>> flink-runtime.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> One accompanying question is: how do you envision this to work for
> > > > >>>>> streaming where you need to ensure state compatibility after the plan
> > > > >>>>> change? FLIP-469 seems to only focus on batch.
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> D.
> > > > >>>>>
> > > > >>>>> On Fri, Jul 12, 2024 at 4:29 AM Ron Liu <ron9....@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>>> Hi, Junrui
> > > > >>>>>>
> > > > >>>>>> The FLIP proposal looks good to me.
> > > > >>>>>>
> > > > >>>>>> I have the same question as Fabian:
> > > > >>>>>>
> > > > >>>>>>> For join strategies, they are only
> > > > >>>>>>> applicable when using an optimizer (that's currently not part of Flink's
> > > > >>>>>>> runtime) with the Table API or Flink SQL. How do we plan to connect the
> > > > >>>>>>> optimizer with Flink's runtime?
> > > > >>>>>>
> > > > >>>>>> For batch scenario, if we want to better support dynamic plan tuning
> > > > >>>>>> strategies, the fundamental solution is still to put SQL Optimizer to
> > > > >>>>>> flink-runtime.
> > > > >>>>>>
> > > > >>>>>> Best,
> > > > >>>>>> Ron
> > > > >>>>>>
> > > > >>>>>> David Morávek <d...@apache.org> wrote on Thu, Jul 11, 2024 at 19:17:
> > > > >>>>>>
> > > > >>>>>>> Hi Junrui,
> > > > >>>>>>>
> > > > >>>>>>> Thank you for drafting the FLIP. I really appreciate the direction it’s
> > > > >>>>>>> taking. We’ve discussed similar approaches multiple times, and it’s great
> > > > >>>>>>> to see this progress.
> > > > >>>>>>>
> > > > >>>>>>> I have a few questions and thoughts:
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> *1. Transformations in StreamGraphGenerator:*
> > > > >>>>>>> Should we consider taking this a step further by working on a list of
> > > > >>>>>>> transformations (inputs of StreamGraphGenerator)?
> > > > >>>>>>>
> > > > >>>>>>>    public StreamGraphGenerator(
> > > > >>>>>>>            List<Transformation<?>> transformations,
> > > > >>>>>>>            ExecutionConfig executionConfig,
> > > > >>>>>>>            CheckpointConfig checkpointConfig,
> > > > >>>>>>>            ReadableConfig configuration) {
> > > > >>>>>>>
> > > > >>>>>>> We could potentially merge ExecutionConfig and CheckpointConfig into
> > > > >>>>>>> ReadableConfig. This approach might offer us even more flexibility.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> *2. StreamGraph for Recovery Purposes:*
> > > > >>>>>>> Should we avoid using StreamGraph for recovery purposes? The existing
> > > > >>>>>>> JG-based recovery code paths took years to perfect, and it doesn’t seem
> > > > >>>>>>> necessary to replace them. We only need SG for cases where we want to
> > > > >>>>>>> regenerate the JG.
> > > > >>>>>>> Additionally, translating SG into JG before persisting it in HA could be
> > > > >>>>>>> beneficial, as it allows us to catch potential issues early on.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> *3. Moving Away from Java Serializables:*
> > > > >>>>>>> It would be great to start moving away from Java Serializables as much as
> > > > >>>>>>> possible. Could we instead define proper versioned serializers, possibly
> > > > >>>>>>> based on a well-defined protobuf blueprint? This change could help us avoid
> > > > >>>>>>> ongoing issues associated with Serializables.
> > > > >>>>>>>
> > > > >>>>>>> Looking forward to your thoughts.
> > > > >>>>>>>
> > > > >>>>>>> Best,
> > > > >>>>>>> D.
> > > > >>>>>>>
> > > > >>>>>>> On Thu, Jul 11, 2024 at 12:58 PM Fabian Paul <fp...@apache.org> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Thanks for drafting this FLIP. I really like the idea of introducing a
> > > > >>>>>>>> concept in Flink that is close to a logical plan submission.
> > > > >>>>>>>>
> > > > >>>>>>>> I have a few questions about the proposal and its future evolvability.
> > > > >>>>>>>>
> > > > >>>>>>>> - What is the future plan for job submissions in Flink? With the current
> > > > >>>>>>>> proposal, Flink will support JobGraph/StreamGraph/compiled plan
> > > > >>>>>>>> submissions? It might be confusing for users and complicate the existing
> > > > >>>>>>>> job submission logic significantly.
> > > > >>>>>>>> - The FLIP mentions multiple areas of optimization, first operator chaining
> > > > >>>>>>>> and second dynamic switches between join strategies. I think from a Flink
> > > > >>>>>>>> perspective, these are, at the moment, separate concerns. For operator
> > > > >>>>>>>> chaining, I agree with the current proposal, which is a concept that
> > > > >>>>>>>> applies generally to Flink's runtime. For join strategies, they are only
> > > > >>>>>>>> applicable when using an optimizer (that's currently not part of Flink's
> > > > >>>>>>>> runtime) with the Table API or Flink SQL. How do we plan to connect the
> > > > >>>>>>>> optimizer with Flink's runtime?
> > > > >>>>>>>> - With table/SQL API we already expose a compiled plan to support stable
> > > > >>>>>>>> version upgrades. It would be great to explore a joined plan to also offer
> > > > >>>>>>>> stable version upgrades with a potentially persistent streamgraph.
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Fabian
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> >
>
