Hi Guowei, > I believe that we could support such an async sink writer > very easily in the future. What do you think?
How would you see the expansion in the future? Do you mean just adding the `isAvailable()` method with a default implementation later on? Piotrek Mon, Sep 21, 2020 at 02:39 Steven Wu <stevenz...@gmail.com> wrote: > > I think Iceberg sink needs to do the dedup in the `commit` call. The > `recoveredGlobalCommittables` is just for restoring the ids. > > > @Guowei Ma <guowei....@gmail.com> It is undesirable to do the dedup check > in the `commit` call, because it happens for each checkpoint cycle. We only > need to do the de-dup check one time when restoring the GlobalCommT list from > the checkpoint. > > > Can you clarify the purpose of `recoveredGlobalCommittables`? If it is to > let sink implementations know the recovered GlobalCommT list, it is > probably not a sufficient API. For the Iceberg sink, we can try to > implement the de-dup check inside the `recoveredGlobalCommittables` method > and commit any uncommitted GlobalCommT items. But how do we handle a failed > commit? > > > One alternative is to allow sink implementations to override > "List<GlobalCommT> recoverGlobalCommittables()". The framework handles the > checkpoint/state, and sink implementations can further customize the > restored list with de-dup checks and filtering. The recovered uncommitted > GlobalCommT list will be committed in the next cycle. It is the same > rollover strategy for commit failure handling that we have been discussing. > > > ## topologies > > > Regarding the topology options, if we agree that there is no one size fits > all, we can let sink implementations choose the best topology. Maybe > the framework can provide 2-3 pre-defined topology implementations to help > the sinks. > > > > > On Sun, Sep 20, 2020 at 3:27 AM Guowei Ma <guowei....@gmail.com> wrote: > > > I would like to summarize the file-type sinks in the thread and their > > possible topologies. I also try to give the pros and cons of every topology > > option. Correct me if I am wrong. 
> > > > ### FileSink > > > > Topology Option: TmpFileWriter + Committer. > > > > ### Iceberg Sink > > > > #### Topology Option1: `DataFileWriter` + `GlobalCommitterV0`. > > Pro: > > 1. The same group has the same id. > > Cons: > > 1. May limit users’ optimization space; > > 2. The topology does not meet Hive’s requirements. > > > > #### Topology Option2: `DataFileWriter` + `GlobalCommitterV1` > > Pro: > > 1. The user has the opportunity to optimize the implementation of idempotence. > > Cons: > > 1. Makes the GlobalCommit more complicated. > > 2. The topology does not meet Hive’s requirements. > > > > #### Topology Option3: DataFileWriter + AggWriter + Committer > > > > Pros: > > 1. Uses the two basic `Writer` & `Committer` to meet Iceberg’s > requirements. > > 2. Opportunity to optimize the implementation of idempotence. > > 3. The topology meets Hive’s requirements. (See following.) > > Con: > > 1. It introduces a relatively complex topology. > > > > ## HiveSink > > > > ### Topology Option1: `TmpFileWriter` + `Committer` + > `GlobalCommitterV2`. > > Pro: > > 1. Could skip the cleanup problem at first. > > Con: > > 1. This style of topology does not meet the CompactHiveSink requirements. > > > > ### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` + > > `Committer` > > Pros > > 1. Could skip the cleanup problem at first. > > 2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`. > > Cons > > 1. This style of topology does not meet the CompactHiveSink requirements. > > 2. There are two general `Committers` in the topology. For Hive’s case > > there might be no problem. But there might be a problem in 1.12. For > > example, where to execute the sub-topology following the `Committer` in > > batch execution mode for the general case. Because the topology is built > > from `Writer` and `Committer` we might put the whole sub-topology in the > > OperatorCoordinator. But if the topology is complicated, that might become > > very complicated to manage. See following. 
> > > > ### Topology Option3 `FileWriter` + `AggWriter` + `Committer` > > Pro > > 1. There is only one general committer. > > Cons > > 1. It has to consider the cleanup problem. (In theory both Option1 and > > Option2 need cleanup.) > > 2. This style of topology does not meet the CompactHiveSink requirements. > > 3. Have to figure out how to make the current version compatible. > > > > ### CompactHiveSink/MergeHiveSink > > > > #### Topology Option1 `TmpFileWriter` + `Committer` + `MergerCoordinator` > > + `MergeWriter` + `GlobalCommitterV2` > > Pro > > 1. Could skip the cleanup problem at first. > > Con > > 1. Where to execute the sub-topology following the `Committer`. > > > > #### Topology Option2 `TmpFileWriter` + `Committer` + `MergerCoordinator` > > + `MergeWriter` + AggWriter + Committer > > Pros > > 1. Could skip the cleanup problem at first. > > 2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`. > > Con > > 1. Where to execute the sub-topology following the `Committer`. > > > > #### Topology Option3 FileWriter + MergeCoordinator + MergeFileWriter + Writer(Agg) > > + Committer > > Pro > > 1. There is only one committer. It is very easy to support in the batch > > execution mode. > > Con > > 1. It has to consider the cleanup problem. (In theory both Option1 and > > Option2 need cleanup.) > > > > > > ### Summary > > > > From the above we could divide the sink topology into two parts: > > 1. Write topology. > > 2. 
One committer. > > > > So we could provide a unified sink API that looks like the following: > > > > public interface Sink<CommT> { > > List<Writer<?, ?>> getWriters(); > > Committer<CommT> createCommitter(); > > } > > > > In the long run maybe we could give the user more powerful abilities like > > this (currently some transformations still belong to the runtime): > > Sink<CommT> { > > Transformation<CommT> createWriteTopology(); > > CommitFunction<CommT> createCommitter(); > > } > > > > Best, > > Guowei > > > > > > On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma <guowei....@gmail.com> wrote: > > > >> Hi, Steven > >> I want to make a clarification first: the following reply only considers > >> the Iceberg sink, and does not consider other sinks. Before making a > decision > >> we should consider all the sinks. I will try to summarize all the sink > >> requirements in the next mail. > >> > >> > >> >> run global committer in jobmanager (e.g. like sink coordinator) > >> > >> I think it could be. > >> > >> > >> >> You meant GlobalCommit -> GlobalCommT, right? > >> > >> Yes. Thanks :) > >> > >> > >> >> Is this called when restored from checkpoint/savepoint? > >> > >> Yes. > >> > >> > >> >>Iceberg sink needs to do a dup check here on which GlobalCommT were > >> committed and which weren't. Should it return the filtered/de-duped > list of > >> GlobalCommT? > >> > >> > >> I think Iceberg sink needs to do the dedup in the `commit` call. The > >> `recoveredGlobalCommittables` is just for restoring the ids. > >> > >> > >> >> Sink implementation can decide if it wants to commit immediately or > >> just leave > >> > >> I think only the framework knows *when* to call the commit function. > >> > >> > >> >>should this be "commit(List<GlobalCommT>)"? > >> > >> It could be. Thanks. > >> > >> > >> Best, > >> Guowei > >> > >> > >> On Sun, Sep 20, 2020 at 12:11 AM Steven Wu <stevenz...@gmail.com> > wrote: > >> > >>> > I prefer to let the developer produce the id to dedupe. 
I think this > gives > >>> the developer more opportunity to optimize. > >>> > >>> Thinking about it again, I totally agree with Guowei on this. We don't > >>> really need the framework to generate the unique id for Iceberg sink. > >>> De-dup logic is totally internal to Iceberg sink and should be isolated > >>> inside. My earlier question regarding > "commitGlobally(List<GlobalCommT>) > >>> can be concurrent or not" also becomes irrelevant, as long as the > framework > >>> handles the GlobalCommT list properly (even with concurrent calls). > >>> > >>> Here are the things where framework can help > >>> > >>> 1. run global committer in jobmanager (e.g. like sink coordinator) > >>> 2. help with checkpointing, bookkeeping, commit failure handling, > >>> recovery > >>> > >>> > >>> @Guowei Ma <guowei....@gmail.com> regarding the GlobalCommitter > >>> interface, I have some clarifying questions. > >>> > >>> > void recoveredGlobalCommittables(List<GlobalCommit> globalCommits) > >>> > >>> 1. You meant GlobalCommit -> GlobalCommT, right? > >>> 2. Is this called when restored from checkpoint/savepoint? > >>> 3. Iceberg sink needs to do a dup check here on which GlobalCommT > >>> were committed and which weren't. Should it return the > filtered/de-duped > >>> list of GlobalCommT? > >>> 4. Sink implementation can decide if it wants to commit immediately > >>> or just leave > >>> > >>> > void commit(GlobalCommit globalCommit); > >>> > >>> should this be "commit(List<GlobalCommT>)"? > >>> > >>> Thanks, > >>> Steven > >>> > >>> > >>> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma <guowei....@gmail.com> > wrote: > >>> > >>>> Hi, all > >>>> > >>>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg > >>>> sink > >>>> >>checkpoints the unique id into state during snapshot. It also > inserts > >>>> the > >>>> >>unique id into the Iceberg snapshot metadata during commit. 
When a > job > >>>> >>restores the state after failure, it needs to know if the restored > >>>> >>transactions/commits were successful or not. It basically iterates > >>>> through > >>>> >>the list of table snapshots from Iceberg and matches the unique ids > >>>> with > >>>> >>what is stored in Iceberg snapshot metadata. > >>>> > >>>> Thanks Steven for these detailed explanations. They help me understand > >>>> Iceberg > >>>> better. However, I prefer to let the developer produce the id to dedupe. I > >>>> think this gives the developer more opportunity to optimize. You could > >>>> see > >>>> the following for more details. Please correct me if I misunderstand > >>>> you. > >>>> > >>>> >> 3. Whether the `Writer` supports async functionality or not. > >>>> Currently I > >>>> do > >>>> >> not know which sink could benefit from it. Maybe it is just my own > >>>> problem. > >>>> > >>>> >> Here, I don't really know. We can introduce an "isAvailable()" > method > >>>> >> and mostly ignore it for now and sinks can just always return true. > >>>> Or, > >>>> >> as an alternative, we don't add the method now but can add it later > >>>> with > >>>> >> a default implementation. Either way, we will probably not take > >>>> >> advantage of the "isAvailable()" now because that would require > more > >>>> >> runtime changes. > >>>> > >>>> From @Piotr's explanation I could see the other benefits that might > >>>> be > >>>> gained in the future. For example, decoupling the task number and the > >>>> thread > >>>> number. But I have to admit that introducing `isAvailable` might > >>>> introduce > >>>> some complications in the runtime. You could see my alternative API > >>>> option > >>>> in the following. I believe that we could support such an async sink > >>>> writer > >>>> very easily in the future. What do you think? > >>>> > >>>> >> Yes, this is still tricky. 
What is the current state, would the > >>>> >> introduction of a "LocalCommit" and a "GlobalCommit" already solve > >>>> both > >>>> >> the Iceberg and Hive cases? I believe Hive is the most tricky one > >>>> here, > >>>> >> but if we introduce the "combine" method on GlobalCommit, that > could > >>>> >> serve the same purpose as the "aggregation operation" on the > >>>> individual > >>>> >> files, and we could even execute that "combine" in a distributed > way. > >>>> >>We assume that GlobalCommit is an Agg/Combiner? > >>>> > >>>> I would like to share the possible problems that I am seeing currently and > the > >>>> alternative options. > >>>> > >>>> ## Iceberg Sink > >>>> > >>>> ### Concern about generating the nonce in the framework > >>>> > >>>> If we let the `GlobalCommitter` provide a random nonce for the > >>>> `IcebergSink`, I > >>>> think it might not be efficient. Even if there are only a > very > >>>> small number of committables in the state, you still need to iterate over > all > >>>> the > >>>> Iceberg snapshot files to check whether the committable is committed > >>>> already. Even if it is efficient for the IcebergSink, it might not be > the > >>>> case for other sinks. > >>>> > >>>> If the framework generates an auto-increment nonce instead, it might > still > >>>> not > >>>> be optimal for users. For example, users might want to use some > >>>> business id > >>>> so that after failover they could query whether the commit was > successful. > >>>> > >>>> I think users could generate a more efficient nonce, such as an > >>>> auto-increment > >>>> one. Therefore, it seems to provide more optimization chances if we > let > >>>> users generate the nonce. > >>>> > >>>> > >>>> ### Alternative Option > >>>> > >>>> public interface GlobalCommit<CommT, GlobalCommT> { > >>>> // provide some runtime context such as > >>>> attempt-id, job-id, task-id. 
> >>>> void open(InitContext context); > >>>> > >>>> // This GlobalCommit would aggregate the committables into a > >>>> GlobalCommT before doing the commit operation. > >>>> GlobalCommT combine(List<Committable> committables); > >>>> > >>>> // This method would be called after committing all the > >>>> GlobalCommTs produced in the previous session. > >>>> void recoveredGlobalCommittables(List<GlobalCommT> > >>>> globalCommits); > >>>> > >>>> // the developer would guarantee the idempotency himself > >>>> void commit(GlobalCommT globalCommit); > >>>> } > >>>> > >>>> The user could guarantee the idempotency himself in a more efficient or > >>>> application-specific way. If the user wants the `GlobalCommit` to be > >>>> executed in a distributed way, the user could use the runtime > >>>> information > >>>> to generate the partial-order id himself. (We could ignore the cleanup > >>>> first.) > >>>> > >>>> Currently the sink might look like the following: > >>>> > >>>> Sink<IN, LC, LCO, GC> { > >>>> Writer<IN, LC> createWriter(); > >>>> Optional<Committer<LC, LCO>> createCommitter(); > >>>> Optional<GlobalCommitter<LCO, GC>> createGlobalCommitter(); > >>>> } > >>>> > >>>> ## Hive > >>>> > >>>> The HiveSink needs to compute whether a directory is finished or not. > >>>> But > >>>> the HiveSink cannot use the above `combine` method to decide whether a > >>>> directory is finished or not. > >>>> > >>>> For example, assume that whether the directory is finished or not is > >>>> decided by the event time. There might be a topology where the source > and > >>>> sink are connected by forward edges. The event time might be different in different > >>>> instances > >>>> of the `writer`. So the GlobalCommit’s combine cannot produce a > >>>> GlobalCommT when the snapshot happens. > >>>> > >>>> In addition to the above case we should also consider the unaligned > >>>> checkpoint. Because the watermark does not skip, there might be the > >>>> same > >>>> problem with the unaligned checkpoint. 
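To make the "Alternative Option" above concrete, here is a compilable cleanup of that interface plus a toy Iceberg-like implementation showing a sink-generated auto-increment id used for the dedup check. The `Manifest` class, the `"manifest-" + counter` id scheme, and the in-memory `committedIds` set (standing in for Iceberg snapshot metadata) are illustrative assumptions, not part of the proposal:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class GlobalCommitSketch {

    // Stand-in for GlobalCommT (e.g. an Iceberg manifest); carries a sink-generated id.
    static final class Manifest {
        final String id;
        final List<String> dataFiles;
        Manifest(String id, List<String> dataFiles) {
            this.id = id;
            this.dataFiles = dataFiles;
        }
    }

    interface GlobalCommit<CommT, GlobalCommT> {
        GlobalCommT combine(List<CommT> committables);
        void recoveredGlobalCommittables(List<GlobalCommT> globalCommits);
        void commit(GlobalCommT globalCommit);
    }

    // Toy Iceberg-like committer: the sink itself produces the id,
    // and `commit` uses it for an idempotent dedup check.
    static class IcebergLikeCommit implements GlobalCommit<String, Manifest> {
        long nextId = 0;                                   // sink-generated auto-increment nonce
        final Set<String> committedIds = new HashSet<>();  // would live in Iceberg snapshot metadata
        final List<Manifest> table = new ArrayList<>();    // stand-in for committed table state

        @Override
        public Manifest combine(List<String> committables) {
            return new Manifest("manifest-" + (nextId++), List.copyOf(committables));
        }

        @Override
        public void recoveredGlobalCommittables(List<Manifest> globalCommits) {
            // After failover, simply re-commit; the id check below makes this
            // a no-op for anything that already made it into the table.
            globalCommits.forEach(this::commit);
        }

        @Override
        public void commit(Manifest manifest) {
            if (committedIds.add(manifest.id)) {
                table.add(manifest);
            }
        }
    }
}
```

The point of the sketch is that because the sink chose the id, it can pick the cheapest possible dedup structure; a framework-provided random nonce would force a scan instead of a set lookup.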
> >>>> > >>>> ### Option1: > >>>> > >>>> public interface GlobalCommit<CommT, GlobalCommT, StateT, ShareT> { > >>>> // provide some runtime context such as > >>>> attempt-id, job-id, task-id, > >>>> maybe the event time; provide the restored state > >>>> void open(InitContext context, StateT state); > >>>> > >>>> // This is for the HiveSink. When all the writers say that the > >>>> bucket is finished it returns a GlobalCommT. > >>>> Optional<GlobalCommT> combine(Committable committables); > >>>> > >>>> // This is for the IcebergSink. Produces a GlobalCommT every > >>>> checkpoint. > >>>> Optional<GlobalCommT> preCommit(); > >>>> > >>>> // Maybe we need the shareState? After we decide on the directory > >>>> we > >>>> can consider this in more detail. The id could be remembered > here. > >>>> StateT snapshotState(); > >>>> > >>>> // the developer would guarantee the idempotency himself > >>>> void commit(GlobalCommT globalCommit); > >>>> } > >>>> > >>>> ### Option2 > >>>> > >>>> Actually the `GlobalCommit` in Option1 mixes the `Writer` and > >>>> `Committer` together. So it is intuitive to decouple the two > functions. > >>>> To > >>>> support Hive we could provide a sink that looks like the following: > >>>> > >>>> Sink<In, LC, LCO, LCG> { > >>>> Writer<In, LC> createWriter(); > >>>> Optional<Committer<LC, LCO>> createCommitter(); // we need > to > >>>> change this name. > >>>> Optional<Writer<LCO, LCG>> createGlobalAgg(); > >>>> Optional<Committer<LCG, void>> createGlobalCommitter(); > >>>> } > >>>> > >>>> The pro of this method is that we use two basic concepts, `Committer` > >>>> and > >>>> `Writer`, to build a HiveSink. > >>>> > >>>> ### CompactHiveSink / MergeHiveSink > >>>> > >>>> There are still other complicated cases which are not satisfied by > the > >>>> above options. 
Users often complain about writing out many small files, > >>>> which will affect file-reading efficiency and the performance and > >>>> stability > >>>> of the distributed file system. The CompactHiveSink/MergeHiveSink aims to > >>>> merge all files generated by this job within a single checkpoint. > >>>> > >>>> The CompactHiveSink/MergeHiveSink topology can be described simply > >>>> as follows: > >>>> > >>>> CompactSubTopology -> GlobalAgg -> GlobalCommitter > >>>> > >>>> The CompactSubTopology would look like the following: > >>>> > >>>> TmpFileWriter -> CompactCoordinator -> CompactorFileWriter > >>>> > >>>> Maybe the topology could be simpler, but please keep in mind I just > want > >>>> to > >>>> show that there might be very complicated topology requirements for > >>>> users. > >>>> > >>>> > >>>> A possible alternative option would be to let the user build the topology > >>>> himself. But considering we have two execution modes, we could only use > >>>> `Writer` and `Committer` to build the sink topology. > >>>> > >>>> ### Build Topology Option > >>>> > >>>> Sink<IN, OUT> { > >>>> Sink<In, Out> addWriter(Writer<In, Out> writer); // Maybe a > >>>> WriterBuilder > >>>> Sink<In, Out> addCommitter(Committer<In, Out> committer); // > >>>> Maybe > >>>> we could make this return Void if we do not consider code reuse and > >>>> introduce the cleaner > >>>> } > >>>> > >>>> ## Summary > >>>> The requirements of sinks might be different; maybe we could use the two > >>>> basic > >>>> bricks (Writer/Committer) to let the user build their own sink > topology. > >>>> What do you guys think? > >>>> > >>>> I know the naming stuff might be tricky for now, but I want to discuss > >>>> these > >>>> things after we reach consensus on the direction first. 
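The "Build Topology Option" can be sketched as a small chaining builder made from the two basic bricks. Everything here is a hypothetical illustration: the `List`-to-`List` shape of `Writer`, the `run` method, and the function-composition implementation are invented to show the idea, not the proposed API:

```java
import java.util.List;
import java.util.function.Function;

class TopologySketch {

    // The two basic bricks the user composes into a topology.
    interface Writer<In, Out> { List<Out> write(List<In> input); }
    interface Committer<T> { void commit(List<T> committables); }

    static class Sink<In, Cur> {
        private final Function<List<In>, List<Cur>> pipeline;

        private Sink(Function<List<In>, List<Cur>> pipeline) {
            this.pipeline = pipeline;
        }

        static <T> Sink<T, T> create() {
            return new Sink<>(in -> in);
        }

        // Append a writer stage (e.g. TmpFileWriter, AggWriter) to the topology.
        <Out> Sink<In, Out> addWriter(Writer<Cur, Out> writer) {
            return new Sink<>(pipeline.andThen(writer::write));
        }

        // Terminate the topology with the single general committer.
        void run(List<In> input, Committer<Cur> committer) {
            committer.commit(pipeline.apply(input));
        }
    }
}
```

With this shape, a FileWriter -> AggWriter -> Committer chain is two `addWriter` calls followed by the terminating committer, which mirrors the "only one general committer" property discussed above.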
> >>>> > >>>> Best, > >>>> Guowei > >>>> > >>>> > >>>> On Sat, Sep 19, 2020 at 12:15 AM Steven Wu <stevenz...@gmail.com> > >>>> wrote: > >>>> > >>>> > Aljoscha, > >>>> > > >>>> > > Instead the sink would have to check for each set of committables > >>>> > separately if they had already been committed. Do you think this is > >>>> > feasible? > >>>> > > >>>> > Yes, that is how it works in our internal implementation [1]. We > >>>> don't use > >>>> > checkpointId. We generate a manifest file (GlobalCommT) to bundle > all > >>>> the > >>>> > data files that the committer received in one checkpoint cycle. Then > >>>> we > >>>> > generate a unique manifest id by hashing the location of the > >>>> manifest > >>>> > file. The manifest ids are stored in Iceberg snapshot metadata. Upon > >>>> > restore, we check each of the restored manifest files against the > Iceberg > >>>> table > >>>> > snapshot metadata to determine if we should discard or keep the > >>>> restored > >>>> > manifest files. If a commit has multiple manifest files (e.g. > >>>> accumulated > >>>> > from previous failed commits), we store the comma-separated manifest > >>>> ids in > >>>> > Iceberg snapshot metadata. > >>>> > > >>>> > > During normal operation this set would be very small, it would > >>>> usually > >>>> > only be the committables for the last checkpoint. Only when there is > >>>> an > >>>> > outage would multiple sets of committables pile up. > >>>> > > >>>> > You are absolutely right here. Even if there are multiple sets of > >>>> > committables, it is usually only the last few or dozen snapshots we > >>>> need to > >>>> > check. Even with our current inefficient implementation of > traversing > >>>> all > >>>> > table snapshots (in the scale of thousands) from oldest to latest, > it > >>>> only > >>>> > took 60 ms on average and 800 ms at max, so it is really not a concern for > >>>> Iceberg. 
> >>>> > > >>>> > > CommitStatus commitGlobally(List<Committable>, Nonce) > >>>> > > >>>> > Just to clarify on the terminology here. Assuming here the > Committable > >>>> > meant the `GlobalCommT` (like ManifestFile in Iceberg) in > >>>> > previous discussions, right? `CommT` means the Iceberg DataFile from > >>>> writer > >>>> > to committer. > >>>> > > >>>> > This can work assuming we *don't have concurrent executions > >>>> > of commitGlobally* even with concurrent checkpoints. Here is the > >>>> scenario > >>>> > regarding failure recovery I want to avoid. > >>>> > > >>>> > Assuming checkpoints 1, 2, 3 all completed. Each checkpoint > generates > >>>> a > >>>> > manifest file, manifest-1, 2, 3. > >>>> > timeline > >>>> > > >>>> > -------------------------------------------------------------------------> > >>>> > now > >>>> > commitGlobally(manifest-1, nonce-1) started > >>>> > commitGlobally(manifest-2, nonce-2) started > >>>> > commitGlobally(manifest-2, nonce-2) failed > >>>> > commitGlobally(manifest-2 and > manifest-3, > >>>> > nonce-3) started > >>>> > commitGlobally(manifest-1, > >>>> nonce-1) > >>>> > failed > >>>> > > commitGlobally(manifest-2 > >>>> and > >>>> > manifest-3, nonce-3) succeeded > >>>> > > >>>> > Now the job failed and was restored from checkpoint 3, which > contains > >>>> > manifest file 1,2,3. We found nonce-3 was committed when checking > >>>> Iceberg > >>>> > table snapshot metadata. But in this case we won't be able to > >>>> correctly > >>>> > determine which manifest files were committed or not. > >>>> > > >>>> > If it is possible to have concurrent executions of commitGlobally, > >>>> the > >>>> > alternative is to generate the unique id/nonce per GlobalCommT. Then > >>>> we can > >>>> > check each individual GlobalCommT (ManifestFile) with Iceberg > snapshot > >>>> > metadata. 
> >>>> > > >>>> > Thanks, > >>>> > Steven > >>>> > > >>>> > [1] > >>>> > > >>>> > > >>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java#L569 > >>>> > > >>>> > On Fri, Sep 18, 2020 at 2:44 AM Aljoscha Krettek < > aljos...@apache.org > >>>> > > >>>> > wrote: > >>>> > > >>>> > > Steven, > >>>> > > > >>>> > > we were also wondering if it is a strict requirement that "later" > >>>> > > updates to Iceberg subsume earlier updates. In the current > version, > >>>> you > >>>> > > only check whether checkpoint X made it to Iceberg and then > discard > >>>> all > >>>> > > committable state from Flink state for checkpoints smaller than X. > >>>> > > > >>>> > > If we go with a (somewhat random) nonce, this would not work. > >>>> Instead > >>>> > > the sink would have to check for each set of committables > >>>> separately if > >>>> > > they had already been committed. Do you think this is feasible? > >>>> During > >>>> > > normal operation this set would be very small, it would usually > >>>> only be > >>>> > > the committables for the last checkpoint. Only when there is an > >>>> outage > >>>> > > would multiple sets of committables pile up. > >>>> > > > >>>> > > We were thinking to extend the GlobalCommitter interface to allow > >>>> it to > >>>> > > report success or failure and then let the framework retry. I > think > >>>> this > >>>> > > is something that you would need for the Iceberg case. The > signature > >>>> > > could be like this: > >>>> > > > >>>> > > CommitStatus commitGlobally(List<Committable>, Nonce) > >>>> > > > >>>> > > where CommitStatus could be an enum of SUCCESS, TERMINAL_FAILURE, > >>>> and > >>>> > > RETRY. > >>>> > > > >>>> > > Best, > >>>> > > Aljoscha > >>>> > > > >>>> > > >>>> > >>> >
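Aljoscha's `CommitStatus commitGlobally(List<Committable>, Nonce)` proposal can be sketched together with the framework-side retry loop it implies. The bounded-attempts policy, the `long` nonce type, and the driver method are assumptions added for this illustration; only the enum values and the signature shape come from the thread:

```java
import java.util.List;

class RetryingCommitSketch {

    enum CommitStatus { SUCCESS, TERMINAL_FAILURE, RETRY }

    // The committer reports the outcome instead of throwing; the framework
    // decides whether and how to retry.
    interface GlobalCommitter<CommT> {
        CommitStatus commitGlobally(List<CommT> committables, long nonce);
    }

    // Framework side: keep retrying while the committer asks for RETRY,
    // up to maxAttempts; SUCCESS and TERMINAL_FAILURE end the loop.
    static <CommT> CommitStatus commitWithRetry(
            GlobalCommitter<CommT> committer, List<CommT> committables, long nonce, int maxAttempts) {
        CommitStatus status = CommitStatus.RETRY;
        for (int attempt = 0; attempt < maxAttempts && status == CommitStatus.RETRY; attempt++) {
            status = committer.commitGlobally(committables, nonce);
        }
        return status;
    }
}
```

Splitting transient (`RETRY`) from permanent (`TERMINAL_FAILURE`) outcomes is what lets the framework own the retry/rollover behavior discussed earlier in the thread, while the sink only classifies failures.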