On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
> I believe the way you would implement the logic behind Flink's
> KafkaProducer would be to have two steps:
>
> 1. Start transaction
> 2. @RequiresStableInput Close transaction
>

I see. What happens if closing the transaction fails in (2)? Flink's 2PC
requires that commit() should never hard fail once preCommit() succeeds. I
think that is the cost of not having an extra shuffle. It is alright, since
this policy has worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput support in the Flink
runner.

Raghu.

> The FlinkRunner would need to insert the "wait until checkpoint
> finalization" logic wherever it sees @RequiresStableInput, which is already
> what it would have to do.
>
> This matches the KafkaProducer's logic - delay closing the transaction
> until checkpoint finalization. This answers my main question, which is "is
> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
> exactly once behavior with the same performance characteristics as native
> Flink checkpoint finalization?"
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/7955
>
> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>
>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>> transaction once it has completed a checkpoint.
>>>
>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>> inside a transform, but I see that the intention was to save users from
>>>> surprises.
>>>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once.
>>>> It simply commits the pending transaction once it has completed a
>>>> checkpoint.
>>>>
>>>
>>> When we discussed this in Aug 2017, the understanding was that the
>>> 2 Phase commit utility in Flink used to implement Flink's Kafka EOS
>>> could not be implemented in Beam's context.
>>> See this message
>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>> that dev thread. Has anything changed in this regard? The whole thread is
>>> relevant to this topic and worth going through.
>>>
>>
>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner
>> would probably want to directly use notifySnapshotComplete in order to
>> implement @RequiresStableInput.
>>
>>>> A checkpoint is realized by sending barriers through all channels
>>>> starting from the source until reaching all sinks. Every operator
>>>> persists its state once it has received a barrier on all its input
>>>> channels; it then forwards the barrier to the downstream operators.
>>>>
>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>
>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>> GroupByKey -> ExactlyOnceWriter
>>>>
>>>> As I understood, Spark or Dataflow use the GroupByKey stages to persist
>>>> the input. That is not required in Flink to be able to take a
>>>> consistent snapshot of the pipeline.
>>>>
>>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>>> What we would need to support EOS is a way to tell the
>>>> ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>
>>>> I know that the new version of SDF supports checkpointing which should
>>>> solve this issue. But there is still a lot of work to do to make this
>>>> a reality.
>>>>
>>>
>>> I don't see how SDF solves this problem. Maybe pseudocode would make it
>>> clearer. But if it helps, that is great!
>>>
>>>> So I think it would make sense to think about a way to make KafkaIO's
>>>> EOS more accessible to Runners which support a different way of
>>>> checkpointing.
>>>>
>>>
>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>> that will help many future exactly-once sinks and address the fundamental
>>> incompatibility between the Beam model and Flink's horizontal
>>> checkpointing for such applications.
>>>
>>> Raghu.
>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>> defining an annotation only manifests the conceptual difference between
>>>> the Runners.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>> [2]
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>> [3]
>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
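
Since pseudocode was asked for in the thread, here is a rough sketch of the
two-step pattern discussed above: write into an open transaction, then close
(commit) it only once the runner reports that the checkpoint is final. This is
a plain-Python simulation, not Beam or Flink code. FakeBroker,
CheckpointCommittingWriter and all of their method names are hypothetical, and
the recovery path is simplified on purpose: it assumes the failure happens
before the checkpoint completes, so everything uncommitted is aborted and the
input is replayed.

```python
from dataclasses import dataclass, field


@dataclass
class FakeBroker:
    """Hypothetical stand-in for Kafka: only committed records are visible."""
    committed: list = field(default_factory=list)


class CheckpointCommittingWriter:
    """Hypothetical writer that delays commit until checkpoint completion."""

    def __init__(self, broker):
        self.broker = broker
        self.open_txn = []   # step 1: records in the currently open transaction
        self.pending = {}    # checkpoint id -> sealed (pre-committed) records

    def write(self, record):
        self.open_txn.append(record)

    def snapshot(self, checkpoint_id):
        # Pre-commit: seal the open transaction and start a fresh one.
        self.pending[checkpoint_id] = self.open_txn
        self.open_txn = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Step 2: commit every transaction sealed at or before this checkpoint.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.broker.committed.extend(self.pending.pop(cid))

    def abort_uncommitted(self):
        # Simplified recovery: drop everything that was never committed.
        self.open_txn = []
        self.pending.clear()


broker = FakeBroker()
writer = CheckpointCommittingWriter(broker)

writer.write("a")
writer.write("b")
writer.snapshot(checkpoint_id=1)
assert broker.committed == []        # nothing visible before finalization
writer.notify_checkpoint_complete(1)

writer.write("c")
writer.abort_uncommitted()           # simulated failure before checkpoint 2
writer.write("c")                    # the runner replays the input
writer.snapshot(checkpoint_id=2)
writer.notify_checkpoint_complete(2)
print(broker.committed)
```

The replayed record "c" is committed only once because the first attempt was
never made visible. The sketch does not cover the case raised above, where the
commit itself fails after a successful pre-commit.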