Thank you Peter Vary for updating the FLIP based on the discussions. I really like the improvements introduced by the mixin interfaces, which now align much better with the source and table connectors.
While this introduces some breaking changes to the existing connectors, this is technical debt that we need to resolve as soon as possible, and fully before 2.0.

+1 from my side.

I am cc'ing some folks participating in the other threads, sorry about that :)

Cheers,
Gyula

On Wed, Dec 13, 2023 at 4:14 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> I have updated the FLIP-372 [1] based on the comments from multiple
> sources. Moved to the mixin approach as this seems to be the consensus
> based on this thread [2].
> Also created a draft implementation [3] PR, so I can test the changes and
> default implementations (no new tests yet).
> Please provide your feedback, so I can address your questions and comments,
> and then we can move forward to voting.
>
> Thanks,
> Peter
>
> [1] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> [2] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> [3] - https://github.com/apache/flink/pull/23912
>
> Péter Váry <peter.vary.apa...@gmail.com> wrote (on Mon, Dec 11, 2023, 14:28):
>
> > We identified another issue with the current Sink API in a thread [1]
> > related to FLIP-371 [2]. Currently it is not possible to evolve the
> > Sink.createWriter method with deprecation, because StatefulSink and
> > TwoPhaseCommittingSink have methods with the same name and parameters, but
> > a narrowed-down return type (StatefulSinkWriter, PrecommittingSinkWriter).
> >
> > To make the Sink API evolvable, we minimally have to remove these.
> >
> > The participants there also pointed out that the Source API also uses
> > mixin interfaces (SupportsHandleExecutionAttemptSourceEvent,
> > SupportsIntermediateNoMoreSplits) in some cases.
> > My observation is that it has inheritance as well (ReaderOutput,
> > ExternallyInducedSourceReader).
> >
> > I have created a draft API along these lines in a branch [3] where only
> > the last commit is relevant [4]. This implementation would follow the same
> > patterns as the current Source API.
> >
> > I see two different general approaches here, and I would like to hear your
> > preferences:
> > - Keep the changes minimal, stick to the current Sink API design. We
> >   introduce the required new combination of interfaces
> >   (TwoPhaseCommittingSinkWithPreCommitTopology,
> >   WithPostCommitTopologyWithPreCommitTopology), and do not change the API
> >   structure.
> >   - Pros:
> >     - Minimal change - smaller rewrite on the connector side
> >     - Type checks happen at compile time
> >   - Cons:
> >     - Harder to evolve
> >     - The number of interfaces increases with the possible combinations
> >     - Inconsistent API patterns wrt Source API - harder for developers
> >       to understand
> > - Migrate to a model similar to the Source API. We create mixin interfaces
> >   for SupportsCommitter, SupportsWriterState, SupportsPreCommitTopology,
> >   SupportsPostCommitTopology, SupportsPreWriteTopology.
> >   - Pros:
> >     - Better evolvability
> >     - Consistent with the Source API
> >   - Cons:
> >     - The connectors need to change their inheritance patterns (after
> >       the deprecation period) if they are using any of the more
> >       complicated Sinks.
> >     - Type checks need to use `instanceof`, which could happen at DAG
> >       generation time.
> >       Also, if the developer fails to correctly match the
> >       generic types on the mixin interfaces, the error will only surface
> >       at execution time - when the job tries to process the first record.
> >
> > I personally prefer the Mixin approach for easier evolvability and better
> > consistency, but I would like to hear your thoughts, so I can flesh out the
> > chosen approach in FLIP-372.
> >
> > Thanks,
> > Peter
> >
> > [1] - https://lists.apache.org/thread/h6nkgth838dlh5s90sd95zd6hlsxwx57
> > [2] - https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> > [3] - https://github.com/pvary/flink/tree/mixin_demo
> > [4] - https://github.com/pvary/flink/commit/acfc09f4c846f983f633bbf0c902aea49aa6b156
> >
> > On Fri, Nov 24, 2023, 11:38 Gyula Fóra <gyula.f...@gmail.com> wrote:
> >
> >> Hi Peter!
> >>
> >> Thank you for the analysis of the options.
> >>
> >> I don't really have a strong opinion, but in general I am in favor of the
> >> mixin style interfaces.
> >> We follow the same approach for table sources / sinks as well.
> >>
> >> Some other benefits I see in addition to what you mentioned:
> >> - Easier to introduce new experimental / public-evolving interfaces in
> >>   the future
> >> - Easier to declare parts of the API stable going forward as it's not all
> >>   or nothing
> >>
> >> Losing the ability to do proper compile-time validation is definitely a
> >> downside, but I believe this should mostly make initial development a
> >> little harder.
> >>
> >> Cheers,
> >> Gyula
> >>
> >> On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >>
> >> > We had a longer discussion with Gordon yesterday.
> >> > The main conclusion was that moving to a completely new interface is not
> >> > justified, and we will try to improve the current one.
> >> >
> >> > Another ask from Gordon was to check when the user will be notified if the
> >> > parameter types are incorrect using the mixin approach.
> >> > Imagine the type definition below:
> >> >
> >> > private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
> >> >         implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
> >> >                 WithPreCommitTopology<Boolean, Void> {
> >> >
> >> > The parametrizations of the above interfaces contradict each other:
> >> >
> >> > - TwoPhaseCommittingSinkWithPreCommitTopology
> >> >   - Input - Integer
> >> >   - WriterResult - Long
> >> >   - Committable - String
> >> > - WithPreCommitTopology
> >> >   - WriterResult - Boolean
> >> >   - Committable - Void
> >> >
> >> > Sadly, I was not able to find a solution where we could notify the user at
> >> > job startup time. The first error the user will get is when the first
> >> > record is processed/committed. I talked with Gyula Fora, and we discussed the
> >> > possibility of using the TypeExtractor to get the types. We have decided that
> >> > it could work in some cases, but would not be a generic solution. See the
> >> > "NOTES FOR USERS OF THIS CLASS" [1]
> >> >
> >> > This missing feature would justify abandoning the mixin solution, and
> >> > sticking to creating individual interfaces, like:
> >> >
> >> > - *TwoPhaseCommittingSink* - When no pre-commit topology is needed -
> >> >   kept because it is enough for most of the use-cases.
> >> > - *TwoPhaseCommittingSinkWithPreCommitTopology* - When pre-commit
> >> >   topology is needed with transformation in the pre-commit stage - the new
> >> >   generic interface (could be internal)
> >> > - *WithPreWriteTopology* - kept as it is
> >> > - *WithPreCommitTopology* - extends
> >> >   TwoPhaseCommittingSinkWithPreCommitTopology with the transformation method
> >> >   (classes from the streaming package are needed, so it cannot be merged with
> >> >   TwoPhaseCommittingSinkWithPreCommitTopology)
> >> > - *WithPostCommitTopology* - kept as it is (extends only
> >> >   TwoPhaseCommittingSink, so no pre-commit topology is allowed)
> >> > - *WithPostCommitTopologyWithPreCommitTopology* - extends
> >> >   WithPreCommitTopology with the same method as WithPostCommitTopology
> >> >
> >> > I don't really like the complex `WithPostCommitTopologyWithPreCommitTopology`
> >> > interface, and if we start adding new features then the number of
> >> > interfaces could grow exponentially, but I agree that the type checking
> >> > is important. I don't have a strong opinion, but I am inclined to vote for
> >> > moving in the direction of the individual interfaces.
> >> >
> >> > What do you prefer?
> >> >
> >> > 1. Go with the mixin approach
> >> >    1. Better extendability
> >> >    2. Fewer interfaces (only by 1 now, but later this could be more)
> >> >    3. Easier to understand (IMHO)
> >> > 2. Stick with the combined interfaces approach (some mixin, like
> >> >    WithPreWriteTopology, some combined like
> >> >    WithPostCommitTopologyWithPreCommitTopology)
> >> >    1. Better error messages
> >> >    2. Less disruptive change (still breaking for implementations of
> >> >       WithPreCommitTopology)
> >> > 3. Do you have a better idea?
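To make the late-failure concern with mixins concrete, here is a minimal, self-contained Java sketch. It does not use the real Flink interfaces: `SupportsPreCommitTopology`, `SupportsCommitter`, `MismatchedSink`, and `MixinSketch` are simplified, hypothetical stand-ins, and `plan` / `process` only imitate what a translator might do. It assumes the translator discovers capabilities with `instanceof` and raw casts, so contradictory generic parameters compile, pass plan construction, and only fail with a `ClassCastException` once the first element flows through.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mixins; NOT the real Flink signatures.
interface SupportsPreCommitTopology<WriterResultT, CommT> {
    CommT toCommittable(WriterResultT writerResult);
}

interface SupportsCommitter<CommT> {
    void commit(CommT committable);
}

// Compiles although the pre-commit output (String) contradicts the committer
// input (Integer): the two mixins are unrelated types, so the compiler cannot
// link their type parameters.
class MismatchedSink
        implements SupportsPreCommitTopology<Long, String>, SupportsCommitter<Integer> {
    final List<Integer> committed = new ArrayList<>();

    @Override
    public String toCommittable(Long writerResult) {
        return "file-" + writerResult;
    }

    @Override
    public void commit(Integer committable) {
        committed.add(committable);
    }
}

class MixinSketch {
    // "DAG generation": only instanceof checks are possible, generics are erased.
    static String plan(Object sink) {
        StringBuilder plan = new StringBuilder("writer");
        if (sink instanceof SupportsPreCommitTopology) {
            plan.append(" -> pre-commit");
        }
        if (sink instanceof SupportsCommitter) {
            plan.append(" -> committer");
        }
        return plan.toString();
    }

    // "Execution": raw calls go through compiler-generated bridge methods,
    // which cast the argument to the declared parameter type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static void process(Object sink, Object writerResult) {
        Object committable = writerResult;
        if (sink instanceof SupportsPreCommitTopology) {
            committable = ((SupportsPreCommitTopology) sink).toCommittable(writerResult);
        }
        if (sink instanceof SupportsCommitter) {
            ((SupportsCommitter) sink).commit(committable);
        }
    }

    public static void main(String[] args) {
        Object sink = new MismatchedSink();
        System.out.println(plan(sink)); // plan construction succeeds
        try {
            process(sink, 42L);
        } catch (ClassCastException e) {
            System.out.println("fails only on the first record: " + e.getClass().getSimpleName());
        }
    }
}
```

With combined interfaces that share one set of type parameters, the same mistake would be rejected by the compiler, which is the trade-off weighed in the options above.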
> >> >
> >> > Thanks,
> >> > Peter
> >> >
> >> > CC: Jiabao Sun - as he might be interested in this discussion
> >> >
> >> > [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
> >> >
> >> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Wed, Oct 25, 2023, 16:02):
> >> >
> >> > > Hi Gordon,
> >> > >
> >> > > Thanks for the review, here are my thoughts:
> >> > >
> >> > > > In terms of the abstraction layering, I was wondering if you've also
> >> > > > considered this approach which I've quickly sketched in my local fork:
> >> > > > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >> > >
> >> > > I think we have a few design issues here:
> >> > > - How to handle the old interface where the transformation is not needed
> >> > >   in the pre-commit phase? - As you have proposed, a default method
> >> > >   implementation is a nice solution here, as we do not really have to change
> >> > >   everything in the transformation process.
> >> > > - How to handle the WithPostCommitTopology interface? - Currently the
> >> > >   parent interface for the sink with a post-commit topology is strictly a
> >> > >   single interface (TwoPhaseCommittingSink) and we want to add this to both
> >> > >   types of sinks (new - with transformation / old - without transformation).
> >> > >   In this case we could get away with creating
> >> > >   OldTwoPhaseCommittingSinkWithPostCommitTopology and
> >> > >   NewTwoPhaseCommittingSinkWithPostCommitTopology,
> >> > >   but this is not a good approach for future extensibility. I tend to prefer
> >> > >   a real mixin approach to creating multiple interfaces for this.
> >> > >
> >> > > > Quick thought: regarding the awkwardness you mention in the end with
> >> > > > sinks that have post commit topologies, but no pre-commit topologies -
> >> > > > Alternative to the mixin approach in the FLIP, it might make sense to
> >> > > > consider a builder approach for constructing 2PC sinks
> >> > >
> >> > > TBH, after providing the possibility to transform in the pre-commit phase,
> >> > > I have started to think about the possible different generalizations:
> >> > > - Why not have the possibility to have a different return type of the
> >> > >   pre-write phase? - While we have the possibility to transform the data in a
> >> > >   preceding map phase before the Sink, some Sinks might want to
> >> > >   encapsulate these transformations before the writes.
> >> > > - Why not have the explicit possibility to change the return type of the
> >> > >   committer? - We might not want to emit the incoming Committable, we might
> >> > >   want to use the commit hash - or any other data generated by the committer
> >> > >   - in the post-commit topology. So in some cases it might make sense for the
> >> > >   committer to emit elements with different types than the input.
> >> > > - Why not have everything as a mixin interface and define a Sink this way
> >> > >   (very-very similar to your builder approach)
> >> > >
> >> > > But I currently do not see explicit requirements for these features, and
> >> > > it would result in another full rewrite of the Sink API, which has had a really
> >> > > troubled history with several rewrites in the recent releases, so I decided
> >> > > against these big changes and kept the changes minimal.
> >> > >
> >> > > So, while I personally would love to see the Builder solution, I am afraid
> >> > > that the Flink community needs some stability around the Sink API for now,
> >> > > so the different Sinks could start to use this new feature.
> >> > >
> >> > > What do you think?
> >> > >
> >> > > Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Wed, Oct 25, 2023, 2:01):
> >> > >
> >> > >> Hi Peter,
> >> > >>
> >> > >> Thanks a lot for starting this FLIP!
> >> > >>
> >> > >> I agree that the current TwoPhaseCommittingSink interface is limiting in
> >> > >> that it assumes 1) committers have the same parallelism as writers, and 2)
> >> > >> writers immediately produce finalized committables. This FLIP captures the
> >> > >> problem pretty well, and I do think there are use cases for a more general
> >> > >> flexible interface outside of the Iceberg connector as well.
> >> > >>
> >> > >> In terms of the abstraction layering, I was wondering if you've also
> >> > >> considered this approach which I've quickly sketched in my local fork:
> >> > >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >> > >>
> >> > >> With this approach, the sink translator always expects that 2PC sink
> >> > >> implementations extend `TwoPhaseCommittingSinkBase` and therefore
> >> > >> assumes that a pre-commit topology always exists. For simple 2PC sinks that
> >> > >> do not require transforming committables, we would ship (for convenience)
> >> > >> an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit
> >> > >> topology is a no-op passthrough. With that we avoid some of the
> >> > >> "boilerplate" where 2PC sinks with pre-commit topology require
> >> > >> implementing two interfaces, as proposed in the FLIP.
> >> > >>
> >> > >> Quick thought: regarding the awkwardness you mention in the end with sinks
> >> > >> that have post commit topologies, but no pre-commit topologies -
> >> > >> Alternative to the mixin approach in the FLIP, it might make sense to
> >> > >> consider a builder approach for constructing 2PC sinks, which should also
> >> > >> give users type-safety at compile time while not having the awkwardness
> >> > >> with all the types involved. Something along the lines of:
> >> > >>
> >> > >> ```
> >> > >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> >> > >>     .withPreCommitTopology(writerResultsStream -> ...)   // Function<DataStream<WriterResultT>, DataStream<CommT>>
> >> > >>     .withPostCommitTopology(committablesStream -> ...)   // Consumer<DataStream<CommT>>
> >> > >>     .withPreWriteTopology(inputStream -> ...)            // Function<DataStream<InputT>, DataStream<InputT>>
> >> > >>     .build();
> >> > >> ```
> >> > >>
> >> > >> We could probably do some validation in the build() method, e.g. if writer
> >> > >> / committer have different types, then clearly a pre-commit topology should
> >> > >> have been defined to transform intermediate writer results.
> >> > >>
> >> > >> Obviously, this would take generalization of the TwoPhaseCommittingSink
> >> > >> interface to the extreme, where we just have one interface with all of the
> >> > >> pre-commit / pre-write / post-commit methods built-in, and users would use
> >> > >> the builder as the entrypoint to opt-in / opt-out as needed. The upside is
> >> > >> that the SinkTransformationTranslator logic will become much less
> >> > >> cluttered.
> >> > >>
> >> > >> I'll need to experiment with the builder approach a bit more to see if it
> >> > >> makes sense at all, but wanted to throw out the idea early to see what you
> >> > >> think.
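One way such a builder could get compile-time type safety is to let each step change the builder's type parameters. The sketch below is a hypothetical, self-contained illustration and not the API from the FLIP or from the commit linked above: `TwoPhaseSinkBuilderSketch`, `forWriter`, and the use of `java.util.List` in place of `DataStream` are assumptions made so the example runs without Flink. Unlike the snippet above, the committer is passed to `build()` so that its type can be checked against the output of the pre-commit step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical builder; List<T> stands in for DataStream<T>.
final class TwoPhaseSinkBuilderSketch<InputT, WriterResultT, CommT> {
    private final Function<InputT, WriterResultT> writer;
    private final Function<List<WriterResultT>, List<CommT>> preCommit;

    private TwoPhaseSinkBuilderSketch(
            Function<InputT, WriterResultT> writer,
            Function<List<WriterResultT>, List<CommT>> preCommit) {
        this.writer = writer;
        this.preCommit = preCommit;
    }

    // Without a pre-commit topology the committable type equals the writer result type.
    static <I, W> TwoPhaseSinkBuilderSketch<I, W, W> forWriter(Function<I, W> writer) {
        return new TwoPhaseSinkBuilderSketch<I, W, W>(writer, results -> results);
    }

    // Declaring a pre-commit topology is the only way to change the committable type.
    <C> TwoPhaseSinkBuilderSketch<InputT, WriterResultT, C> withPreCommitTopology(
            Function<List<WriterResultT>, List<C>> topology) {
        return new TwoPhaseSinkBuilderSketch<InputT, WriterResultT, C>(writer, topology);
    }

    // The committer must accept exactly the committable type produced so far.
    Consumer<List<InputT>> build(Consumer<CommT> committer) {
        return inputs -> {
            List<WriterResultT> results = new ArrayList<>();
            for (InputT input : inputs) {
                results.add(writer.apply(input));
            }
            preCommit.apply(results).forEach(committer);
        };
    }

    public static void main(String[] args) {
        List<String> committed = new ArrayList<>();
        Consumer<List<Integer>> sink = TwoPhaseSinkBuilderSketch
                .<Integer, Long>forWriter(i -> i * 10L)
                .withPreCommitTopology(
                        results -> Collections.singletonList("batch-of-" + results.size()))
                .build(committed::add);
        sink.accept(java.util.Arrays.asList(1, 2, 3));
        System.out.println(committed);
    }
}
```

With this shape, omitting `withPreCommitTopology` while passing a committer whose type differs from the writer result would be rejected by the compiler, so no runtime validation in `build()` is needed for that particular case.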
> >> > >>
> >> > >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> >> > >>
> >> > >> > Hi Team,
> >> > >> >
> >> > >> > Did some experimenting and found the originally proposed solution to be a
> >> > >> > bit awkward for cases where WithPostCommitTopology was needed but we do not
> >> > >> > need the WithPreCommitTopology transformation.
> >> > >> > The flexibility of the new API would be better if we used a mixin-like
> >> > >> > approach. The interfaces would only be used to define the specific required
> >> > >> > methods, and they would not need to extend the original
> >> > >> > TwoPhaseCommittingSink interface either.
> >> > >> >
> >> > >> > Since the WithPreCommitTopology and WithPostCommitTopology
> >> > >> > interfaces are still Experimental, after talking to Gyula offline, I have
> >> > >> > updated the FLIP to use this new approach.
> >> > >> >
> >> > >> > Any comments, thoughts are welcome.
> >> > >> >
> >> > >> > Thanks,
> >> > >> > Peter
> >> > >> >
> >> > >> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023, 16:04):
> >> > >> >
> >> > >> > > Hi Team,
> >> > >> > >
> >> > >> > > In my previous email [1] I have described our challenges migrating the
> >> > >> > > existing Iceberg SinkFunction based implementation to the new SinkV2 based
> >> > >> > > implementation.
> >> > >> > >
> >> > >> > > As a result of the discussion around that topic, I have created
> >> > >> > > FLIP-371 [2] to address the Committer related changes, and now I have
> >> > >> > > created a companion FLIP-372 [3] to address the WithPreCommitTopology
> >> > >> > > related issues.
> >> > >> > >
> >> > >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the
> >> > >> > > type of the Committable
> >> > >> > >
> >> > >> > > The main goal of FLIP-372 is to extend the existing
> >> > >> > > TwoPhaseCommittingSink API by adding the possibility to have a
> >> > >> > > PreCommitTopology where the input and output types of the pre-commit
> >> > >> > > transformation are different.
> >> > >> > >
> >> > >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> >> > >> > > WithPreCommitTopology to alter the type of the Committable
> >> > >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> >> > >> > >
> >> > >> > > Please use this thread to discuss the FLIP related questions, proposals,
> >> > >> > > and feedback.
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > > Peter
> >> > >> > >
> >> > >> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> >> > >> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> >> > >> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable