I think I am fundamentally misunderstanding checkpointing in Flink.

If you randomly generate shard ids, buffer them until finalization, and
finalize a checkpoint so that you never need to re-run that generation,
isn't the result stable from that point onwards?
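
To make the question concrete, here is a minimal sketch of the scheme I have
in mind. It is a toy Python simulation, not Flink or Beam code. The class and
method names (BufferingShardAssigner, snapshot, notify_checkpoint_complete,
restore) are hypothetical and only loosely mirror Flink's snapshot and
checkpoint-complete callbacks. Calling the release method again right after a
restore is a simplifying assumption.

```python
import copy
import random


class BufferingShardAssigner:
    """Toy operator (hypothetical): assigns random shard ids and releases
    elements only after the checkpoint that captured them has completed."""

    def __init__(self, num_shards, seed=None):
        self.num_shards = num_shards
        self.rng = random.Random(seed)
        self.buffer = []    # elements seen since the last snapshot
        self.pending = {}   # checkpoint id -> elements captured by that snapshot

    def process(self, element):
        # The shard id is random, so re-running this step may give a different id.
        shard = self.rng.randrange(self.num_shards)
        self.buffer.append((shard, element))

    def snapshot(self, checkpoint_id):
        # Persist the buffered (shard, element) pairs as part of the checkpoint.
        self.pending[checkpoint_id] = self.buffer
        self.buffer = []
        return copy.deepcopy(self.pending)

    def notify_checkpoint_complete(self, checkpoint_id):
        # Release everything captured by this checkpoint or an earlier one.
        released = []
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            released.extend(self.pending.pop(cid))
        return released

    def restore(self, state):
        # After a failure, shard ids come back from state; they are not regenerated.
        self.buffer = []
        self.pending = copy.deepcopy(state)


op = BufferingShardAssigner(num_shards=4, seed=1)
for element in ["a", "b", "c"]:
    op.process(element)
state = op.snapshot(checkpoint_id=1)
op.process("d")  # arrives after the snapshot, so it is not part of checkpoint 1
first_emission = op.notify_checkpoint_complete(1)

# Simulate a crash and a restart with a different random seed.
recovered = BufferingShardAssigner(num_shards=4, seed=999)
recovered.restore(state)
replayed = recovered.notify_checkpoint_complete(1)
print(first_emission == replayed)
```

In this toy model the element "d" is lost from the buffer on failure and would
be re-read from the source with a possibly different shard id, but it was
never released downstream, so nothing downstream ever saw its old id.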

Kenn

On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>
> However, we won't be able to provide full compatibility with
> RequiresStableInput because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable. IMHO the only way to achieve that is materializing output
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on
> > Beam, and nobody else ever finished it. I think it is important to
> > finish though. Many of the existing Sinks are only fully correct for
> > Dataflow today, because they generate either Reshuffle or GroupByKey to
> > ensure input stability before outputting (in many cases this code was
> > inherited from before Beam existed). On Flink today, these sinks might
> > occasionally produce duplicate output in the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     Circling back to the RequiresStableInput annotation[1]. I've done some
> >     prototyping to see how this could be integrated into Flink. I'm
> >     currently writing a test based on RequiresStableInput.
> >
> >     I found out there are already checks in place at the Runners to
> >     throw in case transforms use RequiresStableInput and it's not
> >     supported. However, not a single transform actually uses the annotation.
> >
> >     It seems that the effort stopped at some point? Would it make sense
> to
> >     start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
> >     could then get rid of the whitelist.
> >
> >     -Max
> >
> >     [1]
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >
> >
> >     On 01.03.19 14:28, Maximilian Michels wrote:
> >      > Just realized that transactions do not span multiple elements in
> >      > KafkaExactlyOnceSink. So the proposed solution to stop processing
> >      > elements while a snapshot is pending would work.
> >      >
> >      > It is certainly not optimal in terms of performance for Flink and
> >     poses
> >      > problems when checkpoints take long to complete, but it would be
> >      > worthwhile to implement this to make use of the EOS feature.
> >      >
> >      > Thanks,
> >      > Max
> >      >
> >      > On 01.03.19 12:23, Maximilian Michels wrote:
> >      >> Thank you for the prompt replies. It's great to see that there is
> >      >> a good understanding of how EOS in Flink works.
> >      >>
> >      >>> This is exactly what RequiresStableInput is supposed to do. On
> the
> >      >>> Flink runner, this would be implemented by delaying processing
> >     until
> >      >>> the current checkpoint is done.
> >      >>
> >      >> I don't think that works because we have no control over the
> Kafka
> >      >> transactions. Imagine:
> >      >>
> >      >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> >     starts
> >      >> a new transaction.
> >      >> 2) Flink checkpoints, delaying the processing of elements, the
> >      >> checkpoint fails.
> >      >> 3) We restore from an old checkpoint and will start writing
> >      >> duplicate data to Kafka. The de-duplication that the sink performs
> >      >> does not help, especially because the random shard ids might be
> >      >> assigned differently.
> >      >>
> >      >> IMHO we have to have control over commit to be able to provide
> EOS.
> >      >>
> >      >>> When we discussed this in Aug 2017, the understanding was that 2
> >      >>> Phase commit utility in Flink used to implement Flink's Kafka
> EOS
> >      >>> could not be implemented in Beam's context.
> >      >>
> >      >> That's also my understanding, unless we change the interface.
> >      >>
> >      >>> I don't see how SDF solves this problem..
> >      >>
> >      >> SDF has a checkpoint method which the Runner can call, but I
> think
> >      >> that you are right, that the above problem would be the same.
> >      >>
> >      >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
> >      >>> think that will help many future exactly-once sinks and address
> >      >>> the fundamental incompatibility between the Beam model and Flink's
> >      >>> horizontal checkpointing for such applications.
> >      >>
> >      >> Great :)
> >      >>
> >      >>> The FlinkRunner would need to insert the "wait until checkpoint
> >      >>> finalization" logic wherever it sees @RequiresStableInput,
> >     which is
> >      >>> already what it would have to do.
> >      >>
> >      >> I don't think that fixes the problem. See above example.
> >      >>
> >      >> Thanks,
> >      >> Max
> >      >>
> >      >> On 01.03.19 00:04, Raghu Angadi wrote:
> >      >>>
> >      >>>
> >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]
> >     <mailto:[email protected]>
> >      >>> <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >      >>>
> >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles
> >     <[email protected] <mailto:[email protected]>
> >      >>>     <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >      >>>
> >      >>>         I'm not sure what a hard fail is. I probably have a
> shallow
> >      >>>         understanding, but doesn't @RequiresStableInput work
> >     for 2PC?
> >      >>>         The preCommit() phase should establish the transaction
> and
> >      >>>         commit() is not called until after checkpoint
> >     finalization. Can
> >      >>>         you describe the way that it does not work a little bit
> >     more?
> >      >>>
> >      >>>
> >      >>>     - preCommit() is called before checkpoint. Kafka EOS in
> >     Flink starts
> >      >>>     the transaction before this and makes sure it flushes all
> >     records in
> >      >>>     preCommit(). So far good.
> >      >>>     - commit is called after checkpoint is persisted. Now, imagine
> >      >>>     commit() fails for some reason. There is no option to rerun
> >      >>>     the 1st phase to write the records again in a new transaction.
> >      >>>     This is a hard failure for the job. In practice Flink might
> >      >>>     attempt to commit again (not sure how many times), which is
> >      >>>     likely to fail and eventually results in job failure.
> >      >>>
> >      >>>
> >      >>> In Apache Beam, the records could be stored in state, and can be
> >      >>> written inside commit() to work around this issue. It could have
> >      >>> scalability issues if checkpoints are not frequent enough in
> Flink
> >      >>> runner.
> >      >>>
> >      >>> Raghu.
> >      >>>
> >      >>>
> >      >>>
> >      >>>         Kenn
> >      >>>
> >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi
> >     <[email protected] <mailto:[email protected]>
> >      >>>         <mailto:[email protected] <mailto:[email protected]>>>
> wrote:
> >      >>>
> >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
> >      >>>             <[email protected] <mailto:[email protected]>
> >     <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >      >>>
> >      >>>                 I believe the way you would implement the logic
> >     behind
> >      >>>                 Flink's KafkaProducer would be to have two
> steps:
> >      >>>
> >      >>>                 1. Start transaction
> >      >>>                 2. @RequiresStableInput Close transaction
> >      >>>
> >      >>>
> >      >>>             I see.  What happens if closing the transaction
> >      >>>             fails in (2)? Flink's 2PC requires that commit() should
> >      >>>             never hard fail once preCommit() succeeds. I think that
> >      >>>             is the cost of not having an extra shuffle. It is alright
> >      >>>             since this policy has worked well for Flink so far.
> >      >>>
> >      >>>             Overall, it will be great to have
> @RequiresStableInput
> >      >>>             support in Flink runner.
> >      >>>
> >      >>>             Raghu.
> >      >>>
> >      >>>                 The FlinkRunner would need to insert the "wait
> >     until
> >      >>>                 checkpoint finalization" logic wherever it
> >      >>>                 sees @RequiresStableInput, which is already
> what it
> >      >>>                 would have to do.
> >      >>>
> >      >>>                 This matches the KafkaProducer's logic - delay
> >     closing
> >      >>>                 the transaction until checkpoint finalization.
> This
> >      >>>                 answers my main question, which is "is
> >      >>>                 @RequiresStableInput expressive enough to allow
> >      >>>                 Beam-on-Flink to have exactly once behavior
> >     with the
> >      >>>                 same performance characteristics as native Flink
> >      >>>                 checkpoint finalization?"
> >      >>>
> >      >>>                 Kenn
> >      >>>
> >      >>>                 [1] https://github.com/apache/beam/pull/7955
> >      >>>
> >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
> >      >>>                 <[email protected] <mailto:[email protected]>
> >     <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >      >>>
> >      >>>
> >      >>>
> >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu
> Angadi
> >      >>>                     <[email protected] <mailto:[email protected]>
> >     <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >      >>>
> >      >>>
> >      >>>                             Now why does the Flink Runner not
> >     support
> >      >>>                             KafkaIO EOS? Flink's native
> >      >>>                             KafkaProducer supports
> exactly-once. It
> >      >>>                             simply commits the pending
> >      >>>                             transaction once it has completed a
> >      >>> checkpoint.
> >      >>>
> >      >>>
> >      >>>
> >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM
> Maximilian
> >      >>>                         Michels <[email protected]
> >     <mailto:[email protected]> <mailto:[email protected] <mailto:
> [email protected]>>>
> >      >>>                         wrote:
> >      >>>
> >      >>>                             Hi,
> >      >>>
> >      >>>                             I came across KafkaIO's Runner
> >     whitelist [1]
> >      >>>                             for enabling exactly-once
> >      >>>                             semantics (EOS). I think it is
> >     questionable
> >      >>>                             to exclude Runners from
> >      >>>                             inside a transform, but I see that
> the
> >      >>>                             intention was to save users from
> >      >>>                             surprises.
> >      >>>
> >      >>>                             Now why does the Flink Runner not
> >     support
> >      >>>                             KafkaIO EOS? Flink's native
> >      >>>                             KafkaProducer supports
> exactly-once. It
> >      >>>                             simply commits the pending
> >      >>>                             transaction once it has completed a
> >      >>> checkpoint.
> >      >>>
> >      >>>
> >      >>>
> >      >>>                         When we discussed this in Aug 2017, the
> >      >>>                         understanding was that 2 Phase commit
> >     utility in
> >      >>>                         Flink used to implement Flink's Kafka
> >     EOS could
> >      >>>                         not be implemented in Beam's context.
> >      >>>                         See this message
> >      >>> <https://www.mail-archive.com/[email protected]/msg02664.html>
> >      >>>                         in that dev thread. Has anything changed
> >     in this
> >      >>>                         regard? The whole thread is relevant to
> >     this
> >      >>>                         topic and worth going through.
> >      >>>
> >      >>>                     I think that TwoPhaseCommit utility class
> >     wouldn't
> >      >>>                     work. The Flink runner would probably want
> to
> >      >>>                     directly use notifySnapshotComplete in
> order to
> >      >>>                     implement @RequiresStableInput.
> >      >>>
> >      >>>
> >      >>>                             A checkpoint is realized by sending
> >     barriers
> >      >>>                             through all channels
> >      >>>                             starting from the source until
> >     reaching all
> >      >>>                             sinks. Every operator
> >      >>>                             persists its state once it has
> >     received a
> >      >>>                             barrier on all its input
> >      >>>                             channels, it then forwards it to the
> >      >>>                             downstream operators.
> >      >>>
> >      >>>                             The architecture of Beam's
> >      >>>                             KafkaExactlyOnceSink is as
> follows[2]:
> >      >>>
> >      >>>                             Input -> AssignRandomShardIds ->
> >     GroupByKey
> >      >>>                             -> AssignSequenceIds ->
> >      >>>                             GroupByKey -> ExactlyOnceWriter
> >      >>>
> >      >>>                             As I understood, Spark or Dataflow
> >     use the
> >      >>>                             GroupByKey stages to persist
> >      >>>                             the input. That is not required in
> >     Flink to
> >      >>>                             be able to take a consistent
> >      >>>                             snapshot of the pipeline.
> >      >>>
> >      >>>                             Basically, for Flink we don't need
> >     any of
> >      >>>                             that magic that KafkaIO does.
> >      >>>                             What we would need to support EOS
> >     is a way
> >      >>>                             to tell the ExactlyOnceWriter
> >      >>>                             (a DoFn) to commit once a
> >     checkpoint has
> >      >>>                             completed.
> >      >>>
> >      >>>                             I know that the new version of SDF
> >     supports
> >      >>>                             checkpointing which should
> >      >>>                             solve this issue. But there is
> >     still a lot
> >      >>>                             of work to do to make this
> >      >>>                             reality.
> >      >>>
> >      >>>
> >      >>>                         I don't see how SDF solves this
> >      >>>                         problem. Maybe pseudocode would make it
> >      >>>                         clearer. But if it helps, that is great!
> >      >>>
> >      >>>                             So I think it would make sense to
> think
> >      >>>                             about a way to make KafkaIO's
> >      >>>                             EOS more accessible to Runners
> >     which support
> >      >>>                             a different way of
> >      >>>                             checkpointing.
> >      >>>
> >      >>>
> >      >>>                         Absolutely. I would love to support EOS
> >      >>>                         in KafkaIO for Flink. I think that will
> >      >>>                         help many future exactly-once sinks and
> >      >>>                         address the fundamental incompatibility
> >      >>>                         between the Beam model and Flink's
> >      >>>                         horizontal checkpointing for such
> >      >>>                         applications.
> >      >>>
> >      >>>                         Raghu.
> >      >>>
> >      >>>                             Cheers,
> >      >>>                             Max
> >      >>>
> >      >>>                             PS: I found this document about
> >      >>>                             RequiresStableInput [3], but IMHO
> >      >>>                             defining an annotation only
> >     manifests the
> >      >>>                             conceptual difference between
> >      >>>                             the Runners.
> >      >>>
> >      >>>
> >      >>>                             [1]
> >      >>>
> >
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> >
> >      >>>
> >      >>>                             [2]
> >      >>>
> >
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> >
> >      >>>
> >      >>>                             [3]
> >      >>>
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >      >>>
> >      >>>
> >
>
