Fully agree. I think we can improve the situation drastically. For KafkaIO EOS with Flink we need to make these two changes:

1) Introduce buffering while the checkpoint is being taken
2) Replace the random shard id assignment with something deterministic
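A minimal Python sketch of change 2), assuming the shard id is derived from a stable hash of the record key. The key choice, `NUM_SHARDS` and `assign_shard` are hypothetical illustrations, not KafkaIO code. The point is only that a replayed record lands on the same shard as before:

```python
import zlib

NUM_SHARDS = 4  # hypothetical shard count, for illustration only


def assign_shard(record_key: bytes, num_shards: int = NUM_SHARDS) -> int:
    """Map a record key to a shard id deterministically.

    zlib.crc32 gives the same value in every process and after every restart
    (unlike Python's built-in hash() on str, which is salted per process), so a
    record that is replayed after a failure is routed to the same shard again.
    """
    return zlib.crc32(record_key) % num_shards


records = [b"user-1", b"user-2", b"user-3"]
first_attempt = [assign_shard(k) for k in records]
replay = [assign_shard(k) for k in records]  # e.g. after restoring a checkpoint
```

Hashing the key trades the even spread of random assignment for determinism; with skewed keys some shards would receive more records than others.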

However, we won't be able to provide full compatibility with RequiresStableInput because Flink only guarantees stable input after a checkpoint. RequiresStableInput requires the input to be stable at any point in time. IMHO the only way to achieve that is to materialize the output, which Flink does not currently support.

KafkaIO does not need all the power of RequiresStableInput to achieve EOS with Flink, but for the general case I don't see a good solution at the moment.

-Max

On 01.03.19 16:45, Reuven Lax wrote:
Yeah, the person who was working on it originally stopped working on Beam, and nobody else ever finished it. I think it is important to finish it, though. Many of the existing Sinks are only fully correct for Dataflow today, because they insert either a Reshuffle or a GroupByKey to ensure input stability before outputting (in many cases this code was inherited from before Beam existed). On Flink today, these sinks might occasionally produce duplicate output in the case of failures.
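Why a Reshuffle or GroupByKey gives stable input on Dataflow can be illustrated with a toy model: the shuffle persists the output of a non-deterministic step, so a retried downstream step reads the stored values instead of recomputing them. `MaterializedStage` and `assign_random_shard` are hypothetical names; a real shuffle does not key its storage by element as this sketch does.

```python
import random


class MaterializedStage:
    """Toy stand-in for a shuffle boundary (Reshuffle/GroupByKey on Dataflow)
    that persists the output of a non-deterministic step, so every retry of
    the downstream step sees exactly the same input."""

    def __init__(self, fn):
        self.fn = fn
        self.store = {}  # plays the role of durable shuffle storage (assumes unique elements)

    def process(self, element):
        if element not in self.store:  # compute once, then always replay
            self.store[element] = self.fn(element)
        return self.store[element]


def assign_random_shard(element):
    # Non-deterministic: recomputing it on a retry could yield a different shard.
    return (element, random.randrange(1000))


stage = MaterializedStage(assign_random_shard)
first_attempt = [stage.process(e) for e in ["a", "b", "c"]]
retry = [stage.process(e) for e in ["a", "b", "c"]]  # downstream sink failed, retried
```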

Reuven

On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:

    Circling back to the RequiresStableInput annotation [1]. I've done some
    prototyping to see how this could be integrated into Flink. I'm
    currently writing a test based on RequiresStableInput.

    I found out there are already checks in place in the Runners that
    throw if a transform uses RequiresStableInput and the Runner does not
    support it. However, not a single transform actually uses the
    annotation.

    It seems that the effort stopped at some point? Would it make sense to
    start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
    could then get rid of the whitelist.
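The difference between a whitelist inside the transform and an annotation checked by the Runner can be sketched as follows. Everything here (`requires_stable_input`, `validate`, `UnsupportedFeatureError`, `check`) is a hypothetical Python stand-in, not Beam's API: the sink only declares its requirement, and each Runner decides whether it can honor it.

```python
class UnsupportedFeatureError(Exception):
    pass


def requires_stable_input(fn):
    """Hypothetical marker, standing in for Beam's @RequiresStableInput."""
    fn.requires_stable_input = True
    return fn


def validate(transforms, runner_supports_stable_input: bool):
    """Runner-side check: reject a pipeline only if some transform actually
    declares the requirement and this runner cannot honor it."""
    for t in transforms:
        if getattr(t, "requires_stable_input", False) and not runner_supports_stable_input:
            raise UnsupportedFeatureError(f"{t.__name__} requires stable input")


@requires_stable_input
def exactly_once_writer(element):
    return element


def plain_map(element):
    return element


def check(transforms, supported):
    try:
        validate(transforms, supported)
        return "ok"
    except UnsupportedFeatureError as e:
        return f"rejected: {e}"
```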

    -Max

    [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM

    On 01.03.19 14:28, Maximilian Michels wrote:
     > Just realized that transactions do not span multiple elements in
     > KafkaExactlyOnceSink. So the proposed solution to stop processing
     > elements while a snapshot is pending would work.
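A toy in-memory model of "stop processing elements while a snapshot is pending", assuming at most one checkpoint in flight. `PausingWriter` and its methods are hypothetical; `notify_checkpoint_complete` only mirrors the role of Flink's `notifyCheckpointComplete` and is not a Flink or Beam API.

```python
from collections import deque


class PausingWriter:
    """Toy exactly-once writer: while a snapshot is pending, incoming elements
    are held back instead of written, so no element can slip into a transaction
    that the pending checkpoint does not cover."""

    def __init__(self):
        self.open_txn = []    # records in the current, uncommitted transaction
        self.committed = []   # records visible to Kafka consumers
        self.pending = None   # (checkpoint_id, records) waiting for completion
        self.held = deque()   # elements that arrived while a snapshot was pending

    def process(self, record):
        if self.pending is not None:
            self.held.append(record)   # stop processing until the checkpoint completes
        else:
            self.open_txn.append(record)

    def snapshot(self, checkpoint_id):
        # Assumes at most one checkpoint in flight. Freeze the current
        # transaction; it is committed only when the checkpoint completes.
        self.pending = (checkpoint_id, self.open_txn)
        self.open_txn = []

    def notify_checkpoint_complete(self, checkpoint_id):
        if self.pending is not None and self.pending[0] == checkpoint_id:
            self.committed.extend(self.pending[1])
            self.pending = None
            while self.held:          # resume: held elements start a new transaction
                self.open_txn.append(self.held.popleft())


writer = PausingWriter()
writer.process("a")
writer.process("b")
writer.snapshot(checkpoint_id=1)
writer.process("c")                   # held back: snapshot 1 is still pending
committed_before = list(writer.committed)
writer.notify_checkpoint_complete(1)
```

The writer makes no progress between `snapshot` and `notify_checkpoint_complete`, which is the performance cost of this approach when checkpoints take long to complete.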
     >
     > It is certainly not optimal in terms of performance for Flink and
     > poses problems when checkpoints take long to complete, but it would be
     > worthwhile to implement this to make use of the EOS feature.
     >
     > Thanks,
     > Max
     >
     > On 01.03.19 12:23, Maximilian Michels wrote:
     >> Thank you for the prompt replies. It's great to see that there is
     >> a good understanding of how EOS in Flink works.
     >>
     >>> This is exactly what RequiresStableInput is supposed to do. On the
     >>> Flink runner, this would be implemented by delaying processing until
     >>> the current checkpoint is done.
     >>
     >> I don't think that works because we have no control over the Kafka
     >> transactions. Imagine:
     >>
     >> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts
     >> a new transaction.
     >> 2) Flink checkpoints, delaying the processing of elements, and the
     >> checkpoint fails.
     >> 3) We restore from an old checkpoint and start writing duplicate
     >> data to Kafka. The de-duplication that the sink performs does not
     >> help, especially because the random shard ids might be assigned
     >> differently.
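The scenario above can be reduced to a toy replay, assuming a single writer and leaving out the sink's sequence-id de-duplication (which, as noted, may not line up after replay). `run` and its flag are hypothetical; the model only contrasts a writer that commits on its own schedule with one that commits on checkpoint completion.

```python
def run(commit_on_checkpoint_complete: bool) -> list:
    """Replay scenario 1)-3): the checkpoint covering r1..r4 fails and the job
    restores from an older checkpoint whose source offset is 0."""
    source = ["r1", "r2", "r3", "r4"]
    restored_offset = 0     # source offset stored in the last successful checkpoint
    kafka = []              # records visible to consumers
    open_txn = []           # records in a transaction that is not committed yet

    def write(record):
        if commit_on_checkpoint_complete:
            open_txn.append(record)   # held until the checkpoint completes
        else:
            kafka.append(record)      # writer commits on its own schedule

    # First attempt: all records are written, then the checkpoint fails.
    for record in source[restored_offset:]:
        write(record)
    open_txn.clear()                  # failure: the uncommitted transaction is aborted

    # Recovery: replay from the restored offset; this checkpoint succeeds.
    for record in source[restored_offset:]:
        write(record)
    kafka.extend(open_txn)            # commit on checkpoint completion (no-op if empty)
    return kafka


duplicated = run(commit_on_checkpoint_complete=False)
exactly_once = run(commit_on_checkpoint_complete=True)
```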
     >>
     >> IMHO we have to have control over the commit to be able to provide EOS.
     >>
     >>> When we discussed this in Aug 2017, the understanding was that 2
     >>> Phase commit utility in Flink used to implement Flink's Kafka EOS
     >>> could not be implemented in Beam's context.
     >>
     >> That's also my understanding, unless we change the interface.
     >>
     >>> I don't see how SDF solves this problem.
     >>
     >> SDF has a checkpoint method which the Runner can call, but I think
     >> you are right that the above problem would be the same.
     >>
     >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
     >>> that will help many future exactly-once sinks and address the
     >>> fundamental incompatibility between the Beam model and Flink's
     >>> horizontal checkpointing for such applications.
     >>
     >> Great :)
     >>
     >>> The FlinkRunner would need to insert the "wait until checkpoint
     >>> finalization" logic wherever it sees @RequiresStableInput, which is
     >>> already what it would have to do.
     >>
     >> I don't think that fixes the problem. See above example.
     >>
     >> Thanks,
     >> Max
     >>
     >> On 01.03.19 00:04, Raghu Angadi wrote:
     >>>
     >>>
     >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
     >>>
     >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
     >>>
     >>>         I'm not sure what a hard fail is. I probably have a shallow
     >>>         understanding, but doesn't @RequiresStableInput work for 2PC?
     >>>         The preCommit() phase should establish the transaction and
     >>>         commit() is not called until after checkpoint finalization.
     >>>         Can you describe the way that it does not work a little bit
     >>>         more?
     >>>
     >>>
     >>>     - preCommit() is called before the checkpoint. Kafka EOS in Flink
     >>>     starts the transaction before this and makes sure it flushes all
     >>>     records in preCommit(). So far so good.
     >>>     - commit() is called after the checkpoint is persisted. Now,
     >>>     imagine commit() fails for some reason. There is no option to
     >>>     rerun the 1st phase to write the records again in a new
     >>>     transaction. This is a hard failure for the job. In practice Flink
     >>>     might attempt to commit again (not sure how many times), which is
     >>>     likely to fail and eventually results in job failure.
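Raghu's point can be made concrete with a toy two-phase-commit sink. It loosely follows the lifecycle of Flink's `TwoPhaseCommitSinkFunction` (begin a transaction, `preCommit` on snapshot, `commit` after the checkpoint is persisted), but `FakeBroker`, `TwoPhaseSink` and `TransactionLost` are hypothetical in-memory stand-ins, not the Flink or Kafka client APIs. The snapshot holds only a transaction id, so once that transaction is lost, retrying `commit()` cannot succeed.

```python
class TransactionLost(Exception):
    pass


class FakeBroker:
    """In-memory stand-in for Kafka transactions (not the Kafka client API)."""

    def __init__(self):
        self.visible = []
        self.open = {}  # txn_id -> records flushed but not committed

    def flush(self, txn_id, records):
        self.open.setdefault(txn_id, []).extend(records)

    def commit(self, txn_id):
        if txn_id not in self.open:
            raise TransactionLost(txn_id)  # e.g. aborted by the broker after a timeout
        self.visible.extend(self.open.pop(txn_id))

    def abort(self, txn_id):
        self.open.pop(txn_id, None)


class TwoPhaseSink:
    def __init__(self, broker):
        self.broker = broker
        self.txn_counter = 0
        self.current = self._begin()
        self.unflushed = []
        self.pending = {}  # checkpoint_id -> txn_id; only the id goes into the snapshot

    def _begin(self):
        self.txn_counter += 1
        return f"txn-{self.txn_counter}"

    def write(self, record):
        self.unflushed.append(record)

    def pre_commit(self, checkpoint_id):
        # Phase 1 (on snapshot): flush all records into the open transaction.
        self.broker.flush(self.current, self.unflushed)
        self.unflushed = []
        self.pending[checkpoint_id] = self.current
        self.current = self._begin()

    def commit(self, checkpoint_id):
        # Phase 2 (after the checkpoint is persisted): only the transaction id
        # is known here; the records are gone, so phase 1 cannot be redone.
        self.broker.commit(self.pending[checkpoint_id])
        del self.pending[checkpoint_id]


broker = FakeBroker()
sink = TwoPhaseSink(broker)
sink.write("a")
sink.pre_commit(1)
sink.commit(1)                 # happy path: "a" becomes visible

sink.write("b")
sink.pre_commit(2)
broker.abort("txn-2")          # simulate the transaction being lost before commit
try:
    sink.commit(2)
    outcome = "committed"
except TransactionLost:
    outcome = "hard failure"   # retrying commit(2) can only fail the same way
```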
     >>>
     >>>
     >>> In Apache Beam, the records could be stored in state and written
     >>> inside commit() to work around this issue. It could have scalability
     >>> issues if checkpoints are not frequent enough in the Flink runner.
     >>>
     >>> Raghu.
     >>>
     >>>
     >>>
     >>>         Kenn
     >>>
     >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
     >>>
     >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
     >>>             <[email protected]> wrote:
     >>>
     >>>                 I believe the way you would implement the logic behind
     >>>                 Flink's KafkaProducer would be to have two steps:
     >>>
     >>>                 1. Start transaction
     >>>                 2. @RequiresStableInput Close transaction
     >>>
     >>>
     >>>             I see. What happens if closing the transaction fails in
     >>>             (2)? Flink's 2PC requires that commit() should never hard
     >>>             fail once preCommit() succeeds. I think that is the cost of
     >>>             not having an extra shuffle. It is alright since this
     >>>             policy has worked well for Flink so far.
     >>>
     >>>             Overall, it will be great to have @RequiresStableInput
     >>>             support in Flink runner.
     >>>
     >>>             Raghu.
     >>>
     >>>                 The FlinkRunner would need to insert the "wait until
     >>>                 checkpoint finalization" logic wherever it sees
     >>>                 @RequiresStableInput, which is already what it would
     >>>                 have to do.
     >>>
     >>>                 This matches the KafkaProducer's logic: delay closing
     >>>                 the transaction until checkpoint finalization. This
     >>>                 answers my main question, which is "is
     >>>                 @RequiresStableInput expressive enough to allow
     >>>                 Beam-on-Flink to have exactly-once behavior with the
     >>>                 same performance characteristics as native Flink
     >>>                 checkpoint finalization?"
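The "wait until checkpoint finalization" logic described above, combined with Raghu's suggestion of keeping the records in state, might look roughly like this toy runner-side wrapper. `StableInputWrapper` and `flaky_kafka_write` are hypothetical names, not FlinkRunner code. Because the batch stays in state until the user function succeeds, a failed write can be redone from scratch instead of becoming a hard failure; the price is that state grows with the checkpoint interval.

```python
class StableInputWrapper:
    """Hypothetical runner-side wrapper for a DoFn marked @RequiresStableInput.

    Elements are kept in checkpointed state and handed to the user function
    only after the checkpoint that persisted them has been finalized, so a
    retry after a failure sees exactly the same elements."""

    def __init__(self, user_fn):
        self.user_fn = user_fn  # called with a list of elements; may raise
        self.not_yet_snapshotted = []
        self.awaiting = {}      # checkpoint_id -> elements persisted by it

    def process(self, element):
        self.not_yet_snapshotted.append(element)

    def snapshot_state(self, checkpoint_id):
        self.awaiting[checkpoint_id] = self.not_yet_snapshotted
        self.not_yet_snapshotted = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Completion of checkpoint N also covers earlier checkpoints whose
        # notification may not have been delivered, so release all ids <= N.
        for cid in sorted(c for c in self.awaiting if c <= checkpoint_id):
            self.user_fn(self.awaiting[cid])  # on failure the batch stays for a retry
            del self.awaiting[cid]


written = []
calls = {"n": 0}


def flaky_kafka_write(batch):
    # Hypothetical writer: the first attempt fails, later ones succeed.
    calls["n"] += 1
    if calls["n"] == 1:
        raise IOError("commit failed")
    written.extend(batch)


wrapper = StableInputWrapper(flaky_kafka_write)
wrapper.process("a")
wrapper.process("b")
wrapper.snapshot_state(1)
wrapper.process("c")          # arrives after checkpoint 1; belongs to checkpoint 2
try:
    wrapper.notify_checkpoint_complete(1)
except IOError:
    pass                      # batch for checkpoint 1 is still in state
wrapper.snapshot_state(2)
wrapper.notify_checkpoint_complete(2)
```

A real writer still has to make such retries safe, since a commit can succeed even when its acknowledgement is lost.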
     >>>
     >>>                 Kenn
     >>>
     >>>                 [1] https://github.com/apache/beam/pull/7955
     >>>
     >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
     >>>                 <[email protected]> wrote:
     >>>
     >>>
     >>>
     >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi
     >>>                     <[email protected]> wrote:
     >>>
     >>>
     >>>                             Now why does the Flink Runner not support
     >>>                             KafkaIO EOS? Flink's native KafkaProducer
     >>>                             supports exactly-once. It simply commits the
     >>>                             pending transaction once it has completed a
     >>>                             checkpoint.
     >>>
     >>>
     >>>
     >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian
     >>>                         Michels <[email protected]> wrote:
     >>>
     >>>                             Hi,
     >>>
     >>>                             I came across KafkaIO's Runner whitelist [1]
     >>>                             for enabling exactly-once semantics (EOS). I
     >>>                             think it is questionable to exclude Runners
     >>>                             from inside a transform, but I see that the
     >>>                             intention was to save users from surprises.
     >>>
     >>>                             Now why does the Flink Runner not support
     >>>                             KafkaIO EOS? Flink's native KafkaProducer
     >>>                             supports exactly-once. It simply commits the
     >>>                             pending transaction once it has completed a
     >>>                             checkpoint.
     >>>
     >>>
     >>>
     >>>                         When we discussed this in Aug 2017, the
     >>>                         understanding was that the 2 Phase commit
     >>>                         utility in Flink used to implement Flink's Kafka
     >>>                         EOS could not be implemented in Beam's context.
     >>>                         See this message
     >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html>
     >>>                         in that dev thread. Has anything changed in this
     >>>                         regard? The whole thread is relevant to this
     >>>                         topic and worth going through.
     >>>
     >>>                     I think that the TwoPhaseCommit utility class
     >>>                     wouldn't work. The Flink runner would probably want
     >>>                     to directly use notifyCheckpointComplete in order to
     >>>                     implement @RequiresStableInput.
     >>>
     >>>
     >>>                             A checkpoint is realized by sending barriers
     >>>                             through all channels, starting from the
     >>>                             sources until they reach all sinks. Every
     >>>                             operator persists its state once it has
     >>>                             received a barrier on all its input
     >>>                             channels, and then forwards the barrier to
     >>>                             the downstream operators.
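The barrier mechanism described above (aligned checkpoints) can be modelled with a toy multi-input operator: records arriving on a channel that has already delivered its barrier are held back, the state is persisted once every channel has delivered the barrier, and only then is the barrier forwarded. `AligningOperator` and the running-sum state are illustrative assumptions, not Flink internals.

```python
class AligningOperator:
    """Toy model of aligned checkpoint barriers for a multi-input operator."""

    def __init__(self, channels):
        self.channels = sorted(channels)
        self.barrier_seen = set()
        self.held = {c: [] for c in self.channels}  # records after a channel's barrier
        self.state = 0                               # example state: running sum
        self.snapshots = {}                          # checkpoint_id -> persisted state
        self.downstream = []                         # what is forwarded

    def on_record(self, channel, value):
        if channel in self.barrier_seen:
            # Belongs to the next checkpoint: hold it until alignment finishes.
            self.held[channel].append(value)
        else:
            self._apply(value)

    def on_barrier(self, channel, checkpoint_id):
        self.barrier_seen.add(channel)
        if len(self.barrier_seen) < len(self.channels):
            return  # still waiting for the barrier on other input channels
        self.snapshots[checkpoint_id] = self.state           # 1) persist state
        self.downstream.append(("barrier", checkpoint_id))   # 2) forward the barrier
        self.barrier_seen.clear()
        for c in self.channels:                              # 3) release held records
            records, self.held[c] = self.held[c], []
            for value in records:
                self._apply(value)

    def _apply(self, value):
        self.state += value
        self.downstream.append(("record", value))


op = AligningOperator(["in1", "in2"])
op.on_record("in1", 1)
op.on_barrier("in1", checkpoint_id=1)
op.on_record("in1", 10)   # held: in1 already delivered barrier 1
op.on_record("in2", 2)
op.on_barrier("in2", checkpoint_id=1)
```

The snapshot for checkpoint 1 contains exactly the records that arrived before the barrier on each channel, which is why Flink can take a consistent snapshot without materializing data between stages.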
     >>>
     >>>                             The architecture of Beam's
     >>>                             KafkaExactlyOnceSink is as follows [2]:
     >>>
     >>>                             Input -> AssignRandomShardIds -> GroupByKey
     >>>                             -> AssignSequenceIds -> GroupByKey
     >>>                             -> ExactlyOnceWriter
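A heavily simplified model of the stages listed above, to show why the writer's de-duplication depends on stable input. The function names mirror the stage names but are hypothetical Python, and the per-shard "next sequence id" is kept in memory here, whereas a real sink has to store it durably. A replay of the same materialized (shard, sequence id) assignment writes nothing twice; if AssignRandomShardIds were recomputed differently on replay, the sequence ids would no longer line up.

```python
import random
from collections import defaultdict


def assign_random_shard_ids(records, num_shards, rng):
    return [(rng.randrange(num_shards), r) for r in records]


def group_by_key(pairs):
    # On Dataflow/Spark this is where the assignment above gets materialized.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def assign_sequence_ids(grouped):
    # Single bundle only; a real implementation must continue numbering per
    # shard across bundles.
    return {shard: list(enumerate(recs)) for shard, recs in grouped.items()}


class ExactlyOnceWriter:
    def __init__(self):
        # Next expected sequence id per shard. Kept in memory here; a real sink
        # has to store it durably alongside the committed output.
        self.next_seq = defaultdict(int)
        self.output = []

    def write(self, shard, numbered_records):
        for seq, record in numbered_records:
            if seq < self.next_seq[shard]:
                continue  # already written by an earlier attempt: skip it
            self.output.append(record)
            self.next_seq[shard] = seq + 1


rng = random.Random(7)
shards = assign_sequence_ids(
    group_by_key(assign_random_shard_ids(["a", "b", "c", "d"], 2, rng)))
writer = ExactlyOnceWriter()
for _ in range(2):  # second attempt replays the same materialized input
    for shard, numbered in shards.items():
        writer.write(shard, numbered)
```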
     >>>
     >>>                             As I understand it, Spark and Dataflow use
     >>>                             the GroupByKey stages to persist the input.
     >>>                             That is not required in Flink to be able to
     >>>                             take a consistent snapshot of the pipeline.
     >>>
     >>>                             Basically, for Flink we don't need any of
     >>>                             the magic that KafkaIO does. What we would
     >>>                             need to support EOS is a way to tell the
     >>>                             ExactlyOnceWriter (a DoFn) to commit once a
     >>>                             checkpoint has completed.
     >>>
     >>>                             I know that the new version of SDF supports
     >>>                             checkpointing, which should solve this
     >>>                             issue. But there is still a lot of work to
     >>>                             do to make this a reality.
     >>>
     >>>
     >>>                         I don't see how SDF solves this problem. Maybe
     >>>                         pseudocode would make it clearer. But if it
     >>>                         helps, that is great!
     >>>
     >>>                             So I think it would make sense to think
     >>>                             about a way to make KafkaIO's EOS more
     >>>                             accessible to Runners which support a
     >>>                             different way of checkpointing.
     >>>
     >>>
     >>>                         Absolutely. I would love to support EOS in
     >>>                         KafkaIO for Flink. I think that will help many
     >>>                         future exactly-once sinks and address the
     >>>                         fundamental incompatibility between the Beam
     >>>                         model and Flink's horizontal checkpointing for
     >>>                         such applications.
     >>>
     >>>                         Raghu.
     >>>
     >>>                             Cheers,
     >>>                             Max
     >>>
     >>>                             PS: I found this document about
     >>>                             RequiresStableInput [3], but IMHO defining
     >>>                             an annotation only manifests the conceptual
     >>>                             difference between the Runners.
     >>>
     >>>
     >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
     >>>
     >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
     >>>
     >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM

     >>>
     >>>
