We had a longer discussion with Gordon yesterday.
The main conclusion was that moving to a completely new interface is not
justified, and that we should try to improve the current one.

Gordon also asked me to check when the user would be notified if the
parameter types are incorrect when using the mixin approach.
Consider the type definition below:

    private static class TestTwoPhaseCommittingSinkWithPreCommitTopologyWrongMixin
            implements TwoPhaseCommittingSinkWithPreCommitTopology<Integer, Long, String>,
                    WithPreCommitTopology<Boolean, Void> {
        // ...
    }

The type parameters of the two interfaces contradict each other:

   - TwoPhaseCommittingSinkWithPreCommitTopology
      - Input - Integer
      - WriterResult - Long
      - Committable - String
   - WithPreCommitTopology
      - WriterResult - Boolean
      - Committable - Void
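To make this failure mode concrete, here is a minimal, self-contained sketch. `SinkWithPreCommit`, `PreCommitMixin` and `process` are hypothetical, heavily simplified stand-ins (not the real Flink signatures) for the two interfaces and for framework code that only sees the sink through erased types. In this simplified model the contradicting class compiles and can be instantiated without any error; the mismatch only surfaces as a ClassCastException when the first record flows from the writer into the pre-commit step.

```java
// Hypothetical, simplified stand-ins; the real Flink interfaces look different.
public class ErasureDemo {
    interface SinkWithPreCommit<InputT, WriterResultT, CommT> {
        WriterResultT write(InputT input);
    }

    interface PreCommitMixin<WriterResultT, CommT> {
        CommT preCommit(WriterResultT writerResult);
    }

    // Compiles fine: Java does not relate the type parameters of two unrelated interfaces.
    static class WrongMixin
            implements SinkWithPreCommit<Integer, Long, String>, PreCommitMixin<Boolean, Void> {
        @Override
        public Long write(Integer input) {
            return input.longValue();
        }

        @Override
        public Void preCommit(Boolean writerResult) {
            return null;
        }
    }

    // Framework-style code that only sees erased (raw) types, as a generic translator would.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Object process(Object sink, Object record) {
        Object writerResult = ((SinkWithPreCommit) sink).write(record);
        return ((PreCommitMixin) sink).preCommit(writerResult);
    }

    static String firstRecordOutcome() {
        Object sink = new WrongMixin(); // "startup": no error is raised here
        try {
            process(sink, 42);
            return "ok";
        } catch (ClassCastException e) {
            // The compiler-generated bridge method casts the Long writer result to Boolean.
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstRecordOutcome()); // ClassCastException
    }
}
```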


Sadly, I was not able to find a solution that would notify the user at
job startup time. The first error the user will get is when the first
record is processed/committed. I talked with Gyula Fora, and we discussed
the possibility of using the TypeExtractor to get the types. We concluded
that it could work in some cases, but would not be a generic solution. See
the "NOTES FOR USERS OF THIS CLASS" section in [1].

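For illustration only, here is a minimal sketch of the kind of startup-time check discussed with Gyula. It does not use Flink's TypeExtractor; it reads the generic interface declarations directly with `java.lang.reflect`, and the interface and class names are hypothetical stand-ins. It shows both sides of the conclusion: when a class implements the interfaces with concrete type arguments, the mismatch can be detected before any record is processed, but when the type arguments are type variables (for example a generic sink class), the check cannot decide.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class MixinTypeCheck {
    // Hypothetical stand-ins for the two interfaces discussed above.
    interface SinkWithPreCommit<InputT, WriterResultT, CommT> {}

    interface PreCommitMixin<WriterResultT, CommT> {}

    // Contradicting parameters, mirroring the example above.
    static class WrongMixin
            implements SinkWithPreCommit<Integer, Long, String>, PreCommitMixin<Boolean, Void> {}

    static class RightMixin
            implements SinkWithPreCommit<Integer, Long, String>, PreCommitMixin<Long, String> {}

    // Whether W and X agree depends on the type arguments chosen at instantiation,
    // which erasure removes, so the class declaration alone is not enough.
    static class GenericMixin<W, X, C>
            implements SinkWithPreCommit<Integer, W, C>, PreCommitMixin<X, C> {}

    // Returns the type arguments with which clazz directly implements iface.
    static Type[] typeArgs(Class<?> clazz, Class<?> iface) {
        for (Type t : clazz.getGenericInterfaces()) {
            if (t instanceof ParameterizedType && ((ParameterizedType) t).getRawType() == iface) {
                return ((ParameterizedType) t).getActualTypeArguments();
            }
        }
        throw new IllegalArgumentException(clazz + " does not directly implement " + iface);
    }

    /** Returns "ok", "mismatch", or "unknown" if an argument is not a concrete class. */
    static String check(Class<?> clazz) {
        Type[] sink = typeArgs(clazz, SinkWithPreCommit.class);
        Type[] mixin = typeArgs(clazz, PreCommitMixin.class);
        // Compare WriterResultT and CommT: sink[1..2] against mixin[0..1].
        for (int i = 0; i < 2; i++) {
            Type a = sink[i + 1];
            Type b = mixin[i];
            if (!(a instanceof Class) || !(b instanceof Class)) {
                return "unknown";
            }
            if (!a.equals(b)) {
                return "mismatch";
            }
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(check(WrongMixin.class));   // mismatch
        System.out.println(check(RightMixin.class));   // ok
        System.out.println(check(GenericMixin.class)); // unknown
    }
}
```

A real check would also have to walk superclasses and indirectly implemented interfaces, which is part of why it does not generalize.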
The lack of startup-time type checking would justify abandoning the mixin
solution and sticking to individual interfaces, like:

   - *TwoPhaseCommittingSink* - when no pre-commit topology is needed;
   kept because it is enough for most of the use cases.
   - *TwoPhaseCommittingSinkWithPreCommitTopology* - when a pre-commit
   topology with a transformation in the pre-commit stage is needed; the new
   generic interface (could be internal)
   - *WithPreWriteTopology* - kept as it is
   - *WithPreCommitTopology* - extends
   TwoPhaseCommittingSinkWithPreCommitTopology with the transformation method
   (classes from the streaming package are needed, so it cannot be merged
   with TwoPhaseCommittingSinkWithPreCommitTopology)
   - *WithPostCommitTopology* - kept as it is (extends only
   TwoPhaseCommittingSink, so no pre-commit topology is allowed)
   - *WithPostCommitTopologyWithPreCommitTopology* - extends
   WithPreCommitTopology with the same method as WithPostCommitTopology
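A minimal sketch of the proposed hierarchy, assuming heavily simplified, hypothetical method signatures (a `FakeStream` stand-in instead of `DataStream`, no writer or committer factories). It illustrates the trade-off: the combined interface declares `WriterResultT` and `CommT` only once, so the contradiction from the mixin example above cannot be expressed, and a mismatched method signature becomes a compile-time error.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified shapes of the proposed interfaces; the real ones
// would use DataStream, CommittableMessage, writers, committers, etc.
public class IndividualInterfacesSketch {
    /** Stand-in for DataStream<T>. */
    interface FakeStream<T> {
        List<T> elements();
    }

    interface TwoPhaseCommittingSink<InputT, CommT> {}

    interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommT> {}

    interface WithPreCommitTopology<InputT, WriterResultT, CommT>
            extends TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommT> {
        FakeStream<CommT> addPreCommitTopology(FakeStream<WriterResultT> writerResults);
    }

    // Post-commit topology for sinks without a pre-commit topology.
    interface WithPostCommitTopology<InputT, CommT> extends TwoPhaseCommittingSink<InputT, CommT> {
        void addPostCommitTopology(FakeStream<CommT> committables);
    }

    // The combined interface: the same post-commit method, on top of WithPreCommitTopology.
    interface WithPostCommitTopologyWithPreCommitTopology<InputT, WriterResultT, CommT>
            extends WithPreCommitTopology<InputT, WriterResultT, CommT> {
        void addPostCommitTopology(FakeStream<CommT> committables);
    }

    // Types are declared once: an addPreCommitTopology(FakeStream<Boolean>) override
    // would fail to compile here.
    static class ExampleSink
            implements WithPostCommitTopologyWithPreCommitTopology<Integer, Long, String> {
        List<String> postCommitted = List.of();

        @Override
        public FakeStream<String> addPreCommitTopology(FakeStream<Long> writerResults) {
            return () -> writerResults.elements().stream()
                    .map(r -> "commit-" + r)
                    .collect(Collectors.toList());
        }

        @Override
        public void addPostCommitTopology(FakeStream<String> committables) {
            postCommitted = committables.elements();
        }
    }

    public static void main(String[] args) {
        ExampleSink sink = new ExampleSink();
        FakeStream<Long> writerResults = () -> List.of(1L, 2L);
        sink.addPostCommitTopology(sink.addPreCommitTopology(writerResults));
        System.out.println(sink.postCommitted); // [commit-1, commit-2]
    }
}
```

The cost is visible in the sketch as well: `addPostCommitTopology` has to be declared twice, once per parent interface.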

I don't really like the complex `WithPostCommitTopologyWithPreCommitTopology`
interface, and if we start adding new features, the number of interfaces
could grow exponentially, but I agree that type checking is important. I
don't have a strong opinion, but I am inclined to vote for moving in the
direction of the individual interfaces.

What do you prefer?

   1. Go with the mixin approach
      1. Better extensibility
      2. Fewer interfaces (only one fewer now, but the difference could grow
      later)
      3. Easier to understand (IMHO)
   2. Stick with the combined interfaces approach (some mixins, like
   WithPreWriteTopology, some combined interfaces, like
   WithPostCommitTopologyWithPreCommitTopology)
      1. Better error messages
      2. Less disruptive change (still breaking for implementations of
      WithPreCommitTopology)
   3. Do you have a better idea?


Thanks,
Peter

CC: Jiabao Sun - as he might be interested in this discussion

[1] -
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/TypeExtractor.html


Péter Váry <peter.vary.apa...@gmail.com> wrote (on Wed, Oct 25, 2023,
16:02):

> Hi Gordon,
>
> Thanks for the review, here are my thoughts:
>
> > In terms of the abstraction layering, I was wondering if you've also
> considered this approach which I've quickly sketched in my local fork:
> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>
> I think we have a few design issues here:
> - How to handle the old interface where the transformation is not needed
> in the pre-commit phase? - As you have proposed, default method
> implementation is a nice solution here, as we do not really have to change
> everything in the transformation process.
> - How to handle the WithPostCommitTopology interface? - Currently the
> parent interface for the sink with a post-commit topology is strictly a
> single interface (TwoPhaseCommittingSink), and we want to add this to both
> types of sinks (new - with transformation / old - without transformation).
> In this case we could get away with creating
> OldTwoPhaseCommittingSinkWithPostCommitTopology and
> NewTwoPhaseCommittingSinkWithPostCommitTopology, but this is not a good
> approach for future extensibility. I tend to prefer a real mixin approach
> over creating multiple interfaces for this.
>
> > Quick thought: regarding the awkwardness you mention in the end with
> sinks that have post commit topologies, but no pre-commit topologies -
> Alternative to the mixin approach in the FLIP, it might make sense to
> consider a builder approach for constructing 2PC sinks
>
> TBH, after providing the possibility to transform in the pre-commit phase,
> I started to think about other possible generalizations:
> - Why not allow a different return type for the pre-write phase? - While
> we can transform the data in a map phase preceding the Sink, some Sinks
> might want to encapsulate these transformations before the writes.
> - Why not explicitly allow changing the return type of the committer? - We
> might not want to emit the incoming Committable; we might want to use the
> commit hash - or any other data generated by the committer - in the
> post-commit topology. So in some cases it might make sense for the
> committer to emit elements with different types than the input.
> - Why not have everything as a mixin interface and define a Sink this way
> (very similar to your builder approach)?
>
> But I currently do not see explicit requirements for these features, and
> they would result in another full rewrite of the Sink API, which has had a
> really troubled history with several rewrites in recent releases, so I
> decided against these big changes and kept the changes minimal.
>
> So, while I personally would love to see the Builder solution, I am afraid
> that the Flink community needs some stability around the Sink API for now,
> so the different Sinks could start to use this new feature.
>
> What do you think?
>
> Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote (on Wed, Oct 25, 2023,
> 2:01):
>
>> Hi Peter,
>>
>> Thanks a lot for starting this FLIP!
>>
>> I agree that the current TwoPhaseCommittingSink interface is limiting in
>> that it assumes 1) committers have the same parallelism as writers, and 2)
>> writers immediately produce finalized committables. This FLIP captures the
>> problem pretty well, and I do think there are use cases for a more general,
>> flexible interface outside of the Iceberg connector as well.
>>
>> In terms of the abstraction layering, I was wondering if you've also
>> considered this approach which I've quickly sketched in my local fork:
>>
>> https://github.com/tzulitai/flink/commit/e84e3ac57ce023c35037a8470fefdfcad877bcae
>>
>> With this approach, the sink translator always expects 2PC sink
>> implementations to extend `TwoPhaseCommittingSinkBase`, and therefore
>> assumes that a pre-commit topology always exists. For simple 2PC sinks that
>> do not require transforming committables, we would ship (for convenience)
>> an additional `SimpleTwoPhaseCommittingSinkBase` where the pre-commit
>> topology is a no-op passthrough. With that, we avoid some of the
>> "boilerplate" where 2PC sinks with a pre-commit topology require
>> implementing two interfaces, as proposed in the FLIP.
>>
>> Quick thought: regarding the awkwardness you mention in the end with sinks
>> that have post commit topologies, but no pre-commit topologies -
>> Alternative to the mixin approach in the FLIP, it might make sense to
>> consider a builder approach for constructing 2PC sinks, which should also
>> give users type-safety at compile time while not having the awkwardness
>> with all the types involved. Something along the lines of:
>>
>> ```
>> new TwoPhaseCommittingSinkBuilder(writerSupplier, committerSupplier)
>>     .withPreCommitTopology(writerResultsStream -> ...)  // Function<DataStream<WriterResultT>, DataStream<CommT>>
>>     .withPostCommitTopology(committablesStream -> ...)  // Consumer<DataStream<CommT>>
>>     .withPreWriteTopology(inputStream -> ...)           // Function<DataStream<InputT>, DataStream<InputT>>
>>     .build();
>> ```
>>
>> We could probably do some validation in the build() method, e.g. if the
>> writer / committer have different types, then clearly a pre-commit topology
>> should have been defined to transform intermediate writer results.
>>
>> Obviously, this would take generalization of the TwoPhaseCommittingSink
>> interface to the extreme, where we just have one interface with all of the
>> pre-commit / pre-write / post-commit methods built-in, and users would use
>> the builder as the entry point to opt in / opt out as needed. The upside is
>> that the SinkTransformationTranslator logic will become much less
>> cluttered.
>>
>> I'll need to experiment with the builder approach a bit more to see if it
>> makes sense at all, but wanted to throw out the idea early to see what you
>> think.
>>
>> On Mon, Oct 9, 2023 at 6:59 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>> > Hi Team,
>> >
>> > I did some experimenting and found the originally proposed solution to be
>> > a bit awkward for cases where WithPostCommitTopology is needed but the
>> > WithPreCommitTopology transformation is not.
>> > The flexibility of the new API would be better if we used a mixin-like
>> > approach. The interfaces would only be used to define the specific
>> > required methods, and they would not need to extend the original
>> > TwoPhaseCommittingSink interface too.
>> >
>> > Since the WithPreCommitTopology and WithPostCommitTopology interfaces are
>> > still Experimental, after talking to Gyula offline, I have updated the
>> > FLIP to use this new approach.
>> >
>> > Any comments, thoughts are welcome.
>> >
>> > Thanks,
>> > Peter
>> >
>> > Péter Váry <peter.vary.apa...@gmail.com> wrote (on Thu, Oct 5, 2023,
>> > 16:04):
>> >
>> > > Hi Team,
>> > >
>> > > In my previous email [1], I described our challenges migrating the
>> > > existing Iceberg SinkFunction-based implementation to the new
>> > > SinkV2-based implementation.
>> > >
>> > > As a result of the discussion around that topic, I created FLIP-371 [2]
>> > > to address the Committer-related changes, and now I have created a
>> > > companion FLIP-372 [3] to address the WithPreCommitTopology-related
>> > > issues.
>> > >
>> > > FLIP-372: Allow TwoPhaseCommittingSink WithPreCommitTopology to alter the
>> > > type of the Committable
>> > >
>> > > The main goal of FLIP-372 is to extend the currently existing
>> > > TwoPhaseCommittingSink API by adding the possibility to have a
>> > > PreCommitTopology where the input and output types of the pre-commit
>> > > transformation are different.
>> > >
>> > > Here is the FLIP: FLIP-372: Allow TwoPhaseCommittingSink
>> > > WithPreCommitTopology to alter the type of the Committable
>> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable>
>> > >
>> > > Please use this thread to discuss FLIP-related questions, proposals,
>> > > and feedback.
>> > >
>> > > Thanks,
>> > > Peter
>> > >
>> > > - [1] https://lists.apache.org/thread/h3kg7jcgjstpvwlhnofq093vk93ylgsn
>> > > - [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-371%3A+Provide+initialization+context+for+Committer+creation+in+TwoPhaseCommittingSink
>> > > - [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-372%3A+Allow+TwoPhaseCommittingSink+WithPreCommitTopology+to+alter+the+type+of+the+Committable
>> > >
>> >
>>
>
