This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done. In fact, many sinks are probably subtly broken on the Flink runner today without RequiresStableInput, so we really need to finish this work and add a Flink implementation of it.
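
To make "delaying processing until the current checkpoint is done" concrete, here is a rough, runner-independent sketch in plain Java. It is only an illustration under assumptions, not the Flink runner's implementation: the class `StableInputBuffer` and its methods `processElement`, `snapshotState`, and `notifyCheckpointComplete` are hypothetical names made up for this example, and a real implementation would also have to persist the buffered elements as part of the checkpointed state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Beam or Flink class): holds elements back
 * until the checkpoint that covers them has completed, so the downstream
 * side-effecting step only ever sees input that will not change on replay.
 */
final class StableInputBuffer<T> {
    private final Consumer<List<T>> downstream;
    // Elements received since the last checkpoint barrier.
    private List<T> current = new ArrayList<>();
    // Batches that belong to a started but not yet completed checkpoint.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    StableInputBuffer(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    /** Buffer the element instead of processing it right away. */
    void processElement(T element) {
        current.add(element);
    }

    /** Assumed to be called when the barrier for checkpointId arrives. */
    void snapshotState(long checkpointId) {
        // In a real runner this batch would be written to checkpointed state here.
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Assumed to be called once checkpointId has completed everywhere. */
    void notifyCheckpointComplete(long checkpointId) {
        // Release every batch covered by this or an earlier checkpoint.
        NavigableMap<Long, List<T>> done = pending.headMap(checkpointId, true);
        for (List<T> batch : done.values()) {
            if (!batch.isEmpty()) {
                downstream.accept(batch);
            }
        }
        done.clear(); // headMap is a view, so this removes the released batches
    }
}
```

With this shape, elements that arrive after the barrier for checkpoint 1 stay buffered when checkpoint 1 completes and are only released by a later checkpoint. The cost is added latency of up to one checkpoint interval.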
Reuven

On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels, it then forwards it to the downstream operators.
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understood, Spark or Dataflow use the GroupByKey stages to persist
> the input. That is not required in Flink to be able to take a consistent
> snapshot of the pipeline.
>
> Basically, for Flink we don't need any of that magic that KafkaIO does.
> What we would need to support EOS is a way to tell the ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.
>
> I know that the new version of SDF supports checkpointing which should
> solve this issue. But there is still a lot of work to do to make this
> reality.
>
> So I think it would make sense to think about a way to make KafkaIO's
> EOS more accessible to Runners which support a different way of
> checkpointing.
>
> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO
> defining an annotation only manifests the conceptual difference between
> the Runners.
>
> [1]
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> [2]
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> [3]
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM