On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax <[email protected]> wrote:

> On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles <[email protected]> wrote:
>
>> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels <[email protected]> wrote:
>>
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> > finalize a checkpoint so that you never need to re-run that
>>> > generation, isn't the result stable from that point onwards?
>>>
>>> Yes, you're right :) For @RequiresStableInput we will always have to
>>> buffer and emit only after a finalized checkpoint.
>>>
>>> 2PC is the better model for Flink, at least in the case of Kafka,
>>> because it can offload the buffering to Kafka via its transactions.
>>> RequiresStableInput is a more general solution, and it is feasible to
>>> support it in the Flink Runner. However, we have to make sure that
>>> checkpoints are taken frequently enough to avoid too much memory
>>> pressure.
>>>
>>> It would be nice to also support 2PC in Beam, i.e. the Runner could
>>> choose to either buffer/materialize input or do a 2PC, but it would
>>> also break the purity of the existing model.
>>
>> Still digging into details. I think the "generate random shard ids &
>> buffer" approach is a tradition, but one more specific to the BigQueryIO
>> or FileIO styles. It doesn't have to be done that way if the target
>> system has special support, as Kafka does.
>>
>> For Kafka, can you get the 2PC behavior like this: upstream step: open
>> a transaction, write a bunch of stuff to it (let Kafka do the
>> buffering), and emit a transaction identifier. Downstream
>> @RequiresStableInput step: close the transaction. Again, I may be
>> totally missing something, but I think this has identical
>> characteristics:
>
> Does Kafka garbage collect this eventually in the case where you crash
> and start again with a different transaction identifier?
I believe that is what I read on the page about Flink's Kafka 2PC, though
I cannot find it any more. What would the alternative be for Kafka? You
always have to be ready for a client that goes away.

Kenn

>> - Kafka does the buffering
>> - checkpoint finalization is the driver of latency
>> - failure before checkpoint finalization means the old transaction
>>   sits around and times out eventually
>> - failure after checkpoint finalization causes retry with the same
>>   transaction identifier
>>
>> Kenn
>>
>>> On 01.03.19 19:42, Kenneth Knowles wrote:
>>> > I think I am fundamentally misunderstanding checkpointing in Flink.
>>> >
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> > finalize a checkpoint so that you never need to re-run that
>>> > generation, isn't the result stable from that point onwards?
>>> >
>>> > Kenn
>>> >
>>> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]>
>>> > wrote:
>>> >
>>> > Fully agree. I think we can improve the situation drastically. For
>>> > KafkaIO EOS with Flink we need to make these two changes:
>>> >
>>> > 1) Introduce buffering while the checkpoint is being taken
>>> > 2) Replace the random shard id assignment with something
>>> >    deterministic
>>> >
>>> > However, we won't be able to provide full compatibility with
>>> > RequiresStableInput, because Flink only guarantees stable input
>>> > after a checkpoint. RequiresStableInput requires input at any point
>>> > in time to be stable. IMHO the only way to achieve that is
>>> > materializing output, which Flink does not currently support.
>>> >
>>> > KafkaIO does not need all the power of RequiresStableInput to
>>> > achieve EOS with Flink, but for the general case I don't see a good
>>> > solution at the moment.
>>> >
>>> > -Max
>>> >
>>> > On 01.03.19 16:45, Reuven Lax wrote:
>>> > > Yeah, the person who was working on it originally stopped working
>>> > > on Beam, and nobody else ever finished it. I think it is important
>>> > > to finish, though. Many of the existing sinks are only fully
>>> > > correct for Dataflow today, because they generate either a
>>> > > Reshuffle or a GroupByKey to ensure input stability before
>>> > > outputting (in many cases this code was inherited from before Beam
>>> > > existed). On Flink today, these sinks might occasionally produce
>>> > > duplicate output in the case of failures.
>>> > >
>>> > > Reuven
>>> > >
>>> > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]>
>>> > > wrote:
>>> > >
>>> > > Circling back to the RequiresStableInput annotation[1]. I've done
>>> > > some prototyping to see how this could be integrated into Flink.
>>> > > I'm currently writing a test based on RequiresStableInput.
>>> > >
>>> > > I found out there are already checks in place at the Runners to
>>> > > throw in case transforms use RequiresStableInput and it's not
>>> > > supported. However, not a single transform actually uses the
>>> > > annotation.
>>> > >
>>> > > It seems that the effort stopped at some point? Would it make
>>> > > sense to start annotating KafkaExactlyOnceSink with
>>> > > @RequiresStableInput? We could then get rid of the whitelist.
>>> > >
>>> > > -Max
>>> > >
>>> > > [1]
>>> > > https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
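For concreteness, a minimal sketch of what such an annotation looks like
on a sink DoFn. The class name and element types below are placeholders,
not the actual KafkaExactlyOnceSink code:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Placeholder sink DoFn; the annotation, not the body, is the point.
    class StableInputWriterFn extends DoFn<KV<Integer, String>, Void> {

      // Tells the runner it must only deliver "stable" input: any retry
      // of processElement sees exactly the same element, so sequence-id
      // deduplication against the external sink remains valid.
      @RequiresStableInput
      @ProcessElement
      public void processElement(@Element KV<Integer, String> element) {
        // write element to the external system here
      }
    }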
>>> > > On 01.03.19 14:28, Maximilian Michels wrote:
>>> > > > Just realized that transactions do not span multiple elements in
>>> > > > KafkaExactlyOnceSink. So the proposed solution to stop processing
>>> > > > elements while a snapshot is pending would work.
>>> > > >
>>> > > > It is certainly not optimal in terms of performance for Flink and
>>> > > > poses problems when checkpoints take long to complete, but it would
>>> > > > be worthwhile to implement this to make use of the EOS feature.
>>> > > >
>>> > > > Thanks,
>>> > > > Max
>>> > > >
>>> > > > On 01.03.19 12:23, Maximilian Michels wrote:
>>> > > >> Thank you for the prompt replies. It's great to see that there is
>>> > > >> a good understanding of how EOS in Flink works.
>>> > > >>
>>> > > >>> This is exactly what RequiresStableInput is supposed to do. On
>>> > > >>> the Flink runner, this would be implemented by delaying
>>> > > >>> processing until the current checkpoint is done.
>>> > > >>
>>> > > >> I don't think that works, because we have no control over the
>>> > > >> Kafka transactions. Imagine:
>>> > > >>
>>> > > >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
>>> > > >>    starts a new transaction.
>>> > > >> 2) Flink checkpoints, delaying the processing of elements, and the
>>> > > >>    checkpoint fails.
>>> > > >> 3) We restore from an old checkpoint and will start writing
>>> > > >>    duplicate data to Kafka. The de-duplication that the sink
>>> > > >>    performs does not help, especially because the random shard ids
>>> > > >>    might be assigned differently.
>>> > > >>
>>> > > >> IMHO we have to have control over the commit to be able to provide
>>> > > >> EOS.
>>> > > >>
>>> > > >>> When we discussed this in Aug 2017, the understanding was that
>>> > > >>> the 2-phase commit utility in Flink used to implement Flink's
>>> > > >>> Kafka EOS could not be implemented in Beam's context.
>>> > > >>
>>> > > >> That's also my understanding, unless we change the interface.
>>> > > >>
>>> > > >>> I don't see how SDF solves this problem..
>>> > > >>
>>> > > >> SDF has a checkpoint method which the Runner can call, but I think
>>> > > >> that you are right that the above problem would be the same.
>>> > > >>
>>> > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
>>> > > >>> think that will help many future exactly-once sinks.. and address
>>> > > >>> a fundamental incompatibility between the Beam model and Flink's
>>> > > >>> horizontal checkpointing for such applications.
>>> > > >>
>>> > > >> Great :)
>>> > > >>
>>> > > >>> The FlinkRunner would need to insert the "wait until checkpoint
>>> > > >>> finalization" logic wherever it sees @RequiresStableInput, which
>>> > > >>> is already what it would have to do.
>>> > > >>
>>> > > >> I don't think that fixes the problem. See above example.
>>> > > >>
>>> > > >> Thanks,
>>> > > >> Max
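To make the "wait until checkpoint finalization" idea concrete, a rough
Flink sketch of an operator that holds elements back until the checkpoint
covering them completes. Illustrative only: a real implementation would
keep the buffer in checkpointed operator state and track which checkpoint
each element belongs to.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Emits elements downstream only after checkpoint finalization, so a
    // downstream @RequiresStableInput DoFn never sees input that could be
    // replayed differently after a restore.
    public class BufferUntilCheckpointOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

      private final Deque<String> buffer = new ArrayDeque<>();

      @Override
      public void processElement(StreamRecord<String> element) {
        buffer.add(element.getValue()); // hold back, do not emit yet
      }

      @Override
      public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        // Everything buffered before the barrier is now stable: a restore
        // can only replay it identically.
        while (!buffer.isEmpty()) {
          output.collect(new StreamRecord<>(buffer.poll()));
        }
      }
    }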
>>> > > >> On 01.03.19 00:04, Raghu Angadi wrote:
>>> > > >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
>>> > > >>>
>>> > > >>>   On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
>>> > > >>>
>>> > > >>>     I'm not sure what a hard fail is. I probably have a shallow
>>> > > >>>     understanding, but doesn't @RequiresStableInput work for 2PC?
>>> > > >>>     The preCommit() phase should establish the transaction, and
>>> > > >>>     commit() is not called until after checkpoint finalization.
>>> > > >>>     Can you describe the way that it does not work a little bit
>>> > > >>>     more?
>>> > > >>>
>>> > > >>>   - preCommit() is called before the checkpoint. Kafka EOS in
>>> > > >>>     Flink starts the transaction before this and makes sure it
>>> > > >>>     flushes all records in preCommit(). So far, so good.
>>> > > >>>   - commit() is called after the checkpoint is persisted. Now,
>>> > > >>>     imagine commit() fails for some reason. There is no option to
>>> > > >>>     rerun the 1st phase to write the records again in a new
>>> > > >>>     transaction. This is a hard failure for the job. In practice
>>> > > >>>     Flink might attempt to commit again (not sure how many times),
>>> > > >>>     which is likely to fail and eventually results in job failure.
>>> > > >>>
>>> > > >>> In Apache Beam, the records could be stored in state, and could be
>>> > > >>> written inside commit() to work around this issue. It could have
>>> > > >>> scalability issues if checkpoints are not frequent enough in the
>>> > > >>> Flink runner.
>>> > > >>>
>>> > > >>> Raghu.
>>> > > >>>
>>> > > >>>     Kenn
>>> > > >>>
>>> > > >>>     On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
>>> > > >>>
>>> > > >>>       On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
>>> > > >>>
>>> > > >>>         I believe the way you would implement the logic behind
>>> > > >>>         Flink's KafkaProducer would be to have two steps:
>>> > > >>>
>>> > > >>>         1. Start transaction
>>> > > >>>         2. @RequiresStableInput Close transaction
>>> > > >>>
>>> > > >>>       I see. What happens if closing the transaction fails in (2)?
>>> > > >>>       Flink's 2PC requires that commit() should never hard fail
>>> > > >>>       once preCommit() succeeds. I think that is the cost of not
>>> > > >>>       having an extra shuffle. It is alright, since this policy has
>>> > > >>>       worked well for Flink so far.
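For reference, the Flink 2PC contract under discussion, as a minimal
sketch of a TwoPhaseCommitSinkFunction subclass. The transaction handle is
just a String id here; a real sink would carry a producer or transaction
object:

    import org.apache.flink.api.common.typeutils.base.StringSerializer;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

    public class SketchTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, String, Void> {

      public SketchTwoPhaseSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
      }

      @Override
      protected String beginTransaction() {
        return "txn-" + System.nanoTime(); // open a fresh transaction handle
      }

      @Override
      protected void invoke(String txn, String value, Context ctx) {
        // write value into the open transaction (Kafka would buffer it)
      }

      @Override
      protected void preCommit(String txn) {
        // flush; may fail, which fails the checkpoint and retries phase 1
      }

      @Override
      protected void commit(String txn) {
        // runs after checkpoint finalization; per the contract discussed
        // above, this must not hard fail: there is no way to rerun
        // phase 1 for these records.
      }

      @Override
      protected void abort(String txn) {
        // discard the transaction
      }
    }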
>>> > > >>>       Overall, it will be great to have @RequiresStableInput
>>> > > >>>       support in the Flink runner.
>>> > > >>>
>>> > > >>>       Raghu.
>>> > > >>>
>>> > > >>>         The FlinkRunner would need to insert the "wait until
>>> > > >>>         checkpoint finalization" logic wherever it sees
>>> > > >>>         @RequiresStableInput, which is already what it would have
>>> > > >>>         to do.
>>> > > >>>
>>> > > >>>         This matches the KafkaProducer's logic: delay closing the
>>> > > >>>         transaction until checkpoint finalization. This answers my
>>> > > >>>         main question, which is "is @RequiresStableInput expressive
>>> > > >>>         enough to allow Beam-on-Flink to have exactly-once behavior
>>> > > >>>         with the same performance characteristics as native Flink
>>> > > >>>         checkpoint finalization?"
>>> > > >>>
>>> > > >>>         Kenn
>>> > > >>>
>>> > > >>>         [1] https://github.com/apache/beam/pull/7955
>>> > > >>>
>>> > > >>>         On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
>>> > > >>>
>>> > > >>>           On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>>> > > >>>
>>> > > >>>               Now why does the Flink Runner not support KafkaIO
>>> > > >>>               EOS? Flink's native KafkaProducer supports
>>> > > >>>               exactly-once. It simply commits the pending
>>> > > >>>               transaction once it has completed a checkpoint.
>>> > > >>>
>>> > > >>>             On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
>>> > > >>>
>>> > > >>>               Hi,
>>> > > >>>
>>> > > >>>               I came across KafkaIO's Runner whitelist [1] for
>>> > > >>>               enabling exactly-once semantics (EOS). I think it is
>>> > > >>>               questionable to exclude Runners from inside a
>>> > > >>>               transform, but I see that the intention was to save
>>> > > >>>               users from surprises.
>>> > > >>>
>>> > > >>>               Now why does the Flink Runner not support KafkaIO
>>> > > >>>               EOS? Flink's native KafkaProducer supports
>>> > > >>>               exactly-once. It simply commits the pending
>>> > > >>>               transaction once it has completed a checkpoint.
>>> > > >>>
>>> > > >>>             When we discussed this in Aug 2017, the understanding
>>> > > >>>             was that the 2-phase commit utility in Flink used to
>>> > > >>>             implement Flink's Kafka EOS could not be implemented in
>>> > > >>>             Beam's context. See this message
>>> > > >>>             <https://www.mail-archive.com/[email protected]/msg02664.html>
>>> > > >>>             in that dev thread.
>>> > > >>>             Has anything changed in this regard? The whole thread
>>> > > >>>             is relevant to this topic and worth going through.
>>> > > >>>
>>> > > >>>           I think that the TwoPhaseCommit utility class wouldn't
>>> > > >>>           work. The Flink runner would probably want to directly
>>> > > >>>           use notifySnapshotComplete in order to implement
>>> > > >>>           @RequiresStableInput.
>>> > > >>>
>>> > > >>>               A checkpoint is realized by sending barriers through
>>> > > >>>               all channels, starting from the source until reaching
>>> > > >>>               all sinks. Every operator persists its state once it
>>> > > >>>               has received a barrier on all its input channels; it
>>> > > >>>               then forwards the barrier to the downstream
>>> > > >>>               operators.
>>> > > >>>
>>> > > >>>               The architecture of Beam's KafkaExactlyOnceSink is as
>>> > > >>>               follows[2]:
>>> > > >>>
>>> > > >>>               Input -> AssignRandomShardIds -> GroupByKey ->
>>> > > >>>               AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
>>> > > >>>
>>> > > >>>               As I understood, Spark or Dataflow use the GroupByKey
>>> > > >>>               stages to persist the input. That is not required in
>>> > > >>>               Flink to be able to take a consistent snapshot of the
>>> > > >>>               pipeline.
>>> > > >>>
>>> > > >>>               Basically, for Flink we don't need any of that magic
>>> > > >>>               that KafkaIO does. What we would need to support EOS
>>> > > >>>               is a way to tell the ExactlyOnceWriter (a DoFn) to
>>> > > >>>               commit once a checkpoint has completed.
>>> > > >>>
>>> > > >>>               I know that the new version of SDF supports
>>> > > >>>               checkpointing, which should solve this issue. But
>>> > > >>>               there is still a lot of work to do to make this a
>>> > > >>>               reality.
>>> > > >>>
>>> > > >>>             I don't see how SDF solves this problem.. Maybe
>>> > > >>>             pseudocode would make it more clear. But if it helps,
>>> > > >>>             that is great!
>>> > > >>>
>>> > > >>>               So I think it would make sense to think about a way
>>> > > >>>               to make KafkaIO's EOS more accessible to Runners
>>> > > >>>               which support a different way of checkpointing.
>>> > > >>>
>>> > > >>>             Absolutely. I would love to support EOS in KafkaIO for
>>> > > >>>             Flink. I think that will help many future exactly-once
>>> > > >>>             sinks.. and address a fundamental incompatibility
>>> > > >>>             between the Beam model and Flink's horizontal
>>> > > >>>             checkpointing for such applications.
>>> > > >>>
>>> > > >>>             Raghu.
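A runnable skeleton of that sink shape, for illustration only; the shard
count and DoFn bodies are placeholders, and only the pipeline shape
matters here:

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class EosSinkShape {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of("a", "b", "c"))
            // Non-deterministic step: re-running it after a failure can
            // assign different shards, which is why the writer's input
            // must be stable.
            .apply("AssignRandomShardIds",
                ParDo.of(new DoFn<String, KV<Integer, String>>() {
                  @ProcessElement
                  public void processElement(
                      @Element String e,
                      OutputReceiver<KV<Integer, String>> out) {
                    out.output(KV.of(ThreadLocalRandom.current().nextInt(4), e));
                  }
                }))
            // On Dataflow this GroupByKey also materializes (stabilizes)
            // its input; Flink's barrier snapshots give no such guarantee.
            .apply(GroupByKey.create());
        // ... -> AssignSequenceIds -> GroupByKey -> ExactlyOnceWriter
        // complete the real sink; elided here.
        p.run().waitUntilFinish();
      }
    }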
>>> > > >>>               Cheers,
>>> > > >>>               Max
>>> > > >>>
>>> > > >>>               PS: I found this document about RequiresStableInput
>>> > > >>>               [3], but IMHO defining an annotation only manifests
>>> > > >>>               the conceptual difference between the Runners.
>>> > > >>>
>>> > > >>>               [1]
>>> > > >>>               https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> > > >>>
>>> > > >>>               [2]
>>> > > >>>               https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> > > >>>
>>> > > >>>               [3]
>>> > > >>>               https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
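And for background on the Kafka side of the discussion, the producer
transaction lifecycle in a minimal sketch; broker address, topic, and
config values are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TxnLifecycleSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // A stable transactional.id lets a restarted producer fence off
        // and abort its predecessor's open transaction; an id that never
        // comes back is aborted by the broker after transaction.timeout.ms
        // (Reuven's garbage-collection question above).
        props.put("transactional.id", "beam-eos-sink-shard-0");
        props.put("transaction.timeout.ms", "900000");

        try (KafkaProducer<String, String> producer =
            new KafkaProducer<>(props)) {
          producer.initTransactions();
          producer.beginTransaction();                // phase 1: open
          producer.send(new ProducerRecord<>("topic", "key", "value"));
          producer.flush();                           // records now durable
          // ... a runner would wait for checkpoint finalization here ...
          producer.commitTransaction();               // phase 2: visible
        }
      }
    }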
