It is more about extended outages of the metastore. E.g. if we commit every 2
minutes, 4 hours of metastore outage can lead to over 120 accumulated GlobalCommT.
And regarding metastore outages, it is undesirable for streaming jobs to
fail and keep restarting. It is better to keep processing records
(avoi
Hi Steven
Thank you very much for your detailed explanation.
Now I get your point; I can see that there are benefits to committing a
collection of `GlobalCommT` as a whole when the external metastore
environment is unstable for some time.
But I have two small concerns about introducing commit
I should clarify my last email a little more.
For the example where commits for checkpoints 1-100 failed, the job is still
up (processing records and uploading files). When the commit for checkpoint 101
comes, IcebergSink would prefer the framework to pass in all 101 GlobalCommT
(100 old + 1 new) so that
> 1. The framework cannot know which `GlobalCommT` to retry if we use the
> List as the parameter when `commit` returns `RETRY`.
> 2. Of course we can let `commit` return more detailed info, but it might
> be too complicated.
If commit(List<GlobalCommT>) returns RETRY, it means the whole list needs
to be retried
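A rough illustration of that retry contract (all names below are assumptions
based on this thread, not the final FLIP API):

import java.util.List;

// Sketch only: commit over the accumulated GlobalCommT list, where RETRY applies
// to the whole list rather than to individual elements.
interface GlobalCommitterSketch<GlobalCommT> {

    enum CommitResult { SUCCESS, FAILURE, RETRY }

    // Called with every GlobalCommT that has not been confirmed yet, e.g. 100
    // rolled-over committables plus the new one for checkpoint 101. Returning
    // RETRY means the framework should re-invoke commit later with the same
    // (possibly grown) list; it does not need to know which element failed.
    CommitResult commit(List<GlobalCommT> globalCommittables);
}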
Hi, all
From the above discussion we can see that the FLIP focuses on providing a
unified transactional sink API. So I updated the FLIP's title to "Unified
Transactional Sink API". But I found that the old link could not be opened
again.
I would update the link[1] here. Sorry for the inconvenience.
Hi, Steven
>> I also have a clarifying question regarding the WriterStateT. Since
>> IcebergWriter won't need to checkpoint any state, should we set it to *Void*
>> type? Since getWriterStateSerializer() returns Optional, that is clear and
>> we can return Optional.empty().
Yes, I think you could do it.
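A minimal sketch of that, assuming a Sink interface shaped roughly as discussed
in this thread (the class name below is made up; only getWriterStateSerializer()
and SimpleVersionedSerializer are taken from the discussion):

import java.util.Optional;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Sketch: a sink whose writer keeps no restorable state, so WriterStateT is Void
// and no writer-state serializer is provided.
abstract class StatelessWriterSinkSketch<InputT, CommT, GlobalCommT> {

    // With WriterStateT = Void, simply signal "no state" to the framework.
    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}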
Hi, Steven
Thank you for reading the FLIP so carefully.
1. The framework cannot know which `GlobalCommT` to retry if we use the
List as the parameter when `commit` returns `RETRY`.
2. Of course we can let `commit` return more detailed info, but it might
be too complicated.
3. On the other hand, I t
Guowei,
Thanks a lot for updating the wiki page. It looks great.
I noticed one inconsistency in the wiki compared with your last summary email
for the GlobalCommitter interface. I think the version in the summary email is the
intended one, because rollover from previous failed commits can accumulate
a list.
C
Thanks Aljoscha for your suggestion. I have updated FLIP. Any comments are
welcome.
Best,
Guowei
On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek
wrote:
> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc but we can add
Yes, that sounds good! I'll probably have some comments on the FLIP
about the names of generic parameters and the Javadoc but we can address
them later or during implementation.
I also think that we probably need the FAIL, RETRY, SUCCESS result for
globalCommit() but we can also do that as a lat
Hi, all
Thanks everyone very much for your ideas and suggestions. I will try to
summarize the consensus again :). Correct me if I am wrong or misunderstand
you.
## Consensus-1
1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
>> I think we should go with something like
>> List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT>)
>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.
I second Aljoscha's proposal. For the first version there i
I think we should go with something like
List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT>)
to keep things simple. This should also be easy to do from the framework
side and then the sink doesn't need to do any custom state handling.
Best,
Aljoscha
On 22.09.20 16:03, Steven Wu wrote:
Previous APIs di
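For reference, a rough sketch of the filterRecoveredCommittables shape proposed
above (the type parameter name is an assumption based on the GlobalCommitter
discussion in this thread):

import java.util.List;

// Sketch: the framework hands the sink the committables it recovered from state,
// and the sink returns the subset that still needs to be committed. Any custom
// bookkeeping (e.g. checking what already landed in the external system) stays
// inside the sink.
interface RecoverableCommitterSketch<GlobalCommT> {
    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> recovered);
}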
Previous APIs discussed have been trying to do more in the framework. If we
take a different approach with a lighter framework, this set of
minimal APIs is probably good enough. The sink can handle the bookkeeping,
merge, and retry logic.
/**
* CommT is the DataFile in Iceberg
* GlobalCommT is the che
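The Javadoc above is cut off in the archive. As a rough sketch of the type
mapping being described (CommT as Iceberg's DataFile, GlobalCommT as a
checkpoint-scoped manifest, per the later emails in this thread; all names are
assumptions):

import java.util.List;

// Sketch of a "lighter framework" committer: the framework only collects and
// hands over committables; bookkeeping, merging and retries live in the sink.
interface LightweightGlobalCommitterSketch<CommT, GlobalCommT> {

    // e.g. bundle Iceberg DataFiles (CommT) into one manifest (GlobalCommT) per checkpoint.
    GlobalCommT combine(List<CommT> committables);

    // e.g. commit the accumulated manifests; the sink decides how to merge,
    // dedupe and retry internally.
    void commit(List<GlobalCommT> globalCommittables) throws Exception;
}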
It is fine to leave the CommitResult/RETRY outside the scope of the framework.
Then the framework might need to provide some hooks in the
checkpoint/restore logic, because the commit happens in the post-checkpoint
completion step; the sink needs to update its internal state when
the commit is successful s
On 22.09.20 13:26, Guowei Ma wrote:
Actually I am not sure adding `isAvailable` is enough. Maybe it is not.
But for the initial version I hope we could make the sink api sync because
there is already a lot of stuff that has to finish. :--)
I agree, for the first version we should stick to a sim
>> I believe that we could support such an async sink writer
>> very easily in the future. What do you think?
>> How would you see the expansion in the future? Do you mean just adding
`isAvailable()` method with a default implementation later on?
Hi @piotr
Actually I am not sure adding `isAvailable` is enough.
On 22.09.20 11:10, Guowei Ma wrote:
1. I think maybe we could add an EOI interface to the `GlobalCommit`, so
that we could make the `write success file` available in both batch and
stream execution mode.
We could, yes. I'm now hesitant because we're adding more things but I
think it should be fine.
Thanks @aljoscha for the summary. I agree we should postpone the discussion of
the sink topology and focus on the normal file sink and IcebergSink in
Flink 1.12.
I have three small questions:
1. I think maybe we could add an EOI interface to the `GlobalCommit`, so
that we could make the `write success file` available in both batch and
stream execution mode.
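A rough sketch of what such an end-of-input hook could look like (the method and
interface names are assumptions, not part of the proposal):

// Sketch: an end-of-input notification on the global committer, so a sink can
// e.g. write a "_SUCCESS" marker file once, in both batch and streaming execution.
interface EndOfInputAwareCommitterSketch<GlobalCommT> {

    // Invoked after the last committables for the job have been committed.
    default void endOfInput() throws Exception {
        // e.g. write the "success file" marker here
    }
}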
Ah sorry, I think I now see what you mean. I think it's ok to add a
`List<GlobalCommT> recoverCommittables(List<GlobalCommT>)`
method.
On 22.09.20 09:42, Aljoscha Krettek wrote:
On 22.09.20 06:06, Steven Wu wrote:
In addition, it is undesirable to do the committed-or-not check in the
commit method, which happens for
On 22.09.20 06:06, Steven Wu wrote:
In addition, it is undesirable to do the committed-or-not check in the
commit method, which happens for each checkpoint cycle. CommitResult
already indicates SUCCESS or not. When the framework calls commit with a list
of GlobalCommittableT, it should be certain the
Aljoscha/Guowei,
I think we are pretty close with aligning on the Iceberg sink requirements.
This new sink API can really benefit and simplify Iceberg sink
implementation. Looking forward to the initial scope with 1.12 release.
> CommitResult commit(GlobalCommittableT);
I like the CommitResult
Hi all,
I'll try and summarize my thoughts after Guowei, Yun, Kostas, Dawid, and
I had an offline discussion about this.
Also, I would like to give credit to Guowei for initially coming up with
the idea of a topology sink in the context of this discussion. I think
it's a good idea and we sh
Hi Guowei,
> I believe that we could support such an async sink writer
> very easily in the future. What do you think?
How would you see the expansion in the future? Do you mean just adding
`isAvailable()` method with a default implementation later on?
Piotrek
Mon, Sep 21, 2020 at 02:39 Steven W
> I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.
@Guowei Ma It is undesirable to do the dedup check
in the `commit` call, because it happens for each checkpoint cycle. We only
need to do the de-dup check one time
I would like to summarize the file-type sinks in the thread and their
possible topologies. I also try to give the pros and cons of every topology
option. Correct me if I am wrong.
### FileSink
Topology Option: TmpFileWriter + Committer.
### Iceberg Sink
Topology Option 1: `DataFileWriter` + `Gl
Hi, Steven
I want to make a clarification first: the following reply only considers
the Iceberg sink, and does not consider other sinks. Before making a decision
we should consider all the sinks. I would try to summarize all the sink
requirements in the next mail
>> run global committer in jobmanager (e
> I prefer to let the developer produce id to dedupe. I think this gives
the developer more opportunity to optimize.
Thinking about it again, I totally agree with Guowei on this. We don't
really need the framework to generate the unique id for Iceberg sink.
De-dup logic is totally internal to Iceb
Hi, all
>>Just to add to what Aljoscha said regarding the unique id. Iceberg sink
>>checkpoints the unique id into state during snapshot. It also inserts the
>>unique id into the Iceberg snapshot metadata during commit. When a job
>>restores the state after failure, it needs to know if the restore
Aljoscha,
> Instead the sink would have to check for each set of committables
> separately if they had already been committed. Do you think this is
> feasible?
Yes, that is how it works in our internal implementation [1]. We don't use
checkpointId. We generate a manifest file (GlobalCommT) to bundle
Steven,
we were also wondering if it is a strict requirement that "later"
updates to Iceberg subsume earlier updates. In the current version, you
only check whether checkpoint X made it to Iceberg and then discard all
committable state from Flink state for checkpoints smaller than X.
If we go wit
Guowei
Just to add to what Aljoscha said regarding the unique id. Iceberg sink
checkpoints the unique id into state during snapshot. It also inserts the
unique id into the Iceberg snapshot metadata during commit. When a job
restores the state after failure, it needs to know if the restored
transac
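To illustrate the restore-time check being described, a rough sketch (assuming
Iceberg's Table#snapshots() and Snapshot#summary(); the metadata key and the
class/method names are made up for illustration):

import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch: decide whether a restored transaction/committable was already committed
// by looking for its unique id in the snapshot metadata of the Iceberg table.
final class CommittedIdCheckSketch {

    private static final String COMMIT_ID_KEY = "flink.commit-id"; // assumed key name

    static boolean alreadyCommitted(Table table, String uniqueId) {
        for (Snapshot snapshot : table.snapshots()) {
            if (uniqueId.equals(snapshot.summary().get(COMMIT_ID_KEY))) {
                return true;
            }
        }
        return false;
    }
}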
Thanks for the summary!
On 16.09.20 06:29, Guowei Ma wrote:
## Consensus
1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers the file system
type, which supports the
Hi Guowei,
Thanks for the explanation. Now I get your point.
Basically any action that would make the sink unavailable would also cause it
to block on snapshotting the state (in option 1, with flushing). I agree
that lack of availability is much less of an issue than I thought
before.
For opti
Hi, Steven
I am not particularly sure whether to provide an id in GlobalCommit.
But my understanding is: if the committer function is idempotent, the
framework can guarantee exactly once semantics in batch/stream execution
mode. But I think maybe the idempotence should be guaranteed by the sink
developer
Thanks @piotr very much for your patient explanation.
I would try to explain what is in my mind.
Consider the following case:
FlinkSink E6 -> Client Buffer Queue |E5|E4|E3|E2| --> External System E1
When the FlinkSink cannot add E6 to the Queue, maybe the external
client is flushing/w
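A rough sketch of that situation (all names are made up): a writer wrapping a
bounded client buffer either blocks when the queue is full or, with an
availability signal, can tell the framework it is temporarily not writable:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: E1..E5 already sit in the client's buffer; E6 cannot be added until the
// external client has flushed. Without an availability signal, write() would block here.
final class BufferedClientSketch<T> {

    private final BlockingQueue<T> buffer = new ArrayBlockingQueue<>(5);

    // Non-blocking attempt; false means "not available right now, try again later".
    boolean tryWrite(T element) {
        return buffer.offer(element);
    }
}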
Guowei, thanks a lot for the summary. Here are a couple more questions that
need more clarification for the GlobalCommitter case.
* framework provides some sort of unique id per GlobalCommT (e.g. nonce or
some sort of transaction id)
* commit failure handling. Should we roll over to the next cycle
Hey
Thanks Dawid for bringing up my suggestion :)
> I'm not so sure about this, the sinks I'm aware of would not be able to
> implement this method: Kafka doesn't have this, I didn't see it in the
> Iceberg interfaces, and HDFS/S3 also don't have it.
Aljoscha, as I wrote, FlinkKafkaProducer is a
Hi, all
Thanks for all your valuable opinions and ideas. Currently there are many
topics in the mail. I try to summarize what is consensus and what is not.
Correct me if I am wrong.
## Consensus
1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
Hi, Steven
Thank you for your thoughtful ideas and concerns.
>> I still like the concept of grouping data files per checkpoint for
>> streaming mode. It is cleaner and probably easier to manage and deal with
>> commit failures. Plus, it can reduce dupes for the at least once
>> mode.
I understand chec
> AFAIK the committer would not see the file-1-2 when ck1 happens in the
ExactlyOnce mode.
@Guowei Ma I think you are right for exactly once
checkpoint semantics. What about "at least once"? I guess we can argue that
it is fine to commit file-1-2 for at least once mode.
I still like the concept
> images don't make it through to the mailing lists. You would need to host
the file somewhere and send a link.
Sorry about that. Here is the sample DAG in google drawings.
https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
On Tue, Sep 15, 2020 at 4:
Hi, Dawid
>> I still find the merging case the most confusing. I don't necessarily
>> understand why you need the "SingleFileCommit" step in this scenario. The way
>> I understand the "commit" operation is that it makes some data/artifacts
>> visible to the external system, thus it should be immutable fro
On 15.09.20 09:55, Dawid Wysakowicz wrote:
BTW Let's not forget about Piotr's comment. I think we could add the
isAvailable or similar method to the Writer interface in the FLIP.
I'm not so sure about this, the sinks I'm aware of would not be able to
implement this method: Kafka doesn't have t
> What I understand is that HiveSink's implementation might need the local
> committer (FileCommitter) because the file rename is needed.
> But Iceberg only needs to write the manifest file. Would you like to
> enlighten me why Iceberg needs the local committer?
> Thanks
Sorry if I caused a
On 15.09.20 06:05, Guowei Ma wrote:
## Using checkpointId
In the batch execution mode there would be no normal checkpoints any more.
That is why we do not introduce the checkpoint id in the API. So it is a
great thing that the sink decouples its implementation from the checkpoint id. :)
Yes, this is a ve
On 15.09.20 01:33, Steven Wu wrote:
## concurrent checkpoints
@Aljoscha Krettek regarding the concurrent
checkpoints, let me illustrate with a simple DAG below.
[image: image.png]
Hi Steven,
images don't make it through to the mailing lists. You would need to
host the file somewhere and sen
>> I would think that we only need flush() and the semantics are that it
>> prepares for a commit, so on a physical level it would be called from
>> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> think flush() should be renamed to something like "prepareCommit()".
> Generall
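A rough sketch of that renaming (the signatures below are assumed; in particular
the return type is just one plausible option):

import java.util.List;

// Sketch: instead of a plain flush(), the writer exposes prepareCommit(), called
// on the physical level from "prepareSnapshotPreBarrier", i.e. before the
// checkpoint barrier is emitted downstream, and hands back the committables
// produced for this checkpoint.
interface WriterSketch<InputT, CommT> {

    void write(InputT element) throws Exception;

    // Flush pending data and return what should be committed for this checkpoint.
    List<CommT> prepareCommit() throws Exception;
}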
Hi, Aljoscha
>I don't understand why we need the "Drain and Snapshot" section. It
>seems to be some details about stop-with-savepoint and drain, and the
>relation to BATCH execution but I don't know if it is needed to
>understand the rest of the document. I'm happy to be wrong here, though,
>if th
## Concurrent checkpoints
AFAIK the committer would not see the file-1-2 when ck1 happens in the
ExactlyOnce mode.
## Committable bookkeeping and combining
I agree with you that the "CombineGlobalCommitter" would work. But we put
more optimization logic in the committer, which will make the commi
## concurrent checkpoints
@Aljoscha Krettek regarding the concurrent
checkpoints, let me illustrate with a simple DAG below.
[image: image.png]
Let's assume each writer emits one file per checkpoint cycle and *writer-2
is slow*. Now let's look at the timeline of what the global committer
receives:
---
Hi all,
Many thanks for the discussion and the valuable opinions! Currently there
are several ongoing issues and we would like to show what we are thinking
in the next few mails.
It seems that the biggest issue now is about the topology of the sinks.
Before deciding what the sink API would look
Hi all,
> I would think that we only need flush() and the semantics are that it
> prepares for a commit, so on a physical level it would be called from
> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> think flush() should be renamed to something like "prepareCommit()".
Gene
I thought about this some more. One of the important parts of the
Iceberg sink is to know whether we have already committed some
DataFiles. Currently, this is implemented by writing a (JobId,
MaxCheckpointId) tuple to the Iceberg table when committing. When
restoring from a failure we check thi
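A rough sketch of that restore check (the helper names and the checkpoint-id
accessor are made up; only the comparison logic comes from the email):

import java.util.List;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

// Sketch: checkpoints up to maxCommittedCheckpointId (read back from the
// (JobId, MaxCheckpointId) tuple in the Iceberg table) are already committed,
// so only committables from newer checkpoints stay in Flink state.
final class MaxCheckpointIdDedupSketch {

    static <GlobalCommT> List<GlobalCommT> stillPending(
            List<GlobalCommT> restored,
            ToLongFunction<GlobalCommT> checkpointIdOf, // assumed accessor
            long maxCommittedCheckpointId) {
        return restored.stream()
                .filter(c -> checkpointIdOf.applyAsLong(c) > maxCommittedCheckpointId)
                .collect(Collectors.toList());
    }
}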
On 14.09.20 01:23, Steven Wu wrote:
## Writer interface
For the Writer interface, should we add "*prepareSnapshot*" before the
checkpoint barrier emitted downstream? IcebergWriter would need it. Or
would the framework call "*flush*" before the barrier emitted downstream?
That guarantee would ac
Hi,
I've just briefly skimmed over the proposed interfaces. I would suggest one
addition to the Writer interface (as I understand this is the runtime
interface in this proposal?): add some availability method, to avoid, if
possible, blocking calls on the sink. We already have similar
availability
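As a rough sketch (loosely following Flink's existing availability pattern; the
interface below is not the proposed API):

import java.util.concurrent.CompletableFuture;

// Sketch: the writer can report when it is ready to accept the next record, so
// the runtime can avoid blocking inside write() when the external client is
// backed up.
interface AvailableWriterSketch<InputT> {

    // Completed future = writable now; otherwise the runtime waits for completion.
    CompletableFuture<Void> isAvailable();

    void write(InputT element) throws Exception;
}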
Aljoscha, thanks a lot for the detailed response. Now I have a better
understanding of the initial scope.
To me, there are possibly three different committer behaviors. For lack
of better names, let's call them
* No/NoopCommitter
* Committer / LocalCommitter (file sink?)
* GlobalCommitter (Ice
Regarding the FLIP itself, I like the motivation section and the
What/How/When/Where section a lot!
I don't understand why we need the "Drain and Snapshot" section. It
seems to be some details about stop-with-savepoint and drain, and the
relation to BATCH execution but I don't know if it is ne
Hi Everyone,
thanks to Guowei for publishing the FLIP, and thanks Steven for the very
thoughtful email!
We thought a lot internally about some of the questions you posted but
left a lot (almost all) of the implementation details out of the FLIP
for now because we wanted to focus on semantics
Guowei,
Thanks a lot for the proposal and starting the discussion thread. Very
excited.
For the big question of "Is the sink an operator or a topology?", I have a
few related sub questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single cho
Hi, devs & users
As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor
of DataStream API and Table API. Users should be able to use DataStream API
to write jobs that support both bounded and unbounded execution modes.
However, Flink does not provide a sink API to guarantee the