On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels <[email protected]> wrote:
> > If you randomly generate shard ids, buffer those until finalization,
> > finalize a checkpoint so that you never need to re-run that generation,
> > isn't the result stable from that point onwards?
>
> Yes, you're right :) For @RequiresStableInput we will always have to
> buffer and emit only after a finalized checkpoint.
>
> 2PC is the better model for Flink, at least in the case of Kafka,
> because it can offload the buffering to Kafka via its transactions.
> RequiresStableInput is a more general solution and it is feasible to
> support it in the Flink Runner. However, we have to make sure that
> checkpoints are taken frequently to avoid too much memory pressure.
> It would be nice to also support 2PC in Beam, i.e. the Runner could
> choose to either buffer/materialize input or do a 2PC, but it would
> also break the purity of the existing model.

Still digging into the details. I think the "generate random shard ids &
buffer" approach is a tradition, but one specific to the BigQueryIO or
FileIO styles. It doesn't have to be done that way if the target system
has special support, as Kafka does.

For Kafka, can you get the 2PC behavior like this:

Upstream step: open a transaction, write a bunch of stuff to it (let
Kafka do the buffering), and emit a transaction identifier.

Downstream @RequiresStableInput step: close the transaction.

Again, I may be totally missing something, but I think this has
identical characteristics:

 - Kafka does the buffering
 - checkpoint finalization is the driver of latency
 - failure before checkpoint finalization means the old transaction
   sits around and eventually times out
 - failure after checkpoint finalization causes a retry with the same
   transaction identifier

Kenn
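A rough sketch of that two-step shape as Beam DoFns (all names here are hypothetical). The upstream half works with Kafka as-is; the downstream half is exactly the missing piece, since a stock KafkaProducer cannot commit a transaction opened by a different producer instance, and re-running initTransactions() with the same transactional.id fences the original writer and aborts its open transaction:

import java.util.Properties;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Instant;

// Step 1: write records into an open Kafka transaction and emit only the
// transaction identifier. Kafka holds the buffered records; nothing is
// visible to consumers until the transaction commits.
class WriteInTransactionFn extends DoFn<String, String> {
  private transient KafkaProducer<String, String> producer;
  private transient String txnId;

  @StartBundle
  public void startBundle() {
    txnId = "beam-txn-" + UUID.randomUUID(); // fresh transactional.id per bundle
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", txnId);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
    producer.initTransactions();
    producer.beginTransaction();
  }

  @ProcessElement
  public void processElement(@Element String record) {
    producer.send(new ProducerRecord<>("output-topic", record)); // Kafka buffers
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    producer.flush(); // staged in Kafka, still uncommitted and invisible
    // Deliberately no commit here; only the identifier flows downstream.
    ctx.output(txnId, Instant.now(), GlobalWindow.INSTANCE);
  }
}

// Step 2: commit only once the runner guarantees the input (the txn id)
// is stable, i.e. after checkpoint finalization. A retry re-commits the
// same id, which is harmless.
class CommitTransactionFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element String txnId) {
    // The gap in the sketch: the stock Kafka client offers no way for
    // this instance to commit a transaction opened by the upstream
    // producer, so this step would need special support from Kafka.
  }
}

Checkpoint finalization remains the driver of latency, as in the bullet list above; the open question is purely whether Kafka can expose "commit transaction by id" across producer instances.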
> On 01.03.19 19:42, Kenneth Knowles wrote:
> > I think I am fundamentally misunderstanding checkpointing in Flink.
> >
> > If you randomly generate shard ids, buffer those until finalization,
> > finalize a checkpoint so that you never need to re-run that
> > generation, isn't the result stable from that point onwards?
> >
> > Kenn
> >
> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > Fully agree. I think we can improve the situation drastically. For
> > > KafkaIO EOS with Flink we need to make these two changes:
> > >
> > > 1) Introduce buffering while the checkpoint is being taken
> > > 2) Replace the random shard id assignment with something
> > > deterministic
> > >
> > > However, we won't be able to provide full compatibility with
> > > RequiresStableInput, because Flink only guarantees stable input
> > > after a checkpoint. RequiresStableInput requires input at any point
> > > in time to be stable. IMHO the only way to achieve that is
> > > materializing output, which Flink does not currently support.
> > >
> > > KafkaIO does not need all the power of RequiresStableInput to
> > > achieve EOS with Flink, but for the general case I don't see a good
> > > solution at the moment.
> > >
> > > -Max
> > >
> > > On 01.03.19 16:45, Reuven Lax wrote:
> > > > Yeah, the person who was working on it originally stopped working
> > > > on Beam, and nobody else ever finished it. I think it is
> > > > important to finish though. Many of the existing Sinks are only
> > > > fully correct for Dataflow today, because they generate either
> > > > Reshuffle or GroupByKey to ensure input stability before
> > > > outputting (in many cases this code was inherited from before
> > > > Beam existed). On Flink today, these sinks might occasionally
> > > > produce duplicate output in the case of failures.
> > > >
> > > > Reuven
> > > >
> > > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
> > > > >
> > > > > Circling back to the RequiresStableInput annotation[1]. I've
> > > > > done some prototyping to see how this could be integrated into
> > > > > Flink. I'm currently writing a test based on
> > > > > RequiresStableInput.
> > > > >
> > > > > I found out there are already checks in place at the Runners to
> > > > > throw in case transforms use RequiresStableInput and it is not
> > > > > supported. However, not a single transform actually uses the
> > > > > annotation.
> > > > >
> > > > > It seems that the effort stopped at some point? Would it make
> > > > > sense to start annotating KafkaExactlyOnceSink with
> > > > > @RequiresStableInput? We could then get rid of the whitelist.
> > > > >
> > > > > -Max
> > > > >
> > > > > [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
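For reference, a minimal illustration of the annotation Max proposes (the @RequiresStableInput annotation is the real Beam API; the DoFn is a made-up stand-in, not the actual KafkaExactlyOnceSink code):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical writer DoFn annotated the way Max proposes for
// KafkaExactlyOnceSink: @RequiresStableInput declares that retries of
// processElement must see identical input, e.g. because the runner
// buffers the input until a checkpoint is finalized.
class StableInputWriterFn extends DoFn<KV<Integer, String>, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element KV<Integer, String> shardAndRecord) {
    // Side effects are safe here: with stable input, an idempotent write
    // (keyed by shard id and sequence number) yields exactly-once output.
  }
}

A runner that cannot honor the annotation would then reject the pipeline at construction time, making the hard-coded whitelist unnecessary.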
> > > > > On 01.03.19 14:28, Maximilian Michels wrote:
> > > > > > Just realized that transactions do not span multiple elements
> > > > > > in KafkaExactlyOnceSink. So the proposed solution to stop
> > > > > > processing elements while a snapshot is pending would work.
> > > > > >
> > > > > > It is certainly not optimal in terms of performance for Flink
> > > > > > and poses problems when checkpoints take long to complete,
> > > > > > but it would be worthwhile to implement this to make use of
> > > > > > the EOS feature.
> > > > > >
> > > > > > Thanks,
> > > > > > Max
> > > > > >
> > > > > > On 01.03.19 12:23, Maximilian Michels wrote:
> > > > > > > Thank you for the prompt replies. It's great to see that
> > > > > > > there is a good understanding of how EOS in Flink works.
> > > > > > >
> > > > > > > > This is exactly what RequiresStableInput is supposed to
> > > > > > > > do. On the Flink runner, this would be implemented by
> > > > > > > > delaying processing until the current checkpoint is done.
> > > > > > >
> > > > > > > I don't think that works, because we have no control over
> > > > > > > the Kafka transactions. Imagine:
> > > > > > >
> > > > > > > 1) ExactlyOnceWriter writes records to Kafka and commits,
> > > > > > > then starts a new transaction.
> > > > > > > 2) Flink checkpoints, delaying the processing of elements,
> > > > > > > and the checkpoint fails.
> > > > > > > 3) We restore from an old checkpoint and will start writing
> > > > > > > duplicate data to Kafka. The de-duplication that the sink
> > > > > > > performs does not help, especially because the random shard
> > > > > > > ids might be assigned differently.
> > > > > > >
> > > > > > > IMHO we have to have control over the commit to be able to
> > > > > > > provide EOS.
> > > > > > >
> > > > > > > > When we discussed this in Aug 2017, the understanding was
> > > > > > > > that the 2-phase commit utility in Flink used to
> > > > > > > > implement Flink's Kafka EOS could not be implemented in
> > > > > > > > Beam's context.
> > > > > > >
> > > > > > > That's also my understanding, unless we change the
> > > > > > > interface.
> > > > > > >
> > > > > > > > I don't see how SDF solves this problem..
> > > > > > >
> > > > > > > SDF has a checkpoint method which the Runner can call, but
> > > > > > > I think you are right that the above problem would be the
> > > > > > > same.
> > > > > > >
> > > > > > > > Absolutely. I would love to support EOS in KafkaIO for
> > > > > > > > Flink. I think that will help many future exactly-once
> > > > > > > > sinks.. and address the fundamental incompatibility
> > > > > > > > between the Beam model and Flink's horizontal
> > > > > > > > checkpointing for such applications.
> > > > > > >
> > > > > > > Great :)
> > > > > > >
> > > > > > > > The FlinkRunner would need to insert the "wait until
> > > > > > > > checkpoint finalization" logic wherever it sees
> > > > > > > > @RequiresStableInput, which is already what it would have
> > > > > > > > to do.
> > > > > > >
> > > > > > > I don't think that fixes the problem. See the example
> > > > > > > above.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Max
> > > > > > >
> > > > > > > On 01.03.19 00:04, Raghu Angadi wrote:
> > > > > > > > On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
> > > > > > > >
> > > > > > > >     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> > > > > > > >
> > > > > > > >         I'm not sure what a hard fail is. I probably have
> > > > > > > >         a shallow understanding, but doesn't
> > > > > > > >         @RequiresStableInput work for 2PC? The preCommit()
> > > > > > > >         phase should establish the transaction and
> > > > > > > >         commit() is not called until after checkpoint
> > > > > > > >         finalization. Can you describe the way that it
> > > > > > > >         does not work a little bit more?
> > > > > > > >
> > > > > > > >     - preCommit() is called before the checkpoint. Kafka
> > > > > > > >     EOS in Flink starts the transaction before this and
> > > > > > > >     makes sure it flushes all records in preCommit(). So
> > > > > > > >     far so good.
> > > > > > > >     - commit() is called after the checkpoint is
> > > > > > > >     persisted. Now, imagine commit() fails for some
> > > > > > > >     reason. There is no option to rerun the 1st phase to
> > > > > > > >     write the records again in a new transaction. This is
> > > > > > > >     a hard failure for the job. In practice Flink might
> > > > > > > >     attempt to commit again (not sure how many times),
> > > > > > > >     which is likely to fail and eventually results in job
> > > > > > > >     failure.
> > > > > > > >
> > > > > > > > In Apache Beam, the records could be stored in state, and
> > > > > > > > can be written inside commit() to work around this issue.
> > > > > > > > It could have scalability issues if checkpoints are not
> > > > > > > > frequent enough in the Flink runner.
> > > > > > > >
> > > > > > > > Raghu.
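A sketch of the workaround Raghu describes: phase 1 only parks records in checkpointed Beam state, and the external write moves into phase 2, where it can be retried from state instead of hard-failing. The onCheckpointFinalized() hook is hypothetical, since Beam DoFns expose no commit()/checkpoint-complete callback today; a runner would have to drive it internally:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferUntilCommitFn extends DoFn<KV<Integer, String>, Void> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      @Element KV<Integer, String> element,
      @StateId("buffer") BagState<String> buffer) {
    // Phase 1: no external side effect; the record becomes durable as
    // part of the next checkpoint.
    buffer.add(element.getValue());
  }

  // HYPOTHETICAL phase 2, invoked by the runner after the checkpoint
  // containing the buffered records has been finalized.
  void onCheckpointFinalized(BagState<String> buffer) {
    for (String record : buffer.read()) {
      // producer.send(...): the actual Kafka write happens here and can
      // safely be retried, because the records are still in state.
    }
    buffer.clear();
  }
}

This also makes the scalability caveat concrete: state grows with the interval between checkpoints, so infrequent checkpoints translate directly into memory pressure.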
> > > > > > > >
> > > > > > > >         Kenn
> > > > > > > >
> > > > > > > >         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
> > > > > > > >
> > > > > > > >             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
> > > > > > > >
> > > > > > > >                 I believe the way you would implement the
> > > > > > > >                 logic behind Flink's KafkaProducer would be
> > > > > > > >                 to have two steps:
> > > > > > > >
> > > > > > > >                 1. Start transaction
> > > > > > > >                 2. @RequiresStableInput Close transaction
> > > > > > > >
> > > > > > > >             I see. What happens if closing the transaction
> > > > > > > >             fails in (2)? Flink's 2PC requires that
> > > > > > > >             commit() should never hard-fail once
> > > > > > > >             preCommit() succeeds. I think that is the cost
> > > > > > > >             of not having an extra shuffle. It is alright,
> > > > > > > >             since this policy has worked well for Flink so
> > > > > > > >             far.
> > > > > > > >
> > > > > > > >             Overall, it will be great to have
> > > > > > > >             @RequiresStableInput support in the Flink
> > > > > > > >             runner.
> > > > > > > >
> > > > > > > >             Raghu.
> > > > > > > >
> > > > > > > >                 The FlinkRunner would need to insert the
> > > > > > > >                 "wait until checkpoint finalization" logic
> > > > > > > >                 wherever it sees @RequiresStableInput,
> > > > > > > >                 which is already what it would have to do.
> > > > > > > >
> > > > > > > >                 This matches the KafkaProducer's logic -
> > > > > > > >                 delay closing the transaction until
> > > > > > > >                 checkpoint finalization. This answers my
> > > > > > > >                 main question, which is "is
> > > > > > > >                 @RequiresStableInput expressive enough to
> > > > > > > >                 allow Beam-on-Flink to have exactly-once
> > > > > > > >                 behavior with the same performance
> > > > > > > >                 characteristics as native Flink checkpoint
> > > > > > > >                 finalization?"
> > > > > > > >
> > > > > > > >                 Kenn
> > > > > > > >
> > > > > > > >                 [1] https://github.com/apache/beam/pull/7955
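For context, a skeleton of Flink's TwoPhaseCommitSinkFunction contract that this exchange refers to (the class and hook names are the real Flink API; the transaction type is simplified to a String id, whereas the real FlinkKafkaProducer carries a producer per transaction):

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

class TwoPhaseCommitSketch extends TwoPhaseCommitSinkFunction<String, String, Void> {

  TwoPhaseCommitSketch() {
    super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
  }

  @Override
  protected String beginTransaction() {
    // Called at startup and again right after every checkpoint: open a
    // new Kafka transaction and return its handle.
    return "txn-" + System.nanoTime();
  }

  @Override
  protected void invoke(String txn, String value, Context context) {
    // Normal processing: write the value into the open transaction.
  }

  @Override
  protected void preCommit(String txn) {
    // Phase 1, on the checkpoint barrier: flush pending records so they
    // are safely buffered inside Kafka.
  }

  @Override
  protected void commit(String txn) {
    // Phase 2, after the checkpoint is persisted. May be retried but
    // must eventually succeed: phase 1 cannot be re-run, which is
    // exactly the "commit() should never hard fail" constraint Raghu
    // points out.
  }

  @Override
  protected void abort(String txn) {
    // Failure/restore path for transactions that never reached phase 2.
  }
}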
> > > > > > > >
> > > > > > > >                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
> > > > > > > >
> > > > > > > >                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> > > > > > > >
> > > > > > > >                             Now why does the Flink Runner
> > > > > > > >                             not support KafkaIO EOS?
> > > > > > > >                             Flink's native KafkaProducer
> > > > > > > >                             supports exactly-once. It
> > > > > > > >                             simply commits the pending
> > > > > > > >                             transaction once it has
> > > > > > > >                             completed a checkpoint.
> > > > > > > >
> > > > > > > >                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
> > > > > > > >
> > > > > > > >                             Hi,
> > > > > > > >
> > > > > > > >                             I came across KafkaIO's Runner
> > > > > > > >                             whitelist [1] for enabling
> > > > > > > >                             exactly-once semantics (EOS).
> > > > > > > >                             I think it is questionable to
> > > > > > > >                             exclude Runners from inside a
> > > > > > > >                             transform, but I see that the
> > > > > > > >                             intention was to save users
> > > > > > > >                             from surprises.
> > > > > > > >
> > > > > > > >                             Now why does the Flink Runner
> > > > > > > >                             not support KafkaIO EOS?
> > > > > > > >                             Flink's native KafkaProducer
> > > > > > > >                             supports exactly-once. It
> > > > > > > >                             simply commits the pending
> > > > > > > >                             transaction once it has
> > > > > > > >                             completed a checkpoint.
> > > > > > > >
> > > > > > > >                         When we discussed this in Aug
> > > > > > > >                         2017, the understanding was that
> > > > > > > >                         the 2-phase commit utility in
> > > > > > > >                         Flink used to implement Flink's
> > > > > > > >                         Kafka EOS could not be implemented
> > > > > > > >                         in Beam's context. See this message
> > > > > > > >                         <https://www.mail-archive.com/[email protected]/msg02664.html>
> > > > > > > >                         in that dev thread. Has anything
> > > > > > > >                         changed in this regard? The whole
> > > > > > > >                         thread is relevant to this topic
> > > > > > > >                         and worth going through.
> > > > > > > >
> > > > > > > >                     I think that the TwoPhaseCommit
> > > > > > > >                     utility class wouldn't work. The Flink
> > > > > > > >                     runner would probably want to directly
> > > > > > > >                     use notifySnapshotComplete in order to
> > > > > > > >                     implement @RequiresStableInput.
> > > > > > > >
> > > > > > > >                             A checkpoint is realized by
> > > > > > > >                             sending barriers through all
> > > > > > > >                             channels, starting from the
> > > > > > > >                             source until reaching all
> > > > > > > >                             sinks. Every operator persists
> > > > > > > >                             its state once it has received
> > > > > > > >                             a barrier on all its input
> > > > > > > >                             channels, then forwards it to
> > > > > > > >                             the downstream operators.
> > > > > > > >
> > > > > > > >                             The architecture of Beam's
> > > > > > > >                             KafkaExactlyOnceSink is as
> > > > > > > >                             follows[2]:
> > > > > > > >
> > > > > > > >                             Input -> AssignRandomShardIds
> > > > > > > >                             -> GroupByKey ->
> > > > > > > >                             AssignSequenceIds ->
> > > > > > > >                             GroupByKey -> ExactlyOnceWriter
> > > > > > > >
> > > > > > > >                             As I understand it, Spark and
> > > > > > > >                             Dataflow use the GroupByKey
> > > > > > > >                             stages to persist the input.
> > > > > > > >                             That is not required in Flink
> > > > > > > >                             to be able to take a
> > > > > > > >                             consistent snapshot of the
> > > > > > > >                             pipeline.
> > > > > > > >
> > > > > > > >                             Basically, for Flink we don't
> > > > > > > >                             need any of that magic that
> > > > > > > >                             KafkaIO does. What we would
> > > > > > > >                             need to support EOS is a way
> > > > > > > >                             to tell the ExactlyOnceWriter
> > > > > > > >                             (a DoFn) to commit once a
> > > > > > > >                             checkpoint has completed.
> > > > > > > >
> > > > > > > >                             I know that the new version of
> > > > > > > >                             SDF supports checkpointing,
> > > > > > > >                             which should solve this issue.
> > > > > > > >                             But there is still a lot of
> > > > > > > >                             work to do to make this a
> > > > > > > >                             reality.
> > > > > > > >
> > > > > > > >                         I don't see how SDF solves this
> > > > > > > >                         problem.. Maybe pseudocode would
> > > > > > > >                         make it more clear. But if it
> > > > > > > >                         helps, that is great!
> > > > > > > >
> > > > > > > >                             So I think it would make sense
> > > > > > > >                             to think about a way to make
> > > > > > > >                             KafkaIO's EOS more accessible
> > > > > > > >                             to Runners which support a
> > > > > > > >                             different way of checkpointing.
> > > > > > > >
> > > > > > > >                         Absolutely. I would love to
> > > > > > > >                         support EOS in KafkaIO for Flink.
> > > > > > > >                         I think that will help many future
> > > > > > > >                         exactly-once sinks.. and address
> > > > > > > >                         the fundamental incompatibility
> > > > > > > >                         between the Beam model and Flink's
> > > > > > > >                         horizontal checkpointing for such
> > > > > > > >                         applications.
> > > > > > > >
> > > > > > > >                         Raghu.
> > > > > > > >
> > > > > > > >                             Cheers,
> > > > > > > >                             Max
> > > > > > > >
> > > > > > > >                             PS: I found this document
> > > > > > > >                             about RequiresStableInput [3],
> > > > > > > >                             but IMHO defining an annotation
> > > > > > > >                             only manifests the conceptual
> > > > > > > >                             difference between the Runners.
> > > > > > > >
> > > > > > > >                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> > > > > > > >
> > > > > > > >                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> > > > > > > >
> > > > > > > >                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
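Finally, for reference, the KafkaExactlyOnceSink topology Max describes above, wired up as a Beam pipeline fragment (names and types follow the email rather than the actual code in [2]; the sequence numbering in particular is simplified):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class EosTopologySketch {

  static class AssignRandomShardIds extends DoFn<String, KV<Integer, String>> {
    @ProcessElement
    public void process(@Element String record,
                        OutputReceiver<KV<Integer, String>> out) {
      // The non-determinism that Dataflow/Spark "freeze" with the
      // GroupByKey that follows.
      out.output(KV.of(ThreadLocalRandom.current().nextInt(10), record));
    }
  }

  static class AssignSequenceIds
      extends DoFn<KV<Integer, Iterable<String>>, KV<Integer, KV<Long, String>>> {
    @ProcessElement
    public void process(@Element KV<Integer, Iterable<String>> shard,
                        OutputReceiver<KV<Integer, KV<Long, String>>> out) {
      long seq = 0;
      for (String record : shard.getValue()) {
        out.output(KV.of(shard.getKey(), KV.of(seq++, record)));
      }
    }
  }

  static class ExactlyOnceWriter
      extends DoFn<KV<Integer, Iterable<KV<Long, String>>>, Void> {
    @ProcessElement
    public void process(@Element KV<Integer, Iterable<KV<Long, String>>> shard) {
      // Writes to Kafka transactionally, skipping sequence numbers
      // already committed for this shard: exactly-once via idempotence.
    }
  }

  static PCollection<Void> expand(PCollection<String> input) {
    return input
        .apply("AssignRandomShardIds", ParDo.of(new AssignRandomShardIds()))
        .apply("GroupByShard", GroupByKey.<Integer, String>create())
        .apply("AssignSequenceIds", ParDo.of(new AssignSequenceIds()))
        .apply("GroupBySequence", GroupByKey.<Integer, KV<Long, String>>create())
        .apply("ExactlyOnceWriter", ParDo.of(new ExactlyOnceWriter()));
  }
}

On Dataflow and Spark the two GroupByKeys materialize their input, fixing the random shard ids and sequence numbers before the writer runs; a Flink barrier snapshot materializes nothing, which is why the same construction does not provide stability there.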
