Hi,
I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
semantics (EOS). I think it is questionable to exclude Runners from
inside a transform, but I see that the intention was to save users from
surprises.
Now why does the Flink Runner not support KafkaIO EOS? Flink's native
KafkaProducer supports exactly-once. It simply commits the pending
transaction once it has completed a checkpoint.
A checkpoint is realized by sending barriers through all channels,
starting from the sources until they reach all sinks. Every operator
persists its state once it has received a barrier on all of its input
channels; it then forwards the barrier to the downstream operators.
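To make the barrier mechanism concrete, here is a toy model in plain
Java. It is only a sketch and not Flink's implementation: the class
AligningOperator and all of its members are made up for illustration,
and the "state" is just a running sum. It also assumes aligned
checkpoints, where records arriving on a channel that has already
delivered its barrier are held back until the snapshot has been taken.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one operator aligning checkpoint barriers (hypothetical, not Flink code). */
class AligningOperator {
    private final int numInputChannels;
    // Channels that have already delivered the barrier of the checkpoint in progress.
    private final Set<Integer> channelsWithBarrier = new HashSet<>();
    // Records held back because their channel is already past the barrier.
    private final List<Long> heldBack = new ArrayList<>();
    // The operator's state: a running sum of all processed records.
    private long runningSum = 0;

    // Persisted state, one entry per completed alignment.
    final List<Long> snapshots = new ArrayList<>();
    // Checkpoint ids whose barrier has been forwarded downstream.
    final List<Long> forwardedBarriers = new ArrayList<>();

    AligningOperator(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    long state() {
        return runningSum;
    }

    void onRecord(int channel, long value) {
        if (channelsWithBarrier.contains(channel)) {
            // This record belongs to the next checkpoint; do not let it leak into the snapshot.
            heldBack.add(value);
        } else {
            runningSum += value;
        }
    }

    void onBarrier(int channel, long checkpointId) {
        channelsWithBarrier.add(channel);
        if (channelsWithBarrier.size() < numInputChannels) {
            return; // still waiting for barriers on the other channels
        }
        // Barrier seen on all inputs: persist the state, then forward the barrier.
        snapshots.add(runningSum);
        forwardedBarriers.add(checkpointId);
        channelsWithBarrier.clear();
        // Resume processing of the records that were held back during alignment.
        for (long value : heldBack) {
            runningSum += value;
        }
        heldBack.clear();
    }
}
```

With two input channels, a barrier on channel 0 alone does not trigger a
snapshot. Only once channel 1 delivers its barrier is the state
persisted and the barrier passed on.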
The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
GroupByKey -> ExactlyOnceWriter
As I understand it, Spark and Dataflow use the GroupByKey stages to
persist the input. Flink does not need that in order to take a
consistent snapshot of the pipeline.
Basically, for Flink we don't need any of that magic that KafkaIO does.
What we would need to support EOS is a way to tell the ExactlyOnceWriter
(a DoFn) to commit once a checkpoint has completed.
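As a rough sketch of what "commit once a checkpoint has completed" could
look like, here is a self-contained toy in plain Java. Nothing in it is
an existing Beam or KafkaIO API: the class CheckpointCommittingWriter
and its methods are hypothetical, with names loosely modeled on the
snapshot and checkpoint-complete callbacks that Flink gives its own
sinks. The "transaction" is just an in-memory list standing in for a
Kafka transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical writer that makes output visible only after a checkpoint completes. */
class CheckpointCommittingWriter {
    // Records written since the last snapshot; stands in for an open transaction.
    private List<String> openTransaction = new ArrayList<>();
    // Transactions closed at a snapshot but not yet committed, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Records that have been committed, i.e. are visible to downstream consumers.
    final List<String> committed = new ArrayList<>();

    void write(String record) {
        openTransaction.add(record);
    }

    /** Called when the checkpoint barrier reaches the writer: close the open transaction. */
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, openTransaction);
        openTransaction = new ArrayList<>();
    }

    /** Called once the whole pipeline has completed the checkpoint: commit what it covers. */
    void notifyCheckpointComplete(long checkpointId) {
        // Commit every pending transaction up to and including this checkpoint, oldest first.
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            committed.addAll(transaction);
        }
        ready.clear(); // the view is backed by 'pending', so this removes the committed entries
    }
}
```

The point of the sketch is that the writer itself needs no shuffle or
sequence ids. It only needs the Runner to call it at two moments: when
the snapshot is taken and when the checkpoint is known to be complete.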
I know that the new version of SDF supports checkpointing, which should
solve this issue. But there is still a lot of work to do to make this a
reality.
So I think it would make sense to think about a way to make KafkaIO's
EOS more accessible to Runners which support a different way of
checkpointing.
Cheers,
Max
PS: I found this document about RequiresStableInput [3], but IMHO
defining an annotation only manifests the conceptual difference between
the Runners.
[1]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
[2]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
[3]
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM