This is exactly what RequiresStableInput is supposed to do. On the Flink
runner, this would be implemented by delaying processing until the current
checkpoint is done. In fact, many sinks are probably subtly broken on the
Flink runner today without RequiresStableInput, so we really need to finish
this work and add a Flink implementation of it.
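
A minimal sketch of that "delay processing" idea in plain Java. It assumes the runner calls two hypothetical hooks, `onSnapshot(checkpointId)` when the barrier arrives and `onCheckpointComplete(checkpointId)` once the checkpoint is durable; these names, and `StableInputBuffer` itself, are invented for illustration and are not Beam or Flink API. In a real runner the buffer would itself be part of the checkpointed state, which is what makes the released elements stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical sketch of "delay processing until the checkpoint is done";
// the class and hook names are invented and are not Beam or Flink API.
final class StableInputBuffer<T> {
  // Elements received since the last barrier; not yet covered by any checkpoint.
  private List<T> current = new ArrayList<>();
  // Elements covered by a checkpoint that was taken but is not yet confirmed.
  private final TreeMap<Long, List<T>> awaitingCompletion = new TreeMap<>();
  // The non-idempotent work (e.g. a sink write) that must only see stable input.
  private final Consumer<T> process;

  StableInputBuffer(Consumer<T> process) {
    this.process = process;
  }

  void onElement(T element) {
    current.add(element); // buffer instead of processing right away
  }

  // Barrier reached: everything buffered so far is part of this checkpoint.
  void onSnapshot(long checkpointId) {
    awaitingCompletion.put(checkpointId, current);
    current = new ArrayList<>();
  }

  // Checkpoint is durable: a restore would hand back exactly these elements,
  // so they are now stable and can be processed.
  void onCheckpointComplete(long checkpointId) {
    Map<Long, List<T>> ready = awaitingCompletion.headMap(checkpointId, true);
    for (List<T> batch : ready.values()) {
      batch.forEach(process);
    }
    ready.clear(); // removes the released batches from the backing map
  }

  public static void main(String[] args) {
    List<String> written = new ArrayList<>();
    StableInputBuffer<String> buffer = new StableInputBuffer<>(written::add);
    buffer.onElement("a");
    buffer.onElement("b");
    buffer.onSnapshot(1L);
    buffer.onElement("c");         // arrives after barrier 1: belongs to checkpoint 2
    System.out.println(written);   // []
    buffer.onCheckpointComplete(1L);
    System.out.println(written);   // [a, b]
  }
}
```

Elements that arrive after a barrier are kept separate, so completing checkpoint 1 never releases input that checkpoint 1 does not cover.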

Reuven

On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels; it then forwards the barrier to the downstream operators.
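
A simplified, hypothetical model of this barrier alignment for a single operator, in plain Java. `AligningOperator`, `onRecord` and `onBarrier` are invented names, not Flink's actual classes, and the sketch assumes only one checkpoint is in flight at a time. Records arriving on a channel that has already delivered its barrier are held back, so the snapshot reflects exactly the input before the barrier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of barrier alignment for one operator with several input
// channels; class and method names are invented, not Flink's actual classes.
final class AligningOperator {
  private final int numInputChannels;
  // Channels that have already delivered the barrier of the in-progress checkpoint.
  private final Set<Integer> channelsWithBarrier = new HashSet<>();
  // Records from already-aligned channels; they belong to the next checkpoint.
  private final Map<Integer, List<Long>> heldBack = new HashMap<>();
  private long state = 0; // the operator's state: a running sum
  final List<Long> snapshots = new ArrayList<>();         // state persisted per checkpoint
  final List<Long> forwardedBarriers = new ArrayList<>(); // barriers sent downstream

  AligningOperator(int numInputChannels) {
    this.numInputChannels = numInputChannels;
  }

  long currentState() {
    return state;
  }

  void onRecord(int channel, long value) {
    if (channelsWithBarrier.contains(channel)) {
      heldBack.computeIfAbsent(channel, c -> new ArrayList<>()).add(value);
    } else {
      state += value;
    }
  }

  void onBarrier(int channel, long checkpointId) {
    channelsWithBarrier.add(channel);
    if (channelsWithBarrier.size() < numInputChannels) {
      return; // still waiting for the barrier on some input channel
    }
    snapshots.add(state);                // 1. persist state
    forwardedBarriers.add(checkpointId); // 2. forward the barrier downstream
    channelsWithBarrier.clear();
    // 3. resume: apply the records that were held back during alignment
    for (List<Long> values : heldBack.values()) {
      for (long v : values) {
        state += v;
      }
    }
    heldBack.clear();
  }

  public static void main(String[] args) {
    AligningOperator op = new AligningOperator(2);
    op.onRecord(0, 1);
    op.onRecord(1, 2);
    op.onBarrier(0, 1L);
    op.onRecord(0, 10); // channel 0 already aligned: held back
    op.onRecord(1, 3);  // channel 1 not yet aligned: applied
    op.onBarrier(1, 1L);
    System.out.println(op.snapshots);      // [6]
    System.out.println(op.currentState()); // 16
  }
}
```
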
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understand it, Spark and Dataflow use the GroupByKey stages to persist
> the input. That is not required in Flink to be able to take a consistent
> snapshot of the pipeline.
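
Why persisting the input matters for this design can be shown with a simplified, hypothetical model in plain Java (not KafkaIO's actual code; `SequencedShardWriter` and `write` are invented names). Records get consecutive sequence ids per shard, and the writer remembers the next id it expects per shard and drops anything below it. This only deduplicates correctly if a retried bundle carries the same records under the same ids, i.e. if the input is stable. The real sink also has to persist its per-shard progress durably; this sketch keeps it in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of sequence-id deduplication; not KafkaIO code.
final class SequencedShardWriter {
  // Next sequence id this writer expects per shard. A real sink must persist
  // this progress durably; here it only lives in memory.
  private final Map<Integer, Long> nextIdPerShard = new HashMap<>();
  final List<String> output = new ArrayList<>();

  // Writes one shard's batch, where records.get(i) carries sequence id firstId + i.
  void write(int shard, long firstId, List<String> records) {
    long expected = nextIdPerShard.getOrDefault(shard, 0L);
    for (int i = 0; i < records.size(); i++) {
      long id = firstId + i;
      if (id < expected) {
        continue; // already written by an earlier attempt: skip the duplicate
      }
      if (id > expected) {
        throw new IllegalStateException(
            "shard " + shard + ": expected id " + expected + ", got " + id);
      }
      output.add(records.get(i));
      expected++;
      nextIdPerShard.put(shard, expected); // record progress after each write
    }
  }

  public static void main(String[] args) {
    SequencedShardWriter writer = new SequencedShardWriter();
    writer.write(0, 0, List.of("a", "b"));
    // Retry of the same (stable) input plus one new record: a and b are skipped.
    writer.write(0, 0, List.of("a", "b", "c"));
    System.out.println(writer.output); // [a, b, c]
  }
}
```
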
>
> Basically, for Flink we don't need any of that magic that KafkaIO does.
> What we would need to support EOS is a way to tell the ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.
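
A minimal, hypothetical sketch of "commit once a checkpoint has completed" in plain Java, loosely following the two-phase-commit pattern behind Flink's Kafka producer. The class and method names are invented for illustration and are not Flink's or Beam's API, and the external system is modeled as a map of committed transactions keyed by checkpoint id. Unlike a stable-input approach, records are written into an open transaction immediately; the checkpoint only decides when that transaction becomes visible. Commits are idempotent, so re-committing after recovery cannot duplicate data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a writer that commits on checkpoint completion;
// names are invented and are not Flink's or Beam's API.
final class TwoPhaseCommitWriter {
  // Stand-in for the external system (e.g. a Kafka topic): committed
  // transactions keyed by id. putIfAbsent makes a repeated commit a no-op.
  private final Map<Long, List<String>> sink;
  // Transaction currently receiving records; invisible until committed.
  private List<String> openTxn = new ArrayList<>();
  // Pre-committed transactions per checkpoint id; this is the operator state.
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();

  TwoPhaseCommitWriter(Map<Long, List<String>> sink) {
    this.sink = sink;
  }

  // Records are written immediately, into the open transaction.
  void process(String record) {
    openTxn.add(record);
  }

  // Barrier reached: pre-commit the open transaction, start a new one,
  // and return the state to persist in the checkpoint.
  TreeMap<Long, List<String>> snapshotState(long checkpointId) {
    pending.put(checkpointId, openTxn);
    openTxn = new ArrayList<>();
    return new TreeMap<>(pending);
  }

  // Checkpoint complete: commit every transaction it covers.
  void notifyCheckpointComplete(long checkpointId) {
    Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
    ready.forEach(sink::putIfAbsent);
    ready.clear();
  }

  // Recovery: transactions recorded in the restored checkpoint are committed
  // (possibly again); the open transaction of the failed attempt is never
  // committed, and its input is replayed by the source.
  static TwoPhaseCommitWriter restore(
      TreeMap<Long, List<String>> checkpoint, Map<Long, List<String>> sink) {
    checkpoint.forEach(sink::putIfAbsent);
    return new TwoPhaseCommitWriter(sink);
  }

  // What a reader of the external system sees.
  static List<String> visible(Map<Long, List<String>> sink) {
    List<String> out = new ArrayList<>();
    sink.values().forEach(out::addAll);
    return out;
  }

  public static void main(String[] args) {
    Map<Long, List<String>> sink = new LinkedHashMap<>();
    TwoPhaseCommitWriter writer = new TwoPhaseCommitWriter(sink);
    writer.process("a");
    TreeMap<Long, List<String>> checkpoint1 = writer.snapshotState(1L);
    writer.process("b"); // goes into the next, still-open transaction
    // Failure before notifyCheckpointComplete(1): recover from checkpoint 1.
    TwoPhaseCommitWriter recovered = restore(checkpoint1, sink);
    System.out.println(visible(sink)); // [a]
    recovered.process("b"); // the source replays "b"
    recovered.snapshotState(2L);
    recovered.notifyCheckpointComplete(2L);
    System.out.println(visible(sink)); // [a, b]
  }
}
```
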
>
> I know that the new version of SDF supports checkpointing, which should
> solve this issue. But there is still a lot of work to do to make this a
> reality.
>
> So I think it would make sense to find a way to make KafkaIO's EOS more
> accessible to Runners which support a different way of checkpointing.
>
> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO,
> defining an annotation only cements the conceptual difference between
> the Runners.
>
>
> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>
