Hi Steven,

Thank you for reading the FLIP so carefully.

1. If `commit` takes a List<GlobalCommT> as its parameter, the framework cannot know which `GlobalCommT` to retry when `commit` returns `RETRY`.
2. Of course, we could let `commit` return more detailed information, but that might be too complicated.
3. On the other hand, I think restoring is the only case where IcebergSink needs to take a collection of `GlobalCommT` and give back another collection of the `GlobalCommT` that are not yet committed.
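To make point 1 concrete, here is a rough sketch of how the framework side could track retries when `commit` takes a single `GlobalCommT`. It is only an illustration, not part of the FLIP: the `CommitResult` values are the FAIL/RETRY/SUCCESS ones mentioned earlier in this thread, while `RetrySketch` and `commitAll` are made-up names, and the committer is modelled as a plain `Function`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Result values as discussed in this thread (FAIL, RETRY, SUCCESS).
enum CommitResult { SUCCESS, RETRY, FAIL }

// Hypothetical helper, not a Flink class.
final class RetrySketch {

    /**
     * Commits every pending global committable one by one and returns
     * exactly those for which the committer asked for a retry.
     * With a List-based commit, a single RETRY result could not be
     * attributed to a particular element in this way.
     */
    static <G> List<G> commitAll(List<G> pending, Function<G, CommitResult> commit) {
        List<G> toRetry = new ArrayList<>();
        for (G globalCommittable : pending) {
            CommitResult result = commit.apply(globalCommittable);
            if (result == CommitResult.RETRY) {
                toRetry.add(globalCommittable);
            } else if (result == CommitResult.FAIL) {
                // In this sketch a FAIL is surfaced as an exception.
                throw new IllegalStateException("Commit failed for " + globalCommittable);
            }
        }
        return toRetry;
    }
}
```

The returned list is what the framework would keep and pass to `commit` again later, one element at a time.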
Best,
Guowei

On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:

> Guowei,
>
> Thanks a lot for updating the wiki page. It looks great.
>
> I noticed one inconsistency in the wiki with your last summary email for
> GlobalCommitter interface. I think the version in the summary email is the
> intended one, because rollover from previous failed commits can accumulate
> a list.
>
> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> =>
> CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
>
> I also have a clarifying question regarding the WriterStateT. Since
> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
> type? Since getWriterStateSerializer() returns Optional, that is clear and
> we can return Optional.empty().
>
> Thanks,
> Steven
>
> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Thanks Aljoscha for your suggestion. I have updated FLIP. Any comments
> > are welcome.
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> > > about the names of generic parameters and the Javadoc but we can
> > > address them later or during implementation.
> > >
> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
> > > globalCommit() but we can also do that as a later addition.
> > >
> > > So I think we're good to go to update the FLIP, do any last minute
> > > changes and then vote.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 23.09.20 06:13, Guowei Ma wrote:
> > > > Hi, all
> > > >
> > > > Thank everyone very much for your ideas and suggestions. I would try
> > > > to summarize again the consensus :). Correct me if I am wrong or
> > > > misunderstand you.
> > > >
> > > > ## Consensus-1
> > > >
> > > > 1. The motivation of the unified sink API is to decouple the sink
> > > > implementation from the different runtime execution mode.
> > > > 2. The initial scope of the unified sink API only covers the file
> > > > system type, which supports the real transactions. The FLIP focuses
> > > > more on the semantics the new sink api should support.
> > > > 3. We prefer the first alternative API, which could give the
> > > > framework a greater opportunity to optimize.
> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
> > > > method.
> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
> > > > more focused.
> > > >
> > > > ## Consensus-2
> > > >
> > > > 1. What should the “Unified Sink API” support/cover? It includes two
> > > > aspects. 1. The same sink implementation would work for both the
> > > > batch and stream execution mode. 2. In the long run we should give
> > > > the sink developer the ability of building “arbitrary” topologies.
> > > > But for Flink-1.12 we should be more focused on only satisfying the
> > > > S3/HDFS/Iceberg sink.
> > > > 2. Because the batch execution mode does not have the normal
> > > > checkpoint the sink developer should not depend on it any more if we
> > > > want a unified sink.
> > > > 3. We can benefit by providing an asynchronous `Writer` version. But
> > > > because the unified sink is already very complicated, we don’t add
> > > > this in the first version.
> > > >
> > > >
> > > > According to these consensus I would propose the first version of the
> > > > new sink api as follows. What do you think? Any comments are welcome.
> > > >
> > > > /**
> > > >  * This interface lets the sink developer build a simple transactional sink
> > > >  * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> > > >  * This sink topology includes one {@link Writer} + one {@link Committer} +
> > > >  * one {@link GlobalCommitter}.
> > > >  * The {@link Writer} is responsible for producing the committable.
> > > >  * The {@link Committer} is responsible for committing a single committables.
> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> > > >  * committable, which we called global committables.
> > > >  *
> > > >  * But both the {@link Committer} and the {@link GlobalCommitter} are optional.
> > > >  */
> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> > > >
> > > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> > > >
> > > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);
> > > >
> > > >     Optional<Committer<CommT>> createCommitter();
> > > >
> > > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> > > >
> > > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> > > >
> > > >     Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
> > > > }
> > > >
> > > > /**
> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
> > > >  * committable, which we called global committables.
> > > >  */
> > > > interface GlobalCommitter<CommT, GCommT> {
> > > >
> > > >     /**
> > > >      * This method is called when restoring from a failover.
> > > >      * @param globalCommittables the global committables that are not
> > > >      *        committed in the previous session.
> > > >      * @return the global committables that should be committed again
> > > >      *         in the current session.
> > > >      */
> > > >     List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
> > > >
> > > >     /**
> > > >      * Compute an aggregated committable from a collection of committables.
> > > >      * @param committables a collection of committables that are needed
> > > >      *        to combine
> > > >      * @return an aggregated committable
> > > >      */
> > > >     GCommT combine(List<CommT> committables);
> > > >
> > > >     void commit(List<GCommT> globalCommittables);
> > > >
> > > >     /**
> > > >      * There are no committables any more.
> > > >      */
> > > >     void endOfInput();
> > > > }
> > > >
> > > > Best,
> > > > Guowei
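
For illustration only, the `GlobalCommitter` contract quoted above could be exercised with a toy in-memory implementation like the one below. The interface is copied from the quoted proposal; `ToyGlobalCommitter`, its `committed` set (a stand-in for an external system such as a table's snapshot log) and the choice of file names as committables are all assumptions made for this sketch, not anything the FLIP prescribes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Copied from the proposal quoted above.
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical toy: a committable is a file name, a global committable
// is the list of file names that should become visible together.
final class ToyGlobalCommitter implements GlobalCommitter<String, List<String>> {

    // Stand-in for the external system that remembers what was committed.
    final Set<List<String>> committed = new HashSet<>();
    boolean finished = false;

    @Override
    public List<List<String>> filterRecoveredCommittables(List<List<String>> recovered) {
        // Keep only the global committables the external system has not seen yet.
        List<List<String>> remaining = new ArrayList<>();
        for (List<String> g : recovered) {
            if (!committed.contains(g)) {
                remaining.add(g);
            }
        }
        return remaining;
    }

    @Override
    public List<String> combine(List<String> committables) {
        // Aggregate by copying the file names into one manifest-like list.
        return new ArrayList<>(committables);
    }

    @Override
    public void commit(List<List<String>> globalCommittables) {
        committed.addAll(globalCommittables);
    }

    @Override
    public void endOfInput() {
        finished = true;
    }
}
```

On restore, the framework would pass the recovered global committables through `filterRecoveredCommittables` first and commit only what comes back.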