On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> I'm not sure what a hard fail is. I probably have a shallow understanding,
> but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
> establish the transaction and commit() is not called until after checkpoint
> finalization. Can you describe the way that it does not work a little bit
> more?

- preCommit() is called before the checkpoint. Kafka EOS in Flink starts the
transaction before this and makes sure it flushes all records in preCommit().
So far, so good.
- commit() is called after the checkpoint is persisted. Now, imagine commit()
fails for some reason. There is no option to rerun the first phase to write
the records again in a new transaction. This is a hard failure for the job.
In practice Flink might attempt to commit again (not sure how many times),
which is likely to fail and eventually results in job failure.
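To make the ordering concrete, here is a minimal sketch of the two phases and
when they run relative to a checkpoint. This is not the actual Flink sink code
(the class and method names are placeholders); only the KafkaProducer
transactional calls are the real Kafka client API:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Placeholder class illustrating the 2PC hooks, not Flink's implementation.
class TwoPhaseKafkaSinkSketch {
  private final KafkaProducer<String, String> producer;

  TwoPhaseKafkaSinkSketch(KafkaProducer<String, String> producer) {
    this.producer = producer;
    producer.initTransactions(); // requires transactional.id to be configured
  }

  // Called once per checkpoint interval: open a new Kafka transaction.
  void beginTransaction() {
    producer.beginTransaction();
  }

  // Called per element: write into the currently open, uncommitted transaction.
  void write(String topic, String key, String value) {
    producer.send(new ProducerRecord<>(topic, key, value));
  }

  // Phase 1 (preCommit): runs as part of taking the checkpoint.
  // All pending records are flushed into the still-open transaction.
  void preCommit() {
    producer.flush();
  }

  // Phase 2 (commit): runs only after the checkpoint has been persisted.
  // If this throws, there is no way to rewrite the records into a fresh
  // transaction, because the input that produced them is not replayed.
  void commit() {
    producer.commitTransaction();
  }
}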
> Kenn
>
> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
>
>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> I believe the way you would implement the logic behind Flink's
>>> KafkaProducer would be to have two steps:
>>>
>>> 1. Start transaction
>>> 2. @RequiresStableInput Close transaction
>>>
>>
>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC
>> requires that commit() should never hard fail once preCommit() succeeds. I
>> think that is the cost of not having an extra shuffle. It is alright since
>> this policy has worked well for Flink so far.
>>
>> Overall, it will be great to have @RequiresStableInput support in the
>> Flink runner.
>>
>> Raghu.
>>
>>> The FlinkRunner would need to insert the "wait until checkpoint
>>> finalization" logic wherever it sees @RequiresStableInput, which is
>>> already what it would have to do.
>>>
>>> This matches the KafkaProducer's logic - delay closing the transaction
>>> until checkpoint finalization. This answers my main question, which is
>>> "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>> exactly-once behavior with the same performance characteristics as native
>>> Flink checkpoint finalization?"
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/beam/pull/7955
>>>
>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>
>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>>> inside a transform, but I see that the intention was to save users
>>>>>> from surprises.
>>>>>>
>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>>
>>>>>
>>>>> When we discussed this in Aug 2017, the understanding was that the
>>>>> two-phase commit utility in Flink used to implement Flink's Kafka EOS
>>>>> could not be implemented in Beam's context. See this message
>>>>> <https://www.mail-archive.com/[email protected]/msg02664.html> in
>>>>> that dev thread. Has anything changed in this regard? The whole thread
>>>>> is relevant to this topic and worth going through.
>>>>>
>>>>
>>>> I think that the TwoPhaseCommit utility class wouldn't work. The Flink
>>>> runner would probably want to directly use notifySnapshotComplete in
>>>> order to implement @RequiresStableInput.
>>>>
>>>>>> A checkpoint is realized by sending barriers through all channels,
>>>>>> starting from the source until reaching all sinks. Every operator
>>>>>> persists its state once it has received a barrier on all its input
>>>>>> channels; it then forwards it to the downstream operators.
>>>>>>
>>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>>>
>>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>>> GroupByKey -> ExactlyOnceWriter
>>>>>>
>>>>>> As I understood, Spark or Dataflow use the GroupByKey stages to
>>>>>> persist the input. That is not required in Flink to be able to take a
>>>>>> consistent snapshot of the pipeline.
>>>>>>
>>>>>> Basically, for Flink we don't need any of that magic that KafkaIO
>>>>>> does. What we would need to support EOS is a way to tell the
>>>>>> ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>>>
>>>>>> I know that the new version of SDF supports checkpointing, which
>>>>>> should solve this issue. But there is still a lot of work to do to
>>>>>> make this a reality.
>>>>>>
>>>>>
>>>>> I don't see how SDF solves this problem. Maybe pseudo code would make
>>>>> it more clear. But if it helps, that is great!
>>>>>
>>>>>> So I think it would make sense to think about a way to make KafkaIO's
>>>>>> EOS more accessible to Runners which support a different way of
>>>>>> checkpointing.
>>>>>>
>>>>>
>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>>>> that will help many future exactly-once sinks... and address the
>>>>> fundamental incompatibility between the Beam model and Flink's
>>>>> horizontal checkpointing for such applications.
>>>>>
>>>>> Raghu.
>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>>> defining an annotation only manifests the conceptual difference
>>>>>> between the Runners.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>> [3]
>>>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>
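For reference, here is a rough sketch of the two-step split Kenn describes
above, with the second step annotated @RequiresStableInput. The FlinkRunner
would then hold back the elements for the second step until checkpoint
finalization (presumably via Flink's CheckpointListener#notifyCheckpointComplete,
which I take to be what "notifySnapshotComplete" refers to). Everything below
is illustrative; the helper and class names are made up and this is not the
actual KafkaExactlyOnceSink code:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical helper standing in for the actual Kafka transaction plumbing.
class KafkaTxnHelper {
  static String beginAndWrite(KV<String, String> record) {
    // begin a transaction, send the record, return the transactional id
    return "txn-id";
  }
  static void commitTxn(String txnId) {
    // resume the transaction by id and call commitTransaction()
  }
}

// Step 1: open a transaction and write the element into it, emitting a handle
// (e.g. the transactional id) downstream. Nothing is committed yet.
class StartTxnAndWriteFn extends DoFn<KV<String, String>, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KafkaTxnHelper.beginAndWrite(c.element()));
  }
}

// Step 2: close the transaction. @RequiresStableInput asks the runner not to
// deliver the element until it is stable; on Flink that would mean "after the
// checkpoint containing it has been finalized", the same point at which
// Flink's native Kafka producer commits.
class CommitTxnFn extends DoFn<String, Void> {
  @ProcessElement
  @RequiresStableInput
  public void processElement(ProcessContext c) {
    KafkaTxnHelper.commitTxn(c.element());
  }
}

Note this still leaves the question above open: if commitTxn() in step 2 fails
after the checkpoint is finalized, we are back to the same hard-failure case.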
