I believe you would implement the logic behind Flink's KafkaProducer with two steps:
1. Start transaction
2. @RequiresStableInput Close transaction

The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do. This matches the KafkaProducer's logic: delay closing the transaction until checkpoint finalization.

This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly-once behavior with the same performance characteristics as native Flink checkpoint finalization?"

Kenn

[1] https://github.com/apache/beam/pull/7955

On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax wrote:

> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi wrote:
>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>
>> When we discussed this in Aug 2017, the understanding was that the
>> two-phase commit utility in Flink, used to implement Flink's Kafka EOS,
>> could not be implemented in Beam's context. See this message
>> <https://www.mail-archive.com/[email protected]/msg02664.html> in that
>> dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>
> I think that the TwoPhaseCommit utility class wouldn't work.
> The Flink runner would probably want to directly use
> notifyCheckpointComplete in order to implement @RequiresStableInput.
>
>>> A checkpoint is realized by sending barriers through all channels,
>>> starting from the sources until they reach all sinks. Every operator
>>> persists its state once it has received a barrier on all of its input
>>> channels; it then forwards the barrier to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>> persist the input. That is not required in Flink to be able to take a
>>> consistent snapshot of the pipeline.
>>>
>>> Basically, for Flink we don't need any of the magic that KafkaIO does.
>>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>>> (a DoFn) to commit once a checkpoint has completed.
>>>
>>> I know that the new version of SDF supports checkpointing, which should
>>> solve this issue. But there is still a lot of work to do to make this a
>>> reality.
>>
>> I don't see how SDF solves this problem. Maybe pseudocode would make it
>> clearer. But if it helps, that is great!
>>
>>> So I think it would make sense to think about a way to make KafkaIO's
>>> EOS more accessible to Runners which support a different way of
>>> checkpointing.
>>
>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that
>> will help many future exactly-once sinks, and address the fundamental
>> incompatibility between the Beam model and Flink's horizontal
>> checkpointing for such applications.
>>
>> Raghu.
>>
>>> Cheers,
>>> Max
>>>
>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>> defining an annotation only manifests the conceptual difference between
>>> the Runners.
>>>
>>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
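The two-step design at the top of this thread ("Start transaction", then "@RequiresStableInput Close transaction") can be illustrated with a small, self-contained simulation. This is a sketch under stated assumptions, not Beam, Flink or Kafka code: FakeTransactionalProducer, StableInputGate, start_transaction and notify_checkpoint_complete are all hypothetical names (the last loosely mirrors Flink's CheckpointListener.notifyCheckpointComplete). Step 1 opens a transaction and writes into it; step 2 only receives its input once the runner learns that the corresponding checkpoint has been finalized, so nothing becomes visible before then.

```python
# Hypothetical simulation: no Beam, Flink or Kafka API is used here.


class FakeTransactionalProducer:
    """Hypothetical stand-in for a transactional Kafka producer."""

    def __init__(self):
        self.open_txns = {}  # txn_id -> records written but not yet committed
        self.visible = []    # records a read_committed consumer would see

    def begin(self, txn_id):
        self.open_txns[txn_id] = []

    def send(self, txn_id, record):
        self.open_txns[txn_id].append(record)

    def commit(self, txn_id):
        self.visible.extend(self.open_txns.pop(txn_id))


class StableInputGate:
    """Hypothetical runner-side wrapper for a @RequiresStableInput step.

    Inputs are held per checkpoint and handed to the wrapped step only after
    the runner is told that checkpoint has been finalized."""

    def __init__(self, step):
        self.step = step
        self.held = {}  # checkpoint_id -> inputs belonging to that checkpoint

    def process(self, checkpoint_id, element):
        self.held.setdefault(checkpoint_id, []).append(element)

    def notify_checkpoint_complete(self, checkpoint_id):
        # A finalized checkpoint also covers earlier ones whose notification
        # was skipped, so release all of them in checkpoint order.
        for cid in sorted(c for c in self.held if c <= checkpoint_id):
            for element in self.held.pop(cid):
                self.step(element)


def start_transaction(producer, txn_id, records):
    """Step 1: open a transaction and write into it; nothing is visible yet."""
    producer.begin(txn_id)
    for record in records:
        producer.send(txn_id, record)
    return txn_id


producer = FakeTransactionalProducer()
close_transaction = StableInputGate(producer.commit)  # Step 2

close_transaction.process(1, start_transaction(producer, "txn-1", ["a", "b"]))
close_transaction.process(2, start_transaction(producer, "txn-2", ["c"]))
print(producer.visible)  # []

close_transaction.notify_checkpoint_complete(1)
print(producer.visible)  # ['a', 'b']

# Notifications for checkpoints 2 and 3 are skipped; finalizing 4 releases all three.
close_transaction.process(3, start_transaction(producer, "txn-3", ["d"]))
close_transaction.process(4, start_transaction(producer, "txn-4", ["e"]))
close_transaction.notify_checkpoint_complete(4)
print(producer.visible)  # ['a', 'b', 'c', 'd', 'e']
```

The gate releases everything up to and including the finalized checkpoint because, in Flink, a completion notification can be skipped and a later completed checkpoint subsumes the earlier ones; the last three calls exercise that case.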

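For contrast, here is a simplified model of why the KafkaExactlyOnceSink pipeline quoted above (Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter) wants stable, sequence-numbered input on runners that persist data at GroupByKey boundaries. Everything here is an assumption made for illustration: the function names only echo the stage names in Max's message, the GroupByKeys are plain in-memory dicts, and the per-shard "skip sequence ids below next_id" rule is a simplified reading of the writer's deduplication, not its actual code. What it shows is that a retried write of the same persisted batch is dropped rather than duplicated.

```python
import random
from collections import defaultdict

NUM_SHARDS = 3  # hypothetical shard count


def assign_random_shard_ids(records, rng):
    return [(rng.randrange(NUM_SHARDS), record) for record in records]


def group_by_key(pairs):
    # Stand-in for a GroupByKey: on Spark or Dataflow its output is persisted,
    # so a retry downstream sees exactly the same groups.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def assign_sequence_ids(shard, records):
    return [(shard, (seq, record)) for seq, record in enumerate(records)]


class ExactlyOnceWriterModel:
    """Hypothetical per-shard writer that skips sequence ids it already wrote.

    A real sink would have to commit next_id atomically with the data; here
    it is just an in-memory dict."""

    def __init__(self):
        self.next_id = defaultdict(int)  # shard -> next sequence id expected
        self.output = []

    def write(self, shard, batch):
        for seq, record in sorted(batch):
            if seq < self.next_id[shard]:
                continue  # written by an earlier attempt; drop the duplicate
            self.output.append(record)
            self.next_id[shard] = seq + 1


rng = random.Random(7)
records = ["a", "b", "c", "d", "e"]

by_shard = group_by_key(assign_random_shard_ids(records, rng))  # 1st GroupByKey
sequenced = group_by_key(                                        # 2nd GroupByKey
    pair
    for shard, shard_records in by_shard.items()
    for pair in assign_sequence_ids(shard, shard_records)
)

writer = ExactlyOnceWriterModel()
for _ in range(2):  # the second attempt replays the same persisted batches
    for shard, batch in sequenced.items():
        writer.write(shard, batch)

print(sorted(writer.output))  # ['a', 'b', 'c', 'd', 'e']
```

In this model the GroupByKey output is what makes the replay identical; per Max's message, Flink's consistent snapshots already provide that, which is why only the "commit once a checkpoint has completed" part is needed there.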