Guowei, thanks a lot for the summary. Here are a couple more questions that need clarification for the GlobalCommitter case.
* framework provides some sort of unique id per GlobalCommT (e.g. a nonce or some sort of transaction id)
* commit failure handling. Should we roll over to the next cycle? If so, we may need commit(List<GlobalCommT>)

On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com> wrote:

> Hey
>
> Thanks Dawid for bringing up my suggestion :)
>
> > I'm not so sure about this, the sinks I'm aware of would not be able to implement this method: Kafka doesn't have this, I didn't see it in the Iceberg interfaces, and HDFS/S3 also don't have it.
>
> Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we could do some magic. At the very least we could use `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when some threshold is exceeded. Alternatively, maybe we could hook into the KafkaProducer's buffer state [1]:
>
> > The buffer.memory controls the total amount of memory available to the producer for buffering.
> > If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted.
> > When the buffer space is exhausted additional send calls will block.
>
> As far as I can see, Kafka is exposing the `buffer-available-bytes` metric, which we might use instead of `pendingRecords`. Heck, we are already hacking KafkaProducer with reflection, so we could access the `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field to call the `accumulator.bufferPoolAvailableMemory()` method, if the metric would be too expensive to check for every record.
>
> Furthermore, I'm pretty sure other sinks (maybe not all) provide similar features. If we are desperate, we could always contribute something to those systems to make them expose the internal buffer's state.
>
> If we are really desperate, we could provide a generic records handover wrapper sink that would have a buffer of N (5? 10?) records and would hand those records over to the blocking sink running in another thread. If the buffer is full, the sink would be unavailable.
>
> Guowei
> > Does the sink's snapshot return immediately when the sink's status is unavailable?
>
> State snapshot calls are, generally speaking, non-blocking already, so this should not be an issue. If it's blocking and if it would solve some problem, we could later decide in the runtime code to not execute snapshot calls while a sink is unavailable. Think about isAvailable more like a hint from the operator to the runtime, which we can use to make better decisions. Also take a look at the FLIP-27 sources (`SourceReader`), where there already is an `isAvailable()` method. It would be best if new sinks would just duplicate the same contract.
>
> > What I want to know is which specific sink will benefit from this feature
>
> It's not the sinks that would benefit from this, but other parts of the system. Currently, while the task thread is blocked on a backpressured sink, it's blocking some things from happening (checkpointing, closing, ...). If we make sinks non-blocking (as is the network stack for the most part, and as are the FLIP-27 sources), we will be able to snapshot the state of the operator immediately. For example, the change from blocking to non-blocking sources sped up unaligned checkpoints from ~30 seconds down to ~5 seconds in our benchmarks, but the difference can be even more profound (hours instead of seconds/minutes, as reported by some users).
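> To make the availability idea more concrete, here is a minimal sketch of such a hint for the Kafka case (purely illustrative: the wrapper class, the threshold value, and the `pendingRecords()` accessor are assumptions, not existing API):
>
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>
>     // Hypothetical wrapper that derives an availability hint from the
>     // number of records still buffered inside the Kafka producer.
>     public class AvailabilityHintingKafkaWriter<T> {
>         private static final long MAX_PENDING_RECORDS = 10_000; // assumed threshold
>         private final FlinkKafkaProducer<T> producer;
>
>         public AvailabilityHintingKafkaWriter(FlinkKafkaProducer<T> producer) {
>             this.producer = producer;
>         }
>
>         // Hint for the runtime: false means "a write now would likely block",
>         // so the runtime could e.g. take a snapshot first.
>         public boolean isAvailable() {
>             return producer.pendingRecords() < MAX_PENDING_RECORDS;
>         }
>     }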
> Piotrek
>
> [1] https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> On Wed, Sep 16, 2020 at 06:29 Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, all
>>
>> Thanks for all your valuable opinions and ideas. Currently there are many topics in the mail. I try to summarize what is consensus and what is not. Correct me if I am wrong.
>>
>> ## Consensus
>>
>> 1. The motivation of the unified sink API is to decouple the sink implementation from the different runtime execution modes.
>> 2. The initial scope of the unified sink API only covers the file system type, which supports real transactions. The FLIP focuses more on the semantics the new sink API should support.
>> 3. We prefer the first alternative API, which could give the framework a greater opportunity to optimize.
>> 4. The `Writer` needs to add a method `prepareCommit`, which would be called from `prepareSnapshotPreBarrier`. And remove the `flush` method.
>> 5. The FLIP could move the `Snapshot & Drain` section in order to be more focused.
>>
>> ## Not Consensus
>>
>> 1. What should the “Unified Sink API” support/cover? The API can “unify” (decouple) the commit operation in terms of supporting exactly-once semantics. However, even if we narrow down the initially supported systems to the file system, there would be different topology requirements. These requirements come from performance optimization (IcebergSink/MergeHiveSink) or functionality (e.g. whether a bucket is “finished”). Should the unified sink API support these requirements?
>> 2. The API does not expose the checkpoint-id because the batch execution mode does not have the normal checkpoint. But there are still some implementations that depend on it (IcebergSink uses it to do some dedupe). I think how to support this requirement depends on the first open question.
>> 3. Whether the `Writer` supports async functionality or not. Currently I do not know which sink could benefit from it. Maybe it is just my own problem.
>>
>> Best,
>> Guowei
>>
>> On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>> > Hi, Steven
>> > Thank you for your thoughtful ideas and concerns.
>> >
>> > > I still like the concept of grouping data files per checkpoint for streaming mode. it is cleaner and probably easier to manage and deal with commit failures. Plus, it can reduce dupes for the at least once mode. I understand checkpoint is not an option for batch execution. We don't have to expose the checkpointId in API, as long as the internal bookkeeping groups data files by checkpoints for streaming mode.
>> >
>> > I think this problem (how to dedupe the combined committed data) also depends on where to place the agg/combine logic:
>> >
>> > 1. If the agg/combine takes place in the “commit”, maybe we need to figure out how to give the aggregated committable a unique and auto-increment id in the committer.
>> > 2. If the agg/combine takes place in a separate operator, maybe the sink developer could maintain the id itself by using state (see the sketch below).
>> >
>> > I think this problem is also decided by what topology patterns the sink API should support. Actually there are already many other topology requirements. :)
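>> > A minimal sketch of option 2, where the sink developer keeps a unique, auto-increment id in operator state (all names here are hypothetical, not proposed API):
>> >
>> >     // Assigns every aggregated committable a unique, increasing id so a
>> >     // committer can dedupe already-committed results after a failover.
>> >     class CommittableIdAssigner<CommT> {
>> >         private long nextId; // restored from checkpointed state on recovery
>> >
>> >         CommittableIdAssigner(long restoredNextId) {
>> >             this.nextId = restoredNextId;
>> >         }
>> >
>> >         IdentifiedCommittable<CommT> assign(CommT combined) {
>> >             return new IdentifiedCommittable<>(nextId++, combined);
>> >         }
>> >
>> >         long snapshotNextId() { // written into state on checkpoint
>> >             return nextId;
>> >         }
>> >     }
>> >
>> >     class IdentifiedCommittable<CommT> {
>> >         final long id;
>> >         final CommT committable;
>> >
>> >         IdentifiedCommittable(long id, CommT committable) {
>> >             this.id = id;
>> >             this.committable = committable;
>> >         }
>> >     }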
>> > Best,
>> > Guowei
>> >
>> > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >
>> >> > AFAIK the committer would not see the file-1-2 when ck1 happens in the exactly-once mode.
>> >>
>> >> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly-once checkpoint semantics. What about "at least once"? I guess we can argue that it is fine to commit file-1-2 for the at-least-once mode.
>> >>
>> >> I still like the concept of grouping data files per checkpoint for streaming mode. It is cleaner and probably easier to manage and deal with commit failures. Plus, it can reduce dupes for the at-least-once mode. I understand checkpoint is not an option for batch execution. We don't have to expose the checkpointId in the API, as long as the internal bookkeeping groups data files by checkpoints for streaming mode.
>> >>
>> >> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com> wrote:
>> >>
>> >>> > images don't make it through to the mailing lists. You would need to host the file somewhere and send a link.
>> >>>
>> >>> Sorry about that. Here is the sample DAG in Google Drawings.
>> >>>
>> >>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>> >>>
>> >>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com> wrote:
>> >>>
>> >>>> Hi, Dawid
>> >>>>
>> >>>> > I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.
>> >>>>
>> >>>> I think you are right. The topology "FileWriter -> FileMergeWriter -> Committer" could meet the merge requirement. The topology "FileWriter -> SingleFileCommitter -> FileMergeWriter -> GlobalCommitter" reuses some code of the StreamingFileSink (for example the rolling policy), so it has the "SingleFileCommitter" in the topology. In general I want to use the case to show that there are different topologies according to the requirements.
>> >>>>
>> >>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that the actual topology of the merge-supporting HiveSink is more complicated than that.
>> >>>>
>> >>>> > I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2]. BTW let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.
>> >>>>
>> >>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for your reminder. There are too many issues at the same time.
>> >>>>
>> >>>> In addition to what Aljoscha said: there is very little system support for it. Another thing I worry about is: does the sink's snapshot return immediately when the sink's status is unavailable? Maybe we could do it by deduping some elements in the state, but I think it might be too complicated. What I want to know is which specific sink will benefit from this feature. @piotr <pi...@ververica.com> Please correct me if I misunderstand you. Thanks.
>> >>>>
>> >>>> Best,
>> >>>> Guowei
>> >>>>
>> >>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>>
>> >>>> > What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed.
>> >>>> > But the Iceberg sink only needs to write the manifest file. Would you like to enlighten me why Iceberg needs the local committer?
>> >>>> > Thanks
>> >>>> >
>> >>>> > Sorry if I caused confusion here. I am not saying the Iceberg sink needs a local committer. What I had in mind is that prior to the Iceberg example I did not see a need for a "GlobalCommitter" in the streaming case. I thought it is always enough to have the "normal" committer in that case. Now I understand that this differentiation is not really about logical separation. It is not really about the granularity with which we commit, i.e. answering the "WHAT" question. It is really about the performance and that in the end we will have a single "transaction", so it is about answering the question "HOW".
>> >>>> >
>> >>>> > - Commit a directory with merged files (some users want to merge the files in a directory before committing the directory to the Hive metastore)
>> >>>> >
>> >>>> > 1. FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter
>> >>>> >
>> >>>> > I still find the merging case the most confusing. I don't necessarily understand why you need the "SingleFileCommit" step in this scenario. The way I understand the "commit" operation is that it makes some data/artifacts visible to the external system, thus it should be immutable from the point of view of a single process. Having an additional step in the same process that works on committed data contradicts those assumptions. I might be missing something though. Could you elaborate why it can't be something like FileWriter -> FileMergeWriter -> Committer (either global or non-global)? Again it might be just me not getting the example.
>> >>>> >
>> >>>> > I've just briefly skimmed over the proposed interfaces. I would suggest one addition to the Writer interface (as I understand this is the runtime interface in this proposal?): add some availability method, to avoid, if possible, blocking calls on the sink. We already have similar availability methods in the new sources [1] and in various places in the network stack [2].
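>> >>>> > For illustration, a minimal sketch of a Writer with such an availability method, mirroring the FLIP-27 `SourceReader` contract (the surrounding method set is an assumption, not the final proposal):
>> >>>> >
>> >>>> >     import java.util.List;
>> >>>> >     import java.util.concurrent.CompletableFuture;
>> >>>> >
>> >>>> >     interface Writer<InputT, CommT> {
>> >>>> >         // Availability hint for the runtime, like SourceReader#isAvailable:
>> >>>> >         // an incomplete future means "writing now would likely block".
>> >>>> >         CompletableFuture<Void> isAvailable();
>> >>>> >
>> >>>> >         void write(InputT element) throws Exception;
>> >>>> >
>> >>>> >         // Called from prepareSnapshotPreBarrier; returns the committables
>> >>>> >         // collected since the last checkpoint.
>> >>>> >         List<CommT> prepareCommit(boolean flush) throws Exception;
>> >>>> >     }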
>> >>>> >
>> >>>> > BTW, let's not forget about Piotr's comment. I think we could add the isAvailable or similar method to the Writer interface in the FLIP.
>> >>>> >
>> >>>> > Best,
>> >>>> >
>> >>>> > Dawid
>> >>>> >
>> >>>> > On 15/09/2020 08:06, Guowei Ma wrote:
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".
>> >>>> >
>> >>>> > Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember the offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs the final checkpoint at the end of the job. Both of these methods could emit committables, but the flush should not leave any in-progress state (e.g. in case of the file sink in STREAM mode, in snapshotState it could leave some open files that would be committed in a subsequent cycle, however flush should close all files). The snapshotState as it is now can not be called in prepareSnapshotPreBarrier as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:
>> >>>> >
>> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >>>> >
>> >>>> > ver 1:
>> >>>> >
>> >>>> > List<StateT> snapshotState();
>> >>>> >
>> >>>> > ver 2:
>> >>>> >
>> >>>> > void snapshotState(); // not sure if we need that method at all in option 2
>> >>>> >
>> >>>> > I second Dawid's proposal. This is a valid scenario. And version 2 does not need the snapshotState() any more.
>> >>>> >
>> >>>> > The Committer is as described in the FLIP, it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3, a committable would be the list of part uploads that will form the final file and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.
>> >>>> >
>> >>>> > I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function. I must admit I did not get the need for the Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.
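>> >>>> > A minimal sketch of the two commit functions as described (interface and generic names are assumptions):
>> >>>> >
>> >>>> >     import java.util.List;
>> >>>> >
>> >>>> >     interface Committer<CommT> {
>> >>>> >         // Commits one committable; can run in parallel on different subtasks,
>> >>>> >         // e.g. finalizing an S3 multi-part upload.
>> >>>> >         void commit(CommT committable) throws Exception;
>> >>>> >     }
>> >>>> >
>> >>>> >     interface GlobalCommitter<CommT> {
>> >>>> >         // Receives all committables of a checkpoint in a single process,
>> >>>> >         // e.g. to produce one Iceberg snapshot instead of many.
>> >>>> >         void commit(List<CommT> committables) throws Exception;
>> >>>> >     }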
>> >>>> >
>> >>>> > @Dawid
>> >>>> > What I understand is that HiveSink's implementation might need the local committer (FileCommitter) because the file rename is needed.
>> >>>> > But the Iceberg sink only needs to write the manifest file. Would you like to enlighten me why Iceberg needs the local committer?
>> >>>> > Thanks
>> >>>> >
>> >>>> > Best,
>> >>>> > Guowei
>> >>>> >
>> >>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >>>> >
>> >>>> > Hi all,
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".
>> >>>> >
>> >>>> > Generally speaking it is a good point that emitting the committables should happen before emitting the checkpoint barrier downstream. However, if I remember the offline discussions well, the idea behind Writer#flush and Writer#snapshotState was to differentiate commit on checkpoint vs the final checkpoint at the end of the job. Both of these methods could emit committables, but the flush should not leave any in-progress state (e.g. in case of the file sink in STREAM mode, in snapshotState it could leave some open files that would be committed in a subsequent cycle, however flush should close all files). The snapshotState as it is now can not be called in prepareSnapshotPreBarrier as it can store some state, which should happen in Operator#snapshotState as otherwise it would always be synchronous. Therefore I think we would need sth like:
>> >>>> >
>> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >>>> >
>> >>>> > ver 1:
>> >>>> >
>> >>>> > List<StateT> snapshotState();
>> >>>> >
>> >>>> > ver 2:
>> >>>> >
>> >>>> > void snapshotState(); // not sure if we need that method at all in option 2
>> >>>> >
>> >>>> > The Committer is as described in the FLIP, it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can individually commit files to S3, a committable would be the list of part uploads that will form the final file and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.
>> >>>> >
>> >>>> > I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe Commit can return some CommitResult that gets shipped to the GlobalCommit function.
>> >>>> >
>> >>>> > I must admit I did not get the need for the Local/Normal + Global committer at first. The Iceberg example helped a lot. I think it makes a lot of sense.
>> >>>> >
>> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile".
>> >>>> > Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the thousands of collected data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as the operator state size is as small as one manifest file per checkpoint cycle [2].
>> >>>> > ------------------
>> >>>> > StateT snapshotState(SnapshotContext context) throws Exception;
>> >>>> >
>> >>>> > That means we also need the restoreCommitter API in the Sink interface
>> >>>> > ---------------
>> >>>> > Committer<CommT, StateT> restoreCommitter(InitContext context, StateT state);
>> >>>> >
>> >>>> > I think this might be a valid case. Not sure though if I would go with a "state" there. Having a state in a committer would imply we need a collect method as well. So far we needed a single method commit(...) and the bookkeeping of the committables could be handled by the framework. I think something like an optional combiner in the GlobalCommitter would be enough. What do you think?
>> >>>> >
>> >>>> > GlobalCommitter<CommT, GlobalCommT> {
>> >>>> >
>> >>>> >     void commit(GlobalCommT globalCommittables);
>> >>>> >
>> >>>> >     GlobalCommT combine(List<CommT> committables);
>> >>>> >
>> >>>> > }
>> >>>> >
>> >>>> > A different problem that I see here is how we handle commit failures. Should the committables (both normal and global) be included in the next cycle, should we retry, ...? I think it would be worth laying this out in the FLIP.
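>> >>>> > One possible way to lay it out, sketched against the GlobalCommitter<CommT, GlobalCommT> above (the driver class and the roll-over-to-the-next-cycle retry policy are assumptions, not a settled design):
>> >>>> >
>> >>>> >     import java.util.ArrayList;
>> >>>> >     import java.util.Iterator;
>> >>>> >     import java.util.List;
>> >>>> >
>> >>>> >     class GlobalCommitterDriver<CommT, GlobalCommT> {
>> >>>> >         private final GlobalCommitter<CommT, GlobalCommT> committer;
>> >>>> >         // Global committables that failed and await a retry in a later cycle.
>> >>>> >         private final List<GlobalCommT> pending = new ArrayList<>();
>> >>>> >
>> >>>> >         GlobalCommitterDriver(GlobalCommitter<CommT, GlobalCommT> committer) {
>> >>>> >             this.committer = committer;
>> >>>> >         }
>> >>>> >
>> >>>> >         // Called by the framework once per checkpoint with all CommT of that cycle.
>> >>>> >         void commitOnCheckpoint(List<CommT> committables) {
>> >>>> >             pending.add(committer.combine(committables));
>> >>>> >             Iterator<GlobalCommT> it = pending.iterator();
>> >>>> >             while (it.hasNext()) {
>> >>>> >                 try {
>> >>>> >                     committer.commit(it.next());
>> >>>> >                     it.remove();
>> >>>> >                 } catch (Exception e) {
>> >>>> >                     // Roll the committable over to the next cycle; this requires
>> >>>> >                     // commit() to be idempotent or dedupable (e.g. via a nonce).
>> >>>> >                 }
>> >>>> >             }
>> >>>> >         }
>> >>>> >     }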
>> >>>> >
>> >>>> > @Aljoscha I think you can find the code Steven was referring to here:
>> >>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>> >>>> >
>> >>>> > Best,
>> >>>> >
>> >>>> > Dawid
>> >>>> >
>> >>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>> >>>> >
>> >>>> > On 14.09.20 01:23, Steven Wu wrote:
>> >>>> >
>> >>>> > ## Writer interface
>> >>>> >
>> >>>> > For the Writer interface, should we add "prepareSnapshot" before the checkpoint barrier is emitted downstream? IcebergWriter would need it. Or would the framework call "flush" before the barrier is emitted downstream? That guarantee would achieve the same goal.
>> >>>> >
>> >>>> > I would think that we only need flush() and the semantics are that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier". Now that I'm thinking about it more I think flush() should be renamed to something like "prepareCommit()".
>> >>>> >
>> >>>> > @Guowei, what do you think about this?
>> >>>> >
>> >>>> > In [1], we discussed the reason for the Writer to emit a (checkpointId, CommT) tuple to the committer. The committer needs the checkpointId to separate out data files for different checkpoints if concurrent checkpoints are enabled.
>> >>>> >
>> >>>> > When can this happen? Even with concurrent checkpoints the snapshot barriers would still cleanly segregate the input stream of an operator into tranches that should manifest in only one checkpoint. With concurrent checkpoints, all that can happen is that we start a checkpoint before the last one is confirmed completed.
>> >>>> >
>> >>>> > Unless there is some weirdness in the sources and some sources start chk1 first and some other ones start chk2 first?
>> >>>> >
>> >>>> > @Piotrek, do you think this is a problem?
>> >>>> >
>> >>>> > For the Committer interface, I am wondering if we should split the single commit method into separate "collect" and "commit" methods? This way, it can handle both single and multiple CommT objects.
>> >>>> >
>> >>>> > I think we can't do this. If the sink only needs a regular Committer, we can perform the commits in parallel, possibly on different machines. Only when the sink needs a GlobalCommitter would we need to ship all commits to a single process and perform the commit there. If both methods were unified in one interface we couldn't make the decision of where to commit in the framework code.
>> >>>> >
>> >>>> > For Iceberg, writers don't need any state. But the GlobalCommitter needs to checkpoint StateT. For the committer, CommT is "DataFile". Since a single committer can collect thousands (or more) data files in one checkpoint cycle, as an optimization we checkpoint a single "ManifestFile" (for the thousands of collected data files) as StateT. This allows us to absorb extended commit outages without losing written/uploaded data files, as the operator state size is as small as one manifest file per checkpoint cycle.
>> >>>> >
>> >>>> > You could have a point here. Is the code for this available in open source? I was checking out
>> >>>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>> >>>> > and didn't find the ManifestFile optimization there.
>> >>>> >
>> >>>> > Best,
>> >>>> > Aljoscha