Hi Everyone,
Thanks to Guowei for publishing the FLIP, and thanks to Steven for the very
thoughtful email!
We thought a lot internally about some of the questions you posted but
left a lot (almost all) of the implementation details out of the FLIP
for now because we wanted to focus on semantics and API. I'll try and
address the points below.
## Initial Scope of the new Sink API
We need to accept some initial scope that we want to achieve for Flink
1.12. I don't think we can try and find the solution that will work for
all current and future external systems. For me, the initial goal would
be to produce a Sink API and implementations for systems where you can
prepare "committables" in one process and commit those from another
process. Those are systems that support "real" transactions as you need
them in a two-phase commit protocol. This includes:
- File Sink, including HDFS, S3 via special part-file uploads
- Iceberg
- HDFS
The work should include runtime support for both BATCH and STREAMING
execution as outlined in https://s.apache.org/FLIP-134.
Supporting Kafka already becomes difficult but I'll go into that below.
## Where to run the Committer
Guowei hinted at this in the FLIP: the idea is that the Framework
decides where to run the committer based on requirements and based on
the execution mode (STREAMING or BATCH).
Something that is not in the FLIP but which we thought about is that we
need to allow different types of committers. I'm currently thinking we
need at least a normal "Committer" and a "GlobalCommitter" (name TBD).
The Committer is as described in the FLIP; it's basically a function
"void commit(Committable)". The GlobalCommitter would be a function "void
commit(List<Committable>)". The former would be used by an S3 sink where
we can individually commit files to S3, a committable would be the list
of part uploads that will form the final file and the commit operation
creates the metadata in S3. The latter would be used by something like
Iceberg where the Committer needs a global view of all the commits to be
efficient and not overwhelm the system.
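
To make the distinction concrete, here is a minimal sketch of the two shapes. The interface names follow the working names above, but the generic parameter, the toy implementations, and the use of plain strings as committables are assumptions for illustration only and are not taken from the FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shapes only; the FLIP does not fix these names or signatures.
interface Committer<CommT> {
    void commit(CommT committable);
}

interface GlobalCommitter<CommT> {
    void commit(List<CommT> committables);
}

// Toy S3-like committer: every committable is finalized on its own.
class PerFileCommitter implements Committer<String> {
    final List<String> finalizedFiles = new ArrayList<>();

    @Override
    public void commit(String committable) {
        // A real sink would complete the multi-part upload here.
        finalizedFiles.add(committable);
    }
}

// Toy Iceberg-like committer: all committables end up in one table commit.
class SingleTransactionCommitter implements GlobalCommitter<String> {
    final List<List<String>> tableCommits = new ArrayList<>();

    @Override
    public void commit(List<String> committables) {
        // One commit for the whole batch instead of one per data file.
        tableCommits.add(new ArrayList<>(committables));
    }
}
```

With these shapes, three data files lead to three calls on the first committer but only a single call on the second one.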
I don't know yet if sinks would only implement one type of commit
function or potentially both at the same time, and maybe Commit can
return some CommitResult that gets shipped to the GlobalCommit function.
An interesting read on this topic is the discussion on
https://issues.apache.org/jira/browse/MAPREDUCE-4815 about the Hadoop
FileOutputCommitter and the two different available algorithms for
committing final job/task results.
These interfaces untie the sink implementation from the Runtime and we
could, for example, have a runtime like this:
### BATCH
- Collect all committables and store them in a fault tolerant way
until the job finishes
- For a normal Commit function, call it on the individual commits. We
can potentially distribute this if it becomes a bottleneck
- For GlobalCommit function, call it with all the commits. This cannot
be distributed
We can collect the committables in an OperatorCoordinator or potentially
somehow in a task, though I prefer an OperatorCoordinator right now. The
operator coordinator needs to keep the commits in a fault-tolerant way.
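
The BATCH steps above could be sketched roughly as follows. "BatchCommitCollector" is a made-up name, not a Flink class, and the in-memory list only stands in for whatever fault-tolerant storage the coordinator would actually use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batch-side collector; not part of Flink or the FLIP.
class BatchCommitCollector<CommT> {
    // Assumption: a real runtime would keep this in fault-tolerant storage.
    private final List<CommT> pending = new ArrayList<>();

    // Called whenever a writer hands over a committable.
    void collect(CommT committable) {
        pending.add(committable);
    }

    // Job finished, normal Commit function: one call per committable.
    // These calls are independent, so they could be distributed.
    void finishWithCommitter(Consumer<CommT> committer) {
        for (CommT committable : pending) {
            committer.accept(committable);
        }
        pending.clear();
    }

    // Job finished, GlobalCommit function: a single call with everything.
    void finishWithGlobalCommitter(Consumer<List<CommT>> globalCommitter) {
        globalCommitter.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```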
### STREAMING
- For normal Commit, keep the committables in state on the individual
tasks, commit them when a checkpoint completes
- For global CommitFunction we have options: collect them in a DOP-1
operator in the topology or send them to an OperatorCoordinator to do
the commit there. This is where the source/sink duality that Steven
mentions becomes visible.
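
For the normal Commit case in STREAMING mode, the per-task bookkeeping could look roughly like the sketch below. The class and method names are hypothetical; "snapshot" and "checkpointComplete" only mirror the roles that snapshotState and notifyCheckpointComplete play in Flink, and the map stands in for real operator state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical per-task buffer; not part of Flink or the FLIP.
class CheckpointedCommitBuffer<CommT> {
    private final List<CommT> sinceLastCheckpoint = new ArrayList<>();
    // Assumption: a real operator would keep this map in checkpointed state.
    private final TreeMap<Long, List<CommT>> pendingByCheckpoint = new TreeMap<>();
    private final Consumer<CommT> committer;

    CheckpointedCommitBuffer(Consumer<CommT> committer) {
        this.committer = committer;
    }

    // The writer produced a committable; nothing is committed yet.
    void add(CommT committable) {
        sinceLastCheckpoint.add(committable);
    }

    // Checkpoint taken: bind the buffered committables to this checkpoint.
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(sinceLastCheckpoint));
        sinceLastCheckpoint.clear();
    }

    // Checkpoint completed: commit everything up to and including it.
    void checkpointComplete(long checkpointId) {
        NavigableMap<Long, List<CommT>> ready =
                pendingByCheckpoint.headMap(checkpointId, true);
        for (List<CommT> batch : ready.values()) {
            for (CommT committable : batch) {
                committer.accept(committable);
            }
        }
        ready.clear(); // also removes the entries from pendingByCheckpoint
    }
}
```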
## Kafka
Kafka is a problematic case because it doesn't really support
transactions as outlined above. Our current Sink implementation works
around that with hacks but that only gets us so far.
The problem with Kafka is that we need to aggressively clean up pending
transactions in case a failure happens. Otherwise stale transactions
would block downstream consumers. See here for details:
http://kafka.apache.org/documentation/#isolation.level.
The way we solve this in the current Kafka sink is by using a fixed pool
of transactional IDs and then cancelling all outstanding transactions
for the IDs when we restore from a savepoint. In order for this to work
we need to recycle the IDs, so there needs to be a back-channel from the
Committer to the Writer, or they need to share internal state.
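
A rough sketch of such a pool shows why the Writer and the Committer end up coupled. This is not the code of the existing Kafka sink: the class name, the ID format, and the "abortTransaction" callback are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical fixed pool of transactional IDs; not the actual Kafka sink.
class TransactionalIdPool {
    private final List<String> allIds = new ArrayList<>();
    private final Deque<String> free = new ArrayDeque<>();

    TransactionalIdPool(String prefix, int size) {
        for (int i = 0; i < size; i++) {
            String id = prefix + "-" + i; // ID format is an assumption
            allIds.add(id);
            free.add(id);
        }
    }

    // Writer side: take an ID for the next transaction.
    String acquire() {
        String id = free.poll();
        if (id == null) {
            throw new IllegalStateException("transactional ID pool exhausted");
        }
        return id;
    }

    // Committer side: hand the ID back once its transaction is committed.
    // This is the back-channel (or shared state) mentioned above.
    void release(String id) {
        free.add(id);
    }

    // On restore: abort whatever may still be open under any ID of the
    // pool, then start again with a full pool.
    void abortAllOnRestore(Consumer<String> abortTransaction) {
        free.clear();
        for (String id : allIds) {
            abortTransaction.accept(id);
            free.add(id);
        }
    }
}
```

Because acquire() happens in the Writer and release() in the Committer, the two cannot be fully separated without some way to pass IDs back.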
I don't yet see a satisfying solution for this so I think we should
exclude this from the initial version.
## On Write-Ahead-Log Sinks
Some sinks, like ES or Cassandra would require that we keep a WAL in
Flink and then ship the contents to the external system on checkpoint.
The reason is that these systems don't support real transactions where
you can prepare them in one process and commit them from another process.
Best,
Aljoscha
On 11.09.20 02:23, Steven Wu wrote:
Guowei,
Thanks a lot for the proposal and starting the discussion thread. Very
excited.
For the big question of "Is the sink an operator or a topology?", I have a
few related sub questions.
* Where should we run the committers?
* Is the committer parallel or single parallelism?
* Can a single choice satisfy all sinks?
Trying to envision how some sinks can be implemented with this new unified
sink interface.
1. Kafka sink
Kafka supports non-transactional and transactional writes
* Non-transaction writes don't need commit action. We can have *parallel
writers and no/no-op committers*. This is probably true for other
non-transactional message queues.
* Transaction writes can be implemented as *parallel writers and parallel
committers*. In this case, I don't know if it makes sense to separate
writers and committers into two separate operators, because they probably
need to share the same KafkaProducer object.
Either way, both writers and committers probably should *run inside task
managers*.
2. ES sink
ES sink typically buffers the data up to a certain size or time threshold
and then uploads/commits a batch to ES. Writers buffer data and flush when
needed, and committer does the HTTP bulk upload to commit. To avoid
serialization/deserialization cost, we should run *parallel writers and
parallel committers* and they *should be* *chained or bundled together*
while *running inside task managers*.
It can also be implemented as *parallel writers and no/no-op committers*,
where all logic (batching and upload) is put inside the writers.
3. Iceberg [1] sink
It is currently implemented as two-stage operators with *parallel writers
and single-parallelism committers*.
* *parallel writers* that write records into data files. Upon checkpoint,
writers flush and upload the files, and send the metadata/location of the
data files to the downstream committer. Writers need to do the flush inside
the "prepareSnapshotPreBarrier" method (NOT "snapshotState" method) before
forwarding the checkpoint barrier to the committer
* single-parallelism committer operator. It collects data files from
upstream writers. During "snapshotState", it saves collected data files (or
an uber metadata file) into state. When the checkpoint is completed, inside
"notifyCheckpointComplete" it commits those data files to Iceberg tables. *The
committer has to be single parallelism*, because we don't want hundreds or
thousands of parallel committers to compete for commit operations with
optimistic concurrency control. It will be very inefficient and probably
infeasible if the parallelism is high. Too many tiny commits/transactions
can also slow down both the table write and read paths due to too many
manifest files.
Right now, both Iceberg writer and committer operators run inside task
managers. It has one major drawback. With Iceberg sink, embarrassingly
parallel jobs won't be embarrassingly parallel anymore. That breaks the
benefit of region recovery for embarrassingly parallel DAG. Conceptually,
the Writer-Committer sink pattern is like the mirroring of the FLIP-27
Enumerator-Reader source pattern. It will be better *if the committer can
run inside the job manager* like the SplitEnumerator for the FLIP-27
source.
-----------------------
Additional questions regarding the doc/API
* Any example for the writer shared state (Writer#snapshotSharedState)?
* We allow the case where the writer has no state, right? Meaning WriterS
can be Void.
[1] https://iceberg.apache.org/
Thanks,
Steven
On Thu, Sep 10, 2020 at 6:44 AM Guowei Ma <guowei....@gmail.com> wrote:
Hi, devs & users
As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor
of DataStream API and Table API. Users should be able to use DataStream API
to write jobs that support both bounded and unbounded execution modes.
However, Flink does not provide a sink API to guarantee the Exactly-once
semantics in both bounded and unbounded scenarios, which blocks the
unification.
So we want to introduce a new unified sink API that lets users
develop a sink once and run it everywhere. You can find more details in
FLIP-143[2].
The FLIP contains some open questions on which I'd really appreciate input
from the community. Some of the open questions include:
1. We provide two alternative Sink APIs in the FLIP. The only
difference between the two versions is how to expose the state to the user.
Which one do you prefer?
2. How does the sink API support writing to Hive?
3. Is the sink an operator or a topology?
[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
Best,
Guowei