Aljoscha/Guowei,

I think we are pretty close to aligning on the Iceberg sink requirements.
This new sink API can really simplify the Iceberg sink implementation.
Looking forward to the initial scope in the 1.12 release.

>   CommitResult commit(GlobalCommittableT);

I like the CommitResult return type. Since CommitResult can be RETRY, which
is probably the default behavior for a commit failure, the framework would
accumulate a list of GlobalCommittableT. Then, when the next checkpoint
happens, we will have more than one GlobalCommittableT item. Hence, I think
the commit method should probably take a list, e.g. something like the
sketch below.
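
To make it concrete, here is a rough sketch of what I have in mind (only
for illustration, reusing the interface names quoted below; not a final
proposal):

interface GlobalCommitter<CommittableT, GlobalCommittableT> {
   GlobalCommittableT merge(List<CommittableT> committables);

   // The framework passes everything that is not yet committed: the item
   // produced for the current checkpoint plus any items accumulated from
   // earlier RETRY results.
   CommitResult commit(List<GlobalCommittableT> globalCommittables);
}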

In addition, it is undesirable to do the committed-or-not check in the
commit method, which happens in every checkpoint cycle. CommitResult
already indicates success or failure. When the framework calls commit with
a list of GlobalCommittableT, it should be certain they are uncommitted.
The only time we aren't sure is when a list of GlobalCommittableT is
restored from a checkpoint. `recoverGlobalCommittables` is the ideal place
to do such a check and filter out the ones that were already committed.
The retained ones would then be committed in the next checkpoint cycle.
Since the framework takes care of the checkpoint and restore, we need some
hook for the sink to add custom logic on the restored list, for example
along these lines:
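
(Again just a sketch of the kind of hook I mean, not a concrete API
proposal; the method could live on the GlobalCommitter from the sketch
above.)

interface GlobalCommitter<CommittableT, GlobalCommittableT> {
   // ... merge() and commit() as above ...

   // Called once when restoring from a checkpoint. The sink checks the
   // external system (e.g. Iceberg snapshot metadata) and returns only the
   // items that are not yet committed; the framework then commits the
   // returned list in the next checkpoint cycle.
   List<GlobalCommittableT> recoverGlobalCommittables(
           List<GlobalCommittableT> restored);
}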

Thanks,
Steven


On Mon, Sep 21, 2020 at 10:37 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi all,
>
> I'll try and summarize my thoughts after Guowei, Yun, Kostas, Dawid, and
> me had an offline discussion about this.
>
> Also, I would like to give credit to Guowei for initially coming up with
> the idea of a topology sink in the context of this discussion. I think
> it's a good idea and we should pursue it in the future. And yes, Beam
> already does it like this but I hadn't thought about it now when
> thinking about the sink APIs because having a more limited API gives
> more freedom to the framework.
>
> ## Topology Sink vs. Transactional Sink
>
>  From the discussion, it seems clear to me that to support all kinds of
> different use cases we will have to offer some sort of API that allows
> Sink developers to specify mostly arbitrary operator topologies. I
> think, however, that we will not manage to finish such a (semi
> user-facing) API within the 1.12 release cycle with satisfactory
> results. Therefore, I would say that we need to go with a more
> straightforward TransactionalSink API (name TBD) that lets sink
> developers specify basic Writer, Committer, GlobalCommitter components
> as discussed above.
>
> This Sink interface would initially support a FileSink that supports
> HDFS/S3 and behaves like the StreamingFileSink does for STREAM execution
> mode. Additionally, it would seamlessly work for BATCH execution mode.
> With the addition of a properly designed GlobalCommitter this should
> also work for Iceberg.
>
> It seems to me that the Hive use cases are still too fuzzy and not well
> defined to allow us to come up with a good solution.
>
> ## Committer vs. GlobalCommitter or both
>
> To make it short, we should make both optional but also allow both to be
> used by the same sink.
>
> The Committer is the interface that should be preferred because it
> allows the framework to distribute the work of committing, i.e. it has
> more potential for being optimised.
>
> Iceberg would use only a GlobalCommitter.
>
> The FileSink would use only Committer but can optionally use a
> GlobalCommitter to create a _SUCCESS file in the output directory to
> emulate Hadoop to some degree. Writing such a _SUCCESS file would only
> work in BATCH execution mode and it would write a _SUCCESS file in the
> top-level output directory. Writing _SUCCESS files in individual buckets
> or subdirectories whenever these buckets are considered "done" is a
> different problem, and one I don't think we can solve well right now.
>
> Initially, I would propose these interfaces that have been floated by
> various people above:
>
> interface Committer<CommittableT> {
>    CommitResult commit(CommittableT);
> }
>
> interface GlobalCommitter<CommittableT, GlobalCommittableT> {
>    GlobalCommittableT merge(List<CommittableT>);
>    CommitResult commit(GlobalCommittableT);
> }
>
> enum CommitResult {
>    SUCCESS, FAILURE, RETRY
> }
>
> Alternatively, we could consider changing commit() to take a List<> to
> allow the sink to better check if commits are already in the external
> system. For example, Iceberg would check for the whole batch whether
> they are already committed.
>
> Also alternatively, we could change the GlobalCommitter to basically
> return an AggregateFunction instead of the simple merge() function.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On 21.09.20 10:06, Piotr Nowojski wrote:
> > Hi Guowei,
> >
> >> I believe that we could support such an async sink writer
> >> very easily in the future. What do you think?
> >
> > How would you see the expansion in the future? Do you mean just adding
> > `isAvailable()` method with a default implementation later on?
> >
> > Piotrek
> >
> > pon., 21 wrz 2020 o 02:39 Steven Wu <stevenz...@gmail.com> napisał(a):
> >
> >>> I think Iceberg sink needs to do the dedup in the `commit` call. The
> >> `recoveredGlobalCommittables` is just for restoring the ids.
> >>
> >>
> >> @Guowei Ma <guowei....@gmail.com>  It is undesirable to do the dedup
> >> check in the `commit` call, because it happens for each checkpoint cycle.
> >> We only need to do the de-dup check one time when restoring the
> >> GlobalCommT list from the checkpoint.
> >>
> >>
> >> Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to
> >> let sink implementations know the recovered GlobalCommT list, it is
> >> probably not a sufficient API. For the Iceberg sink, we can try to
> >> implement the de-dup check inside the `recoveredGlobalCommittables` method
> >> and commit any uncommitted GlobalCommT items. But how do we handle a
> >> failed commit?
> >>
> >>
> >> One alternative is to allow sink implementations to override
> >> "List<GlobalCommT> recoverGlobalCommittables()". The framework handles the
> >> checkpoint/state, and sink implementations can further customize the
> >> restored list with de-dup check and filtering. The recovered uncommitted
> >> GlobalCommT list will be committed in the next cycle. It is the same
> >> rollover strategy for commit failure handling that we have been
> >> discussing.
> >>
> >>
> >> ## topologies
> >>
> >>
> >> Regarding the topology options, if we agree that there is no
> >> one-size-fits-all topology, we can let sink implementations choose the
> >> best topology. Maybe the framework can provide 2-3 pre-defined topology
> >> implementations to help the sinks.
> >>
> >>
> >>
> >>
> >> On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma <guowei....@gmail.com> wrote:
> >>
> >>> I would like to summarize the file-type sinks in the thread and their
> >>> possible topologies. I also try to give the pros and cons of every
> >>> topology option. Correct me if I am wrong.
> >>>
> >>> ### FileSink
> >>>
> >>> Topology Option: TmpFileWriter + Committer.
> >>>
> >>> ### IceBerg Sink
> >>>
> >>> #### Topology Option1: `DataFileWriter` + `GlobalCommitterV0`.
> >>> Pro:
> >>> 1. Same group has some id.
> >>> Cons:
> >>> 1. May limit users’ optimization space;
> >>> 2. The topology does not meet the Hive’s requirements.
> >>>
> >>> #### Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
> >>> Pro:
> >>> 1. The user has the opportunity to optimize the implementation of
> >>> idempotence.
> >>> Cons:
> >>> 1. Makes the GlobalCommit more complicated.
> >>> 2. The topology does not meet the Hive’s requirements.
> >>>
> >>> ### Topology Option3: DataFileWriter + AggWriter + Committer
> >>>
> >>> Pros:
> >>> 1. Uses two basic components, `Writer` & `Committer`, to meet Iceberg’s
> >>> requirements.
> >>> 2. Opportunity to optimize the implementation of idempotence.
> >>> 3. The topology meets the Hive’s requirements. (See following.)
> >>> Con:
> >>> 1. It introduces a relatively complex topology.
> >>>
> >>> ## HiveSink
> >>>
> >>> ### Topology Option1: `TmpFileWriter` + `Committer` +
> >> `GlobalCommitterV2`.
> >>> Pro:
> >>> 1. Could skip the cleanup problem at first.
> >>> Con:
> >>> 1. This style topology does not meet the CompactHiveSink requirements.
> >>>
> >>> ### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
> >>> `Committer`
> >>> Pros
> >>> 1. Could skip the clean up problem at first.
> >>> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> >>> Cons
> >>> 1. This style topology does not meet the CompactHiveSink requirements.
> >>> 2. There are two general `Committers` in the topology. For Hive’s case
> >>> there might be no problem. But there might be a problem in 1.12. For
> >>> example where to execute the sub-topology following the `Committer` in
> >>> batch execution mode for the general case. Because the topology is
> built
> >>> from `Writer` and `Committer` we might put all the sub-topology in the
> >>> OperatorCoordinator. But if the topology is too complicated it might be
> >>> very complicated. See following.
> >>>
> >>> ### Topology Option3 `FileWriter` + `AggWriter` + `Committer`
> >>> Pro
> >>> 1. There is only one general committer.
> >>> Cons
> >>> 1. It has to consider the cleanup problem. (In theory both the Option1
> >> and
> >>> Option2 need to cleanup)
> >>> 2. This style topology does not meet the CompactHiveSink requirements.
> >>> 3. Have to figure out how to make the current version compatible.
> >>>
> >>> ### CompactHiveSink/MergeHiveSink
> >>>
> >>> #### Topology Option1 `TmpFileWriter` + `Committer` +
> `MergerCoordinator`
> >>> + `MergeWriter` + `GlobalCommiterV2`
> >>> Pro
> >>> 1. Could skip the clean up problem at first.
> >>> Cons
> >>> 2. Where to execute the sub-topology following the `Committer`.
> >>>
> >>> #### Topology Option2 `TmpFileWriter` + `Committer` +
> `MergerCoordinator`
> >>> + `MergeWriter` + AggWriter + Committer
> >>> Pros
> >>> 1. Could skip the clean up problem at first
> >>> 2. Decouple the GlobalCommitterV2 to `AggWriter` + `Committer`
> >>> Con
> >>> 1. Where to execute the sub-topology following the `Committer`.
> >>>
> >>> ### Option3 FileWriter + MergeCoordinator + MergeFileWriter +
> Writer(Agg)
> >>> + Committer
> >>> Pro
> >>> 1. There is only one committer. It is very easy to support in the batch
> >>> execution mode.
> >>> Con
> >>> 2. It has to consider the cleanup problem. (In theory both the Option1
> >> and
> >>> Option2 need to cleanup)
> >>>
> >>>
> >>> ### Summary
> >>>
> >>>  From above we could divide the sink topology into two parts:
> >>> 1. Write topology.
> >>> 2. And One committer
> >>>
> >>> So we could provide a unified sink API looks like the following:
> >>>
> >>> public interface Sink<CommT> {
> >>>          List<Writer<?, ?>> getWriters();
> >>>          Committer<CommT> createCommitter()
> >>> }
> >>>
> >>> In the long run maybe we could give the user more powerful ability like
> >>> this (Currently some transformation still belongs to runtime):
> >>> Sink<CommT> {
> >>>          Transformation<CommT> createWriteTopology();
> >>>           CommitFunction<CommT> createCommitter();
> >>> }
> >>>
> >>> Best,
> >>> Guowei
> >>>
> >>>
> >>> On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma <guowei....@gmail.com>
> wrote:
> >>>
> >>>> Hi, Steven
> >>>> I want to make a clarification first: the following reply only considers
> >>>> the Iceberg sink, not other sinks. Before making a decision we should
> >>>> consider all the sinks. I will try to summarize all the sink
> >>>> requirements in the next mail.
> >>>>
> >>>>
> >>>>>>   run global committer in jobmanager (e.g. like sink coordinator)
> >>>>
> >>>> I think it could be.
> >>>>
> >>>>
> >>>>>> You meant GlobalCommit -> GlobalCommT, right?
> >>>>
> >>>> Yes. Thanks :)
> >>>>
> >>>>
> >>>>>> Is this called when restored from checkpoint/savepoint?
> >>>>
> >>>> Yes.
> >>>>
> >>>>
> >>>>>> Iceberg sink needs to do a dup check here on which GlobalCommT were
> >>>> committed and which weren't. Should it return the filtered/de-duped
> >> list of
> >>>> GlobalCommT?
> >>>>
> >>>>
> >>>> I think Iceberg sink needs to do the dedup in the `commit` call. The
> >>>> `recoveredGlobalCommittables` is just for restoring the ids.
> >>>>
> >>>>
> >>>>>> Sink implementation can decide if it wants to commit immediately or
> >>>> just leave
> >>>>
> >>>> I think only the framework knows *when* to call the commit function.
> >>>>
> >>>>
> >>>>>> should this be "commit(List<GlobalCommT>)"?
> >>>>
> >>>> It could be. thanks.
> >>>>
> >>>>
> >>>> Best,
> >>>> Guowei
> >>>>
> >>>>
> >>>> On Sun, Sep 20, 2020 at 12:11 AM Steven Wu <stevenz...@gmail.com>
> >> wrote:
> >>>>
> >>>>>> I prefer to let the developer produce id to dedupe. I think this
> >> gives
> >>>>> the developer more opportunity to optimize.
> >>>>>
> >>>>> Thinking about it again, I totally agree with Guowei on this. We
> don't
> >>>>> really need the framework to generate the unique id for Iceberg sink.
> >>>>> De-dup logic is totally internal to Iceberg sink and should be
> isolated
> >>>>> inside. My earlier question regarding
> >> "commitGlobally(List<GlobalCommT>)
> >>>>> can be concurrent or not" also becomes irrelevant, as long as the
> >> framework
> >>>>> handles the GlobalCommT list properly (even with concurrent calls).
> >>>>>
> >>>>> Here are the things where framework can help
> >>>>>
> >>>>>     1. run global committer in jobmanager (e.g. like sink
> coordinator)
> >>>>>     2. help with checkpointing, bookkeeping, commit failure handling,
> >>>>>     recovery
> >>>>>
> >>>>>
> >>>>> @Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter
> >>>>> interface, I have some clarifying questions.
> >>>>>
> >>>>>> void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)
> >>>>>
> >>>>>     1. You meant GlobalCommit -> GlobalCommT, right?
> >>>>>     2. Is this called when restored from checkpoint/savepoint?
> >>>>>     3.  Iceberg sink needs to do a dup check here on which
> GlobalCommT
> >>>>>     were committed and which weren't. Should it return the
> >> filtered/de-duped
> >>>>>     list of GlobalCommT?
> >>>>>     4. Sink implementation can decide if it wants to commit
> immediately
> >>>>>     or just leave
> >>>>>
> >>>>>> void commit(GlobalCommit globalCommit);
> >>>>>
> >>>>> should this be "commit(List<GlobalCommT>)"?
> >>>>>
> >>>>> Thanks,
> >>>>> Steven
> >>>>>
> >>>>>
> >>>>> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hi, all
> >>>>>>
> >>>>>>>> Just to add to what Aljoscha said regarding the unique id. Iceberg
> >>>>>> sink
> >>>>>>>> checkpoints the unique id into state during snapshot. It also
> >> inserts
> >>>>>> the
> >>>>>>>> unique id into the Iceberg snapshot metadata during commit. When a
> >> job
> >>>>>>>> restores the state after failure, it needs to know if the restored
> >>>>>>>> transactions/commits were successful or not. It basically iterates
> >>>>>> through
> >>>>>>>> the list of table snapshots from Iceberg and matches the unique
> ids
> >>>>>> with
> >>>>>>>> what is stored in Iceberg snapshot metadata.
> >>>>>>
> >>>>>> Thanks Steven for these detailed explanations. It makes me know the
> >>>>>> IceBerg
> >>>>>> better. However, I prefer to let the developer produce id to
> dedupe. I
> >>>>>> think this gives the developer more opportunity to optimize. You
> could
> >>>>>> see
> >>>>>> the following for more details. Please correct me if I misunderstand
> >>>>>> you.
> >>>>>>
> >>>>>>>> 3. Whether the `Writer` supports async functionality or not.
> >>>>>> Currently I
> >>>>>> do
> >>>>>>>> not know which sink could benefit from it. Maybe it is just my own
> >>>>>> problem.
> >>>>>>
> >>>>>>>> Here, I don't really know. We can introduce an "isAvailable()"
> >> method
> >>>>>>>> and mostly ignore it for now and sinks can just always return
> true.
> >>>>>> Or,
> >>>>>>>> as an alternative, we don't add the method now but can add it
> later
> >>>>>> with
> >>>>>>>> a default implementation. Either way, we will probably not take
> >>>>>>>> advantage of the "isAvailable()" now because that would require
> >> more
> >>>>>>>> runtime changes.
> >>>>>>
> >>>>>>  From the @Pitor's explanation I could see the other benefit that
> might
> >>>>>> be
> >>>>>> gained in the future. For example decoupling the task number and the
> >>>>>> thread
> >>>>>> number. But I have to admit that introducing `isAvailable` might
> >>>>>> introduce
> >>>>>> some complications in the runtime. You could see my alternative API
> >>>>>> option
> >>>>>> in the following. I believe that we could support such an async sink
> >>>>>> writer
> >>>>>> very easily in the future. What do you think?
> >>>>>>
> >>>>>>>> Yes, this is still tricky. What is the current state, would the
> >>>>>>>> introduction of a "LocalCommit" and a "GlobalCommit" already solve
> >>>>>> both
> >>>>>>>> the Iceberg and Hive cases? I believe Hive is the most tricky one
> >>>>>> here,
> >>>>>>>> but if we introduce the "combine" method on GlobalCommit, that
> >> could
> >>>>>>>> serve the same purpose as the "aggregation operation" on the
> >>>>>> individual
> >>>>>>>> files, and we could even execute that "combine" in a distributed
> >> way.
> >>>>>>>> We assume that GlobalCommit is a Agg/Combiner?
> >>>>>>
> >>>>>> I would share what possible problems that I am seeing currently and
> >> the
> >>>>>> alternative options.
> >>>>>>
> >>>>>> ## IceBerg Sink
> >>>>>>
> >>>>>> ### Concern about generating nonce by framework.
> >>>>>>
> >>>>>> If let the `GlobalCommitter` provide a random nonce for the
> >>>>>> `IceBergSink` I
> >>>>>> think that it might not be efficient.  Because even if there are a
> >> very
> >>>>>> small number of committables in the state you still need to iterate
> >> all
> >>>>>> the
> >>>>>> iceberg snapshot files to check whether the committable is committed
> >>>>>> already. Even if it is efficient for the IceBergSink it might not be
> >> the
> >>>>>> case for other sinks.
> >>>>>>
> >>>>>> If the framework generates auto-increment nonce instead, it might
> >> still
> >>>>>> not
> >>>>>> be optimal for users. For example, users might want to use some
> >>>>>> business id
> >>>>>> so that after failover they could query whether the commit is
> >> successful
> >>>>>> after failover.
> >>>>>>
> >>>>>> I think users could generate more efficient nonce such as an
> >>>>>> auto-increment
> >>>>>> one. Therefore, it seems to provide more optimization chances if we
> >> let
> >>>>>> users to generate the nonce.
> >>>>>>
> >>>>>>
> >>>>>> ### Alternative Option
> >>>>>>
> >>>>>> public interface GlobalCommit<CommT, GlobalCommT> {
> >>>>>>          // provide some runtime context such as
> >>>>>> attempt-id,job-id,task-id.
> >>>>>>          void open(InitContext context);
> >>>>>>
> >>>>>>          // This GlobalCommit would aggregate the committable to a
> >>>>>> GlobalCommit before doing the commit operation.
> >>>>>>          GlobalCommT combine(List<Committable> commitables)
> >>>>>>
> >>>>>>          // This method would be called after committing all the
> >>>>>> GlobalCommit producing in the previous session.
> >>>>>>          void recoveredGlobalCommittables(List<GlobalCommit>
> >>>>>> globalCommits)
> >>>>>>
> >>>>>>          // developer would guarantee the idempotency by himself
> >>>>>>          void commit(GlobalCommit globalCommit);
> >>>>>> }
> >>>>>>
> >>>>>> The user could guarantee the idempotency himself in a more efficient or
> >>>>>> application specific way. If the user wants the `GlobalCommit` to be
> >>>>>> executed in a distributed way, the user could use the runtime
> >>>>>> information
> >>>>>> to generate the partial order id himself.(We could ignore the clean
> up
> >>>>>> first)
> >>>>>>
> >>>>>> Currently the sink might be looks like following:
> >>>>>>
> >>>>>> Sink<IN, LC, LCO, GC> {
> >>>>>>          Writer<IN, LC> createWriter();
> >>>>>>          Optional<Committer<LC, LCO>> createCommitter();
> >>>>>>          Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
> >>>>>> }
> >>>>>>
> >>>>>> ## Hive
> >>>>>>
> >>>>>> The HiveSink needs to compute whether a directory is finished or
> not.
> >>>>>> But
> >>>>>> HiveSink can not use the above `combine` method to decide whether a
> >>>>>> directory is finished or not.
> >>>>>>
> >>>>>> For example we assume that whether the directory is finished or not
> is
> >>>>>> decided by the event time. There might be a topology that the source
> >> and
> >>>>>> sink are forward. The event time might be different in different
> >>>>>> instances
> >>>>>> of the `writer`. So the GlobalCommit’s combine can not produce a
> >>>>>> GlobalCommT when the snapshot happens.
> >>>>>>
> >>>>>> In addition to the above case we should also consider the unaligned
> >>>>>> checkpoint. Because the watermark does not skip. So there might be
> the
> >>>>>> same
> >>>>>> problem in the unaligned checkpoint.
> >>>>>>
> >>>>>> ### Option1:
> >>>>>>
> >>>>>> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
> >>>>>>          // provide some runtime context such as
> >>>>>> attempt-id,job-id,task-id,
> >>>>>> maybe the event time;provide the restore state
> >>>>>>          void open(InitContext context, StateT state);
> >>>>>>
> >>>>>>          // This is for the HiveSink. When all the writer say that
> the
> >>>>>> the
> >>>>>> bucket is finished it would return a GlobalCommitT
> >>>>>>          Optional<GlobalCommT> combine(Committable commitables)
> >>>>>>
> >>>>>>          // This is for IcebergSink. Producing a GlobalCommitT every
> >>>>>> checkpoint.
> >>>>>>          Optional<GlobalCommT> preCommit();
> >>>>>>
> >>>>>>          // Maybe we need the shareState? After we decide the
> directory
> >>>>>> we
> >>>>>> make more detailed consideration then. The id could be remembered
> >> here.
> >>>>>>          StateT snapshotState();
> >>>>>>
> >>>>>>          // developer would guarantee the idempotency by himself
> >>>>>>          void commit(GlobalCommit globalCommit);
> >>>>>> }
> >>>>>>
> >>>>>> ### Option2
> >>>>>>
> >>>>>> Actually the `GlobalCommit` in the option1 mixes the `Writer` and
> >>>>>> `Committer` together. So it is intuitive to decouple the two
> >> functions.
> >>>>>> For
> >>>>>> support the hive we could prove a sink look like following
> >>>>>>
> >>>>>> Sink<In, LC, LCO, LCG> {
> >>>>>>          Writer<In, LC> createWriter();
> >>>>>>          Optional<Committer<LC, LCO>> createCommitter(); // we need
> >> this
> >>>>>> to
> >>>>>> change name.
> >>>>>>          Optional<Writer<LCO, LCG>> createGlobalAgg();
> >>>>>>          Optional<Committer<LCG, void>> createGlobalCommitter();
> >>>>>> }
> >>>>>>
> >>>>>> The pro of this method is that we use two basic concepts:
> `Committer`
> >>>>>> and
> >>>>>> `Writer` to build a HiveSink.
> >>>>>>
> >>>>>> ### CompactHiveSink / MergeHiveSink
> >>>>>>
> >>>>>> There are still other complicated cases, which are not satisfied by
> >> the
> >>>>>> above option. Users often complain about writing out many small
> files,
> >>>>>> which will affect file reading efficiency and the performance and
> >>>>>> stability
> >>>>>> of the distributed file system. CompactHiveSink/MergeHiveSink hopes
> to
> >>>>>> merge all files generated by this job in a single Checkpoint.
> >>>>>>
> >>>>>> The CompactHiveSink/MergeHiveSink topology can simply describe this
> >>>>>> topology as follows:
> >>>>>>
> >>>>>> CompactSubTopology -> GlobalAgg -> GobalCommitter.
> >>>>>>
> >>>>>> The CompactSubTopology would look like following:
> >>>>>>
> >>>>>> TmpFileWriter -> CompactCoodinator -> CompactorFileWriter
> >>>>>>
> >>>>>> Maybe the topology could be simpler but please keep in mind I just
> >> want
> >>>>>> to
> >>>>>> show that there might be very complicated topology requirements for
> >>>>>> users.
> >>>>>>
> >>>>>>
> >>>>>> A possible alternative option would be let the user build the
> topology
> >>>>>> himself. But considering we have two execution modes we could only
> use
> >>>>>> `Writer` and `Committer` to build the sink topology.
> >>>>>>
> >>>>>> ### Build Topology Option
> >>>>>>
> >>>>>> Sink<IN, OUT> {
> >>>>>>          Sink<In, Out> addWriter(Writer<In, Out> Writer); // Maybe a
> >>>>>> WriterBuidler
> >>>>>>          Sink<In, Out> addCommitter(Committer<In, Out> committer);
> //
> >>>>>> Maybe
> >>>>>> we could make this return Void if we do not consider code reuse and
> >>>>>> introduce the cleaner
> >>>>>> }
> >>>>>>
> >>>>>> ## Summary
> >>>>>> The requirements of sink might be different, maybe we could use two
> >>>>>> basic
> >>>>>> bricks(Writer/Committer) to let the user build their own sink
> >> topology.
> >>>>>> What do you guys think?
> >>>>>>
> >>>>>> I know the naming might be tricky for now, but I want to discuss these
> >>>>>> things after we get consensus on the direction first.
> >>>>>>
> >>>>>> Best,
> >>>>>> Guowei
> >>>>>>
> >>>>>>
> >>>>>> On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Aljoscha,
> >>>>>>>
> >>>>>>>> Instead the sink would have to check for each set of committables
> >>>>>>> seperately if they had already been committed. Do you think this is
> >>>>>>> feasible?
> >>>>>>>
> >>>>>>> Yes, that is how it works in our internal implementation [1]. We
> >>>>>> don't use
> >>>>>>> checkpointId. We generate a manifest file (GlobalCommT) to bundle
> >> all
> >>>>>> the
> >>>>>>> data files that the committer received in one checkpoint cycle.
> Then
> >>>>>> we
> >>>>>>> generate a unique manifest id for by hashing the location of the
> >>>>>> manifest
> >>>>>>> file. The manifest ids are stored in Iceberg snapshot metadata.
> Upon
> >>>>>>> restore, we check each of the restored manifest files against
> >> Iceberg
> >>>>>> table
> >>>>>>> snapshot metadata to determine if we should discard or keep the
> >>>>>> restored
> >>>>>>> manifest files. If a commit has multiple manifest files (e.g.
> >>>>>> accumulated
> >>>>>>> from previous failed commits), we store the comma-separated
> manifest
> >>>>>> ids in
> >>>>>>> Iceberg snapshot metadata.
> >>>>>>>
> >>>>>>>> During normal operation this set would be very small, it would
> >>>>>> usually
> >>>>>>> only be the committables for the last checkpoint. Only when there
> is
> >>>>>> an
> >>>>>>> outage would multiple sets of committables pile up.
> >>>>>>>
> >>>>>>> You are absolutely right here. Even if there are multiple sets of
> >>>>>>> committables, it is usually the last a few or dozen of snapshots we
> >>>>>> need to
> >>>>>>> check. Even with our current inefficient implementation of
> >> traversing
> >>>>>> all
> >>>>>>> table snapshots (in the scale of thousands) from oldest to latest,
> >> it
> >>>>>> only
> >>>>>>> took avg 60 ms and max 800 ms. so it is really not a concern for
> >>>>>> Iceberg.
> >>>>>>>
> >>>>>>>> CommitStatus commitGlobally(List<Committable>, Nonce)
> >>>>>>>
> >>>>>>> Just to clarify on the terminology here. Assuming here the
> >> Committable
> >>>>>>> meant the `GlobalCommT` (like ManifestFile in Iceberg) in
> >>>>>>> previous discussions, right? `CommT` means the Iceberg DataFile
> from
> >>>>>> writer
> >>>>>>> to committer.
> >>>>>>>
> >>>>>>> This can work assuming we *don't have concurrent executions
> >>>>>>> of commitGlobally* even with concurrent checkpoints. Here is the
> >>>>>> scenario
> >>>>>>> regarding failure recovery I want to avoid.
> >>>>>>>
> >>>>>>> Assuming checkpoints 1, 2, 3 all completed. Each checkpoint
> >> generates
> >>>>>> a
> >>>>>>> manifest file, manifest-1, 2, 3.
> >>>>>>> timeline
> >>>>>>>
> >>>>>>
> >>
> ------------------------------------------------------------------------->
> >>>>>>> now
> >>>>>>> commitGlobally(manifest-1, nonce-1) started
> >>>>>>>           commitGlobally(manifest-2, nonce-2) started
> >>>>>>>                      commitGlobally(manifest-2, nonce-2) failed
> >>>>>>>                              commitGlobally(manifest-2 and manifest-3, nonce-3) started
> >>>>>>>                                      commitGlobally(manifest-1, nonce-1) failed
> >>>>>>>                                              commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded
> >>>>>>>
> >>>>>>> Now the job failed and was restored from checkpoint 3, which
> >> contains
> >>>>>>> manifest file 1,2,3. We found nonce-3 was committed when checking
> >>>>>> Iceberg
> >>>>>>> table snapshot metadata. But in this case we won't be able to
> >>>>>> correctly
> >>>>>>> determine which manifest files were committed or not.
> >>>>>>>
> >>>>>>> If it is possible to have concurrent executions of  commitGlobally,
> >>>>>> the
> >>>>>>> alternative is to generate the unique id/nonce per GlobalCommT.
> Then
> >>>>>> we can
> >>>>>>> check each individual GlobalCommT (ManifestFile) with Iceberg
> >> snapshot
> >>>>>>> metadata.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Steven
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>
> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569
> >>>>>>>
> >>>>>>> On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <
> >> aljos...@apache.org
> >>>>>>>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Steven,
> >>>>>>>>
> >>>>>>>> we were also wondering if it is a strict requirement that "later"
> >>>>>>>> updates to Iceberg subsume earlier updates. In the current
> >> version,
> >>>>>> you
> >>>>>>>> only check whether checkpoint X made it to Iceberg and then
> >> discard
> >>>>>> all
> >>>>>>>> committable state from Flink state for checkpoints smaller X.
> >>>>>>>>
> >>>>>>>> If we go with a (somewhat random) nonce, this would not work.
> >>>>>> Instead
> >>>>>>>> the sink would have to check for each set of committables
> >>>>>> seperately if
> >>>>>>>> they had already been committed. Do you think this is feasible?
> >>>>>> During
> >>>>>>>> normal operation this set would be very small, it would usually
> >>>>>> only be
> >>>>>>>> the committables for the last checkpoint. Only when there is an
> >>>>>> outage
> >>>>>>>> would multiple sets of committables pile up.
> >>>>>>>>
> >>>>>>>> We were thinking to extend the GlobalCommitter interface to allow
> >>>>>> it to
> >>>>>>>> report success or failure and then let the framework retry. I
> >> think
> >>>>>> this
> >>>>>>>> is something that you would need for the Iceberg case. The
> >> signature
> >>>>>>>> could be like this:
> >>>>>>>>
> >>>>>>>> CommitStatus commitGlobally(List<Committable>, Nonce)
> >>>>>>>>
> >>>>>>>> where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE,
> >>>>>> and
> >>>>>>>> RETRY.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>
> >
>
>
