FYI I created a PR for supporting @RequiresStableInput by delaying processing until checkpoints complete: https://github.com/apache/beam/pull/7991
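
For readers who want the idea without opening the PR, here is a minimal sketch of "delay processing until the checkpoint completes". It is plain Java and does not use the Beam or Flink APIs. The class CheckpointGatedBuffer and its constructor are hypothetical, and the method names only loosely mirror Flink's snapshot and checkpoint-complete hooks. It also assumes the buffered elements would be stored in checkpointed state in a real runner, which this sketch does not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical helper (not Beam/Flink API): releases elements only after their checkpoint completes. */
class CheckpointGatedBuffer<T> {
  // Elements received since the last snapshot was started.
  private List<T> current = new ArrayList<>();
  // Elements captured by snapshots that are not yet confirmed, keyed by checkpoint id.
  private final NavigableMap<Long, List<T>> pending = new TreeMap<>();
  // Stands in for the @RequiresStableInput DoFn that should only see stable input.
  private final Consumer<T> downstream;

  CheckpointGatedBuffer(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  /** Buffer the element instead of processing it right away. */
  void processElement(T element) {
    current.add(element);
  }

  /** Called when a snapshot starts: everything buffered so far belongs to this checkpoint. */
  void snapshotState(long checkpointId) {
    pending.put(checkpointId, current);
    current = new ArrayList<>();
  }

  /** Called once the checkpoint is durable: emit every batch up to and including it. */
  void notifyCheckpointComplete(long checkpointId) {
    NavigableMap<Long, List<T>> done = pending.headMap(checkpointId, true);
    for (List<T> batch : done.values()) {
      batch.forEach(downstream);
    }
    done.clear(); // clearing the view also removes the entries from 'pending'
  }
}
```

In this sketch an element that arrives after snapshot 1 has started is only released when checkpoint 2 completes, so latency is bounded below by the checkpoint interval plus the checkpoint duration, and memory use grows with the number of elements per interval.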

On 05.03.19 16:49, Reuven Lax wrote:
BTW - as a followup - there is a cost to having a Flink-specific override for the Kafka sink. Part of that is test coverage - users who write DirectRunner tests for their pipeline will now be using a different version of the code than is used on the actual Flink runner. It also makes the code less obvious: people who read the KafkaIO code will tend not to realize that Flink is running something a bit different, and this can lead to confusion.

Reuven

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax wrote:

    RE: Kenn's suggestion. I think Raghu looked into something like that, and
    something about it didn't work. I don't remember all the details,
    but I think there might have been some subtle problem with it that
    wasn't obvious. Doesn't mean that there isn't another way to solve
    that issue.

    Hopefully we can make that work. Another possibility if we can't is
    to do something special for Flink. Beam allows runners to splice out
    well-known transforms with their own implementation. Dataflow
    already does that for Google Cloud Pub/Sub sources/sinks. The Flink
    runner could splice out the Kafka sink with one that uses
    Flink-specific functionality.  Ideally this would reuse most of the
    existing Kafka code (maybe we could refactor just the EOS part into
    something that could be subbed out).

    Reuven

    On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels wrote:

         > It would be interesting to see if there's something we could
        add to the Beam model that would create a better story for
        Kafka's EOS writes.

        There would have to be a checkpoint-completed callback the DoFn can
        register with the Runner. Does not seem applicable for most
        Runners though.

         > This is true, however isn't it already true for such uses of
        Flink?

        Yes, that's correct. In the case of Kafka, Flink can offload the
        buffering, but for the general case, idempotent writes are only
        possible if we buffer data until the checkpoint is completed.

        On 04.03.19 17:45, Reuven Lax wrote:
         >
         >
         > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels wrote:
         >
         >      > Can we do 2? I seem to remember that we had trouble in
        some cases
         >     (e.g. in the BigQuery case, there was no obvious way to
        create a
         >     deterministic id, which is why we went for a random
        number followed
         >     by a reshuffle). Also remember that the user ParDo that
        is producing
         >     data to the sink is not guaranteed to be deterministic;
        the Beam
         >     model allows for non-deterministic transforms.
         >
         >     I believe we could use something like the worker id to
        make it
         >     deterministic, though the worker id can change after a
        restart. We
         >     could
         >     persist it in Flink's operator state. I do not know if we
        can come up
         >     with a Runner-independent solution.
         >
         >
         > If we did this, we would break it on runners that don't have
        a concept
         > of a stable worker id :( The Dataflow runner can load balance
        work at
         > any time (including moving work around between workers).
         >
         >
         >      > I'm not quite sure I understand. If a ParDo is marked with
         >     RequiresStableInput, can't the flink runner buffer the
        input message
         >     until after the checkpoint is complete and only then
        deliver it to
         >     the ParDo?
         >
         >     You're correct. I thought that it could suffice to only
        buffer during a
         >     checkpoint and otherwise rely on the deterministic
        execution of the
         >     pipeline and KafkaIO's de-duplication code.
         >
         >
         > Yes, I want to distinguish the KafkaIO case from the general
        case. It
         > would be interesting to see if there's something we could add
        to the
         > Beam model that would create a better story for Kafka's EOS
        writes.
         >
         >
         >     In any case, emitting only after finalization of
        checkpoints gives us
         >     guaranteed stable input. It also means that the
        processing is tied to
         >     the checkpoint interval, the checkpoint duration, and the
        available
         >     memory.
         >
         >
         > This is true, however isn't it already true for such uses of
        Flink?
         >
         >
         >     On 01.03.19 19:41, Reuven Lax wrote:
         >      >
         >      >
         >      > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels wrote:
         >      >
         >      >     Fully agree. I think we can improve the situation
         >     drastically. For
         >      >     KafkaIO EOS with Flink we need to make these two
        changes:
         >      >
         >      >     1) Introduce buffering while the checkpoint is
        being taken
         >      >     2) Replace the random shard id assignment with
        something
         >     deterministic
         >      >
         >      >
         >      > Can we do 2? I seem to remember that we had trouble in
        some cases
         >     (e.g.
         >      > in the BigQuery case, there was no obvious way to create a
         >     deterministic
         >      > id, which is why we went for a random number followed by a
         >     reshuffle).
         >      > Also remember that the user ParDo that is producing
        data to the
         >     sink is
         >      > not guaranteed to be deterministic; the Beam model
        allows for
         >      > non-deterministic transforms.
         >      >
         >      >
         >      >     However, we won't be able to provide full
        compatibility with
         >      >     RequiresStableInput because Flink only guarantees
        stable
         >     input after a
         >      >     checkpoint. RequiresStableInput requires input at
        any point
         >     in time to
         >      >     be stable.
         >      >
         >      >
         >      > I'm not quite sure I understand. If a ParDo is marked with
         >      > RequiresStableInput, can't the flink runner buffer the
        input message
         >      > until after the checkpoint is complete and only then
        deliver it
         >     to the
         >      > ParDo? This adds latency of course, but I'm not sure
        how else to do
         >      > things correctly with the Beam model.
         >      >
         >      >     IMHO the only way to achieve that is materializing
        output
         >      >     which Flink does not currently support.
         >      >
         >      >     KafkaIO does not need all the power of
        RequiresStableInput to
         >     achieve
         >      >     EOS with Flink, but for the general case I don't
        see a good
         >     solution at
         >      >     the moment.
         >      >
         >      >     -Max
         >      >
         >      >     On 01.03.19 16:45, Reuven Lax wrote:
         >      >      > Yeah, the person who was working on it
        originally stopped
         >     working on
         >      >      > Beam, and nobody else ever finished it. I think
        it is
         >     important to
         >      >      > finish though. Many of the existing Sinks are
        only fully
         >     correct for
         >      >      > Dataflow today, because they generate either
        Reshuffle or
         >      >     GroupByKey to
         >      >      > ensure input stability before outputting (in
        many cases
         >     this code
         >      >     was
         >      >      > inherited from before Beam existed). On Flink
        today, these
         >     sinks
         >      >     might
         >      >      > occasionally produce duplicate output in the
        case of failures.
         >      >      >
         >      >      > Reuven
         >      >      >
         >      >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels wrote:
         >      >      >
         >      >      >     Circling back to the RequiresStableInput
         >     annotation[1]. I've
         >      >     done some
         >      >      >     prototyping to see how this could be
        integrated into
         >     Flink. I'm
         >      >      >     currently
         >      >      >     writing a test based on RequiresStableInput.
         >      >      >
         >      >      >     I found out there are already checks in
        place at the
         >     Runners to
         >      >      >     throw in
         >      >      >     case transforms use RequiresStableInput and
        it's not
         >      >     supported. However,
         >      >      >     not a single transform actually uses the
        annotation.
         >      >      >
         >      >      >     It seems that the effort stopped at some
        point? Would
         >     it make
         >      >     sense to
         >      >      >     start annotating KafkaExactlyOnceSink with
         >      >     @RequiresStableInput? We
         >      >      >     could then get rid of the whitelist.
         >      >      >
         >      >      >     -Max
         >      >      >
         >      >      >     [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
         >      >      >
         >      >      >
         >      >      >
         >      >      >     On 01.03.19 14:28, Maximilian Michels wrote:
         >      >      >      > Just realized that transactions do not
        span multiple
         >      >     elements in
         >      >      >      > KafkaExactlyOnceSink. So the proposed
        solution to stop
         >      >     processing
         >      >      >      > elements while a snapshot is pending
        would work.
         >      >      >      >
         >      >      >      > It is certainly not optimal in terms of
        performance for
         >      >     Flink and
         >      >      >     poses
         >      >      >      > problems when checkpoints take long to
        complete, but it
         >      >     would be
         >      >      >      > worthwhile to implement this to make use
        of the EOS
         >     feature.
         >      >      >      >
         >      >      >      > Thanks,
         >      >      >      > Max
         >      >      >      >
         >      >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
         >      >      >      >> Thank you for the prompt replies. It's
        great to
         >     see that
         >      >     there is
         >      >      >      >> good understanding of how EOS in Flink
        works.
         >      >      >      >>
         >      >      >      >>> This is exactly what
        RequiresStableInput is
         >     supposed to
         >      >     do. On the
         >      >      >      >>> Flink runner, this would be
        implemented by delaying
         >      >     processing
         >      >      >     until
         >      >      >      >>> the current checkpoint is done.
         >      >      >      >>
         >      >      >      >> I don't think that works because we have no
         >     control over
         >      >     the Kafka
         >      >      >      >> transactions. Imagine:
         >      >      >      >>
         >      >      >      >> 1) ExactlyOnceWriter writes records to
        Kafka and
         >     commits,
         >      >     then
         >      >      >     starts
         >      >      >      >> a new transaction.
         >      >      >      >> 2) Flink checkpoints, delaying the
        processing of
         >      >     elements, the
         >      >      >      >> checkpoint fails.
         >      >      >      >> 3) We restore from an old checkpoint
        and will
         >     start writing
         >      >      >     duplicate
         >      >      >      >> data to Kafka. The de-duplication that
        the sink
         >     performs
         >      >     does not
         >      >      >      >> help, especially because the random
        shard ids
         >     might be
         >      >     assigned
         >      >      >      >> differently.
         >      >      >      >>
         >      >      >      >> IMHO we have to have control over
        commit to be able to
         >      >     provide EOS.
         >      >      >      >>
         >      >      >      >>> When we discussed this in Aug 2017,
        the understanding
         >      >     was that 2
         >      >      >      >>> Phase commit utility in Flink used to
        implement
         >     Flink's
         >      >     Kafka EOS
         >      >      >      >>> could not be implemented in Beam's
        context.
         >      >      >      >>
         >      >      >      >> That's also my understanding, unless we
        change the
         >     interface.
         >      >      >      >>
         >      >      >      >>> I don't see how SDF solves this problem..
         >      >      >      >>
         >      >      >      >> SDF has a checkpoint method which the
        Runner can call,
         >      >     but I think
         >      >      >      >> that you are right, that the above
        problem would
         >     be the same.
         >      >      >      >>
         >      >      >      >>> Absolutely. I would love to support
        EOS in KafkaIO for
         >      >     Flink. I
         >      >      >     think
         >      >      >      >>> that will help many future
        exactly-once sinks..
         >     and address
         >      >      >      >>> fundamental incompatibility between
        Beam model
         >     and Flink's
         >      >      >     horizontal
         >      >      >      >>> checkpointing for such applications.
         >      >      >      >>
         >      >      >      >> Great :)
         >      >      >      >>
         >      >      >      >>> The FlinkRunner would need to insert
        the "wait until
         >      >     checkpoint
         >      >      >      >>> finalization" logic wherever it sees
         >     @RequiresStableInput,
         >      >      >     which is
         >      >      >      >>> already what it would have to do.
         >      >      >      >>
         >      >      >      >> I don't think that fixes the problem.
        See above
         >     example.
         >      >      >      >>
         >      >      >      >> Thanks,
         >      >      >      >> Max
         >      >      >      >>
         >      >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi wrote:
         >      >      >      >>>
         >      >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles wrote:
         >      >      >      >>>
         >      >      >      >>>         I'm not sure what a hard fail
        is. I probably
         >      >     have a shallow
         >      >      >      >>>         understanding, but doesn't
         >     @RequiresStableInput work
         >      >      >     for 2PC?
         >      >      >      >>>         The preCommit() phase should
        establish the
         >      >     transaction and
         >      >      >      >>>         commit() is not called until
        after checkpoint
         >      >      >     finalization. Can
         >      >      >      >>>         you describe the way that it
        does not work a
         >      >     little bit
         >      >      >     more?
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>     - preCommit() is called before
        checkpoint.
         >     Kafka EOS in
         >      >      >     Flink starts
         >      >      >      >>>     the transaction before this and
        makes sure it
         >      >     flushes all
         >      >      >     records in
         >      >      >      >>>     preCommit(). So far good.
         >      >      >      >>>     - commit is called after
        checkpoint is persisted.
         >      >     Now, imagine
         >      >      >      >>>     commit() fails for some reason.
        There is no
         >     option
         >      >     to rerun
         >      >      >     the 1st
         >      >      >      >>>     phase to write the records again
        in a new
         >      >     transaction. This
         >      >      >     is a
         >      >      >      >>>     hard failure for the job. In
        practice
         >     Flink might
         >      >      >     attempt to
         >      >      >      >>>     commit again (not sure how many
        times), which is
         >      >     likely to
         >      >      >     fail and
         >      >      >      >>>     eventually results in job failure.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>> In Apache Beam, the records could be
        stored in state,
         >      >     and can be
         >      >      >      >>> written inside commit() to work around
        this issue. It
         >      >     could have
         >      >      >      >>> scalability issues if checkpoints are
        not frequent
         >      >     enough in Flink
         >      >      >      >>> runner.
         >      >      >      >>>
         >      >      >      >>> Raghu.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>         Kenn
         >      >      >      >>>
         >      >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi wrote:
         >      >      >      >>>
         >      >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles wrote:
         >      >      >      >>>
         >      >      >      >>>                 I believe the way you
        would implement
         >      >     the logic
         >      >      >     behind
         >      >      >      >>>                 Flink's KafkaProducer
        would be to
         >     have
         >      >     two steps:
         >      >      >      >>>
         >      >      >      >>>                 1. Start transaction
         >      >      >      >>>                 2. @RequiresStableInput Close transaction
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>             I see.  What happens if
        closing the
         >     transaction
         >      >      >     fails in
         >      >      >      >>>             (2)? Flink's 2PC requires that
         >     commit() should
         >      >      >     never hard
         >      >      >      >>>             fail once preCommit()
        succeeds. I
         >     think that is
         >      >      >     cost of not
         >      >      >      >>>             having an extra shuffle. It is
         >     alright since
         >      >     this
         >      >      >     policy has
         >      >      >      >>>             worked well for Flink so far.
         >      >      >      >>>
         >      >      >      >>>             Overall, it will be great
        to have
         >      >     @RequiresStableInput
         >      >      >      >>>             support in Flink runner.
         >      >      >      >>>
         >      >      >      >>>             Raghu.
         >      >      >      >>>
         >      >      >      >>>                 The FlinkRunner would
        need to
         >     insert the
         >      >     "wait
         >      >      >     until
         >      >      >      >>>                 checkpoint
        finalization" logic
         >     wherever it
         >      >      >      >>>                 sees @RequiresStableInput, which is
         >      >     already what it
         >      >      >      >>>                 would have to do.
         >      >      >      >>>
         >      >      >      >>>                 This matches the
        KafkaProducer's
         >     logic -
         >      >     delay
         >      >      >     closing
         >      >      >      >>>                 the transaction until
        checkpoint
         >      >     finalization. This
         >      >      >      >>>                 answers my main
        question, which
         >     is "is
         >      >      >      >>>                 @RequiresStableInput
        expressive
         >     enough
         >      >     to allow
         >      >      >      >>>                 Beam-on-Flink to have
        exactly
         >     once behavior
         >      >      >     with the
         >      >      >      >>>                 same performance
        characteristics as
         >      >     native Flink
         >      >      >      >>>                 checkpoint finalization?"
         >      >      >      >>>
         >      >      >      >>>                 Kenn
         >      >      >      >>>
         >      >      >      >>>                 [1] https://github.com/apache/beam/pull/7955
         >      >      >      >>>
         >      >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax wrote:
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi wrote:
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                             Now why does the Flink Runner not support
         >      >      >      >>>                             KafkaIO EOS? Flink's native
         >      >      >      >>>                             KafkaProducer supports exactly-once. It
         >      >      >      >>>                             simply commits the pending
         >      >      >      >>>                             transaction once it has completed a
         >      >      >      >>>                             checkpoint.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels wrote:
         >      >      >      >>>
         >      >      >      >>>                             Hi,
         >      >      >      >>>
         >      >      >      >>>                             I came across KafkaIO's Runner whitelist [1]
         >      >      >      >>>                             for enabling exactly-once semantics (EOS). I
         >      >      >      >>>                             think it is questionable to exclude Runners
         >      >      >      >>>                             from inside a transform, but I see that the
         >      >      >      >>>                             intention was to save users from surprises.
         >      >      >      >>>
         >      >      >      >>>                             Now why does the Flink Runner not support
         >      >      >      >>>                             KafkaIO EOS? Flink's native KafkaProducer
         >      >      >      >>>                             supports exactly-once. It simply commits the
         >      >      >      >>>                             pending transaction once it has completed a
         >      >      >      >>>                             checkpoint.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                         When we discussed this in Aug 2017, the
         >      >      >      >>>                         understanding was that 2 Phase commit utility
         >      >      >      >>>                         in Flink used to implement Flink's Kafka EOS
         >      >      >      >>>                         could not be implemented in Beam's context.
         >      >      >      >>>                         See this message
         >      >      >      >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html>
         >      >      >      >>>                         in that dev thread. Has anything changed in
         >      >      >      >>>                         this regard? The whole thread is relevant to
         >      >      >      >>>                         this topic and worth going through.
         >      >      >      >>>
         >      >      >      >>>                     I think that TwoPhaseCommit utility class
         >      >      >      >>>                     wouldn't work. The Flink runner would probably
         >      >      >      >>>                     want to directly use notifySnapshotComplete in
         >      >      >      >>>                     order to implement @RequiresStableInput.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                             A checkpoint is realized by sending barriers
         >      >      >      >>>                             through all channels, starting from the
         >      >      >      >>>                             sources until reaching all sinks. Every
         >      >      >      >>>                             operator persists its state once it has
         >      >      >      >>>                             received a barrier on all its input channels;
         >      >      >      >>>                             it then forwards the barrier to the
         >      >      >      >>>                             downstream operators.
         >      >      >      >>>
         >      >      >      >>>                             The architecture of Beam's
         >      >      >      >>>                             KafkaExactlyOnceSink is as follows [2]:
         >      >      >      >>>
         >      >      >      >>>                             Input -> AssignRandomShardIds -> GroupByKey
         >      >      >      >>>                             -> AssignSequenceIds -> GroupByKey
         >      >      >      >>>                             -> ExactlyOnceWriter
         >      >      >      >>>
         >      >      >      >>>                             As I understood, Spark or Dataflow use the
         >      >      >      >>>                             GroupByKey stages to persist the input. That
         >      >      >      >>>                             is not required in Flink to be able to take a
         >      >      >      >>>                             consistent snapshot of the pipeline.
         >      >      >      >>>
         >      >      >      >>>                             Basically, for Flink we don't need any of
         >      >      >      >>>                             that magic that KafkaIO does. What we would
         >      >      >      >>>                             need to support EOS is a way to tell the
         >      >      >      >>>                             ExactlyOnceWriter (a DoFn) to commit once a
         >      >      >      >>>                             checkpoint has completed.
         >      >      >      >>>
         >      >      >      >>>                             I know that the new version of SDF supports
         >      >      >      >>>                             checkpointing, which should solve this issue.
         >      >      >      >>>                             But there is still a lot of work to do to
         >      >      >      >>>                             make this a reality.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                         I don't see how SDF solves this problem. Maybe
         >      >      >      >>>                         pseudocode would make it clearer. But if it
         >      >      >      >>>                         helps, that is great!
         >      >      >      >>>
         >      >      >      >>>                             So I think it would make sense to think about
         >      >      >      >>>                             a way to make KafkaIO's EOS more accessible
         >      >      >      >>>                             to Runners which support a different way of
         >      >      >      >>>                             checkpointing.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                         Absolutely. I would love to support EOS in
         >      >      >      >>>                         KafkaIO for Flink. I think that will help many
         >      >      >      >>>                         future exactly-once sinks and address the
         >      >      >      >>>                         fundamental incompatibility between the Beam
         >      >      >      >>>                         model and Flink's horizontal checkpointing for
         >      >      >      >>>                         such applications.
         >      >      >      >>>
         >      >      >      >>>                         Raghu.
         >      >      >      >>>
         >      >      >      >>>                             Cheers,
         >      >      >      >>>                             Max
         >      >      >      >>>
         >      >      >      >>>                             PS: I found this document about
         >      >      >      >>>                             RequiresStableInput [3], but IMHO defining an
         >      >      >      >>>                             annotation only manifests the conceptual
         >      >      >      >>>                             difference between the Runners.
         >      >      >      >>>
         >      >      >      >>>
         >      >      >      >>>                             [1]
         >      >      >      >>>                             https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
         >      >      >      >>>
         >      >      >      >>>                             [2]
         >      >      >      >>>                             https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
         >      >      >      >>>
         >      >      >      >>>                             [3]
         >      >      >      >>>                             https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
         >      >      >      >>>
         >      >      >      >>>
         >      >      >
         >      >
         >
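
For readers following the quoted discussion, the idea of telling a writer "to commit once a checkpoint has completed" can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: it is plain Python, not Beam or Flink API, and the class name CheckpointBufferingWriter and the methods process, snapshot_state, notify_checkpoint_complete and restore are hypothetical names chosen for illustration. Elements are buffered when they arrive, moved into the snapshot when the barrier passes, and only released to the external system once the runner reports that the checkpoint has completed.

```python
class CheckpointBufferingWriter:
    """Toy model of a sink that releases elements only after a checkpoint completes.

    Hypothetical illustration only; none of these names are Beam or Flink API.
    """

    def __init__(self):
        self.pending = []      # elements seen since the last snapshot (lost on failure)
        self.snapshots = {}    # checkpoint id -> elements persisted with that snapshot
        self.committed = []    # elements already written to the external system

    def process(self, element):
        # Never write directly: the input is not yet known to be stable.
        self.pending.append(element)

    def snapshot_state(self, checkpoint_id):
        # Called when the checkpoint barrier reaches this operator.
        self.snapshots[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Everything persisted up to this checkpoint is now stable, so commit it.
        for cid in sorted(c for c in self.snapshots if c <= checkpoint_id):
            self.committed.extend(self.snapshots.pop(cid))

    def restore(self):
        # After a failure, elements that were never snapshotted are gone;
        # the source is expected to replay them.
        self.pending = []


writer = CheckpointBufferingWriter()
writer.process("a")
writer.process("b")
writer.snapshot_state(1)
writer.process("c")                   # arrives after the barrier of checkpoint 1
writer.notify_checkpoint_complete(1)
print(writer.committed)               # ['a', 'b']
```

The element "c" stays buffered until a later checkpoint completes. If the job fails before that, "c" is dropped from the buffer and replayed by the source, so it is never written twice. The price of this approach is latency: output is delayed by up to one checkpoint interval.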
