On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles wrote:
> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels wrote:
>
>>> If you randomly generate shard ids, buffer those until finalization, finalize a checkpoint so that you never need to re-run that generation, isn't the result stable from that point onwards?
>>
>> Yes, you're right :) For @RequiresStableInput we will always have to buffer and emit only after a finalized checkpoint.
>>
>> 2PC is the better model for Flink, at least in the case of Kafka, because it can offload the buffering to Kafka via its transactions. RequiresStableInput is a more general solution and it is feasible to support it in the Flink Runner. However, we have to make sure that checkpoints are taken frequently to avoid too much memory pressure.
>>
>> It would be nice to also support 2PC in Beam, i.e. the Runner could choose to either buffer/materialize input or do a 2PC, but it would also break the purity of the existing model.
>
> Still digging into details. I think the "generate random shard ids & buffer" approach is a tradition, but more specific to BigQueryIO or FileIO styles. It doesn't have to be done that way if the target system has special support like Kafka does.
>
> For Kafka, can you get the 2PC behavior like this: Upstream step: open a transaction, write a bunch of stuff to it (let Kafka do the buffering) and emit a transaction identifier. Downstream @RequiresStableInput step: close the transaction. Again, I may be totally missing something, but I think that this has identical characteristics:

Does Kafka garbage collect this eventually in the case where you crash and start again with a different transaction identifier?
>
> - Kafka does the buffering
> - checkpoint finalization is the driver of latency
> - failure before checkpoint finalization means the old transaction sits around and times out eventually
> - failure after checkpoint finalization causes retry with the same transaction identifier
>
> Kenn
>
>> On 01.03.19 19:42, Kenneth Knowles wrote:
>>> I think I am fundamentally misunderstanding checkpointing in Flink.
>>>
>>> If you randomly generate shard ids, buffer those until finalization, finalize a checkpoint so that you never need to re-run that generation, isn't the result stable from that point onwards?
>>>
>>> Kenn
>>>
>>> On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels wrote:
>>>
>>>> Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:
>>>>
>>>> 1) Introduce buffering while the checkpoint is being taken
>>>> 2) Replace the random shard id assignment with something deterministic
>>>>
>>>> However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires input at any point in time to be stable. IMHO the only way to achieve that is materializing output, which Flink does not currently support.
>>>>
>>>> KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.
>>>>
>>>> -Max
>>>>
>>>> On 01.03.19 16:45, Reuven Lax wrote:
>>>>> Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish though.
>>>>> Many of the existing Sinks are only fully correct for Dataflow today, because they generate either Reshuffle or GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels wrote:
>>>>>
>>>>>> Circling back to the RequiresStableInput annotation [1]. I've done some prototyping to see how this could be integrated into Flink. I'm currently writing a test based on RequiresStableInput.
>>>>>>
>>>>>> I found out there are already checks in place at the Runners to throw in case transforms use RequiresStableInput and it's not supported. However, not a single transform actually uses the annotation.
>>>>>>
>>>>>> It seems that the effort stopped at some point? Would it make sense to start annotating KafkaExactlyOnceSink with @RequiresStableInput? We could then get rid of the whitelist.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>>
>>>>>> On 01.03.19 14:28, Maximilian Michels wrote:
>>>>>>> Just realized that transactions do not span multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.
>>>>>>>
>>>>>>> It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.
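Max's proposal to stop processing elements while a snapshot is pending can be sketched as a small gate. This is a toy Python model, not Beam or Flink API. The names `SnapshotGate`, `on_barrier` and `on_checkpoint_complete` are hypothetical. The model assumes the runner calls these methods when a checkpoint barrier arrives and when the checkpoint is confirmed. It only shows the ordering of side effects. With a replayable source, elements held back after the barrier are not in the snapshot and would be re-read after a restore, so de-duplication on replay is left to the sink.

```python
from collections import deque


class SnapshotGate:
    """Toy model: hold back elements while a checkpoint snapshot is pending."""

    def __init__(self, process):
        self._process = process   # side-effecting step, e.g. a sink write
        self._pending = None      # id of the checkpoint currently in flight
        self._queue = deque()     # elements that arrived after the barrier

    def on_element(self, element):
        if self._pending is None:
            self._process(element)
        else:
            self._queue.append(element)   # not processed until the snapshot is done

    def on_barrier(self, checkpoint_id):
        # The snapshot starts here; stop processing until it is confirmed.
        self._pending = checkpoint_id

    def on_checkpoint_complete(self, checkpoint_id):
        if self._pending is None or checkpoint_id < self._pending:
            return   # stale or unrelated notification
        self._pending = None
        while self._queue:
            self._process(self._queue.popleft())


written = []
gate = SnapshotGate(written.append)
gate.on_element("a")           # no snapshot pending: processed immediately
gate.on_barrier(1)
gate.on_element("b")           # held back
gate.on_element("c")           # held back
print(written)                 # ['a']
gate.on_checkpoint_complete(1)
print(written)                 # ['a', 'b', 'c']
```

The queue grows for as long as the snapshot is pending. That growth is the performance cost Max mentions for checkpoints that take long to complete.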
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Max
>>>>>>>
>>>>>>> On 01.03.19 12:23, Maximilian Michels wrote:
>>>>>>>> Thank you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.
>>>>>>>>
>>>>>>>>> This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.
>>>>>>>>
>>>>>>>> I don't think that works because we have no control over the Kafka transactions. Imagine:
>>>>>>>>
>>>>>>>> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction.
>>>>>>>> 2) Flink checkpoints, delaying the processing of elements, and the checkpoint fails.
>>>>>>>> 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shard ids might be assigned differently.
>>>>>>>>
>>>>>>>> IMHO we have to have control over commit to be able to provide EOS.
>>>>>>>>
>>>>>>>>> When we discussed this in Aug 2017, the understanding was that 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.
>>>>>>>>
>>>>>>>> That's also my understanding, unless we change the interface.
>>>>>>>>
>>>>>>>>> I don't see how SDF solves this problem.
>>>>>>>>
>>>>>>>> SDF has a checkpoint method which the Runner can call, but I think that you are right that the above problem would be the same.
>>>>>>>>
>>>>>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that will help many future exactly-once sinks,
>>>>>>>>> and address fundamental incompatibility between Beam model and Flink's horizontal checkpointing for such applications.
>>>>>>>>
>>>>>>>> Great :)
>>>>>>>>
>>>>>>>>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>>>>>>>>
>>>>>>>> I don't think that fixes the problem. See above example.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On 01.03.19 00:04, Raghu Angadi wrote:
>>>>>>>>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi wrote:
>>>>>>>>>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles wrote:
>>>>>>>>>>> I'm not sure what a hard fail is. I probably have a shallow understanding, but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should establish the transaction and commit() is not called until after checkpoint finalization. Can you describe the way that it does not work a little bit more?
>>>>>>>>>>
>>>>>>>>>> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the transaction before this and makes sure it flushes all records in preCommit(). So far good.
>>>>>>>>>> - commit is called after checkpoint is persisted. Now, imagine commit() fails for some reason.
>>>>>>>>>> There is no option to rerun the 1st phase to write the records again in a new transaction. This is a hard failure for the job. In practice Flink might attempt to commit again (not sure how many times), which is likely to fail and eventually results in job failure.
>>>>>>>>>
>>>>>>>>> In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in Flink runner.
>>>>>>>>>
>>>>>>>>> Raghu.
>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi wrote:
>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles wrote:
>>>>>>>>>>>>> I believe the way you would implement the logic behind Flink's KafkaProducer would be to have two steps:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Start transaction
>>>>>>>>>>>>> 2. @RequiresStableInput Close transaction
>>>>>>>>>>>>
>>>>>>>>>>>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC requires that commit() should never hard fail once preCommit() succeeds. I think that is the cost of not having an extra shuffle. It is alright since this policy has worked well for Flink so far.
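Raghu contrasts Flink's native two-phase commit with a Beam-style variant that keeps the records in state. That contrast can be modeled in a few lines. This is a toy Python model, not the Kafka client or Flink's `TwoPhaseCommitSinkFunction`. `FakeBroker`, `TwoPhaseSink` and `StateBackedSink` are hypothetical names. The `fail_next_commit` flag simulates a commit that fails and takes its transaction with it.

- **Native variant:** the records live only inside the broker transaction. A retried commit cannot recover them once that transaction is gone.
- **State-backed variant:** the records stay in (assumed checkpointed) operator state until a commit succeeds.

```python
class FakeBroker:
    """Stand-in for a transactional log (hypothetical, not the Kafka client API)."""

    def __init__(self):
        self.committed = []           # records visible to readers
        self._txns = {}               # open transaction id -> buffered records
        self._next_id = 0
        self.fail_next_commit = False

    def begin(self):
        self._next_id += 1
        self._txns[self._next_id] = []
        return self._next_id

    def write(self, txn, record):
        self._txns[txn].append(record)

    def commit(self, txn):
        if self.fail_next_commit:
            self.fail_next_commit = False
            self._txns.pop(txn, None)     # the failed transaction and its data are gone
            raise RuntimeError("commit failed")
        if txn not in self._txns:
            raise RuntimeError(f"unknown transaction {txn}")
        self.committed.extend(self._txns.pop(txn))


class TwoPhaseSink:
    """Records go into the broker transaction before the checkpoint."""

    def __init__(self, broker):
        self.broker = broker
        self.txn = broker.begin()
        self.pending = {}             # checkpoint id -> pre-committed transaction

    def write(self, record):
        self.broker.write(self.txn, record)

    def pre_commit(self, checkpoint_id):
        self.pending[checkpoint_id] = self.txn
        self.txn = self.broker.begin()

    def commit(self, checkpoint_id):
        self.broker.commit(self.pending[checkpoint_id])
        del self.pending[checkpoint_id]


class StateBackedSink:
    """Records stay in (checkpointed) state until a commit succeeds."""

    def __init__(self, broker):
        self.broker = broker
        self.buffer = []
        self.pending = {}             # checkpoint id -> records captured by that snapshot

    def write(self, record):
        self.buffer.append(record)

    def pre_commit(self, checkpoint_id):
        self.pending[checkpoint_id] = self.buffer
        self.buffer = []

    def commit(self, checkpoint_id):
        txn = self.broker.begin()     # a fresh transaction per commit attempt
        for record in self.pending[checkpoint_id]:
            self.broker.write(txn, record)
        self.broker.commit(txn)
        del self.pending[checkpoint_id]   # dropped only after a successful commit


def run(sink_cls):
    broker = FakeBroker()
    sink = sink_cls(broker)
    sink.write("r1")
    sink.pre_commit(1)
    broker.fail_next_commit = True
    for _ in range(2):                # first attempt fails, then one retry
        try:
            sink.commit(1)
            break
        except RuntimeError:
            pass
    return broker.committed


print(run(TwoPhaseSink))     # [] (the retry cannot recover the lost records)
print(run(StateBackedSink))  # ['r1']
```

The state-backed variant trades broker-side buffering for operator state. That trade is the scalability concern Raghu raises when checkpoints are infrequent. The variant also needs idempotent or de-duplicated writes if a commit succeeds but its acknowledgement is lost, and the sketch does not handle that case.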
>>>>>>>>>>>>
>>>>>>>>>>>> Overall, it will be great to have @RequiresStableInput support in Flink runner.
>>>>>>>>>>>>
>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>
>>>>>>>>>>>>> The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This matches the KafkaProducer's logic - delay closing the transaction until checkpoint finalization. This answers my main question, which is "is @RequiresStableInput expressive enough to allow Beam-on-Flink to have exactly once behavior with the same performance characteristics as native Flink checkpoint finalization?"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/beam/pull/7955
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax wrote:
>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi wrote:
>>>>>>>>>>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
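Kenn's two-step design can be modeled with a toy store. The design opens and fills a transaction upstream, emits only the transaction's identifier, and closes it in a @RequiresStableInput step. The model also illustrates the garbage-collection question at the top of the thread.

This is not Kafka's API. `TxnStore`, `upstream_step` and `stable_commit_step` are hypothetical, and transactions are keyed by a caller-chosen id. `timeout` stands in for a broker-side transaction timeout; Kafka's producer setting `transaction.timeout.ms` plays a similar role. Real Kafka transactions are bound to a producer's `transactional.id`. Whether a restarted worker can commit an earlier transaction by id is therefore an assumption of this sketch, not something it establishes.

```python
class TxnStore:
    """Toy transactional store keyed by caller-chosen ids (hypothetical, not Kafka)."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.in_flight = {}   # txn id -> (start time, buffered records)
        self.committed = {}   # txn id -> records made visible to readers

    def begin(self, txn_id, now):
        self.in_flight[txn_id] = (now, [])

    def write(self, txn_id, record):
        self.in_flight[txn_id][1].append(record)

    def commit(self, txn_id):
        if txn_id in self.committed:
            return  # retry after finalization with the same id: a no-op
        _, records = self.in_flight.pop(txn_id)  # KeyError if it already expired
        self.committed[txn_id] = records

    def expire(self, now):
        # Abandoned transactions (never committed) are dropped after the timeout.
        for txn_id, (start, _) in list(self.in_flight.items()):
            if now - start > self.timeout:
                del self.in_flight[txn_id]


def upstream_step(store, txn_id, records, now):
    """Write records into a transaction and emit only its id; the store buffers."""
    store.begin(txn_id, now)
    for record in records:
        store.write(txn_id, record)
    return txn_id


def stable_commit_step(store, txn_id):
    """Assumed to run only after checkpoint finalization (@RequiresStableInput)."""
    store.commit(txn_id)


# Failure before finalization: attempt "t1" is abandoned, the rerun uses a new id "t2".
store = TxnStore(timeout=60)
upstream_step(store, "t1", ["a", "b"], now=0)
txn = upstream_step(store, "t2", ["a", "b"], now=5)
stable_commit_step(store, txn)
# Failure after finalization: the commit is retried with the same id.
stable_commit_step(store, txn)
store.expire(now=100)
print(store.committed)  # {'t2': ['a', 'b']}
print(store.in_flight)  # {}
```

If the commit step only runs after its transaction has already expired, `commit` raises. That is the hard-failure case Raghu describes for Flink's 2PC.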
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once semantics (EOS). I think it is questionable to exclude Runners from inside a transform, but I see that the intention was to save users from surprises.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native KafkaProducer supports exactly-once. It simply commits the pending transaction once it has completed a checkpoint.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When we discussed this in Aug 2017, the understanding was that 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. See this message <https://www.mail-archive.com/[email protected]/msg02664.html> in that dev thread. Has anything changed in this regard? The whole thread is relevant to this topic and worth going through.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner would probably want to directly use notifySnapshotComplete in order to implement @RequiresStableInput.
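Reuven suggests driving @RequiresStableInput directly from the runner's checkpoint-complete callback. In Flink, that listener method is `CheckpointListener#notifyCheckpointComplete(long checkpointId)`. The idea can be sketched as a per-checkpoint buffer. Unlike a gate that pauses processing, this variant keeps accepting input and only defers the side-effecting step.

This is a toy Python model with hypothetical names. It makes two assumptions:

- the buffers are part of the checkpointed state;
- a completion notice for checkpoint N also releases any earlier batches whose own notice was missed.

```python
class StableInputBuffer:
    """Toy model of @RequiresStableInput on a checkpointing runner (hypothetical names)."""

    def __init__(self, process):
        self._process = process   # the side-effecting step that needs stable input
        self._current = []        # elements received since the last snapshot
        self._sealed = {}         # checkpoint id -> elements captured by that snapshot

    def on_element(self, element):
        self._current.append(element)   # keep accepting input; only the effect is deferred

    def snapshot_state(self, checkpoint_id):
        # In a real runner, _sealed and _current would be written to checkpointed state.
        self._sealed[checkpoint_id] = self._current
        self._current = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Release every batch captured by this checkpoint or an earlier one, in order.
        for cid in sorted(c for c in self._sealed if c <= checkpoint_id):
            for element in self._sealed.pop(cid):
                self._process(element)


out = []
buf = StableInputBuffer(out.append)
buf.on_element(1)
buf.on_element(2)
buf.snapshot_state(1)
buf.on_element(3)
buf.snapshot_state(2)
buf.on_element(4)
print(out)                          # []
buf.notify_checkpoint_complete(2)   # also covers checkpoint 1
print(out)                          # [1, 2, 3]
```

Element 4 stays buffered until a later checkpoint that contains it completes. The memory held by the buffers scales with the checkpoint interval, which matches Max's point that checkpoints must be taken frequently.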
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A checkpoint is realized by sending barriers through all channels, starting from the source until reaching all sinks. Every operator persists its state once it has received a barrier on all its input channels; it then forwards the barrier to the downstream operators.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As I understood, Spark or Dataflow use the GroupByKey stages to persist the input. That is not required in Flink to be able to take a consistent snapshot of the pipeline.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Basically, for Flink we don't need any of that magic that KafkaIO does. What we would need to support EOS is a way to tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I know that the new version of SDF supports checkpointing, which should solve this issue. But there is still a lot of work to do to make this a reality.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I don't see how SDF solves this problem. Maybe pseudocode would make it clearer. But if it helps, that is great!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So I think it would make sense to think about a way to make KafkaIO's EOS more accessible to Runners which support a different way of checkpointing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink.
>>>>>>>>>>>>>>> I think that will help many future exactly-once sinks, and address fundamental incompatibility between Beam model and Flink's horizontal checkpointing for such applications.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> PS: I found this document about RequiresStableInput [3], but IMHO defining an annotation only manifests the conceptual difference between the Runners.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>>>>>>>>>>>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>>>>>>>>>>>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
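In his original message, Max describes the barrier mechanism: an operator snapshots its state only after a barrier has arrived on every input channel, and then forwards the barrier. That mechanism can be modeled as follows. This is a simplified model of aligned checkpoints with one checkpoint in flight at a time, not Flink's implementation. `AligningOperator` and its methods are hypothetical. Records that arrive on a channel after that channel's barrier are held back so that they do not leak into the snapshot.

```python
class AligningOperator:
    """Simplified aligned checkpointing for one operator (hypothetical, one checkpoint at a time)."""

    def __init__(self, num_inputs):
        self.num_inputs = num_inputs
        self.state = 0              # example state: running sum of all records
        self.snapshots = {}         # checkpoint id -> persisted state
        self.forwarded = []         # barriers sent to downstream operators
        self._aligned = set()       # channels whose barrier has already arrived
        self._held = []             # records from aligned channels, applied after the snapshot

    def on_record(self, channel, value):
        if channel in self._aligned:
            self._held.append(value)    # belongs after this checkpoint; do not apply yet
        else:
            self.state += value

    def on_barrier(self, channel, checkpoint_id):
        self._aligned.add(channel)
        if len(self._aligned) < self.num_inputs:
            return                      # still waiting for barriers on other channels
        self.snapshots[checkpoint_id] = self.state    # persist state
        self.forwarded.append(checkpoint_id)          # forward the barrier downstream
        self._aligned.clear()
        for value in self._held:
            self.state += value
        self._held.clear()


op = AligningOperator(num_inputs=2)
op.on_record(0, 1)
op.on_barrier(0, checkpoint_id=7)
op.on_record(0, 10)      # channel 0 is aligned: held back
op.on_record(1, 2)       # channel 1 has no barrier yet: applied
op.on_barrier(1, checkpoint_id=7)
print(op.snapshots)      # {7: 3}
print(op.state)          # 13
```

Holding back post-barrier records makes the snapshot consistent without the GroupByKey-based persistence mentioned above. The cost is that, during alignment, a slow channel delays the records of the channels whose barriers have already arrived.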
