I think I am fundamentally misunderstanding checkpointing in Flink. If you randomly generate shard ids, buffer them until checkpoint finalization, and finalize the checkpoint so that the generation never needs to be re-run, isn't the result stable from that point onwards?

Kenn
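To make that buffer-until-finalization idea concrete, here is a minimal Flink sketch, assuming Flink's CheckpointedFunction and CheckpointListener interfaces. The class name, NUM_SHARDS, and the publishToKafka helper are hypothetical placeholders, not Beam or KafkaIO code: records get a random shard id, sit in checkpointed state, and are published only once a completed checkpoint has frozen them.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferUntilFinalizedSink
    implements SinkFunction<String>, CheckpointedFunction, CheckpointListener {

  private static final int NUM_SHARDS = 5; // hypothetical
  private final Random random = new Random();

  // Records whose shard assignment is not yet covered by any checkpoint.
  private final List<String> current = new ArrayList<>();
  // Records frozen into a pending checkpoint, keyed by checkpoint id.
  private final Map<Long, List<String>> pending = new HashMap<>();
  private transient ListState<String> checkpointedBuffer;

  @Override
  public void invoke(String value, Context context) {
    // Random assignment; it only becomes stable once a checkpoint
    // that contains it completes.
    current.add(random.nextInt(NUM_SHARDS) + ":" + value);
  }

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    pending.put(ctx.getCheckpointId(), new ArrayList<>(current));
    current.clear();
    // Persist everything that has not been published yet.
    List<String> all = new ArrayList<>();
    pending.values().forEach(all::addAll);
    checkpointedBuffer.update(all);
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    checkpointedBuffer = ctx.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer", String.class));
    if (ctx.isRestored()) {
      // Restored records carry the shard ids generated before the
      // failure; the random generation is never re-run for them.
      for (String record : checkpointedBuffer.get()) {
        current.add(record);
      }
    }
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Everything frozen by this or an earlier checkpoint is now stable.
    Iterator<Map.Entry<Long, List<String>>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, List<String>> entry = it.next();
      if (entry.getKey() <= checkpointId) {
        publishToKafka(entry.getValue()); // hypothetical helper
        it.remove();
      }
    }
  }

  private void publishToKafka(List<String> records) {
    // Placeholder: for EOS the publish itself must still be
    // transactional or idempotent, which is what the thread below
    // is really about.
  }
}

The catch, as the thread below works out, is that publish step: committing it exactly once is where Flink's two-phase commit and Beam's model start to diverge.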
On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>
> However, we won't be able to provide full compatibility with
> RequiresStableInput, because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable. IMHO the only way to achieve that is materializing output,
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on
> > Beam, and nobody else ever finished it. I think it is important to
> > finish though. Many of the existing Sinks are only fully correct for
> > Dataflow today, because they generate either a Reshuffle or a
> > GroupByKey to ensure input stability before outputting (in many
> > cases this code was inherited from before Beam existed). On Flink
> > today, these sinks might occasionally produce duplicate output in
> > the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
> >
> >     Circling back to the RequiresStableInput annotation[1]. I've done
> >     some prototyping to see how this could be integrated into Flink.
> >     I'm currently writing a test based on RequiresStableInput.
> >
> >     I found out there are already checks in place at the Runners to
> >     throw in case transforms use RequiresStableInput and it's not
> >     supported. However, not a single transform actually uses the
> >     annotation.
> >
> >     It seems that the effort stopped at some point? Would it make
> >     sense to start annotating KafkaExactlyOnceSink with
> >     @RequiresStableInput? We could then get rid of the whitelist.
> >
> >     -Max
> >
> >     [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >     On 01.03.19 14:28, Maximilian Michels wrote:
> >     > Just realized that transactions do not span multiple elements
> >     > in KafkaExactlyOnceSink. So the proposed solution to stop
> >     > processing elements while a snapshot is pending would work.
> >     >
> >     > It is certainly not optimal in terms of performance for Flink
> >     > and poses problems when checkpoints take long to complete, but
> >     > it would be worthwhile to implement this to make use of the
> >     > EOS feature.
> >     >
> >     > Thanks,
> >     > Max
> >     >
> >     > On 01.03.19 12:23, Maximilian Michels wrote:
> >     >> Thank you for the prompt replies. It's great to see that
> >     >> there is a good understanding of how EOS in Flink works.
> >     >>
> >     >>> This is exactly what RequiresStableInput is supposed to do.
> >     >>> On the Flink runner, this would be implemented by delaying
> >     >>> processing until the current checkpoint is done.
> >     >>
> >     >> I don't think that works, because we have no control over the
> >     >> Kafka transactions. Imagine:
> >     >>
> >     >> 1) ExactlyOnceWriter writes records to Kafka and commits,
> >     >> then starts a new transaction.
> >     >> 2) Flink checkpoints, delaying the processing of elements;
> >     >> the checkpoint fails.
> >     >> 3) We restore from an old checkpoint and will start writing
> >     >> duplicate data to Kafka. The de-duplication that the sink
> >     >> performs does not help, especially because the random shard
> >     >> ids might be assigned differently.
> >     >>
> >     >> IMHO we have to have control over the commit to be able to
> >     >> provide EOS.
> >     >>
> >     >>> When we discussed this in Aug 2017, the understanding was
> >     >>> that the two-phase commit utility in Flink used to implement
> >     >>> Flink's Kafka EOS could not be implemented in Beam's
> >     >>> context.
> >     >>
> >     >> That's also my understanding, unless we change the interface.
> >     >>
> >     >>> I don't see how SDF solves this problem.
> >     >>
> >     >> SDF has a checkpoint method which the Runner can call, but I
> >     >> think that you are right that the above problem would be the
> >     >> same.
> >     >>
> >     >>> Absolutely. I would love to support EOS in KafkaIO for
> >     >>> Flink. I think that will help many future exactly-once
> >     >>> sinks and address the fundamental incompatibility between
> >     >>> the Beam model and Flink's horizontal checkpointing for
> >     >>> such applications.
> >     >>
> >     >> Great :)
> >     >>
> >     >>> The FlinkRunner would need to insert the "wait until
> >     >>> checkpoint finalization" logic wherever it sees
> >     >>> @RequiresStableInput, which is already what it would have
> >     >>> to do.
> >     >>
> >     >> I don't think that fixes the problem. See the example above.
> >     >>
> >     >> Thanks,
> >     >> Max
> >     >>
> >     >> On 01.03.19 00:04, Raghu Angadi wrote:
> >     >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
> >     >>>
> >     >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> >     >>>
> >     >>>         I'm not sure what a hard fail is. I probably have a
> >     >>>         shallow understanding, but doesn't
> >     >>>         @RequiresStableInput work for 2PC? The preCommit()
> >     >>>         phase should establish the transaction, and commit()
> >     >>>         is not called until after checkpoint finalization.
> >     >>>         Can you describe the way that it does not work a
> >     >>>         little bit more?
> >     >>>
> >     >>>     - preCommit() is called before the checkpoint. Kafka EOS
> >     >>>     in Flink starts the transaction before this and makes
> >     >>>     sure it flushes all records in preCommit(). So far so
> >     >>>     good.
> >     >>>     - commit() is called after the checkpoint is persisted.
> >     >>>     Now, imagine commit() fails for some reason. There is no
> >     >>>     option to rerun the first phase to write the records
> >     >>>     again in a new transaction. This is a hard failure for
> >     >>>     the job. In practice Flink might attempt to commit again
> >     >>>     (not sure how many times), which is likely to fail and
> >     >>>     eventually results in job failure.
> >     >>>
> >     >>> In Apache Beam, the records could be stored in state and
> >     >>> written inside commit() to work around this issue. It could
> >     >>> have scalability issues if checkpoints are not frequent
> >     >>> enough in the Flink runner.
> >     >>>
> >     >>> Raghu.
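A hedged sketch of the workaround Raghu describes above, assuming Flink's TwoPhaseCommitSinkFunction and the Kafka transactional producer API; the class name and configuration are hypothetical. The "transaction" carried through the 2PC hooks is just the buffered records, kept in checkpointed state, and the Kafka write happens entirely inside commit(), so a failed commit() can be retried with a fresh Kafka transaction instead of hard-failing the job.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WriteInsideCommitSink
    extends TwoPhaseCommitSinkFunction<String, List<String>, Void> {

  private final String topic;
  // Must set transactional.id and key/value serializers.
  private final Properties producerConfig;

  public WriteInsideCommitSink(String topic, Properties producerConfig) {
    super(
        TypeInformation.of(new TypeHint<List<String>>() {})
            .createSerializer(new ExecutionConfig()),
        VoidSerializer.INSTANCE);
    this.topic = topic;
    this.producerConfig = producerConfig;
  }

  @Override
  protected List<String> beginTransaction() {
    return new ArrayList<>(); // no Kafka transaction yet, just a buffer
  }

  @Override
  protected void invoke(List<String> buffer, String value, Context context) {
    buffer.add(value); // records go into checkpointed state, not to Kafka
  }

  @Override
  protected void preCommit(List<String> buffer) {
    // Nothing to flush: the buffer is snapshotted with the checkpoint.
  }

  @Override
  protected void commit(List<String> buffer) {
    // Runs after checkpoint finalization. Safe to re-run: the records
    // are still in state, and each attempt opens a brand-new Kafka
    // transaction; initTransactions() fences and aborts anything left
    // open by a previous failed attempt.
    try (KafkaProducer<String, String> producer =
        new KafkaProducer<>(producerConfig)) {
      producer.initTransactions();
      producer.beginTransaction();
      for (String record : buffer) {
        producer.send(new ProducerRecord<>(topic, record));
      }
      producer.commitTransaction();
    }
  }

  @Override
  protected void abort(List<String> buffer) {
    buffer.clear(); // nothing was written to Kafka yet
  }
}

As Raghu notes, the buffered state grows with the checkpoint interval; and a retry after a commit whose acknowledgement was lost could still duplicate records, which is why KafkaExactlyOnceSink additionally assigns sequence ids for de-duplication.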
> >     >>>         Kenn
> >     >>>
> >     >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
> >     >>>
> >     >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
> >     >>>
> >     >>>                 I believe the way you would implement the
> >     >>>                 logic behind Flink's KafkaProducer would be
> >     >>>                 to have two steps:
> >     >>>
> >     >>>                 1. Start transaction
> >     >>>                 2. @RequiresStableInput Close transaction
> >     >>>
> >     >>>             I see. What happens if closing the transaction
> >     >>>             fails in (2)? Flink's 2PC requires that commit()
> >     >>>             should never hard fail once preCommit()
> >     >>>             succeeds. I think that is the cost of not having
> >     >>>             an extra shuffle. It is alright, since this
> >     >>>             policy has worked well for Flink so far.
> >     >>>
> >     >>>             Overall, it will be great to have
> >     >>>             @RequiresStableInput support in the Flink
> >     >>>             runner.
> >     >>>
> >     >>>             Raghu.
> >     >>>
> >     >>>                 The FlinkRunner would need to insert the
> >     >>>                 "wait until checkpoint finalization" logic
> >     >>>                 wherever it sees @RequiresStableInput, which
> >     >>>                 is already what it would have to do.
> >     >>>
> >     >>>                 This matches the KafkaProducer's logic -
> >     >>>                 delay closing the transaction until
> >     >>>                 checkpoint finalization. This answers my
> >     >>>                 main question, which is "is
> >     >>>                 @RequiresStableInput expressive enough to
> >     >>>                 allow Beam-on-Flink to have exactly-once
> >     >>>                 behavior with the same performance
> >     >>>                 characteristics as native Flink checkpoint
> >     >>>                 finalization?"
> >     >>>
> >     >>>                 Kenn
> >     >>>
> >     >>>                 [1] https://github.com/apache/beam/pull/7955
> >     >>>
> >     >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
> >     >>>
> >     >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> >     >>>
> >     >>>                             Now why does the Flink Runner
> >     >>>                             not support KafkaIO EOS? Flink's
> >     >>>                             native KafkaProducer supports
> >     >>>                             exactly-once. It simply commits
> >     >>>                             the pending transaction once it
> >     >>>                             has completed a checkpoint.
> >     >>>
> >     >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
> >     >>>
> >     >>>                             Hi,
> >     >>>
> >     >>>                             I came across KafkaIO's Runner
> >     >>>                             whitelist [1] for enabling
> >     >>>                             exactly-once semantics (EOS). I
> >     >>>                             think it is questionable to
> >     >>>                             exclude Runners from inside a
> >     >>>                             transform, but I see that the
> >     >>>                             intention was to save users from
> >     >>>                             surprises.
> >     >>>
> >     >>>                             Now why does the Flink Runner
> >     >>>                             not support KafkaIO EOS? Flink's
> >     >>>                             native KafkaProducer supports
> >     >>>                             exactly-once. It simply commits
> >     >>>                             the pending transaction once it
> >     >>>                             has completed a checkpoint.
> >     >>>
> >     >>>                         When we discussed this in Aug 2017,
> >     >>>                         the understanding was that the
> >     >>>                         two-phase commit utility in Flink
> >     >>>                         used to implement Flink's Kafka EOS
> >     >>>                         could not be implemented in Beam's
> >     >>>                         context.
> >     >>>                         See this message
> >     >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html>
> >     >>>                         in that dev thread. Has anything
> >     >>>                         changed in this regard? The whole
> >     >>>                         thread is relevant to this topic and
> >     >>>                         worth going through.
> >     >>>
> >     >>>                     I think that the TwoPhaseCommit utility
> >     >>>                     class wouldn't work. The Flink runner
> >     >>>                     would probably want to directly use
> >     >>>                     notifyCheckpointComplete in order to
> >     >>>                     implement @RequiresStableInput.
> >     >>>
> >     >>>                             A checkpoint is realized by
> >     >>>                             sending barriers through all
> >     >>>                             channels, starting from the
> >     >>>                             source until reaching all sinks.
> >     >>>                             Every operator persists its
> >     >>>                             state once it has received a
> >     >>>                             barrier on all its input
> >     >>>                             channels; it then forwards the
> >     >>>                             barrier to the downstream
> >     >>>                             operators.
> >     >>>
> >     >>>                             The architecture of Beam's
> >     >>>                             KafkaExactlyOnceSink is as
> >     >>>                             follows[2]:
> >     >>>
> >     >>>                             Input -> AssignRandomShardIds ->
> >     >>>                             GroupByKey -> AssignSequenceIds
> >     >>>                             -> GroupByKey ->
> >     >>>                             ExactlyOnceWriter
> >     >>>
> >     >>>                             As I understood, Spark or
> >     >>>                             Dataflow use the GroupByKey
> >     >>>                             stages to persist the input.
> >     >>>                             That is not required in Flink to
> >     >>>                             be able to take a consistent
> >     >>>                             snapshot of the pipeline.
> >     >>>
> >     >>>                             Basically, for Flink we don't
> >     >>>                             need any of that magic that
> >     >>>                             KafkaIO does. What we would need
> >     >>>                             to support EOS is a way to tell
> >     >>>                             the ExactlyOnceWriter (a DoFn)
> >     >>>                             to commit once a checkpoint has
> >     >>>                             completed.
> >     >>>
> >     >>>                             I know that the new version of
> >     >>>                             SDF supports checkpointing,
> >     >>>                             which should solve this issue.
> >     >>>                             But there is still a lot of work
> >     >>>                             to do to make this a reality.
> >     >>>
> >     >>>                         I don't see how SDF solves this
> >     >>>                         problem. Maybe pseudocode would make
> >     >>>                         it more clear. But if it helps, that
> >     >>>                         is great!
> >     >>>
> >     >>>                             So I think it would make sense
> >     >>>                             to think about a way to make
> >     >>>                             KafkaIO's EOS more accessible to
> >     >>>                             Runners which support a
> >     >>>                             different way of checkpointing.
> >     >>>
> >     >>>                         Absolutely. I would love to support
> >     >>>                         EOS in KafkaIO for Flink. I think
> >     >>>                         that will help many future
> >     >>>                         exactly-once sinks and address the
> >     >>>                         fundamental incompatibility between
> >     >>>                         the Beam model and Flink's
> >     >>>                         horizontal checkpointing for such
> >     >>>                         applications.
> >     >>>
> >     >>>                         Raghu.
> >     >>>
> >     >>>                             Cheers,
> >     >>>                             Max
> >     >>>
> >     >>>                             PS: I found this document about
> >     >>>                             RequiresStableInput [3], but
> >     >>>                             IMHO defining an annotation only
> >     >>>                             manifests the conceptual
> >     >>>                             difference between the Runners.
> >     >>>
> >     >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> >     >>>
> >     >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> >     >>>
> >     >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
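To illustrate what annotating the writer could look like, here is a hedged sketch of a stable-input Kafka writer DoFn. It is not the actual KafkaExactlyOnceSink code, and it omits the sequence-id de-duplication described above; the class name and configuration are hypothetical. @RequiresStableInput on the processElement method is the contract: the runner (e.g. the FlinkRunner via the "wait until checkpoint finalization" logic) must not deliver an element until its replay is guaranteed to be identical.

import java.util.Properties;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

class StableInputKafkaWriter extends DoFn<KV<Integer, Iterable<String>>, Void> {

  private final String topic;
  // Must set transactional.id and key/value serializers.
  private final Properties producerConfig;
  private transient KafkaProducer<String, String> producer;

  StableInputKafkaWriter(String topic, Properties producerConfig) {
    this.topic = topic;
    this.producerConfig = producerConfig;
  }

  @Setup
  public void setup() {
    producer = new KafkaProducer<>(producerConfig);
    producer.initTransactions();
  }

  @RequiresStableInput
  @ProcessElement
  public void processElement(ProcessContext ctx) {
    // Stable input: if we crash anywhere in here, the runner replays
    // exactly the same shard contents, so redoing the aborted
    // transaction produces the same output.
    producer.beginTransaction();
    for (String record : ctx.element().getValue()) {
      producer.send(new ProducerRecord<>(topic, record));
    }
    producer.commitTransaction();
  }

  @Teardown
  public void teardown() {
    if (producer != null) {
      producer.close();
    }
  }
}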
