Just realized that transactions do not spawn multiple elements in KafkaExactlyOnceSink. So the proposed solution to stop processing elements while a snapshot is pending would work.

It is certainly not optimal in terms of performance for Flink and poses problems when checkpoints take long to complete, but it would be worthwhile to implement this to make use of the EOS feature.

Thanks,
Max

On 01.03.19 12:23, Maximilian Michels wrote:
Thanks you for the prompt replies. It's great to see that there is good understanding of how EOS in Flink works.

This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done.

I don't think that works because we have no control over the Kafka transactions. Imagine:

1) ExactlyOnceWriter writes records to Kafka and commits, then starts a new transaction. 2) Flink checkpoints, delaying the processing of elements, the checkpoint fails. 3) We restore from an old checkpoint and will start writing duplicate data to Kafka. The de-duplication that the sink performs does not help, especially because the random shards ids might be assigned differently.

IMHO we have to have control over commit to be able to provide EOS.

When we discussed this in Aug 2017, the understanding was that 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context.

That's also my understanding, unless we change the interface.

I don't see how SDF solves this problem..

SDF has a checkpoint method which the Runner can call, but I think that you are right, that the above problem would be the same.

Absolutely. I would love to support EOS in KakaIO for Flink. I think that will help many future exactly-once sinks.. and address fundamental incompatibility between Beam model and Flink's horizontal checkpointing for such applications.

Great :)

The FlinkRunner would need to insert the "wait until checkpoint finalization" logic wherever it sees @RequiresStableInput, which is already what it would have to do.

I don't think that fixes the problem. See above example.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:


On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com <mailto:ang...@gmail.com>> wrote:

    On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org
    <mailto:k...@apache.org>> wrote:

        I'm not sure what a hard fail is. I probably have a shallow
        understanding, but doesn't @RequiresStableInput work for 2PC?
        The preCommit() phase should establish the transaction and
        commit() is not called until after checkpoint finalization. Can
        you describe the way that it does not work a little bit more?


    - preCommit() is called before checkpoint. Kafka EOS in Flink starts
    the transaction before this and makes sure it flushes all records in
    preCommit(). So far good.
    - commit is called after checkpoint is persisted. Now, imagine
    commit() fails for some reason. There is no option to rerun the 1st
    phase to write the records again in a new transaction. This is a
    hard failure for the the job. In practice Flink might attempt to
    commit again (not sure how many times), which is likely to fail and
    eventually results in job failure.


In Apache Beam, the records could be stored in state, and can be written inside commit() to work around this issue. It could have scalability issues if checkpoints are not frequent enough in Flink runner.

Raghu.



        Kenn

        On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com
        <mailto:ang...@gmail.com>> wrote:

            On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
            <k...@apache.org <mailto:k...@apache.org>> wrote:

                I believe the way you would implement the logic behind
                Flink's KafkaProducer would be to have two steps:

                1. Start transaction
                2. @RequiresStableInput Close transaction


            I see.  What happens if closing the transaction fails in
            (2)? Flink's 2PC requires that commit() should never hard
            fail once preCommit() succeeds. I think that is cost of not
            having an extra shuffle. It is alright since this policy has
            worked well for Flink so far.

            Overall, it will be great to have @RequiresStableInput
            support in Flink runner.

            Raghu.

                The FlinkRunner would need to insert the "wait until
                checkpoint finalization" logic wherever it
                sees @RequiresStableInput, which is already what it
                would have to do.

                This matches the KafkaProducer's logic - delay closing
                the transaction until checkpoint finalization. This
                answers my main question, which is "is
                @RequiresStableInput expressive enough to allow
                Beam-on-Flink to have exactly once behavior with the
                same performance characteristics as native Flink
                checkpoint finalization?"

                Kenn

                [1] https://github.com/apache/beam/pull/7955

                On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
                <re...@google.com <mailto:re...@google.com>> wrote:



                    On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi
                    <ang...@gmail.com <mailto:ang...@gmail.com>> wrote:


                            Now why does the Flink Runner not support
                            KafkaIO EOS? Flink's native
                            KafkaProducer supports exactly-once. It
                            simply commits the pending
                            transaction once it has completed a checkpoint.



                        On Thu, Feb 28, 2019 at 9:59 AM Maximilian
                        Michels <m...@apache.org <mailto:m...@apache.org>>
                        wrote:

                            Hi,

                            I came across KafkaIO's Runner whitelist [1]
                            for enabling exactly-once
                            semantics (EOS). I think it is questionable
                            to exclude Runners from
                            inside a transform, but I see that the
                            intention was to save users from
                            surprises.

                            Now why does the Flink Runner not support
                            KafkaIO EOS? Flink's native
                            KafkaProducer supports exactly-once. It
                            simply commits the pending
                            transaction once it has completed a checkpoint.



                        When we discussed this in Aug 2017, the
                        understanding was that 2 Phase commit utility in
                        Flink used to implement Flink's Kafka EOS could
                        not be implemented in Beam's context.
                        See this message
<https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
                        that dev thread. Has anything changed in this
                        regard? The whole thread is relevant to this
                        topic and worth going through.

                    I think that TwoPhaseCommit utility class wouldn't
                    work. The Flink runner would probably want to
                    directly use notifySnapshotComplete in order to
                    implement @RequiresStableInput.


                            A checkpoint is realized by sending barriers
                            through all channels
                            starting from the source until reaching all
                            sinks. Every operator
                            persists its state once it has received a
                            barrier on all its input
                            channels, it then forwards it to the
                            downstream operators.

                            The architecture of Beam's
                            KafkaExactlyOnceSink is as follows[2]:

                            Input -> AssignRandomShardIds -> GroupByKey
                            -> AssignSequenceIds ->
                            GroupByKey -> ExactlyOnceWriter

                            As I understood, Spark or Dataflow use the
                            GroupByKey stages to persist
                            the input. That is not required in Flink to
                            be able to take a consistent
                            snapshot of the pipeline.

                            Basically, for Flink we don't need any of
                            that magic that KafkaIO does.
                            What we would need to support EOS is a way
                            to tell the ExactlyOnceWriter
                            (a DoFn) to commit once a checkpoint has
                            completed.

                            I know that the new version of SDF supports
                            checkpointing which should
                            solve this issue. But there is still a lot
                            of work to do to make this
                            reality.


                        I don't see how SDF solves this problem.. May be
                        pseudo code would make more clear.  But if
                        helps, that is great!

                            So I think it would make sense to think
                            about a way to make KafkaIO's
                            EOS more accessible to Runners which support
                            a different way of
                            checkpointing.


                        Absolutely. I would love to support EOS in
                        KakaIO for Flink. I think that will help many
                        future exactly-once sinks.. and address
                        fundamental incompatibility between Beam model
                        and Flink's horizontal checkpointing for such
                        applications.

                        Raghu.

                            Cheers,
                            Max

                            PS: I found this document about
                            RequiresStableInput [3], but IMHO
                            defining an annotation only manifests the
                            conceptual difference between
                            the Runners.


                            [1]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
                            [2]
https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
                            [3]
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM

Reply via email to