It is more about extended outages of metastore. E.g. If we commit every 2
minutes, 4 hours of metastore outage can lead to over 120 GlobalCommitT.
And regarding metastore outages, it is undesirable for streaming jobs to
fail the job and keep restarting. It is better to keep processing records
(avoiding backlog) and upload to DFS (like S3). Commit will succeed
whenever the metastore comes back. It also provides a nice automatic
recovery story. Since GlobalCommT combines all data files (hundreds or
thousands in one checkpoint cycle) into a single item in state, this really
makes it scalable and efficient to deal with extended metastore outages.

"CommitResult commit(GlobalCommitT)" API can work, although it is less
efficient and flexible for some sinks. It is probably better to let sink
implementations decide what is the best retry behavior: one by one vs a big
batch/transaction. Hence I would propose APIs like these.
------------------------------
interface GlobalCommitter {
  // commit all pending GlobalCommitT items accumulated
  CommitResult commit(List<GlobalCommitT>)
}

interface CommitResult {
  List<GlobalCommitT> getSucceededCommitables();
  List<GlobalCommitT> getFailedCommitables();

  // most likely, framework just need to check and roll over the retryable
list to the next commit try
  List<GlobalCommitT> getRetrableCommittables();
}
---------------------------

Anyway, I am going to vote yes on the voting thread, since it is important
to move forward to meet the 1.12 goal. We can also discuss the small tweak
during the implementation phase.

Thanks,
Steven


On Sat, Sep 26, 2020 at 8:46 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Steven
>
> Thank you very much for your detailed explanation.
>
> Now I got your point, I could see that there are benefits from committing a
> collection of `GlobalCommT` as a whole when the external metastore
> environment is unstable at some time.
>
> But I have two little concern about introducing committing the collection
> of `GlobalCommit`:
>
> 1. For Option1: CommitResult commit(List<GlobalCommitT>). This option
> implies that users should commit to the collection of `GlobalCommit` as a
> whole.
> But maybe not all the system could do it as a whole, for example changing
> some file names could not do it. If it is the case I think maybe some guy
> would always ask the same question as I asked in the previous mail.
>
> 2. For Option2: List<CommitResult> commit(List<GlobalCommitT>). This option
> is more clear than the first one. But IMHO this option has only benefits
> when the external metastore is unstable and we want to retry many times and
> not fail the job. Maybe we should not rety so many times and end up with a
> lot of the uncommitted `GlobalCommitT`. If this is the case maybe we should
> make the api more clear/simple for the normal scenario. In addition there
> is only a globalcommit instance so I think the external system could bear
> the pressure.
>
> So personally I would like to say we might keep the API simpler at the
> beginning in 1.12
>
> What do you think?
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 9:30 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> > I should clarify my last email a little more.
> >
> > For the example of commits for checkpoints 1-100 failed, the job is still
> > up (processing records and uploading files). When commit for checkpoint
> 101
> > came, IcebergSink would prefer the framework to pass in all 101
> GlobalCommT
> > (100 old + 1 new) so that it can commit all of them in one transaction.
> it
> > is more efficient than 101 separate transactions.
> >
> > Maybe the GlobalCommitter#commit semantics is to give the sink all
> > uncommitted GlobalCommT items and let sink implementation decide whether
> to
> > retry one by one or in a single transaction. It could mean that we need
> to
> > expand the CommitResult (e.g. a list for each result type, SUCCESS,
> > FAILURE, RETRY) interface. We can also start with the simple enum style
> > result for the whole list for now. If we need to break the experimental
> > API, it is also not a big deal since we only need to update a few sink
> > implementations.
> >
> > Thanks,
> > Steven
> >
> > On Fri, Sep 25, 2020 at 5:56 AM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > > > 1. The frame can not know which `GlobalCommT` to retry if we use the
> > > > List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> > > > 2. Of course we can let the `commit` return more detailed info but it
> > > might
> > > > be too complicated.
> > >
> > > If commit(List<GlobalCommT>) returns RETRY, it means the whole list
> needs
> > > to be retried. E.g. we have some outage with metadata service, commits
> > for
> > > checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. we
> > don't
> > > want to commit them one by one. It is faster to commit the whole list
> as
> > > one batch.
> > >
> > > > 3. On the other hand, I think only when restoring IcebergSink needs a
> > > > collection of `GlobalCommT` and giving back another collection of
> > > > `GlobalCommT` that are not committed
> > >
> > > That is when the job restarted due to failure or deployment.
> > >
> > >
> > > On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma <guowei....@gmail.com>
> wrote:
> > >
> > >> Hi, all
> > >>
> > >> From the above discussion we could find that FLIP focuses on providing
> > an
> > >> unified transactional sink API. So I updated the FLIP's title to
> > "Unified
> > >> Transactional Sink API". But I found that the old link could not be
> > opened
> > >> again.
> > >>
> > >> I would update the link[1] here. Sorry for the inconvenience.
> > >>
> > >> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
> > >>
> > >> Best,
> > >> Guowei
> > >>
> > >>
> > >> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma <guowei....@gmail.com>
> wrote:
> > >>
> > >> > Hi, Steven
> > >> >
> > >> > >>I also have a clarifying question regarding the WriterStateT.
> Since
> > >> > >>IcebergWriter won't need to checkpoint any state, should we set it
> > to
> > >> > *Void*
> > >> > >>type? Since getWriterStateSerializer() returns Optional, that is
> > clear
> > >> > and
> > >> > >>we can return Optional.empty().
> > >> >
> > >> > Yes I think you could do it. If you return Optional.empty() we would
> > >> > ignore all the state you return.
> > >> >
> > >> > Best,
> > >> > Guowei
> > >> >
> > >> >
> > >> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma <guowei....@gmail.com>
> > wrote:
> > >> >
> > >> >> Hi,Steven
> > >> >>
> > >> >> Thank you for reading the FLIP so carefully.
> > >> >> 1. The frame can not know which `GlobalCommT` to retry if we use
> the
> > >> >> List<GlobalCommT> as parameter when the `commit` returns `RETRY`.
> > >> >> 2. Of course we can let the `commit` return more detailed info but
> it
> > >> >> might be too complicated.
> > >> >> 3. On the other hand, I think only when restoring IcebergSink
> needs a
> > >> >> collection of `GlobalCommT` and giving back another collection of
> > >> >> `GlobalCommT` that are not committed.
> > >> >>
> > >> >> Best,
> > >> >> Guowei
> > >> >>
> > >> >>
> > >> >> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu <stevenz...@gmail.com>
> > >> wrote:
> > >> >>
> > >> >>> Guowei,
> > >> >>>
> > >> >>> Thanks a lot for updating the wiki page. It looks great.
> > >> >>>
> > >> >>> I noticed one inconsistency in the wiki with your last summary
> email
> > >> for
> > >> >>> GlobalCommitter interface. I think the version in the summary
> email
> > is
> > >> >>> the
> > >> >>> intended one, because rollover from previous failed commits can
> > >> >>> accumulate
> > >> >>> a list.
> > >> >>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> > >> >>> =>
> > >> >>> CommitResult commit(List<GlobalCommT> globalCommittable);  // in
> the
> > >> >>> summary email
> > >> >>>
> > >> >>> I also have a clarifying question regarding the WriterStateT.
> Since
> > >> >>> IcebergWriter won't need to checkpoint any state, should we set it
> > to
> > >> >>> *Void*
> > >> >>> type? Since getWriterStateSerializer() returns Optional, that is
> > clear
> > >> >>> and
> > >> >>> we can return Optional.empty().
> > >> >>>
> > >> >>> Thanks,
> > >> >>> Steven
> > >> >>>
> > >> >>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma <guowei....@gmail.com>
> > >> wrote:
> > >> >>>
> > >> >>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
> > >> >>> comments are
> > >> >>> > welcome.
> > >> >>> >
> > >> >>> > Best,
> > >> >>> > Guowei
> > >> >>> >
> > >> >>> >
> > >> >>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek <
> > >> aljos...@apache.org>
> > >> >>> > wrote:
> > >> >>> >
> > >> >>> > > Yes, that sounds good! I'll probably have some comments on the
> > >> FLIP
> > >> >>> > > about the names of generic parameters and the Javadoc but we
> can
> > >> >>> address
> > >> >>> > > them later or during implementation.
> > >> >>> > >
> > >> >>> > > I also think that we probably need the FAIL,RETRY,SUCCESS
> result
> > >> for
> > >> >>> > > globalCommit() but we can also do that as a later addition.
> > >> >>> > >
> > >> >>> > > So I think we're good to go to update the FLIP, do any last
> > minute
> > >> >>> > > changes and then vote.
> > >> >>> > >
> > >> >>> > > Best,
> > >> >>> > > Aljoscha
> > >> >>> > >
> > >> >>> > > On 23.09.20 06:13, Guowei Ma wrote:
> > >> >>> > > > Hi, all
> > >> >>> > > >
> > >> >>> > > > Thank everyone very much for your ideas and suggestions. I
> > would
> > >> >>> try to
> > >> >>> > > > summarize again the consensus :). Correct me if I am wrong
> or
> > >> >>> > > misunderstand
> > >> >>> > > > you.
> > >> >>> > > >
> > >> >>> > > > ## Consensus-1
> > >> >>> > > >
> > >> >>> > > > 1. The motivation of the unified sink API is to decouple the
> > >> sink
> > >> >>> > > > implementation from the different runtime execution mode.
> > >> >>> > > > 2. The initial scope of the unified sink API only covers the
> > >> file
> > >> >>> > system
> > >> >>> > > > type, which supports the real transactions. The FLIP focuses
> > >> more
> > >> >>> on
> > >> >>> > the
> > >> >>> > > > semantics the new sink api should support.
> > >> >>> > > > 3. We prefer the first alternative API, which could give the
> > >> >>> framework
> > >> >>> > a
> > >> >>> > > > greater opportunity to optimize.
> > >> >>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which
> > >> would
> > >> >>> be
> > >> >>> > > > called from `prepareSnapshotPreBarrier`. And remove the
> > `Flush`
> > >> >>> method.
> > >> >>> > > > 5. The FLIP could move the `Snapshot & Drain` section in
> order
> > >> to
> > >> >>> be
> > >> >>> > more
> > >> >>> > > > focused.
> > >> >>> > > >
> > >> >>> > > > ## Consensus-2
> > >> >>> > > >
> > >> >>> > > > 1. What should the “Unified Sink API” support/cover? It
> > includes
> > >> >>> two
> > >> >>> > > > aspects. 1. The same sink implementation would work for both
> > the
> > >> >>> batch
> > >> >>> > > and
> > >> >>> > > > stream execution mode. 2. In the long run we should give the
> > >> sink
> > >> >>> > > developer
> > >> >>> > > > the ability of building “arbitrary” topologies. But for
> > >> Flink-1.12
> > >> >>> we
> > >> >>> > > > should be more focused on only satisfying the
> S3/HDFS/Iceberg
> > >> sink.
> > >> >>> > > > 2. Because the batch execution mode does not have the normal
> > >> >>> checkpoint
> > >> >>> > > the
> > >> >>> > > > sink developer should not depend on it any more if we want a
> > >> >>> unified
> > >> >>> > > sink.
> > >> >>> > > > 3. We can benefit by providing an asynchronous `Writer`
> > version.
> > >> >>> But
> > >> >>> > > > because the unified sink is already very complicated, we
> don’t
> > >> add
> > >> >>> this
> > >> >>> > > in
> > >> >>> > > > the first version.
> > >> >>> > > >
> > >> >>> > > >
> > >> >>> > > > According to these consensus I would propose the first
> version
> > >> of
> > >> >>> the
> > >> >>> > new
> > >> >>> > > > sink api as follows. What do you think? Any comments are
> > >> welcome.
> > >> >>> > > >
> > >> >>> > > > /**
> > >> >>> > > >   * This interface lets the sink developer build a simple
> > >> >>> transactional
> > >> >>> > > sink
> > >> >>> > > > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> > >> >>> > > >   * This sink topology includes one {@link Writer} + one
> > {@link
> > >> >>> > > Committer} +
> > >> >>> > > > one {@link GlobalCommitter}.
> > >> >>> > > >   * The {@link Writer} is responsible for producing the
> > >> >>> committable.
> > >> >>> > > >   * The {@link Committer} is responsible for committing a
> > single
> > >> >>> > > > committables.
> > >> >>> > > >   * The {@link GlobalCommitter} is responsible for
> committing
> > an
> > >> >>> > > aggregated
> > >> >>> > > > committable, which we called global committables.
> > >> >>> > > >   *
> > >> >>> > > >   * But both the {@link Committer} and the {@link
> > >> GlobalCommitter}
> > >> >>> are
> > >> >>> > > > optional.
> > >> >>> > > >   */
> > >> >>> > > > interface TSink<IN, CommT, GCommT, WriterS> {
> > >> >>> > > >
> > >> >>> > > >          Writer<IN, CommT, WriterS> createWriter(InitContext
> > >> >>> > > initContext);
> > >> >>> > > >
> > >> >>> > > >          Writer<IN, CommT, WriterS>
> restoreWriter(InitContext
> > >> >>> > > initContext,
> > >> >>> > > > List<WriterS> states);
> > >> >>> > > >
> > >> >>> > > >          Optional<Committer<CommT>> createCommitter();
> > >> >>> > > >
> > >> >>> > > >          Optional<GlobalCommitter<CommT, GCommT>>
> > >> >>> > > createGlobalCommitter();
> > >> >>> > > >
> > >> >>> > > >          SimpleVersionedSerializer<CommT>
> > >> >>> getCommittableSerializer();
> > >> >>> > > >
> > >> >>> > > >          Optional<SimpleVersionedSerializer<GCommT>>
> > >> >>> > > > getGlobalCommittableSerializer();
> > >> >>> > > > }
> > >> >>> > > >
> > >> >>> > > > /**
> > >> >>> > > >   * The {@link GlobalCommitter} is responsible for
> committing
> > an
> > >> >>> > > aggregated
> > >> >>> > > > committable, which we called global committables.
> > >> >>> > > >   */
> > >> >>> > > > interface GlobalCommitter<CommT, GCommT> {
> > >> >>> > > >
> > >> >>> > > >          /**
> > >> >>> > > >           * This method is called when restoring from a
> > >> failover.
> > >> >>> > > >           * @param globalCommittables the global
> committables
> > >> that
> > >> >>> are
> > >> >>> > > not
> > >> >>> > > > committed in the previous session.
> > >> >>> > > >           * @return the global committables that should be
> > >> >>> committed
> > >> >>> > > again
> > >> >>> > > > in the current session.
> > >> >>> > > >           */
> > >> >>> > > >          List<GCommT>
> filterRecoveredCommittables(List<GCommT>
> > >> >>> > > > globalCommittables);
> > >> >>> > > >
> > >> >>> > > >          /**
> > >> >>> > > >           * Compute an aggregated committable from a
> > collection
> > >> of
> > >> >>> > > > committables.
> > >> >>> > > >           * @param committables a collection of committables
> > >> that
> > >> >>> are
> > >> >>> > > needed
> > >> >>> > > > to combine
> > >> >>> > > >           * @return an aggregated committable
> > >> >>> > > >           */
> > >> >>> > > >          GCommT combine(List<CommT> committables);
> > >> >>> > > >
> > >> >>> > > >          void commit(List<GCommT> globalCommittables);
> > >> >>> > > >
> > >> >>> > > >          /**
> > >> >>> > > >           * There are no committables any more.
> > >> >>> > > >           */
> > >> >>> > > >          void endOfInput();
> > >> >>> > > > }
> > >> >>> > > >
> > >> >>> > > > Best,
> > >> >>> > > > Guowei
> > >> >>> > >
> > >> >>> > >
> > >> >>> >
> > >> >>>
> > >> >>
> > >>
> > >
> >
>

Reply via email to