Thanks for the summary!
On 16.09.20 06:29, Guowei Ma wrote:
## Consensus
1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers the file system
type, which supports real transactions. The FLIP focuses more on the
semantics the new sink API should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`, and the `Flush` method would be removed.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.
Agreed!
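To make point 4 concrete, here is a rough Java sketch of what a `Writer` with `prepareCommit` instead of `flush` could look like. All names and signatures (`Writer`, `InMemoryFileWriter`, `WriterOperator`) are hypothetical and only illustrate the call order; `prepareSnapshotPreBarrier` here is a toy stand-in for the operator hook, not the real Flink operator class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the unified sink Writer; not the final Flink API.
interface Writer<InputT, CommT> {
    // Buffer or write a single record.
    void write(InputT element);

    // Replaces flush(): finish in-progress data and return committables
    // that the framework would hand to a committer.
    List<CommT> prepareCommit();
}

// Toy writer: records go into an in-memory "part file"; each finished part
// becomes a committable of the form "part-<n>:<record count>".
class InMemoryFileWriter implements Writer<String, String> {
    private final List<String> currentPart = new ArrayList<>();
    private int partCounter = 0;

    @Override
    public void write(String element) {
        currentPart.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!currentPart.isEmpty()) {
            committables.add("part-" + partCounter + ":" + currentPart.size());
            partCounter++;
            currentPart.clear();
        }
        return committables;
    }
}

// Stand-in for the operator hosting the writer (hypothetical).
class WriterOperator {
    private final Writer<String, String> writer;
    private final List<String> emittedCommittables = new ArrayList<>();

    WriterOperator(Writer<String, String> writer) {
        this.writer = writer;
    }

    void processElement(String element) {
        writer.write(element);
    }

    // Called before the checkpoint barrier is emitted, so the committables
    // produced here belong to that checkpoint.
    void prepareSnapshotPreBarrier(long checkpointId) {
        emittedCommittables.addAll(writer.prepareCommit());
    }

    List<String> emittedCommittables() {
        return emittedCommittables;
    }
}
```

In BATCH mode the same `prepareCommit()` call could simply be made once at the end of input, which is what would let one `Writer` implementation serve both execution modes.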
## No Consensus
1. What should the “Unified Sink API” support/cover? The API can
“unify” (decouple) the commit operation in terms of supporting exactly-once
semantics. However, even if we narrow down the initially supported
system to the file system, there would be different topology requirements.
These requirements come from performance optimization
(IceBergSink/MergeHiveSink) or functionality (e.g. whether a bucket is
“finished”). Should the unified sink API support these requirements?
Yes, this is still tricky. What is the current state? Would the
introduction of a "LocalCommit" and a "GlobalCommit" already solve both
the Iceberg and Hive cases? I believe Hive is the trickiest one here,
but if we introduce the "combine" method on GlobalCommit, that could
serve the same purpose as the "aggregation operation" on the individual
files, and we could even execute that "combine" in a distributed way.
To answer the more general question, I think we will offer a couple of
different commit strategies and sinks can implement 0 to n of them. What
is unified about the sink is that the same sink implementation will work
for both STREAMING and BATCH execution mode.
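A minimal sketch of the two commit strategies, assuming hypothetical `LocalCommitter` and `GlobalCommitter` interfaces (not an agreed API). The toy Hive-like committer uses `combine` to group finished files per partition, so the global commit touches the metastore once per partition instead of once per file. A sink could implement none, one, or both of these interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical commit strategies; names and signatures are assumptions.
interface LocalCommitter<CommT> {
    // Commit one writer's committables independently (e.g. rename a finished file).
    void commit(List<CommT> committables);
}

interface GlobalCommitter<CommT, GlobalCommT> {
    // Aggregate committables from all writers into one global committable.
    GlobalCommT combine(List<CommT> committables);

    // Commit the aggregated result once (e.g. a single metastore update).
    void commit(GlobalCommT globalCommittable);
}

// Committable of a Hive-like sink: a finished file and its partition.
final class FileCommittable {
    final String partition;
    final String path;

    FileCommittable(String partition, String path) {
        this.partition = partition;
        this.path = path;
    }
}

// Toy global committer: combine() groups files by partition (sorted by
// partition name), commit() records one metastore operation per partition.
class PartitionGlobalCommitter
        implements GlobalCommitter<FileCommittable, Map<String, List<String>>> {
    final List<String> metastoreLog = new ArrayList<>();

    @Override
    public Map<String, List<String>> combine(List<FileCommittable> committables) {
        Map<String, List<String>> byPartition = new TreeMap<>();
        for (FileCommittable c : committables) {
            byPartition.computeIfAbsent(c.partition, k -> new ArrayList<>()).add(c.path);
        }
        return byPartition;
    }

    @Override
    public void commit(Map<String, List<String>> globalCommittable) {
        for (Map.Entry<String, List<String>> e : globalCommittable.entrySet()) {
            metastoreLog.add("ADD PARTITION " + e.getKey() + " FILES " + e.getValue());
        }
    }
}
```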
2. The API does not expose the checkpoint-id because the batch execution
mode does not have normal checkpoints. But there are still some
implementations that depend on it (IceBergSink uses it for deduplication).
I think how to support this requirement depends on the first open question.
I think this can be solved by introducing a nonce; see the more thorough
explanation below.
3. Should the `Writer` support async functionality or not? Currently I do
not know which sink could benefit from it. Maybe it is just my own problem.
Here, I don't really know. We could introduce an "isAvailable()" method
and mostly ignore it for now; sinks can just always return true. Or,
as an alternative, we don't add the method now but can add it later with
a default implementation. Either way, we will probably not take
advantage of the "isAvailable()" now because that would require more
runtime changes.
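If we go with the second option, adding `isAvailable()` later as a Java default method keeps existing writers source-compatible. The sketch below is hypothetical (`AsyncAwareWriter`, `SyncWriter` and `BoundedAsyncWriter` are made-up names) and leaves out both the actual asynchronous I/O and how the runtime would react to `false`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical writer interface with an availability hook added as a
// default method, so existing implementations need no changes.
interface AsyncAwareWriter<InputT> {
    void write(InputT element);

    // Default: synchronous sinks are always available.
    default boolean isAvailable() {
        return true;
    }
}

// A synchronous writer that does not override isAvailable().
class SyncWriter implements AsyncAwareWriter<String> {
    int written = 0;

    @Override
    public void write(String element) {
        written++;
    }
}

// A writer with a bounded number of in-flight requests that reports
// itself unavailable while that bound is reached.
class BoundedAsyncWriter implements AsyncAwareWriter<String> {
    private final int maxInFlight;
    private final Deque<String> inFlight = new ArrayDeque<>();

    BoundedAsyncWriter(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    @Override
    public void write(String element) {
        if (!isAvailable()) {
            throw new IllegalStateException("writer is not available");
        }
        inFlight.addLast(element);
    }

    @Override
    public boolean isAvailable() {
        return inFlight.size() < maxInFlight;
    }

    // Simulates the completion callback of the oldest outstanding request.
    void completeOne() {
        inFlight.pollFirst();
    }
}
```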
On 17.09.20 06:28, Guowei Ma wrote:
But my understanding is: if the committer function is idempotent, the
framework can guarantee exactly-once semantics in both batch and stream
execution mode. However, I think the idempotence should be guaranteed by
the sink developer, not by the basic API.
I believe the problem here is that some sinks (including Iceberg) can
only be idempotent with a little help from the framework.
The process would be like this:
1. collect all committables, generate a unique ID (nonce), and store the
committables and the ID in fault-tolerant storage
2. call commitGlobal(committables, nonce)
3. Iceberg checks whether there is already a commit with the given nonce; if
not, it appends a commit of the committables along with the nonce to
the log structure/meta store
The problem is that Iceberg cannot decide without some extra data
whether a set of committables has already been committed, because the
commit basically just appends some information to the end of a log.
We would just keep appending the same data if we didn't check the nonce.
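The three steps above could look roughly like this. It is a sketch under stated assumptions: `AppendOnlyTableLog` is a toy in-memory stand-in for Iceberg's metadata log (not Iceberg's API), all class names are hypothetical, and there is a single committer, so the check-then-append does not need to be atomic. The important part is that the nonce is created once in step 1 and restored from state on retry; generating a fresh nonce on every attempt would defeat the check.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Toy append-only log (hypothetical stand-in for an Iceberg-like meta store).
// Each entry stores the committed files together with the commit's nonce.
class AppendOnlyTableLog {
    static final class Entry {
        final String nonce;
        final List<String> files;

        Entry(String nonce, List<String> files) {
            this.nonce = nonce;
            this.files = files;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    boolean containsNonce(String nonce) {
        for (Entry e : entries) {
            if (e.nonce.equals(nonce)) {
                return true;
            }
        }
        return false;
    }

    void append(String nonce, List<String> files) {
        entries.add(new Entry(nonce, Collections.unmodifiableList(new ArrayList<>(files))));
    }

    int size() {
        return entries.size();
    }
}

// Step 1: committables plus a nonce, as they would be persisted in fault-tolerant state.
final class PendingGlobalCommit {
    final String nonce;
    final List<String> committables;

    PendingGlobalCommit(List<String> committables) {
        this.nonce = UUID.randomUUID().toString();
        this.committables = committables;
    }
}

class NonceGlobalCommitter {
    private final AppendOnlyTableLog log;

    NonceGlobalCommitter(AppendOnlyTableLog log) {
        this.log = log;
    }

    // Steps 2 and 3: append only if no commit with this nonce exists yet, so a
    // retry after failure (with the same restored nonce) is a no-op.
    void commitGlobal(List<String> committables, String nonce) {
        if (log.containsNonce(nonce)) {
            return;
        }
        log.append(nonce, committables);
    }
}
```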
We would have this same problem if we wanted to implement a
write-ahead-log Kafka sink where the "commit" would just take some
records from a file and append them to Kafka. Without looking at Kafka and
checking whether you already committed the same records, you don't know if
you already committed.