Thanks for the summary!

On 16.09.20 06:29, Guowei Ma wrote:
## Consensus

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution modes.
2. The initial scope of the unified sink API only covers the file system
type, which supports real transactions. The FLIP focuses more on the
semantics the new sink API should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs a new method, `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`, and the `Flush` method should be
removed.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.

Agreed!

## Not Consensus

1. What should the “Unified Sink API” support/cover? The API can
“unify” (decouple) the commit operation in terms of supporting exactly-once
semantics. However, even if we narrow the initially supported systems
down to the file system, there would be different topology requirements.
These requirements come from performance optimization
(IceBergSink/MergeHiveSink) or functionality (e.g. whether a bucket is
“finished”). Should the unified sink API support these requirements?

Yes, this is still tricky. What is the current state: would the introduction of a "LocalCommit" and a "GlobalCommit" already solve both the Iceberg and Hive cases? I believe Hive is the trickiest one here, but if we introduce the "combine" method on GlobalCommit, it could serve the same purpose as the "aggregation operation" on the individual files, and we could even execute that "combine" in a distributed way.

To answer the more general question, I think we will offer a couple of different commit strategies and sinks can implement 0 to n of them. What is unified about the sink is that the same sink implementation will work for both STREAMING and BATCH execution mode.
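One way to picture "a couple of different commit strategies and sinks can implement 0 to n of them" is to make each strategy an optional factory method with an empty default, so a sink opts into exactly the strategies it needs. This is only a sketch with invented names (`UnifiedSink`, `createCommitter`, `createGlobalCommitter`), not the FLIP's actual API:

```java
import java.util.List;
import java.util.Optional;

// Per-committable commit (the "LocalCommit"), e.g. renaming a temp file.
interface Committer<CommT> {
    void commit(List<CommT> committables);
}

// One commit for all committables of a checkpoint (the "GlobalCommit"),
// e.g. a single Iceberg table commit.
interface GlobalCommitter<CommT> {
    // "combine" could be executed in a distributed way before the single
    // global commit, covering the Hive "aggregation" case.
    CommT combine(List<CommT> committables);
    void commitGlobal(CommT combined);
}

// A sink implements only the strategies it needs; the same implementation
// is then reused by the framework in both STREAMING and BATCH mode.
interface UnifiedSink<CommT> {
    default Optional<Committer<CommT>> createCommitter() {
        return Optional.empty();
    }
    default Optional<GlobalCommitter<CommT>> createGlobalCommitter() {
        return Optional.empty();
    }
}

// A sink that needs no commit phase at all implements nothing extra.
class PrintLikeSink implements UnifiedSink<String> {}
```

The empty-`Optional` defaults are what keep the contract "0 to n strategies" honest: the framework can inspect which committers exist and build the matching topology.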

2. The API does not expose the checkpoint id because batch execution
mode does not have normal checkpoints. But some implementations still
depend on it (IceBergSink uses it to do some dedupe).
I think how to support this requirement depends on the first open question.

I think this can be solved by introducing a nonce, see more thorough explanation below.

3. Whether the `Writer` supports async functionality or not. Currently I do
not know which sink could benefit from it. Maybe it is just my own problem.

Here, I don't really know. We can introduce an "isAvailable()" method and mostly ignore it for now, and sinks can just always return true. Or, as an alternative, we don't add the method now but add it later with a default implementation. Either way, we will probably not take advantage of "isAvailable()" now because that would require more runtime changes.
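The second option (add "isAvailable()" later with a default implementation) could look like the following sketch; `AsyncCapableWriter` is an illustrative name, not a committed interface:

```java
// Sketch: the writer interface gains an availability probe with a default,
// so existing synchronous sinks inherit it unchanged and only sinks that
// buffer asynchronously need to override it.
interface AsyncCapableWriter<InputT> {
    void write(InputT element);

    // Defaults to "always writable"; an async sink would return false
    // while its internal buffer is full, letting the runtime back off.
    default boolean isAvailable() {
        return true;
    }
}
```

Because the default returns true, adding the method later is source-compatible: no existing sink breaks, and the runtime can start honoring the signal whenever the scheduler work lands.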

On 17.09.20 06:28, Guowei Ma wrote:
But my understanding is: if the committer function is idempotent, the
framework can guarantee exactly-once semantics in batch/stream execution
mode. But I think maybe the idempotence should be guaranteed by the sink
developer, not by the basic API.

I believe the problem here is that some sinks (including Iceberg) can only be idempotent with a little help from the framework.

The process would be like this:

1. collect all committables, generate a unique ID (nonce), and store the committables and the ID in fault-tolerant storage

2. call commitGlobal(committables, nonce)

3. Iceberg checks if there is already a commit with the given nonce; if not, it appends a commit of the committables along with the nonce to the log structure/metastore

The problem is that Iceberg cannot decide without some extra data whether a set of committables has already been committed, because the commit basically just appends some information to the end of a log. We would just keep appending the same data if we didn't check the nonce.
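The three steps above can be reduced to a toy model in which the metastore is just an in-memory log: a replayed `commitGlobal` with an already-seen nonce becomes a no-op, which is exactly the idempotence the framework-provided nonce buys. All names here are illustrative:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy stand-in for the Iceberg log structure/metastore: commits are
// appended entries, and the nonce guards against double-appends on retry.
class NonceCommitLog {
    private final List<Map.Entry<String, List<String>>> log = new ArrayList<>();
    private final Set<String> seenNonces = new HashSet<>();

    // Returns true if this call actually appended a commit,
    // false if it was a replay of an already-committed nonce.
    boolean commitGlobal(List<String> committables, String nonce) {
        if (!seenNonces.add(nonce)) {
            return false; // step 3: nonce already committed, retry is a no-op
        }
        log.add(Map.entry(nonce, List.copyOf(committables)));
        return true;
    }

    int size() {
        return log.size();
    }
}
```

Without the nonce check, a recovery that re-runs step 2 would append the same committables a second time; with it, the retry is absorbed.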

We would have the same problem if we wanted to implement a write-ahead-log Kafka sink where the "commit" would just take some records from a file and append them to Kafka. Without looking at Kafka and checking whether you already committed the same records, you don't know if you already committed.



