Hi, Steven

>> I also have a clarifying question regarding the WriterStateT. Since IcebergWriter won't need to checkpoint any state, should we set it to *Void* type? Since getWriterStateSerializer() returns Optional, that is clear and we can return Optional.empty().
Yes, I think you could do that. If you return Optional.empty(), we would ignore all the state you return.

Best,
Guowei

On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
> Hi, Steven
>
> Thank you for reading the FLIP so carefully.
> 1. The framework cannot know which `GlobalCommT` to retry if we use `List<GlobalCommT>` as the parameter and `commit` returns `RETRY`.
> 2. Of course, we could let `commit` return more detailed information, but that might be too complicated.
> 3. On the other hand, I think the only time the IcebergSink needs to receive a collection of `GlobalCommT`, and give back another collection of the `GlobalCommT` that are not yet committed, is when restoring.
>
> Best,
> Guowei
>
> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
>> Guowei,
>>
>> Thanks a lot for updating the wiki page. It looks great.
>>
>> I noticed one inconsistency between the wiki and your last summary email regarding the GlobalCommitter interface. I think the version in the summary email is the intended one, because global committables rolled over from previous failed commits can accumulate into a list.
>>
>>     CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>>     =>
>>     CommitResult commit(List<GlobalCommT> globalCommittable); // in the summary email
>>
>> I also have a clarifying question regarding the WriterStateT. Since IcebergWriter won't need to checkpoint any state, should we set it to *Void* type? Since getWriterStateSerializer() returns Optional, that is clear and we can return Optional.empty().
>>
>> Thanks,
>> Steven
>>
>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>> > Thanks, Aljoscha, for your suggestion. I have updated the FLIP. Any comments are welcome.
>> >
>> > Best,
>> > Guowei
>> >
>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> > > Yes, that sounds good!
>> > > I'll probably have some comments on the FLIP about the names of generic parameters and the Javadoc, but we can address them later or during implementation.
>> > >
>> > > I also think that we probably need the FAIL, RETRY, SUCCESS result for globalCommit(), but we can also do that as a later addition.
>> > >
>> > > So I think we're good to go to update the FLIP, make any last-minute changes, and then vote.
>> > >
>> > > Best,
>> > > Aljoscha
>> > >
>> > > On 23.09.20 06:13, Guowei Ma wrote:
>> > > > Hi, all
>> > > >
>> > > > Thank you all very much for your ideas and suggestions. I will try to summarize the consensus again :). Correct me if I am wrong or have misunderstood you.
>> > > >
>> > > > ## Consensus-1
>> > > >
>> > > > 1. The motivation of the unified sink API is to decouple the sink implementation from the different runtime execution modes.
>> > > > 2. The initial scope of the unified sink API only covers file-system-type sinks, which support real transactions. The FLIP focuses more on the semantics the new sink API should support.
>> > > > 3. We prefer the first alternative API, which could give the framework a greater opportunity to optimize.
>> > > > 4. The `Writer` needs a new method, `prepareCommit`, which would be called from `prepareSnapshotPreBarrier`, and the `Flush` method should be removed.
>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be more focused.
>> > > >
>> > > > ## Consensus-2
>> > > >
>> > > > 1. What should the “Unified Sink API” support/cover? It includes two aspects: (1) the same sink implementation would work for both the batch and streaming execution modes; (2) in the long run, we should give the sink developer the ability to build “arbitrary” topologies.
>> > > > But for Flink 1.12, we should focus only on satisfying the S3/HDFS/Iceberg sinks.
>> > > > 2. Because the batch execution mode does not have normal checkpoints, the sink developer should no longer depend on them if we want a unified sink.
>> > > > 3. We could benefit from providing an asynchronous `Writer` version. But because the unified sink is already very complicated, we won’t add this in the first version.
>> > > >
>> > > > Based on this consensus, I would propose the first version of the new sink API as follows. What do you think? Any comments are welcome.
>> > > >
>> > > > /**
>> > > >  * This interface lets the sink developer build a simple transactional sink
>> > > >  * topology pattern, which satisfies the HDFS/S3/Iceberg sinks.
>> > > >  * This sink topology includes one {@link Writer} + one {@link Committer} +
>> > > >  * one {@link GlobalCommitter}.
>> > > >  * The {@link Writer} is responsible for producing the committable.
>> > > >  * The {@link Committer} is responsible for committing a single committable.
>> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
>> > > >  * committable, which we call the global committable.
>> > > >  *
>> > > >  * But both the {@link Committer} and the {@link GlobalCommitter} are
>> > > >  * optional.
>> > > >  */
>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
>> > > >
>> > > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
>> > > >
>> > > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);
>> > > >
>> > > >     Optional<Committer<CommT>> createCommitter();
>> > > >
>> > > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
>> > > >
>> > > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
>> > > >
>> > > >     Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
>> > > > }
>> > > >
>> > > > /**
>> > > >  * The {@link GlobalCommitter} is responsible for committing an aggregated
>> > > >  * committable, which we call the global committable.
>> > > >  */
>> > > > interface GlobalCommitter<CommT, GCommT> {
>> > > >
>> > > >     /**
>> > > >      * This method is called when restoring from a failover.
>> > > >      * @param globalCommittables the global committables that were not
>> > > >      *        committed in the previous session.
>> > > >      * @return the global committables that should be committed again
>> > > >      *         in the current session.
>> > > >      */
>> > > >     List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
>> > > >
>> > > >     /**
>> > > >      * Computes an aggregated committable from a collection of committables.
>> > > >      * @param committables a collection of committables that need to be combined
>> > > >      * @return an aggregated committable
>> > > >      */
>> > > >     GCommT combine(List<CommT> committables);
>> > > >
>> > > >     void commit(List<GCommT> globalCommittables);
>> > > >
>> > > >     /**
>> > > >      * There are no more committables.
>> > > >      */
>> > > >     void endOfInput();
>> > > > }
>> > > >
>> > > > Best,
>> > > > Guowei
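A minimal Java sketch of the answer at the top of this thread: a writer that checkpoints nothing uses `Void` as its state type, and the sink returns `Optional.empty()` from `getWriterStateSerializer()`. The `StateSerializer`, `SketchWriter`, `StatelessFileWriter`, and `StatelessSink` types are hypothetical, simplified stand-ins written for illustration, not the Flink API, and the committable is just a made-up file-name string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a versioned state serializer (not Flink's class).
interface StateSerializer<T> {
    int getVersion();
    byte[] serialize(T obj);
    T deserialize(int version, byte[] serialized);
}

// Hypothetical writer shape: records in, committables out, optional state.
interface SketchWriter<IN, CommT, WriterS> {
    void write(IN element);

    // In the proposal this would be called from prepareSnapshotPreBarrier().
    List<CommT> prepareCommit();

    // State to checkpoint; always empty for a stateless writer.
    List<WriterS> snapshotState();
}

// Writer whose state type is Void because it keeps nothing across checkpoints.
final class StatelessFileWriter implements SketchWriter<String, String, Void> {
    private final List<String> buffer = new ArrayList<>();
    // Only used to generate distinct names in this demo; never checkpointed.
    private int fileCounter = 0;

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        if (buffer.isEmpty()) {
            return Collections.emptyList();
        }
        // Pretend the buffered records were flushed into one data file.
        String committable = "data-file-" + fileCounter++ + " (" + buffer.size() + " records)";
        buffer.clear();
        return Collections.singletonList(committable);
    }

    @Override
    public List<Void> snapshotState() {
        return Collections.emptyList();
    }
}

// Hypothetical sink: an empty serializer signals that there is no writer state.
final class StatelessSink {
    SketchWriter<String, String, Void> createWriter() {
        return new StatelessFileWriter();
    }

    Optional<StateSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}
```

Returning an empty `Optional` rather than a dummy serializer keeps the "no state" decision visible in the sink's signature; per Guowei's reply, the framework would then ignore any state the writer returns.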
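Guowei's first point in his Sep 25 reply, that a single `RETRY` result for a whole `List<GlobalCommT>` does not say which element to retry, can be illustrated with a small Java sketch. The `CommitResult` enum mirrors the FAIL/RETRY/SUCCESS idea Aljoscha mentions. `RetryDemo`, `commitEach`, and `commitBatch` are hypothetical helpers for illustration, not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical result type mirroring the FAIL/RETRY/SUCCESS idea from the thread.
enum CommitResult { SUCCESS, RETRY, FAIL }

final class RetryDemo {
    // Per-item commit: each result belongs to exactly one committable,
    // so the caller knows precisely which ones to retry.
    static <G> List<G> commitEach(List<G> committables, Function<G, CommitResult> commitOne) {
        List<G> toRetry = new ArrayList<>();
        for (G c : committables) {
            CommitResult r = commitOne.apply(c);
            if (r == CommitResult.RETRY) {
                toRetry.add(c);
            } else if (r == CommitResult.FAIL) {
                throw new IllegalStateException("commit failed for " + c);
            }
        }
        return toRetry;
    }

    // Batch commit: one result for the whole list. On RETRY the caller cannot
    // tell which elements were already applied, so it can only retry all of them.
    static <G> List<G> commitBatch(List<G> committables, Function<List<G>, CommitResult> commitAll) {
        CommitResult r = commitAll.apply(committables);
        if (r == CommitResult.FAIL) {
            throw new IllegalStateException("batch commit failed");
        }
        return r == CommitResult.RETRY ? new ArrayList<>(committables) : new ArrayList<>();
    }
}
```

For the list `g1, g2, g3` where only `g2` needs a retry, `commitEach` returns just `g2`, while `commitBatch` answering `RETRY` can only hand back all three. Avoiding that ambiguity with a list parameter would require the richer return information that Guowei considers too complicated.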
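To make the proposed `GlobalCommitter` contract more concrete, here is a sketch of one possible implementation in Java, loosely modeled on an Iceberg-style sink. It assumes that a committable is a data file name and that a global committable is a sorted, immutable list of file names (a "manifest"). `InMemoryTable` and `ManifestGlobalCommitter` are hypothetical, and the in-memory set merely stands in for a real table's commit history.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// The GlobalCommitter shape proposed in the summary email (Javadoc omitted).
interface GlobalCommitter<CommT, GCommT> {
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
    GCommT combine(List<CommT> committables);
    void commit(List<GCommT> globalCommittables);
    void endOfInput();
}

// Hypothetical in-memory stand-in for a table's commit history.
final class InMemoryTable {
    final Set<List<String>> committedManifests = new LinkedHashSet<>();
}

// Sketch: CommT = data file name, GCommT = manifest (sorted list of file names).
final class ManifestGlobalCommitter implements GlobalCommitter<String, List<String>> {
    private final InMemoryTable table;
    private boolean inputEnded = false;

    ManifestGlobalCommitter(InMemoryTable table) {
        this.table = table;
    }

    @Override
    public List<List<String>> filterRecoveredCommittables(List<List<String>> recovered) {
        // Keep only manifests that did not reach the table before the failover.
        List<List<String>> pending = new ArrayList<>();
        for (List<String> manifest : recovered) {
            if (!table.committedManifests.contains(manifest)) {
                pending.add(manifest);
            }
        }
        return pending;
    }

    @Override
    public List<String> combine(List<String> dataFiles) {
        // Aggregate the committables from all writers into one global committable.
        List<String> manifest = new ArrayList<>(dataFiles);
        Collections.sort(manifest);
        return Collections.unmodifiableList(manifest);
    }

    @Override
    public void commit(List<List<String>> manifests) {
        // A list, because manifests left over from earlier failed commits can accumulate.
        table.committedManifests.addAll(manifests);
    }

    @Override
    public void endOfInput() {
        inputEnded = true;
    }

    boolean isInputEnded() {
        return inputEnded;
    }
}
```

Here `filterRecoveredCommittables` drops manifests that the table already contains, so committing again after a failover does not apply the same manifest twice. A real sink would likely need a durable way to tell which commits actually succeeded.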