I'm not sure what a hard fail is here; I may have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase would establish the transaction, and commit() is not called until after checkpoint finalization. Can you describe in a bit more detail how that does not work?
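To make sure I'm reading the contract right, here is a minimal sketch in plain Java of the two-phase pattern I have in mind. Class and method names are illustrative, not Flink's or Beam's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the two-phase-commit contract: preCommit() stages the
// open transaction before the checkpoint is acknowledged; commit() runs only
// after checkpoint finalization and must be safe to retry (never hard-fail).
class TwoPhaseKafkaSink {
    private final List<String> pendingTxn = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private List<String> staged = null;

    // Records are buffered inside an open transaction.
    void write(String record) {
        pendingTxn.add(record);
    }

    // Phase 1: may fail; a failure here aborts the transaction and the
    // pipeline restarts from the last completed checkpoint.
    void preCommit() {
        staged = new ArrayList<>(pendingTxn);
        pendingTxn.clear();
    }

    // Phase 2: invoked once the checkpoint is finalized. Idempotent, so
    // retrying after a transient failure does not duplicate output.
    void commit() {
        if (staged != null) {
            committed.addAll(staged);
            staged = null;
        }
    }

    List<String> committedRecords() {
        return committed;
    }
}
```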
Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:

> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I believe the way you would implement the logic behind Flink's
>> KafkaProducer would be to have two steps:
>>
>> 1. Start transaction
>> 2. @RequiresStableInput Close transaction
>
> I see. What happens if closing the transaction fails in (2)? Flink's 2PC
> requires that commit() should never hard-fail once preCommit() succeeds.
> I think that is the cost of not having an extra shuffle. It is alright,
> since this policy has worked well for Flink so far.
>
> Overall, it will be great to have @RequiresStableInput support in the
> Flink runner.
>
> Raghu.
>
>> The FlinkRunner would need to insert the "wait until checkpoint
>> finalization" logic wherever it sees @RequiresStableInput, which is
>> already what it would have to do.
>>
>> This matches the KafkaProducer's logic: delay closing the transaction
>> until checkpoint finalization. This answers my main question, which is
>> "is @RequiresStableInput expressive enough to allow Beam-on-Flink to
>> have exactly-once behavior with the same performance characteristics as
>> native Flink checkpoint finalization?"
>>
>> Kenn
>>
>> [1] https://github.com/apache/beam/pull/7955
>>
>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>
>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>> pending transaction once it has completed a checkpoint.
>>>>
>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>> exactly-once semantics (EOS). I think it is questionable to exclude
>>>>> Runners from inside a transform, but I see that the intention was to
>>>>> save users from surprises.
>>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>> pending transaction once it has completed a checkpoint.
>>>>
>>>> When we discussed this in Aug 2017, the understanding was that the
>>>> two-phase commit utility in Flink, used to implement Flink's Kafka
>>>> EOS, could not be implemented in Beam's context. See this message
>>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>>> that dev thread. Has anything changed in this regard? The whole
>>>> thread is relevant to this topic and worth going through.
>>>
>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink
>>> runner would probably want to directly use notifySnapshotComplete in
>>> order to implement @RequiresStableInput.
>>>
>>>>> A checkpoint is realized by sending barriers through all channels,
>>>>> starting from the source until reaching all sinks. Every operator
>>>>> persists its state once it has received a barrier on all its input
>>>>> channels, and then forwards the barrier to the downstream operators.
>>>>>
>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>>
>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>> GroupByKey -> ExactlyOnceWriter
>>>>>
>>>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>>>> persist the input. That is not required in Flink to be able to take
>>>>> a consistent snapshot of the pipeline.
>>>>>
>>>>> Basically, for Flink we don't need any of that magic that KafkaIO
>>>>> does. What we would need to support EOS is a way to tell the
>>>>> ExactlyOnceWriter (a DoFn) to commit once a checkpoint has
>>>>> completed.
>>>>>
>>>>> I know that the new version of SDF supports checkpointing, which
>>>>> should solve this issue. But there is still a lot of work to do to
>>>>> make this a reality.
>>>>
>>>> I don't see how SDF solves this problem. Maybe pseudocode would make
>>>> it clearer. But if it helps, that is great!
>>>>
>>>>> So I think it would make sense to think about a way to make
>>>>> KafkaIO's EOS more accessible to Runners which support a different
>>>>> way of checkpointing.
>>>>
>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>>> that will help many future exactly-once sinks, and address the
>>>> fundamental incompatibility between the Beam model and Flink's
>>>> horizontal checkpointing for such applications.
>>>>
>>>> Raghu.
>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>> defining an annotation only manifests the conceptual difference
>>>>> between the Runners.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>> [3]
>>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
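For reference, the deduplication idea behind the AssignSequenceIds -> ExactlyOnceWriter stages discussed above can be sketched in plain Java: records get per-shard sequence numbers, and the writer drops anything it has already committed, making replays after a retry idempotent. Names here are illustrative, not the actual Beam implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of idempotent, sequence-numbered writes per shard, mirroring the
// role of the AssignSequenceIds and ExactlyOnceWriter stages.
class SequencedWriter {
    // Highest sequence id already committed, per shard.
    private final Map<Integer, Long> committedUpTo = new HashMap<>();
    private final List<String> sink = new ArrayList<>();

    // Idempotent write: replaying a (shard, seq) pair from a previous
    // (possibly failed) attempt does not duplicate output.
    void write(int shard, long seq, String record) {
        long last = committedUpTo.getOrDefault(shard, -1L);
        if (seq <= last) {
            return; // already written in an earlier attempt
        }
        sink.add(record);
        committedUpTo.put(shard, seq);
    }

    List<String> output() {
        return sink;
    }
}
```

The GroupByKey stages exist to make these sequence assignments stable across retries on runners that replay inputs; a runner with consistent snapshots, like Flink, would not need them.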