> It would be interesting to see if there's something we could add to the Beam
> model that would create a better story for Kafka's EOS writes.

There would have to be a checkpoint-completed callback that the DoFn can register with the Runner. That does not seem applicable to most Runners, though.
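To make the idea a bit more concrete, here is a rough sketch of what such a registration hook might look like. Every name in it (CheckpointCallbackRegistry, ToyRunner, ToyTransactionalWriter) is hypothetical; nothing like this exists in the Beam model today. It only illustrates the registration and the "commit from the callback" part, and it glosses over the fact that a real sink would also have to seal the open transaction at snapshot time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical runner-side hook (an assumption, not part of Beam). */
interface CheckpointCallbackRegistry {
  /** Registers a callback that receives the id of each completed checkpoint. */
  void onCheckpointCompleted(LongConsumer callback);
}

/** Toy stand-in for a runner: fires the callbacks when told a checkpoint completed. */
final class ToyRunner implements CheckpointCallbackRegistry {
  private final List<LongConsumer> callbacks = new ArrayList<>();

  @Override
  public void onCheckpointCompleted(LongConsumer callback) {
    callbacks.add(callback);
  }

  void completeCheckpoint(long checkpointId) {
    for (LongConsumer callback : callbacks) {
      callback.accept(checkpointId);
    }
  }
}

/** Toy stand-in for the writer DoFn: records stay uncommitted until the callback fires. */
final class ToyTransactionalWriter {
  private final List<String> open = new ArrayList<>();
  private final List<String> committed = new ArrayList<>();

  ToyTransactionalWriter(CheckpointCallbackRegistry registry) {
    // Commit whatever is in the open "transaction" once a checkpoint has completed.
    registry.onCheckpointCompleted(checkpointId -> commit());
  }

  void write(String record) {
    open.add(record);
  }

  private void commit() {
    committed.addAll(open);
    open.clear();
  }

  List<String> committed() {
    return committed;
  }
}
```

Runners without a notion of checkpoint completion would have no natural moment at which to fire the callback, which is why this does not look portable.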

> This is true, however isn't it already true for such uses of Flink?

Yes, that's correct. In the case of Kafka, Flink can offload the buffering, but in the general case idempotent writes are only possible if we buffer data until the checkpoint has completed.
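As a minimal sketch of the "buffer until the checkpoint is completed" approach: StableInputBuffer below is a made-up helper, not a Beam or Flink class, and it keeps everything in memory, whereas a real runner would have to put the pending batches into checkpointed state. The two methods snapshot() and checkpointCompleted() are assumed to be called by the runner at the corresponding points.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: holds elements back until a checkpoint covering them has completed. */
final class StableInputBuffer<T> {

  /** Elements grouped under the checkpoint that captured them. */
  private static final class Batch<T> {
    final long checkpointId;
    final List<T> elements;

    Batch(long checkpointId, List<T> elements) {
      this.checkpointId = checkpointId;
      this.elements = elements;
    }
  }

  /** Elements received since the last snapshot. */
  private List<T> current = new ArrayList<>();
  /** Snapshotted batches, oldest first, whose checkpoint has not completed yet. */
  private final Deque<Batch<T>> pending = new ArrayDeque<>();
  /** Stand-in for the @RequiresStableInput ParDo that finally sees the elements. */
  private final Consumer<T> downstream;

  StableInputBuffer(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  /** Buffers the element instead of processing it right away. */
  void processElement(T element) {
    current.add(element);
  }

  /** Assumed to be called when the runner snapshots this operator. */
  void snapshot(long checkpointId) {
    pending.addLast(new Batch<>(checkpointId, current));
    current = new ArrayList<>();
  }

  /** Assumed to be called once the checkpoint is complete; releases all covered batches. */
  void checkpointCompleted(long checkpointId) {
    while (!pending.isEmpty() && pending.peekFirst().checkpointId <= checkpointId) {
      for (T element : pending.removeFirst().elements) {
        downstream.accept(element);
      }
    }
  }
}
```

Elements only reach the downstream consumer after their checkpoint has completed, so latency and memory use grow with the checkpoint interval and duration.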

On 04.03.19 17:45, Reuven Lax wrote:


On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels wrote:

     > Can we do 2? I seem to remember that we had trouble in some cases
    (e.g., in the BigQuery case, there was no obvious way to create a
    deterministic id, which is why we went for a random number followed
    by a reshuffle). Also remember that the user ParDo that is producing
    data to the sink is not guaranteed to be deterministic; the Beam
    model allows for non-deterministic transforms.

    I believe we could use something like the worker id to make it
    deterministic, though the worker id can change after a restart. We could
    persist it in Flink's operator state. I do not know if we can come up
    with a Runner-independent solution.


If we did this, we would break it on runners that don't have a concept of a stable worker id :( The Dataflow runner can load balance work at any time (including moving work around between workers).


     > I'm not quite sure I understand. If a ParDo is marked with
    RequiresStableInput, can't the flink runner buffer the input message
    until after the checkpoint is complete and only then deliver it to
    the ParDo?

    You're correct. I thought that it could suffice to only buffer during a
    checkpoint and otherwise rely on the deterministic execution of the
    pipeline and KafkaIO's de-duplication code.


Yes, I want to distinguish the KafkaIO case from the general case. It would be interesting to see if there's something we could add to the Beam model that would create a better story for Kafka's EOS writes.


    In any case, emitting only after finalization of checkpoints gives us
    guaranteed stable input. It also means that the processing is tied to
    the checkpoint interval, the checkpoint duration, and the available
    memory.


This is true, however isn't it already true for such uses of Flink?


    On 01.03.19 19:41, Reuven Lax wrote:
     >
     >
     > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels wrote:
     >
     >     Fully agree. I think we can improve the situation drastically. For
     >     KafkaIO EOS with Flink we need to make these two changes:
     >
     >     1) Introduce buffering while the checkpoint is being taken
     >     2) Replace the random shard id assignment with something deterministic
     >
     >
     > Can we do 2? I seem to remember that we had trouble in some cases (e.g.,
     > in the BigQuery case, there was no obvious way to create a deterministic
     > id, which is why we went for a random number followed by a reshuffle).
     > Also remember that the user ParDo that is producing data to the sink is
     > not guaranteed to be deterministic; the Beam model allows for
     > non-deterministic transforms.
     >
     >
     >     However, we won't be able to provide full compatibility with
     >     RequiresStableInput because Flink only guarantees stable
    input after a
     >     checkpoint. RequiresStableInput requires input at any point
    in time to
     >     be stable.
     >
     >
     > I'm not quite sure I understand. If a ParDo is marked with
     > RequiresStableInput, can't the flink runner buffer the input message
     > until after the checkpoint is complete and only then deliver it
    to the
     > ParDo? This adds latency of course, but I'm not sure how else to do
     > things correctly with the Beam model.
     >
     >     IMHO the only way to achieve that is materializing output
     >     which Flink does not currently support.
     >
     >     KafkaIO does not need all the power of RequiresStableInput to
    achieve
     >     EOS with Flink, but for the general case I don't see a good
    solution at
     >     the moment.
     >
     >     -Max
     >
     >     On 01.03.19 16:45, Reuven Lax wrote:
     >      > Yeah, the person who was working on it originally stopped working on
     >      > Beam, and nobody else ever finished it. I think it is important to
     >      > finish though. Many of the existing Sinks are only fully correct for
     >      > Dataflow today, because they generate either Reshuffle or GroupByKey to
     >      > ensure input stability before outputting (in many cases this code was
     >      > inherited from before Beam existed). On Flink today, these sinks might
     >      > occasionally produce duplicate output in the case of failures.
     >      >
     >      > Reuven
     >      >
     >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels wrote:
     >      >
     >      >     Circling back to the RequiresStableInput annotation[1]. I've
     >      >     done some prototyping to see how this could be integrated into
     >      >     Flink. I'm currently writing a test based on RequiresStableInput.
     >      >
     >      >     I found out there are already checks in place at the Runners to
     >      >     throw in case transforms use RequiresStableInput and it's not
     >      >     supported. However, not a single transform actually uses the
     >      >     annotation.
     >      >
     >      >     It seems that the effort stopped at some point? Would
    it make
     >     sense to
     >      >     start annotating KafkaExactlyOnceSink with
     >     @RequiresStableInput? We
     >      >     could then get rid of the whitelist.
     >      >
     >      >     -Max
     >      >
     >      >     [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >      >
     >      >
     >      >
     >      >     On 01.03.19 14:28, Maximilian Michels wrote:
     >      >      > Just realized that transactions do not span multiple elements in
     >      >      > KafkaExactlyOnceSink. So the proposed solution to stop processing
     >      >      > elements while a snapshot is pending would work.
     >      >      >
     >      >      > It is certainly not optimal in terms of performance for
     >     Flink and
     >      >     poses
     >      >      > problems when checkpoints take long to complete, but it
     >     would be
     >      >      > worthwhile to implement this to make use of the EOS
    feature.
     >      >      >
     >      >      > Thanks,
     >      >      > Max
     >      >      >
     >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
     >      >      >> Thank you for the prompt replies. It's great to see that there is
     >      >      >> a good understanding of how EOS in Flink works.
     >      >      >>
     >      >      >>> This is exactly what RequiresStableInput is
    supposed to
     >     do. On the
     >      >      >>> Flink runner, this would be implemented by delaying
     >     processing
     >      >     until
     >      >      >>> the current checkpoint is done.
     >      >      >>
     >      >      >> I don't think that works because we have no
    control over
     >     the Kafka
     >      >      >> transactions. Imagine:
     >      >      >>
     >      >      >> 1) ExactlyOnceWriter writes records to Kafka and
    commits,
     >     then
     >      >     starts
     >      >      >> a new transaction.
     >      >      >> 2) Flink checkpoints, delaying the processing of
     >     elements, the
     >      >      >> checkpoint fails.
     >      >      >> 3) We restore from an old checkpoint and will start writing duplicate
     >      >      >> data to Kafka. The de-duplication that the sink performs does not
     >      >      >> help, especially because the random shard ids might be assigned
     >      >      >> differently.
     >      >      >>
     >      >      >> IMHO we have to have control over commit to be able to
     >     provide EOS.
     >      >      >>
     >      >      >>> When we discussed this in Aug 2017, the understanding
     >     was that 2
     >      >      >>> Phase commit utility in Flink used to implement
    Flink's
     >     Kafka EOS
     >      >      >>> could not be implemented in Beam's context.
     >      >      >>
     >      >      >> That's also my understanding, unless we change the
    interface.
     >      >      >>
     >      >      >>> I don't see how SDF solves this problem..
     >      >      >>
     >      >      >> SDF has a checkpoint method which the Runner can call,
     >     but I think
     >      >      >> that you are right, that the above problem would
    be the same.
     >      >      >>
     >      >      >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
     >      >      >>> think that will help many future exactly-once sinks and address
     >      >      >>> the fundamental incompatibility between the Beam model and Flink's
     >      >      >>> horizontal checkpointing for such applications.
     >      >      >>
     >      >      >> Great :)
     >      >      >>
     >      >      >>> The FlinkRunner would need to insert the "wait until
     >     checkpoint
     >      >      >>> finalization" logic wherever it sees
    @RequiresStableInput,
     >      >     which is
     >      >      >>> already what it would have to do.
     >      >      >>
     >      >      >> I don't think that fixes the problem. See above
    example.
     >      >      >>
     >      >      >> Thanks,
     >      >      >> Max
     >      >      >>
     >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
     >      >      >>>
     >      >      >>>
     >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi wrote:
     >      >      >>>
     >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles wrote:
     >      >      >>>
     >      >      >>>         I'm not sure what a hard fail is. I probably
     >     have a shallow
     >      >      >>>         understanding, but doesn't
    @RequiresStableInput work
     >      >     for 2PC?
     >      >      >>>         The preCommit() phase should establish the
     >     transaction and
     >      >      >>>         commit() is not called until after checkpoint
     >      >     finalization. Can
     >      >      >>>         you describe the way that it does not work a
     >     little bit
     >      >     more?
     >      >      >>>
     >      >      >>>
     >      >      >>>     - preCommit() is called before checkpoint. Kafka EOS in Flink starts
     >      >      >>>     the transaction before this and makes sure it flushes all records in
     >      >      >>>     preCommit(). So far so good.
     >      >      >>>     - commit() is called after the checkpoint is persisted. Now, imagine
     >      >      >>>     commit() fails for some reason. There is no option to rerun the 1st
     >      >      >>>     phase to write the records again in a new transaction. This is a
     >      >      >>>     hard failure for the job. In practice Flink might attempt to
     >      >      >>>     commit again (not sure how many times), which is likely to fail and
     >      >      >>>     eventually results in job failure.
     >      >      >>>
     >      >      >>>
     >      >      >>> In Apache Beam, the records could be stored in state,
     >     and can be
     >      >      >>> written inside commit() to work around this issue. It
     >     could have
     >      >      >>> scalability issues if checkpoints are not frequent
     >     enough in Flink
     >      >      >>> runner.
     >      >      >>>
     >      >      >>> Raghu.
     >      >      >>>
     >      >      >>>
     >      >      >>>
     >      >      >>>         Kenn
     >      >      >>>
     >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi wrote:
     >      >      >>>
     >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles wrote:
     >      >      >>>
     >      >      >>>                 I believe the way you would implement
     >     the logic
     >      >     behind
     >      >      >>>                 Flink's KafkaProducer would be to
    have
     >     two steps:
     >      >      >>>
     >      >      >>>                 1. Start transaction
     >      >      >>>                 2. @RequiresStableInput Close
    transaction
     >      >      >>>
     >      >      >>>
     >      >      >>>             I see.  What happens if closing the
    transaction
     >      >     fails in
     >      >      >>>             (2)? Flink's 2PC requires that
    commit() should
     >      >     never hard
     >      >      >>>             fail once preCommit() succeeds. I
    think that is
     >      >     cost of not
     >      >      >>>             having an extra shuffle. It is
    alright since
     >     this
     >      >     policy has
     >      >      >>>             worked well for Flink so far.
     >      >      >>>
     >      >      >>>             Overall, it will be great to have
     >     @RequiresStableInput
     >      >      >>>             support in Flink runner.
     >      >      >>>
     >      >      >>>             Raghu.
     >      >      >>>
     >      >      >>>                 The FlinkRunner would need to
    insert the
     >     "wait
     >      >     until
     >      >      >>>                 checkpoint finalization" logic
    wherever it
     >      >      >>>                 sees @RequiresStableInput, which is
     >     already what it
     >      >      >>>                 would have to do.
     >      >      >>>
     >      >      >>>                 This matches the KafkaProducer's
    logic -
     >     delay
     >      >     closing
     >      >      >>>                 the transaction until checkpoint
     >     finalization. This
     >      >      >>>                 answers my main question, which
    is "is
     >      >      >>>                 @RequiresStableInput expressive
    enough
     >     to allow
     >      >      >>>                 Beam-on-Flink to have exactly
    once behavior
     >      >     with the
     >      >      >>>                 same performance characteristics as
     >     native Flink
     >      >      >>>                 checkpoint finalization?"
     >      >      >>>
     >      >      >>>                 Kenn
     >      >      >>>
     >      >      >>>                 [1] https://github.com/apache/beam/pull/7955
     >      >      >>>
     >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax wrote:
     >      >      >>>
     >      >      >>>
     >      >      >>>
     >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi wrote:
     >      >      >>>
     >      >      >>>
     >      >      >>>
     >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels wrote:
     >      >      >>>
     >      >      >>>                             Hi,
     >      >      >>>
     >      >      >>>                             I came across
    KafkaIO's Runner
     >      >     whitelist [1]
     >      >      >>>                             for enabling exactly-once
     >      >      >>>                             semantics (EOS). I
    think it is
     >      >     questionable
     >      >      >>>                             to exclude Runners from
     >      >      >>>                             inside a transform, but I
     >     see that the
     >      >      >>>                             intention was to save
    users from
     >      >      >>>                             surprises.
     >      >      >>>
     >      >      >>>                             Now why does the Flink
     >     Runner not
     >      >     support
     >      >      >>>                             KafkaIO EOS? Flink's
    native
     >      >      >>>                             KafkaProducer supports
     >     exactly-once. It
     >      >      >>>                             simply commits the
    pending
     >      >      >>>                             transaction once it has
     >     completed a
     >      >      >>> checkpoint.
     >      >      >>>
     >      >      >>>
     >      >      >>>
     >      >      >>>                         When we discussed this in Aug
     >     2017, the
     >      >      >>>                         understanding was that 2
    Phase
     >     commit
     >      >     utility in
     >      >      >>>                         Flink used to implement
    Flink's
     >     Kafka
     >      >     EOS could
     >      >      >>>                         not be implemented in Beam's
     >     context.
     >      >      >>>                         See this message
     >      >      >>>
>  <https://www.mail-archive.com/[email protected]/msg02664.html> in
     >      >      >>>                         that dev thread. Has anything
     >     changed
     >      >     in this
     >      >      >>>                         regard? The whole thread is
     >     relevant to
     >      >     this
     >      >      >>>                         topic and worth going
    through.
     >      >      >>>
     >      >      >>>                     I think that TwoPhaseCommit
    utility
     >     class
     >      >     wouldn't
     >      >      >>>                     work. The Flink runner would
     >     probably want to
     >      >      >>>                     directly use
    notifySnapshotComplete
     >     in order to
     >      >      >>>                     implement @RequiresStableInput.
     >      >      >>>
     >      >      >>>
     >      >      >>>                             A checkpoint is
    realized by
     >     sending
     >      >     barriers
     >      >      >>>                             through all channels
     >      >      >>>                             starting from the
    source until
     >      >     reaching all
     >      >      >>>                             sinks. Every operator
     >      >      >>>                             persists its state
    once it has
     >      >     received a
     >      >      >>>                             barrier on all its input
     >      >      >>>                             channels, it then
    forwards
     >     it to the
     >      >      >>>                             downstream operators.
     >      >      >>>
     >      >      >>>                             The architecture of
    Beam's
     >      >      >>>                             KafkaExactlyOnceSink
    is as
     >     follows[2]:
     >      >      >>>
     >      >      >>>                             Input ->
    AssignRandomShardIds ->
     >      >     GroupByKey
     >      >      >>>                             -> AssignSequenceIds ->
     >      >      >>>                             GroupByKey ->
    ExactlyOnceWriter
     >      >      >>>
     >      >      >>>                             As I understood, Spark or
     >     Dataflow
     >      >     use the
     >      >      >>>                             GroupByKey stages to
    persist
     >      >      >>>                             the input. That is not
     >     required in
     >      >     Flink to
     >      >      >>>                             be able to take a
    consistent
     >      >      >>>                             snapshot of the pipeline.
     >      >      >>>
     >      >      >>>                             Basically, for Flink we
     >     don't need
     >      >     any of
     >      >      >>>                             that magic that
    KafkaIO does.
     >      >      >>>                             What we would need to
     >     support EOS
     >      >     is a way
     >      >      >>>                             to tell the
    ExactlyOnceWriter
     >      >      >>>                             (a DoFn) to commit once a
     >      >     checkpoint has
     >      >      >>>                             completed.
     >      >      >>>
     >      >      >>>                             I know that the new
    version
     >     of SDF
     >      >     supports
     >      >      >>>                             checkpointing which
    should
     >      >      >>>                             solve this issue. But
    there is
     >      >     still a lot
     >      >      >>>                             of work to do to make
    this
     >      >      >>>                             reality.
     >      >      >>>
     >      >      >>>
     >      >      >>>                         I don't see how SDF solves this problem. Maybe
     >      >      >>>                         pseudocode would make it clearer. But if it
     >      >      >>>                         helps, that is great!
     >      >      >>>
     >      >      >>>                             So I think it would make
     >     sense to think
     >      >      >>>                             about a way to make
    KafkaIO's
     >      >      >>>                             EOS more accessible
    to Runners
     >      >     which support
     >      >      >>>                             a different way of
     >      >      >>>                             checkpointing.
     >      >      >>>
     >      >      >>>
     >      >      >>>                         Absolutely. I would love to support EOS in
     >      >      >>>                         KafkaIO for Flink. I think that will help many
     >      >      >>>                         future exactly-once sinks and address the
     >      >      >>>                         fundamental incompatibility between the Beam model
     >      >      >>>                         and Flink's horizontal checkpointing for such
     >      >      >>>                         applications.
     >      >      >>>
     >      >      >>>                         Raghu.
     >      >      >>>
     >      >      >>>                             Cheers,
     >      >      >>>                             Max
     >      >      >>>
     >      >      >>>                             PS: I found this
    document about
     >      >      >>>                             RequiresStableInput
    [3], but
     >     IMHO
     >      >      >>>                             defining an
    annotation only
     >      >     manifests the
     >      >      >>>                             conceptual difference
    between
     >      >      >>>                             the Runners.
     >      >      >>>
     >      >      >>>
     >      >      >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
     >      >
     >      >      >>>
     >      >      >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
     >      >
     >      >      >>>
     >      >      >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
     >      >
     >      >      >>>
     >      >      >>>
     >      >
     >
