Thanks you for the prompt replies. It's great to see that there is good
understanding of how EOS in Flink works.
This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
I don't think that works because we have no control over the Kafka
transactions. Imagine:
1) ExactlyOnceWriter writes records to Kafka and commits, then starts a
new transaction.
2) Flink checkpoints, delaying the processing of elements, the
checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate
data to Kafka. The de-duplication that the sink performs does not help,
especially because the random shards ids might be assigned differently.
IMHO we have to have control over commit to be able to provide EOS.
When we discussed this in Aug 2017, the understanding was that 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
That's also my understanding, unless we change the interface.
I don't see how SDF solves this problem..
SDF has a checkpoint method which the Runner can call, but I think that
you are right, that the above problem would be the same.
Absolutely. I would love to support EOS in KakaIO for Flink. I think that will
help many future exactly-once sinks.. and address fundamental incompatibility
between Beam model and Flink's horizontal checkpointing for such applications.
Great :)
The FlinkRunner would need to insert the "wait until checkpoint finalization"
logic wherever it sees @RequiresStableInput, which is already what it would have to do.
I don't think that fixes the problem. See above example.
Thanks,
Max
On 01.03.19 00:04, Raghu Angadi wrote:
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]
<mailto:[email protected]>> wrote:
On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]
<mailto:[email protected]>> wrote:
I'm not sure what a hard fail is. I probably have a shallow
understanding, but doesn't @RequiresStableInput work for 2PC?
The preCommit() phase should establish the transaction and
commit() is not called until after checkpoint finalization. Can
you describe the way that it does not work a little bit more?
- preCommit() is called before checkpoint. Kafka EOS in Flink starts
the transaction before this and makes sure it flushes all records in
preCommit(). So far good.
- commit is called after checkpoint is persisted. Now, imagine
commit() fails for some reason. There is no option to rerun the 1st
phase to write the records again in a new transaction. This is a
hard failure for the the job. In practice Flink might attempt to
commit again (not sure how many times), which is likely to fail and
eventually results in job failure.
In Apache Beam, the records could be stored in state, and can be written
inside commit() to work around this issue. It could have scalability
issues if checkpoints are not frequent enough in Flink runner.
Raghu.
Kenn
On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]
<mailto:[email protected]>> wrote:
On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
<[email protected] <mailto:[email protected]>> wrote:
I believe the way you would implement the logic behind
Flink's KafkaProducer would be to have two steps:
1. Start transaction
2. @RequiresStableInput Close transaction
I see. What happens if closing the transaction fails in
(2)? Flink's 2PC requires that commit() should never hard
fail once preCommit() succeeds. I think that is cost of not
having an extra shuffle. It is alright since this policy has
worked well for Flink so far.
Overall, it will be great to have @RequiresStableInput
support in Flink runner.
Raghu.
The FlinkRunner would need to insert the "wait until
checkpoint finalization" logic wherever it
sees @RequiresStableInput, which is already what it
would have to do.
This matches the KafkaProducer's logic - delay closing
the transaction until checkpoint finalization. This
answers my main question, which is "is
@RequiresStableInput expressive enough to allow
Beam-on-Flink to have exactly once behavior with the
same performance characteristics as native Flink
checkpoint finalization?"
Kenn
[1] https://github.com/apache/beam/pull/7955
On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
<[email protected] <mailto:[email protected]>> wrote:
On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi
<[email protected] <mailto:[email protected]>> wrote:
Now why does the Flink Runner not support
KafkaIO EOS? Flink's native
KafkaProducer supports exactly-once. It
simply commits the pending
transaction once it has completed a checkpoint.
On Thu, Feb 28, 2019 at 9:59 AM Maximilian
Michels <[email protected] <mailto:[email protected]>>
wrote:
Hi,
I came across KafkaIO's Runner whitelist [1]
for enabling exactly-once
semantics (EOS). I think it is questionable
to exclude Runners from
inside a transform, but I see that the
intention was to save users from
surprises.
Now why does the Flink Runner not support
KafkaIO EOS? Flink's native
KafkaProducer supports exactly-once. It
simply commits the pending
transaction once it has completed a checkpoint.
When we discussed this in Aug 2017, the
understanding was that 2 Phase commit utility in
Flink used to implement Flink's Kafka EOS could
not be implemented in Beam's context.
See this message
<https://www.mail-archive.com/[email protected]/msg02664.html> in
that dev thread. Has anything changed in this
regard? The whole thread is relevant to this
topic and worth going through.
I think that TwoPhaseCommit utility class wouldn't
work. The Flink runner would probably want to
directly use notifySnapshotComplete in order to
implement @RequiresStableInput.
A checkpoint is realized by sending barriers
through all channels
starting from the source until reaching all
sinks. Every operator
persists its state once it has received a
barrier on all its input
channels, it then forwards it to the
downstream operators.
The architecture of Beam's
KafkaExactlyOnceSink is as follows[2]:
Input -> AssignRandomShardIds -> GroupByKey
-> AssignSequenceIds ->
GroupByKey -> ExactlyOnceWriter
As I understood, Spark or Dataflow use the
GroupByKey stages to persist
the input. That is not required in Flink to
be able to take a consistent
snapshot of the pipeline.
Basically, for Flink we don't need any of
that magic that KafkaIO does.
What we would need to support EOS is a way
to tell the ExactlyOnceWriter
(a DoFn) to commit once a checkpoint has
completed.
I know that the new version of SDF supports
checkpointing which should
solve this issue. But there is still a lot
of work to do to make this
reality.
I don't see how SDF solves this problem.. May be
pseudo code would make more clear. But if
helps, that is great!
So I think it would make sense to think
about a way to make KafkaIO's
EOS more accessible to Runners which support
a different way of
checkpointing.
Absolutely. I would love to support EOS in
KakaIO for Flink. I think that will help many
future exactly-once sinks.. and address
fundamental incompatibility between Beam model
and Flink's horizontal checkpointing for such
applications.
Raghu.
Cheers,
Max
PS: I found this document about
RequiresStableInput [3], but IMHO
defining an annotation only manifests the
conceptual difference between
the Runners.
[1]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
[2]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
[3]
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM