Hi, Steven

I am not sure whether we should provide an id in GlobalCommit.
My understanding is this: if the committer function is idempotent, the
framework can guarantee exactly-once semantics in both batch and stream
execution mode. However, I think idempotence should be guaranteed by the
sink developer, not by the basic API.

We could provide an id in GlobalCommit. But the follow-up question would
be: do we also need to provide an id for a normal committable? I would
prefer to keep the committer single-responsibility.

I think we could have an answer once the first non-consensus question is
resolved.

The above is just my personal opinion. I think this is still an open
question.

Thank you again for your valuable and thoughtful response.

Best,
Guowei


On Thu, Sep 17, 2020 at 10:53 AM Steven Wu <stevenz...@gmail.com> wrote:

> Guowei, thanks a lot for the summary. Here are a couple more questions that
> need more clarification for the GlobalCommitter case.
>
> * framework provides some sort of unique id per GlobalCommT (e.g. nonce or
> some sort of transaction id)
> * commit failure handling. Should we roll over to the next cycle? if so, we
> may need commit(List<GlobalCommT> )
>
> On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hey
> >
> > Thanks Dawid for bringing up my suggestion :)
> >
> > > I'm not so sure about this, the sinks I'm aware of would not be able to
> > > implement this method: Kafka doesn't have this, I didn't see it in the
> > > Iceberg interfaces, and HDFS/S3 also don't have it.
> >
> > Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we
> > could do some magic. At the very least we could use
> > `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when some
> > threshold is exceeded. Alternatively, maybe we could hook in to the
> > KafkaProducer's buffer state [1]:
> >
> > > The buffer.memory controls the total amount of memory available to the
> > > producer for buffering.
> > > If records are sent faster than they can be transmitted to the server
> > > then this buffer space will be exhausted.
> > > When the buffer space is exhausted additional send calls will block.
> >
> > As far as I can see, Kafka is exposing the `buffer-available-bytes`
> > metric, which we might use instead of `pendingRecords`. Heck, we are
> > already hacking KafkaProducer with reflections, we could access
> > `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field to
> > call `accumulator.bufferPoolAvailableMemory()` method, if the metric would
> > be too expensive to check for every record.
> >
> > Furthermore, I'm pretty sure other sinks (maybe not all) provide similar
> > features. If we are desperate, we could always contribute something to
> > those systems to make them expose the internal buffer's state.
> >
> > If we are really desperate, we could provide a generic records handover
> > wrapper sink, that would have a buffer of N (5? 10?) records and would be
> > handing over those records to the blocking sink running in another thread.
> > If the buffer is full, the sink would be unavailable.
> >
> > Guowei
> > > Does the sink's snapshot return immediately when the sink's status is
> > > unavailable?
> >
> > State snapshot call is generally speaking non blocking already, so it
> > should not be an issue. If it's blocking and if it will be solving some
> > problem, we could later decide in the runtime code to not execute snapshot
> > calls if a sink is unavailable. Think about isAvailable more like a hint
> > from the operator to the runtime, which we can use to make better
> > decisions. Also take a look at the FLIP-27 sources (`SourceReader`), where
> > there already is `isAvailable()` method. It would be best if new sinks
> > would just duplicate the same contract.
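
The "generic records handover wrapper" idea above could look roughly like the sketch below. This is only an illustration under assumptions: `HandoverBuffer`, `tryWrite` and `poll` are hypothetical names that do not come from the FLIP or from Flink, and the only thing borrowed from the thread is the contract that `isAvailable()` returns a future that is complete while the sink can accept records (the FLIP-27 `SourceReader` uses a `CompletableFuture<Void>` for this).

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical bounded hand-over buffer; not part of the FLIP or of Flink. */
final class HandoverBuffer<T> {
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final ArrayDeque<T> records = new ArrayDeque<>();
    private final int capacity;
    // Pending future handed out while the buffer is full; null otherwise.
    private CompletableFuture<Void> spaceFreed;

    HandoverBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Non-blocking write from the task thread; returns false when full. */
    synchronized boolean tryWrite(T record) {
        if (records.size() >= capacity) {
            return false;
        }
        records.add(record);
        return true;
    }

    /** Completed future while there is room, pending future while full. */
    synchronized CompletableFuture<Void> isAvailable() {
        if (records.size() < capacity) {
            return AVAILABLE;
        }
        if (spaceFreed == null) {
            spaceFreed = new CompletableFuture<>();
        }
        return spaceFreed;
    }

    /** Called by the thread driving the blocking sink; null when empty. */
    T poll() {
        T record;
        CompletableFuture<Void> toComplete;
        synchronized (this) {
            record = records.poll();
            toComplete = spaceFreed;
            spaceFreed = null;
        }
        // Complete outside the lock so callbacks never run while holding it.
        if (toComplete != null) {
            toComplete.complete(null);
        }
        return record;
    }
}
```

In this sketch the task thread would only call `tryWrite` after `isAvailable()` has completed, and a separate thread would loop over `poll()` and pass each record to the blocking sink. The runtime can then treat a pending future as the "unavailable" hint and keep doing other work, such as snapshotting state.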
> >
> > > For me I want to know is what specific sink will benefit from this
> > > feature
> >
> > It's not the sinks that would benefit from this, but other parts of the
> > system. Currently task thread is blocked on backpressured Sink, it's
> > blocking some things from happening (checkpointing, closing, ...). If we
> > make sinks non blocking (as is the network stack in the most part and as
> > are the FLIP-27 sources), we will be able to snapshot state of the operator
> > immediately. For example, change from blocking to non blocking sources was
> > speeding up unaligned checkpoints from ~30seconds down to ~5seconds in
> > our benchmarks, but the difference can be even more profound (hours instead
> > of seconds/minutes as reported by some users).
> >
> > Piotrek
> >
> > [1]
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > On Wed, 16 Sep 2020 at 06:29, Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi, all
> >>
> >> Thanks for all your valuable options and ideas. Currently there are many
> >> topics in the mail. I try to summarize what is consensus and what is not.
> >> Correct me if I am wrong.
> >>
> >> ## Consensus
> >>
> >> 1. The motivation of the unified sink API is to decouple the sink
> >> implementation from the different runtime execution mode.
> >> 2. The initial scope of the unified sink API only covers the file system
> >> type, which supports the real transactions. The FLIP focuses more on the
> >> semantics the new sink api should support.
> >> 3. We prefer the first alternative API, which could give the framework a
> >> greater opportunity to optimize.
> >> 4. The `Writer` needs to add a method `prepareCommit`, which would be
> >> called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> >> 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> >> focused.
> >>
> >> ## Not Consensus
> >>
> >> 1. What should the “Unified Sink API” support/cover?
> >> The API can “unify” (decouple) the commit operation in terms of
> >> supporting exactly once semantics. However, even if we narrow down the
> >> initial supported system to the file system there would be different
> >> topology requirements. These requirements come from performance
> >> optimization (IceBergSink/MergeHiveSink) or functionality (i.e. whether
> >> a bucket is “finished”). Should the unified sink API support these
> >> requirements?
> >> 2. The API does not expose the checkpoint-id because the batch execution
> >> mode does not have the normal checkpoint. But there are still some
> >> implementations that depend on this (IceBergSink uses this to do some
> >> dedupe). I think how to support this requirement depends on the first
> >> open question.
> >> 3. Whether the `Writer` supports async functionality or not. Currently I
> >> do not know which sink could benefit from it. Maybe it is just my own
> >> problem.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma <guowei....@gmail.com> wrote:
> >>
> >> >
> >> > Hi, Steven
> >> > Thank you for your thoughtful ideas and concerns.
> >> >
> >> > >> I still like the concept of grouping data files per checkpoint for
> >> > >> streaming mode. it is cleaner and probably easier to manage and deal
> >> > >> with commit failures. Plus, it can reduce dupes for the at least once
> >> > >> mode. I understand checkpoint is not an option for batch execution. We
> >> > >> don't have to expose the checkpointId in API, as long as the internal
> >> > >> bookkeeping groups data files by checkpoints for streaming mode.
> >> >
> >> > I think this problem (how to dedupe the combined committed data) also
> >> > depends on where to place the agg/combine logic.
> >> >
> >> > 1. If the agg/combine takes place in the “commit” maybe we need to figure
> >> > out how to give the aggregated committable a unique and auto-increment id
> >> > in the committer.
> >> > 2.
> >> > If the agg/combine takes place in a separate operator maybe sink
> >> > developer could maintain the id itself by using the state.
> >> >
> >> > I think this problem is also decided by what the topology pattern the sink
> >> > API should support. Actually there are already many other topology
> >> > requirements. :)
> >> >
> >> > Best,
> >> > Guowei
> >> >
> >> >
> >> > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com> wrote:
> >> >
> >> >> > AFAIK the committer would not see the file-1-2 when ck1 happens in the
> >> >> > ExactlyOnce mode.
> >> >>
> >> >> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly once
> >> >> checkpoint semantics. what about "at least once"? I guess we can argue that
> >> >> it is fine to commit file-1-2 for at least once mode.
> >> >>
> >> >> I still like the concept of grouping data files per checkpoint for
> >> >> streaming mode. it is cleaner and probably easier to manage and deal with
> >> >> commit failures. Plus, it can reduce dupes for the at least once mode. I
> >> >> understand checkpoint is not an option for batch execution. We don't have
> >> >> to expose the checkpointId in API, as long as the internal bookkeeping
> >> >> groups data files by checkpoints for streaming mode.
> >> >>
> >> >>
> >> >> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com> wrote:
> >> >>
> >> >>> > images don't make it through to the mailing lists. You would need to
> >> >>> > host the file somewhere and send a link.
> >> >>>
> >> >>> Sorry about that. Here is the sample DAG in google drawings.
> >> >>>
> >> >>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
> >> >>>
> >> >>>
> >> >>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:
> >> >>>
> >> >>>> Hi, Dawid
> >> >>>>
> >> >>>> >> I still find the merging case the most confusing.
> >> >>>> >> I don't necessarily understand why do you need the
> >> >>>> >> "SingleFileCommit" step in this scenario. The way I understand
> >> >>>> >> "commit" operation is that it makes some data/artifacts visible to
> >> >>>> >> the external system, thus it should be immutable from a point of
> >> >>>> >> view of a single process. Having an additional step in the same
> >> >>>> >> process that works on committed data contradicts with those
> >> >>>> >> assumptions. I might be missing something though. Could you
> >> >>>> >> elaborate why can't it be something like FileWriter ->
> >> >>>> >> FileMergeWriter -> Committer (either global or non-global)? Again
> >> >>>> >> it might be just me not getting the example.
> >> >>>>
> >> >>>> I think you are right. The topology
> >> >>>> "FileWriter->FileMergeWriter->Committer" could meet the merge
> >> >>>> requirement. The topology "FileWriter-> SingleFileCommitter ->
> >> >>>> FileMergeWriter -> GlobalCommitter" reuses some code of the
> >> >>>> StreamingFileSink (for example rolling policy) so it has the
> >> >>>> "SingleFileCommitter" in the topology. In general I want to use the
> >> >>>> case to show that there are different topologies according to the
> >> >>>> requirements.
> >> >>>>
> >> >>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the
> >> >>>> actual topology of merged supported HiveSink is more complicated than
> >> >>>> that.
> >> >>>>
> >> >>>>
> >> >>>> >> I've just briefly skimmed over the proposed interfaces. I would
> >> >>>> >> suggest one addition to the Writer interface (as I understand this
> >> >>>> >> is the runtime interface in this proposal?): add some availability
> >> >>>> >> method, to avoid, if possible, blocking calls on the sink.
> >> >>>> >> We already have similar availability methods in the new sources
> >> >>>> >> [1] and in various places in the network stack [2].
> >> >>>> >> BTW Let's not forget about Piotr's comment. I think we could add
> >> >>>> >> the isAvailable or similar method to the Writer interface in the
> >> >>>> >> FLIP.
> >> >>>>
> >> >>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder.
> >> >>>> There are too many issues at the same time.
> >> >>>>
> >> >>>> In addition to what Aljoscha said: there are very few systems that
> >> >>>> support it. Another thing I worry about is that: Does the sink's
> >> >>>> snapshot return immediately when the sink's status is unavailable?
> >> >>>> Maybe we could do it by dedupe some element in the state but I think it
> >> >>>> might be too complicated.
> >> >>>> For me I want to know is what specific sink will benefit from this
> >> >>>> feature. @piotr <pi...@ververica.com> Please correct me if I
> >> >>>> misunderstand you. thanks.
> >> >>>>
> >> >>>> Best,
> >> >>>> Guowei
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz
> >> >>>> <dwysakow...@apache.org> wrote:
> >> >>>>
> >> >>>> > What I understand is that HiveSink's implementation might need the
> >> >>>> > local committer(FileCommitter) because the file rename is needed.
> >> >>>> > But the iceberg only needs to write the manifest file. Would you
> >> >>>> > like to enlighten me why the Iceberg needs the local committer?
> >> >>>> > Thanks
> >> >>>> >
> >> >>>> > Sorry if I caused a confusion here. I am not saying the Iceberg sink
> >> >>>> > needs a local committer. What I had in mind is that prior to the
> >> >>>> > Iceberg example I did not see a need for a "GlobalCommitter" in the
> >> >>>> > streaming case. I thought it is always enough to have the "normal"
> >> >>>> > committer in that case.
> >> >>>> > Now I understand that this differentiation is not really about logical
> >> >>>> > separation. It is not really about the granularity with which we
> >> >>>> > commit, i.e. answering the "WHAT" question. It is really about the
> >> >>>> > performance and that in the end we will have a single "transaction",
> >> >>>> > so it is about answering the question "HOW".
> >> >>>> >
> >> >>>> >
> >> >>>> > - Commit a directory with merged files (some users want to merge the
> >> >>>> >   files in a directory before committing the directory to Hive meta
> >> >>>> >   store)
> >> >>>> >
> >> >>>> >   1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
> >> >>>> >
> >> >>>> > I still find the merging case the most confusing. I don't necessarily
> >> >>>> > understand why do you need the "SingleFileCommit" step in this
> >> >>>> > scenario. The way I understand "commit" operation is that it makes
> >> >>>> > some data/artifacts visible to the external system, thus it should be
> >> >>>> > immutable from a point of view of a single process. Having an
> >> >>>> > additional step in the same process that works on committed data
> >> >>>> > contradicts with those assumptions. I might be missing something
> >> >>>> > though. Could you elaborate why can't it be something like FileWriter
> >> >>>> > -> FileMergeWriter -> Committer (either global or non-global)? Again
> >> >>>> > it might be just me not getting the example.
> >> >>>> >
> >> >>>> > I've just briefly skimmed over the proposed interfaces. I would
> >> >>>> > suggest one addition to the Writer interface (as I understand this is
> >> >>>> > the runtime interface in this proposal?): add some availability
> >> >>>> > method, to avoid, if possible, blocking calls on the sink.
> >> >>>> > We already have similar availability methods in the new sources [1]
> >> >>>> > and in various places in the network stack [2].
> >> >>>> >
> >> >>>> > BTW Let's not forget about Piotr's comment. I think we could add the
> >> >>>> > isAvailable or similar method to the Writer interface in the FLIP.
> >> >>>> >
> >> >>>> > Best,
> >> >>>> >
> >> >>>> > Dawid
> >> >>>> > On 15/09/2020 08:06, Guowei Ma wrote:
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are that it
> >> >>>> > prepares for a commit, so on a physical level it would be called from
> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> >> >>>> > think flush() should be renamed to something like "prepareCommit()".
> >> >>>> >
> >> >>>> > Generally speaking it is a good point that emitting the committables
> >> >>>> > should happen before emitting the checkpoint barrier downstream.
> >> >>>> > However, if I remember offline discussions well, the idea behind
> >> >>>> > Writer#flush and Writer#snapshotState was to differentiate commit on
> >> >>>> > checkpoint vs final checkpoint at the end of the job. Both of these
> >> >>>> > methods could emit committables, but the flush should not leave any in
> >> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
> >> >>>> > snapshotState it could leave some open files that would be committed in
> >> >>>> > a subsequent cycle, however flush should close all files). The
> >> >>>> > snapshotState as it is now can not be called in
> >> >>>> > prepareSnapshotPreBarrier as it can store some state, which should
> >> >>>> > happen in Operator#snapshotState as otherwise it would always be
> >> >>>> > synchronous.
> >> >>>> > Therefore I think we would need sth like:
> >> >>>> >
> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >> >>>> >
> >> >>>> > ver 1:
> >> >>>> >
> >> >>>> > List<StateT> snapshotState();
> >> >>>> >
> >> >>>> > ver 2:
> >> >>>> >
> >> >>>> > void snapshotState(); // not sure if we need that method at all in option 2
> >> >>>> >
> >> >>>> > I second Dawid's proposal. This is a valid scenario. And version 2
> >> >>>> > does not need the snapshotState() any more.
> >> >>>> >
> >> >>>> >
> >> >>>> > The Committer is as described in the FLIP, it's basically a function
> >> >>>> > "void commit(Committable)". The GlobalCommitter would be a function
> >> >>>> > "void commit(List<Committable>)". The former would be used by an S3
> >> >>>> > sink where we can individually commit files to S3, a committable would
> >> >>>> > be the list of part uploads that will form the final file and the
> >> >>>> > commit operation creates the metadata in S3. The latter would be used
> >> >>>> > by something like Iceberg where the Committer needs a global view of
> >> >>>> > all the commits to be efficient and not overwhelm the system.
> >> >>>> >
> >> >>>> > I don't know yet if sinks would only implement one type of commit
> >> >>>> > function or potentially both at the same time, and maybe Commit can
> >> >>>> > return some CommitResult that gets shipped to the GlobalCommit
> >> >>>> > function.
> >> >>>> > I must admit I did not get the need for Local/Normal + Global
> >> >>>> > committer at first. The Iceberg example helped a lot. I think it makes
> >> >>>> > a lot of sense.
> >> >>>> >
> >> >>>> > @Dawid
> >> >>>> > What I understand is that HiveSink's implementation might need the
> >> >>>> > local committer(FileCommitter) because the file rename is needed.
> >> >>>> > But the iceberg only needs to write the manifest file.
> >> >>>> > Would you like to enlighten me why the Iceberg needs the local
> >> >>>> > committer?
> >> >>>> > Thanks
> >> >>>> >
> >> >>>> > Best,
> >> >>>> > Guowei
> >> >>>> >
> >> >>>> >
> >> >>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz
> >> >>>> > <dwysakow...@apache.org> wrote:
> >> >>>> >
> >> >>>> >
> >> >>>> > Hi all,
> >> >>>> >
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are that it
> >> >>>> > prepares for a commit, so on a physical level it would be called from
> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> >> >>>> > think flush() should be renamed to something like "prepareCommit()".
> >> >>>> >
> >> >>>> > Generally speaking it is a good point that emitting the committables
> >> >>>> > should happen before emitting the checkpoint barrier downstream.
> >> >>>> > However, if I remember offline discussions well, the idea behind
> >> >>>> > Writer#flush and Writer#snapshotState was to differentiate commit on
> >> >>>> > checkpoint vs final checkpoint at the end of the job. Both of these
> >> >>>> > methods could emit committables, but the flush should not leave any in
> >> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
> >> >>>> > snapshotState it could leave some open files that would be committed in
> >> >>>> > a subsequent cycle, however flush should close all files). The
> >> >>>> > snapshotState as it is now can not be called in
> >> >>>> > prepareSnapshotPreBarrier as it can store some state, which should
> >> >>>> > happen in Operator#snapshotState as otherwise it would always be
> >> >>>> > synchronous.
> >> >>>> > Therefore I think we would need sth like:
> >> >>>> >
> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >> >>>> >
> >> >>>> > ver 1:
> >> >>>> >
> >> >>>> > List<StateT> snapshotState();
> >> >>>> >
> >> >>>> > ver 2:
> >> >>>> >
> >> >>>> > void snapshotState(); // not sure if we need that method at all in option 2
> >> >>>> >
> >> >>>> >
> >> >>>> > The Committer is as described in the FLIP, it's basically a function
> >> >>>> > "void commit(Committable)". The GlobalCommitter would be a function
> >> >>>> > "void commit(List<Committable>)". The former would be used by an S3
> >> >>>> > sink where we can individually commit files to S3, a committable would
> >> >>>> > be the list of part uploads that will form the final file and the
> >> >>>> > commit operation creates the metadata in S3. The latter would be used
> >> >>>> > by something like Iceberg where the Committer needs a global view of
> >> >>>> > all the commits to be efficient and not overwhelm the system.
> >> >>>> >
> >> >>>> > I don't know yet if sinks would only implement one type of commit
> >> >>>> > function or potentially both at the same time, and maybe Commit can
> >> >>>> > return some CommitResult that gets shipped to the GlobalCommit
> >> >>>> > function.
> >> >>>> >
> >> >>>> > I must admit I did not get the need for Local/Normal + Global
> >> >>>> > committer at first. The Iceberg example helped a lot. I think it makes
> >> >>>> > a lot of sense.
> >> >>>> >
> >> >>>> >
> >> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
> >> >>>> > needs to
> >> >>>> > checkpoint StateT. For the committer, CommT is "DataFile".
> >> >>>> > Since a single
> >> >>>> > committer can collect thousands (or more) data files in one checkpoint
> >> >>>> > cycle, as an optimization we checkpoint a single "ManifestFile" (for
> >> >>>> > the collected thousands data files) as StateT. This allows us to absorb
> >> >>>> > extended commit outages without losing written/uploaded data files, as
> >> >>>> > operator state size is as small as one manifest file per checkpoint
> >> >>>> > cycle [2].
> >> >>>> > ------------------
> >> >>>> > StateT snapshotState(SnapshotContext context) throws Exception;
> >> >>>> >
> >> >>>> > That means we also need the restoreCommitter API in the Sink interface
> >> >>>> > ---------------
> >> >>>> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);
> >> >>>> >
> >> >>>> > I think this might be a valid case. Not sure though if I would go with
> >> >>>> > a "state" there. Having a state in a committer would imply we need a
> >> >>>> > collect method as well. So far we needed a single method commit(...)
> >> >>>> > and the bookkeeping of the committables could be handled by the
> >> >>>> > framework. I think something like an optional combiner in the
> >> >>>> > GlobalCommitter would be enough. What do you think?
> >> >>>> >
> >> >>>> > GlobalCommitter<CommT, GlobalCommT> {
> >> >>>> >
> >> >>>> >     void commit(GlobalCommT globalCommittables);
> >> >>>> >
> >> >>>> >     GlobalCommT combine(List<CommT> committables);
> >> >>>> >
> >> >>>> > }
> >> >>>> >
> >> >>>> > A different problem that I see here is how do we handle commit
> >> >>>> > failures. Should the committables (both normal and global) be included
> >> >>>> > in the next cycle, shall we retry it, ...? I think it would be worth
> >> >>>> > laying it out in the FLIP.
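
To make the combiner proposal and the commit-failure question above concrete, here is a minimal sketch. The `GlobalCommitter` shape follows the interface written out in the thread; everything else is an assumption for illustration only: `GlobalCommitDriver`, `collect` and `commitCycle` are hypothetical names, `commit` is assumed to signal failure by throwing, and "roll over to the next cycle" is just one of the failure-handling options that the thread leaves open.

```java
import java.util.ArrayList;
import java.util.List;

/** Interface shape as proposed in the thread; "throws Exception" is an assumption. */
interface GlobalCommitter<CommT, GlobalCommT> {
    GlobalCommT combine(List<CommT> committables);

    void commit(GlobalCommT globalCommittable) throws Exception;
}

/** Hypothetical framework-side bookkeeping: failed commits roll over to the next cycle. */
final class GlobalCommitDriver<CommT, GlobalCommT> {
    private final GlobalCommitter<CommT, GlobalCommT> committer;
    private final List<CommT> collected = new ArrayList<>();
    private final List<GlobalCommT> pending = new ArrayList<>();

    GlobalCommitDriver(GlobalCommitter<CommT, GlobalCommT> committer) {
        this.committer = committer;
    }

    /** Buffers one committable produced by a writer during the current cycle. */
    void collect(CommT committable) {
        collected.add(committable);
    }

    /** Runs one commit cycle; returns how many global committables are still pending. */
    int commitCycle() {
        if (!collected.isEmpty()) {
            // Combine everything collected in this cycle into one global committable.
            pending.add(committer.combine(new ArrayList<>(collected)));
            collected.clear();
        }
        // Commit oldest first and stop at the first failure to preserve ordering.
        while (!pending.isEmpty()) {
            try {
                committer.commit(pending.get(0));
            } catch (Exception e) {
                break; // keep it pending and retry in the next cycle
            }
            pending.remove(0);
        }
        return pending.size();
    }
}
```

With this shape the sink developer only writes `combine` and `commit`, while the framework keeps the list of pending global committables (which it would also have to checkpoint). Whether retried commits are safe still depends on `commit` being idempotent, which is the question raised at the top of the thread.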
> >> >>>> >
> >> >>>> > @Aljoscha I think you can find the code Steven was referring to here:
> >> >>>> >
> >> >>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
> >> >>>> >
> >> >>>> > Best,
> >> >>>> >
> >> >>>> > Dawid
> >> >>>> >
> >> >>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
> >> >>>> >
> >> >>>> > On 14.09.20 01:23, Steven Wu wrote:
> >> >>>> >
> >> >>>> > ## Writer interface
> >> >>>> >
> >> >>>> > For the Writer interface, should we add "*prepareSnapshot*" before the
> >> >>>> > checkpoint barrier emitted downstream? IcebergWriter would need it. Or
> >> >>>> > would the framework call "*flush*" before the barrier emitted
> >> >>>> > downstream?
> >> >>>> > that guarantee would achieve the same goal.
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are that it
> >> >>>> > prepares for a commit, so on a physical level it would be called from
> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> >> >>>> > think flush() should be renamed to something like "prepareCommit()".
> >> >>>> >
> >> >>>> > @Guowei, what do you think about this?
> >> >>>> >
> >> >>>> >
> >> >>>> > In [1], we discussed the reason for Writer to emit (checkpointId, CommT)
> >> >>>> > tuple to the committer. The committer needs checkpointId to separate out
> >> >>>> > data files for different checkpoints if concurrent checkpoints are
> >> >>>> > enabled.
> >> >>>> >
> >> >>>> > When can this happen? Even with concurrent checkpoints the snapshot
> >> >>>> > barriers would still cleanly segregate the input stream of an operator
> >> >>>> > into tranches that should manifest in only one checkpoint.
> >> >>>> > With
> >> >>>> > concurrent checkpoints, all that can happen is that we start a
> >> >>>> > checkpoint before a last one is confirmed completed.
> >> >>>> >
> >> >>>> > Unless there is some weirdness in the sources and some sources start
> >> >>>> > chk1 first and some other ones start chk2 first?
> >> >>>> >
> >> >>>> > @Piotrek, do you think this is a problem?
> >> >>>> >
> >> >>>> >
> >> >>>> > For the Committer interface, I am wondering if we should split the
> >> >>>> > single
> >> >>>> > commit method into separate "*collect*" and "*commit*" methods? This
> >> >>>> > way,
> >> >>>> > it can handle both single and multiple CommT objects.
> >> >>>> >
> >> >>>> > I think we can't do this. If the sink only needs a regular Committer,
> >> >>>> > we can perform the commits in parallel, possibly on different
> >> >>>> > machines. Only when the sink needs a GlobalCommitter would we need to
> >> >>>> > ship all commits to a single process and perform the commit there. If
> >> >>>> > both methods were unified in one interface we couldn't make the
> >> >>>> > decision of where to commit in the framework code.
> >> >>>> >
> >> >>>> >
> >> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter
> >> >>>> > needs to
> >> >>>> > checkpoint StateT. For the committer, CommT is "DataFile". Since a
> >> >>>> > single
> >> >>>> > committer can collect thousands (or more) data files in one checkpoint
> >> >>>> > cycle, as an optimization we checkpoint a single "ManifestFile" (for
> >> >>>> > the collected thousands data files) as StateT. This allows us to absorb
> >> >>>> > extended commit outages without losing written/uploaded data files, as
> >> >>>> > operator state size is as small as one manifest file per checkpoint
> >> >>>> > cycle
> >> >>>> >
> >> >>>> > You could have a point here. Is the code for this available in
> >> >>>> > open-source?
> >> >>>> > I was checking out
> >> >>>> >
> >> >>>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> >> >>>> >
> >> >>>> > and didn't find the ManifestFile optimization there.
> >> >>>> >
> >> >>>> > Best,
> >> >>>> > Aljoscha