Hi, Steven

Thank you for reading the FLIP so carefully.
1. If `commit` takes a List<GlobalCommT> as its parameter and returns
`RETRY`, the framework cannot know which `GlobalCommT` in the list to retry.
2. Of course, we could let `commit` return more detailed information, but
that might be too complicated.
3. On the other hand, I think the IcebergSink only needs to receive a
collection of `GlobalCommT` when restoring, and to give back another
collection of the `GlobalCommT` that are not yet committed.
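
To illustrate point 1, here is a minimal sketch, not the FLIP's actual code.
It assumes the single-committable `commit` from the wiki and a
`CommitResult` enum with SUCCESS/RETRY/FAILURE values; the class name
`CommitRetrySketch` and the helper `commitAll` are hypothetical and stand in
for whatever the framework would do internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitRetrySketch {

    // Assumed result values, following the FAIL/RETRY/SUCCESS idea in this thread.
    enum CommitResult { SUCCESS, RETRY, FAILURE }

    // Single-committable variant of commit (the signature shown on the wiki).
    interface GlobalCommitter<GlobalCommT> {
        CommitResult commit(GlobalCommT globalCommittable);
    }

    // Hypothetical framework-side loop: each call covers exactly one
    // committable, so a RETRY result identifies what has to be retried.
    static <T> List<T> commitAll(GlobalCommitter<T> committer, List<T> pending) {
        List<T> toRetry = new ArrayList<>();
        for (T committable : pending) {
            CommitResult result = committer.commit(committable);
            if (result == CommitResult.RETRY) {
                toRetry.add(committable);
            } else if (result == CommitResult.FAILURE) {
                throw new IllegalStateException("Commit failed for " + committable);
            }
        }
        return toRetry;
    }

    public static void main(String[] args) {
        // Toy committer that pretends "snapshot-2" hit a transient error.
        GlobalCommitter<String> committer =
                c -> c.equals("snapshot-2") ? CommitResult.RETRY : CommitResult.SUCCESS;
        List<String> pending = Arrays.asList("snapshot-1", "snapshot-2", "snapshot-3");
        System.out.println(commitAll(committer, pending)); // prints [snapshot-2]
    }
}
```

With a List<GlobalCommT> parameter and a single `RETRY` return value, the
loop above would have no way to build `toRetry` without extra information
from the committer.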

Best,
Guowei


On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:

> Guowei,
>
> Thanks a lot for updating the wiki page. It looks great.
>
> I noticed one inconsistency in the wiki with your last summary email for
> GlobalCommitter interface. I think the version in the summary email is the
> intended one, because rollover from previous failed commits can accumulate
> a list.
> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> =>
> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the
> summary email
>
> I also have a clarifying question regarding the WriterStateT. Since
> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
> type? Since getWriterStateSerializer() returns Optional, that is clear and
> we can return Optional.empty().
>
> Thanks,
> Steven
>
> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
>
> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments are
> > welcome.
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> > > about the names of generic parameters and the Javadoc but we can address
> > > them later or during implementation.
> > >
> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
> > > globalCommit() but we can also do that as a later addition.
> > >
> > > So I think we're good to go to update the FLIP, do any last minute
> > > changes and then vote.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 23.09.20 06:13, Guowei Ma wrote:
> > > > Hi, all
> > > >
> > > > Thank everyone very much for your ideas and suggestions. I would try to
> > > > summarize again the consensus :). Correct me if I am wrong or misunderstand
> > > > you.
> > > >
> > > > ## Consensus-1
> > > >
> > > > 1. The motivation of the unified sink API is to decouple the sink
> > > > implementation from the different runtime execution mode.
> > > > 2. The initial scope of the unified sink API only covers the file system
> > > > type, which supports the real transactions. The FLIP focuses more on the
> > > > semantics the new sink api should support.
> > > > 3. We prefer the first alternative API, which could give the framework a
> > > > greater opportunity to optimize.
> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > > > focused.
> > > >
> > > > ## Consensus-2
> > > >
> > > > 1. What should the “Unified Sink API” support/cover? It includes two
> > > > aspects. 1. The same sink implementation would work for both the batch and
> > > > stream execution mode. 2. In the long run we should give the sink developer
> > > > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > > > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > > > 2. Because the batch execution mode does not have the normal checkpoint the
> > > > sink developer should not depend on it any more if we want a unified sink.
> > > > 3. We can benefit by providing an asynchronous `Writer` version. But
> > > > because the unified sink is already very complicated, we don’t add this in
> > > > the first version.
> > > >
> > > >
> > > > According to these consensus I would propose the first version of the new
> > > > sink api as follows. What do you think? Any comments are welcome.
> > > >
> > > > /**
> > > >   * This interface lets the sink developer build a simple transactional sink
> > > >   * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> > > >   * This sink topology includes one {@link Writer} + one {@link Committer} +
> > > >   * one {@link GlobalCommitter}.
> > > >   * The {@link Writer} is responsible for producing the committable.
> > > >   * The {@link Committer} is responsible for committing a single
> > > >   * committables.
> > > >   * The {@link GlobalCommitter} is responsible for committing an aggregated
> > > >   * committable, which we called global committables.
> > > >   *
> > > >   * But both the {@link Committer} and the {@link GlobalCommitter} are
> > > >   * optional.
> > > >   */
> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> > > >
> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> > > >
> > > >          Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext,
> > > >                  List<WriterS> states);
> > > >
> > > >          Optional<Committer<CommT>> createCommitter();
> > > >
> > > >          Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> > > >
> > > >          SimpleVersionedSerializer<CommT> getCommittableSerializer();
> > > >
> > > >          Optional<SimpleVersionedSerializer<GCommT>>
> > > >                  getGlobalCommittableSerializer();
> > > > }
> > > >
> > > > /**
> > > >   * The {@link GlobalCommitter} is responsible for committing an aggregated
> > > >   * committable, which we called global committables.
> > > >   */
> > > > interface GlobalCommitter<CommT, GCommT> {
> > > >
> > > >          /**
> > > >           * This method is called when restoring from a failover.
> > > >           * @param globalCommittables the global committables that are not
> > > >           * committed in the previous session.
> > > >           * @return the global committables that should be committed again
> > > >           * in the current session.
> > > >           */
> > > >          List<GCommT> filterRecoveredCommittables(
> > > >                  List<GCommT> globalCommittables);
> > > >
> > > >          /**
> > > >           * Compute an aggregated committable from a collection of
> > > >           * committables.
> > > >           * @param committables a collection of committables that are needed
> > > >           * to combine
> > > >           * @return an aggregated committable
> > > >           */
> > > >          GCommT combine(List<CommT> committables);
> > > >
> > > >          void commit(List<GCommT> globalCommittables);
> > > >
> > > >          /**
> > > >           * There are no committables any more.
> > > >           */
> > > >          void endOfInput();
> > > > }
> > > >
> > > > Best,
> > > > Guowei
> > >
> > >
> >
>
