Hi Steven

Thank you very much for your detailed explanation.

Now I get your point. I can see that there are benefits to committing a
collection of `GlobalCommT` as a whole when the external metastore
environment is temporarily unstable.

But I have two small concerns about committing a collection of
`GlobalCommT`:

1. For Option 1: `CommitResult commit(List<GlobalCommT>)`. This option
implies that users must commit the collection of `GlobalCommT` as a whole.
But not every system can do that; for example, a sink that commits by
renaming files cannot rename a batch of files atomically. If that is the
case, I think someone will keep asking the same question I asked in my
previous email.

2. For Option 2: `List<CommitResult> commit(List<GlobalCommT>)`. This option
is clearer than the first one. But IMHO it only helps when the external
metastore is unstable and we want to retry many times instead of failing the
job. Maybe we should not retry that many times and end up with a lot of
uncommitted `GlobalCommT`. If that is the case, maybe we should make the API
clearer and simpler for the normal scenario. In addition, there is only one
`GlobalCommitter` instance, so I think the external system can bear the load
of committing them one by one.
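To make the difference between the two options concrete, here is a minimal Java sketch. `CommitResult`, `AtomicGlobalCommitter`, `PerItemGlobalCommitter`, `TableCommitter` and `RenameCommitter` are hypothetical names used only for illustration, not the FLIP's API. The toy committers assume a table-style sink that can commit a whole batch in one transaction (Option 1) and a rename-style sink where each item succeeds or needs a retry on its own (Option 2).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical types for illustration only; these are not the FLIP's API.
public class CommitOptionsSketch {

    enum CommitResult { SUCCESS, FAILURE, RETRY }

    // Option 1: one result for the whole list, so the list is committed as a unit.
    interface AtomicGlobalCommitter<G> {
        CommitResult commit(List<G> globalCommittables);
    }

    // Option 2: one result per item, so items can succeed or be retried individually.
    interface PerItemGlobalCommitter<G> {
        List<CommitResult> commit(List<G> globalCommittables);
    }

    // Toy Option 1 committer: every file goes into a single "transaction",
    // so the batch either succeeds together or is retried together.
    static final class TableCommitter implements AtomicGlobalCommitter<String> {
        private final boolean metastoreAvailable;
        final List<String> committed = new ArrayList<>();

        TableCommitter(boolean metastoreAvailable) {
            this.metastoreAvailable = metastoreAvailable;
        }

        @Override
        public CommitResult commit(List<String> files) {
            if (!metastoreAvailable) {
                return CommitResult.RETRY; // nothing was committed
            }
            committed.addAll(files);
            return CommitResult.SUCCESS;
        }
    }

    // Toy Option 2 committer: "renames" files one at a time; names in
    // `unavailable` simulate a transient failure for that single file.
    static final class RenameCommitter implements PerItemGlobalCommitter<String> {
        private final Set<String> unavailable;
        final List<String> committed = new ArrayList<>();

        RenameCommitter(Set<String> unavailable) {
            this.unavailable = unavailable;
        }

        @Override
        public List<CommitResult> commit(List<String> files) {
            List<CommitResult> results = new ArrayList<>();
            for (String file : files) {
                if (unavailable.contains(file)) {
                    results.add(CommitResult.RETRY);
                } else {
                    committed.add(file);
                    results.add(CommitResult.SUCCESS);
                }
            }
            return results;
        }
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "c");

        TableCommitter table = new TableCommitter(false);
        System.out.println(table.commit(batch));  // RETRY
        System.out.println(table.committed);      // []

        RenameCommitter rename = new RenameCommitter(Set.of("b"));
        System.out.println(rename.commit(batch)); // [SUCCESS, RETRY, SUCCESS]
        System.out.println(rename.committed);     // [a, c]
    }
}
```

With Option 1, the rename-style sink would have to squeeze a partially applied batch into a single result, which is the ambiguity raised in point 1; Option 2 can report it precisely at the cost of a more complex return type.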

So personally I would prefer to keep the API simple at the beginning, in
1.12.

What do you think?

Best,
Guowei


On Fri, Sep 25, 2020 at 9:30 PM Steven Wu <stevenz...@gmail.com> wrote:

> I should clarify my last email a little more.
>
> In the example where the commits for checkpoints 1-100 failed, the job is
> still up (processing records and uploading files). When the commit for
> checkpoint 101 comes, IcebergSink would prefer the framework to pass in all
> 101 GlobalCommT (100 old + 1 new) so that it can commit all of them in one
> transaction. That is more efficient than 101 separate transactions.
>
> Maybe the GlobalCommitter#commit semantics should be to give the sink all
> uncommitted GlobalCommT items and let the sink implementation decide whether
> to retry them one by one or in a single transaction. That could mean we need
> to expand the CommitResult interface (e.g. a list for each result type:
> SUCCESS, FAILURE, RETRY). We can also start with a simple enum-style result
> for the whole list for now. If we need to break the experimental API later,
> it is not a big deal, since we only need to update a few sink
> implementations.
>
> Thanks,
> Steven
>
> On Fri, Sep 25, 2020 at 5:56 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > > 1. The framework cannot know which `GlobalCommT` to retry if we use
> > > `List<GlobalCommT>` as the parameter and `commit` returns `RETRY`.
> > > 2. Of course we can let `commit` return more detailed info, but it
> > > might be too complicated.
> >
> > If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
> > to be retried. E.g. if we have an outage of the metadata service, the
> > commits for checkpoints 1-100 fail and we accumulate 100 GlobalCommT
> > items. We don't want to commit them one by one; it is faster to commit
> > the whole list as one batch.
> >
> > > 3. On the other hand, I think IcebergSink only needs a collection of
> > > `GlobalCommT` when restoring, giving back another collection of the
> > > `GlobalCommT` that are not committed.
> >
> > That is when the job is restarted due to a failure or a deployment.
> >
> >
> > On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi, all
> >>
> >> From the above discussion we can see that the FLIP focuses on providing a
> >> unified transactional sink API, so I changed the FLIP's title to "Unified
> >> Transactional Sink API". However, the old link can no longer be opened.
> >>
> >> Here is the updated link [1]. Sorry for the inconvenience.
> >>
> >> [1] https://cwiki.apache.org/confluence/x/KEJ4CQ
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com> wrote:
> >>
> >> > Hi, Steven
> >> >
> >> > >>I also have a clarifying question regarding the WriterStateT. Since
> >> > >>IcebergWriter won't need to checkpoint any state, should we set it to
> >> > >>*Void* type? Since getWriterStateSerializer() returns Optional, that is
> >> > >>clear and we can return Optional.empty().
> >> >
> >> > Yes, I think you can do that. If you return Optional.empty(), we will
> >> > ignore all the state you return.
> >> >
> >> > Best,
> >> > Guowei
> >> >
> >> >
> >> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com> wrote:
> >> >
> >> >> Hi, Steven
> >> >>
> >> >> Thank you for reading the FLIP so carefully.
> >> >> 1. The framework cannot know which `GlobalCommT` to retry if we use
> >> >> `List<GlobalCommT>` as the parameter and `commit` returns `RETRY`.
> >> >> 2. Of course we can let `commit` return more detailed info, but it
> >> >> might be too complicated.
> >> >> 3. On the other hand, I think IcebergSink only needs a collection of
> >> >> `GlobalCommT` when restoring, giving back another collection of the
> >> >> `GlobalCommT` that are not committed.
> >> >>
> >> >> Best,
> >> >> Guowei
> >> >>
> >> >>
> >> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com> wrote:
> >> >>
> >> >>> Guowei,
> >> >>>
> >> >>> Thanks a lot for updating the wiki page. It looks great.
> >> >>>
> >> >>> I noticed one inconsistency between the wiki and your last summary
> >> >>> email for the GlobalCommitter interface. I think the version in the
> >> >>> summary email is the intended one, because rollover from previous
> >> >>> failed commits can accumulate a list.
> >> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> >> >>> =>
> >> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in the summary email
> >> >>>
> >> >>> I also have a clarifying question regarding the WriterStateT. Since
> >> >>> IcebergWriter won't need to checkpoint any state, should we set it to
> >> >>> *Void* type? Since getWriterStateSerializer() returns Optional, that is
> >> >>> clear and we can return Optional.empty().
> >> >>>
> >> >>> Thanks,
> >> >>> Steven
> >> >>>
> >> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com> wrote:
> >> >>>
> >> >>> > Thanks Aljoscha for your suggestion. I have updated the FLIP. Any
> >> >>> > comments are welcome.
> >> >>> >
> >> >>> > Best,
> >> >>> > Guowei
> >> >>> >
> >> >>> >
> >> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >> >>> >
> >> >>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> >> >>> > > about the names of generic parameters and the Javadoc, but we can
> >> >>> > > address them later or during implementation.
> >> >>> > >
> >> >>> > > I also think that we probably need the FAIL, RETRY, SUCCESS result
> >> >>> > > for globalCommit(), but we can also do that as a later addition.
> >> >>> > >
> >> >>> > > So I think we're good to go to update the FLIP, make any last-minute
> >> >>> > > changes and then vote.
> >> >>> > >
> >> >>> > > Best,
> >> >>> > > Aljoscha
> >> >>> > >
> >> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> >> >>> > > > Hi, all
> >> >>> > > >
> >> >>> > > > Thank you all very much for your ideas and suggestions. I will try
> >> >>> > > > to summarize the consensus again :). Correct me if I am wrong or
> >> >>> > > > have misunderstood you.
> >> >>> > > >
> >> >>> > > > ## Consensus-1
> >> >>> > > >
> >> >>> > > > 1. The motivation of the unified sink API is to decouple the sink
> >> >>> > > > implementation from the different runtime execution modes.
> >> >>> > > > 2. The initial scope of the unified sink API only covers the file
> >> >>> > > > system type, which supports real transactions. The FLIP focuses
> >> >>> > > > more on the semantics the new sink API should support.
> >> >>> > > > 3. We prefer the first alternative API, which could give the
> >> >>> > > > framework a greater opportunity to optimize.
> >> >>> > > > 4. The `Writer` needs a new method `prepareCommit`, which would be
> >> >>> > > > called from `prepareSnapshotPreBarrier`, and the `Flush` method is
> >> >>> > > > removed.
> >> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to
> >> >>> > > > be more focused.
> >> >>> > > >
> >> >>> > > > ## Consensus-2
> >> >>> > > >
> >> >>> > > > 1. What should the “Unified Sink API” support/cover? It includes
> >> >>> > > > two aspects: (1) the same sink implementation should work for both
> >> >>> > > > the batch and stream execution modes; (2) in the long run we should
> >> >>> > > > give the sink developer the ability to build “arbitrary”
> >> >>> > > > topologies. But for Flink 1.12 we should focus only on satisfying
> >> >>> > > > the S3/HDFS/Iceberg sinks.
> >> >>> > > > 2. Because the batch execution mode does not have normal
> >> >>> > > > checkpoints, the sink developer should no longer depend on them if
> >> >>> > > > we want a unified sink.
> >> >>> > > > 3. We could benefit from providing an asynchronous `Writer` version.
> >> >>> > > > But because the unified sink is already very complicated, we won't
> >> >>> > > > add this in the first version.
> >> >>> > > >
> >> >>> > > >
> >> >>> > > > Based on this consensus, I propose the first version of the new
> >> >>> > > > sink API as follows. What do you think? Any comments are welcome.
> >> >>> > > >
> >> >>> > > > /**
> >> >>> > > >  * This interface lets the sink developer build a simple transactional
> >> >>> > > >  * sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >> >>> > > >  * This sink topology includes one {@link Writer} + one {@link Committer}
> >> >>> > > >  * + one {@link GlobalCommitter}.
> >> >>> > > >  * The {@link Writer} is responsible for producing the committable.
> >> >>> > > >  * The {@link Committer} is responsible for committing a single
> >> >>> > > >  * committable.
> >> >>> > > >  * The {@link GlobalCommitter} is responsible for committing an
> >> >>> > > >  * aggregated committable, which we call the global committable.
> >> >>> > > >  *
> >> >>> > > >  * Both the {@link Committer} and the {@link GlobalCommitter} are
> >> >>> > > >  * optional.
> >> >>> > > >  */
> >> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> >> >>> > > >
> >> >>> > > >     Writer<IN, CommT, WriterS> createWriter(InitContext initContext);
> >> >>> > > >
> >> >>> > > >     Writer<IN, CommT, WriterS> restoreWriter(InitContext initContext, List<WriterS> states);
> >> >>> > > >
> >> >>> > > >     Optional<Committer<CommT>> createCommitter();
> >> >>> > > >
> >> >>> > > >     Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();
> >> >>> > > >
> >> >>> > > >     SimpleVersionedSerializer<CommT> getCommittableSerializer();
> >> >>> > > >
> >> >>> > > >     Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
> >> >>> > > > }
> >> >>> > > >
> >> >>> > > > /**
> >> >>> > > >  * The {@link GlobalCommitter} is responsible for committing an
> >> >>> > > >  * aggregated committable, which we call the global committable.
> >> >>> > > >  */
> >> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> >> >>> > > >
> >> >>> > > >     /**
> >> >>> > > >      * This method is called when restoring from a failover.
> >> >>> > > >      * @param globalCommittables the global committables that were not
> >> >>> > > >      *     committed in the previous session.
> >> >>> > > >      * @return the global committables that should be committed again
> >> >>> > > >      *     in the current session.
> >> >>> > > >      */
> >> >>> > > >     List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);
> >> >>> > > >
> >> >>> > > >     /**
> >> >>> > > >      * Computes an aggregated committable from a collection of committables.
> >> >>> > > >      * @param committables a collection of committables that need to be combined
> >> >>> > > >      * @return an aggregated committable
> >> >>> > > >      */
> >> >>> > > >     GCommT combine(List<CommT> committables);
> >> >>> > > >
> >> >>> > > >     void commit(List<GCommT> globalCommittables);
> >> >>> > > >
> >> >>> > > >     /**
> >> >>> > > >      * Signals that there are no more committables.
> >> >>> > > >      */
> >> >>> > > >     void endOfInput();
> >> >>> > > > }
> >> >>> > > >
> >> >>> > > > Best,
> >> >>> > > > Guowei
> >> >>> > >
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>
> >>
> >
>
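The accumulate-and-retry semantics Steven describes in the quoted thread (all uncommitted `GlobalCommT` handed to a single `commit` call, 100 old plus 1 new) could look roughly like the following on the framework side. This is a minimal sketch assuming one `CommitResult` for the whole list, as in Option 1; `GlobalCommitDriver`, `onCheckpointComplete` and `pendingCount` are hypothetical names, not Flink runtime classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: hypothetical names, not Flink's actual runtime.
public class AccumulatingCommitSketch {

    enum CommitResult { SUCCESS, FAILURE, RETRY }

    // Mirrors the list-based signature discussed above: one result for the whole list.
    interface GlobalCommitter<G> {
        CommitResult commit(List<G> globalCommittables);
    }

    // Hypothetical framework-side driver: keeps every global committable whose commit
    // has not succeeded yet and hands the whole backlog to the sink on each checkpoint.
    static final class GlobalCommitDriver<G> {
        private final GlobalCommitter<G> committer;
        private final List<G> pending = new ArrayList<>();

        GlobalCommitDriver(GlobalCommitter<G> committer) {
            this.committer = committer;
        }

        void onCheckpointComplete(G newGlobalCommittable) {
            pending.add(newGlobalCommittable);
            // Pass a copy so the sink cannot mutate the driver's backlog.
            CommitResult result = committer.commit(new ArrayList<>(pending));
            switch (result) {
                case SUCCESS:
                    pending.clear();
                    break;
                case RETRY:
                    break; // keep everything; the next checkpoint retries the whole list
                case FAILURE:
                    throw new IllegalStateException("Global commit failed: " + pending);
            }
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        // Toy metastore that is "down" for the first 100 checkpoints.
        final int[] calls = {0};
        final List<Integer> batchSizes = new ArrayList<>();
        GlobalCommitter<Long> flakyMetastore = committables -> {
            calls[0]++;
            batchSizes.add(committables.size());
            return calls[0] <= 100 ? CommitResult.RETRY : CommitResult.SUCCESS;
        };

        GlobalCommitDriver<Long> driver = new GlobalCommitDriver<>(flakyMetastore);
        for (long checkpoint = 1; checkpoint <= 101; checkpoint++) {
            driver.onCheckpointComplete(checkpoint);
        }
        System.out.println(driver.pendingCount());                 // 0
        System.out.println(batchSizes.get(batchSizes.size() - 1)); // 101
    }
}
```

The trade-off Guowei raises shows up directly here: with a single result, a RETRY keeps the entire backlog, so a long outage grows the list passed to every subsequent `commit` call.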

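For the restore path (point 3 in the quoted thread and `filterRecoveredCommittables` in the proposal), a sink could drop recovered global committables that were already committed before the failover. The sketch below assumes, hypothetically, that each global committable carries its checkpoint id and that the table metadata records the highest committed checkpoint id; `GlobalCommittable`, `TableMetadata` and `IcebergLikeGlobalCommitter` are illustrative names, not Iceberg or Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: all names and the checkpoint-id lookup are assumptions.
public class RecoveryFilterSketch {

    // Hypothetical global committable: the data files produced for one checkpoint.
    static final class GlobalCommittable {
        final long checkpointId;
        final List<String> dataFiles;

        GlobalCommittable(long checkpointId, List<String> dataFiles) {
            this.checkpointId = checkpointId;
            this.dataFiles = dataFiles;
        }
    }

    // Stand-in for reading the last committed checkpoint id from table metadata.
    interface TableMetadata {
        long maxCommittedCheckpointId();
    }

    static final class IcebergLikeGlobalCommitter {
        private final TableMetadata table;

        IcebergLikeGlobalCommitter(TableMetadata table) {
            this.table = table;
        }

        // Shaped after filterRecoveredCommittables(List<GCommT>) in the proposal:
        // keep only what the previous session did not manage to commit.
        List<GlobalCommittable> filterRecoveredCommittables(List<GlobalCommittable> recovered) {
            long lastCommitted = table.maxCommittedCheckpointId();
            List<GlobalCommittable> toCommit = new ArrayList<>();
            for (GlobalCommittable c : recovered) {
                if (c.checkpointId > lastCommitted) {
                    toCommit.add(c);
                }
            }
            return toCommit;
        }
    }

    public static void main(String[] args) {
        List<GlobalCommittable> recovered = List.of(
                new GlobalCommittable(7, List.of("f7.parquet")),
                new GlobalCommittable(8, List.of("f8.parquet")),
                new GlobalCommittable(9, List.of("f9.parquet")));
        // Pretend checkpoint 8 was committed just before the job failed.
        IcebergLikeGlobalCommitter committer = new IcebergLikeGlobalCommitter(() -> 8L);
        for (GlobalCommittable c : committer.filterRecoveredCommittables(recovered)) {
            System.out.println(c.checkpointId); // prints 9
        }
    }
}
```

Filtering by checkpoint id keeps the restore idempotent: running it again after a second failover gives the same answer, so nothing is committed twice.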