Hi Guowei,

> I believe that we could support such an async sink writer
> very easily in the future. What do you think?

How would you see the expansion in the future? Do you mean just adding an
`isAvailable()` method with a default implementation later on?
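
Something like this, for example (just a sketch; everything except
`isAvailable()` is only illustrative):

import java.util.List;

interface Writer<IN, CommT> {
        void write(IN element) throws Exception;
        List<CommT> prepareCommit() throws Exception;

        // Added later with a default, so existing writers keep working
        // unchanged and the runtime can ignore it until we want to take
        // advantage of it.
        default boolean isAvailable() {
                return true;
        }
}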

Piotrek

Mon, Sep 21, 2020 at 02:39 Steven Wu <stevenz...@gmail.com> wrote:

> > I think Iceberg sink needs to do the dedup in the `commit` call. The
> `recoveredGlobalCommittables` is just for restoring the ids.
>
>
> @Guowei Ma <guowei....@gmail.com> It is undesirable to do the dedup check
> in the `commit` call, because it happens in every checkpoint cycle. We only
> need to do the dedup check once, when restoring the GlobalCommT list from
> the checkpoint.
>
>
> Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to
> let sink implementations know the recovered GlobalCommT list, it is
> probably not a sufficient API. For the Iceberg sink, we can try to
> implement the de-dup check inside the `recoveredGlobalCommittables` method
> and commit any uncommitted GlobalCommT items. But how do we handle commit
> failures?
>
>
> One alternative is to allow sink implementations to override
> `List<GlobalCommT> recoverGlobalCommittables()`. The framework handles the
> checkpoint/state, and sink implementations can further customize the
> restored list with de-dup checks and filtering. The recovered uncommitted
> GlobalCommT list will be committed in the next cycle. It is the same
> rollover strategy for commit failure handling that we have been discussing.
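>
> Roughly what I have in mind (just a sketch; the exact method and type names
> are made up):
>
> import java.util.List;
>
> public interface GlobalCommitter<CommT, GlobalCommT> {
>
>         GlobalCommT combine(List<CommT> committables);
>
>         // The framework restores the raw GlobalCommT list from the
>         // checkpoint and passes it in here. A sink like Iceberg can override
>         // this to de-dup against the external system and return only the
>         // still-uncommitted items, which the framework then commits in the
>         // next cycle.
>         default List<GlobalCommT> recoverGlobalCommittables(List<GlobalCommT> restored) {
>                 return restored;
>         }
>
>         void commit(List<GlobalCommT> globalCommittables);
> }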
>
>
> ## topologies
>
>
> Regarding the topology options, if we agree that there is no
> one-size-fits-all topology, we can let sink implementations choose the best
> topology. Maybe the framework can provide 2-3 pre-defined topology
> implementations to help the sinks.
>
>
>
>
> On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma <guowei....@gmail.com> wrote:
>
> > I would like to summarize the file-type sinks in the thread and their
> > possible topologies. I also try to give the pros and cons of every topology
> > option. Correct me if I am wrong.
> >
> > ### FileSink
> >
> > Topology Option: TmpFileWriter + Committer.
> >
> > ### Iceberg Sink
> >
> > #### Topology Option1: `DataFileWriter` + `GlobalCommitterV0`
> > Pro:
> > 1. The same group of committables gets an id.
> > Cons:
> > 1. May limit users’ optimization space;
> > 2. The topology does not meet Hive’s requirements.
> >
> > #### Topology Option2: `DataFileWriter` + `GlobalCommitterV1`
> > Pro:
> > 1. The user has the opportunity to optimize the implementation of idempotence.
> > Cons:
> > 1. Makes the GlobalCommit more complicated.
> > 2. The topology does not meet Hive’s requirements.
> >
> > #### Topology Option3: `DataFileWriter` + `AggWriter` + `Committer`
> >
> > Pros:
> > 1. Uses the two basic `Writer` & `Committer` to meet Iceberg’s requirements.
> > 2. Opportunity to optimize the implementation of idempotence.
> > 3. The topology meets Hive’s requirements. (See the following.)
> > Con:
> > 1. It introduces a relatively complex topology.
> >
> > ### HiveSink
> >
> > #### Topology Option1: `TmpFileWriter` + `Committer` + `GlobalCommitterV2`
> > Pro:
> > 1. Could skip the cleanup problem at first.
> > Con:
> > 1. This style of topology does not meet the CompactHiveSink requirements.
> >
> > #### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
> > `Committer`
> > Pros:
> > 1. Could skip the cleanup problem at first.
> > 2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`.
> > Cons:
> > 1. This style of topology does not meet the CompactHiveSink requirements.
> > 2. There are two general `Committer`s in the topology. For Hive’s case this
> > might not be a problem, but it could be one in 1.12, for example where to
> > execute the sub-topology following the first `Committer` in batch execution
> > mode for the general case. Because the topology is built from `Writer` and
> > `Committer`, we might have to put the whole sub-topology in the
> > OperatorCoordinator, which could get very complicated if the topology is
> > complex. See the following.
> >
> > #### Topology Option3: `FileWriter` + `AggWriter` + `Committer`
> > Pro:
> > 1. There is only one general committer.
> > Cons:
> > 1. It has to consider the cleanup problem. (In theory both Option1 and
> > Option2 need to clean up.)
> > 2. This style of topology does not meet the CompactHiveSink requirements.
> > 3. We have to figure out how to stay compatible with the current version.
> >
> > ### CompactHiveSink/MergeHiveSink
> >
> > #### Topology Option1: `TmpFileWriter` + `Committer` + `MergerCoordinator` +
> > `MergeWriter` + `GlobalCommitterV2`
> > Pro:
> > 1. Could skip the cleanup problem at first.
> > Con:
> > 1. Where to execute the sub-topology following the `Committer`.
> >
> > #### Topology Option2: `TmpFileWriter` + `Committer` + `MergerCoordinator` +
> > `MergeWriter` + `AggWriter` + `Committer`
> > Pros:
> > 1. Could skip the cleanup problem at first.
> > 2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`.
> > Con:
> > 1. Where to execute the sub-topology following the `Committer`.
> >
> > #### Topology Option3: `FileWriter` + `MergeCoordinator` + `MergeFileWriter` +
> > `Writer(Agg)` + `Committer`
> > Pro:
> > 1. There is only one committer. It is very easy to support in the batch
> > execution mode.
> > Con:
> > 1. It has to consider the cleanup problem. (In theory both Option1 and
> > Option2 need to clean up.)
> >
> >
> > ### Summary
> >
> > From the above we could divide the sink topology into two parts:
> > 1. The write topology.
> > 2. One committer.
> >
> > So we could provide a unified sink API that looks like the following:
> >
> > public interface Sink<CommT> {
> >         List<Writer<?, ?>> getWriters();
> >         Committer<CommT> createCommitter();
> > }
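> >
> > For example, the Iceberg Option3 topology above might look roughly like this
> > (all the concrete class names are only placeholders):
> >
> > import java.util.Arrays;
> > import java.util.List;
> >
> > public class IcebergSink implements Sink<IcebergCommittable> {
> >
> >         @Override
> >         public List<Writer<?, ?>> getWriters() {
> >                 // DataFileWriter runs in parallel; AggWriter (parallelism 1)
> >                 // aggregates the data files into a single committable.
> >                 return Arrays.<Writer<?, ?>>asList(new DataFileWriter(), new AggWriter());
> >         }
> >
> >         @Override
> >         public Committer<IcebergCommittable> createCommitter() {
> >                 return new IcebergCommitter();
> >         }
> > }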
> >
> > In the long run maybe we could give the user a more powerful ability like
> > this (currently some transformations still belong to the runtime):
> >
> > Sink<CommT> {
> >         Transformation<CommT> createWriteTopology();
> >         CommitFunction<CommT> createCommitter();
> > }
> >
> > Best,
> > Guowei
> >
> >
> > On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi, Steven
> >> I want to make a clarification first: the following reply only considers
> >> the Iceberg sink, not other sinks. Before making a decision we should
> >> consider all the sinks. I will try to summarize all the sink requirements
> >> in the next mail.
> >>
> >>
> >> >>  run global committer in jobmanager (e.g. like sink coordinator)
> >>
> >> I think it could be.
> >>
> >>
> >> >> You meant GlobalCommit -> GlobalCommT, right?
> >>
> >> Yes. Thanks :)
> >>
> >>
> >> >> Is this called when restored from checkpoint/savepoint?
> >>
> >> Yes.
> >>
> >>
> >> >>Iceberg sink needs to do a dup check here on which GlobalCommT were
> >> committed and which weren't. Should it return the filtered/de-duped
> list of
> >> GlobalCommT?
> >>
> >>
> >> I think Iceberg sink needs to do the dedup in the `commit` call. The
> >> `recoveredGlobalCommittables` is just for restoring the ids.
> >>
> >>
> >> >> Sink implementation can decide if it wants to commit immediately or
> >> just leave
> >>
> >> I think only the framework knows *when* to call the commit function.
> >>
> >>
> >> >>should this be "commit(List<GlobalCommT>)"?
> >>
> >> It could be. Thanks.
> >>
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Sun, Sep 20, 2020 at 12:11 AM Steven Wu <stevenz...@gmail.com>
> wrote:
> >>
> >>> > I prefer to let the developer produce the id to dedupe. I think this
> >>> gives the developer more opportunity to optimize.
> >>>
> >>> Thinking about it again, I totally agree with Guowei on this. We don't
> >>> really need the framework to generate the unique id for the Iceberg sink.
> >>> De-dup logic is totally internal to the Iceberg sink and should be isolated
> >>> inside it. My earlier question regarding "commitGlobally(List<GlobalCommT>)
> >>> can be concurrent or not" also becomes irrelevant, as long as the framework
> >>> handles the GlobalCommT list properly (even with concurrent calls).
> >>>
> >>> Here are the things where framework can help
> >>>
> >>>    1. run global committer in jobmanager (e.g. like sink coordinator)
> >>>    2. help with checkpointing, bookkeeping, commit failure handling,
> >>>    recovery
> >>>
> >>>
> >>> @Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter
> >>> interface, I have some clarifying questions.
> >>>
> >>> > void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
> >>>
> >>>    1. You meant GlobalCommit -> GlobalCommT, right?
> >>>    2. Is this called when restored from checkpoint/savepoint?
> >>>    3. Iceberg sink needs to do a dup check here on which GlobalCommT
> >>>    were committed and which weren't. Should it return the filtered/de-duped
> >>>    list of GlobalCommT?
> >>>    4. Sink implementation can decide if it wants to commit immediately
> >>>    or just leave it for later?
> >>>
> >>> > void commit(GlobalCommit globalCommit);
> >>>
> >>> should this be "commit(List<GlobalCommT>)"?
> >>>
> >>> Thanks,
> >>> Steven
> >>>
> >>>
> >>> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com>
> wrote:
> >>>
> >>>> Hi, all
> >>>>
> >>>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg
> >>>> sink
> >>>> >>checkpoints the unique id into state during snapshot. It also
> inserts
> >>>> the
> >>>> >>unique id into the Iceberg snapshot metadata during commit. When a
> job
> >>>> >>restores the state after failure, it needs to know if the restored
> >>>> >>transactions/commits were successful or not. It basically iterates
> >>>> through
> >>>> >>the list of table snapshots from Iceberg and matches the unique ids
> >>>> with
> >>>> >>what is stored in Iceberg snapshot metadata.
> >>>>
> >>>> Thanks Steven for these detailed explanations. They help me understand
> >>>> Iceberg better. However, I prefer to let the developer produce the id to
> >>>> dedupe. I think this gives the developer more opportunity to optimize. You
> >>>> could see the following for more details. Please correct me if I
> >>>> misunderstand you.
> >>>>
> >>>> >> 3. Whether the `Writer` supports async functionality or not.
> >>>> Currently I
> >>>> do
> >>>> >> not know which sink could benefit from it. Maybe it is just my own
> >>>> problem.
> >>>>
> >>>> >> Here, I don't really know. We can introduce an "isAvailable()"
> method
> >>>> >> and mostly ignore it for now and sinks can just always return true.
> >>>> Or,
> >>>> >> as an alternative, we don't add the method now but can add it later
> >>>> with
> >>>> >> a default implementation. Either way, we will probably not take
> >>>> >> advantage of the "isAvailable()" now because that would require
> more
> >>>> >> runtime changes.
> >>>>
> >>>> From @Piotr's explanation I can see another benefit that might be gained
> >>>> in the future, for example decoupling the task number from the thread
> >>>> number. But I have to admit that introducing `isAvailable` might add some
> >>>> complications to the runtime. You could see my alternative API option in
> >>>> the following. I believe that we could support such an async sink writer
> >>>> very easily in the future. What do you think?
> >>>>
> >>>> >> Yes, this is still tricky. What is the current state, would the
> >>>> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve
> >>>> both
> >>>> >> the Iceberg and Hive cases? I believe Hive is the most tricky one
> >>>> here,
> >>>> >> but if we introduce the "combine" method on GlobalCommit, that
> could
> >>>> >> serve the same purpose as the "aggregation operation" on the
> >>>> individual
> >>>> >> files, and we could even execute that "combine" in a distributed
> way.
> >>>> >> We assume that GlobalCommit is an Agg/Combiner?
> >>>>
> >>>> I would like to share the possible problems that I am currently seeing
> >>>> and the alternative options.
> >>>>
> >>>> ## Iceberg Sink
> >>>>
> >>>> ### Concern about generating nonce by framework.
> >>>>
> >>>> If we let the `GlobalCommitter` provide a random nonce for the
> >>>> `IcebergSink`, I think it might not be efficient. Even if there are only a
> >>>> very small number of committables in the state, you still need to iterate
> >>>> over all the Iceberg snapshot files to check whether a committable has
> >>>> already been committed. And even if it is efficient for the `IcebergSink`,
> >>>> it might not be the case for other sinks.
> >>>>
> >>>> If the framework generates an auto-increment nonce instead, it might
> >>>> still not be optimal for users. For example, users might want to use some
> >>>> business id so that after a failover they could query whether the commit
> >>>> was successful.
> >>>>
> >>>> I think users could generate a more efficient nonce, such as an
> >>>> auto-increment one. Therefore, it seems to provide more optimization
> >>>> opportunities if we let users generate the nonce.
> >>>>
> >>>>
> >>>> ### Alternative Option
> >>>>
> >>>> public interface GlobalCommit<CommT, GlobalCommT> {
> >>>>         // provide some runtime context such as attempt-id, job-id, task-id
> >>>>         void open(InitContext context);
> >>>>
> >>>>         // This GlobalCommit would aggregate the committables into a
> >>>>         // GlobalCommit before doing the commit operation.
> >>>>         GlobalCommT combine(List<Committable> committables);
> >>>>
> >>>>         // This method would be called after committing all the
> >>>>         // GlobalCommit produced in the previous session.
> >>>>         void recoveredGlobalCommittables(List<GlobalCommit> globalCommits);
> >>>>
> >>>>         // the developer would guarantee the idempotency by himself
> >>>>         void commit(GlobalCommit globalCommit);
> >>>> }
> >>>>
> >>>> The user could guarantee the idempotency himself in a more efficient or
> >>>> application-specific way. If the user wants the `GlobalCommit` to be
> >>>> executed in a distributed way, the user could use the runtime information
> >>>> to generate a partial-order id himself. (We could ignore the cleanup for
> >>>> now.)
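> >>>>
> >>>> For example, a user's `combine` in an Iceberg-like sink could look
> >>>> roughly like this (all the type and field names are made up):
> >>>>
> >>>> public ManifestCommT combine(List<DataFileCommT> committables) {
> >>>>         // a partial-order id built from the runtime information passed to
> >>>>         // open() (subtask id plus a local counter), instead of a random
> >>>>         // nonce generated by the framework
> >>>>         String id = subtaskId + "-" + localSequence++;
> >>>>         return new ManifestCommT(id, committables);
> >>>> }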
> >>>>
> >>>> Currently the sink might look like the following:
> >>>>
> >>>> Sink<IN, LC, LCO, GC> {
> >>>>         Writer<IN, LC> createWriter();
> >>>>         Optional<Committer<LC, LCO>> createCommitter();
> >>>>         Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
> >>>> }
> >>>>
> >>>> ## Hive
> >>>>
> >>>> The HiveSink needs to compute whether a directory is finished or not,
> >>>> but it cannot use the above `combine` method to decide that.
> >>>>
> >>>> For example, assume that whether the directory is finished or not is
> >>>> decided by the event time. There might be a topology where the source and
> >>>> sink are connected by a forward partitioner. The event time might be
> >>>> different in different instances of the `Writer`, so the GlobalCommit’s
> >>>> `combine` cannot produce a GlobalCommT when the snapshot happens.
> >>>>
> >>>> In addition to the above case we should also consider unaligned
> >>>> checkpoints. Because the watermark does not skip, the same problem might
> >>>> also exist with unaligned checkpoints.
> >>>>
> >>>> ### Option1:
> >>>>
> >>>> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
> >>>>         // provide some runtime context such as attempt-id, job-id,
> >>>>         // task-id, maybe the event time; provide the restored state
> >>>>         void open(InitContext context, StateT state);
> >>>>
> >>>>         // This is for the HiveSink. When all the writers say that the
> >>>>         // bucket is finished, it would return a GlobalCommT.
> >>>>         Optional<GlobalCommT> combine(Committable committables);
> >>>>
> >>>>         // This is for the IcebergSink. Produces a GlobalCommT every
> >>>>         // checkpoint.
> >>>>         Optional<GlobalCommT> preCommit();
> >>>>
> >>>>         // Maybe we need the shared state? After we decide on the directory
> >>>>         // we can make more detailed considerations. The id could be
> >>>>         // remembered here.
> >>>>         StateT snapshotState();
> >>>>
> >>>>         // the developer would guarantee the idempotency by himself
> >>>>         void commit(GlobalCommit globalCommit);
> >>>> }
> >>>>
> >>>> ### Option2
> >>>>
> >>>> Actually the `GlobalCommit` in Option1 mixes the `Writer` and `Committer`
> >>>> together, so it is intuitive to decouple the two functions. To support
> >>>> Hive we could provide a sink that looks like the following:
> >>>>
> >>>> Sink<In, LC, LCO, LCG> {
> >>>>         Writer<In, LC> createWriter();
> >>>>         // we might need to change the name of this one
> >>>>         Optional<Committer<LC, LCO>> createCommitter();
> >>>>         Optional<Writer<LCO, LCG>> createGlobalAgg();
> >>>>         Optional<Committer<LCG, Void>> createGlobalCommitter();
> >>>> }
> >>>>
> >>>> The pro of this method is that we use two basic concepts: `Committer`
> >>>> and
> >>>> `Writer` to build a HiveSink.
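> >>>>
> >>>> A rough Hive mapping onto this shape could look like the following (all
> >>>> the concrete class names are placeholders):
> >>>>
> >>>> import java.util.Optional;
> >>>>
> >>>> public class HiveSink implements Sink<Row, TmpFile, CommittedFile, FinishedDir> {
> >>>>
> >>>>         public Writer<Row, TmpFile> createWriter() {
> >>>>                 return new TmpFileWriter();
> >>>>         }
> >>>>
> >>>>         public Optional<Committer<TmpFile, CommittedFile>> createCommitter() {
> >>>>                 return Optional.of(new RenameCommitter());
> >>>>         }
> >>>>
> >>>>         // a parallelism-1 writer that decides, e.g. by event time, when a
> >>>>         // directory is finished
> >>>>         public Optional<Writer<CommittedFile, FinishedDir>> createGlobalAgg() {
> >>>>                 return Optional.of(new DirectoryAggWriter());
> >>>>         }
> >>>>
> >>>>         public Optional<Committer<FinishedDir, Void>> createGlobalCommitter() {
> >>>>                 return Optional.of(new MetastoreCommitter());
> >>>>         }
> >>>> }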
> >>>>
> >>>> ### CompactHiveSink / MergeHiveSink
> >>>>
> >>>> There are still other complicated cases which are not satisfied by the
> >>>> above options. Users often complain about writing out many small files,
> >>>> which will affect file reading efficiency and the performance and
> >>>> stability of the distributed file system. CompactHiveSink/MergeHiveSink
> >>>> hopes to merge all the files generated by the job within a single
> >>>> checkpoint.
> >>>>
> >>>> The CompactHiveSink/MergeHiveSink topology can be described simply as
> >>>> follows:
> >>>>
> >>>> CompactSubTopology -> GlobalAgg -> GobalCommitter.
> >>>>
> >>>> The CompactSubTopology would look like following:
> >>>>
> >>>> TmpFileWriter -> CompactCoordinator -> CompactorFileWriter
> >>>>
> >>>> Maybe the topology could be simpler, but please keep in mind that I just
> >>>> want to show that users might have very complicated topology requirements.
> >>>>
> >>>>
> >>>> A possible alternative option would be to let the user build the topology
> >>>> himself. But considering that we have two execution modes, we could only
> >>>> use `Writer` and `Committer` to build the sink topology.
> >>>>
> >>>> ### Build Topology Option
> >>>>
> >>>> Sink<In, Out> {
> >>>>         // maybe a WriterBuilder
> >>>>         Sink<In, Out> addWriter(Writer<In, Out> writer);
> >>>>         // maybe we could make this return Void if we do not consider code
> >>>>         // reuse and introduce the cleaner
> >>>>         Sink<In, Out> addCommitter(Committer<In, Out> committer);
> >>>> }
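> >>>>
> >>>> With such a builder, the CompactHiveSink topology above could be expressed
> >>>> roughly like this (`sink` stands for whatever entry point we end up with,
> >>>> and the class names are again placeholders):
> >>>>
> >>>> sink.addWriter(new TmpFileWriter())        // CompactSubTopology ...
> >>>>     .addWriter(new CompactCoordinator())
> >>>>     .addWriter(new CompactorFileWriter())
> >>>>     .addWriter(new GlobalAggWriter())      // ... -> GlobalAgg
> >>>>     .addCommitter(new GlobalCommitter());  // ... -> GlobalCommitter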
> >>>>
> >>>> ## Summary
> >>>> The requirements of the sinks might be different; maybe we could use two
> >>>> basic bricks (Writer/Committer) to let users build their own sink
> >>>> topology. What do you guys think?
> >>>>
> >>>> I know the naming might be tricky for now, but I want to discuss these
> >>>> things after we get consensus on the direction first.
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com>
> >>>> wrote:
> >>>>
> >>>> > Aljoscha,
> >>>> >
> >>>> > > Instead the sink would have to check for each set of committables
> >>>> > > separately if they had already been committed. Do you think this is
> >>>> > > feasible?
> >>>> >
> >>>> > Yes, that is how it works in our internal implementation [1]. We don't
> >>>> > use checkpointId. We generate a manifest file (GlobalCommT) to bundle all
> >>>> > the data files that the committer received in one checkpoint cycle. Then
> >>>> > we generate a unique manifest id by hashing the location of the manifest
> >>>> > file. The manifest ids are stored in Iceberg snapshot metadata. Upon
> >>>> > restore, we check each of the restored manifest files against Iceberg
> >>>> > table snapshot metadata to determine if we should discard or keep the
> >>>> > restored manifest files. If a commit has multiple manifest files (e.g.
> >>>> > accumulated from previous failed commits), we store the comma-separated
> >>>> > manifest ids in Iceberg snapshot metadata.
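> >>>> >
> >>>> > In pseudo-Java the restore-time check is essentially the following (the
> >>>> > manifest-id lookup helpers are placeholders for our internal code):
> >>>> >
> >>>> > // manifest ids already recorded in Iceberg snapshot metadata
> >>>> > Set<String> committedIds = readManifestIdsFromSnapshotMetadata(table);
> >>>> >
> >>>> > List<ManifestFile> keep = new ArrayList<>();
> >>>> > for (ManifestFile restored : restoredManifestFiles) {
> >>>> >         if (!committedIds.contains(manifestId(restored))) {
> >>>> >                 // not committed yet -> keep it and retry in the next commit
> >>>> >                 keep.add(restored);
> >>>> >         }
> >>>> > }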
> >>>> >
> >>>> > > During normal operation this set would be very small, it would
> >>>> usually
> >>>> > only be the committables for the last checkpoint. Only when there is
> >>>> an
> >>>> > outage would multiple sets of committables pile up.
> >>>> >
> >>>> > You are absolutely right here. Even if there are multiple sets of
> >>>> > committables, it is usually only the last few or dozen snapshots that we
> >>>> > need to check. Even with our current inefficient implementation of
> >>>> > traversing all table snapshots (on the scale of thousands) from oldest to
> >>>> > latest, it only took 60 ms on average and 800 ms at max, so it is really
> >>>> > not a concern for Iceberg.
> >>>> >
> >>>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
> >>>> >
> >>>> > Just to clarify the terminology here: I assume the Committable here
> >>>> > means the `GlobalCommT` (like ManifestFile in Iceberg) from previous
> >>>> > discussions, right? `CommT` means the Iceberg DataFile from writer to
> >>>> > committer.
> >>>> >
> >>>> > This can work assuming we *don't have concurrent executions
> >>>> > of commitGlobally* even with concurrent checkpoints. Here is the
> >>>> scenario
> >>>> > regarding failure recovery I want to avoid.
> >>>> >
> >>>> > Assuming checkpoints 1, 2, 3 all completed. Each checkpoint generates a
> >>>> > manifest file: manifest-1, 2, 3.
> >>>> >
> >>>> > timeline ------------------------------------------------------------> now
> >>>> > commitGlobally(manifest-1, nonce-1) started
> >>>> >     commitGlobally(manifest-2, nonce-2) started
> >>>> >         commitGlobally(manifest-2, nonce-2) failed
> >>>> >             commitGlobally(manifest-2 and manifest-3, nonce-3) started
> >>>> >                 commitGlobally(manifest-1, nonce-1) failed
> >>>> >                     commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded
> >>>> >
> >>>> > Now the job failed and was restored from checkpoint 3, which contains
> >>>> > manifest files 1, 2, 3. We find nonce-3 was committed when checking the
> >>>> > Iceberg table snapshot metadata. But in this case we won't be able to
> >>>> > correctly determine which manifest files were committed and which were
> >>>> > not.
> >>>> >
> >>>> > If it is possible to have concurrent executions of  commitGlobally,
> >>>> the
> >>>> > alternative is to generate the unique id/nonce per GlobalCommT. Then
> >>>> we can
> >>>> > check each individual GlobalCommT (ManifestFile) with Iceberg
> snapshot
> >>>> > metadata.
> >>>> >
> >>>> > Thanks,
> >>>> > Steven
> >>>> >
> >>>> > [1]
> >>>> >
> >>>> >
> >>>>
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569
> >>>> >
> >>>> > On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org>
> >>>> > wrote:
> >>>> >
> >>>> > > Steven,
> >>>> > >
> >>>> > > we were also wondering if it is a strict requirement that "later"
> >>>> > > updates to Iceberg subsume earlier updates. In the current version, you
> >>>> > > only check whether checkpoint X made it to Iceberg and then discard all
> >>>> > > committable state from Flink state for checkpoints smaller than X.
> >>>> > >
> >>>> > > If we go with a (somewhat random) nonce, this would not work. Instead
> >>>> > > the sink would have to check for each set of committables separately if
> >>>> > > they had already been committed. Do you think this is feasible? During
> >>>> > > normal operation this set would be very small, it would usually only be
> >>>> > > the committables for the last checkpoint. Only when there is an outage
> >>>> > > would multiple sets of committables pile up.
> >>>> > >
> >>>> > > We were thinking of extending the GlobalCommitter interface to allow
> >>>> > > it to report success or failure and then let the framework retry. I
> >>>> > > think this is something that you would need for the Iceberg case. The
> >>>> > > signature could be like this:
> >>>> > >
> >>>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
> >>>> > >
> >>>> > > where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE,
> >>>> and
> >>>> > > RETRY.
> >>>> > >
> >>>> > > Best,
> >>>> > > Aljoscha
> >>>> > >
> >>>> >
> >>>>
> >>>
>
