On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles <[email protected]> wrote:

>
>
> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels <[email protected]> wrote:
>
>> > If you randomly generate shard ids, buffer those until finalization,
>> finalize a checkpoint so that you never need to re-run that generation,
>> isn't the result stable from that point onwards?
>>
>> Yes, you're right :) For @RequiresStableInput we will always have to
>> buffer and emit only after a finalized checkpoint.
>>
>> 2PC is the better model for Flink, at least in the case of Kafka because
>> it can offload the buffering to Kafka via its transactions.
>> RequiresStableInput is a more general solution and it is feasible to
>> support it in the Flink Runner. However, we have to make sure that
>> checkpoints are taken frequently to avoid too much memory pressure.
>
>
>> It would be nice to also support 2PC in Beam, i.e. the Runner could
>> choose to either buffer/materialize input or do a 2PC, but it would also
>> break the purity of the existing model.
>>
>
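
As a rough illustration of "buffer and emit only after a finalized checkpoint", here is a minimal Python sketch. It is a plain in-memory toy, not Beam or Flink code: the class StableInputBuffer and all of its method names are made up for this example, and the completion callback is only loosely modeled on a runner being told that a checkpoint finished.

```python
class StableInputBuffer:
    """Toy model: hold elements until the checkpoint covering them is finalized."""

    def __init__(self):
        self.pending = []       # elements received since the last snapshot
        self.snapshotted = {}   # checkpoint id -> elements captured by that snapshot
        self.emitted = []       # elements released downstream (now "stable")

    def process(self, element):
        # Nothing is emitted on arrival; the element is only buffered.
        self.pending.append(element)

    def snapshot(self, checkpoint_id):
        # The buffered elements become part of this checkpoint's state.
        self.snapshotted[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Release everything covered by this checkpoint or an earlier one.
        done = sorted(c for c in self.snapshotted if c <= checkpoint_id)
        for cid in done:
            self.emitted.extend(self.snapshotted.pop(cid))

    def buffered_count(self):
        # Size of the buffer; grows with the interval between checkpoints.
        in_snapshots = sum(len(v) for v in self.snapshotted.values())
        return len(self.pending) + in_snapshots


buf = StableInputBuffer()
buf.process("a")
buf.process("b")
buf.snapshot(1)
buf.process("c")                   # arrives after the snapshot barrier
buf.notify_checkpoint_complete(1)  # only "a" and "b" are released
```

In this toy, buffered_count() is what grows when checkpoints are infrequent, which is the memory pressure concern raised above.
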
> Still digging into the details. I think the "generate random shard ids &
> buffer" is a tradition but more specific to BigQueryIO or FileIO styles. It
> doesn't have to be done that way if the target system has special support
> like Kafka does.
>
> For Kafka, can you get the 2PC behavior like this: Upstream step: open a
> transaction, write a bunch of stuff to it (let Kafka do the buffering) and
> emit a transaction identifier. Downstream @RequiresStableInput step: close
> transaction. Again, I may be totally missing something, but I think that
> this has identical characteristics:
>

Does Kafka garbage collect this eventually in the case where you crash and
start again with a different transaction identifier?


>  - Kafka does the buffering
>  - checkpoint finalization is the driver of latency
>  - failure before checkpoint finalization means the old transaction sits
> around and times out eventually
>  - failure after checkpoint finalization causes retry with the same
> transaction identifier
>
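
To make the two failure cases in the list above easier to reason about, here is a small Python sketch of the two-step pattern. Everything in it is hypothetical: ToyBroker is an in-memory stand-in, not the Kafka client, and it assumes a commit can be retried by transaction identifier and that abandoned transactions are dropped by some timeout. Neither assumption is established in this thread for the real API.

```python
import uuid


class ToyBroker:
    """In-memory stand-in for a transactional log (not the Kafka API)."""

    def __init__(self):
        self.open = {}          # transaction id -> records buffered by the broker
        self.committed = set()  # ids of transactions that were committed
        self.visible = []       # records that readers can see

    def begin(self):
        txn_id = uuid.uuid4().hex
        self.open[txn_id] = []
        return txn_id

    def write(self, txn_id, record):
        self.open[txn_id].append(record)

    def commit(self, txn_id):
        if txn_id in self.committed:
            return  # retry after a crash: committing again is a no-op
        self.visible.extend(self.open.pop(txn_id))
        self.committed.add(txn_id)

    def expire_abandoned(self):
        # Assumed timeout behavior: drop every transaction still open.
        dropped = len(self.open)
        self.open.clear()
        return dropped


def upstream_step(broker, records):
    """Open a transaction, let the broker buffer, emit the identifier."""
    txn_id = broker.begin()
    for record in records:
        broker.write(txn_id, record)
    return txn_id


def downstream_step(broker, txn_id):
    """Runs only on stable input, i.e. after checkpoint finalization."""
    broker.commit(txn_id)


broker = ToyBroker()
upstream_step(broker, ["a", "b"])        # crash before finalization: id is lost
txn = upstream_step(broker, ["a", "b"])  # rerun produces a different identifier
downstream_step(broker, txn)             # checkpoint finalized, commit
downstream_step(broker, txn)             # crash after finalization: same id retried
```

In this model the first transaction is never committed and only disappears when expire_abandoned() runs, which is the part my question is about.
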
> Kenn
>
>
>>
>> On 01.03.19 19:42, Kenneth Knowles wrote:
>> > I think I am fundamentally misunderstanding checkpointing in Flink.
>> >
>> > If you randomly generate shard ids, buffer those until finalization,
>> > finalize a checkpoint so that you never need to re-run that generation,
>> > isn't the result stable from that point onwards?
>> >
>> > Kenn
>> >
>> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels <[email protected]> wrote:
>> >
>> >     Fully agree. I think we can improve the situation drastically. For
>> >     KafkaIO EOS with Flink we need to make these two changes:
>> >
>> >     1) Introduce buffering while the checkpoint is being taken
>> >     2) Replace the random shard id assignment with something
>> deterministic
>> >
>> >     However, we won't be able to provide full compatibility with
>> >     RequiresStableInput because Flink only guarantees stable input
>> after a
>> >     checkpoint. RequiresStableInput requires input at any point in time
>> to
>> >     be stable. IMHO the only way to achieve that is materializing output
>> >     which Flink does not currently support.
>> >
>> >     KafkaIO does not need all the power of RequiresStableInput to
>> achieve
>> >     EOS with Flink, but for the general case I don't see a good
>> solution at
>> >     the moment.
>> >
>> >     -Max
>> >
>> >     On 01.03.19 16:45, Reuven Lax wrote:
>> >      > Yeah, the person who was working on it originally stopped
>> working on
>> >      > Beam, and nobody else ever finished it. I think it is important
>> to
>> >      > finish though. Many of the existing Sinks are only fully correct
>> for
>> >      > Dataflow today, because they generate either Reshuffle or
>> >     GroupByKey to
>> >      > ensure input stability before outputting (in many cases this code
>> >     was
>> >      > inherited from before Beam existed). On Flink today, these sinks
>> >     might
>> >      > occasionally produce duplicate output in the case of failures.
>> >      >
>> >      > Reuven
>> >      >
>> >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <[email protected]> wrote:
>> >      >
>> >      >     Circling back to the RequiresStableInput annotation[1]. I've
>> >     done some
>> >      >     prototyping to see how this could be integrated into Flink.
>> I'm
>> >      >     currently
>> >      >     writing a test based on RequiresStableInput.
>> >      >
>> >      >     I found out there are already checks in place at the Runners
>> to
>> >      >     throw in
>> >      >     case transforms use RequiresStableInput and it's not
>> >     supported. However,
>> >      >     not a single transform actually uses the annotation.
>> >      >
>> >      >     It seems that the effort stopped at some point? Would it make
>> >     sense to
>> >      >     start annotating KafkaExactlyOnceSink with
>> >     @RequiresStableInput? We
>> >      >     could then get rid of the whitelist.
>> >      >
>> >      >     -Max
>> >      >
>> >      >     [1]
>> >      >
>> >
>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>> >      >
>> >      >
>> >      >
>> >      >     On 01.03.19 14:28, Maximilian Michels wrote:
>> >      >      > Just realized that transactions do not span multiple
>> >     elements in
>> >      >      > KafkaExactlyOnceSink. So the proposed solution to stop
>> >     processing
>> >      >      > elements while a snapshot is pending would work.
>> >      >      >
>> >      >      > It is certainly not optimal in terms of performance for
>> >     Flink and
>> >      >     poses
>> >      >      > problems when checkpoints take long to complete, but it
>> >     would be
>> >      >      > worthwhile to implement this to make use of the EOS
>> feature.
>> >      >      >
>> >      >      > Thanks,
>> >      >      > Max
>> >      >      >
>> >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
>> >      >      >> Thank you for the prompt replies. It's great to see that
>> >     there is
>> >      >      >> good understanding of how EOS in Flink works.
>> >      >      >>
>> >      >      >>> This is exactly what RequiresStableInput is supposed to
>> >     do. On the
>> >      >      >>> Flink runner, this would be implemented by delaying
>> >     processing
>> >      >     until
>> >      >      >>> the current checkpoint is done.
>> >      >      >>
>> >      >      >> I don't think that works because we have no control over
>> >     the Kafka
>> >      >      >> transactions. Imagine:
>> >      >      >>
>> >      >      >> 1) ExactlyOnceWriter writes records to Kafka and commits,
>> >     then
>> >      >     starts
>> >      >      >> a new transaction.
>> >      >      >> 2) Flink checkpoints, delaying the processing of
>> >     elements, the
>> >      >      >> checkpoint fails.
>> >      >      >> 3) We restore from an old checkpoint and will start
>> writing
>> >      >     duplicate
>> >      >      >> data to Kafka. The de-duplication that the sink performs
>> >     does not
>> >      >      >> help, especially because the random shards ids might be
>> >     assigned
>> >      >      >> differently.
>> >      >      >>
>> >      >      >> IMHO we have to have control over commit to be able to
>> >     provide EOS.
>> >      >      >>
>> >      >      >>> When we discussed this in Aug 2017, the understanding
>> >     was that 2
>> >      >      >>> Phase commit utility in Flink used to implement Flink's
>> >     Kafka EOS
>> >      >      >>> could not be implemented in Beam's context.
>> >      >      >>
>> >      >      >> That's also my understanding, unless we change the
>> interface.
>> >      >      >>
>> >      >      >>> I don't see how SDF solves this problem..
>> >      >      >>
>> >      >      >> SDF has a checkpoint method which the Runner can call,
>> >     but I think
>> >      >      >> that you are right, that the above problem would be the
>> same.
>> >      >      >>
>> >      >      >>> Absolutely. I would love to support EOS in KafkaIO for
>> >     Flink. I
>> >      >     think
>> >      >      >>> that will help many future exactly-once sinks.. and
>> address
>> >      >      >>> fundamental incompatibility between Beam model and
>> Flink's
>> >      >     horizontal
>> >      >      >>> checkpointing for such applications.
>> >      >      >>
>> >      >      >> Great :)
>> >      >      >>
>> >      >      >>> The FlinkRunner would need to insert the "wait until
>> >     checkpoint
>> >      >      >>> finalization" logic wherever it sees
>> @RequiresStableInput,
>> >      >     which is
>> >      >      >>> already what it would have to do.
>> >      >      >>
>> >      >      >> I don't think that fixes the problem. See above example.
>> >      >      >>
>> >      >      >> Thanks,
>> >      >      >> Max
>> >      >      >>
>> >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
>> >      >      >>>
>> >      >      >>>
>> >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>         I'm not sure what a hard fail is. I probably
>> >     have a shallow
>> >      >      >>>         understanding, but doesn't @RequiresStableInput
>> work
>> >      >     for 2PC?
>> >      >      >>>         The preCommit() phase should establish the
>> >     transaction and
>> >      >      >>>         commit() is not called until after checkpoint
>> >      >     finalization. Can
>> >      >      >>>         you describe the way that it does not work a
>> >     little bit
>> >      >     more?
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>     - preCommit() is called before checkpoint. Kafka
>> EOS in
>> >      >     Flink starts
>> >      >      >>>     the transaction before this and makes sure it
>> >     flushes all
>> >      >     records in
>> >      >      >>>     preCommit(). So far good.
>> >      >      >>>     - commit is called after checkpoint is persisted.
>> >     Now, imagine
>> >      >      >>>     commit() fails for some reason. There is no option
>> >     to rerun
>> >      >     the 1st
>> >      >      >>>     phase to write the records again in a new
>> >     transaction. This
>> >      >     is a
>> >      >      >>>     hard failure for the job. In practice Flink
>> might
>> >      >     attempt to
>> >      >      >>>     commit again (not sure how many times), which is
>> >     likely to
>> >      >     fail and
>> >      >      >>>     eventually results in job failure.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>> In Apache Beam, the records could be stored in state,
>> >     and can be
>> >      >      >>> written inside commit() to work around this issue. It
>> >     could have
>> >      >      >>> scalability issues if checkpoints are not frequent
>> >     enough in Flink
>> >      >      >>> runner.
>> >      >      >>>
>> >      >      >>> Raghu.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>         Kenn
>> >      >      >>>
>> >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>                 I believe the way you would implement
>> >     the logic
>> >      >     behind
>> >      >      >>>                 Flink's KafkaProducer would be to have
>> >     two steps:
>> >      >      >>>
>> >      >      >>>                 1. Start transaction
>> >      >      >>>                 2. @RequiresStableInput Close
>> transaction
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>             I see.  What happens if closing the
>> transaction
>> >      >     fails in
>> >      >      >>>             (2)? Flink's 2PC requires that commit()
>> should
>> >      >     never hard
>> >      >      >>>             fail once preCommit() succeeds. I think
>> that is
>> >      >     the cost of not
>> >      >      >>>             having an extra shuffle. It is alright since
>> >     this
>> >      >     policy has
>> >      >      >>>             worked well for Flink so far.
>> >      >      >>>
>> >      >      >>>             Overall, it will be great to have
>> >     @RequiresStableInput
>> >      >      >>>             support in Flink runner.
>> >      >      >>>
>> >      >      >>>             Raghu.
>> >      >      >>>
>> >      >      >>>                 The FlinkRunner would need to insert the
>> >     "wait
>> >      >     until
>> >      >      >>>                 checkpoint finalization" logic wherever
>> it
>> >      >      >>>                 sees @RequiresStableInput, which is
>> >     already what it
>> >      >      >>>                 would have to do.
>> >      >      >>>
>> >      >      >>>                 This matches the KafkaProducer's logic -
>> >     delay
>> >      >     closing
>> >      >      >>>                 the transaction until checkpoint
>> >     finalization. This
>> >      >      >>>                 answers my main question, which is "is
>> >      >      >>>                 @RequiresStableInput expressive enough
>> >     to allow
>> >      >      >>>                 Beam-on-Flink to have exactly once
>> behavior
>> >      >     with the
>> >      >      >>>                 same performance characteristics as
>> >     native Flink
>> >      >      >>>                 checkpoint finalization?"
>> >      >      >>>
>> >      >      >>>                 Kenn
>> >      >      >>>
>> >      >      >>>                 [1]
>> https://github.com/apache/beam/pull/7955
>> >      >      >>>
>> >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                     On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                             Now why does the Flink
>> >     Runner not
>> >      >     support
>> >      >      >>>                             KafkaIO EOS? Flink's native
>> >      >      >>>                             KafkaProducer supports
>> >     exactly-once. It
>> >      >      >>>                             simply commits the pending
>> >      >      >>>                             transaction once it has
>> >     completed a
>> >      >      >>> checkpoint.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                         On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]> wrote:
>> >      >      >>>
>> >      >      >>>                             Hi,
>> >      >      >>>
>> >      >      >>>                             I came across KafkaIO's
>> Runner
>> >      >     whitelist [1]
>> >      >      >>>                             for enabling exactly-once
>> >      >      >>>                             semantics (EOS). I think it
>> is
>> >      >     questionable
>> >      >      >>>                             to exclude Runners from
>> >      >      >>>                             inside a transform, but I
>> >     see that the
>> >      >      >>>                             intention was to save users
>> from
>> >      >      >>>                             surprises.
>> >      >      >>>
>> >      >      >>>                             Now why does the Flink
>> >     Runner not
>> >      >     support
>> >      >      >>>                             KafkaIO EOS? Flink's native
>> >      >      >>>                             KafkaProducer supports
>> >     exactly-once. It
>> >      >      >>>                             simply commits the pending
>> >      >      >>>                             transaction once it has
>> >     completed a
>> >      >      >>> checkpoint.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                         When we discussed this in Aug
>> >     2017, the
>> >      >      >>>                         understanding was that 2 Phase
>> >     commit
>> >      >     utility in
>> >      >      >>>                         Flink used to implement Flink's
>> >     Kafka
>> >      >     EOS could
>> >      >      >>>                         not be implemented in Beam's
>> >     context.
>> >      >      >>>                         See this message
>> >      >      >>>
>> >     <https://www.mail-archive.com/[email protected]/msg02664.html> in
>> >      >      >>>                         that dev thread. Has anything
>> >     changed
>> >      >     in this
>> >      >      >>>                         regard? The whole thread is
>> >     relevant to
>> >      >     this
>> >      >      >>>                         topic and worth going through.
>> >      >      >>>
>> >      >      >>>                     I think that TwoPhaseCommit utility
>> >     class
>> >      >     wouldn't
>> >      >      >>>                     work. The Flink runner would
>> >     probably want to
>> >      >      >>>                     directly use notifySnapshotComplete
>> >     in order to
>> >      >      >>>                     implement @RequiresStableInput.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                             A checkpoint is realized by
>> >     sending
>> >      >     barriers
>> >      >      >>>                             through all channels
>> >      >      >>>                             starting from the source
>> until
>> >      >     reaching all
>> >      >      >>>                             sinks. Every operator
>> >      >      >>>                             persists its state once it
>> has
>> >      >     received a
>> >      >      >>>                             barrier on all its input
>> >      >      >>>                             channels, it then forwards
>> >     it to the
>> >      >      >>>                             downstream operators.
>> >      >      >>>
>> >      >      >>>                             The architecture of Beam's
>> >      >      >>>                             KafkaExactlyOnceSink is as
>> >     follows[2]:
>> >      >      >>>
>> >      >      >>>                             Input ->
>> AssignRandomShardIds ->
>> >      >     GroupByKey
>> >      >      >>>                             -> AssignSequenceIds ->
>> >      >      >>>                             GroupByKey ->
>> ExactlyOnceWriter
>> >      >      >>>
>> >      >      >>>                             As I understood, Spark or
>> >     Dataflow
>> >      >     use the
>> >      >      >>>                             GroupByKey stages to persist
>> >      >      >>>                             the input. That is not
>> >     required in
>> >      >     Flink to
>> >      >      >>>                             be able to take a consistent
>> >      >      >>>                             snapshot of the pipeline.
>> >      >      >>>
>> >      >      >>>                             Basically, for Flink we
>> >     don't need
>> >      >     any of
>> >      >      >>>                             that magic that KafkaIO
>> does.
>> >      >      >>>                             What we would need to
>> >     support EOS
>> >      >     is a way
>> >      >      >>>                             to tell the
>> ExactlyOnceWriter
>> >      >      >>>                             (a DoFn) to commit once a
>> >      >     checkpoint has
>> >      >      >>>                             completed.
>> >      >      >>>
>> >      >      >>>                             I know that the new version
>> >     of SDF
>> >      >     supports
>> >      >      >>>                             checkpointing which should
>> >      >      >>>                             solve this issue. But there
>> is
>> >      >     still a lot
>> >      >      >>>                             of work to do to make this
>> >      >      >>>                             reality.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                         I don't see how SDF solves this
>> >      >     problem. Maybe
>> >      >      >>>                         pseudocode would make it
>> >     clearer. But if it
>> >      >      >>>                         helps, that is great!
>> >      >      >>>
>> >      >      >>>                             So I think it would make
>> >     sense to think
>> >      >      >>>                             about a way to make
>> KafkaIO's
>> >      >      >>>                             EOS more accessible to
>> Runners
>> >      >     which support
>> >      >      >>>                             a different way of
>> >      >      >>>                             checkpointing.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                         Absolutely. I would love to
>> >     support EOS in
>> >      >      >>>                         KafkaIO for Flink. I think that
>> will
>> >      >     help many
>> >      >      >>>                         future exactly-once sinks.. and
>> >     address
>> >      >      >>>                         fundamental incompatibility
>> between
>> >      >     Beam model
>> >      >      >>>                         and Flink's horizontal
>> checkpointing
>> >      >     for such
>> >      >      >>>                         applications.
>> >      >      >>>
>> >      >      >>>                         Raghu.
>> >      >      >>>
>> >      >      >>>                             Cheers,
>> >      >      >>>                             Max
>> >      >      >>>
>> >      >      >>>                             PS: I found this document
>> about
>> >      >      >>>                             RequiresStableInput [3], but
>> >     IMHO
>> >      >      >>>                             defining an annotation only
>> >      >     manifests the
>> >      >      >>>                             conceptual difference
>> between
>> >      >      >>>                             the Runners.
>> >      >      >>>
>> >      >      >>>
>> >      >      >>>                             [1]
>> >      >      >>>
>> >      >
>> >
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> >      >
>> >      >      >>>
>> >      >      >>>                             [2]
>> >      >      >>>
>> >      >
>> >
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> >      >
>> >      >      >>>
>> >      >      >>>                             [3]
>> >      >      >>>
>> >      >
>> >
>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>> >      >
>> >      >      >>>
>> >      >      >>>
>> >      >
>> >
>>
>
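
One more sketch, on the point in the quoted thread that the sink's de-duplication does not help after a restore when random shard ids are assigned differently. This is a toy Python model only: it borrows the shape "assign shard ids, assign sequence ids, writer skips sequence ids it has already seen" from the pipeline quoted above, but the function and class names are invented and the real sink's bookkeeping is not reproduced. The explicit lists passed to by_draw stand in for two different random draws.

```python
import zlib


def assign(records, shard_of, num_shards):
    """Give every record a (shard, sequence id) pair, in arrival order."""
    next_seq = [0] * num_shards
    out = []
    for record in records:
        shard = shard_of(record)
        out.append((shard, next_seq[shard], record))
        next_seq[shard] += 1
    return out


class DedupWriter:
    """Skips any sequence id at or below the last one written for a shard."""

    def __init__(self, num_shards):
        self.last_seq = [-1] * num_shards
        self.written = []

    def write(self, triples):
        for shard, seq, record in triples:
            if seq <= self.last_seq[shard]:
                continue  # treated as a replay of something already written
            self.last_seq[shard] = seq
            self.written.append(record)


def by_hash(num_shards):
    # Deterministic: the same record always lands on the same shard.
    return lambda record: zlib.crc32(record.encode("utf-8")) % num_shards


def by_draw(draw):
    # Stand-in for random assignment: shard ids come from the given draw.
    it = iter(draw)
    return lambda record: next(it)


records = ["a", "b", "c", "d"]

stable = DedupWriter(2)
stable.write(assign(records, by_hash(2), 2))
stable.write(assign(records, by_hash(2), 2))   # replay after restore is skipped

unstable = DedupWriter(2)
unstable.write(assign(records, by_draw([0, 0, 1, 1]), 2))
unstable.write(assign(records, by_draw([1, 0, 0, 0]), 2))  # replay, new draw
```

With the hash-based assignment the replay is skipped entirely. With the second draw, "d" arrives under a sequence id the writer has not seen for that shard, so it is written twice.
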
