On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <[email protected]> wrote:

> > Can we do 2? I seem to remember that we had trouble in some cases (e.g.
> in the BigQuery case, there was no obvious way to create a deterministic
> id, which is why we went for a random number followed by a reshuffle). Also
> remember that the user ParDo that is producing data to the sink is not
> guaranteed to be deterministic; the Beam model allows for non-deterministic
> transforms.
>
> I believe we could use something like the worker id to make it
> deterministic, though the worker id can change after a restart. We could
> persist it in Flink's operator state. I do not know if we can come up
> with a Runner-independent solution.
>

If we did this, we would break it on runners that don't have a concept of a
stable worker id :( The Dataflow runner can load balance work at any time
(including moving work around between workers).
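To make the trade-off concrete, here is a minimal plain-Java sketch (no Beam or Flink APIs) contrasting random shard ids with ids derived from the record content. `ShardAssigner` and its methods are hypothetical names. Hashing the record bytes yields the same shard on replay only if the upstream ParDo re-emits byte-identical records, which, as noted above, the Beam model does not guarantee.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical helper contrasting two ways of assigning shard ids to records.
final class ShardAssigner {
  private final int numShards;

  ShardAssigner(int numShards) {
    this.numShards = numShards;
  }

  // Random assignment: a replay after a failure may put the same record
  // into a different shard.
  int randomShard(Random rng) {
    return rng.nextInt(numShards);
  }

  // Content-derived assignment: identical bytes always map to the same shard,
  // but only if the upstream transform re-emits byte-identical records.
  int deterministicShard(byte[] record) {
    return Math.floorMod(Arrays.hashCode(record), numShards);
  }
}
```

A stable worker id would avoid depending on deterministic upstream output, but, as discussed above, not every runner has such a concept.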

>
> > I'm not quite sure I understand. If a ParDo is marked with
> RequiresStableInput, can't the Flink runner buffer the input message until
> after the checkpoint is complete and only then deliver it to the ParDo?
>
> You're correct. I thought that it could suffice to only buffer during a
> checkpoint and otherwise rely on the deterministic execution of the
> pipeline and KafkaIO's de-duplication code.
>

Yes, I want to distinguish the KafkaIO case from the general case. It would
be interesting to see if there's something we could add to the Beam model
that would create a better story for Kafka's EOS writes.

>
> In any case, emitting only after finalization of checkpoints gives us
> guaranteed stable input. It also means that the processing is tied to
> the checkpoint interval, the checkpoint duration, and the available memory.
>

This is true; however, isn't that already the case for such uses of Flink?
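The buffering discussed above (hold input until the checkpoint that covers it completes, then deliver it to the `@RequiresStableInput` ParDo) can be sketched in plain Java. `StableInputBuffer`, `onSnapshot` and `onCheckpointComplete` are hypothetical names; in an actual Flink operator the release would presumably be driven by Flink's checkpoint-completion callback, and the buffered elements would themselves have to live in checkpointed operator state, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical buffer emulating @RequiresStableInput on a checkpointing runner:
// elements reach the downstream DoFn only after a checkpoint covering them completes.
final class StableInputBuffer<T> {
  private final Consumer<T> downstream;         // the stable-input ParDo
  private List<T> pending = new ArrayList<>();  // elements since the last snapshot
  private final TreeMap<Long, List<T>> awaitingCompletion = new TreeMap<>();

  StableInputBuffer(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  // Buffer each input element instead of processing it right away.
  void processElement(T element) {
    pending.add(element);
  }

  // A snapshot is taken: the pending elements now belong to this checkpoint.
  void onSnapshot(long checkpointId) {
    awaitingCompletion.put(checkpointId, pending);
    pending = new ArrayList<>();
  }

  // A checkpoint completed. Completion of a later checkpoint is assumed to
  // subsume earlier ones, so release everything up to and including it, in order.
  void onCheckpointComplete(long checkpointId) {
    Iterator<Map.Entry<Long, List<T>>> it =
        awaitingCompletion.headMap(checkpointId, true).entrySet().iterator();
    while (it.hasNext()) {
      for (T element : it.next().getValue()) {
        downstream.accept(element);
      }
      it.remove();
    }
  }
}
```

Nothing reaches the downstream ParDo before a checkpoint completes, which is where the dependency on checkpoint interval, checkpoint duration and available memory comes from.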


>
> On 01.03.19 19:41, Reuven Lax wrote:
> >
> >
> > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels <[email protected]> wrote:
> >
> >     Fully agree. I think we can improve the situation drastically. For
> >     KafkaIO EOS with Flink we need to make these two changes:
> >
> >     1) Introduce buffering while the checkpoint is being taken
> >     2) Replace the random shard id assignment with something deterministic
> >
> >
> > Can we do 2? I seem to remember that we had trouble in some cases (e.g.
> > in the BigQuery case, there was no obvious way to create a deterministic
> > id, which is why we went for a random number followed by a reshuffle).
> > Also remember that the user ParDo that is producing data to the sink is
> > not guaranteed to be deterministic; the Beam model allows for
> > non-deterministic transforms.
> >
> >
> >     However, we won't be able to provide full compatibility with
> >     RequiresStableInput because Flink only guarantees stable input after
> >     a checkpoint. RequiresStableInput requires input at any point in time
> >     to be stable.
> >
> >
> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the Flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to the
> > ParDo? This adds latency of course, but I'm not sure how else to do
> > things correctly with the Beam model.
> >
> >     IMHO the only way to achieve that is materializing output
> >     which Flink does not currently support.
> >
> >     KafkaIO does not need all the power of RequiresStableInput to achieve
> >     EOS with Flink, but for the general case I don't see a good solution
> >     at the moment.
> >
> >     -Max
> >
> >     On 01.03.19 16:45, Reuven Lax wrote:
> >      > Yeah, the person who was working on it originally stopped working
> >      > on Beam, and nobody else ever finished it. I think it is important
> >      > to finish though. Many of the existing Sinks are only fully correct
> >      > for Dataflow today, because they generate either Reshuffle or
> >      > GroupByKey to ensure input stability before outputting (in many
> >      > cases this code was inherited from before Beam existed). On Flink
> >      > today, these sinks might occasionally produce duplicate output in
> >      > the case of failures.
> >      >
> >      > Reuven
> >      >
> >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
> >      >
> >      >     Circling back to the RequiresStableInput annotation [1]. I've
> >      >     done some prototyping to see how this could be integrated into
> >      >     Flink. I'm currently writing a test based on RequiresStableInput.
> >      >
> >      >     I found out there are already checks in place at the Runners to
> >      >     throw in case transforms use RequiresStableInput and it's not
> >      >     supported. However, not a single transform actually uses the
> >      >     annotation.
> >      >
> >      >     It seems that the effort stopped at some point? Would it make
> >      >     sense to start annotating KafkaExactlyOnceSink with
> >      >     @RequiresStableInput? We could then get rid of the whitelist.
> >      >
> >      >     -Max
> >      >
> >      >     [1] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >      >
> >      >
> >      >
> >      >     On 01.03.19 14:28, Maximilian Michels wrote:
> >      >      > Just realized that transactions do not span multiple
> >      >      > elements in KafkaExactlyOnceSink. So the proposed solution to
> >      >      > stop processing elements while a snapshot is pending would work.
> >      >      >
> >      >      > It is certainly not optimal in terms of performance for Flink
> >      >      > and poses problems when checkpoints take long to complete, but
> >      >      > it would be worthwhile to implement this to make use of the EOS
> >      >      > feature.
> >      >      >
> >      >      > Thanks,
> >      >      > Max
> >      >      >
> >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
> >      >      >> Thank you for the prompt replies. It's great to see that there
> >      >      >> is good understanding of how EOS in Flink works.
> >      >      >>
> >      >      >>> This is exactly what RequiresStableInput is supposed to do.
> >      >      >>> On the Flink runner, this would be implemented by delaying
> >      >      >>> processing until the current checkpoint is done.
> >      >      >>
> >      >      >> I don't think that works because we have no control over the
> >      >      >> Kafka transactions. Imagine:
> >      >      >>
> >      >      >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> >      >      >>    starts a new transaction.
> >      >      >> 2) Flink checkpoints, delaying the processing of elements; the
> >      >      >>    checkpoint fails.
> >      >      >> 3) We restore from an old checkpoint and start writing duplicate
> >      >      >>    data to Kafka. The de-duplication that the sink performs does
> >      >      >>    not help, especially because the random shard ids might be
> >      >      >>    assigned differently.
> >      >      >>
> >      >      >> IMHO we have to have control over commit to be able to provide
> >      >      >> EOS.
> >      >      >>
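The scenario above can be reduced to a toy model of sink-side de-duplication. `DedupWriter` is hypothetical and only loosely modeled on the idea that the writer tracks the last committed sequence id per shard and skips anything at or below it; it is not KafkaExactlyOnceSink's code, and for brevity a write counts as committed immediately. It shows that a replayed record is only filtered out if it comes back under the same (shard, sequence id) pair.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical writer de-duplicating on (shardId, sequenceId).
// Simplification: every successful write is treated as committed at once.
final class DedupWriter {
  // Highest committed sequence id per shard (a real sink would persist this).
  private final Map<Integer, Long> lastCommitted = new HashMap<>();
  private final List<String> output = new ArrayList<>();

  // Returns true if the record was written, false if it was recognized as a duplicate.
  boolean write(int shardId, long sequenceId, String record) {
    long last = lastCommitted.getOrDefault(shardId, -1L);
    if (sequenceId <= last) {
      return false; // already committed under this shard: skip it
    }
    output.add(record);
    lastCommitted.put(shardId, sequenceId);
    return true;
  }

  List<String> output() {
    return output;
  }
}
```

If a replay re-rolls the random shard ids, a record that was already written can arrive under a shard whose committed sequence id is lower, and it is written a second time.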
> >      >      >>> When we discussed this in Aug 2017, the understanding was that
> >      >      >>> 2 Phase commit utility in Flink used to implement Flink's Kafka
> >      >      >>> EOS could not be implemented in Beam's context.
> >      >      >>
> >      >      >> That's also my understanding, unless we change the interface.
> >      >      >>
> >      >      >>> I don't see how SDF solves this problem.
> >      >      >>
> >      >      >> SDF has a checkpoint method which the Runner can call, but I
> >      >      >> think you are right that the above problem would be the same.
> >      >      >>
> >      >      >>> Absolutely. I would love to support EOS in KafkaIO for Flink.
> >      >      >>> I think that will help many future exactly-once sinks and
> >      >      >>> address the fundamental incompatibility between the Beam model
> >      >      >>> and Flink's horizontal checkpointing for such applications.
> >      >      >>
> >      >      >> Great :)
> >      >      >>
> >      >      >>> The FlinkRunner would need to insert the "wait until
> >      >      >>> checkpoint finalization" logic wherever it sees
> >      >      >>> @RequiresStableInput, which is already what it would have to do.
> >      >      >>
> >      >      >> I don't think that fixes the problem. See the example above.
> >      >      >>
> >      >      >> Thanks,
> >      >      >> Max
> >      >      >>
> >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
> >      >      >>>
> >      >      >>>
> >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
> >      >      >>>
> >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
> >      >      >>>
> >      >      >>>         I'm not sure what a hard fail is. I probably have a
> >      >      >>>         shallow understanding, but doesn't @RequiresStableInput
> >      >      >>>         work for 2PC? The preCommit() phase should establish the
> >      >      >>>         transaction and commit() is not called until after
> >      >      >>>         checkpoint finalization. Can you describe the way that
> >      >      >>>         it does not work a little bit more?
> >      >      >>>
> >      >      >>>
> >      >      >>>     - preCommit() is called before the checkpoint. Kafka EOS in
> >      >      >>>       Flink starts the transaction before this and makes sure it
> >      >      >>>       flushes all records in preCommit(). So far so good.
> >      >      >>>     - commit() is called after the checkpoint is persisted. Now,
> >      >      >>>       imagine commit() fails for some reason. There is no option
> >      >      >>>       to rerun the 1st phase to write the records again in a new
> >      >      >>>       transaction. This is a hard failure for the job. In
> >      >      >>>       practice Flink might attempt to commit again (not sure how
> >      >      >>>       many times), which is likely to fail and eventually
> >      >      >>>       results in job failure.
> >      >      >>>
> >      >      >>>
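A toy model of the two-phase commit lifecycle described above may help pin down the failure mode. `ToyTwoPhaseSink`, its nested `Transaction` and the `maxCommitAttempts` parameter are hypothetical; only the phase names `preCommit()` and `commit()` come from the discussion, and the sketch does not reproduce Flink's actual sink classes. Once `preCommit()` has moved the records into a prepared transaction, a failing `commit()` can only be retried, never redone from scratch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of a two-phase-commit sink.
final class ToyTwoPhaseSink {
  // Stand-in for an external (e.g. Kafka) transaction.
  static final class Transaction {
    final List<String> records = new ArrayList<>();
    boolean flushed;
    boolean committed;
  }

  private final int maxCommitAttempts;
  private Transaction current = new Transaction(); // opened before any record arrives

  ToyTwoPhaseSink(int maxCommitAttempts) {
    this.maxCommitAttempts = maxCommitAttempts;
  }

  void write(String record) {
    current.records.add(record);
  }

  // Phase 1, at checkpoint time: flush the open transaction and start a new one.
  Transaction preCommit() {
    Transaction prepared = current;
    prepared.flushed = true;
    current = new Transaction();
    return prepared;
  }

  // Phase 2, after the checkpoint is persisted. The records now exist only inside
  // the prepared transaction, so a failing commit can be retried but not rewritten.
  void commit(Transaction prepared, BooleanSupplier commitSucceeds) {
    for (int attempt = 1; attempt <= maxCommitAttempts; attempt++) {
      if (commitSucceeds.getAsBoolean()) {
        prepared.committed = true;
        return;
      }
    }
    throw new IllegalStateException(
        "commit failed after " + maxCommitAttempts + " attempts: job fails");
  }
}
```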
> >      >      >>> In Apache Beam, the records could be stored in state, and can
> >      >      >>> be written inside commit() to work around this issue. It could
> >      >      >>> have scalability issues if checkpoints are not frequent enough
> >      >      >>> in the Flink runner.
> >      >      >>>
> >      >      >>> Raghu.
> >      >      >>>
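The workaround of keeping records in state and writing them inside `commit()` can be sketched along the same lines. `StatefulCommitSink` and `TransactionalClient` are hypothetical names, and the client is assumed to leave nothing visible when an attempt fails (i.e. the transaction is aborted). Because the records are still held in state, a failed commit can be retried in a fresh transaction; the price is that buffered state grows with the checkpoint interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transactional client: each call opens a fresh transaction, writes
// all records and commits it. Returns false on failure, in which case the attempt
// is assumed to be aborted and nothing becomes visible.
interface TransactionalClient {
  boolean writeAndCommit(List<String> records);
}

// Hypothetical sink that keeps records in state until commit() instead of
// writing them into a transaction up front.
final class StatefulCommitSink {
  private final List<String> bufferedState = new ArrayList<>(); // would be checkpointed
  private final TransactionalClient client;

  StatefulCommitSink(TransactionalClient client) {
    this.client = client;
  }

  void write(String record) {
    bufferedState.add(record);
  }

  // Called after checkpoint completion. On failure the records are still in
  // state, so the caller can retry with a new transaction.
  boolean commit() {
    if (!client.writeAndCommit(new ArrayList<>(bufferedState))) {
      return false;
    }
    bufferedState.clear();
    return true;
  }

  int bufferedCount() {
    return bufferedState.size();
  }
}
```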
> >      >      >>>
> >      >      >>>
> >      >      >>>         Kenn
> >      >      >>>
> >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
> >      >      >>>
> >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
> >      >      >>>
> >      >      >>>                 I believe the way you would implement the logic
> >      >      >>>                 behind Flink's KafkaProducer would be to have two
> >      >      >>>                 steps:
> >      >      >>>
> >      >      >>>                 1. Start transaction
> >      >      >>>                 2. @RequiresStableInput Close transaction
> >      >      >>>
> >      >      >>>
> >      >      >>>             I see. What happens if closing the transaction
> >      >      >>>             fails in (2)? Flink's 2PC requires that commit()
> >      >      >>>             should never hard fail once preCommit() succeeds. I
> >      >      >>>             think that is the cost of not having an extra
> >      >      >>>             shuffle. It is alright since this policy has worked
> >      >      >>>             well for Flink so far.
> >      >      >>>
> >      >      >>>             Overall, it will be great to have
> >      >      >>>             @RequiresStableInput support in the Flink runner.
> >      >      >>>
> >      >      >>>             Raghu.
> >      >      >>>
> >      >      >>>                 The FlinkRunner would need to insert the "wait
> >      >      >>>                 until checkpoint finalization" logic wherever it
> >      >      >>>                 sees @RequiresStableInput, which is already what
> >      >      >>>                 it would have to do.
> >      >      >>>
> >      >      >>>                 This matches the KafkaProducer's logic: delay
> >      >      >>>                 closing the transaction until checkpoint
> >      >      >>>                 finalization. This answers my main question,
> >      >      >>>                 which is "is @RequiresStableInput expressive
> >      >      >>>                 enough to allow Beam-on-Flink to have exactly
> >      >      >>>                 once behavior with the same performance
> >      >      >>>                 characteristics as native Flink checkpoint
> >      >      >>>                 finalization?"
> >      >      >>>
> >      >      >>>                 Kenn
> >      >      >>>
> >      >      >>>                 [1] https://github.com/apache/beam/pull/7955
> >      >      >>>
> >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
> >      >      >>>
> >      >      >>>
> >      >      >>>
> >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> >      >      >>>
> >      >      >>>
> >      >      >>>                             Now why does the Flink Runner not
> >      >      >>>                             support KafkaIO EOS? Flink's native
> >      >      >>>                             KafkaProducer supports exactly-once.
> >      >      >>>                             It simply commits the pending
> >      >      >>>                             transaction once it has completed a
> >      >      >>>                             checkpoint.
> >      >      >>>
> >      >      >>>
> >      >      >>>
> >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
> >      >      >>>
> >      >      >>>                             Hi,
> >      >      >>>
> >      >      >>>                             I came across KafkaIO's Runner
> >      >      >>>                             whitelist [1] for enabling
> >      >      >>>                             exactly-once semantics (EOS). I think
> >      >      >>>                             it is questionable to exclude Runners
> >      >      >>>                             from inside a transform, but I see
> >      >      >>>                             that the intention was to save users
> >      >      >>>                             from surprises.
> >      >      >>>
> >      >      >>>                             Now why does the Flink Runner not
> >      >      >>>                             support KafkaIO EOS? Flink's native
> >      >      >>>                             KafkaProducer supports exactly-once.
> >      >      >>>                             It simply commits the pending
> >      >      >>>                             transaction once it has completed a
> >      >      >>>                             checkpoint.
> >      >      >>>
> >      >      >>>
> >      >      >>>
> >      >      >>>                         When we discussed this in Aug 2017,
> >      >      >>>                         the understanding was that 2 Phase
> >      >      >>>                         commit utility in Flink used to
> >      >      >>>                         implement Flink's Kafka EOS could not
> >      >      >>>                         be implemented in Beam's context. See
> >      >      >>>                         this message
> >      >      >>>                         <https://www.mail-archive.com/[email protected]/msg02664.html>
> >      >      >>>                         in that dev thread. Has anything
> >      >      >>>                         changed in this regard? The whole
> >      >      >>>                         thread is relevant to this topic and
> >      >      >>>                         worth going through.
> >      >      >>>
> >      >      >>>                     I think that the TwoPhaseCommit utility
> >      >      >>>                     class wouldn't work. The Flink runner would
> >      >      >>>                     probably want to directly use
> >      >      >>>                     notifyCheckpointComplete in order to
> >      >      >>>                     implement @RequiresStableInput.
> >      >      >>>
> >      >      >>>
> >      >      >>>                             A checkpoint is realized by sending
> >      >      >>>                             barriers through all channels,
> >      >      >>>                             starting from the sources until
> >      >      >>>                             reaching all sinks. Every operator
> >      >      >>>                             persists its state once it has
> >      >      >>>                             received a barrier on all its input
> >      >      >>>                             channels; it then forwards the
> >      >      >>>                             barrier to the downstream operators.
> >      >      >>>
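The barrier mechanism described above can be illustrated with a small simulation of one operator with several input channels. `AlignedOperator` is hypothetical and models aligned checkpoints only: records arriving on a channel that has already delivered its barrier are held back until the barrier has arrived on every channel; then the state is snapshotted and the barrier is forwarded.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical operator with several input channels performing aligned snapshots.
final class AlignedOperator {
  private final int numChannels;
  private final Set<Integer> barrierSeen = new HashSet<>();
  private final List<String> heldBack = new ArrayList<>(); // input after a channel's barrier
  private final List<String> state = new ArrayList<>();    // "operator state": records processed
  final List<List<String>> snapshots = new ArrayList<>();
  final List<String> forwardedBarriers = new ArrayList<>();

  AlignedOperator(int numChannels) {
    this.numChannels = numChannels;
  }

  void onRecord(int channel, String record) {
    if (barrierSeen.contains(channel)) {
      heldBack.add(record); // belongs to the next checkpoint: wait for alignment
    } else {
      state.add(record);
    }
  }

  void onBarrier(int channel, long checkpointId) {
    barrierSeen.add(channel);
    if (barrierSeen.size() == numChannels) {
      snapshots.add(new ArrayList<>(state));           // persist the state
      forwardedBarriers.add("barrier-" + checkpointId); // then forward the barrier
      barrierSeen.clear();
      state.addAll(heldBack); // resume with the records that were held back
      heldBack.clear();
    }
  }
}
```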
> >      >      >>>                             The architecture of Beam's
> >      >      >>>                             KafkaExactlyOnceSink is as follows
> >      >      >>>                             [2]:
> >      >      >>>
> >      >      >>>                             Input -> AssignRandomShardIds ->
> >      >      >>>                             GroupByKey -> AssignSequenceIds ->
> >      >      >>>                             GroupByKey -> ExactlyOnceWriter
> >      >      >>>
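For illustration, a plain-Java simulation of what the stages up to AssignSequenceIds produce. `ShardingSimulation` and `Sequenced` are hypothetical names and the grouping is done in memory; the point is only that sequence ids are contiguous per randomly chosen shard, so a retry that re-rolls the shard ids produces different (shard, sequence id) pairs unless the stage output was persisted, e.g. by a GroupByKey on Dataflow or Spark as described below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the sharding stages in front of ExactlyOnceWriter.
final class ShardingSimulation {
  static final class Sequenced {
    final int shard;
    final long sequenceId;
    final String record;

    Sequenced(int shard, long sequenceId, String record) {
      this.shard = shard;
      this.sequenceId = sequenceId;
      this.record = record;
    }
  }

  static List<Sequenced> run(List<String> input, int numShards, Random rng) {
    // AssignRandomShardIds + GroupByKey: bucket each record under a random shard.
    Map<Integer, List<String>> byShard = new TreeMap<>();
    for (String record : input) {
      byShard.computeIfAbsent(rng.nextInt(numShards), k -> new ArrayList<>()).add(record);
    }
    // AssignSequenceIds: number the records within each shard 0, 1, 2, ...
    // (the second GroupByKey, regrouping by shard for the writer, is omitted).
    List<Sequenced> out = new ArrayList<>();
    for (Map.Entry<Integer, List<String>> e : byShard.entrySet()) {
      long next = 0;
      for (String record : e.getValue()) {
        out.add(new Sequenced(e.getKey(), next++, record));
      }
    }
    return out;
  }
}
```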
> >      >      >>>                             As I understood, Spark or Dataflow
> >      >      >>>                             use the GroupByKey stages to persist
> >      >      >>>                             the input. That is not required in
> >      >      >>>                             Flink to be able to take a consistent
> >      >      >>>                             snapshot of the pipeline.
> >      >      >>>
> >      >      >>>                             Basically, for Flink we don't need
> >      >      >>>                             any of the magic that KafkaIO does.
> >      >      >>>                             What we would need to support EOS is
> >      >      >>>                             a way to tell the ExactlyOnceWriter
> >      >      >>>                             (a DoFn) to commit once a checkpoint
> >      >      >>>                             has completed.
> >      >      >>>
> >      >      >>>                             I know that the new version of SDF
> >      >      >>>                             supports checkpointing, which should
> >      >      >>>                             solve this issue. But there is still
> >      >      >>>                             a lot of work to do to make this a
> >      >      >>>                             reality.
> >      >      >>>
> >      >      >>>
> >      >      >>>                         I don't see how SDF solves this
> >      >      >>>                         problem. Maybe pseudo code would make
> >      >      >>>                         it clearer. But if it helps, that is
> >      >      >>>                         great!
> >      >      >>>
> >      >      >>>                             So I think it would make sense to
> >      >      >>>                             think about a way to make KafkaIO's
> >      >      >>>                             EOS more accessible to Runners which
> >      >      >>>                             support a different way of
> >      >      >>>                             checkpointing.
> >      >      >>>
> >      >      >>>
> >      >      >>>                         Absolutely. I would love to support
> >      >      >>>                         EOS in KafkaIO for Flink. I think that
> >      >      >>>                         will help many future exactly-once
> >      >      >>>                         sinks and address the fundamental
> >      >      >>>                         incompatibility between the Beam model
> >      >      >>>                         and Flink's horizontal checkpointing
> >      >      >>>                         for such applications.
> >      >      >>>
> >      >      >>>                         Raghu.
> >      >      >>>
> >      >      >>>                             Cheers,
> >      >      >>>                             Max
> >      >      >>>
> >      >      >>>                             PS: I found this document about
> >      >      >>>                             RequiresStableInput [3], but IMHO
> >      >      >>>                             defining an annotation only
> >      >      >>>                             manifests the conceptual difference
> >      >      >>>                             between the Runners.
> >      >      >>>
> >      >      >>>
> >      >      >>>                             [1] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> >      >      >>>                             [2] https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> >      >      >>>                             [3] https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >      >      >>>
> >      >      >>>
> >      >
> >
>
