On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <[email protected]> wrote:
> > Can we do 2? I seem to remember that we had trouble in some cases (e.g.
> > in the BigQuery case, there was no obvious way to create a deterministic
> > id, which is why we went for a random number followed by a reshuffle).
> > Also remember that the user ParDo that is producing data to the sink is
> > not guaranteed to be deterministic; the Beam model allows for
> > non-deterministic transforms.
>
> I believe we could use something like the worker id to make it
> deterministic, though the worker id can change after a restart. We could
> persist it in Flink's operator state. I do not know if we can come up
> with a Runner-independent solution.

If we did this, we would break it on runners that don't have a concept of a
stable worker id :( The Dataflow runner can load balance work at any time
(including moving work around between workers).

> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the Flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to the
> > ParDo?
>
> You're correct. I thought that it could suffice to only buffer during a
> checkpoint and otherwise rely on the deterministic execution of the
> pipeline and KafkaIO's de-duplication code.

Yes, I want to distinguish the KafkaIO case from the general case. It would
be interesting to see if there's something we could add to the Beam model
that would create a better story for Kafka's EOS writes.

> In any case, emitting only after finalization of checkpoints gives us
> guaranteed stable input. It also means that the processing is tied to the
> checkpoint interval, the checkpoint duration, and the available memory.

This is true, however isn't it already true for such uses of Flink?
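To make the buffering idea concrete, a minimal sketch in plain Java (the
method names mirror Flink's checkpoint hooks, but this is illustrative code,
not the actual Flink operator API): elements are held back and only delivered
once the checkpoint that contains them has been finalized.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.function.Consumer;

    /** Holds elements until the checkpoint that contains them is finalized. */
    class StableInputBuffer<T> {
      /** Received since the last checkpoint barrier; not yet part of a snapshot. */
      private final Queue<T> pending = new ArrayDeque<>();
      /** Snapshotted with the in-flight checkpoint, awaiting finalization. */
      private final List<T> awaitingFinalization = new ArrayList<>();
      private final Consumer<T> stableDownstream;

      StableInputBuffer(Consumer<T> stableDownstream) {
        this.stableDownstream = stableDownstream;
      }

      /** Every input element is buffered instead of being processed directly. */
      void processElement(T element) {
        pending.add(element);
      }

      /**
       * Checkpoint barrier arrived: the buffered elements become part of this
       * checkpoint. A real implementation would write both buffers to the
       * runner's managed state and track one buffer per in-flight checkpoint;
       * this sketch assumes a single in-flight checkpoint.
       */
      void snapshotState(long checkpointId) {
        awaitingFinalization.addAll(pending);
        pending.clear();
      }

      /** Checkpoint finalized: the input is now stable and safe to deliver. */
      void notifyCheckpointComplete(long checkpointId) {
        for (T element : awaitingFinalization) {
          stableDownstream.accept(element);
        }
        awaitingFinalization.clear();
      }

      public static void main(String[] args) {
        StableInputBuffer<String> buffer =
            new StableInputBuffer<>(e -> System.out.println("stable: " + e));
        buffer.processElement("a");          // buffered, nothing emitted
        buffer.snapshotState(1L);            // "a" becomes part of checkpoint 1
        buffer.processElement("b");          // belongs to the next checkpoint
        buffer.notifyCheckpointComplete(1L); // prints "stable: a"
      }
    }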
> On 01.03.19 19:41, Reuven Lax wrote:
> >
> > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <[email protected]> wrote:
> >
> > > Fully agree. I think we can improve the situation drastically. For
> > > KafkaIO EOS with Flink we need to make these two changes:
> > >
> > > 1) Introduce buffering while the checkpoint is being taken
> > > 2) Replace the random shard id assignment with something deterministic
> >
> > Can we do 2? I seem to remember that we had trouble in some cases (e.g.
> > in the BigQuery case, there was no obvious way to create a deterministic
> > id, which is why we went for a random number followed by a reshuffle).
> > Also remember that the user ParDo that is producing data to the sink is
> > not guaranteed to be deterministic; the Beam model allows for
> > non-deterministic transforms.
> >
> > > However, we won't be able to provide full compatibility with
> > > RequiresStableInput because Flink only guarantees stable input after a
> > > checkpoint. RequiresStableInput requires input at any point in time to
> > > be stable.
> >
> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the Flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to the
> > ParDo? This adds latency of course, but I'm not sure how else to do
> > things correctly with the Beam model.
> >
> > > IMHO the only way to achieve that is materializing output,
> > > which Flink does not currently support.
> > >
> > > KafkaIO does not need all the power of RequiresStableInput to achieve
> > > EOS with Flink, but for the general case I don't see a good solution
> > > at the moment.
> > >
> > > -Max
> > >
> > > On 01.03.19 16:45, Reuven Lax wrote:
> > > > Yeah, the person who was working on it originally stopped working on
> > > > Beam, and nobody else ever finished it. I think it is important to
> > > > finish though. Many of the existing Sinks are only fully correct for
> > > > Dataflow today, because they generate either a Reshuffle or a
> > > > GroupByKey to ensure input stability before outputting (in many
> > > > cases this code was inherited from before Beam existed). On Flink
> > > > today, these sinks might occasionally produce duplicate output in
> > > > the case of failures.
> > > >
> > > > Reuven
> > > >
> > > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
> > > >
> > > > > Circling back to the RequiresStableInput annotation[1]. I've done
> > > > > some prototyping to see how this could be integrated into Flink.
> > > > > I'm currently writing a test based on RequiresStableInput.
> > > > >
> > > > > I found out there are already checks in place at the Runners to
> > > > > throw in case transforms use RequiresStableInput and it's not
> > > > > supported. However, not a single transform actually uses the
> > > > > annotation.
> > > > >
> > > > > It seems that the effort stopped at some point? Would it make
> > > > > sense to start annotating KafkaExactlyOnceSink with
> > > > > @RequiresStableInput? We could then get rid of the whitelist.
> > > > >
> > > > > -Max
> > > > >
> > > > > [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> > > > >
> > > > > On 01.03.19 14:28, Maximilian Michels wrote:
> > > > > > Just realized that transactions do not span multiple elements in
> > > > > > KafkaExactlyOnceSink. So the proposed solution to stop
> > > > > > processing elements while a snapshot is pending would work.
> > > > > >
> > > > > > It is certainly not optimal in terms of performance for Flink
> > > > > > and poses problems when checkpoints take long to complete, but
> > > > > > it would be worthwhile to implement this to make use of the EOS
> > > > > > feature.
> > > > > >
> > > > > > Thanks,
> > > > > > Max
> > > > > >
> > > > > > On 01.03.19 12:23, Maximilian Michels wrote:
> > > > > > > Thank you for the prompt replies. It's great to see that there
> > > > > > > is a good understanding of how EOS in Flink works.
> > > > > > >
> > > > > > > > This is exactly what RequiresStableInput is supposed to do.
> > > > > > > > On the Flink runner, this would be implemented by delaying
> > > > > > > > processing until the current checkpoint is done.
> > > > > > >
> > > > > > > I don't think that works because we have no control over the
> > > > > > > Kafka transactions. Imagine:
> > > > > > >
> > > > > > > 1) ExactlyOnceWriter writes records to Kafka and commits, then
> > > > > > > starts a new transaction.
> > > > > > > 2) Flink checkpoints, delaying the processing of elements, and
> > > > > > > the checkpoint fails.
> > > > > > > 3) We restore from an old checkpoint and will start writing
> > > > > > > duplicate data to Kafka. The de-duplication that the sink
> > > > > > > performs does not help, especially because the random shard
> > > > > > > ids might be assigned differently.
> > > > > > >
> > > > > > > IMHO we have to have control over the commit to be able to
> > > > > > > provide EOS.
> > > > > > >
> > > > > > > > When we discussed this in Aug 2017, the understanding was
> > > > > > > > that the 2-phase commit utility in Flink used to implement
> > > > > > > > Flink's Kafka EOS could not be implemented in Beam's
> > > > > > > > context.
> > > > > > >
> > > > > > > That's also my understanding, unless we change the interface.
> > > > > > >
> > > > > > > > I don't see how SDF solves this problem..
> > > > > > >
> > > > > > > SDF has a checkpoint method which the Runner can call, but I
> > > > > > > think that you are right, that the above problem would be the
> > > > > > > same.
> > > > > > >
> > > > > > > > Absolutely. I would love to support EOS in KafkaIO for
> > > > > > > > Flink. I think that will help many future exactly-once
> > > > > > > > sinks.. and address the fundamental incompatibility between
> > > > > > > > the Beam model and Flink's horizontal checkpointing for such
> > > > > > > > applications.
> > > > > > >
> > > > > > > Great :)
> > > > > > >
> > > > > > > > The FlinkRunner would need to insert the "wait until
> > > > > > > > checkpoint finalization" logic wherever it sees
> > > > > > > > @RequiresStableInput, which is already what it would have
> > > > > > > > to do.
> > > > > > >
> > > > > > > I don't think that fixes the problem. See the example above.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Max
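The annotation Max proposes for KafkaExactlyOnceSink above would look roughly
like this (a sketch; ExactlyOnceWriterFn and its signature are illustrative
stand-ins for the writer DoFn, not the sink's real code):

    import org.apache.beam.sdk.transforms.DoFn;

    // Illustrative stand-in for the writer DoFn inside KafkaExactlyOnceSink.
    class ExactlyOnceWriterFn extends DoFn<String, Void> {

      @ProcessElement
      @RequiresStableInput // declare the requirement instead of whitelisting runners
      public void processElement(ProcessContext ctx) {
        // Transactionally write ctx.element() to Kafka. Retries are safe here
        // because the runner must replay exactly the same input elements.
      }
    }

With the requirement declared on the DoFn, a runner that cannot provide
stable input can reject the pipeline generically, instead of KafkaIO keeping
a hard-coded list of supported runners.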
> > > > > > > On 01.03.19 00:04, Raghu Angadi wrote:
> > > > > > > > On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > I'm not sure what a hard fail is. I probably have a
> > > > > > > > > > shallow understanding, but doesn't @RequiresStableInput
> > > > > > > > > > work for 2PC? The preCommit() phase should establish the
> > > > > > > > > > transaction and commit() is not called until after
> > > > > > > > > > checkpoint finalization. Can you describe the way that
> > > > > > > > > > it does not work a little bit more?
> > > > > > > > >
> > > > > > > > > - preCommit() is called before the checkpoint. Kafka EOS
> > > > > > > > > in Flink starts the transaction before this and makes sure
> > > > > > > > > it flushes all records in preCommit(). So far so good.
> > > > > > > > > - commit() is called after the checkpoint is persisted.
> > > > > > > > > Now, imagine commit() fails for some reason. There is no
> > > > > > > > > option to rerun the first phase to write the records again
> > > > > > > > > in a new transaction. This is a hard failure for the job.
> > > > > > > > > In practice Flink might attempt to commit again (not sure
> > > > > > > > > how many times), which is likely to fail and eventually
> > > > > > > > > results in job failure.
> > > > > > > >
> > > > > > > > In Apache Beam, the records could be stored in state, and
> > > > > > > > can be written inside commit() to work around this issue. It
> > > > > > > > could have scalability issues if checkpoints are not
> > > > > > > > frequent enough in the Flink runner.
> > > > > > > >
> > > > > > > > Raghu.
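A sketch of the state-based workaround Raghu describes above. All names here
are illustrative; in particular Beam has no portable checkpoint-finalization
hook, so onCheckpointFinalized only marks where a Flink-specific operator
would call in from notifyCheckpointComplete:

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Park records in Beam state instead of writing them in processElement,
    // and flush them only once the checkpoint is finalized.
    class BufferedCommitFn extends DoFn<KV<Integer, String>, Void> {

      // Records since the last finalized checkpoint. The state cell is part
      // of the runner's snapshot, so buffered records survive a restore.
      @StateId("buffered")
      private final StateSpec<BagState<String>> bufferedSpec = StateSpecs.bag();

      @ProcessElement
      public void processElement(
          ProcessContext ctx, @StateId("buffered") BagState<String> buffered) {
        buffered.add(ctx.element().getValue()); // phase 1: stage, do not write yet
      }

      // The "commit()" half of 2PC, shown as a plain method to mark the idea:
      // replaying it writes the same staged records again, so a failed commit
      // can be retried, unlike Flink's native Kafka 2PC.
      void onCheckpointFinalized(BagState<String> buffered) {
        for (String record : buffered.read()) {
          // producer.send(record); // the write happens only after finalization
        }
        buffered.clear();
      }
    }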
> > > > > > > > >
> > > > > > > > > Kenn
> > > > > > > > >
> > > > > > > > > On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > > On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > > I believe the way you would implement the logic behind
> > > > > > > > > > > Flink's KafkaProducer would be to have two steps:
> > > > > > > > > > >
> > > > > > > > > > > 1. Start transaction
> > > > > > > > > > > 2. @RequiresStableInput Close transaction
> > > > > > > > > >
> > > > > > > > > > I see. What happens if closing the transaction fails in
> > > > > > > > > > (2)? Flink's 2PC requires that commit() should never
> > > > > > > > > > hard fail once preCommit() succeeds. I think that is the
> > > > > > > > > > cost of not having an extra shuffle. It is alright since
> > > > > > > > > > this policy has worked well for Flink so far.
> > > > > > > > > >
> > > > > > > > > > Overall, it will be great to have @RequiresStableInput
> > > > > > > > > > support in the Flink runner.
> > > > > > > > > >
> > > > > > > > > > Raghu.
> > > > > > > > > >
> > > > > > > > > > > The FlinkRunner would need to insert the "wait until
> > > > > > > > > > > checkpoint finalization" logic wherever it sees
> > > > > > > > > > > @RequiresStableInput, which is already what it would
> > > > > > > > > > > have to do.
> > > > > > > > > > >
> > > > > > > > > > > This matches the KafkaProducer's logic - delay closing
> > > > > > > > > > > the transaction until checkpoint finalization. This
> > > > > > > > > > > answers my main question, which is "is
> > > > > > > > > > > @RequiresStableInput expressive enough to allow
> > > > > > > > > > > Beam-on-Flink to have exactly-once behavior with the
> > > > > > > > > > > same performance characteristics as native Flink
> > > > > > > > > > > checkpoint finalization?"
> > > > > > > > > > >
> > > > > > > > > > > Kenn
> > > > > > > > > > >
> > > > > > > > > > > [1] https://github.com/apache/beam/pull/7955
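Kenn's two-step split could be sketched as two chained DoFns (BeginTxnFn and
CommitTxnFn are hypothetical names; the producer calls stay in comments
because a transactional producer handle cannot simply be handed between
stages). Raghu's objection still applies: if the commit in step 2 hard-fails,
there is no way to rerun step 1:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Step 1: write records inside a Kafka transaction and hand a reference
    // to the open transaction downstream.
    class BeginTxnFn extends DoFn<String, KV<String, Long>> {
      @ProcessElement
      public void processElement(ProcessContext ctx) {
        // producer.beginTransaction();
        // producer.send(new ProducerRecord<>(topic, ctx.element()));
        ctx.output(KV.of("txn-id", 0L)); // identify the open transaction
      }
    }

    // Step 2: close the transaction only once the input is stable, i.e. the
    // checkpoint containing it has been finalized.
    class CommitTxnFn extends DoFn<KV<String, Long>, Void> {
      @ProcessElement
      @RequiresStableInput
      public void processElement(ProcessContext ctx) {
        // producer.commitTransaction(); // replays of this element are identical
      }
    }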
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > > Now why does the Flink Runner not support
> > > > > > > > > > > > > > > KafkaIO EOS? Flink's native KafkaProducer
> > > > > > > > > > > > > > > supports exactly-once. It simply commits the
> > > > > > > > > > > > > > > pending transaction once it has completed a
> > > > > > > > > > > > > > > checkpoint.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I came across KafkaIO's Runner whitelist [1]
> > > > > > > > > > > > > > > for enabling exactly-once semantics (EOS). I
> > > > > > > > > > > > > > > think it is questionable to exclude Runners
> > > > > > > > > > > > > > > from inside a transform, but I see that the
> > > > > > > > > > > > > > > intention was to save users from surprises.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Now why does the Flink Runner not support
> > > > > > > > > > > > > > > KafkaIO EOS? Flink's native KafkaProducer
> > > > > > > > > > > > > > > supports exactly-once. It simply commits the
> > > > > > > > > > > > > > > pending transaction once it has completed a
> > > > > > > > > > > > > > > checkpoint.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > When we discussed this in Aug 2017, the
> > > > > > > > > > > > > > understanding was that the 2-phase commit
> > > > > > > > > > > > > > utility in Flink used to implement Flink's Kafka
> > > > > > > > > > > > > > EOS could not be implemented in Beam's context.
> > > > > > > > > > > > > > See this message
> > > > > > > > > > > > > > <https://www.mail-archive.com/[email protected]/msg02664.html>
> > > > > > > > > > > > > > in that dev thread. Has anything changed in this
> > > > > > > > > > > > > > regard? The whole thread is relevant to this
> > > > > > > > > > > > > > topic and worth going through.
> > > > > > >
> > > > > > > I think that the TwoPhaseCommit utility class wouldn't work.
> > > > > > > The Flink runner would probably want to directly use
> > > > > > > notifySnapshotComplete in order to implement
> > > > > > > @RequiresStableInput.
> > > > > > >
> > > > > > > > > > > > > > > A checkpoint is realized by sending barriers
> > > > > > > > > > > > > > > through all channels, starting from the source
> > > > > > > > > > > > > > > until reaching all sinks. Every operator
> > > > > > > > > > > > > > > persists its state once it has received a
> > > > > > > > > > > > > > > barrier on all its input channels; it then
> > > > > > > > > > > > > > > forwards the barrier to the downstream
> > > > > > > > > > > > > > > operators.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The architecture of Beam's KafkaExactlyOnceSink
> > > > > > > > > > > > > > > is as follows[2]:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Input -> AssignRandomShardIds -> GroupByKey
> > > > > > > > > > > > > > > -> AssignSequenceIds -> GroupByKey
> > > > > > > > > > > > > > > -> ExactlyOnceWriter
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > As I understood, Spark or Dataflow use the
> > > > > > > > > > > > > > > GroupByKey stages to persist the input. That
> > > > > > > > > > > > > > > is not required in Flink to be able to take a
> > > > > > > > > > > > > > > consistent snapshot of the pipeline.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Basically, for Flink we don't need any of that
> > > > > > > > > > > > > > > magic that KafkaIO does. What we would need to
> > > > > > > > > > > > > > > support EOS is a way to tell the
> > > > > > > > > > > > > > > ExactlyOnceWriter (a DoFn) to commit once a
> > > > > > > > > > > > > > > checkpoint has completed.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I know that the new version of SDF supports
> > > > > > > > > > > > > > > checkpointing, which should solve this issue.
> > > > > > > > > > > > > > > But there is still a lot of work to do to make
> > > > > > > > > > > > > > > this a reality.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I don't see how SDF solves this problem.. Maybe
> > > > > > > > > > > > > > pseudo code would make it more clear. But if it
> > > > > > > > > > > > > > helps, that is great!
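A runnable toy version of that expansion (simplified types, stdout instead of
Kafka; the real transforms live in KafkaExactlyOnceSink). The first
GroupByKey is the stage Dataflow and Spark rely on to persist, i.e.
stabilize, the randomly sharded input:

    import java.util.Random;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class EosSinkShape {
      // Assigns each record to one of four shards, non-deterministically.
      static class AssignRandomShardId extends DoFn<String, KV<Integer, String>> {
        private transient Random random;
        @Setup public void setup() { random = new Random(); }
        @ProcessElement public void process(ProcessContext c) {
          c.output(KV.of(random.nextInt(4), c.element()));
        }
      }

      // Numbers the records of each shard. The real sink keeps the next
      // sequence id in persistent state; here it restarts per group.
      static class AssignSequenceIds
          extends DoFn<KV<Integer, Iterable<String>>, KV<Integer, KV<Long, String>>> {
        @ProcessElement public void process(ProcessContext c) {
          long seq = 0;
          for (String record : c.element().getValue()) {
            c.output(KV.of(c.element().getKey(), KV.of(seq++, record)));
          }
        }
      }

      // The real writer opens a Kafka transaction per shard and skips
      // sequence ids it has already committed, which de-duplicates retries.
      static class WriterFn extends DoFn<KV<Integer, Iterable<KV<Long, String>>>, Void> {
        @ProcessElement public void process(ProcessContext c) {
          for (KV<Long, String> numbered : c.element().getValue()) {
            System.out.printf("shard=%d seq=%d record=%s%n",
                c.element().getKey(), numbered.getKey(), numbered.getValue());
          }
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("a", "b", "c"))
            .apply("AssignRandomShardIds", ParDo.of(new AssignRandomShardId()))
            .apply("GroupByShard", GroupByKey.<Integer, String>create())
            .apply("AssignSequenceIds", ParDo.of(new AssignSequenceIds()))
            .apply("GroupBySequence", GroupByKey.<Integer, KV<Long, String>>create())
            .apply("ExactlyOnceWriter", ParDo.of(new WriterFn()));
        p.run().waitUntilFinish();
      }
    }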
> > > > > > > > > > > > > > > So I think it would make sense to think about
> > > > > > > > > > > > > > > a way to make KafkaIO's EOS more accessible to
> > > > > > > > > > > > > > > Runners which support a different way of
> > > > > > > > > > > > > > > checkpointing.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Absolutely. I would love to support EOS in
> > > > > > > > > > > > > > KafkaIO for Flink. I think that will help many
> > > > > > > > > > > > > > future exactly-once sinks.. and address the
> > > > > > > > > > > > > > fundamental incompatibility between the Beam
> > > > > > > > > > > > > > model and Flink's horizontal checkpointing for
> > > > > > > > > > > > > > such applications.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > Max
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > PS: I found this document about
> > > > > > > > > > > > > > > RequiresStableInput [3], but IMHO defining an
> > > > > > > > > > > > > > > annotation only manifests the conceptual
> > > > > > > > > > > > > > > difference between the Runners.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> > > > > > > > > > > > > > > [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> > > > > > > > > > > > > > > [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
