On 14.09.20 01:23, Steven Wu wrote:
## Writer interface
For the Writer interface, should we add "prepareSnapshot" before the
checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
would the framework call "flush" before the barrier is emitted downstream?
That guarantee would achieve the same goal.
I would think that we only need flush() and the semantics are that it
prepares for a commit, so on a physical level it would be called from
"prepareSnapshotPreBarrier". Now that I'm thinking about it more, I think
flush() should be renamed to something like "prepareCommit()".
@Guowei, what do you think about this?
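To make the proposed ordering concrete, here is a minimal plain-Java sketch. The names SinkWriter, prepareCommit(), BufferingWriter and OperatorDriver are hypothetical illustrations, not the actual Flink API. The only point shown is that the framework calls prepareCommit() at the equivalent of "prepareSnapshotPreBarrier", so everything written before a barrier becomes a committable before that barrier is forwarded downstream.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical writer interface in which prepareCommit() replaces flush(). */
interface SinkWriter<InputT, CommT> {
    void write(InputT element);

    /** Called by the framework right before the checkpoint barrier is emitted downstream. */
    List<CommT> prepareCommit();
}

/** Toy writer: buffers strings and turns each buffered batch into one "file" committable. */
class BufferingWriter implements SinkWriter<String, String> {
    private final List<String> buffer = new ArrayList<>();
    private int fileCounter = 0;

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!buffer.isEmpty()) {
            // Pretend the buffered records were written out to one data file.
            committables.add("file-" + (fileCounter++) + "[" + String.join(",", buffer) + "]");
            buffer.clear();
        }
        return committables;
    }
}

/** Hypothetical stand-in for the operator, recording what is sent downstream and in which order. */
class OperatorDriver {
    private final SinkWriter<String, String> writer;
    final List<String> emitted = new ArrayList<>();

    OperatorDriver(SinkWriter<String, String> writer) {
        this.writer = writer;
    }

    void processElement(String element) {
        writer.write(element);
    }

    /** Equivalent of prepareSnapshotPreBarrier followed by forwarding the barrier. */
    void onBarrier(long checkpointId) {
        for (String committable : writer.prepareCommit()) {
            emitted.add(checkpointId + ":" + committable); // (checkpointId, CommT) tuple
        }
        emitted.add("barrier-" + checkpointId);
    }
}
```

With elements "a", "b", a barrier for checkpoint 1, then "c" and a barrier for checkpoint 2, the committables for each checkpoint are emitted strictly before the corresponding barrier.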
In [1], we discussed the reason for the Writer to emit a (checkpointId, CommT)
tuple to the committer. The committer needs the checkpointId to separate out
data files for different checkpoints if concurrent checkpoints are enabled.
When can this happen? Even with concurrent checkpoints the snapshot
barriers would still cleanly segregate the input stream of an operator
into tranches that should manifest in only one checkpoint. With
concurrent checkpoints, all that can happen is that we start a
checkpoint before the previous one is confirmed as completed.
Unless there is some weirdness in the sources and some sources start
chk1 first and some other ones start chk2 first?
@Piotrek, do you think this is a problem?
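The case described above, where checkpoint 2 starts before checkpoint 1 is confirmed, can be illustrated with a small committer-side buffer. This is a sketch under the assumption that committables arrive as (checkpointId, CommT) pairs; the class and method names are hypothetical and not part of any Flink API. Because barriers cut the stream into clean tranches, each committable belongs to exactly one checkpoint, and the id is only needed to tell which tranches are safe to commit once a given checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical committer-side buffer keyed by the checkpoint that closed each tranche. */
class CheckpointedCommittables<CommT> {
    private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();
    final List<CommT> committed = new ArrayList<>();

    /** Receives one (checkpointId, CommT) tuple from a writer. */
    void add(long checkpointId, CommT committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    /**
     * With concurrent checkpoints, the tranche of checkpoint 2 may already be buffered
     * when checkpoint 1 completes; only tranches up to the completed id are committed.
     */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<CommT>> ready = pending.headMap(checkpointId, true);
        for (List<CommT> tranche : ready.values()) {
            committed.addAll(tranche);
        }
        ready.clear(); // headMap is a view, so this also removes the entries from pending
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

If "f1" and "f2" arrive for checkpoint 1 and "f3" for checkpoint 2, completing checkpoint 1 commits only "f1" and "f2" and leaves checkpoint 2's tranche pending.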
For the Committer interface, I am wondering if we should split the single
commit method into separate "collect" and "commit" methods? This way,
it could handle both single and multiple CommT objects.
I think we can't do this. If the sink only needs a regular Committer, we
can perform the commits in parallel, possibly on different machines.
Only when the sink needs a GlobalCommitter would we need to ship all
commits to a single process and perform the commit there. If both
methods were unified in one interface, the framework code couldn't decide
where to commit.
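The argument for keeping two separate interfaces can be sketched as follows. Committer, GlobalCommitter (with combine/commit) and CommitScheduler are hypothetical names chosen for illustration, not the interfaces from the actual proposal. Because the sink exposes one interface or the other, framework code can pick the execution strategy from the type alone: per-subtask commits may run in parallel, while a global commit first gathers everything in one place.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-subtask committer; commits may run in parallel, so it must be thread-safe. */
interface Committer<CommT> {
    void commit(List<CommT> committables);
}

/** Hypothetical global committer that sees all committables of a checkpoint at once. */
interface GlobalCommitter<CommT, GlobalCommT> {
    GlobalCommT combine(List<CommT> committables);

    void commit(GlobalCommT globalCommittable);
}

/** Toy stand-in for framework code that decides where commits are executed. */
class CommitScheduler {
    /** Regular Committer: each subtask's committables are committed independently, in parallel. */
    static <CommT> void commitInParallel(List<List<CommT>> perSubtask, Committer<CommT> committer) {
        perSubtask.parallelStream().forEach(committer::commit);
    }

    /** GlobalCommitter: everything is shipped to a single place, combined, and committed once. */
    static <CommT, G> void commitGlobally(List<List<CommT>> perSubtask, GlobalCommitter<CommT, G> committer) {
        List<CommT> all = new ArrayList<>();
        perSubtask.forEach(all::addAll);
        committer.commit(committer.combine(all));
    }
}
```

A single "collect"/"commit" interface would hide which of these two paths a sink needs, which is exactly the decision the framework has to make.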
For Iceberg, writers don't need any state. But the GlobalCommitter needs to
checkpoint StateT. For the committer, CommT is "DataFile". Since a single
committer can collect thousands (or more) data files in one checkpoint
cycle, as an optimization we checkpoint a single "ManifestFile" (covering
the thousands of collected data files) as StateT. This allows us to absorb
extended commit outages without losing written/uploaded data files, as the
operator state is as small as one manifest file per checkpoint cycle.
You could have a point here. Is the code for this available in
open-source? I was checking out
https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
and didn't find the ManifestFile optimization there.
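For discussion, here is a sketch of the ManifestFile optimization as Steven describes it. It is based only on the description in this thread, not on the Iceberg code. An in-memory map stands in for durable storage, and all class, field and method names are hypothetical. Each checkpoint cycle folds all collected data files into one manifest, and the checkpointed state holds only one manifest path per cycle, so a long commit outage grows the state by one entry per checkpoint instead of by thousands.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical global committer: one manifest per checkpoint cycle, only its path in state. */
class ManifestCheckpointingCommitter {
    /** Stand-in for durable storage: manifest path -> data file paths it lists. */
    final Map<String, List<String>> storage = new HashMap<>();
    /** Checkpointed state (StateT): checkpointId -> manifest path, one entry per cycle. */
    final NavigableMap<Long, String> state = new TreeMap<>();
    /** Stand-in for the table: data files that have actually been committed. */
    final List<String> tableFiles = new ArrayList<>();
    /** Simulates the table/catalog being unavailable for commits. */
    boolean commitOutage = false;

    private final List<String> collected = new ArrayList<>();

    /** Receives one CommT (a data file) from the writers. */
    void collect(String dataFilePath) {
        collected.add(dataFilePath);
    }

    /** On snapshot: write a single manifest for everything collected and keep only its path. */
    void snapshotState(long checkpointId) {
        String manifestPath = "manifest-" + checkpointId;
        storage.put(manifestPath, new ArrayList<>(collected));
        state.put(checkpointId, manifestPath);
        collected.clear();
    }

    /** On checkpoint completion: try to commit every pending manifest up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        if (commitOutage) {
            return; // nothing is lost: the manifests are still referenced from state
        }
        NavigableMap<Long, String> ready = state.headMap(checkpointId, true);
        for (String manifestPath : ready.values()) {
            tableFiles.addAll(storage.get(manifestPath));
        }
        ready.clear(); // removes the committed entries from state
    }
}
```

With 1,000 data files in each of two checkpoint cycles during an outage, the state holds just two manifest paths, and all 2,000 files are committed once the outage ends.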
Best,
Aljoscha