Regarding the FLIP itself, I like the motivation section and the What/How/When/Where section a lot!

I don't understand why we need the "Drain and Snapshot" section. It seems to be some details about stop-with-savepoint and drain, and the relation to BATCH execution, but I don't know if it is needed to understand the rest of the document. I'm happy to be wrong here, though, if there are good reasons for the section.

On the question of Alternative 1 and 2, I have a strong preference for Alternative 1 because we could avoid strong coupling to other modules. With Alternative 2 we would depend on `flink-streaming-java` and even `flink-runtime`. For the new source API (FLIP-27) we managed to keep the dependencies slim and the code is in flink-core. I'd be very happy if we can manage the same for the new sink API.

Best,
Aljoscha

On 11.09.20 12:02, Aljoscha Krettek wrote:
Hi Everyone,

thanks to Guowei for publishing the FLIP, and thanks Steven for the very thoughtful email!

We thought a lot internally about some of the questions you posted but left a lot (almost all) of the implementation details out of the FLIP for now because we wanted to focus on semantics and API. I'll try and address the points below.

## Initial Scope of the new Sink API

We need to agree on some initial scope that we want to achieve for Flink 1.12. I don't think we can find a solution that will work for all current and future external systems. For me, the initial goal would be to produce a Sink API and implementations for systems where you can prepare "committables" in one process and commit those from another process. Those are systems that support "real" transactions, as you need them in a two-phase commit protocol. This includes:

  - File Sink, including HDFS, S3 via special part-file uploads
  - Iceberg
  - HDFS

The work should include runtime support for both BATCH and STREAMING execution as outlined in https://s.apache.org/FLIP-134.

Supporting Kafka already becomes difficult but I'll go into that below.

## Where to run the Committer

Guowei hinted at this in the FLIP: the idea is that the Framework decides where to run the committer based on requirements and based on the execution mode (STREAMING or BATCH).

Something that is not in the FLIP but which we thought about is that we need to allow different types of committers. I'm currently thinking we need at least a normal "Committer" and a "GlobalCommitter" (name TBD).

The Committer is as described in the FLIP: it's basically a function "void commit(Committable)". The GlobalCommitter would be a function "void commit(List<Committable>)". The former would be used by an S3 sink where we can commit files individually; a committable would be the list of part uploads that form the final file, and the commit operation creates the metadata in S3. The latter would be used by something like Iceberg, where the Committer needs a global view of all the commits to be efficient and not overwhelm the system.

I don't know yet if sinks would only implement one type of commit function or potentially both at the same time, and maybe commit() could return some CommitResult that gets shipped to the GlobalCommit function.
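To make the distinction a bit more concrete, here is a minimal sketch of what the two interfaces could look like (names, generics, and exception handling are placeholders, not the final proposal):

import java.util.List;

// Sketch only: CommT stands for the sink-specific committable type.
interface Committer<CommT> {
    // Commit one committable, e.g. finalize a single S3 multi-part upload.
    void commit(CommT committable) throws Exception;
}

interface GlobalCommitter<CommT> {
    // Commit all committables of a checkpoint/job in one shot, e.g. a single
    // Iceberg table commit that appends all collected data files.
    void commit(List<CommT> committables) throws Exception;
}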

An interesting read on this topic is the discussion on https://issues.apache.org/jira/browse/MAPREDUCE-4815 about the Hadoop FileOutputCommitter and the two different available algorithms for committing final job/task results.

These interfaces untie the sink implementation from the Runtime and we could, for example, have a runtime like this:

### BATCH

 - Collect all committables and store them in a fault-tolerant way until the job finishes
 - For a normal Commit function, call it on the individual commits. We can potentially distribute this if it becomes a bottleneck
 - For a GlobalCommit function, call it with all the commits. This cannot be distributed

We can collect the committables in an OperatorCoordinator or potentially somehow in a task, though I prefer an OperatorCoordinator right now. The operator coordinator needs to keep the commits in a fault-tolerant way.
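Just to illustrate the flow (not the actual interfaces, and ignoring where exactly this code would live), the BATCH side could conceptually look like this, reusing the Committer/GlobalCommitter sketch from above:

import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of the BATCH commit flow; fault-tolerant storage of the
// collected committables (OperatorCoordinator state or task state) is elided.
class BatchCommitDriver<CommT> {
    private final List<CommT> collected = new ArrayList<>();

    // Called for every committable that the writers produce.
    void collect(CommT committable) {
        collected.add(committable);
    }

    // Called once all writers have finished.
    void finish(Committer<CommT> committer, GlobalCommitter<CommT> globalCommitter) throws Exception {
        if (committer != null) {
            for (CommT c : collected) {
                committer.commit(c); // individual commits, could be distributed
            }
        }
        if (globalCommitter != null) {
            globalCommitter.commit(collected); // one global commit, not distributable
        }
    }
}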

### STREAMING

 - For a normal Commit function, keep the committables in state on the individual tasks and commit them when a checkpoint completes (see the sketch below)
 - For a GlobalCommit function we have options: collect the committables in a DOP-1 operator in the topology, or send them to an OperatorCoordinator and do the commit there. This is where the source/sink duality that Steven mentions becomes visible.
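For the normal Commit path, a per-task sketch of "keep committables in state, commit on checkpoint complete" could look like this (state handling and retries are elided; Committer is the sketched interface from above):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the per-task STREAMING behaviour for a normal Committer.
class StreamingCommitDriver<CommT> {
    // committables grouped by the checkpoint they belong to; this map would
    // have to be part of the task's checkpointed state
    private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();
    private final Committer<CommT> committer;

    StreamingCommitDriver(Committer<CommT> committer) {
        this.committer = committer;
    }

    // committable produced by the writer for the currently ongoing checkpoint
    void add(long checkpointId, CommT committable) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    // checkpoint-complete notification: commit everything up to and including
    // this checkpoint, then drop it from the pending state
    void onCheckpointComplete(long checkpointId) throws Exception {
        Map<Long, List<CommT>> ready = pending.headMap(checkpointId, true);
        for (List<CommT> committables : ready.values()) {
            for (CommT c : committables) {
                committer.commit(c);
            }
        }
        ready.clear();
    }
}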

## Kafka

Kafka is a problematic case because it doesn't really support transactions as outlined above. Our current Sink implementation works around that with hacks, but that only gets us so far.

The problem with Kafka is that we need to aggressively clean up pending transactions in case a failure happens. Otherwise stale transactions would block downstream consumers. See here for details: http://kafka.apache.org/documentation/#isolation.level.

The way we solve this in the current Kafka sink is by using a fixed pool of transactional IDs and then cancelling all outstanding transactions for the IDs when we restore from a savepoint. In order for this to work we need to recycle the IDs, so there needs to be a back-channel from the Committer to the Writer, or they need to share internal state.
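Just to illustrate the "fixed pool of transactional IDs" trick (a sketch, not the actual FlinkKafkaProducer code): calling initTransactions() for a given transactional.id fences off and aborts any transaction that is still pending for that ID, which is exactly why the IDs have to be recycled and why the back-channel is needed.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

class TransactionalIdCleanup {
    // On restore, abort whatever might still be pending for each ID in the pool.
    static void abortStaleTransactions(List<String> transactionalIdPool, String bootstrapServers) {
        for (String transactionalId : transactionalIdPool) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.serializer", ByteArraySerializer.class.getName());
            props.put("value.serializer", ByteArraySerializer.class.getName());
            props.put("transactional.id", transactionalId);
            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // initTransactions() aborts a previous, unfinished transaction
                // for this transactional.id (and bumps the producer epoch)
                producer.initTransactions();
            }
        }
    }
}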

I don't yet see a satisfying solution for this, so I think we should exclude this from the initial version.

## On Write-Ahead-Log Sinks

Some sinks, like ES or Cassandra, would require that we keep a WAL in Flink and then ship the contents to the external system on checkpoint. The reason is that these systems don't support real transactions where you can prepare them in one process and commit them from another process.

Best,
Aljoscha


On 11.09.20 02:23, Steven Wu wrote:
Guowei,

Thanks a lot for the proposal and starting the discussion thread. Very
excited.

For the big question of "Is the sink an operator or a topology?", I have a
few related sub questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single choice satisfy all sinks?

Trying to envision how some sinks can be implemented with this new unified
sink interface.

1. Kafka sink

Kafka supports non-transactional and transactional writes.
* Non-transactional writes don't need a commit action. We can have *parallel
writers and no/no-op committers*. This is probably true for other
non-transactional message queues.
* Transactional writes can be implemented as *parallel writers and parallel
committers*. In this case, I don't know if it makes sense to separate
writers and committers into two separate operators, because they probably
need to share the same KafkaProducer object.

Either way, both writers and committers probably should *run inside task
managers*.

2. ES sink

ES sink typically buffers the data up to a certain size or time threshold
and then uploads/commits a batch to ES. Writers buffer data and flush when
needed, and the committer does the HTTP bulk upload to commit. To avoid
serialization/deserialization cost, we should run *parallel writers and
parallel committers* and they *should be* *chained or bundled together*
while *running inside task managers*.

It can also be implemented as *parallel writers and no/no-op committers*,
where all the logic (batching and upload) is put inside the writers.

3. Iceberg [1] sink

It is currently implemented as two-stage operators with *parallel writers
and single-parallelism committers*.
* *parallel writers* that write records into data files. Upon checkpoint,
writers flush and upload the files, and send the metadata/location of the
data files to the downstream committer. Writers need to do the flush inside
the "prepareSnapshotPreBarrier" method (NOT the "snapshotState" method)
before forwarding the checkpoint barrier to the committer.
* single-parallelism committer operator. It collects data files from
upstream writers. During "snapshotState", it saves collected data files (or
an uber metadata file) into state. When the checkpoint is completed, inside
"notifyCheckpointComplete" it commits those data files to Iceberg tables.
*The committer has to be single parallelism*, because we don't want hundreds
or thousands of parallel committers to compete for commit operations with
opportunistic concurrency control. It would be very inefficient and probably
infeasible if the parallelism is high. Too many tiny commits/transactions
can also slow down both the table write and read paths due to too many
manifest files.
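To spell out the two-stage pattern described above as a compact sketch (DataFileMeta, the file upload, and the Iceberg table commit are placeholders for the real Iceberg specifics; the method names simply mirror the Flink callbacks mentioned):

import java.util.ArrayList;
import java.util.List;

// Placeholder for the metadata/location of one uploaded data file.
final class DataFileMeta {
    final String location;
    DataFileMeta(String location) { this.location = location; }
}

// Parallel writer stage: buffers rows, flushes/uploads in prepareSnapshotPreBarrier.
class IcebergWriterStage {
    private final List<String> openFiles = new ArrayList<>();

    void processElement(Object row) {
        openFiles.add(String.valueOf(row)); // stand-in for appending to a data file
    }

    // must happen BEFORE the barrier is forwarded to the committer
    List<DataFileMeta> prepareSnapshotPreBarrier(long checkpointId) {
        List<DataFileMeta> flushed = new ArrayList<>();
        for (String file : openFiles) {
            flushed.add(new DataFileMeta(file)); // stand-in for close + upload
        }
        openFiles.clear();
        return flushed; // sent downstream to the single committer
    }
}

// Committer stage, must run with parallelism 1.
class IcebergCommitterStage {
    private final List<DataFileMeta> collected = new ArrayList<>();

    void processElement(DataFileMeta meta) { collected.add(meta); }

    // snapshotState: persist the collected files (or an uber manifest) in state
    List<DataFileMeta> snapshotState(long checkpointId) { return new ArrayList<>(collected); }

    // notifyCheckpointComplete: one Iceberg table commit for everything collected
    void notifyCheckpointComplete(long checkpointId) {
        // the actual Iceberg table commit goes here (details elided)
        collected.clear();
    }
}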

Right now, both Iceberg writer and committer operators run inside task
managers. It has one major drawback. With Iceberg sink, embarrassingly
parallel jobs won't be embarrassingly parallel anymore. That breaks the
benefit of region recovery for embarrassingly parallel DAG. Conceptually,
the Writer-Committer sink pattern is like the mirroring of the FLIP-27
Enumerator-Reader source pattern. It will be better *if the committer can
run inside the job manager* like the SplitEnumerator for the FLIP-27
source.

-----------------------
Additional questions regarding the doc/API
* Any example for the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS
can be Void.

[1] https://iceberg.apache.org/

Thanks,
Steven

On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma <guowei....@gmail.com> wrote:

Hi, devs & users

As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor
of the DataStream API and Table API. Users should be able to use the
DataStream API to write jobs that support both bounded and unbounded
execution modes. However, Flink does not provide a sink API that guarantees
exactly-once semantics in both bounded and unbounded scenarios, which blocks
this unification.

So we want to introduce a new unified sink API that lets users develop a
sink once and run it everywhere. You can find more details in FLIP-143[2].

The FLIP contains some open questions that I'd really appreciate input on
from the community. Some of the open questions include:

    1. We provide two alternative Sink APIs in the FLIP. The only
    difference between the two versions is how the state is exposed to the
    user. Which one do you prefer?
    2. How should the sink API support writing to Hive?
    3. Is the sink an operator or a topology?

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

Best,
Guowei



