+1

Thanks, Peter. Based on the consensus in the recent thread on FLIP-371 [1],
I agree that this is the right approach. I made some minor edits to the
FLIP, which looks good to me now.

[1] https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57

On Wed, Dec 13, 2023 at 5:30 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Thank you Peter Vary for updating the FLIP based on the discussions.
> I really like the improvements introduced by the mixin interfaces, which now
> align much better with the source and table connectors.
>
> While this introduces some breaking changes to the existing connectors,
> this is technical debt that we need to resolve as soon as possible, and
> fully before 2.0.
>
> +1 from my side.
>
> I am cc'ing some folks participating in the other threads, sorry about that
> :)
>
> Cheers,
> Gyula
>
> On Wed, Dec 13, 2023 at 4:14 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
> > I have updated FLIP-372 [1] based on the comments from multiple
> > sources. I moved to the mixin approach, as this seems to be the consensus
> > based on this thread [2].
> > I also created a draft implementation PR [3], so I can test the changes and
> > default implementations (no new tests yet).
> > Please provide your feedback, so I can address your questions and comments,
> > and then we can move forward to voting.
> >
> > Thanks,
> > Peter
> >
> > [1] -
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> > [2] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> > [3] - https://github.com/apache/flink/pull/23912
> >
> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Mon, Dec 11, 2023,
> > 14:28):
> >
> > > We identified another issue with the current Sink API in a thread [1]
> > > related to FLIP-371 [2]. Currently it is not possible to evolve the
> > > Sink.createWriter method with deprecation, because StatefulSink and
> > > TwoPhaseCommittingSink have methods with the same name and parameters,
> > > but a narrowed-down return type (StatefulSinkWriter,
> > > PrecommittingSinkWriter).
> > >
> > > To make the Sink API evolvable, we minimally have to remove these.
> > >
> > > The participants there also pointed out that the Source API also uses
> > > mixin interfaces (SupportsHandleExecutionAttemptSourceEvent,
> > > SupportsIntermediateNoMoreSplits) in some cases. My observation is that
> > > it uses inheritance as well (ReaderOutput, ExternallyInducedSourceReader).
> > >
> > > I have created a draft API along these lines in a branch [3] where only
> > > the last commit is relevant [4]. This implementation would follow the
> > > same patterns as the current Source API.
> > >
> > > I see two different general approaches here, and I would like to hear
> > > your preferences:
> > > - Keep the changes minimal, stick to the current Sink API design. We
> > >   introduce the required new combination of interfaces
> > >   (TwoPhaseCommittingSinkWithPreCommitTopology,
> > >   WithPostCommitTopologyWithPreCommitTopology), and do not change the
> > >   API structure.
> > >      - Pros:
> > >           - Minimal change - smaller rewrite on the connector side
> > >           - Type checks happen at compile time
> > >      - Cons:
> > >           - Harder to evolve
> > >           - The number of interfaces increases with the possible
> > >             combinations
> > >           - Inconsistent API patterns wrt the Source API - harder for
> > >             developers to understand
> > > - Migrate to a model similar to the Source API. We create mixin
> > >   interfaces for SupportsCommitter, SupportsWriterState,
> > >   SupportsPreCommitTopology, SupportsPostCommitTopology and
> > >   SupportsPreWriteTopology.
> > >     - Pros:
> > >         - Better evolvability
> > >         - Consistent with the Source API
> > >     - Cons:
> > >         - The connectors need to change their inheritance patterns
> > >           (after the deprecation period) if they are using any of the
> > >           more complicated Sinks.
> > >         - Type checks need to use `instanceof`, which could happen at
> > >           DAG generation time. Also, if the developer fails to correctly
> > >           match the generic types on the mixin interfaces, the error
> > >           will only surface at execution time, when the job tries to
> > >           process the first record.
> > >
> > > I personally prefer the mixin approach for easier evolvability and
> > > better consistency, but I would like to hear your thoughts, so I can
> > > flesh out the chosen approach in FLIP-372.
> > >
> > > Thanks,
> > > Peter
> > >
> > > [1] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> > > [2] -
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > > [3] - https://github.com/pvary/flink/tree/mixin_demo
> > > [4] -
> > > https://github.com/pvary/flink/commit/acfc09f4c846f983f633bbf0c902aea49aa6b156
> > >
> > >
> > > On Fri, Nov 24, 2023, 11:38 Gyula Fóra <gyula.f...@gmail.com> wrote:
> > >
> > >> Hi Peter!
> > >>
> > >> Thank you for the analysis of the options.
> > >>
> > >> I don't really have a strong opinion, but in general I am in favor of
> > >> the mixin-style interfaces.
> > >> We follow the same approach for table sources / sinks as well.
> > >>
> > >> Some other benefits I see in addition to what you mentioned:
> > >>  - Easier to introduce new experimental / public-evolving interfaces
> > >>    in the future
> > >>  - Easier to declare parts of the API stable going forward, as it's not
> > >>    all or nothing
> > >>
> > >> Losing the ability to do proper compile-time validation is definitely
> > >> a downside, but this should mostly make initial development a little
> > >> harder, I believe.
> > >>
> > >> Cheers,
> > >> Gyula
> > >>
> > >> On Thu, Nov 23, 2023 at 1:25 PM Péter Váry
> > >> <peter.vary.apa...@gmail.com> wrote:
> > >>
> > >> > We had a longer discussion with Gordon yesterday.
> > >> > The main conclusion was that moving to a completely new interface is
> > >> > not justified, and we should try to improve the current one.
> > >> >
> > >> > Another ask from Gordon was to check when the user will be notified
> > >> > if the parameter types are incorrect using the mixin approach.
> > >> > Imagine the type definition below:
> > >> >
> > >> > private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
> > >> >         implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
> > >> >                 WithPreCommitTopology<Boolean, Void> {
> > >> >
> > >> > The parametrizations of the above interfaces contradict each other:
> > >> >
> > >> >    - TwoPhaseCommittingSinkWithPreCommitTopology
> > >> >       - Input - Integer
> > >> >       - WriteResult - Long
> > >> >       - Committable - String
> > >> >    - WithPreCommitTopology
> > >> >       - WriteResult - Boolean
> > >> >       - Committable - Void
> > >> >
> > >> >
> > >> > Sadly, I was not able to find a solution where we could notify the
> > >> > user at job startup time. The first error the user will get is when
> > >> > the first record is processed/committed. I talked with Gyula Fora,
> > >> > and we discussed the possibility of using the TypeExtractor to get
> > >> > the types. We decided that it could work in some cases, but would
> > >> > not be a generic solution. See the "NOTES FOR USERS OF THIS CLASS"
> > >> > [1].
> > >> >
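The late failure described above follows from Java's type erasure, and a small standalone sketch can reproduce it. Everything here is a hypothetical simplification (`ErasureMismatchSketch`, `Writer`, `SupportsPreCommitTopology`, `runPipeline`); I use `String` instead of `Void` as the committable so the mixin method has something to return. The sink compiles and can be instantiated, and only the first record triggers a `ClassCastException`.

```java
/** Sketch only: hypothetical stand-ins showing why mismatched generics fail late. */
final class ErasureMismatchSketch {

    interface Writer<InputT, WriteResultT> {
        WriteResultT write(InputT input);
    }

    interface SupportsPreCommitTopology<WriteResultT, CommT> {
        CommT preCommit(WriteResultT writeResult);
    }

    /** Contradictory parametrization: the writer emits Long, the mixin expects Boolean. */
    static final class WrongMixinSink
            implements Writer<Integer, Long>, SupportsPreCommitTopology<Boolean, String> {
        @Override public Long write(Integer input) { return input.longValue(); }
        @Override public String preCommit(Boolean writeResult) { return writeResult ? "commit" : "skip"; }
    }

    /** Mimics a framework that only sees erased types and wires the stages together. */
    @SuppressWarnings("unchecked")
    static Object runPipeline(Object sink, Object firstRecord) {
        Writer<Object, Object> writer = (Writer<Object, Object>) sink;
        Object writeResult = writer.write(firstRecord);
        if (sink instanceof SupportsPreCommitTopology) {
            SupportsPreCommitTopology<Object, Object> preCommit =
                    (SupportsPreCommitTopology<Object, Object>) sink;
            // The compiler-generated bridge method casts the Long to Boolean here.
            return preCommit.preCommit(writeResult);
        }
        return writeResult;
    }

    public static void main(String[] args) {
        WrongMixinSink sink = new WrongMixinSink(); // constructing the sink raises no error
        try {
            runPipeline(sink, 1);
        } catch (ClassCastException e) {
            System.out.println("failed on first record: " + e.getClass().getSimpleName());
        }
    }
}
```

Nothing in the class declaration ties the two `WriteResult` parameters together, so the compiler has no basis for rejecting it.
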
> > >> > This missing feature would justify abandoning the mixin solution,
> > >> > and sticking to creating individual interfaces, like:
> > >> >
> > >> >    - *TwoPhaseCommittingSink* - When no pre-commit topology is
> > >> >    needed - kept because it is enough for most of the use-cases.
> > >> >    - *TwoPhaseCommittingSinkWithPreCommitTopology* - When a
> > >> >    pre-commit topology is needed with a transformation in the
> > >> >    pre-commit stage - the new generic interface (could be internal)
> > >> >    - *WithPreWriteTopology* - kept as it is
> > >> >    - *WithPreCommitTopology* - extends
> > >> >    TwoPhaseCommittingSinkWithPreCommitTopology with the
> > >> >    transformation method (classes from the streaming package are
> > >> >    needed, so it cannot be merged with
> > >> >    TwoPhaseCommittingSinkWithPreCommitTopology)
> > >> >    - *WithPostCommitTopology* - kept as it is (extends only
> > >> >    TwoPhaseCommittingSink, so no pre-commit topology is allowed)
> > >> >    - *WithPostCommitTopologyWithPreCommitTopology* - extends
> > >> >    WithPreCommitTopology with the same method as
> > >> >    WithPostCommitTopology
> > >> >
> > >> > I don't really like the `WithPostCommitTopologyWithPreCommitTopology`
> > >> > complex interface, and if we start adding new features then the
> > >> > number of interfaces could grow exponentially, but I agree that the
> > >> > type checking is important. I don't have a strong opinion, but I am
> > >> > inclined to vote for moving in the direction of the individual
> > >> > interfaces.
> > >> >
> > >> > What do you prefer?
> > >> >
> > >> >    1. Go with the mixin approach
> > >> >       1. Better extensibility
> > >> >       2. Fewer interfaces (only by 1 now, but later this could be
> > >> >       more)
> > >> >       3. Easier to understand (IMHO)
> > >> >    2. Stick with the combined interfaces approach (some mixin, like
> > >> >    WithPreWriteTopology, some combined like
> > >> >    WithPostCommitTopologyWithPreCommitTopology)
> > >> >       1. Better error messages
> > >> >       2. Less disruptive change (still breaking for implementations
> > >> >       of WithPreCommitTopology)
> > >> >    3. Do you have a better idea?
> > >> >
> > >> >
> > >> > Thanks,
> > >> > Peter
> > >> >
> > >> > CC: Jiabao Sun - as he might be interested in this discussion
> > >> >
> > >> > [1] -
> > >> > https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
> > >> >
> > >> >
> > >> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Wed, Oct 25, 2023,
> > >> > 16:02):
> > >> >
> > >> > > Hi Gordon,
> > >> > >
> > >> > > Thanks for the review, here are my thoughts:
> > >> > >
> > >> > > > In terms of the abstraction layering, I was wondering if you've
> > >> > > > also considered this approach which I've quickly sketched in my
> > >> > > > local fork:
> > >> > > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> > >> > >
> > >> > > I think we have a few design issues here:
> > >> > > - How to handle the old interface where the transformation is not
> > >> > > needed in the pre-commit phase? - As you have proposed, a default
> > >> > > method implementation is a nice solution here, as we do not really
> > >> > > have to change everything in the transformation process.
> > >> > > - How to handle the WithPostCommitTopology interface? - Currently
> > >> > > the parent interface for the sink with a post-commit topology is
> > >> > > strictly a single interface (TwoPhaseCommittingSink) and we want to
> > >> > > add this to both types of sinks (new - with transformation / old -
> > >> > > without transformation). In this case we could get away with
> > >> > > creating OldTwoPhaseCommittingSinkWithPostCommitTopology and
> > >> > > NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a
> > >> > > good approach for future extensibility. I tend to prefer a real
> > >> > > mixin approach to creating multiple interfaces for this.
> > >> > >
> > >> > > > Quick thought: regarding the awkwardness you mention in the end
> > >> > > > with sinks that have post commit topologies, but no pre-commit
> > >> > > > topologies - Alternative to the mixin approach in the FLIP, it
> > >> > > > might make sense to consider a builder approach for constructing
> > >> > > > 2PC sinks
> > >> > >
> > >> > > TBH, after providing the possibility to transform in the
> > >> > > pre-commit phase, I have started to think about the possible
> > >> > > different generalizations:
> > >> > > - Why not have the possibility to have a different return type of
> > >> > > the pre-write phase? - While we have the possibility to transform
> > >> > > the data in a preceding map phase before the Sink, some Sinks might
> > >> > > want to encapsulate these transformations before the writes.
> > >> > > - Why not have the explicit possibility to change the return type
> > >> > > of the committer? - We might not want to emit the incoming
> > >> > > Committable, we might want to use the commit hash - or any other
> > >> > > data generated by the committer - in the post-commit topology. So
> > >> > > in some cases it might make sense for the committer to emit
> > >> > > elements with different types than the input.
> > >> > > - Why not have everything as a mixin interface and define a Sink
> > >> > > this way (very, very similar to your builder approach)?
> > >> > >
> > >> > > But I currently do not see explicit requirements for these
> > >> > > features, and it would result in another full rewrite of the Sink
> > >> > > API, which had a really troubled history with several rewrites in
> > >> > > the recent releases, so I decided against these big changes and
> > >> > > kept the changes minimal.
> > >> > >
> > >> > > So, while I personally would love to see the Builder solution, I
> > >> > > am afraid that the Flink community needs some stability around the
> > >> > > Sink API for now, so the different Sinks could start to use this
> > >> > > new feature.
> > >> > >
> > >> > > What do you think?
> > >> > >
> > >> > > Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Wed, Oct 25, 2023,
> > >> > > 2:01):
> > >> > >
> > >> > >> Hi Peter,
> > >> > >>
> > >> > >> Thanks a lot for starting this FLIP!
> > >> > >>
> > >> > >> I agree that the current TwoPhaseCommittingSink interface is
> > >> > >> limiting in that it assumes 1) committers have the same
> > >> > >> parallelism as writers, and 2) writers immediately produce
> > >> > >> finalized committables. This FLIP captures the problem pretty
> > >> > >> well, and I do think there are use cases for a more general,
> > >> > >> flexible interface outside of the Iceberg connector as well.
> > >> > >>
> > >> > >> In terms of the abstraction layering, I was wondering if you've
> > >> > >> also considered this approach which I've quickly sketched in my
> > >> > >> local fork:
> > >> > >>
> > >> > >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> > >> > >>
> > >> > >> With this approach, the sink translator always expects that 2PC
> > >> > >> sink implementations extend `TwoPhaseCommittingSinkBase` and
> > >> > >> therefore assumes that a pre-commit topology always exists. For
> > >> > >> simple 2PC sinks that do not require transforming committables,
> > >> > >> we would ship (for convenience) an additional
> > >> > >> `SimpleTwoPhaseCommittingSinkBase` where the pre-commit topology
> > >> > >> is a no-op passthrough. With that we avoid some of the
> > >> > >> "boilerplate" where 2PC sinks with a pre-commit topology require
> > >> > >> implementing two interfaces, as proposed in the FLIP.
> > >> > >>
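A rough way to picture the "base with a no-op passthrough" layering is a default method on a narrower interface. The sketch below is my own simplification and has not been checked against the linked commit: `PassthroughSketch`, `TwoPhaseSinkBase` and `SimpleTwoPhaseSinkBase` are hypothetical names, and plain `List` stands in for `DataStream`.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: hypothetical names, with List standing in for DataStream. */
final class PassthroughSketch {

    /** General base: a pre-commit step always exists and may change the type. */
    interface TwoPhaseSinkBase<WriteResultT, CommT> {
        List<CommT> preCommit(List<WriteResultT> writeResults);
    }

    /** Convenience base for simple sinks: the pre-commit step is a no-op passthrough. */
    interface SimpleTwoPhaseSinkBase<CommT> extends TwoPhaseSinkBase<CommT, CommT> {
        @Override
        default List<CommT> preCommit(List<CommT> writeResults) {
            return writeResults;
        }
    }

    /** A simple sink gets the passthrough for free. */
    static final class SimpleSink implements SimpleTwoPhaseSinkBase<String> { }

    /** A sink that aggregates writer results into a single committable. */
    static final class AggregatingSink implements TwoPhaseSinkBase<Integer, String> {
        @Override
        public List<String> preCommit(List<Integer> writeResults) {
            int sum = 0;
            for (int value : writeResults) {
                sum += value;
            }
            return Arrays.asList("sum=" + sum);
        }
    }

    public static void main(String[] args) {
        System.out.println(new SimpleSink().preCommit(Arrays.asList("a", "b")));     // [a, b]
        System.out.println(new AggregatingSink().preCommit(Arrays.asList(1, 2, 3))); // [sum=6]
    }
}
```

With this shape the translator can always call `preCommit`, and the type parameters of a single interface keep the writer result and committable types consistent at compile time.
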
> > >> > >> Quick thought: regarding the awkwardness you mention in the end
> > >> > >> with sinks that have post commit topologies, but no pre-commit
> > >> > >> topologies - Alternative to the mixin approach in the FLIP, it
> > >> > >> might make sense to consider a builder approach for constructing
> > >> > >> 2PC sinks, which should also give users type-safety at compile
> > >> > >> time while not having the awkwardness with all the types
> > >> > >> involved. Something along the lines of:
> > >> > >>
> > >> > >> ```
> > >> > >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> > >> > >>     // Function<DataStream<WriterResultT>, DataStream<CommT>>
> > >> > >>     .withPreCommitTopology(writerResultsStream -> ...)
> > >> > >>     // Consumer<DataStream<CommT>>
> > >> > >>     .withPostCommitTopology(committablesStream -> ...)
> > >> > >>     // Function<DataStream<InputT>, DataStream<InputT>>
> > >> > >>     .withPreWriteTopology(inputStream -> ...)
> > >> > >>     .build();
> > >> > >> ```
> > >> > >>
> > >> > >> We could probably do some validation in the build() method, e.g.
> > >> > >> if the writer / committer have different types, then clearly a
> > >> > >> pre-commit topology should have been defined to transform
> > >> > >> intermediate writer results.
> > >> > >>
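One possible reading of the builder idea, sketched under heavy assumptions: instead of validating in build(), the builder could carry the committable type as a type parameter, so a writer/committer mismatch is rejected by the compiler. `SinkBuilderSketch`, `Builder`, `forWriter` and `demo` are hypothetical names, plain `List` stands in for `DataStream`, and this is not the API from the snippet above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Sketch only: hypothetical builder, with List standing in for DataStream. */
final class SinkBuilderSketch {

    /** Carries the committable type CommT so the compiler can check the committer. */
    static final class Builder<InputT, WriteResultT, CommT> {
        private final Function<InputT, WriteResultT> writer;
        private final Function<List<WriteResultT>, List<CommT>> preCommit;

        private Builder(Function<InputT, WriteResultT> writer,
                        Function<List<WriteResultT>, List<CommT>> preCommit) {
            this.writer = writer;
            this.preCommit = preCommit;
        }

        /** No pre-commit step yet: committables are the writer results themselves. */
        static <I, R> Builder<I, R, R> forWriter(Function<I, R> writer) {
            return new Builder<I, R, R>(writer, Function.<List<R>>identity());
        }

        /** Replaces the pre-commit step and, with it, the committable type. */
        <NewCommT> Builder<InputT, WriteResultT, NewCommT> withPreCommitTopology(
                Function<List<WriteResultT>, List<NewCommT>> topology) {
            return new Builder<>(writer, topology);
        }

        /** The committer must accept CommT, otherwise this call does not compile. */
        Function<List<InputT>, List<CommT>> build(Consumer<CommT> committer) {
            return inputs -> {
                List<WriteResultT> results = new ArrayList<>();
                for (InputT input : inputs) {
                    results.add(writer.apply(input));
                }
                List<CommT> committables = preCommit.apply(results);
                committables.forEach(committer);
                return committables;
            };
        }
    }

    /** Runs a tiny pipeline: Integer input, Long writer results, String committables. */
    static List<String> demo() {
        List<String> committed = new ArrayList<>();
        Builder.<Integer, Long>forWriter(i -> i * 10L)
                .withPreCommitTopology(results -> Arrays.asList("batch of " + results.size()))
                .build(committed::add)
                .apply(Arrays.asList(1, 2, 3));
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [batch of 3]
    }
}
```

Dropping the `withPreCommitTopology` call in `demo()` would leave the committable type as `Long`, and passing a `Consumer<String>` committer to `build` would then fail to compile, which is the compile-time safety the builder approach is after.
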
> > >> > >> Obviously, this would take generalization of the
> > >> > >> TwoPhaseCommittingSink interface to the extreme, where we just
> > >> > >> have one interface with all of the pre-commit / pre-write /
> > >> > >> post-commit methods built-in, and users would use the builder as
> > >> > >> the entrypoint to opt-in / opt-out as needed. The upside is that
> > >> > >> the SinkTransformationTranslator logic will become much less
> > >> > >> cluttered.
> > >> > >>
> > >> > >> I'll need to experiment with the builder approach a bit more to
> > >> > >> see if it makes sense at all, but wanted to throw out the idea
> > >> > >> earlier to see what you think.
> > >> > >>
> > >> > >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry
> > >> > >> <peter.vary.apa...@gmail.com> wrote:
> > >> > >>
> > >> > >> > Hi Team,
> > >> > >> >
> > >> > >> > Did some experimenting and found the originally proposed
> > >> > >> > solution to be a bit awkward for cases where
> > >> > >> > WithPostCommitTopology was needed but we do not need the
> > >> > >> > WithPreCommitTopology transformation.
> > >> > >> > The new API would be more flexible if we used a mixin-like
> > >> > >> > approach. The interfaces would only be used to define the
> > >> > >> > specific required methods, and they would not need to extend
> > >> > >> > the original TwoPhaseCommittingSink interface either.
> > >> > >> >
> > >> > >> > Since the WithPreCommitTopology and WithPostCommitTopology
> > >> > >> > interfaces are still Experimental, after talking to Gyula
> > >> > >> > offline, I have updated the FLIP to use this new approach.
> > >> > >> >
> > >> > >> > Any comments, thoughts are welcome.
> > >> > >> >
> > >> > >> > Thanks,
> > >> > >> > Peter
> > >> > >> >
> > >> > >> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023,
> > >> > >> > 16:04):
> > >> > >> >
> > >> > >> > > Hi Team,
> > >> > >> > >
> > >> > >> > > In my previous email [1] I described our challenges migrating
> > >> > >> > > the existing Iceberg SinkFunction based implementation to the
> > >> > >> > > new SinkV2 based implementation.
> > >> > >> > >
> > >> > >> > > As a result of the discussion around that topic, I have
> > >> > >> > > created FLIP-371 [2] to address the Committer related changes,
> > >> > >> > > and now I have created a companion FLIP-372 [3] to address the
> > >> > >> > > WithPreCommitTopology related issues.
> > >> > >> > >
> > >> > >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology
> > >> > >> > > to alter the type of the Committable
> > >> > >> > >
> > >> > >> > > The main goal of FLIP-372 is to extend the currently existing
> > >> > >> > > TwoPhaseCommittingSink API by adding the possibility to have a
> > >> > >> > > PreCommitTopology where the input and output types of the
> > >> > >> > > pre-commit transformation are different.
> > >> > >> > >
> > >> > >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> > >> > >> > > WithPreCommitTopology to alter the type of the Committable
> > >> > >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> > >> > >> > >
> > >> > >> > > Please use this thread to discuss the FLIP related questions,
> > >> > >> > > proposals, and feedback.
> > >> > >> > >
> > >> > >> > > Thanks,
> > >> > >> > > Peter
> > >> > >> > >
> > >> > >> > > - [1]
> > >> > >> > > https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> > >> > >> > > - [2]
> > >> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > >> > >> > > - [3]
> > >> > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
