> I prefer to let the developer produce the id to dedupe. I think this gives
> the developer more opportunity to optimize.

Thinking about it again, I totally agree with Guowei on this. We don't
really need the framework to generate the unique id for the Iceberg sink.
The de-dup logic is entirely internal to the Iceberg sink and should stay
isolated inside it. My earlier question regarding whether
"commitGlobally(List<GlobalCommT>) can be concurrent or not" also becomes
irrelevant, as long as the framework handles the GlobalCommT list properly
(even with concurrent calls).

Here are the things where the framework can help:

   1. run the global committer in the jobmanager (e.g. like the sink
   coordinator)
   2. help with checkpointing, bookkeeping, commit failure handling, and
   recovery


@Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter interface,
I have some clarifying questions.

> void recoveredGlobalCommittables(List<GlobalCommit> globalCommits)

   1. You meant GlobalCommit -> GlobalCommT, right?
   2. Is this called when restoring from a checkpoint/savepoint?
   3. The Iceberg sink needs to do a dedup check here on which GlobalCommT
   were committed and which weren't. Should it return the filtered/de-duped
   list of GlobalCommT?
   4. The sink implementation can decide if it wants to commit immediately
   or just leave them for a later commit.

> void commit(GlobalCommit globalCommit);

Should this be "commit(List<GlobalCommT>)"?
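
To make sure we are talking about the same shape, here is a rough sketch of
how I currently read the proposal, with the adjustments from questions 1-4
above (GlobalCommT everywhere, the dedup/filter step on recovery, and commit
taking a list). All names are tentative and purely illustrative:

import java.util.List;

public interface GlobalCommitterSketch<CommT, GlobalCommT> {

        // Aggregates the committables of one checkpoint cycle into a single
        // GlobalCommT (e.g. a manifest file in the Iceberg case).
        GlobalCommT combine(List<CommT> committables);

        // Called when restoring from a checkpoint/savepoint. The sink checks
        // its external metadata and returns only the GlobalCommT that still
        // need to be committed (question 3).
        List<GlobalCommT> recoveredGlobalCommittables(List<GlobalCommT> restored);

        // Commits everything pending, possibly accumulated across several
        // checkpoints after an outage.
        void commit(List<GlobalCommT> globalCommittables);
}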

Thanks,
Steven


On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, all
>
> >> Just to add to what Aljoscha said regarding the unique id. Iceberg sink
> >> checkpoints the unique id into state during snapshot. It also inserts the
> >> unique id into the Iceberg snapshot metadata during commit. When a job
> >> restores the state after failure, it needs to know if the restored
> >> transactions/commits were successful or not. It basically iterates through
> >> the list of table snapshots from Iceberg and matches the unique ids with
> >> what is stored in Iceberg snapshot metadata.
>
> Thanks Steven for these detailed explanations. They help me understand
> Iceberg better. However, I prefer to let the developer produce the id to
> dedupe. I think this gives the developer more opportunity to optimize. You
> can see the following for more details. Please correct me if I misunderstand
> you.
>
> >> 3. Whether the `Writer` supports async functionality or not. Currently I
> >> do not know which sink could benefit from it. Maybe it is just my own
> >> problem.
>
> >> Here, I don't really know. We can introduce an "isAvailable()" method
> >> and mostly ignore it for now and sinks can just always return true. Or,
> >> as an alternative, we don't add the method now but can add it later with
> >> a default implementation. Either way, we will probably not take
> >> advantage of the "isAvailable()" now because that would require more
> >> runtime changes.
>
> From @Piotr's explanation I can see another benefit that might be gained in
> the future, for example decoupling the task number from the thread number.
> But I have to admit that introducing `isAvailable` might introduce some
> complications in the runtime. You can see my alternative API option in the
> following. I believe that we could support such an async sink writer very
> easily in the future. What do you think?
>
> >> Yes, this is still tricky. What is the current state, would the
> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve both
> >> the Iceberg and Hive cases? I believe Hive is the most tricky one here,
> >> but if we introduce the "combine" method on GlobalCommit, that could
> >> serve the same purpose as the "aggregation operation" on the individual
> >> files, and we could even execute that "combine" in a distributed way.
> >> We assume that GlobalCommit is an Agg/Combiner?
>
> I would like to share the possible problems that I am seeing currently and
> the alternative options.
>
> ## Iceberg Sink
>
> ### Concern about the framework generating the nonce
>
> If we let the framework provide a random nonce to the `IcebergSink`'s
> `GlobalCommitter`, I think it might not be efficient. Even if there are only
> a very small number of committables in the state, you still need to iterate
> over all the Iceberg snapshot files to check whether a committable has
> already been committed. And even if this is efficient for the `IcebergSink`,
> it might not be the case for other sinks.
>
> If the framework generates an auto-increment nonce instead, it might still
> not be optimal for users. For example, users might want to use some business
> id so that after failover they can query whether the commit was successful.
>
> I think users can generate a more efficient nonce, such as an
> auto-incrementing one. Therefore, it seems to provide more optimization
> opportunities if we let users generate the nonce.
>
>
> ### Alternative Option
>
> public interface GlobalCommit<CommT, GlobalCommT> {
>         // Provide some runtime context such as attempt-id, job-id, task-id.
>         void open(InitContext context);
>
>         // This GlobalCommit would aggregate the committables into a
>         // GlobalCommit before doing the commit operation.
>         GlobalCommT combine(List<Committable> committables);
>
>         // This method would be called after committing all the GlobalCommit
>         // produced in the previous session.
>         void recoveredGlobalCommittables(List<GlobalCommit> globalCommits);
>
>         // The developer would guarantee the idempotency by himself.
>         void commit(GlobalCommit globalCommit);
> }
>
> The user could guarantee the idempotency himself in a more efficient or
> application-specific way. If the user wants the `GlobalCommit` to be
> executed in a distributed way, the user could use the runtime information
> to generate a partial-order id himself. (We could ignore the cleanup for
> now.)
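>
> As a rough illustration of what I mean by a user-generated, partially
> ordered id (this is only a sketch; the runtime information used here is an
> assumption, not an existing API):
>
> // Sketch only: build the id from runtime information (job id, subtask id,
> // checkpoint id) instead of a random nonce. Ids from the same subtask are
> // totally ordered by checkpoint id, which makes the dedup check a simple
> // comparison instead of a scan over all snapshots.
> final class UserGeneratedCommitId {
>         final String jobId;
>         final int subtaskId;
>         final long checkpointId;
>
>         UserGeneratedCommitId(String jobId, int subtaskId, long checkpointId) {
>                 this.jobId = jobId;
>                 this.subtaskId = subtaskId;
>                 this.checkpointId = checkpointId;
>         }
>
>         String encode() {
>                 return jobId + "-" + subtaskId + "-" + checkpointId;
>         }
>
>         // Anything at or below the last committed checkpoint of this subtask
>         // has already been committed.
>         boolean alreadyCommitted(long lastCommittedCheckpointOfSubtask) {
>                 return checkpointId <= lastCommittedCheckpointOfSubtask;
>         }
> }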
>
> Currently the sink might look like the following:
>
> Sink<IN, LC, LCO, GC> {
>         Writer<IN, LC> createWriter();
>         Optional<Committer<LC, LCO>> createCommitter();
>         Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter();
> }
>
> ## Hive
>
> The HiveSink needs to compute whether a directory is finished or not. But
> the HiveSink cannot use the above `combine` method to decide whether a
> directory is finished.
>
> For example, assume that whether the directory is finished or not is
> decided by the event time. There might be a topology where the source and
> sink are connected by forward channels. The event time might be different in
> different instances of the `Writer`, so the GlobalCommit's `combine` cannot
> produce a GlobalCommT at the moment the snapshot happens.
>
> In addition to the above case we should also consider unaligned checkpoints.
> Because the watermark is not skipped ahead, the same problem can occur with
> unaligned checkpoints.
>
> ### Option 1
>
> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> {
>         // Provide some runtime context such as attempt-id, job-id, task-id,
>         // maybe the event time; provide the restored state.
>         void open(InitContext context, StateT state);
>
>         // This is for the HiveSink. When all the writers say that the
>         // bucket is finished it would return a GlobalCommT.
>         Optional<GlobalCommT> combine(Committable committables);
>
>         // This is for the IcebergSink. It produces a GlobalCommT every
>         // checkpoint.
>         Optional<GlobalCommT> preCommit();
>
>         // Maybe we need the shared state? After we decide on the directory
>         // handling we can make more detailed considerations; the id could be
>         // remembered here.
>         StateT snapshotState();
>
>         // The developer would guarantee the idempotency by himself.
>         void commit(GlobalCommit globalCommit);
> }
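>
> To make the Hive case above more concrete, here is a rough sketch (my own
> illustration, not part of the proposal) of what the `combine` side could do:
> a GlobalCommT for a bucket is only produced once every writer instance has
> reported an event time past the bucket boundary. All class names here are
> made up:
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Optional;
>
> class HiveBucketCombineSketch {
>         // Hypothetical committable sent by one writer instance.
>         static class BucketCommittable {
>                 final String bucket;
>                 final int writerId;
>                 final long writerEventTime;
>
>                 BucketCommittable(String bucket, int writerId, long writerEventTime) {
>                         this.bucket = bucket;
>                         this.writerId = writerId;
>                         this.writerEventTime = writerEventTime;
>                 }
>         }
>
>         // Hypothetical GlobalCommT: "this bucket/directory is finished".
>         static class BucketCommit {
>                 final String bucket;
>                 BucketCommit(String bucket) { this.bucket = bucket; }
>         }
>
>         private final int numWriters;
>         private final long bucketEndTime;
>         private final Map<Integer, Long> eventTimePerWriter = new HashMap<>();
>
>         HiveBucketCombineSketch(int numWriters, long bucketEndTime) {
>                 this.numWriters = numWriters;
>                 this.bucketEndTime = bucketEndTime;
>         }
>
>         // Analogue of combine(Committable): empty until the minimum event
>         // time across all writers has passed the bucket boundary.
>         Optional<BucketCommit> combine(BucketCommittable committable) {
>                 eventTimePerWriter.put(committable.writerId, committable.writerEventTime);
>                 if (eventTimePerWriter.size() < numWriters) {
>                         return Optional.empty();
>                 }
>                 long minEventTime = Long.MAX_VALUE;
>                 for (long t : eventTimePerWriter.values()) {
>                         minEventTime = Math.min(minEventTime, t);
>                 }
>                 return minEventTime >= bucketEndTime
>                         ? Optional.of(new BucketCommit(committable.bucket))
>                         : Optional.empty();
>         }
> }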
>
> ### Option 2
>
> Actually the `GlobalCommit` in Option 1 mixes the `Writer` and the
> `Committer` together, so it is intuitive to decouple the two functions. To
> support Hive we could provide a sink that looks like the following:
>
> Sink<In, LC, LCO, LCG> {
>         Writer<In, LC> createWriter();
>         // We may need to change this name.
>         Optional<Committer<LC, LCO>> createCommitter();
>         Optional<Writer<LCO, LCG>> createGlobalAgg();
>         Optional<Committer<LCG, void>> createGlobalCommitter();
> }
>
> The pro of this approach is that we use only two basic concepts, `Committer`
> and `Writer`, to build a HiveSink.
>
> ### CompactHiveSink / MergeHiveSink
>
> There are still other complicated cases which are not satisfied by the
> above options. Users often complain about writing out many small files,
> which affect file reading efficiency and the performance and stability of
> the distributed file system. The CompactHiveSink/MergeHiveSink hopes to
> merge all the files generated by this job in a single checkpoint.
>
> The CompactHiveSink/MergeHiveSink can be described by the following
> topology:
>
> CompactSubTopology -> GlobalAgg -> GlobalCommitter
>
> The CompactSubTopology would look like the following:
>
> TmpFileWriter -> CompactCoordinator -> CompactorFileWriter
>
> Maybe the topology could be simpler, but please keep in mind that I just
> want to show that users might have very complicated topology requirements.
>
>
> A possible alternative option would be to let the user build the topology
> himself. But considering that we have two execution modes, we could only use
> `Writer` and `Committer` to build the sink topology.
>
> ### Build Topology Option
>
> Sink<IN, OUT> {
>         // Maybe a WriterBuilder.
>         Sink<In, Out> addWriter(Writer<In, Out> writer);
>         // Maybe we could make this return Void if we do not consider code
>         // reuse and introduce the cleaner.
>         Sink<In, Out> addCommitter(Committer<In, Out> committer);
> }
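>
> As a very rough illustration of the "build your own topology" idea above
> (purely my own sketch, with simplified single-argument Writer/Committer
> signatures that are not the FLIP's), the compaction case could be written as
> a chain of writers ended by a committer:
>
> public class TopologySketch {
>
>         interface Writer<IN, OUT> { OUT write(IN element); }
>
>         interface Committer<IN> { void commit(IN committable); }
>
>         static final class SinkBuilder<OUT> {
>                 // In a real runtime each stage would become one operator of
>                 // the sink topology; here we only model the type-level chaining.
>                 <NEXT> SinkBuilder<NEXT> addWriter(Writer<OUT, NEXT> writer) {
>                         return new SinkBuilder<>();
>                 }
>
>                 void addCommitter(Committer<OUT> committer) {
>                         // terminal stage
>                 }
>         }
>
>         public static void main(String[] args) {
>                 new SinkBuilder<String>()
>                         .addWriter((String row) -> "tmp-file")            // TmpFileWriter
>                         .addWriter((String tmpFile) -> "compaction-unit") // CompactCoordinator
>                         .addWriter((String unit) -> "compacted-file")     // CompactorFileWriter
>                         .addWriter((String file) -> "global-agg-result")  // GlobalAgg
>                         .addCommitter((String c) -> System.out.println("commit " + c)); // GlobalCommitter
>         }
> }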
>
> ## Summary
> The requirements of different sinks might differ; maybe we could use two
> basic bricks (Writer/Committer) to let the user build their own sink
> topology. What do you guys think?
>
> I know the naming might be tricky for now, but I want to discuss these
> things after we reach consensus on the direction first.
>
> Best,
> Guowei
>
>
> On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com> wrote:
>
> > Aljoscha,
> >
> > > Instead the sink would have to check for each set of committables
> > > separately if they had already been committed. Do you think this is
> > > feasible?
> >
> > Yes, that is how it works in our internal implementation [1]. We don't
> > use checkpointId. We generate a manifest file (GlobalCommT) to bundle all
> > the data files that the committer received in one checkpoint cycle. Then
> > we generate a unique manifest id by hashing the location of the manifest
> > file. The manifest ids are stored in Iceberg snapshot metadata. Upon
> > restore, we check each of the restored manifest files against the Iceberg
> > table snapshot metadata to determine if we should discard or keep the
> > restored manifest files. If a commit has multiple manifest files (e.g.
> > accumulated from previous failed commits), we store the comma-separated
> > manifest ids in Iceberg snapshot metadata.
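> >
> > To illustrate the idea (this is just a sketch of the hashing/dedup check
> > described above, not the actual code linked in [1]):
> >
> > import java.nio.charset.StandardCharsets;
> > import java.security.MessageDigest;
> > import java.util.Set;
> >
> > class ManifestDedupSketch {
> >
> >         // Stable id derived from the manifest file location.
> >         static String manifestId(String manifestLocation) throws Exception {
> >                 byte[] digest = MessageDigest.getInstance("SHA-256")
> >                         .digest(manifestLocation.getBytes(StandardCharsets.UTF_8));
> >                 StringBuilder hex = new StringBuilder();
> >                 for (byte b : digest) {
> >                         hex.append(String.format("%02x", b));
> >                 }
> >                 return hex.toString();
> >         }
> >
> >         // committedIds would be parsed from the (comma-separated) manifest
> >         // ids stored in the Iceberg snapshot metadata.
> >         static boolean alreadyCommitted(String manifestLocation, Set<String> committedIds) throws Exception {
> >                 return committedIds.contains(manifestId(manifestLocation));
> >         }
> > }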
> >
> > > During normal operation this set would be very small, it would usually
> > > only be the committables for the last checkpoint. Only when there is an
> > > outage would multiple sets of committables pile up.
> >
> > You are absolutely right here. Even if there are multiple sets of
> > committables, it is usually only the last few or few dozen snapshots we
> > need to check. Even with our current inefficient implementation of
> > traversing all table snapshots (on the scale of thousands) from oldest to
> > latest, it only took 60 ms on average and 800 ms at most, so it is really
> > not a concern for Iceberg.
> >
> > > CommitStatus commitGlobally(List<Committable>, Nonce)
> >
> > Just to clarify the terminology here: I assume the Committable here means
> > the `GlobalCommT` (like ManifestFile in Iceberg) from the previous
> > discussions, right? `CommT` means the Iceberg DataFile from the writer to
> > the committer.
> >
> > This can work assuming we *don't have concurrent executions
> > of commitGlobally* even with concurrent checkpoints. Here is the scenario
> > regarding failure recovery I want to avoid.
> >
> > Assuming checkpoints 1, 2, 3 all completed. Each checkpoint generates a
> > manifest file, manifest-1, 2, 3.
> > timeline ----------------------------------------------------------> now
> >
> > commitGlobally(manifest-1, nonce-1) started
> >         commitGlobally(manifest-2, nonce-2) started
> >                 commitGlobally(manifest-2, nonce-2) failed
> >                         commitGlobally(manifest-2 and manifest-3, nonce-3) started
> >                                 commitGlobally(manifest-1, nonce-1) failed
> >                                         commitGlobally(manifest-2 and manifest-3, nonce-3) succeeded
> >
> > Now the job failed and was restored from checkpoint 3, which contains
> > manifest files 1, 2, 3. When checking the Iceberg table snapshot metadata,
> > we find that nonce-3 was committed. But in this case we won't be able to
> > correctly determine which manifest files were committed and which weren't.
> >
> > If it is possible to have concurrent executions of commitGlobally, the
> > alternative is to generate the unique id/nonce per GlobalCommT. Then we
> > can check each individual GlobalCommT (ManifestFile) against the Iceberg
> > snapshot metadata.
> >
> > Thanks,
> > Steven
> >
> > [1]
> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569
> >
> > On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Steven,
> > >
> > > we were also wondering if it is a strict requirement that "later"
> > > updates to Iceberg subsume earlier updates. In the current version, you
> > > only check whether checkpoint X made it to Iceberg and then discard all
> > > committable state from Flink state for checkpoints smaller than X.
> > >
> > > If we go with a (somewhat random) nonce, this would not work. Instead
> > > the sink would have to check for each set of committables separately if
> > > they had already been committed. Do you think this is feasible? During
> > > normal operation this set would be very small, it would usually only be
> > > the committables for the last checkpoint. Only when there is an outage
> > > would multiple sets of committables pile up.
> > >
> > > We were thinking of extending the GlobalCommitter interface to allow it
> > > to report success or failure and then let the framework retry. I think
> > > this is something that you would need for the Iceberg case. The
> > > signature could be like this:
> > >
> > > CommitStatus commitGlobally(List<Committable>, Nonce)
> > >
> > > where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, and
> > > RETRY.
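> > >
> > > A minimal sketch of that shape (illustrative only, not an existing Flink
> > > interface):
> > >
> > > import java.util.List;
> > >
> > > enum CommitStatus { SUCCESS, TERMINAL_FAILURE, RETRY }
> > >
> > > interface RetryingGlobalCommitter<GlobalCommT, NonceT> {
> > >         // The framework would retry the call (possibly with backoff) as
> > >         // long as RETRY is returned, and fail the job on TERMINAL_FAILURE.
> > >         CommitStatus commitGlobally(List<GlobalCommT> committables, NonceT nonce);
> > > }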
> > >
> > > Best,
> > > Aljoscha
> > >
> >
>
