>>  run global committer in jobmanager (e.g. like sink coordinator)

>> You meant GlobalCommit -> GlobalCommT, right?

>> Is this called when restored from checkpoint/savepoint?


>> Iceberg sink needs to do a dup check here on which GlobalCommT were
>> committed and which weren't. Should it return the filtered/de-duped list of
>> GlobalCommT?

I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.
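
As a rough illustration of that split, here is a minimal sketch. It assumes a hypothetical in-memory `FakeTable` standing in for the Iceberg table and a developer-chosen string id per global committable; none of the class names below come from Flink or Iceberg.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for an external table that records the ids of applied commits.
class FakeTable {
    final Set<String> committedIds = new HashSet<>();
    final List<String> appliedFiles = new ArrayList<>();
}

// Hypothetical global committable: a developer-chosen id plus the files it bundles.
class ManifestCommittable {
    final String id;
    final List<String> files;

    ManifestCommittable(String id, List<String> files) {
        this.id = id;
        this.files = files;
    }
}

class DedupingGlobalCommitter {
    private final FakeTable table;
    private final List<String> restoredIds = new ArrayList<>();

    DedupingGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    // Only remembers which ids were restored; no filtering happens here.
    void recoveredGlobalCommittables(List<ManifestCommittable> restored) {
        for (ManifestCommittable c : restored) {
            restoredIds.add(c.id);
        }
    }

    // Idempotent: a committable whose id is already recorded in the table is skipped.
    // Returns true only if this call actually applied the committable.
    boolean commit(ManifestCommittable c) {
        if (table.committedIds.contains(c.id)) {
            return false;
        }
        table.appliedFiles.addAll(c.files);
        table.committedIds.add(c.id);
        return true;
    }

    List<String> restoredIds() {
        return restoredIds;
    }
}
```

With this shape the framework can call `commit` again for every restored committable after a failover, and the sink decides internally which calls are no-ops.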

>> Sink implementation can decide if it wants to commit immediately or just

I think only the framework knows *when* to call the commit function.

>>should this be "commit(List<GlobalCommT>)"?

> > I prefer to let the developer produce id to dedupe. I think this gives
> > the developer more opportunity to optimize.
> Thinking about it again, I totally agree with Guowei on this. We don't
> really need the framework to generate the unique id for Iceberg sink.
> De-dup logic is totally internal to Iceberg sink and should be isolated
> inside. My earlier question regarding "commitGlobally(List<GlobalCommT>)
> can be concurrent or not" also becomes irrelevant, as long as the framework
> handles the GlobalCommT list properly (even with concurrent calls).
> Here are the things where the framework can help:
>    1. run global committer in jobmanager (e.g. like sink coordinator)
>    2. help with checkpointing, bookkeeping, commit failure handling,
>    recovery
> @Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter
> interface, I have some clarifying questions.
> > void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
>    1. You meant GlobalCommit -> GlobalCommT, right?
>    2. Is this called when restored from checkpoint/savepoint?
>    3.  Iceberg sink needs to do a dup check here on which GlobalCommT
>    were committed and which weren't. Should it return the filtered/de-duped
>    list of GlobalCommT?
>    4. Sink implementation can decide if it wants to commit immediately or
>    just leave
> > void commit(GlobalCommit globalCommit);
> should this be "commit(List<GlobalCommT>)"?
>> Hi, all
>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg sink
>> >>checkpoints the unique id into state during snapshot. It also inserts the
>> >>unique id into the Iceberg snapshot metadata during commit. When a job
>> >>restores the state after failure, it needs to know if the restored
>> >>transactions/commits were successful or not. It basically iterates through
>> >>the list of table snapshots from Iceberg and matches the unique ids with
>> >>what is stored in Iceberg snapshot metadata.
>> Thanks Steven for the detailed explanation. It helps me understand Iceberg
>> better. However, I prefer to let the developer produce id to dedupe. I
>> think this gives the developer more opportunity to optimize. See below for
>> more details, and please correct me if I misunderstand you.
>> >> 3. Whether the `Writer` supports async functionality or not. Currently I do
>> >> not know which sink could benefit from it. Maybe it is just my own problem.
>> >> Here, I don't really know. We can introduce an "isAvailable()" method
>> >> and mostly ignore it for now and sinks can just always return true. Or,
>> >> as an alternative, we don't add the method now but can add it later with
>> >> a default implementation. Either way, we will probably not take
>> >> advantage of the "isAvailable()" now because that would require more
>> >> runtime changes.
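
The "add it later with a default implementation" alternative can be sketched with a Java default method. The interface and class names here are hypothetical and only illustrate the compatibility argument, not the actual Flink `Writer` interface.

```java
// Hypothetical writer interface; only the default method is the point here.
interface AsyncAwareWriter<T> {
    void write(T element);

    // Added later without breaking existing sinks: they inherit "always available".
    default boolean isAvailable() {
        return true;
    }
}

// An existing sink that predates isAvailable() and does not override it.
class CountingWriter implements AsyncAwareWriter<String> {
    int written = 0;

    @Override
    public void write(String element) {
        written++;
    }
}

// A sink that opts in and reports unavailability while its buffer is full.
class BoundedBufferWriter implements AsyncAwareWriter<String> {
    private final int capacity;
    private int buffered = 0;

    BoundedBufferWriter(int capacity) {
        this.capacity = capacity;
    }

    @Override
    public void write(String element) {
        buffered++;
    }

    @Override
    public boolean isAvailable() {
        return buffered < capacity;
    }

    void flush() {
        buffered = 0;
    }
}
```

Existing sinks such as `CountingWriter` keep compiling unchanged, while a runtime that later starts honoring `isAvailable()` could back off from writers like `BoundedBufferWriter`.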
>> From @Piotr's explanation I can see another benefit that might be gained in
>> the future, for example decoupling the number of tasks from the number of
>> threads. But I have to admit that introducing `isAvailable` might add some
>> complications to the runtime. You can see my alternative API option below.
>> I believe that we could support such an async sink writer very easily in
>> the future. What do you think?
>> >> Yes, this is still tricky. What is the current state, would the
>> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve both
>> >> the Iceberg and Hive cases? I believe Hive is the most tricky one here,
>> >> but if we introduce the "combine" method on GlobalCommit, that could
>> >> serve the same purpose as the "aggregation operation" on the individual
>> >> files, and we could even execute that "combine" in a distributed way.
>> >>We assume that GlobalCommit is an Agg/Combiner?
>> I would like to share the possible problems I currently see and some
>> alternative options.
>> ## Iceberg Sink
>> ### Concern about the framework generating the nonce
>> If we let the `GlobalCommitter` provide a random nonce for the `IceBergSink`,
>> I think it might not be efficient, because even if there is only a very
>> small number of committables in the state you still need to iterate over all
>> the Iceberg snapshot files to check whether a committable has already been
>> committed. Even if that is efficient enough for the Iceberg sink, it might
>> not be the case for other sinks.
>> If the framework generates an auto-increment nonce instead, it might still
>> not be optimal for users. For example, users might want to use some business
>> id so that after failover they can query whether the commit was successful.
>> I think users could generate a more efficient nonce, such as an
>> auto-increment one. Therefore, letting users generate the nonce seems to
>> provide more optimization opportunities.
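
To make the efficiency argument concrete, here is a sketch under two assumptions that are not stated above: the sink assigns monotonically increasing ids, and commits are applied in id order without concurrency. In that case the external system only needs to expose the highest committed id. `WatermarkedStore`, `SequencedCommitter` and `lastCommittedId` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical external system that remembers only the highest committed id.
class WatermarkedStore {
    long lastCommittedId = -1L;
}

class SequencedCommitter {
    private final WatermarkedStore store;

    SequencedCommitter(WatermarkedStore store) {
        this.store = store;
    }

    // Assumes ids increase monotonically and commits happen in id order.
    // One comparison replaces a scan over all previously committed snapshots.
    boolean isCommitted(long id) {
        return id <= store.lastCommittedId;
    }

    // Idempotent: committing an id at or below the high-water mark is a no-op.
    void commit(long id) {
        if (!isCommitted(id)) {
            store.lastCommittedId = id;
        }
    }

    // After failover: keep only the restored ids that still need to be committed.
    List<Long> pending(List<Long> restoredIds) {
        List<Long> result = new ArrayList<>();
        for (long id : restoredIds) {
            if (!isCommitted(id)) {
                result.add(id);
            }
        }
        return result;
    }
}
```

A random nonce cannot be checked this way, which is why it forces a lookup against every recorded commit.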
>> ### Alternative Option
>> public interface GlobalCommit<CommT, GlobalCommT> {
>>         // Provides some runtime context such as attempt-id, job-id, task-id.
>>         void open(InitContext context);
>>         // Aggregates the committables into a GlobalCommit before doing
>>         // the commit operation.
>>         GlobalCommT combine(List<Committable> committables);
>>         // Called after committing all the GlobalCommits produced in the
>>         // previous session.
>>         void recoveredGlobalCommittables(List<GlobalCommit> globalCommits);
>>         // The developer guarantees idempotency.
>>         void commit(GlobalCommit globalCommit);
>> }
>> Users could guarantee idempotency themselves in a more efficient or
>> application-specific way. If the user wants the `GlobalCommit` to be
>> executed in a distributed way, the user could use the runtime information
>> to generate the partial order id themselves. (We could ignore the clean-up
>> for now.)
>> Currently the sink might look like the following:
>> Sink<IN, LC, LCO, GC> {
>>         Writer<IN, LC> createWriter();
>>         Optional<Committer<LC, LCO>> createCommitter();
>>         Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
>> }
>> ## Hive
>> The HiveSink needs to compute whether a directory is finished or not. But
>> HiveSink cannot use the above `combine` method to decide whether a
>> directory is finished or not.
>> For example, assume that whether the directory is finished is decided by
>> the event time. There might be a topology where the source and sink are
>> forward-connected. The event time might then differ between instances of
>> the `writer`, so the GlobalCommit's combine cannot produce a GlobalCommT
>> when the snapshot happens.
>> In addition to the above case we should also consider unaligned
>> checkpoints. Because the watermark does not skip, there might be the same
>> problem with unaligned checkpoints.
>> ### Option1:
>> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
>>         // Provides some runtime context such as attempt-id, job-id, task-id,
>>         // maybe the event time; also provides the restored state.
>>         void open(InitContext context, StateT state);
>>         // This is for the HiveSink. When all the writers say that the
>>         // bucket is finished it would return a GlobalCommT.
>>         Optional<GlobalCommT> combine(Committable committables);
>>         // This is for the IcebergSink. Produces a GlobalCommT every
>>         // checkpoint.
>>         Optional<GlobalCommT> preCommit();
>>         // Maybe we need the shareState? After we decide the directory we
>>         // can consider this in more detail. The id could be remembered here.
>>         StateT snapshotState();
>>         // The developer guarantees idempotency.
>>         void commit(GlobalCommit globalCommit);
>> }
>> ### Option2
>> Actually the `GlobalCommit` in Option1 mixes the `Writer` and
>> `Committer` together, so it is intuitive to decouple the two functions.
>> To support Hive we could provide a sink that looks like the following:
>> Sink<In, LC, LCO, LCG> {
>>         Writer<In, LC> createWriter();
>>         // We need to change this name.
>>         Optional<Committer<LC, LCO>> createCommitter();
>>         Optional<Writer<LCO, LCG>> createGlobalAgg();
>>         Optional<Committer<LCG, Void>> createGlobalCommitter();
>> }
>> The advantage of this approach is that we use two basic concepts,
>> `Committer` and `Writer`, to build a HiveSink.
>> ### CompactHiveSink / MergeHiveSink
>> There are still other complicated cases that are not covered by the
>> above options. Users often complain about writing out many small files,
>> which hurts file reading efficiency and the performance and stability
>> of the distributed file system. CompactHiveSink/MergeHiveSink aims to
>> merge all files generated by this job in a single checkpoint.
>> The CompactHiveSink/MergeHiveSink topology can be described simply as
>> follows:
>> CompactSubTopology -> GlobalAgg -> GlobalCommitter.
>> The CompactSubTopology would look like the following:
>> TmpFileWriter -> CompactCoordinator -> CompactorFileWriter
>> Maybe the topology could be simpler, but please keep in mind that I just
>> want to show that users might have very complicated topology requirements.
>> A possible alternative would be to let the user build the topology
>> themselves. But considering that we have two execution modes, we could only
>> use `Writer` and `Committer` to build the sink topology.
>> ### Build Topology Option
>> Sink<In, Out> {
>>         // Maybe a WriterBuilder
>>         Sink<In, Out> addWriter(Writer<In, Out> writer);
>>         // Maybe we could make this return Void if we do not consider code
>>         // reuse and introduce the cleaner
>>         Sink<In, Out> addCommitter(Committer<In, Out> committer);
>> }
>> ## Summary
>> The requirements of sinks might differ, so maybe we could use two basic
>> building blocks (Writer/Committer) to let users build their own sink
>> topology. What do you guys think?
>> I know the naming might be tricky for now, but I want to discuss these
>> things after we reach consensus on the direction first.
>> > Aljoscha,
>> >
>> > > Instead the sink would have to check for each set of committables
>> > > separately if they had already been committed. Do you think this is
>> > > feasible?
>> >
>> > Yes, that is how it works in our internal implementation [1]. We don't use
>> > checkpointId. We generate a manifest file (GlobalCommT) to bundle all the
>> > data files that the committer received in one checkpoint cycle. Then we
>> > generate a unique manifest id by hashing the location of the manifest
>> > file. The manifest ids are stored in Iceberg snapshot metadata. Upon
>> > restore, we check each of the restored manifest files against Iceberg table
>> > snapshot metadata to determine if we should discard or keep the restored
>> > manifest files. If a commit has multiple manifest files (e.g. accumulated
>> > from previous failed commits), we store the comma-separated manifest ids in
>> > Iceberg snapshot metadata.
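
The restore check described above could look roughly like the following sketch. It does not use the Iceberg API: snapshot metadata is modelled as plain `Map<String, String>` entries, the key `manifest-ids` is a made-up name, and a name-based UUID stands in for whatever hash the internal implementation actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

class ManifestRestoreCheck {
    // Hypothetical metadata key; the real key is not given in this thread.
    static final String MANIFEST_IDS_KEY = "manifest-ids";

    // Derives a stable id from the manifest location (a name-based UUID as the hash).
    static String manifestId(String location) {
        return UUID.nameUUIDFromBytes(location.getBytes(StandardCharsets.UTF_8)).toString();
    }

    // Comma-separated value to store when one commit carries several manifests.
    static String metadataValue(List<String> locations) {
        List<String> ids = new ArrayList<>();
        for (String location : locations) {
            ids.add(manifestId(location));
        }
        return String.join(",", ids);
    }

    // Collects every manifest id recorded in the given snapshot metadata maps.
    static Set<String> committedIds(List<Map<String, String>> snapshotMetadata) {
        Set<String> ids = new HashSet<>();
        for (Map<String, String> metadata : snapshotMetadata) {
            String value = metadata.get(MANIFEST_IDS_KEY);
            if (value != null && !value.isEmpty()) {
                ids.addAll(Arrays.asList(value.split(",")));
            }
        }
        return ids;
    }

    // Keeps only the restored manifests whose id is not yet in the table metadata.
    static List<String> uncommitted(List<String> restoredLocations,
                                    List<Map<String, String>> snapshotMetadata) {
        Set<String> committed = committedIds(snapshotMetadata);
        List<String> pending = new ArrayList<>();
        for (String location : restoredLocations) {
            if (!committed.contains(manifestId(location))) {
                pending.add(location);
            }
        }
        return pending;
    }
}
```

Because each manifest carries its own id, the check works per manifest file regardless of how many manifests were bundled into a single commit.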
>> >
>> > > During normal operation this set would be very small, it would usually
>> > > only be the committables for the last checkpoint. Only when there is an
>> > > outage would multiple sets of committables pile up.
>> >
>> > You are absolutely right here. Even if there are multiple sets of
>> > committables, it is usually only the last few or a dozen snapshots that we
>> > need to check. Even with our current inefficient implementation of
>> > traversing all table snapshots (on the scale of thousands) from oldest to
>> > latest, it only took 60 ms on average and 800 ms at most, so it is really
>> > not a concern for Iceberg.
>> >
>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
>> >
>> > Just to clarify the terminology here: I assume the Committable here
>> > means the `GlobalCommT` (like ManifestFile in Iceberg) from
>> > previous discussions, right? `CommT` means the Iceberg DataFile passed
>> > from writer to committer.
>> >
>> > This can work assuming we *don't have concurrent executions
>> > of commitGlobally* even with concurrent checkpoints. Here is the scenario
>> > regarding failure recovery I want to avoid.
>> >
>> > Assuming checkpoints 1, 2, 3 all completed. Each checkpoint generates a
>> > manifest file, manifest-1, 2, 3.
>> > timeline ----------------------------------------------------------------> now
>> > commitGlobally(manifest-1, nonce-1) started
>> >          commitGlobally(manifest-2, nonce-2) started
>> >                     commitGlobally(manifest-2, nonce-2) failed
>> >                             commitGlobally(manifest-2 and manifest-3, nonce-3) started
>> >                                     commitGlobally(manifest-1, nonce-1) failed
>> >                                             commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded
>> >
>> > Now the job failed and was restored from checkpoint 3, which contains
>> > manifest files 1, 2, 3. We found nonce-3 was committed when checking Iceberg
>> > table snapshot metadata. But in this case we won't be able to correctly
>> > determine which manifest files were committed or not.
>> >
>> > If it is possible to have concurrent executions of commitGlobally, the
>> > alternative is to generate the unique id/nonce per GlobalCommT. Then we can
>> > check each individual GlobalCommT (ManifestFile) with Iceberg snapshot
>> > metadata.
>> >
>> > >
>> > > we were also wondering if it is a strict requirement that "later"
>> > > updates to Iceberg subsume earlier updates. In the current version, you
>> > > only check whether checkpoint X made it to Iceberg and then discard all
>> > > committable state from Flink state for checkpoints smaller than X.
>> > >
>> > > If we go with a (somewhat random) nonce, this would not work. Instead
>> > > the sink would have to check for each set of committables separately if
>> > > they had already been committed. Do you think this is feasible? During
>> > > normal operation this set would be very small, it would usually only be
>> > > the committables for the last checkpoint. Only when there is an outage
>> > > would multiple sets of committables pile up.
>> > >
>> > > We were thinking to extend the GlobalCommitter interface to allow it to
>> > > report success or failure and then let the framework retry. I think this
>> > > is something that you would need for the Iceberg case. The signature
>> > > could be like this:
>> > >
>> > > CommitStatus commitGlobally(List<Committable>, Nonce)
>> > >
>> > > where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, and
>> > > RETRY.
>> > >
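
A minimal sketch of how a framework-side retry loop might consume such a `CommitStatus`. `Committable` and `Nonce` are modelled as plain strings, and the `maxAttempts` bound and the `CommitDriver` class are assumptions of this sketch rather than part of the proposal.

```java
import java.util.List;

enum CommitStatus { SUCCESS, TERMINAL_FAILURE, RETRY }

// Hypothetical shape of the proposed method; Committable and Nonce are Strings here.
interface RetryingGlobalCommitter {
    CommitStatus commitGlobally(List<String> committables, String nonce);
}

class CommitDriver {
    // Framework-side loop: re-invoke on RETRY, stop on SUCCESS or TERMINAL_FAILURE.
    // The same nonce is reused on every attempt so the sink can de-duplicate.
    // Returns RETRY if maxAttempts is exhausted without a final status.
    static CommitStatus commitWithRetry(RetryingGlobalCommitter committer,
                                        List<String> committables,
                                        String nonce,
                                        int maxAttempts) {
        CommitStatus status = CommitStatus.RETRY;
        for (int attempt = 0; attempt < maxAttempts && status == CommitStatus.RETRY; attempt++) {
            status = committer.commitGlobally(committables, nonce);
        }
        return status;
    }
}
```

What the framework should do when the attempts run out (fail the job, keep the committables in state for the next checkpoint) is left open here, as it is in the discussion.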
