Hi Guowei,

Thanks for the explanation. Now I get your point.

Basically any action that would make sink unavailable, would also cause it
to block on snapshotting the state (in option 1. with flushing). I agree
that lack of availability is much less of an issue than I have thought
before.

For option 2., the difference would be much larger, in cases when the sink
is heavily back pressured and writing buffered records on to the state is
must faster compared to flushing them. Without availability, we would need
to wait for at least a single record to be written (which can be a long
time), and if sink is batching writes, it could block for the whole batch.
While with the availability, we could snapshot the state immediately.

Currently we don't have option 2. sinks, right? But isn't option 2. the
write ahead log approach?

There are some other minor benefits for availability:
a) metrics/backpressure detection
b) the more non blocking API, the more future proof it is. For example it
could allow us to scale up the number of tasks that we can execute on a
single TM (having a pool of worker threads instead of one thread per task)
c) we could move more critical things into the task thread and extend
single thread/mailbox model

For c), we would probably want to have asynchronous flushing (snapshotting).

I wonder if non blocking WAL sinks and metrics/backpressure detection are
good enough justifications for the availability?


But the whole discussion made me realise that it might be nice to have some
unified way to configure the amount of buffered data for the new sinks.
Either passing via some parameter, or via some config (`ExecutionConfig`?)
an optional request:
"hey sink, user configured that he wants to have at most 100KB of buffered
data. If you can, please respect this configuration".

What do you think?

Piotrek

czw., 17 wrz 2020 o 06:28 Guowei Ma <guowei....@gmail.com> napisał(a):

> Hi, Steven
>
> I am not particularly sure whether to provide id in GlobalCommit.
>
> But my understanding is: if the committer function is idempotent, the
> framework can guarantee exactly once semantics in batch/stream execution
> mode. But I think maybe the idempotence should be guaranteed by the sink
> developer, not on the basic API.
>
> We could  provide an id in GlobalCommit. But the following question would
> be that: do we need to provide an id for a normal committable? I would like
> to say that I prefer to make the committer single responsibility.
>
> I think maybe we could have an answer when the first nonconsensual
> question is resolved.
>
> Aboving is just my personal opinion. I think this is still an open
> question.
>
> Thank you again for your valuable and thoughtful response.
>
> Best,
> Guowei
>
>
> On Thu, Sep 17, 2020 at 10:53 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> Guowei, thanks a lot for the summary. Here are a couple more questions
>> that
>> need more clarification for the GlobalCommitter case.
>>
>> * framework provides some sort of unique id per GlobalCommT (e.g. nonce or
>> some sort of transaction id)
>> * commit failure handling. Should we roll over to the next cycle? if so,
>> we
>> may need commit(List<GlobalCommT> )
>>
>> On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski <piotr.nowoj...@gmail.com>
>> wrote:
>>
>> > Hey
>> >
>> > Thanks Dawid for bringing up my suggestion :)
>> >
>> > > I'm not so sure about this, the sinks I'm aware of would not be able
>> to
>> > > implement this method: Kafka doesn't have this, I didn't see it in the
>> > > Iceberg interfaces, and HDFS/S3 also don't have it.
>> >
>> > Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we
>> > could do some magic. At the very least we could use
>> > `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when
>> some
>> > threshold is exceeded. Alternatively, maybe we could hook in to the
>> > KafkaProducer's buffer state [1]:
>> >
>> > > The buffer.memory controls the total amount of memory available to the
>> > producer for buffering.
>> > > If records are sent faster than they can be transmitted to the server
>> > then this buffer space will be exhausted.
>> > > When the buffer space is exhausted additional send calls will block.
>> >
>> > As far as I can see, Kafka is exposing the `buffer-available-bytes`
>> > metric, which we might use instead of `pendingRecords`. Heck, we are
>> > already hacking KafkaProducer with reflections, we could access
>> > `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field to
>> > call  `accumulator.bufferPoolAvailableMemory()` method, if metric would
>> be
>> > to expensive to check per every record.
>> >
>> > Furthermore, I'm pretty sure other sinks (maybe not all) provide similar
>> > features. If we are desperate, we could always contribute something to
>> > those systems to make them expose the internal buffer's state.
>> >
>> > If we are really desperate, we could provide a generic records handover
>> > wrapper sink, that would have a buffer of N (5? 10? ) records and would
>> be
>> > handing over those records to the blocking sink running in another
>> thread.
>> > If the buffer is full, the sink would be unavailable.
>> >
>> > Guowei
>> > > Does the sink's snapshot return immediately when the sink's status is
>> > unavailable?
>> >
>> > State snapshot call is generally speaking non blocking already, so it
>> > should not be an issue. If it's blocking and if it will be solving some
>> > problem, we could later decide in the runtime code to not execute
>> snapshot
>> > calls if a sink is unavailable. Think about isAvailable more like a hint
>> > from the operator to the runtime, which we can use to make better
>> > decisions. Also take a look at the FLIP-27 sources (`SourceReader`),
>> where
>> > there already is `isAvailable()` method. It would be best if new sinks
>> > would just duplicate the same contract.
>> >
>> > > For me I want to know is what specific sink will benefit from this
>> > feature
>> >
>> > It's not the sinks that would benefit from this, but other parts of the
>> > system. Currently task thread is blocked on backpressured Sink, it's
>> > blocking some things from happening (checkpointing, closing, ...). If we
>> > make sinks non blocking (as is the network stack in the most part and as
>> > are the FLIP-27 sources), we will be able to snapshot state of the
>> operator
>> > immediately. For example, change from blocking to non blocking sources
>> was
>> > speeding up unaligned checkpoints from ~30seconds down to ~5seconds in
>> > our benchmarks, but the difference can be even more profound (hours
>> instead
>> > of seconds/minutes as reported by some users).
>> >
>> > Piotrek
>> >
>> > [1]
>> >
>> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>> >
>> > śr., 16 wrz 2020 o 06:29 Guowei Ma <guowei....@gmail.com> napisał(a):
>> >
>> >> Hi,all
>> >>
>> >> Thanks for all your valuable options and ideas.Currently there are many
>> >> topics in the mail. I try to summarize what is consensus and what is
>> not.
>> >> Correct me if I am wrong.
>> >>
>> >> ## Consensus
>> >>
>> >> 1. The motivation of the unified sink API is to decouple the sink
>> >> implementation from the different runtime execution mode.
>> >> 2. The initial scope of the unified sink API only covers the file
>> system
>> >> type, which supports the real transactions. The FLIP focuses more on
>> the
>> >> semantics the new sink api should support.
>> >> 3. We prefer the first alternative API, which could give the framework
>> a
>> >> greater opportunity to optimize.
>> >> 4. The `Writer` needs to add a method `prepareCommit`, which would be
>> >> called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
>> >> 5. The FLIP could move the `Snapshot & Drain` section in order to be
>> more
>> >> focused.
>> >>
>> >> ## Not Consensus
>> >>
>> >> 1. What should the “Unified Sink API” support/cover? The API can
>> >> “unified”(decoupe) the commit operation in the term of supporting
>> exactly
>> >> once semantics. However, even if we narrow down the initial supported
>> >> system to the file system there would be different topology
>> requirements.
>> >> These requirements come from performance optimization
>> >> (IceBergSink/MergeHiveSink) or functionality(i.e. whether a bucket is
>> >> “finished”).  Should the unified sink API support these requirements?
>> >> 2. The API does not expose the checkpoint-id because the batch
>> execution
>> >> mode does not have the normal checkpoint. But there still some
>> >> implementations depend on this.(IceBergSink uses this to do some
>> dedupe).
>> >> I think how to support this requirement depends on the first open
>> >> question.
>> >> 3. Whether the `Writer` supports async functionality or not. Currently
>> I
>> >> do
>> >> not know which sink could benefit from it. Maybe it is just my own
>> >> problem.
>> >>
>> >> Best,
>> >> Guowei
>> >>
>> >>
>> >> On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma <guowei....@gmail.com>
>> wrote:
>> >>
>> >> >
>> >> > Hi, Steven
>> >> > Thanks you for your thoughtful ideas and concerns.
>> >> >
>> >> > >>I still like the concept of grouping data files per checkpoint for
>> >> > streaming mode. it is cleaner and probably easier to manage and deal
>> >> with
>> >> > commit failures. Plus, it >>can reduce dupes for the at least once
>> >> > >>mode.  I understand checkpoint is not an option for batch
>> execution.
>> >> We
>> >> > don't have to expose the checkpointId in API, as >>long as  the
>> internal
>> >> > bookkeeping groups data files by checkpoints for streaming >>mode.
>> >> >
>> >> > I think this problem(How to dedupe the combined committed data) also
>> >> > depends on where to place the agg/combine logic .
>> >> >
>> >> > 1. If the agg/combine takes place in the “commit” maybe we need to
>> >> figure
>> >> > out how to give the aggregated committable a unique and
>> auto-increment
>> >> id
>> >> > in the committer.
>> >> > 2. If the agg/combine takes place in a separate operator maybe sink
>> >> > developer could maintain the id itself by using the state.
>> >> >
>> >> > I think this problem is also decided by what the topology pattern the
>> >> sink
>> >> > API should support. Actually there are already many other topology
>> >> > requirements. :)
>> >> >
>> >> > Best,
>> >> > Guowei
>> >> >
>> >> >
>> >> > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu <stevenz...@gmail.com>
>> wrote:
>> >> >
>> >> >> > AFAIK the committer would not see the file-1-2 when ck1 happens in
>> >> the
>> >> >> ExactlyOnce mode.
>> >> >>
>> >> >> @Guowei Ma <guowei....@gmail.com> I think you are right for exactly
>> >> once
>> >> >> checkpoint semantics. what about "at least once"? I guess we can
>> argue
>> >> that
>> >> >> it is fine to commit file-1-2 for at least once mode.
>> >> >>
>> >> >> I still like the concept of grouping data files per checkpoint for
>> >> >> streaming mode. it is cleaner and probably easier to manage and deal
>> >> with
>> >> >> commit failures. Plus, it can reduce dupes for the at least once
>> >> mode.  I
>> >> >> understand checkpoint is not an option for batch execution. We don't
>> >> have
>> >> >> to expose the checkpointId in API, as long as  the internal
>> bookkeeping
>> >> >> groups data files by checkpoints for streaming mode.
>> >> >>
>> >> >>
>> >> >> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu <stevenz...@gmail.com>
>> >> wrote:
>> >> >>
>> >> >>> > images don't make it through to the mailing lists. You would
>> need to
>> >> >>> host the file somewhere and send a link.
>> >> >>>
>> >> >>> Sorry about that. Here is the sample DAG in google drawings.
>> >> >>>
>> >> >>>
>> >>
>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>> >> >>>
>> >> >>>
>> >> >>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma <guowei....@gmail.com>
>> >> wrote:
>> >> >>>
>> >> >>>> Hi, Dawid
>> >> >>>>
>> >> >>>> >>I still find the merging case the most confusing. I don't
>> >> necessarily
>> >> >>>> understand why do you need the "SingleFileCommit" step in this
>> >> scenario.
>> >> >>>> The way I
>> >> >>>> >> understand "commit" operation is that it makes some
>> data/artifacts
>> >> >>>> visible to the external system, thus it should be immutable from a
>> >> >>>> point of
>> >> >>>> view of a single >>process. Having an additional step in the same
>> >> >>>> process
>> >> >>>> that works on committed data contradicts with those assumptions. I
>> >> >>>> might be
>> >> >>>> missing something though. >> Could you elaborate >why can't it be
>> >> >>>> something
>> >> >>>> like FileWriter -> FileMergeWriter -> Committer (either global or
>> >> >>>> non-global)? Again it might be just me not getting the example.
>> >> >>>>
>> >> >>>> I think you are right. The topology
>> >> >>>> "FileWriter->FileMergeWriter->Committer" could meet the merge
>> >> >>>> requirement.
>> >> >>>> The topology "FileWriter-> SingleFileCommitter -> FileMergeWriter
>> ->
>> >> >>>> GlobalCommitter" reuses some code of the StreamingFileSink(For
>> >> example
>> >> >>>> rolling policy) so it has the "SingleFileCommitter" in the
>> topology.
>> >> In
>> >> >>>> general I want to use the case to show that there are different
>> >> >>>> topologies
>> >> >>>> according to the requirements.
>> >> >>>>
>> >> >>>> BTW: IIRC, @Jingsong Lee <zhixin....@alibaba-inc.com> telled me
>> that
>> >> >>>> the
>> >> >>>> actual topology of merged supported HiveSink is more complicated
>> than
>> >> >>>> that.
>> >> >>>>
>> >> >>>>
>> >> >>>> >> I've just briefly skimmed over the proposed interfaces. I would
>> >> >>>> suggest
>> >> >>>> one
>> >> >>>> >> addition to the Writer interface (as I understand this is the
>> >> runtime
>> >> >>>> >> interface in this proposal?): add some availability method, to
>> >> >>>> avoid, if
>> >> >>>> >> possible, blocking calls on the sink. We already have similar
>> >> >>>> >> availability methods in the new sources [1] and in various
>> places
>> >> in
>> >> >>>> the
>> >> >>>> >> network stack [2].
>> >> >>>> >> BTW Let's not forget about Piotr's comment. I think we could
>> add
>> >> the
>> >> >>>> isAvailable or similar method to the Writer interface in the FLIP.
>> >> >>>>
>> >> >>>> Thanks @Dawid Wysakowicz <dwysakow...@apache.org>  for your
>> >> reminder.
>> >> >>>> There
>> >> >>>> are two many issues at the same time.
>> >> >>>>
>> >> >>>> In addition to what Ajjoscha said : there is very little system
>> >> support
>> >> >>>> it.   Another thing I worry about is that: Does the sink's
>> snapshot
>> >> >>>> return
>> >> >>>> immediately when the sink's status is unavailable? Maybe we could
>> do
>> >> it
>> >> >>>> by
>> >> >>>> dedupe some element in the state but I think it might be too
>> >> >>>> complicated.
>> >> >>>> For me I want to know is what specific sink will benefit from this
>> >> >>>> feature.  @piotr <pi...@ververica.com>  Please correct me if  I
>> >> >>>> misunderstand you. thanks.
>> >> >>>>
>> >> >>>> Best,
>> >> >>>> Guowei
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz <
>> >> >>>> dwysakow...@apache.org>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>> > What I understand is that HiveSink's implementation might need
>> the
>> >> >>>> local
>> >> >>>> > committer(FileCommitter) because the file rename is needed.
>> >> >>>> > But the iceberg only needs to write the manifest file.  Would
>> you
>> >> >>>> like to
>> >> >>>> > enlighten me why the Iceberg needs the local committer?
>> >> >>>> > Thanks
>> >> >>>> >
>> >> >>>> > Sorry if I caused a confusion here. I am not saying the Iceberg
>> >> sink
>> >> >>>> needs
>> >> >>>> > a local committer. What I had in mind is that prior to the
>> Iceberg
>> >> >>>> example
>> >> >>>> > I did not see a need for a "GlobalCommitter" in the streaming
>> >> case. I
>> >> >>>> > thought it is always enough to have the "normal" committer in
>> that
>> >> >>>> case.
>> >> >>>> > Now I understand that this differentiation is not really about
>> >> logical
>> >> >>>> > separation. It is not really about the granularity with which we
>> >> >>>> commit,
>> >> >>>> > i.e. answering the "WHAT" question. It is really about the
>> >> >>>> performance and
>> >> >>>> > that in the end we will have a single "transaction", so it is
>> about
>> >> >>>> > answering the question "HOW".
>> >> >>>> >
>> >> >>>> >
>> >> >>>> >    -
>> >> >>>> >
>> >> >>>> >    Commit a directory with merged files(Some user want to merge
>> the
>> >> >>>> files
>> >> >>>> >    in a directory before committing the directory to Hive meta
>> >> store)
>> >> >>>> >
>> >> >>>> >
>> >> >>>> >    1.
>> >> >>>> >
>> >> >>>> >    FileWriter -> SingleFileCommit -> FileMergeWriter  ->
>> >> >>>> GlobalCommitter
>> >> >>>> >
>> >> >>>> > I still find the merging case the most confusing. I don't
>> >> necessarily
>> >> >>>> > understand why do you need the "SingleFileCommit" step in this
>> >> >>>> scenario.
>> >> >>>> > The way I understand "commit" operation is that it makes some
>> >> >>>> > data/artifacts visible to the external system, thus it should be
>> >> >>>> immutable
>> >> >>>> > from a point of view of a single process. Having an additional
>> step
>> >> >>>> in the
>> >> >>>> > same process that works on committed data contradicts with those
>> >> >>>> > assumptions. I might be missing something though. Could you
>> >> elaborate
>> >> >>>> why
>> >> >>>> > can't it be something like FileWriter -> FileMergeWriter ->
>> >> Committer
>> >> >>>> > (either global or non-global)? Again it might be just me not
>> >> getting
>> >> >>>> the
>> >> >>>> > example.
>> >> >>>> >
>> >> >>>> > I've just briefly skimmed over the proposed interfaces. I would
>> >> >>>> suggest one
>> >> >>>> > addition to the Writer interface (as I understand this is the
>> >> runtime
>> >> >>>> > interface in this proposal?): add some availability method, to
>> >> avoid,
>> >> >>>> if
>> >> >>>> > possible, blocking calls on the sink. We already have similar
>> >> >>>> > availability methods in the new sources [1] and in various
>> places
>> >> in
>> >> >>>> the
>> >> >>>> > network stack [2].
>> >> >>>> >
>> >> >>>> > BTW Let's not forget about Piotr's comment. I think we could add
>> >> the
>> >> >>>> > isAvailable or similar method to the Writer interface in the
>> FLIP.
>> >> >>>> >
>> >> >>>> > Best,
>> >> >>>> >
>> >> >>>> > Dawid
>> >> >>>> > On 15/09/2020 08:06, Guowei Ma wrote:
>> >> >>>> >
>> >> >>>> > I would think that we only need flush() and the semantics are
>> that
>> >> it
>> >> >>>> > prepares for a commit, so on a physical level it would be called
>> >> from
>> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> more I
>> >> >>>> > think flush() should be renamed to something like
>> >> "prepareCommit()".
>> >> >>>> >
>> >> >>>> > Generally speaking it is a good point that emitting the
>> >> committables
>> >> >>>> > should happen before emitting the checkpoint barrier downstream.
>> >> >>>> > However, if I remember offline discussions well, the idea behind
>> >> >>>> > Writer#flush and Writer#snapshotState was to differentiate
>> commit
>> >> on
>> >> >>>> > checkpoint vs final checkpoint at the end of the job. Both of
>> these
>> >> >>>> > methods could emit committables, but the flush should not leave
>> >> any in
>> >> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
>> >> >>>> > snapshotState it could leave some open files that would be
>> >> committed
>> >> >>>> in
>> >> >>>> > a subsequent cycle, however flush should close all files). The
>> >> >>>> > snapshotState as it is now can not be called in
>> >> >>>> > prepareSnapshotPreBarrier as it can store some state, which
>> should
>> >> >>>> > happen in Operator#snapshotState as otherwise it would always be
>> >> >>>> > synchronous. Therefore I think we would need sth like:
>> >> >>>> >
>> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >> >>>> >
>> >> >>>> > ver 1:
>> >> >>>> >
>> >> >>>> > List<StateT> snapshotState();
>> >> >>>> >
>> >> >>>> > ver 2:
>> >> >>>> >
>> >> >>>> > void snapshotState(); // not sure if we need that method at all
>> in
>> >> >>>> option
>> >> >>>> >
>> >> >>>> > 2
>> >> >>>> >
>> >> >>>> > I second Dawid's proposal. This is a valid scenario. And
>> version2
>> >> >>>> does not
>> >> >>>> > need the snapshotState() any more.
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > The Committer is as described in the FLIP, it's basically a
>> >> function
>> >> >>>> > "void commit(Committable)". The GobalCommitter would be a
>> function
>> >> >>>> "void
>> >> >>>> > commit(List<Committable>)". The former would be used by an S3
>> sink
>> >> >>>> where
>> >> >>>> > we can individually commit files to S3, a committable would be
>> the
>> >> >>>> list
>> >> >>>> > of part uploads that will form the final file and the commit
>> >> operation
>> >> >>>> > creates the metadata in S3. The latter would be used by
>> something
>> >> like
>> >> >>>> > Iceberg where the Committer needs a global view of all the
>> commits
>> >> to
>> >> >>>> be
>> >> >>>> > efficient and not overwhelm the system.
>> >> >>>> >
>> >> >>>> > I don't know yet if sinks would only implement on type of commit
>> >> >>>> > function or potentially both at the same time, and maybe Commit
>> can
>> >> >>>> > return some CommitResult that gets shipped to the GlobalCommit
>> >> >>>> function.
>> >> >>>> > I must admit it I did not get the need for Local/Normal + Global
>> >> >>>> > committer at first. The Iceberg example helped a lot. I think it
>> >> >>>> makes a
>> >> >>>> > lot of sense.
>> >> >>>> >
>> >> >>>> > @Dawid
>> >> >>>> > What I understand is that HiveSink's implementation might need
>> the
>> >> >>>> local
>> >> >>>> > committer(FileCommitter) because the file rename is needed.
>> >> >>>> > But the iceberg only needs to write the manifest file.  Would
>> you
>> >> >>>> like to
>> >> >>>> > enlighten me why the Iceberg needs the local committer?
>> >> >>>> > Thanks
>> >> >>>> >
>> >> >>>> > Best,
>> >> >>>> > Guowei
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz <
>> >> >>>> dwysakow...@apache.org> <dwysakow...@apache.org>
>> >> >>>> > wrote:
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > Hi all,
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > I would think that we only need flush() and the semantics are
>> that
>> >> it
>> >> >>>> > prepares for a commit, so on a physical level it would be called
>> >> from
>> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> more I
>> >> >>>> > think flush() should be renamed to something like
>> >> "prepareCommit()".
>> >> >>>> >
>> >> >>>> > Generally speaking it is a good point that emitting the
>> >> committables
>> >> >>>> > should happen before emitting the checkpoint barrier downstream.
>> >> >>>> > However, if I remember offline discussions well, the idea behind
>> >> >>>> > Writer#flush and Writer#snapshotState was to differentiate
>> commit
>> >> on
>> >> >>>> > checkpoint vs final checkpoint at the end of the job. Both of
>> these
>> >> >>>> > methods could emit committables, but the flush should not leave
>> >> any in
>> >> >>>> > progress state (e.g. in case of file sink in STREAM mode, in
>> >> >>>> > snapshotState it could leave some open files that would be
>> >> committed
>> >> >>>> in
>> >> >>>> > a subsequent cycle, however flush should close all files). The
>> >> >>>> > snapshotState as it is now can not be called in
>> >> >>>> > prepareSnapshotPreBarrier as it can store some state, which
>> should
>> >> >>>> > happen in Operator#snapshotState as otherwise it would always be
>> >> >>>> > synchronous. Therefore I think we would need sth like:
>> >> >>>> >
>> >> >>>> > void prepareCommit(boolean flush, WriterOutput<CommT> output);
>> >> >>>> >
>> >> >>>> > ver 1:
>> >> >>>> >
>> >> >>>> > List<StateT> snapshotState();
>> >> >>>> >
>> >> >>>> > ver 2:
>> >> >>>> >
>> >> >>>> > void snapshotState(); // not sure if we need that method at all
>> in
>> >> >>>> option 2
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > The Committer is as described in the FLIP, it's basically a
>> >> function
>> >> >>>> > "void commit(Committable)". The GobalCommitter would be a
>> function
>> >> >>>> "void
>> >> >>>> > commit(List<Committable>)". The former would be used by an S3
>> sink
>> >> >>>> where
>> >> >>>> > we can individually commit files to S3, a committable would be
>> the
>> >> >>>> list
>> >> >>>> > of part uploads that will form the final file and the commit
>> >> operation
>> >> >>>> > creates the metadata in S3. The latter would be used by
>> something
>> >> like
>> >> >>>> > Iceberg where the Committer needs a global view of all the
>> commits
>> >> to
>> >> >>>> be
>> >> >>>> > efficient and not overwhelm the system.
>> >> >>>> >
>> >> >>>> > I don't know yet if sinks would only implement on type of commit
>> >> >>>> > function or potentially both at the same time, and maybe Commit
>> can
>> >> >>>> > return some CommitResult that gets shipped to the GlobalCommit
>> >> >>>> function.
>> >> >>>> >
>> >> >>>> > I must admit it I did not get the need for Local/Normal + Global
>> >> >>>> > committer at first. The Iceberg example helped a lot. I think it
>> >> >>>> makes a
>> >> >>>> > lot of sense.
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > For Iceberg, writers don't need any state. But the
>> GlobalCommitter
>> >> >>>> > needs to
>> >> >>>> > checkpoint StateT. For the committer, CommT is "DataFile".
>> Since a
>> >> >>>> single
>> >> >>>> > committer can collect thousands (or more) data files in one
>> >> checkpoint
>> >> >>>> > cycle, as an optimization we checkpoint a single "ManifestFile"
>> >> (for
>> >> >>>> the
>> >> >>>> > collected thousands data files) as StateT. This allows us to
>> absorb
>> >> >>>> > extended commit outages without losing written/uploaded data
>> >> files, as
>> >> >>>> > operator state size is as small as one manifest file per
>> checkpoint
>> >> >>>> cycle
>> >> >>>> > [2].
>> >> >>>> > ------------------
>> >> >>>> > StateT snapshotState(SnapshotContext context) throws Exception;
>> >> >>>> >
>> >> >>>> > That means we also need the restoreCommitter API in the Sink
>> >> interface
>> >> >>>> > ---------------
>> >> >>>> > Committer<CommT, StateT> restoreCommitter(InitContext context,
>> >> StateT
>> >> >>>> > state);
>> >> >>>> >
>> >> >>>> > I think this might be a valid case. Not sure though if I would
>> go
>> >> >>>> with a
>> >> >>>> > "state" there. Having a state in a committer would imply we
>> need a
>> >> >>>> > collect method as well. So far we needed a single method
>> >> commit(...)
>> >> >>>> and
>> >> >>>> > the bookkeeping of the committables could be handled by the
>> >> >>>> framework. I
>> >> >>>> > think something like an optional combiner in the GlobalCommitter
>> >> would
>> >> >>>> > be enough. What do you think?
>> >> >>>> >
>> >> >>>> > GlobalCommitter<CommT, GlobalCommT> {
>> >> >>>> >
>> >> >>>> >     void commit(GlobalCommT globalCommittables);
>> >> >>>> >
>> >> >>>> >     GlobalCommT combine(List<CommT> committables);
>> >> >>>> >
>> >> >>>> > }
>> >> >>>> >
>> >> >>>> > A different problem that I see here is how do we handle commit
>> >> >>>> failures.
>> >> >>>> > Should the committables (both normal and global be included in
>> the
>> >> >>>> next
>> >> >>>> > cycle, shall we retry it, ...) I think it would be worth laying
>> it
>> >> out
>> >> >>>> > in the FLIP.
>> >> >>>> >
>> >> >>>> > @Aljoscha I think you can find the code Steven was referring in
>> >> here:
>> >> >>>> >
>> >> >>>>
>> >>
>> https://github.com/Netflix-Skunkworks/nfflink-connector-iceberg/blob/master/nfflink-connector-iceberg/src/main/java/com/netflix/spaas/nfflink/connector/iceberg/sink/IcebergCommitter.java
>> >> >>>> >
>> >> >>>> > Best,
>> >> >>>> >
>> >> >>>> > Dawid
>> >> >>>> >
>> >> >>>> > On 14/09/2020 15:19, Aljoscha Krettek wrote:
>> >> >>>> >
>> >> >>>> > On 14.09.20 01:23, Steven Wu wrote:
>> >> >>>> >
>> >> >>>> > ## Writer interface
>> >> >>>> >
>> >> >>>> > For the Writer interface, should we add "*prepareSnapshot"*
>> before
>> >> the
>> >> >>>> > checkpoint barrier emitted downstream?  IcebergWriter would need
>> >> it.
>> >> >>>> Or
>> >> >>>> > would the framework call "*flush*" before the barrier emitted
>> >> >>>> > downstream?
>> >> >>>> > that guarantee would achieve the same goal.
>> >> >>>> >
>> >> >>>> > I would think that we only need flush() and the semantics are
>> that
>> >> it
>> >> >>>> > prepares for a commit, so on a physical level it would be called
>> >> from
>> >> >>>> > "prepareSnapshotPreBarrier". Now that I'm thinking about it
>> more I
>> >> >>>> > think flush() should be renamed to something like
>> >> "prepareCommit()".
>> >> >>>> >
>> >> >>>> > @Guowei, what do you think about this?
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > In [1], we discussed the reason for Writer to emit
>> (checkpointId,
>> >> >>>> CommT)
>> >> >>>> > tuple to the committer. The committer needs checkpointId to
>> >> separate
>> >> >>>> out
>> >> >>>> > data files for different checkpoints if concurrent checkpoints
>> are
>> >> >>>> > enabled.
>> >> >>>> >
>> >> >>>> > When can this happen? Even with concurrent checkpoints the
>> snapshot
>> >> >>>> > barriers would still cleanly segregate the input stream of an
>> >> operator
>> >> >>>> > into tranches that should manifest in only one checkpoint. With
>> >> >>>> > concurrent checkpoints, all that can happen is that we start a
>> >> >>>> > checkpoint before a last one is confirmed completed.
>> >> >>>> >
>> >> >>>> > Unless there is some weirdness in the sources and some sources
>> >> start
>> >> >>>> > chk1 first and some other ones start chk2 first?
>> >> >>>> >
>> >> >>>> > @Piotrek, do you think this is a problem?
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > For the Committer interface, I am wondering if we should split
>> the
>> >> >>>> > single
>> >> >>>> > commit method into separate "*collect"* and "*commit"* methods?
>> >> This
>> >> >>>> > way,
>> >> >>>> > it can handle both single and multiple CommT objects.
>> >> >>>> >
>> >> >>>> > I think we can't do this. If the sink only needs a regular
>> >> Commiter,
>> >> >>>> > we can perform the commits in parallel, possibly on different
>> >> >>>> > machines. Only when the sink needs a GlobalCommitter would we
>> need
>> >> to
>> >> >>>> > ship all commits to a single process and perform the commit
>> there.
>> >> If
>> >> >>>> > both methods were unified in one interface we couldn't make the
>> >> >>>> > decision of were to commit in the framework code.
>> >> >>>> >
>> >> >>>> >
>> >> >>>> > For Iceberg, writers don't need any state. But the
>> GlobalCommitter
>> >> >>>> > needs to
>> >> >>>> > checkpoint StateT. For the committer, CommT is "DataFile".
>> Since a
>> >> >>>> > single
>> >> >>>> > committer can collect thousands (or more) data files in one
>> >> checkpoint
>> >> >>>> > cycle, as an optimization we checkpoint a single "ManifestFile"
>> >> (for
>> >> >>>> the
>> >> >>>> > collected thousands data files) as StateT. This allows us to
>> absorb
>> >> >>>> > extended commit outages without losing written/uploaded data
>> >> files, as
>> >> >>>> > operator state size is as small as one manifest file per
>> checkpoint
>> >> >>>> > cycle
>> >> >>>> >
>> >> >>>> > You could have a point here. Is the code for this available in
>> >> >>>> > open-source? I was checking out
>> >> >>>> >
>> >> >>>> >
>> >> >>>> >
>> >> >>>>
>> >>
>> https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
>> >> >>>> >
>> >> >>>> > and didn't find the ManifestFile optimization there.
>> >> >>>> >
>> >> >>>> > Best,
>> >> >>>> > Aljoscha
>> >> >>>> >
>> >> >>>> >
>> >> >>>> >
>> >> >>>>
>> >> >>>
>> >>
>> >
>>
>

Reply via email to