Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the
BigQuery case, there was no obvious way to create a deterministic id, which is
why we went for a random number followed by a reshuffle). Also remember that
the user ParDo that is producing data to the sink is not guaranteed to be
deterministic; the Beam model allows for non-deterministic transforms.

I believe we could use something like the worker id to make it deterministic, though the worker id can change after a restart. We could persist it in Flink's operator state. I do not know if we can come up with a Runner-independent solution.
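To sketch the Flink side of that idea (a rough sketch only; the class name and
the use of a random UUID are invented for illustration, this is not Runner
code):

import java.util.UUID;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class ShardIdAssigner extends RichMapFunction<String, Tuple2<String, String>>
    implements CheckpointedFunction {

  private transient ListState<String> shardIdState;
  private transient String shardId;

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    shardIdState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("shard-id", String.class));
    // After a restart, reuse the id generated in the first run instead of
    // picking a new random one.
    for (String restored : shardIdState.get()) {
      shardId = restored;
    }
    if (shardId == null) {
      shardId = UUID.randomUUID().toString();
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    shardIdState.clear();
    shardIdState.add(shardId);
  }

  @Override
  public Tuple2<String, String> map(String element) {
    // Every element of this subtask gets the same shard id, stable across
    // restarts.
    return Tuple2.of(shardId, element);
  }
}

Rescaling redistributes operator state, so this only covers plain restarts,
and it obviously doesn't answer the Runner-independent question.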

I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput,
can't the Flink runner buffer the input message until after the checkpoint is
complete and only then deliver it to the ParDo?

You're correct. I thought that it could suffice to only buffer during a checkpoint and otherwise rely on the deterministic execution of the pipeline and KafkaIO's de-duplication code.

In any case, emitting only after finalization of checkpoints gives us guaranteed stable input. It also means that the processing is tied to the checkpoint interval, the checkpoint duration, and the available memory.
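For illustration, a condensed sketch of such a buffering operator (the class
name is invented; the real integration would live in the runner's DoFnOperator
and would additionally have to track which elements were covered by which
checkpoint, which I ignore here):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class BufferUntilCheckpointOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  private final TypeInformation<T> typeInfo;
  private transient ListState<T> bufferState;
  private transient List<T> buffer;

  public BufferUntilCheckpointOperator(TypeInformation<T> typeInfo) {
    this.typeInfo = typeInfo;
  }

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    bufferState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer", typeInfo));
    buffer = new ArrayList<>();
    for (T element : bufferState.get()) {
      buffer.add(element);
    }
  }

  @Override
  public void processElement(StreamRecord<T> element) {
    // Hold the element back; it is not stable yet, because a failure now
    // would replay it from the last completed checkpoint.
    buffer.add(element.getValue());
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    bufferState.update(new ArrayList<>(buffer));
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // The checkpoint is finalized: the buffered elements can no longer
    // change on replay, so it is safe to hand them downstream. After a
    // restore the same elements are emitted again, which is fine for
    // RequiresStableInput -- the guarantee is identical replay, not
    // exactly-once delivery.
    for (T element : buffer) {
      output.collect(new StreamRecord<>(element));
    }
    buffer.clear();
  }
}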

On 01.03.19 19:41, Reuven Lax wrote:


On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <[email protected]> wrote:

    Fully agree. I think we can improve the situation drastically. For
    KafkaIO EOS with Flink we need to make these two changes:

    1) Introduce buffering while the checkpoint is being taken
    2) Replace the random shard id assignment with something deterministic


Can we do 2? I seem to remember that we had trouble in some cases (e.g. in the BigQuery case, there was no obvious way to create a deterministic id, which is why we went for a random number followed by a reshuffle). Also remember that the user ParDo that is producing data to the sink is not guaranteed to be deterministic; the Beam model allows for non-deterministic transforms.


    However, we won't be able to provide full compatibility with
    RequiresStableInput because Flink only guarantees stable input after a
    checkpoint. RequiresStableInput requires input at any point in time to
    be stable.

I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, can't the Flink runner buffer the input message until after the checkpoint is complete and only then deliver it to the ParDo? This adds latency of course, but I'm not sure how else to do things correctly with the Beam model.

    IMHO the only way to achieve that is materializing output,
    which Flink does not currently support.

    KafkaIO does not need all the power of RequiresStableInput to achieve
    EOS with Flink, but for the general case I don't see a good solution at
    the moment.

    -Max

    On 01.03.19 16:45, Reuven Lax wrote:
     > Yeah, the person who was working on it originally stopped working on
     > Beam, and nobody else ever finished it. I think it is important to
     > finish though. Many of the existing Sinks are only fully correct for
     > Dataflow today, because they generate either Reshuffle or GroupByKey
     > to ensure input stability before outputting (in many cases this code
     > was inherited from before Beam existed). On Flink today, these sinks
     > might occasionally produce duplicate output in the case of failures.
     >
     > Reuven
     >
     > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
     >
     >     Circling back to the RequiresStableInput annotation[1]. I've done
     >     some prototyping to see how this could be integrated into Flink.
     >     I'm currently writing a test based on RequiresStableInput.
     >
     >     I found out there are already checks in place at the Runners to
     >     throw in case transforms use RequiresStableInput and it's not
     >     supported. However, not a single transform actually uses the
     >     annotation.
     >
     >     It seems that the effort stopped at some point? Would it make
     >     sense to start annotating KafkaExactlyOnceSink with
     >     @RequiresStableInput? We could then get rid of the whitelist.
     >
     >     -Max
     >
     >     [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >
     >
     >
     >     On 01.03.19 14:28, Maximilian Michels wrote:
     >      > Just realized that transactions do not span multiple elements
     >      > in KafkaExactlyOnceSink. So the proposed solution to stop
     >      > processing elements while a snapshot is pending would work.
     >      >
     >      > It is certainly not optimal in terms of performance for Flink
     >      > and poses problems when checkpoints take long to complete, but
     >      > it would be worthwhile to implement this to make use of the
     >      > EOS feature.
     >      >
     >      > Thanks,
     >      > Max
     >      >
     >      > On 01.03.19 12:23, Maximilian Michels wrote:
     >      >> Thank you for the prompt replies. It's great to see that
     >      >> there is a good understanding of how EOS in Flink works.
     >      >>
     >      >>> This is exactly what RequiresStableInput is supposed to do.
     >      >>> On the Flink runner, this would be implemented by delaying
     >      >>> processing until the current checkpoint is done.
     >      >>
     >      >> I don't think that works because we have no control over the
     >      >> Kafka transactions. Imagine:
     >      >>
     >      >> 1) ExactlyOnceWriter writes records to Kafka and commits,
     >      >> then starts a new transaction.
     >      >> 2) Flink checkpoints, delaying the processing of elements;
     >      >> the checkpoint fails.
     >      >> 3) We restore from an old checkpoint and will start writing
     >      >> duplicate data to Kafka. The de-duplication that the sink
     >      >> performs does not help, especially because the random shard
     >      >> ids might be assigned differently.
     >      >>
     >      >> IMHO we have to have control over the commit to be able to
     >      >> provide EOS.
     >      >>
     >      >>> When we discussed this in Aug 2017, the understanding was
     >      >>> that the 2 Phase commit utility in Flink used to implement
     >      >>> Flink's Kafka EOS could not be implemented in Beam's context.
     >      >>
     >      >> That's also my understanding, unless we change the interface.
     >      >>
     >      >>> I don't see how SDF solves this problem.
     >      >>
     >      >> SDF has a checkpoint method which the Runner can call, but I
     >      >> think that you are right that the above problem would be the
     >      >> same.
     >      >>
     >      >>> Absolutely. I would love to support EOS in KafkaIO for
     >      >>> Flink. I think that will help many future exactly-once
     >      >>> sinks, and address the fundamental incompatibility between
     >      >>> the Beam model and Flink's horizontal checkpointing for such
     >      >>> applications.
     >      >>
     >      >> Great :)
     >      >>
     >      >>> The FlinkRunner would need to insert the "wait until
     >      >>> checkpoint finalization" logic wherever it sees
     >      >>> @RequiresStableInput, which is already what it would have to
     >      >>> do.
     >      >>
     >      >> I don't think that fixes the problem. See above example.
     >      >>
     >      >> Thanks,
     >      >> Max
     >      >>
     >      >> On 01.03.19 00:04, Raghu Angadi wrote:
     >      >>>
     >      >>>
     >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
     >      >>>
     >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
     >      >>>
     >      >>>         I'm not sure what a hard fail is. I probably have a
     >      >>>         shallow understanding, but doesn't
     >      >>>         @RequiresStableInput work for 2PC? The preCommit()
     >      >>>         phase should establish the transaction and commit()
     >      >>>         is not called until after checkpoint finalization.
     >      >>>         Can you describe the way that it does not work a
     >      >>>         little bit more?
     >      >>>
     >      >>>
     >      >>>     - preCommit() is called before checkpoint. Kafka EOS in
     >      >>>     Flink starts the transaction before this and makes sure
     >      >>>     it flushes all records in preCommit(). So far so good.
     >      >>>     - commit is called after the checkpoint is persisted.
     >      >>>     Now, imagine commit() fails for some reason. There is no
     >      >>>     option to rerun the 1st phase to write the records again
     >      >>>     in a new transaction. This is a hard failure for the
     >      >>>     job. In practice Flink might attempt to commit again
     >      >>>     (not sure how many times), which is likely to fail and
     >      >>>     eventually results in job failure.
     >      >>>
     >      >>>
     >      >>> In Apache Beam, the records could be stored in state, and
     >      >>> can be written inside commit() to work around this issue. It
     >      >>> could have scalability issues if checkpoints are not
     >      >>> frequent enough in the Flink runner.
     >      >>>
     >      >>> Raghu.
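To make that workaround concrete, a rough sketch as a hand-rolled Flink sink
(class name, topic, and producer settings invented for illustration): records
are parked in checkpointed state and only written to Kafka inside the
post-checkpoint callback, so a failed commit can be retried by opening a
fresh transaction and writing the same records again.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class WriteInsideCommitSink extends RichSinkFunction<String>
    implements CheckpointedFunction, CheckpointListener {

  private transient KafkaProducer<String, String> producer;
  private transient ListState<String> pendingState;
  private transient List<String> pending;

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    pendingState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("pending-records", String.class));
    pending = new ArrayList<>();
    for (String record : pendingState.get()) {
      pending.add(record);
    }
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id",
        "sink-" + getRuntimeContext().getIndexOfThisSubtask());
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<>(props);
    producer.initTransactions();
  }

  @Override
  public void invoke(String value, Context context) {
    pending.add(value); // nothing is sent to Kafka yet
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    pendingState.update(new ArrayList<>(pending));
  }

  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    // Records are written only now, inside "commit". If this fails, the job
    // restores the checkpoint that still holds the records, opens a new
    // transaction and writes them again; read_committed consumers never see
    // the aborted attempt.
    producer.beginTransaction();
    for (String record : pending) {
      producer.send(new ProducerRecord<>("output-topic", record));
    }
    producer.commitTransaction();
    pending.clear();
  }
}

A crash after commitTransaction() but before the next checkpoint would replay
the same records, so the sequence-id de-duplication that KafkaExactlyOnceSink
already does is still needed on top. And keeping all records between
checkpoints in state is exactly where the scalability concern above comes
from.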
     >      >>>
     >      >>>
     >      >>>
     >      >>>         Kenn
     >      >>>
     >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
     >      >>>
     >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
     >      >>>
     >      >>>                 I believe the way you would implement the
     >      >>>                 logic behind Flink's KafkaProducer would be
     >      >>>                 to have two steps:
     >      >>>
     >      >>>                 1. Start transaction
     >      >>>                 2. @RequiresStableInput Close transaction
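In Beam's Java SDK, those two steps could look roughly like this sketch (the
Kafka calls are elided and the DoFn names invented, but @RequiresStableInput
is the actual annotation from the design doc):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.RequiresStableInput;

// Step 1: write records into an open, still uncommitted Kafka transaction
// and forward them downstream.
class StartTransactionFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // producer.beginTransaction(); producer.send(...);  (elided)
    c.output(c.element());
  }
}

// Step 2: runs only on stable input, i.e. on the Flink runner only after
// checkpoint finalization, so a commit here can no longer be invalidated
// by a replay of its input.
class CloseTransactionFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(ProcessContext c) {
    // producer.commitTransaction();  (elided)
  }
}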
     >      >>>
     >      >>>
     >      >>>             I see. What happens if closing the transaction
     >      >>>             fails in (2)? Flink's 2PC requires that commit()
     >      >>>             should never hard fail once preCommit()
     >      >>>             succeeds. I think that is the cost of not having
     >      >>>             an extra shuffle. It is alright since this
     >      >>>             policy has worked well for Flink so far.
     >      >>>
     >      >>>             Overall, it will be great to have
     >      >>>             @RequiresStableInput support in Flink runner.
     >      >>>
     >      >>>             Raghu.
     >      >>>
     >      >>>                 The FlinkRunner would need to insert the
     >      >>>                 "wait until checkpoint finalization" logic
     >      >>>                 wherever it sees @RequiresStableInput, which
     >      >>>                 is already what it would have to do.
     >      >>>
     >      >>>                 This matches the KafkaProducer's logic:
     >      >>>                 delay closing the transaction until
     >      >>>                 checkpoint finalization. This answers my
     >      >>>                 main question, which is "is
     >      >>>                 @RequiresStableInput expressive enough to
     >      >>>                 allow Beam-on-Flink to have exactly-once
     >      >>>                 behavior with the same performance
     >      >>>                 characteristics as native Flink checkpoint
     >      >>>                 finalization?"
     >      >>>
     >      >>>                 Kenn
     >      >>>
     >      >>>                 [1] https://github.com/apache/beam/pull/7955
     >      >>>
     >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
     >      >>>
     >      >>>
     >      >>>
     >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
     >      >>>
     >      >>>
     >      >>>                             Now why does the Flink Runner
     >      >>>                             not support KafkaIO EOS? Flink's
     >      >>>                             native KafkaProducer supports
     >      >>>                             exactly-once. It simply commits
     >      >>>                             the pending transaction once it
     >      >>>                             has completed a checkpoint.
     >      >>>
     >      >>>
     >      >>>
     >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
     >      >>>
     >      >>>                             Hi,
     >      >>>
     >      >>>                             I came across KafkaIO's Runner
     >      >>>                             whitelist [1] for enabling
     >      >>>                             exactly-once semantics (EOS). I
     >      >>>                             think it is questionable to
     >      >>>                             exclude Runners from inside a
     >      >>>                             transform, but I see that the
     >      >>>                             intention was to save users from
     >      >>>                             surprises.
     >      >>>
     >      >>>                             Now why does the Flink Runner
     >      >>>                             not support KafkaIO EOS? Flink's
     >      >>>                             native KafkaProducer supports
     >      >>>                             exactly-once. It simply commits
     >      >>>                             the pending transaction once it
     >      >>>                             has completed a checkpoint.
     >      >>>
     >      >>>
     >      >>>
     >      >>>                         When we discussed this in Aug 2017,
     >      >>>                         the understanding was that the 2
     >      >>>                         Phase commit utility in Flink used
     >      >>>                         to implement Flink's Kafka EOS could
     >      >>>                         not be implemented in Beam's
     >      >>>                         context. See this message
     >      >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html>
     >      >>>                         in that dev thread. Has anything
     >      >>>                         changed in this regard? The whole
     >      >>>                         thread is relevant to this topic and
     >      >>>                         worth going through.
     >      >>>
     >      >>>                     I think that TwoPhaseCommit utility
     >      >>>                     class wouldn't work. The Flink runner
     >      >>>                     would probably want to directly use
     >      >>>                     notifySnapshotComplete in order to
     >      >>>                     implement @RequiresStableInput.
     >      >>>
     >      >>>
     >      >>>                             A checkpoint is realized by
     >      >>>                             sending barriers through all
     >      >>>                             channels, starting from the
     >      >>>                             source until reaching all sinks.
     >      >>>                             Every operator persists its
     >      >>>                             state once it has received a
     >      >>>                             barrier on all its input
     >      >>>                             channels; it then forwards it to
     >      >>>                             the downstream operators.
     >      >>>
     >      >>>                             The architecture of Beam's
     >      >>>                             KafkaExactlyOnceSink is as
     >      >>>                             follows[2]:
     >      >>>
     >      >>>                             Input -> AssignRandomShardIds ->
     >      >>>                             GroupByKey -> AssignSequenceIds ->
     >      >>>                             GroupByKey -> ExactlyOnceWriter
     >      >>>
     >      >>>                             As I understood, Spark or
     >      >>>                             Dataflow use the GroupByKey
     >      >>>                             stages to persist the input.
     >      >>>                             That is not required in Flink to
     >      >>>                             be able to take a consistent
     >      >>>                             snapshot of the pipeline.
     >      >>>
     >      >>>                             Basically, for Flink we don't
     >      >>>                             need any of that magic that
     >      >>>                             KafkaIO does. What we would need
     >      >>>                             to support EOS is a way to tell
     >      >>>                             the ExactlyOnceWriter (a DoFn)
     >      >>>                             to commit once a checkpoint has
     >      >>>                             completed.
     >      >>>
     >      >>>                             I know that the new version of
     >      >>>                             SDF supports checkpointing,
     >      >>>                             which should solve this issue.
     >      >>>                             But there is still a lot of work
     >      >>>                             to do to make this a reality.
     >      >>>
     >      >>>
     >      >>>                         I don't see how SDF solves this
     >      >>>                         problem. Maybe pseudocode would make
     >      >>>                         it more clear. But if it helps, that
     >      >>>                         is great!
     >      >>>
     >      >>>                             So I think it would make sense
     >      >>>                             to think about a way to make
     >      >>>                             KafkaIO's EOS more accessible to
     >      >>>                             Runners which support a
     >      >>>                             different way of checkpointing.
     >      >>>
     >      >>>
     >      >>>                         Absolutely. I would love to support
     >      >>>                         EOS in KafkaIO for Flink. I think
     >      >>>                         that will help many future
     >      >>>                         exactly-once sinks, and address the
     >      >>>                         fundamental incompatibility between
     >      >>>                         the Beam model and Flink's
     >      >>>                         horizontal checkpointing for such
     >      >>>                         applications.
     >      >>>
     >      >>>                         Raghu.
     >      >>>
     >      >>>                             Cheers,
     >      >>>                             Max
     >      >>>
     >      >>>                             PS: I found this document about
     >      >>>                             RequiresStableInput [3], but
     >      >>>                             IMHO defining an annotation only
     >      >>>                             manifests the conceptual
     >      >>>                             difference between the Runners.
     >      >>>
     >      >>>
     >      >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
     >      >>>
     >      >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
     >      >>>
     >      >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >      >>>
     >      >>>
     >
