Hi, Steven

I am not entirely sure whether we should provide an id in GlobalCommit.

My understanding is: if the committer function is idempotent, the
framework can guarantee exactly-once semantics in both batch and stream
execution modes. But I think the idempotence should be guaranteed by the
sink developer, not by the basic API.

We could provide an id in GlobalCommit. But the next question would then
be: do we also need to provide an id for a normal committable? I would
prefer to keep the committer to a single responsibility.
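
For example, here is a minimal sketch of what I mean by sink-side
idempotence. Everything below is hypothetical and just for illustration
(`ManifestStore` is an invented stand-in for the external system's
metadata store, not part of the proposed API):

    // Sketch: the sink derives a deterministic id from the committable
    // itself, so replaying the same global commit after a failover is a
    // no-op.
    class IdempotentGlobalCommitter<GlobalCommT> {
        private final ManifestStore store; // invented external metadata store

        IdempotentGlobalCommitter(ManifestStore store) {
            this.store = store;
        }

        void commit(GlobalCommT committable) {
            // e.g. a content hash of the committed file list
            String id = Integer.toHexString(committable.hashCode());
            if (store.alreadyCommitted(id)) {
                return; // duplicate replay after failover: skip
            }
            store.commitAtomically(id, committable); // id + data in one transaction
        }
    }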

I think we may have an answer once the first non-consensus question below
is resolved.

The above is just my personal opinion; I think this is still an open question.

Thank you again for your valuable and thoughtful response.

Best,
Guowei


On Thu, Sep 17, 2020 at 10:53 AM Steven Wu <stevenz...@gmail.com> wrote:

> Guowei, thanks a lot for the summary. Here are a couple more questions that
> need more clarification for the GlobalCommitter case.
>
> * framework provides some sort of unique id per GlobalCommT (e.g. nonce or
> some sort of transaction id)
> * commit failure handling. Should we roll over to the next cycle? If so,
> we may need commit(List<GlobalCommT>)
>
> On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
> wrote:
>
> > Hey
> >
> > Thanks Dawid for bringing up my suggestion :)
> >
> > > I'm not so sure about this, the sinks I'm aware of would not be able to
> > > implement this method: Kafka doesn't have this, I didn't see it in the
> > > Iceberg interfaces, and HDFS/S3 also don't have it.
> >
> > Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we
> > could do some magic. At the very least we could use
> > `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when
> > some threshold is exceeded. Alternatively, maybe we could hook into the
> > KafkaProducer's buffer state [1]:
> >
> > > The buffer.memory controls the total amount of memory available to the
> > producer for buffering.
> > > If records are sent faster than they can be transmitted to the server
> > then this buffer space will be exhausted.
> > > When the buffer space is exhausted additional send calls will block.
> >
> > As far as I can see, Kafka is exposing the `buffer-available-bytes`
> > metric, which we might use instead of `pendingRecords`. Heck, we are
> > already hacking KafkaProducer with reflection, so we could access the
> > `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field to
> > call the `accumulator.bufferPoolAvailableMemory()` method, if the metric
> > would be too expensive to check for every record.
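> >
> > For illustration, a rough sketch (not a concrete proposal) of deriving
> > availability from that metric; producer.metrics() and metricValue() are
> > public Kafka API, while the threshold is an arbitrary assumption:
> >
> > // uses org.apache.kafka.common.{Metric, MetricName}
> > boolean isAvailable(KafkaProducer<?, ?> producer, long minFreeBytes) {
> >     for (Map.Entry<MetricName, ? extends Metric> e :
> >             producer.metrics().entrySet()) {
> >         if ("buffer-available-bytes".equals(e.getKey().name())) {
> >             long free = ((Number) e.getValue().metricValue()).longValue();
> >             return free > minFreeBytes;
> >         }
> >     }
> >     return true; // metric not found: assume available
> > }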
> >
> > Furthermore, I'm pretty sure other sinks (maybe not all) provide similar
> > features. If we are desperate, we could always contribute something to
> > those systems to make them expose the internal buffer's state.
> >
> > If we are really desperate, we could provide a generic record-handover
> > wrapper sink that would have a buffer of N (5? 10?) records and would
> > hand those records over to the blocking sink running in another thread.
> > If the buffer is full, the sink would be unavailable.
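> >
> > Something like this toy sketch of the handover idea (nothing here is the
> > proposed API; the buffer size is arbitrary):
> >
> > // uses java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
> > class HandoverSink<T> {
> >     private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(10);
> >
> >     boolean isAvailable() {
> >         return buffer.remainingCapacity() > 0; // full buffer => unavailable
> >     }
> >
> >     void write(T record) throws InterruptedException {
> >         buffer.put(record); // blocks only if the runtime ignored isAvailable()
> >     }
> >
> >     // a separate thread would drain `buffer` into the blocking sink
> > }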
> >
> > Guowei
> > > Does the sink's snapshot return immediately when the sink's status is
> > > unavailable?
> >
> > The state snapshot call is generally speaking non-blocking already, so
> > it should not be an issue. If it's blocking and if that turns out to be
> > a problem, we could later decide in the runtime code to not execute
> > snapshot calls if a sink is unavailable. Think of isAvailable more as a
> > hint from the operator to the runtime, which we can use to make better
> > decisions. Also take a look at the FLIP-27 sources (`SourceReader`),
> > where there already is an `isAvailable()` method. It would be best if
> > new sinks just followed the same contract.
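> >
> > For reference, mirroring that contract on the sink side could look
> > roughly like this (just a sketch of the idea, not the FLIP's interface):
> >
> > interface WriterAvailability {
> >     // completes when the writer can accept the next record, analogous
> >     // to SourceReader#isAvailable() in FLIP-27
> >     CompletableFuture<Void> isAvailable();
> > }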
> >
> > > What I want to know is which specific sinks will benefit from this
> > > feature
> >
> > It's not the sinks that would benefit from this, but other parts of the
> > system. Currently the task thread is blocked on a backpressured sink,
> > which blocks other things from happening (checkpointing, closing, ...).
> > If we make sinks non-blocking (as the network stack is for the most part
> > and as are the FLIP-27 sources), we will be able to snapshot the state
> > of the operator immediately. For example, the change from blocking to
> > non-blocking sources sped up unaligned checkpoints from ~30 seconds down
> > to ~5 seconds in our benchmarks, but the difference can be even more
> > profound (hours instead of seconds/minutes, as reported by some users).
> >
> > Piotrek
> >
> > [1]
> >
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > On Wed, Sep 16, 2020 at 6:29 AM Guowei Ma <guowei....@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for all your valuable opinions and ideas. Currently there are
> >> many topics in this thread, so I will try to summarize what has
> >> consensus and what does not. Correct me if I am wrong.
> >>
> >> ## Consensus
> >>
> >> 1. The motivation of the unified sink API is to decouple the sink
> >> implementation from the different runtime execution modes.
> >> 2. The initial scope of the unified sink API covers only the file
> >> system type, which supports real transactions. The FLIP focuses more
> >> on the semantics the new sink API should support.
> >> 3. We prefer the first alternative API, which could give the framework
> >> a greater opportunity to optimize.
> >> 4. The `Writer` needs a new method `prepareCommit`, which would be
> >> called from `prepareSnapshotPreBarrier`, and the `flush` method would
> >> be removed (see the sketch after this list).
> >> 5. The FLIP could move the `Snapshot & Drain` section in order to be
> >> more focused.
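> >>
> >> For point 4, a minimal sketch of how I read the change (not the final
> >> interface; the prepareCommit signature is the one Dawid proposed
> >> further down this thread):
> >>
> >> interface Writer<InputT, CommT> {
> >>     void write(InputT element) throws IOException;
> >>
> >>     // called from prepareSnapshotPreBarrier instead of the old flush();
> >>     // emits the pending committables through `output`
> >>     void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >> }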
> >>
> >> ## Not Consensus
> >>
> >> 1. What should the “Unified Sink API” support/cover? The API can
> >> “unify” (decouple) the commit operation in terms of supporting
> >> exactly-once semantics. However, even if we narrow down the initially
> >> supported systems to the file system, there would be different topology
> >> requirements. These requirements come from performance optimization
> >> (IcebergSink/MergeHiveSink) or functionality (e.g. whether a bucket is
> >> “finished”). Should the unified sink API support these requirements?
> >> 2. The API does not expose the checkpoint id because the batch
> >> execution mode does not have normal checkpoints. But some
> >> implementations still depend on it (IcebergSink uses it to dedupe).
> >> I think how to support this requirement depends on the first open
> >> question.
> >> 3. Whether the `Writer` supports async functionality or not. Currently
> >> I do not know which sink could benefit from it. Maybe it is just my own
> >> problem.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma <guowei....@gmail.com>
> wrote:
> >>
> >> >
> >> > Hi, Steven
> >> > Thank you for your thoughtful ideas and concerns.
> >> >
> >> > >> I still like the concept of grouping data files per checkpoint for
> >> > >> streaming mode. It is cleaner and probably easier to manage and
> >> > >> deal with commit failures. Plus, it can reduce dupes for the
> >> > >> at-least-once mode. I understand checkpoint is not an option for
> >> > >> batch execution. We don't have to expose the checkpointId in the
> >> > >> API, as long as the internal bookkeeping groups data files by
> >> > >> checkpoints for streaming mode.
> >> >
> >> > I think this problem (how to dedupe the combined committed data) also
> >> > depends on where to place the agg/combine logic.
> >> >
> >> > 1. If the agg/combine takes place in the “commit”, maybe we need to
> >> > figure out how to give the aggregated committable a unique and
> >> > auto-incrementing id in the committer.
> >> > 2. If the agg/combine takes place in a separate operator, maybe the
> >> > sink developer could maintain the id itself by using state (see the
> >> > sketch below).
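> >> >
> >> > For option 2, a hypothetical sketch (the class name and
> >> > AggregatedCommT are invented) of how such an operator could assign an
> >> > auto-incrementing id from checkpointed state:
> >> >
> >> > class CommittableIdAssigner<CommT> {
> >> >     private long nextId; // restored from operator state on recovery
> >> >
> >> >     AggregatedCommT combine(List<CommT> committables) {
> >> >         return new AggregatedCommT(nextId++, committables);
> >> >     }
> >> >
> >> >     // nextId is written to state in snapshotState(), so after a
> >> >     // failover the committer can dedupe a replayed commit by its id
> >> > }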
> >> >
> >> > I think this problem is also decided by which topology patterns the
> >> > sink API should support. Actually, there are already many other
> >> > topology requirements. :)
> >> >
> >> > Best,
> >> > Guowei
> >> >
> >> >
> >> > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com>
> wrote:
> >> >
> >> >> > AFAIK the committer would not see the file-1-2 when ck1 happens in
> >> >> > the ExactlyOnce mode.
> >> >>
> >> >> @Guowei Ma <guowei....@gmail.com> I think you are right for
> >> >> exactly-once checkpoint semantics. What about "at least once"? I
> >> >> guess we can argue that it is fine to commit file-1-2 for
> >> >> at-least-once mode.
> >> >>
> >> >> I still like the concept of grouping data files per checkpoint for
> >> >> streaming mode. It is cleaner and probably easier to manage and deal
> >> >> with commit failures. Plus, it can reduce dupes for the at-least-once
> >> >> mode. I understand checkpoint is not an option for batch execution.
> >> >> We don't have to expose the checkpointId in the API, as long as the
> >> >> internal bookkeeping groups data files by checkpoints for streaming
> >> >> mode.
> >> >>
> >> >>
> >> >> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com>
> >> wrote:
> >> >>
> >> >>> > images don't make it through to the mailing lists. You would need
> >> >>> > to host the file somewhere and send a link.
> >> >>>
> >> >>> Sorry about that. Here is the sample DAG in google drawings.
> >> >>>
> >> >>>
> >> >>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
> >> >>>
> >> >>>
> >> >>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com>
> >> wrote:
> >> >>>
> >> >>>> Hi, Dawid
> >> >>>>
> >> >>>> >> I still find the merging case the most confusing. I don't
> >> >>>> >> necessarily understand why you need the "SingleFileCommit" step
> >> >>>> >> in this scenario. The way I understand the "commit" operation is
> >> >>>> >> that it makes some data/artifacts visible to the external system,
> >> >>>> >> thus it should be immutable from the point of view of a single
> >> >>>> >> process. Having an additional step in the same process that works
> >> >>>> >> on committed data contradicts those assumptions. I might be
> >> >>>> >> missing something though. Could you elaborate on why it can't be
> >> >>>> >> something like FileWriter -> FileMergeWriter -> Committer (either
> >> >>>> >> global or non-global)? Again it might be just me not getting the
> >> >>>> >> example.
> >> >>>>
> >> >>>> I think you are right. The topology
> >> >>>> "FileWriter -> FileMergeWriter -> Committer" could meet the merge
> >> >>>> requirement. The topology "FileWriter -> SingleFileCommitter ->
> >> >>>> FileMergeWriter -> GlobalCommitter" reuses some code of the
> >> >>>> StreamingFileSink (for example, the rolling policy), so it has the
> >> >>>> "SingleFileCommitter" in the topology. In general I want to use this
> >> >>>> case to show that there are different topologies according to the
> >> >>>> requirements.
> >> >>>>
> >> >>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> told me that
> >> >>>> the actual topology of the merge-supporting HiveSink is more
> >> >>>> complicated than that.
> >> >>>>
> >> >>>>
> >> >>>> >> I've just briefly skimmed over the proposed interfaces. I would
> >> >>>> >> suggest one addition to the Writer interface (as I understand
> >> >>>> >> this is the runtime interface in this proposal?): add some
> >> >>>> >> availability method, to avoid, if possible, blocking calls on
> >> >>>> >> the sink. We already have similar availability methods in the
> >> >>>> >> new sources [1] and in various places in the network stack [2].
> >> >>>> >> BTW let's not forget about Piotr's comment. I think we could add
> >> >>>> >> the isAvailable or similar method to the Writer interface in the
> >> >>>> >> FLIP.
> >> >>>>
> >> >>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org> for the reminder.
> >> >>>> There are too many issues at the same time.
> >> >>>>
> >> >>>> In addition to what Aljoscha said: very few systems support it.
> >> >>>> Another thing I worry about is: does the sink's snapshot return
> >> >>>> immediately when the sink's status is unavailable? Maybe we could do
> >> >>>> it by deduping some elements in the state, but I think it might be
> >> >>>> too complicated. What I want to know is which specific sinks will
> >> >>>> benefit from this feature. @piotr <pi...@ververica.com> Please
> >> >>>> correct me if I misunderstand you. Thanks.
> >> >>>>
> >> >>>> Best,
> >> >>>> Guowei
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <
> >> >>>> dwysakow...@apache.org>
> >> >>>> wrote:
> >> >>>>
> >> >>>> > What I understand is that HiveSink's implementation might need
> >> >>>> > the local committer (FileCommitter) because the file rename is
> >> >>>> > needed. But Iceberg only needs to write the manifest file. Would
> >> >>>> > you like to enlighten me on why Iceberg needs the local committer?
> >> >>>> > Thanks
> >> >>>> >
> >> >>>> > Sorry if I caused confusion here. I am not saying the Iceberg
> >> >>>> > sink needs a local committer. What I had in mind is that prior to
> >> >>>> > the Iceberg example I did not see a need for a "GlobalCommitter"
> >> >>>> > in the streaming case. I thought it was always enough to have the
> >> >>>> > "normal" committer in that case. Now I understand that this
> >> >>>> > differentiation is not really about logical separation. It is not
> >> >>>> > really about the granularity with which we commit, i.e. answering
> >> >>>> > the "WHAT" question. It is really about the performance and that
> >> >>>> > in the end we will have a single "transaction", so it is about
> >> >>>> > answering the question "HOW".
> >> >>>> >
> >> >>>> >
> >> >>>> >    - Commit a directory with merged files (some users want to
> >> >>>> >      merge the files in a directory before committing the
> >> >>>> >      directory to the Hive metastore):
> >> >>>> >
> >> >>>> >      FileWriter -> SingleFileCommit -> FileMergeWriter ->
> >> >>>> >      GlobalCommitter
> >> >>>> >
> >> >>>> > I still find the merging case the most confusing. I don't
> >> >>>> > necessarily understand why you need the "SingleFileCommit" step in
> >> >>>> > this scenario. The way I understand the "commit" operation is that
> >> >>>> > it makes some data/artifacts visible to the external system, thus
> >> >>>> > it should be immutable from the point of view of a single process.
> >> >>>> > Having an additional step in the same process that works on
> >> >>>> > committed data contradicts those assumptions. I might be missing
> >> >>>> > something though. Could you elaborate on why it can't be something
> >> >>>> > like FileWriter -> FileMergeWriter -> Committer (either global or
> >> >>>> > non-global)? Again it might be just me not getting the example.
> >> >>>> >
> >> >>>> > I've just briefly skimmed over the proposed interfaces. I would
> >> >>>> suggest one
> >> >>>> > addition to the Writer interface (as I understand this is the
> >> runtime
> >> >>>> > interface in this proposal?): add some availability method, to
> >> avoid,
> >> >>>> if
> >> >>>> > possible, blocking calls on the sink. We already have similar
> >> >>>> > availability methods in the new sources [1] and in various places
> >> in
> >> >>>> the
> >> >>>> > network stack [2].
> >> >>>> >
> >> >>>> > BTW Let's not forget about Piotr's comment. I think we could add
> >> the
> >> >>>> > isAvailable or similar method to the Writer interface in the
> FLIP.
> >> >>>> >
> >> >>>> > Best,
> >> >>>> >
> >> >>>> > Dawid
> >> >>>> > On 15/09/2020 08:06, Guowei Ma wrote:
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are
> >> >>>> > that it prepares for a commit, so on a physical level it would be
> >> >>>> > called from "prepareSnapshotPreBarrier". Now that I'm thinking
> >> >>>> > about it more I think flush() should be renamed to something like
> >> >>>> > "prepareCommit()".
> >> >>>> >
> >> >>>> > Generally speaking it is a good point that emitting the
> >> >>>> > committables should happen before emitting the checkpoint barrier
> >> >>>> > downstream. However, if I remember the offline discussions well,
> >> >>>> > the idea behind Writer#flush and Writer#snapshotState was to
> >> >>>> > differentiate commit on checkpoint vs final checkpoint at the end
> >> >>>> > of the job. Both of these methods could emit committables, but the
> >> >>>> > flush should not leave any in-progress state (e.g. in case of the
> >> >>>> > file sink in STREAM mode, in snapshotState it could leave some
> >> >>>> > open files that would be committed in a subsequent cycle, however
> >> >>>> > flush should close all files). The snapshotState as it is now can
> >> >>>> > not be called in prepareSnapshotPreBarrier as it can store some
> >> >>>> > state, which should happen in Operator#snapshotState as otherwise
> >> >>>> > it would always be synchronous. Therefore I think we would need
> >> >>>> > sth like:
> >> >>>> >
> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >> >>>> >
> >> >>>> > ver 1:
> >> >>>> >
> >> >>>> > List<StateT> snapshotState();
> >> >>>> >
> >> >>>> > ver 2:
> >> >>>> >
> >> >>>> > void snapshotState(); // not sure if we need that method at all
> >> >>>> > in option 2
> >> >>>> >
> >> >>>> > I second Dawid's proposal. This is a valid scenario, and
> >> >>>> > version 2 does not need the snapshotState() any more.
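> >> >>>> >
> >> >>>> > The way I read the two calls (just my interpretation, not
> >> >>>> > normative):
> >> >>>> >
> >> >>>> > // at each checkpoint: prepareCommit(false, output)
> >> >>>> > //   -> may leave open files for the next cycle
> >> >>>> > // at end of input:    prepareCommit(true, output)
> >> >>>> > //   -> must close and emit everything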
> >> >>>> >
> >> >>>> >
> >> >>>> > The Committer is as described in the FLIP, it's basically a
> >> >>>> > function "void commit(Committable)". The GlobalCommitter would be
> >> >>>> > a function "void commit(List<Committable>)". The former would be
> >> >>>> > used by an S3 sink where we can individually commit files to S3; a
> >> >>>> > committable would be the list of part uploads that will form the
> >> >>>> > final file, and the commit operation creates the metadata in S3.
> >> >>>> > The latter would be used by something like Iceberg, where the
> >> >>>> > Committer needs a global view of all the commits to be efficient
> >> >>>> > and not overwhelm the system.
> >> >>>> >
> >> >>>> > I don't know yet if sinks would only implement one type of commit
> >> >>>> > function or potentially both at the same time, and maybe Commit
> >> >>>> > can return some CommitResult that gets shipped to the GlobalCommit
> >> >>>> > function.
> >> >>>> > I must admit I did not get the need for Local/Normal + Global
> >> >>>> > committer at first. The Iceberg example helped a lot. I think it
> >> >>>> > makes a lot of sense.
> >> >>>> > lot of sense.
> >> >>>> >
> >> >>>> > @Dawid
> >> >>>> > What I understand is that HiveSink's implementation might need
> >> >>>> > the local committer (FileCommitter) because the file rename is
> >> >>>> > needed. But Iceberg only needs to write the manifest file. Would
> >> >>>> > you like to enlighten me on why Iceberg needs the local committer?
> >> >>>> > Thanks
> >> >>>> >
> >> >>>> > Best,
> >> >>>> > Guowei
> >> >>>> >
> >> >>>> >
> >> >>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <
> >> >>>> > dwysakow...@apache.org> wrote:
> >> >>>> >
> >> >>>> >
> >> >>>> > Hi all,
> >> >>>> >
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are
> >> >>>> > that it prepares for a commit, so on a physical level it would be
> >> >>>> > called from "prepareSnapshotPreBarrier". Now that I'm thinking
> >> >>>> > about it more I think flush() should be renamed to something like
> >> >>>> > "prepareCommit()".
> >> >>>> >
> >> >>>> > Generally speaking it is a good point that emitting the
> >> >>>> > committables should happen before emitting the checkpoint barrier
> >> >>>> > downstream. However, if I remember the offline discussions well,
> >> >>>> > the idea behind Writer#flush and Writer#snapshotState was to
> >> >>>> > differentiate commit on checkpoint vs final checkpoint at the end
> >> >>>> > of the job. Both of these methods could emit committables, but the
> >> >>>> > flush should not leave any in-progress state (e.g. in case of the
> >> >>>> > file sink in STREAM mode, in snapshotState it could leave some
> >> >>>> > open files that would be committed in a subsequent cycle, however
> >> >>>> > flush should close all files). The snapshotState as it is now can
> >> >>>> > not be called in prepareSnapshotPreBarrier as it can store some
> >> >>>> > state, which should happen in Operator#snapshotState as otherwise
> >> >>>> > it would always be synchronous. Therefore I think we would need
> >> >>>> > sth like:
> >> >>>> >
> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
> >> >>>> >
> >> >>>> > ver 1:
> >> >>>> >
> >> >>>> > List<StateT> snapshotState();
> >> >>>> >
> >> >>>> > ver 2:
> >> >>>> >
> >> >>>> > void snapshotState(); // not sure if we need that method at all
> >> >>>> > in option 2
> >> >>>> >
> >> >>>> >
> >> >>>> > The Committer is as described in the FLIP, it's basically a
> >> >>>> > function "void commit(Committable)". The GlobalCommitter would be
> >> >>>> > a function "void commit(List<Committable>)". The former would be
> >> >>>> > used by an S3 sink where we can individually commit files to S3; a
> >> >>>> > committable would be the list of part uploads that will form the
> >> >>>> > final file, and the commit operation creates the metadata in S3.
> >> >>>> > The latter would be used by something like Iceberg, where the
> >> >>>> > Committer needs a global view of all the commits to be efficient
> >> >>>> > and not overwhelm the system.
> >> >>>> >
> >> >>>> > I don't know yet if sinks would only implement one type of commit
> >> >>>> > function or potentially both at the same time, and maybe Commit
> >> >>>> > can return some CommitResult that gets shipped to the GlobalCommit
> >> >>>> > function.
> >> >>>> >
> >> >>>> > I must admit I did not get the need for Local/Normal + Global
> >> >>>> > committer at first. The Iceberg example helped a lot. I think it
> >> >>>> > makes a lot of sense.
> >> >>>> >
> >> >>>> >
> >> >>>> > For Iceberg, writers don't need any state. But the
> >> >>>> > GlobalCommitter needs to checkpoint StateT. For the committer,
> >> >>>> > CommT is "DataFile". Since a single committer can collect
> >> >>>> > thousands (or more) data files in one checkpoint cycle, as an
> >> >>>> > optimization we checkpoint a single "ManifestFile" (for the
> >> >>>> > thousands of collected data files) as StateT. This allows us to
> >> >>>> > absorb extended commit outages without losing written/uploaded
> >> >>>> > data files, as the operator state size is as small as one manifest
> >> >>>> > file per checkpoint cycle [2].
> >> >>>> > ------------------
> >> >>>> > StateT snapshotState(SnapshotContext context) throws Exception;
> >> >>>> >
> >> >>>> > That means we also need the restoreCommitter API in the Sink
> >> >>>> > interface:
> >> >>>> > ---------------
> >> >>>> > Committer<CommT, StateT> restoreCommitter(InitContext context,
> >> >>>> >     StateT state);
> >> >>>> >
> >> >>>> > I think this might be a valid case. Not sure though if I would go
> >> >>>> > with a "state" there. Having a state in a committer would imply we
> >> >>>> > need a collect method as well. So far we needed a single method
> >> >>>> > commit(...) and the bookkeeping of the committables could be
> >> >>>> > handled by the framework. I think something like an optional
> >> >>>> > combiner in the GlobalCommitter would be enough. What do you
> >> >>>> > think?
> >> >>>> >
> >> >>>> > GlobalCommitter<CommT, GlobalCommT> {
> >> >>>> >
> >> >>>> >     void commit(GlobalCommT globalCommittables);
> >> >>>> >
> >> >>>> >     GlobalCommT combine(List<CommT> committables);
> >> >>>> >
> >> >>>> > }
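> >> >>>> >
> >> >>>> > For the Iceberg case that could look roughly like this (a sketch
> >> >>>> > under my assumptions; writeManifest() is an invented helper and
> >> >>>> > `table` is the Iceberg Table the sink writes to):
> >> >>>> >
> >> >>>> > class IcebergGlobalCommitter
> >> >>>> >         implements GlobalCommitter<DataFile, ManifestFile> {
> >> >>>> >
> >> >>>> >     public ManifestFile combine(List<DataFile> files) {
> >> >>>> >         // fold thousands of data files into a single manifest
> >> >>>> >         return writeManifest(files);
> >> >>>> >     }
> >> >>>> >
> >> >>>> >     public void commit(ManifestFile manifest) {
> >> >>>> >         // one Iceberg table transaction per global commit
> >> >>>> >         table.newAppend().appendManifest(manifest).commit();
> >> >>>> >     }
> >> >>>> > }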
> >> >>>> >
> >> >>>> > A different problem that I see here is how we handle commit
> >> >>>> > failures. Should the committables (both normal and global) be
> >> >>>> > included in the next cycle, should we retry, ...? I think it would
> >> >>>> > be worth laying this out in the FLIP.
> >> >>>> >
> >> >>>> > @Aljoscha I think you can find the code Steven was referring to
> >> >>>> > here:
> >> >>>> >
> >> >>>> > https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
> >> >>>> >
> >> >>>> > Best,
> >> >>>> >
> >> >>>> > Dawid
> >> >>>> >
> >> >>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
> >> >>>> >
> >> >>>> > On 14.09.20 01:23, Steven Wu wrote:
> >> >>>> >
> >> >>>> > ## Writer interface
> >> >>>> >
> >> >>>> > For the Writer interface, should we add "*prepareSnapshot*"
> >> >>>> > before the checkpoint barrier is emitted downstream? IcebergWriter
> >> >>>> > would need it. Or would the framework call "*flush*" before the
> >> >>>> > barrier is emitted downstream? That guarantee would achieve the
> >> >>>> > same goal.
> >> >>>> >
> >> >>>> > I would think that we only need flush() and the semantics are
> >> >>>> > that it prepares for a commit, so on a physical level it would be
> >> >>>> > called from "prepareSnapshotPreBarrier". Now that I'm thinking
> >> >>>> > about it more I think flush() should be renamed to something like
> >> >>>> > "prepareCommit()".
> >> >>>> >
> >> >>>> > @Guowei, what do you think about this?
> >> >>>> >
> >> >>>> >
> >> >>>> > In [1], we discussed the reason for the Writer to emit a
> >> >>>> > (checkpointId, CommT) tuple to the committer. The committer needs
> >> >>>> > the checkpointId to separate out data files for different
> >> >>>> > checkpoints if concurrent checkpoints are enabled.
> >> >>>> >
> >> >>>> > When can this happen? Even with concurrent checkpoints the
> >> >>>> > snapshot barriers would still cleanly segregate the input stream
> >> >>>> > of an operator into tranches that should manifest in only one
> >> >>>> > checkpoint. With concurrent checkpoints, all that can happen is
> >> >>>> > that we start a checkpoint before the last one is confirmed
> >> >>>> > completed.
> >> >>>> >
> >> >>>> > Unless there is some weirdness in the sources and some sources
> >> start
> >> >>>> > chk1 first and some other ones start chk2 first?
> >> >>>> >
> >> >>>> > @Piotrek, do you think this is a problem?
> >> >>>> >
> >> >>>> >
> >> >>>> > For the Committer interface, I am wondering if we should split
> >> >>>> > the single commit method into separate "*collect*" and "*commit*"
> >> >>>> > methods? This way, it can handle both single and multiple CommT
> >> >>>> > objects.
> >> >>>> >
> >> >>>> > I think we can't do this. If the sink only needs a regular
> >> >>>> > Committer, we can perform the commits in parallel, possibly on
> >> >>>> > different machines. Only when the sink needs a GlobalCommitter
> >> >>>> > would we need to ship all commits to a single process and perform
> >> >>>> > the commit there. If both methods were unified in one interface we
> >> >>>> > couldn't make the decision of where to commit in the framework
> >> >>>> > code.
> >> >>>> >
> >> >>>> >
> >> >>>> > For Iceberg, writers don't need any state. But the
> >> >>>> > GlobalCommitter needs to checkpoint StateT. For the committer,
> >> >>>> > CommT is "DataFile". Since a single committer can collect
> >> >>>> > thousands (or more) data files in one checkpoint cycle, as an
> >> >>>> > optimization we checkpoint a single "ManifestFile" (for the
> >> >>>> > thousands of collected data files) as StateT. This allows us to
> >> >>>> > absorb extended commit outages without losing written/uploaded
> >> >>>> > data files, as the operator state size is as small as one manifest
> >> >>>> > file per checkpoint cycle
> >> >>>> >
> >> >>>> > You could have a point here. Is the code for this available in
> >> >>>> > open-source? I was checking out
> >> >>>> >
> >> >>>> >
> >> >>>> >
> >> >>>> > https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
> >> >>>> >
> >> >>>> > and didn't find the ManifestFile optimization there.
> >> >>>> >
> >> >>>> > Best,
> >> >>>> > Aljoscha
> >> >>>> >
> >> >>>> >
> >> >>>> >
> >> >>>>
> >> >>>
> >>
> >
>
