Hi Peter!

Thank you for the analysis of the options.

I don't really have a strong opinion, but in general I am in favor of the
mixin-style interfaces.
We follow the same approach for table sources / sinks as well.

Some other benefits I see in addition to what you mentioned:
 - Easier to introduce new experimental / public-evolving interfaces in
   the future
 - Easier to declare parts of the API stable going forward, as it's not
   all or nothing
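
To make these two points a bit more concrete, here is a minimal sketch of
what mixin-style interfaces could look like. All names below
(MixinStyleSketch, Sink, WithPreWrite, WithPostCommit, DoublingSink) are
simplified, hypothetical stand-ins and not the actual Flink interfaces:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified names; not the actual Flink interfaces.
class MixinStyleSketch {
    /** Core contract that every sink implements; could be declared stable on its own. */
    interface Sink<InputT> {
        String write(InputT element);
    }

    /** Optional mixin; could stay experimental while Sink is already stable. */
    interface WithPreWrite<InputT> {
        InputT preWrite(InputT element);
    }

    /** Another optional mixin, added later without touching Sink or WithPreWrite. */
    interface WithPostCommit<CommT> {
        void postCommit(CommT committable);
    }

    /** A sink opts in to exactly the features it needs. */
    static class DoublingSink implements Sink<Integer>, WithPreWrite<Integer> {
        @Override
        public Integer preWrite(Integer element) {
            return element * 2;
        }

        @Override
        public String write(Integer element) {
            return "written:" + element;
        }
    }

    /** The framework side discovers the optional features with instanceof checks. */
    static List<String> supportedFeatures(Sink<?> sink) {
        List<String> features = new ArrayList<>();
        features.add("write");
        if (sink instanceof WithPreWrite) {
            features.add("pre-write");
        }
        if (sink instanceof WithPostCommit) {
            features.add("post-commit");
        }
        return features;
    }

    public static void main(String[] args) {
        DoublingSink sink = new DoublingSink();
        System.out.println(supportedFeatures(sink)); // prints [write, pre-write]
        System.out.println(sink.write(sink.preWrite(21))); // prints written:42
    }
}
```

Since each optional feature lives in its own interface, a new one can be
added, or an existing one promoted to stable, without changing the others.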

The lack of proper compile-time validation is definitely a downside,
but I believe this should mostly just make initial development a little
harder.
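
For illustration, a minimal sketch of the validation problem, using
simplified, hypothetical stand-ins (TwoPhaseSink, WithPreCommit) rather
than the real Flink interfaces. The compiler accepts contradicting type
arguments on independent mixin interfaces, and a reflection-based check
like the one below only helps when the class declares concrete type
arguments directly, so it should not be read as a generic solution:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical, simplified stand-ins for the interfaces discussed in the thread.
class MixinTypeCheckSketch {
    interface TwoPhaseSink<InputT, WriterResultT, CommT> {}

    interface WithPreCommit<WriterResultT, CommT> {}

    /** Compiles without any warning although the type arguments contradict each other. */
    static class WrongMixin
            implements TwoPhaseSink<Integer, Long, String>, WithPreCommit<Boolean, Void> {}

    /** The same combination with matching WriterResult and Committable types. */
    static class ConsistentMixin
            implements TwoPhaseSink<Integer, Long, String>, WithPreCommit<Long, String> {}

    /** Returns the type arguments a class declares for a directly implemented interface. */
    static Type[] typeArguments(Class<?> clazz, Class<?> iface) {
        for (Type type : clazz.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType() == iface) {
                    return parameterized.getActualTypeArguments();
                }
            }
        }
        throw new IllegalArgumentException(
                clazz.getName() + " does not directly implement " + iface.getName());
    }

    /** Checks that WriterResult and Committable agree between the two interfaces. */
    static boolean isConsistent(Class<?> sinkClass) {
        Type[] sinkArgs = typeArguments(sinkClass, TwoPhaseSink.class);
        Type[] mixinArgs = typeArguments(sinkClass, WithPreCommit.class);
        return sinkArgs[1].equals(mixinArgs[0]) && sinkArgs[2].equals(mixinArgs[1]);
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(ConsistentMixin.class)); // prints true
        System.out.println(isConsistent(WrongMixin.class)); // prints false
    }
}
```

A check of this kind could run before the job starts, but it would miss
sinks whose type arguments are themselves type variables or are declared
on a superclass.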

Cheers,
Gyula

On Thu, Nov 23, 2023 at 1:25 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> We had a longer discussion with Gordon yesterday.
> The main conclusion was that moving to a completely new interface is not
> justified, and that we should try to improve the current one instead.
>
> Another ask from Gordon was to check when the user will be notified if the
> parameter types are incorrect using the mixin approach.
> Imagine the type definition below:
>
> private static class
> TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
>         implements
> TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
>         WithPreCommitTopology<Boolean, Void> {
>
> The parametrization of the above interfaces contradicts each other:
>
>    - TwoPhaseCommittingSinkWithPreCommitTopology
>       - Input - Integer
>       - WriterResult - Long
>       - Committable - String
>    - WithPreCommitTopology
>       - WriterResult - Boolean
>       - Committable - Void
>
>
> Sadly, I was not able to find a solution where we could notify the user at
> job startup time. The first error the user will get is when the first
> record is processed/committed. I talked with Gyula Fora, and we discussed
> the possibility of using the TypeExtractor to get the types. We decided
> that it could work in some cases, but would not be a generic solution. See
> the "NOTES FOR USERS OF THIS CLASS" [1]
>
> This missing feature would justify abandoning the mixin solution, and
> sticking to creating individual interfaces, like:
>
>    - *TwoPhaseCommittingSink* - When no pre-commit topology is needed -
>    kept because it is enough for most of the use-cases.
>    - *TwoPhaseCommittingSinkWithPreCommitTopology* - When pre-commit
>    topology is needed with transformation in the pre-commit stage - the new
>    generic interface (could be internal)
>    - *WithPreWriteTopology* - kept as it is
>    - *WithPreCommitTopology* - extends
>    TwoPhaseCommittingSinkWithPreCommitTopology with the transformation
>    method (classes from the streaming package are needed, so it cannot be
>    merged with TwoPhaseCommittingSinkWithPreCommitTopology)
>    - *WithPostCommitTopology* - kept as it is (extends only
>    TwoPhaseCommittingSink, so no pre-commit topology is allowed)
>    - *WithPostCommitTopologyWithPreCommitTopology* - extends
>    WithPreCommitTopology with the same method as WithPostCommitTopology
>
> I don't really like the `WithPostCommitTopologyWithPreCommitTopology`
> complex interface, and if we start adding new features then the number of
> interfaces could grow exponentially, but I agree that the type checking
> is important. I don't have a strong opinion, but I am inclined to vote for
> moving in the direction of the individual interfaces.
>
> What do you prefer?
>
>    1. Go with the mixin approach
>       1. Better extendability
>       2. Fewer interfaces (only with 1 now, but later this could be more)
>       3. Easier to understand (IMHO)
>    2. Stick with the combined interfaces approach (some mixin, like
>    WithPreWriteTopology, some combined like
>    WithPostCommitTopologyWithPreCommitTopology)
>       1. Better error messages
>       2. Less disruptive change (still breaking for implementations of
>       WithPreCommitTopology)
>    3. Do you have a better idea?
>
>
> Thanks,
> Peter
>
> CC: Jiabao Sun - as he might be interested in this discussion
>
> [1] - https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html
>
>
> On Wed, Oct 25, 2023 at 4:02 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
> > Hi Gordon,
> >
> > Thanks for the review, here are my thoughts:
> >
> > > In terms of the abstraction layering, I was wondering if you've also
> > considered this approach which I've quickly sketched in my local fork:
> >
> > https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >
> > I think we have a few design issues here:
> > - How to handle the old interface where the transformation is not needed
> > in the pre-commit phase? - As you have proposed, a default method
> > implementation is a nice solution here, as we do not really have to
> > change everything in the transformation process.
> > - How to handle the WithPostCommitTopology interface? - Currently the
> > parent interface for the sink with a post-commit topology is strictly a
> > single interface (TwoPhaseCommittingSink) and we want to add this to both
> > types of sinks (new - with transformation / old - without transformation).
> > In this case we could get away with creating
> > OldTwoPhaseCommittingSinkWithPostCommitTopology and
> > NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good
> > approach for future extensibility. I tend to prefer a real mixin approach
> > to creating multiple interfaces for this.
> >
> > > Quick thought: regarding the awkwardness you mention in the end with
> > sinks that have post commit topologies, but no pre-commit topologies -
> > Alternative to the mixin approach in the FLIP, it might make sense to
> > consider a builder approach for constructing 2PC sinks
> >
> > TBH, after providing the possibility to transform in the pre-commit
> > phase, I have started to think about the possible different
> > generalizations:
> > - Why not have the possibility to have a different return type of the
> > pre-write phase? - While we have the possibility to transform the data in
> > a preceding map phase before the Sink, some Sinks might want to
> > encapsulate these transformations before the writes.
> > - Why not have the explicit possibility to change the return type of the
> > committer? - We might not want to emit the incoming Committable, we might
> > want to use the commit hash - or any other data generated by the
> > committer - in the post-commit topology. So in some cases it might make
> > sense for the committer to emit elements with different types than the
> > input.
> > - Why not have everything as a mixin interface and define a Sink this way
> > (very-very similar to your builder approach)
> >
> > But I currently do not see explicit requirements for these features, and
> > it would result in another full rewrite of the Sink API, which had a
> > really troubled history with several rewrites in the recent releases, so
> > I decided against these big changes and kept the changes minimal.
> >
> > So, while I personally would love to see the Builder solution, I am
> > afraid that the Flink community needs some stability around the Sink API
> > for now, so the different Sinks could start to use this new feature.
> >
> > What do you think?
> >
> > On Wed, Oct 25, 2023 at 2:01 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> >> Hi Peter,
> >>
> >> Thanks a lot for starting this FLIP!
> >>
> >> I agree that the current TwoPhaseCommittingSink interface is limiting
> >> in that it assumes 1) committers have the same parallelism as writers,
> >> and 2) writers immediately produce finalized committables. This FLIP
> >> captures the problem pretty well, and I do think there are use cases for
> >> a more general, flexible interface outside of the Iceberg connector as
> >> well.
> >>
> >> In terms of the abstraction layering, I was wondering if you've also
> >> considered this approach which I've quickly sketched in my local fork:
> >>
> >> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
> >>
> >> With this approach, the sink translator always expects that 2PC sink
> >> implementations extend `TwoPhaseCommittingSinkBase` and therefore
> >> assumes that a pre-commit topology always exists. For simple 2PC sinks
> >> that do not require transforming committables, we would ship (for
> >> convenience) an additional `SimpleTwoPhaseCommittingSinkBase` where the
> >> pre-commit topology is a no-op passthrough. With that we avoid some of
> >> the boilerplate where 2PC sinks with a pre-commit topology require
> >> implementing two interfaces, as proposed in the FLIP.
> >>
> >> Quick thought: regarding the awkwardness you mention in the end with
> >> sinks that have post commit topologies, but no pre-commit topologies -
> >> Alternative to the mixin approach in the FLIP, it might make sense to
> >> consider a builder approach for constructing 2PC sinks, which should
> >> also give users type-safety at compile time while not having the
> >> awkwardness with all the types involved. Something along the lines of:
> >>
> >> ```
> >> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
> >>     // Function<DataStream<WriterResultT>, DataStream<CommT>>
> >>     .withPreCommitTopology(writerResultsStream -> ...)
> >>     // Consumer<DataStream<CommT>>
> >>     .withPostCommitTopology(committablesStream -> ...)
> >>     // Function<DataStream<InputT>, DataStream<InputT>>
> >>     .withPreWriteTopology(inputStream -> ...)
> >>     .build();
> >> ```
> >>
> >> We could probably do some validation in the build() method, e.g. if
> >> writer / committer have different types, then clearly a pre-commit
> >> topology should have been defined to transform intermediate writer
> >> results.
> >>
> >> Obviously, this would take generalization of the TwoPhaseCommittingSink
> >> interface to the extreme, where we just have one interface with all of
> >> the pre-commit / pre-write / post-commit methods built-in, and users
> >> would use the builder as the entrypoint to opt-in / opt-out as needed.
> >> The upside is that the SinkTransformationTranslator logic will become
> >> much less cluttered.
> >>
> >> I'll need to experiment with the builder approach a bit more to see if
> >> it makes sense at all, but wanted to throw out the idea earlier to see
> >> what you think.
> >>
> >> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
> >> wrote:
> >>
> >> > Hi Team,
> >> >
> >> > Did some experimenting and found the originally proposed solution to
> >> > be a bit awkward for cases where WithPostCommitTopology was needed
> >> > but we do not need the WithPreCommitTopology transformation.
> >> > The flexibility of the new API would be better if we used a
> >> > mixin-like approach. The interfaces would only be used to define the
> >> > specific required methods, and they would not need to extend the
> >> > original TwoPhaseCommittingSink interface.
> >> >
> >> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces
> >> > are still Experimental, after talking to Gyula offline, I have
> >> > updated the FLIP to use this new approach.
> >> >
> >> > Any comments, thoughts are welcome.
> >> >
> >> > Thanks,
> >> > Peter
> >> >
> >> > On Thu, Oct 5, 2023 at 4:04 PM Péter Váry <peter.vary.apa...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Team,
> >> > >
> >> > > In my previous email [1] I have described our challenges migrating
> >> > > the existing Iceberg SinkFunction based implementation to the new
> >> > > SinkV2 based implementation.
> >> > >
> >> > > As a result of the discussion around that topic, I have created
> >> > > FLIP-371 [2] to address the Committer related changes, and now I
> >> > > have created a companion FLIP-372 [3] to address the
> >> > > WithPreCommitTopology related issues.
> >> > >
> >> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to
> >> > > alter the type of the Committable
> >> > >
> >> > > The main goal of FLIP-372 is to extend the currently existing
> >> > > TwoPhaseCommittingSink API by adding the possibility to have a
> >> > > PreCommitTopology where the input and output types of the
> >> > > pre-commit transformation are different.
> >> > >
> >> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
> >> > > WithPreCommitTopology to alter the type of the Committable
> >> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
> >> > >
> >> > > Please use this thread to discuss the FLIP related questions,
> >> proposals,
> >> > > and feedback.
> >> > >
> >> > > Thanks,
> >> > > Peter
> >> > >
> >> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
> >> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
> >> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
> >> > >
> >> > >
> >> >
> >>
> >
>
