On 14.09.20 01:23, Steven Wu wrote:
## Writer interface

For the Writer interface, should we add "prepareSnapshot" before the
checkpoint barrier is emitted downstream? IcebergWriter would need it. Or
would the framework call "flush" before the barrier is emitted downstream?
That guarantee would achieve the same goal.

I would think that we only need flush(), with the semantics that it prepares for a commit, so on a physical level it would be called from "prepareSnapshotPreBarrier()". Now that I'm thinking about it more, I think flush() should be renamed to something like "prepareCommit()".
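A minimal sketch of the proposed contract, assuming a hypothetical `Writer` interface with `write()` and `prepareCommit()` (the renamed flush()) and a simplified `WriterOperator` standing in for the sink operator. None of these types are Flink's actual API. What it illustrates is the ordering: whatever `prepareCommit()` returns is emitted downstream from `prepareSnapshotPreBarrier()`, before the barrier itself is forwarded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Writer contract from this thread; not Flink's actual API.
interface Writer<InputT, CommT> {
    void write(InputT element);

    // The renamed flush(): prepares everything written since the last call
    // for a commit and returns the resulting committables.
    List<CommT> prepareCommit();
}

// Toy writer that buffers records and turns each non-empty batch into one
// "file" committable (just a descriptive string here).
class BufferingWriter implements Writer<String, String> {
    private final List<String> buffer = new ArrayList<>();
    private int fileCounter = 0;

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!buffer.isEmpty()) {
            committables.add("file-" + (fileCounter++) + "-" + buffer.size() + "-records");
            buffer.clear();
        }
        return committables;
    }
}

// Simplified stand-in for the sink operator; 'downstream' records what the
// operator emits, in emission order.
class WriterOperator<InputT, CommT> {
    private final Writer<InputT, CommT> writer;
    final List<Object> downstream = new ArrayList<>();

    WriterOperator(Writer<InputT, CommT> writer) {
        this.writer = writer;
    }

    void processElement(InputT element) {
        writer.write(element);
    }

    // Mirrors prepareSnapshotPreBarrier(checkpointId): emit committables first.
    void prepareSnapshotPreBarrier(long checkpointId) {
        downstream.addAll(writer.prepareCommit());
    }

    // Models the framework: prepareSnapshotPreBarrier() runs, then the barrier
    // is forwarded, so committables always precede the barrier.
    void onCheckpoint(long checkpointId) {
        prepareSnapshotPreBarrier(checkpointId);
        downstream.add("barrier-" + checkpointId);
    }
}
```

With this shape the IcebergWriter would not need a separate "prepareSnapshot" hook; the pre-barrier call already gives it the chance to close its files.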

@Guowei, what do you think about this?

In [1], we discussed the reason for the Writer to emit (checkpointId, CommT)
tuples to the committer. The committer needs the checkpointId to separate out
data files of different checkpoints if concurrent checkpoints are enabled.

When can this happen? Even with concurrent checkpoints, the snapshot barriers would still cleanly segregate the input stream of an operator into tranches that should manifest in only one checkpoint. With concurrent checkpoints, all that can happen is that we start a checkpoint before the previous one is confirmed complete.

Unless there is some weirdness in the sources, where some sources start chk1 first and others start chk2 first?

@Piotrek, do you think this is a problem?
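To make the question concrete, a rough sketch of the bookkeeping the (checkpointId, CommT) scheme implies on the committer side. `CheckpointAwareCommitter` and its methods are hypothetical; the sketch only assumes that committables arrive tagged with a checkpoint id and are committed once that checkpoint completes. Keying by id keeps committables of a later checkpoint pending even if they arrive before the earlier checkpoint is confirmed complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical committer-side bookkeeping: committables arrive tagged with the
// checkpoint they belong to and are committed only when that checkpoint completes.
class CheckpointAwareCommitter<CommT> {
    private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();
    final List<CommT> committed = new ArrayList<>();

    // One (checkpointId, CommT) tuple received from a writer.
    void receive(long checkpointId, CommT committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // Commit everything up to and including the completed checkpoint; later
    // checkpoints stay pending even if their committables have already arrived.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<CommT>> ready = pending.headMap(checkpointId, true);
        for (List<CommT> batch : ready.values()) {
            committed.addAll(batch);
        }
        ready.clear(); // clearing the view also removes the entries from 'pending'
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}
```

If barriers always segregate the stream cleanly and checkpoints complete in order, the map never holds more than the in-flight checkpoints, which is the case the question above is probing.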

For the Committer interface, I am wondering if we should split the single
commit method into separate "collect" and "commit" methods? This way,
it can handle both single and multiple CommT objects.

I think we can't do this. If the sink only needs a regular Committer, we can perform the commits in parallel, possibly on different machines. Only when the sink needs a GlobalCommitter would we need to ship all commits to a single process and perform the commit there. If both methods were unified in one interface, the framework code couldn't decide where to commit.
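A small sketch of why keeping the two interfaces separate lets the framework choose where to commit. `Committer`, `GlobalCommitter`, and `CommitPlanner` are hypothetical names modeled on this discussion, not an existing API, and the "parallel" path is run sequentially here for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces modeled on this discussion; not an existing API.
// A Committer only sees the committables of its own subtask.
interface Committer<CommT> {
    void commit(List<CommT> committables);
}

// A GlobalCommitter sees the committables of all subtasks at once.
interface GlobalCommitter<CommT> {
    void commit(List<CommT> allCommittables);
}

// Toy "framework" code: because the interfaces are distinct, it can pick the
// execution strategy from the type alone.
class CommitPlanner {
    static String placement(Object committer) {
        if (committer instanceof GlobalCommitter) {
            return "single process";
        }
        if (committer instanceof Committer) {
            return "parallel";
        }
        throw new IllegalArgumentException("not a committer: " + committer);
    }

    // Committer path: each subtask commits its own committables independently
    // (sequential here; in a real job these calls could run on different machines).
    static <CommT> int runParallel(Committer<CommT> committer, List<List<CommT>> perSubtask) {
        int calls = 0;
        for (List<CommT> own : perSubtask) {
            committer.commit(own);
            calls++;
        }
        return calls;
    }

    // GlobalCommitter path: ship everything to one place and commit once.
    static <CommT> int runGlobal(GlobalCommitter<CommT> committer, List<List<CommT>> perSubtask) {
        List<CommT> all = new ArrayList<>();
        for (List<CommT> own : perSubtask) {
            all.addAll(own);
        }
        committer.commit(all);
        return 1;
    }
}
```

A single interface with both "collect" and "commit" would hide this distinction, so the planner could not tell whether shipping everything to one process is required.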

For Iceberg, writers don't need any state, but the GlobalCommitter needs to
checkpoint StateT. For the committer, CommT is "DataFile". Since a single
committer can collect thousands (or more) of data files in one checkpoint
cycle, as an optimization we checkpoint a single "ManifestFile" (covering the
thousands of collected data files) as StateT. This allows us to absorb
extended commit outages without losing written/uploaded data files, as the
operator state size is as small as one manifest file per checkpoint cycle.

You could have a point here. Is the code for this available as open source? I was checking out https://github.com/apache/iceberg/blob/master/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java and didn't find the ManifestFile optimization there.
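For discussion, a rough sketch of the optimization as Steven describes it. It is not the Iceberg code: `DataFileRef`, `ManifestRef`, and `ManifestCheckpointingCommitter` are hypothetical stand-ins rather than Iceberg's real `DataFile`/`ManifestFile` classes, and actually writing the manifest is not modeled. It shows state growing by one entry per checkpoint cycle during a commit outage, instead of one entry per data file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins; these are not Iceberg's DataFile/ManifestFile classes.
final class DataFileRef {
    final String path;
    DataFileRef(String path) { this.path = path; }
}

final class ManifestRef {
    final String path;
    final int fileCount;
    ManifestRef(String path, int fileCount) { this.path = path; this.fileCount = fileCount; }
}

// Data files collected during a checkpoint cycle are folded into one manifest,
// and only that manifest reference is kept as checkpointed state (StateT).
class ManifestCheckpointingCommitter {
    private final List<DataFileRef> currentCycle = new ArrayList<>();
    // One manifest per checkpoint that has not been committed yet.
    private final TreeMap<Long, ManifestRef> uncommitted = new TreeMap<>();
    final List<ManifestRef> committed = new ArrayList<>();

    void collect(DataFileRef file) {
        currentCycle.add(file);
    }

    // On snapshot: "write" the cycle's data files into a manifest (not modeled)
    // and keep just the manifest reference instead of every data file.
    void snapshotState(long checkpointId) {
        if (currentCycle.isEmpty()) return;
        uncommitted.put(checkpointId,
            new ManifestRef("manifest-" + checkpointId, currentCycle.size()));
        currentCycle.clear();
    }

    // If the catalog is unavailable, manifests simply accumulate in state and
    // are retried with the next completed checkpoint.
    boolean notifyCheckpointComplete(long checkpointId, boolean catalogAvailable) {
        if (!catalogAvailable) return false;
        Map<Long, ManifestRef> ready = uncommitted.headMap(checkpointId, true);
        committed.addAll(ready.values());
        ready.clear();
        return true;
    }

    int stateEntries() { return uncommitted.size(); }
}
```

After two cycles of 1,000 files each with a failed first commit, the state holds two manifest references rather than 2,000 data file entries.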

Best,
Aljoscha
