> But there is still the possibility that we fail to flush the buffer after the
> checkpoint is complete (data loss)?

Since we have already checkpointed the buffered data, we can retry flushing it in case of failures. We may emit elements multiple times, but that is fine because the Kafka EOS sink will skip records which are already part of a committed transaction.
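This retry-after-checkpoint mechanism can be sketched as a toy Python model (all class and function names here are invented for illustration; the real sink is KafkaIO's Java code). The external store remembers which record ids are already part of a committed transaction, so replaying the flush after a partial failure does not duplicate output:

```python
class ToyEOSStore:
    """Stands in for Kafka's committed-transaction bookkeeping."""
    def __init__(self):
        self.committed = []   # output visible to consumers, exactly once
        self.seen = set()     # ids of records already committed

    def write(self, record_id, value):
        if record_id in self.seen:   # already in a committed txn: skip
            return
        self.committed.append(value)
        self.seen.add(record_id)

def flush_with_retries(store, buffered, max_attempts=3):
    """Replay the checkpointed buffer until every record is flushed."""
    for attempt in range(max_attempts):
        try:
            for i, (record_id, value) in enumerate(buffered):
                if attempt == 0 and i == 1:
                    raise IOError("simulated broker failure mid-flush")
                store.write(record_id, value)
            return
        except IOError:
            continue   # buffer was checkpointed, so we can just retry
    raise RuntimeError("flush failed after retries")

store = ToyEOSStore()
buffered = [(("shard0", 0), "a"), (("shard0", 1), "b"), (("shard0", 2), "c")]
flush_with_retries(store, buffered)
```

The first attempt writes "a" and then fails; the retry re-writes the whole buffer, but "a" is skipped, so the committed output contains each element exactly once.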

-Max

On 06.03.19 19:28, Thomas Weise wrote:
A fair amount of work on true exactly-once output was done in Apex. Different from almost exactly-once :)

The takeaway was that the mechanism to achieve it depends on the external system. The implementation looks different for let's say a file sink or JDBC or Kafka.

Apex had an exactly-once producer before Kafka supported transactions. That producer relied on the ability to discover what was already written to Kafka upon recovery from failure. Why?

Runners are not distributed transaction coordinators, and no matter how we write the code, there is always a small possibility that one of two resources fails to commit, resulting in either data loss or duplicates. The Kafka EOS sink was a hybrid of producer and consumer, with the consumer part used during recovery to find out what was already produced previously.

Flink and Apex have very similar checkpointing models, which is why this thread caught my attention. Within the topology/runner, exactly-once is achieved by replay having the same effect. For sinks, it needs to rely on the capabilities of the respective system (like atomic rename for a file sink, or a transaction with a metadata table for JDBC).
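The atomic-rename idea for a file sink can be sketched like this (a toy Python model for brevity; Beam's actual file sink finalization is Java and more involved): the bundle is written to a temporary file and then atomically moved to its final name, so a replayed bundle simply produces the same final file again.

```python
import os
import tempfile

def write_bundle_atomically(final_path, lines):
    """Write lines to a temp file, then atomically move it into place."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(lines))
    os.replace(tmp_path, final_path)   # atomic rename: the commit point

tmpdir = tempfile.mkdtemp()
target = os.path.join(tmpdir, "shard-00000-of-00001.txt")
write_bundle_atomically(target, ["a", "b"])
write_bundle_atomically(target, ["a", "b"])   # replayed bundle: same result
with open(target) as f:
    content = f.read()
```

Readers never observe a half-written file, and a retry after a crash before the rename leaves at worst an orphaned temp file, not duplicate output.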

The buffering until the checkpoint is complete is a mechanism to get away from sink-specific implementations. It comes with a latency penalty (the memory overhead could be addressed with a write-ahead log). But there is still the possibility that we fail to flush the buffer after the checkpoint is complete (data loss)?

Thanks,
Thomas


On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:

    On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:



        On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:

            RE: Kenn's suggestion. I think Raghu looked into something
            like that, and something about it didn't work. I don't
            remember all the details, but I think there might have
            been some subtle problem with it that wasn't obvious.
            Doesn't mean that there isn't another way to solve that
            issue.


        Two disadvantages:
        - Transactions in Kafka are tied to a single producer
        instance. There is no official API to start a txn in one
        process and access it in another process. Flink's sink uses
        an internal REST API for this.
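Why the transaction cannot simply be handed from one process to another can be illustrated with a toy model (class and method names invented here; the real coordinator lives in the Kafka broker): each initTransactions() call for a given transactional.id bumps the producer epoch, fencing out any earlier producer instance, so a second process cannot pick up and commit the first process's open transaction.

```python
class ToyBroker:
    """Toy stand-in for the broker's transaction coordinator."""
    def __init__(self):
        self.epochs = {}                       # transactional.id -> epoch

    def init_transactions(self, txn_id):
        # Each init bumps the epoch, fencing older producer instances.
        epoch = self.epochs.get(txn_id, -1) + 1
        self.epochs[txn_id] = epoch
        return epoch

    def commit(self, txn_id, epoch):
        if epoch != self.epochs[txn_id]:
            raise RuntimeError("ProducerFencedException")
        return "committed"

broker = ToyBroker()
epoch_a = broker.init_transactions("eos-sink-0")   # process A starts
epoch_b = broker.init_transactions("eos-sink-0")   # process B takes over
result = broker.commit("eos-sink-0", epoch_b)      # B can commit
fenced = False
try:
    broker.commit("eos-sink-0", epoch_a)           # A is fenced out
except RuntimeError:
    fenced = True
```

This is a sketch of the fencing behavior only; the handoff Flink performs goes through non-public interfaces precisely because no official API supports it.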


    Can you say more about how this works?

        - There is one failure case that I mentioned earlier: if
        closing the transaction in the downstream transform fails, it
        is data loss; there is no way to replay the upstream transform
        that wrote the records to Kafka.


    With coupling of unrelated failures due to fusion, this is a severe
    problem. I think I see now how 2PC affects this. From my reading, I
    can't see the difference in how Flink works. If the checkpoint
    finalization callback that does the Kafka commit fails, does it
    invalidate the checkpoint so the start transaction + write elements
    is retried?

    Kenn


        GBKs don't have major scalability limitations in most
        runners. An extra GBK is fine in practice for such a sink (at
        least no one has complained about it yet, though I don't know
        real usage numbers in practice). Flink's implementation in
        Beam using @RequiresStableInput does have storage requirements
        and latency costs that increase with the checkpoint interval.
        I think it is still just as useful. Good to see
        @RequiresStableInput support added to the Flink runner in
        Max's PR.


            Hopefully we can make that work. Another possibility if we
            can't is to do something special for Flink. Beam allows
            runners to splice out well-known transforms with their own
            implementation. Dataflow already does that for Google Cloud
            Pub/Sub sources/sinks. The Flink runner could splice out
            the Kafka sink with one that uses Flink-specific
            functionality. Ideally this would reuse most of the
            existing Kafka code (maybe we could refactor just the EOS
            part into something that could be subbed out).

            Reuven

            On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:

                 > It would be interesting to see if there's something
                we could add to the Beam model that would create a
                better story for Kafka's EOS writes.

                There would have to be a checkpoint-completed callback
                the DoFn can
                register with the Runner. Does not seem applicable for
                most Runners though.

                 > This is true, however isn't it already true for such
                uses of Flink?

                Yes, that's correct. In the case of Kafka, Flink can
                offload the
                buffering but for the general case, idempotent writes
                are only possible
                if we buffer data until the checkpoint is completed.
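That general buffer-until-finalization idea can be sketched as follows (a minimal Python model; the stage and method names are invented, not Beam's or Flink's actual interfaces): elements are held back in checkpointed state and only released to the sink once the runner signals that the checkpoint containing them has completed.

```python
class BufferingStage:
    """Holds elements until the enclosing checkpoint is finalized."""
    def __init__(self, sink):
        self.sink = sink
        self.buffer = []    # elements of the in-flight epoch
        self.pending = []   # sealed batches awaiting checkpoint completion

    def process(self, element):
        self.buffer.append(element)             # never emitted eagerly

    def snapshot_state(self):
        self.pending.append(list(self.buffer))  # sealed with the checkpoint
        self.buffer = []

    def notify_checkpoint_complete(self):
        for batch in self.pending:              # input is now stable: emit
            for element in batch:
                self.sink.append(element)
        self.pending = []

out = []
stage = BufferingStage(out)
stage.process("a")
stage.process("b")
assert out == []        # nothing reaches the sink before finalization
stage.snapshot_state()
stage.notify_checkpoint_complete()
```

This makes the latency and memory coupling to the checkpoint interval explicit: output is delayed by at least one checkpoint, and the pending batches grow with the interval.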

                On 04.03.19 17:45, Reuven Lax wrote:
                 >
                 >
                  > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
                 >
                 >      > Can we do 2? I seem to remember that we had
                trouble in some cases
                 (e.g. in the BigQuery case, there was no obvious
                way to create a
                 >     deterministic id, which is why we went for a
                random number followed
                 >     by a reshuffle). Also remember that the user
                ParDo that is producing
                 >     data to the sink is not guaranteed to be
                deterministic; the Beam
                 >     model allows for non-deterministic transforms.
                 >
                 >     I believe we could use something like the worker
                id to make it
                 >     deterministic, though the worker id can change
                after a restart. We
                 >     could
                 >     persist it in Flink's operator state. I do not
                know if we can come up
                 >     with a Runner-independent solution.
                 >
                 >
                 > If we did this, we would break it on runners that
                don't have a concept
                 > of a stable worker id :( The Dataflow runner can load
                balance work at
                 > any time (including moving work around between workers).
                 >
                 >
                 >      > I'm not quite sure I understand. If a ParDo is
                marked with
                 >     RequiresStableInput, can't the flink runner
                buffer the input message
                 >     until after the checkpoint is complete and only
                then deliver it to
                 >     the ParDo?
                 >
                 >     You're correct. I thought that it could suffice
                to only buffer during a
                 >     checkpoint and otherwise rely on the
                deterministic execution of the
                 >     pipeline and KafkaIO's de-duplication code.
                 >
                 >
                 > Yes, I want to distinguish the KafkaIO case from the
                general case. It
                 > would be interesting to see if there's something we
                could add to the
                 > Beam model that would create a better story for
                Kafka's EOS writes.
                 >
                 >
                 >     In any case, emitting only after finalization of
                checkpoints gives us
                 >     guaranteed stable input. It also means that the
                 processing is tied to
                 >     the checkpoint interval, the checkpoint duration,
                and the available
                 >     memory.
                 >
                 >
                 > This is true, however isn't it already true for such
                uses of Flink?
                 >
                 >
                 >     On 01.03.19 19:41, Reuven Lax wrote:
                 >      >
                 >      >
                  >      > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <m...@apache.org> wrote:
                 >      >
                 >      >     Fully agree. I think we can improve the
                situation
                 >     drastically. For
                 >      >     KafkaIO EOS with Flink we need to make
                these two changes:
                 >      >
                 >      >     1) Introduce buffering while the
                checkpoint is being taken
                 >      >     2) Replace the random shard id assignment
                with something
                 >     deterministic
                 >      >
                 >      >
                 >      > Can we do 2? I seem to remember that we had
                trouble in some cases
                  >     (e.g.
                 >      > in the BigQuery case, there was no obvious way
                to create a
                 >     deterministic
                 >      > id, which is why we went for a random number
                followed by a
                 >     reshuffle).
                 >      > Also remember that the user ParDo that is
                producing data to the
                 >     sink is
                 >      > not guaranteed to be deterministic; the Beam
                model allows for
                 >      > non-deterministic transforms.
                 >      >
                 >      >
                 >      >     However, we won't be able to provide full
                compatibility with
                 >      >     RequiresStableInput because Flink only
                guarantees stable
                 >     input after a
                 >      >     checkpoint. RequiresStableInput requires
                input at any point
                 >     in time to
                 >      >     be stable.
                 >      >
                 >      >
                 >      > I'm not quite sure I understand. If a ParDo is
                marked with
                 >      > RequiresStableInput, can't the flink runner
                buffer the input message
                 >      > until after the checkpoint is complete and
                only then deliver it
                 >     to the
                 >      > ParDo? This adds latency of course, but I'm
                not sure how else to do
                 >      > things correctly with the Beam model.
                 >      >
                 >      >     IMHO the only way to achieve that is
                materializing output
                 >      >     which Flink does not currently support.
                 >      >
                 >      >     KafkaIO does not need all the power of
                RequiresStableInput to
                 >     achieve
                 >      >     EOS with Flink, but for the general case I
                don't see a good
                 >     solution at
                 >      >     the moment.
                 >      >
                 >      >     -Max
                 >      >
                 >      >     On 01.03.19 16:45, Reuven Lax wrote:
                 >      >      > Yeah, the person who was working on it
                originally stopped
                 >     working on
                 >      >      > Beam, and nobody else ever finished it.
                I think it is
                 >     important to
                 >      >      > finish though. Many of the existing
                Sinks are only fully
                 >     correct for
                 >      >      > Dataflow today, because they generate
                either Reshuffle or
                 >      >     GroupByKey to
                 >      >      > ensure input stability before
                outputting (in many cases
                 >     this code
                 >      >     was
                 >      >      > inherited from before Beam existed). On
                Flink today, these
                 >     sinks
                 >      >     might
                 >      >      > occasionally produce duplicate output
                in the case of failures.
                 >      >      >
                 >      >      > Reuven
                 >      >      >
                  >      >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <m...@apache.org> wrote:
                 >      >      >
                 >      >      >     Circling back to the
                RequiresStableInput
                 >     annotation[1]. I've
                 >      >     done some
                  >      >      >     prototyping to see how this could be
                integrated into
                 >     Flink. I'm
                 >      >      >     currently
                 >      >      >     writing a test based on
                RequiresStableInput.
                 >      >      >
                 >      >      >     I found out there are already
                checks in place at the
                 >     Runners to
                 >      >      >     throw in
                 >      >      >     case transforms use
                 RequiresStableInput and it's not
                 >      >     supported. However,
                 >      >      >     not a single transform actually
                uses the annotation.
                 >      >      >
                 >      >      >     It seems that the effort stopped at
                some point? Would
                 >     it make
                 >      >     sense to
                 >      >      >     start annotating
                KafkaExactlyOnceSink with
                 >      >     @RequiresStableInput? We
                 >      >      >     could then get rid of the whitelist.
                 >      >      >
                 >      >      >     -Max
                 >      >      >
                 >      >      >     [1]
                 >      >      >
                  >      >      >     https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
                 >      >      >
                 >      >      >
                 >      >      >
                 >      >      >     On 01.03.19 14:28, Maximilian
                Michels wrote:
                 >      >      >      > Just realized that transactions
                 do not span multiple
                 >      >     elements in
                 >      >      >      > KafkaExactlyOnceSink. So the
                proposed solution to stop
                 >      >     processing
                 >      >      >      > elements while a snapshot is
                pending would work.
                 >      >      >      >
                 >      >      >      > It is certainly not optimal in
                terms of performance for
                 >      >     Flink and
                 >      >      >     poses
                 >      >      >      > problems when checkpoints take
                long to complete, but it
                 >      >     would be
                 >      >      >      > worthwhile to implement this to
                make use of the EOS
                 >     feature.
                 >      >      >      >
                 >      >      >      > Thanks,
                 >      >      >      > Max
                 >      >      >      >
                 >      >      >      > On 01.03.19 12:23, Maximilian
                Michels wrote:
                  >      >      >      >> Thank you for the prompt
                replies. It's great to
                 >     see that
                 >      >     there is
                 >      >      >      >> good understanding of how EOS
                in Flink works.
                 >      >      >      >>
                 >      >      >      >>> This is exactly what
                RequiresStableInput is
                 >     supposed to
                 >      >     do. On the
                 >      >      >      >>> Flink runner, this would be
                implemented by delaying
                 >      >     processing
                 >      >      >     until
                 >      >      >      >>> the current checkpoint is done.
                 >      >      >      >>
                 >      >      >      >> I don't think that works
                because we have no
                 >     control over
                 >      >     the Kafka
                 >      >      >      >> transactions. Imagine:
                 >      >      >      >>
                 >      >      >      >> 1) ExactlyOnceWriter writes
                records to Kafka and
                 >     commits,
                 >      >     then
                 >      >      >     starts
                 >      >      >      >> a new transaction.
                 >      >      >      >> 2) Flink checkpoints, delaying
                the processing of
                 >      >     elements, the
                 >      >      >      >> checkpoint fails.
                 >      >      >      >> 3) We restore from an old
                checkpoint and will
                 >     start writing
                 >      >      >     duplicate
                 >      >      >      >> data to Kafka. The
                de-duplication that the sink
                 >     performs
                 >      >     does not
                 >      >      >      >> help, especially because the
                random shards ids
                 >     might be
                 >      >     assigned
                 >      >      >      >> differently.
                 >      >      >      >>
                 >      >      >      >> IMHO we have to have control
                over commit to be able to
                 >      >     provide EOS.
                 >      >      >      >>
                 >      >      >      >>> When we discussed this in Aug
                2017, the understanding
                 >      >     was that 2
                 >      >      >      >>> Phase commit utility in Flink
                used to implement
                 >     Flink's
                 >      >     Kafka EOS
                 >      >      >      >>> could not be implemented in
                Beam's context.
                 >      >      >      >>
                 >      >      >      >> That's also my understanding,
                unless we change the
                 >     interface.
                 >      >      >      >>
                 >      >      >      >>> I don't see how SDF solves
                this problem..
                 >      >      >      >>
                 >      >      >      >> SDF has a checkpoint method
                which the Runner can call,
                 >      >     but I think
                 >      >      >      >> that you are right, that the
                above problem would
                 >     be the same.
                 >      >      >      >>
                 >      >      >      >>> Absolutely. I would love to
                 support EOS in KafkaIO for
                 >      >     Flink. I
                 >      >      >     think
                 >      >      >      >>> that will help many future
                exactly-once sinks..
                 >     and address
                 >      >      >      >>> fundamental incompatibility
                between Beam model
                 >     and Flink's
                 >      >      >     horizontal
                 >      >      >      >>> checkpointing for such
                applications.
                 >      >      >      >>
                 >      >      >      >> Great :)
                 >      >      >      >>
                 >      >      >      >>> The FlinkRunner would need to
                insert the "wait until
                 >      >     checkpoint
                 >      >      >      >>> finalization" logic wherever
                it sees
                 >     @RequiresStableInput,
                 >      >      >     which is
                 >      >      >      >>> already what it would have to do.
                 >      >      >      >>
                 >      >      >      >> I don't think that fixes the
                problem. See above
                 >     example.
                 >      >      >      >>
                 >      >      >      >> Thanks,
                 >      >      >      >> Max
                 >      >      >      >>
                 >      >      >      >> On 01.03.19 00:04, Raghu Angadi
                wrote:
                 >      >      >      >>>
                 >      >      >      >>>
                  >      >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:
                 >      >      >      >>>
                  >      >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
                 >      >      >      >>>
                 >      >      >      >>>         I'm not sure what a
                hard fail is. I probably
                 >      >     have a shallow
                 >      >      >      >>>         understanding, but doesn't
                 >     @RequiresStableInput work
                 >      >      >     for 2PC?
                 >      >      >      >>>         The preCommit() phase
                should establish the
                 >      >     transaction and
                 >      >      >      >>>         commit() is not called
                until after checkpoint
                 >      >      >     finalization. Can
                 >      >      >      >>>         you describe the way
                that it does not work a
                 >      >     little bit
                 >      >      >     more?
                 >      >      >      >>>
                 >      >      >      >>>
                 >      >      >      >>>     - preCommit() is called
                before checkpoint.
                 >     Kafka EOS in
                 >      >      >     Flink starts
                 >      >      >      >>>     the transaction before
                this and makes sure it
                 >      >     flushes all
                 >      >      >     records in
                 >      >      >      >>>     preCommit(). So far good.
                 >      >      >      >>>     - commit is called after
                checkpoint is persisted.
                 >      >     Now, imagine
                 >      >      >      >>>     commit() fails for some
                reason. There is no
                 >     option
                 >      >     to rerun
                 >      >      >     the 1st
                 >      >      >      >>>     phase to write the records
                again in a new
                 >      >     transaction. This
                 >      >      >     is a
                  >      >      >      >>>     hard failure for the
                job. In practice
                 >     Flink might
                 >      >      >     attempt to
                 >      >      >      >>>     commit again (not sure how
                many times), which is
                 >      >     likely to
                 >      >      >     fail and
                 >      >      >      >>>     eventually results in job
                failure.
                 >      >      >      >>>
                 >      >      >      >>>
                 >      >      >      >>> In Apache Beam, the records
                could be stored in state,
                 >      >     and can be
                 >      >      >      >>> written inside commit() to
                work around this issue. It
                 >      >     could have
                 >      >      >      >>> scalability issues if
                checkpoints are not frequent
                 >      >     enough in Flink
                 >      >      >      >>> runner.
                 >      >      >      >>>
                 >      >      >      >>> Raghu.
                 >      >      >      >>>
                 >      >      >      >>>
                 >      >      >      >>>
                 >      >      >      >>>         Kenn
                 >      >      >      >>>
                  >      >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
                 >      >      >      >>>
                  >      >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>> I believe the way you would implement the logic behind Flink's
>> KafkaProducer would be to have two steps:
>>
>> 1. Start transaction
>> 2. @RequiresStableInput Close transaction

> I see. What happens if closing the transaction fails in (2)? Flink's
> 2PC requires that commit() should never hard fail once preCommit()
> succeeds. I think that is the cost of not having an extra shuffle. It
> is alright since this policy has worked well for Flink so far.
>
> Overall, it will be great to have @RequiresStableInput support in the
> Flink runner.
>
> Raghu.
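That 2PC contract can be sketched in a few lines. Below is a toy Python model, not Flink's actual API (all names are invented): pre_commit() may fail safely, because the checkpoint simply aborts and the buffered records are replayed, but once pre_commit() succeeds the runner retries commit() until it goes through, so commit() may only ever fail transiently.

```python
class TwoPhaseSink:
    """Toy model of the 2PC contract: once pre_commit() succeeds,
    commit() is retried until it succeeds, so the transaction is
    never lost and never duplicated."""

    def __init__(self, flaky_commits=2):
        self.pending = []                    # records in the open transaction
        self.committed = []                  # records visible downstream
        self._flaky_commits = flaky_commits  # first N commit() calls fail

    def write(self, record):
        self.pending.append(record)

    def pre_commit(self):
        # Prepare the transaction. A failure here is safe: the checkpoint
        # aborts and the buffered records are replayed from the last snapshot.
        prepared, self.pending = self.pending, []
        return prepared

    def commit(self, prepared):
        if self._flaky_commits > 0:
            self._flaky_commits -= 1
            raise IOError("transient broker failure")
        self.committed.extend(prepared)


sink = TwoPhaseSink(flaky_commits=2)
sink.write("a")
sink.write("b")
txn = sink.pre_commit()

attempts = 0
while True:  # the runner's retry loop: commit must eventually succeed
    attempts += 1
    try:
        sink.commit(txn)
        break
    except IOError:
        continue

print(attempts, sink.committed)  # 3 ['a', 'b'] -- committed exactly once
```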
>> The FlinkRunner would need to insert the "wait until checkpoint
>> finalization" logic wherever it sees @RequiresStableInput, which is
>> already what it would have to do.
>>
>> This matches the KafkaProducer's logic - delay closing the transaction
>> until checkpoint finalization. This answers my main question, which is
>> "is @RequiresStableInput expressive enough to allow Beam-on-Flink to
>> have exactly-once behavior with the same performance characteristics
>> as native Flink checkpoint finalization?"
>>
>> Kenn
>>
>> [1] https://github.com/apache/beam/pull/7955
>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>> pending transaction once it has completed a checkpoint.
>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org> wrote:
>>>>> Hi,
>>>>>
>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>> exactly-once semantics (EOS). I think it is questionable to exclude
>>>>> Runners from inside a transform, but I see that the intention was
>>>>> to save users from surprises.
>>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>> pending transaction once it has completed a checkpoint.
>>>> When we discussed this in Aug 2017, the understanding was that the
>>>> two-phase commit utility in Flink used to implement Flink's Kafka
>>>> EOS could not be implemented in Beam's context. See this message
>>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>>> that dev thread. Has anything changed in this regard? The whole
>>>> thread is relevant to this topic and worth going through.
>>> I think that the TwoPhaseCommit utility class wouldn't work. The
>>> Flink runner would probably want to directly use
>>> notifySnapshotComplete in order to implement @RequiresStableInput.
>>>>> A checkpoint is realized by sending barriers through all channels,
>>>>> starting from the source until reaching all sinks. Every operator
>>>>> persists its state once it has received a barrier on all its input
>>>>> channels; it then forwards it to the downstream operators.
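That barrier mechanism can be illustrated with a toy model (plain Python, no Flink APIs; the names are made up): an operator snapshots its state only once a barrier has arrived on every input channel, then forwards a single barrier downstream.

```python
# Sentinel marking a checkpoint barrier flowing through the dataflow.
BARRIER = object()

class Operator:
    """Toy operator: counts records and snapshots its state only after
    barriers have arrived on all of its input channels."""

    def __init__(self, name, num_inputs):
        self.name = name
        self.num_inputs = num_inputs
        self.barriers_seen = 0
        self.state = 0        # e.g. a running count of records
        self.snapshots = []   # persisted checkpoint states

    def receive(self, channel, item):
        if item is BARRIER:
            self.barriers_seen += 1
            if self.barriers_seen == self.num_inputs:  # barriers aligned
                self.snapshots.append(self.state)      # persist state
                self.barriers_seen = 0
                return BARRIER                         # forward downstream
            return None                                # still waiting
        self.state += 1
        return item


op = Operator("counter", num_inputs=2)
op.receive(0, "r1")
assert op.receive(0, BARRIER) is None   # barrier on one channel: no snapshot
op.receive(1, "r2")
forwarded = op.receive(1, BARRIER)      # barrier on all channels: snapshot
print(forwarded is BARRIER, op.snapshots)  # True [2]
```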
>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>>
>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>> GroupByKey -> ExactlyOnceWriter
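To make the role of those stages concrete, here is a toy Python model (not Beam code; all names are illustrative) of why the sequence ids matter: grouping fixes an order per shard, and the sequence ids let the writer skip records that were already written when a bundle is retried.

```python
import random
from collections import defaultdict

def assign_random_shards(records, num_shards=2):
    # AssignRandomShardIds: spread records across shards for parallelism.
    return [(random.randrange(num_shards), r) for r in records]

def group_by_key(pairs):
    # GroupByKey: collect all records belonging to the same shard.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def assign_sequence_ids(grouped):
    # AssignSequenceIds: give each record a stable position in its shard.
    return {shard: list(enumerate(vals)) for shard, vals in grouped.items()}

class ExactlyOnceWriter:
    """Toy writer: skips (shard, sequence id) pairs it has already seen,
    so a retried bundle cannot produce duplicates."""

    def __init__(self):
        self.log = defaultdict(dict)  # shard -> {seq_id: record}

    def write(self, shard, seq_and_records):
        for seq, rec in seq_and_records:
            if seq in self.log[shard]:
                continue              # duplicate from a retry: skip it
            self.log[shard][seq] = rec


sharded = assign_random_shards(["a", "b", "c", "d"])
sequenced = assign_sequence_ids(group_by_key(sharded))
writer = ExactlyOnceWriter()
for shard, items in sequenced.items():
    writer.write(shard, items)
    writer.write(shard, items)        # simulate a retried bundle

total = sum(len(v) for v in writer.log.values())
print(total)  # 4 -- each record written exactly once despite the retry
```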
>>>>> As I understand it, Spark and Dataflow use the GroupByKey stages
>>>>> to persist the input. That is not required in Flink to be able to
>>>>> take a consistent snapshot of the pipeline.
>>>>> Basically, for Flink we don't need any of that magic that KafkaIO
>>>>> does. What we would need to support EOS is a way to tell the
>>>>> ExactlyOnceWriter (a DoFn) to commit once a checkpoint has
>>>>> completed.
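What this asks for can be sketched roughly as follows. The API below is invented for illustration (plain Python, not Flink's interfaces; the closest real hook in Flink is the CheckpointListener#notifyCheckpointComplete callback): the writer buffers records in an open transaction, closes it when the checkpoint is taken, and commits only after the runner confirms the checkpoint is durable.

```python
class CheckpointAwareWriter:
    """Toy sketch of a writer that commits per completed checkpoint."""

    def __init__(self):
        self.open_txn = []    # records received since the last snapshot
        self.pending = {}     # checkpoint_id -> prepared transaction
        self.committed = []   # records made visible to consumers

    def process(self, record):
        self.open_txn.append(record)

    def snapshot_state(self, checkpoint_id):
        # Checkpoint taken: close the current transaction, start a new one.
        self.pending[checkpoint_id] = self.open_txn
        self.open_txn = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Runner confirms the checkpoint is durable: now it is safe to commit.
        self.committed.extend(self.pending.pop(checkpoint_id))


w = CheckpointAwareWriter()
w.process("a")
w.process("b")
w.snapshot_state(checkpoint_id=1)
w.process("c")                  # belongs to the *next* transaction
assert w.committed == []        # nothing visible before confirmation
w.notify_checkpoint_complete(1)
print(w.committed, w.open_txn)  # ['a', 'b'] ['c']
```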
>>>>> I know that the new version of SDF supports checkpointing, which
>>>>> should solve this issue. But there is still a lot of work to do to
>>>>> make this a reality.
>>>> I don't see how SDF solves this problem. Maybe pseudocode would
>>>> make it more clear. But if it helps, that is great!
>>>>> So I think it would make sense to think about a way to make
>>>>> KafkaIO's EOS more accessible to Runners which support a different
>>>>> way of checkpointing.
>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
>>>> think that will help many future exactly-once sinks and address a
>>>> fundamental incompatibility between the Beam model and Flink's
>>>> horizontal checkpointing for such applications.
>>>>
>>>> Raghu.
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>> defining an annotation only manifests the conceptual difference
>>>>> between the Runners.
>>>>> [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>
>>>>> [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>
>>>>> [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
