On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:

>
>
> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:
>
>> RE: Kenn's suggestion. i think Raghu looked into something that, and
>> something about it didn't work. I don't remember all the details, but I
>> think there might have been some subtle problem with it that wasn't
>> obvious. Doesn't mean that there isn't another way to solve that issue.'
>>
>
> Two disadvantages:
> - A transaction in Kafka are tied to single producer instance. There is no
> official API to start a txn in one process and access it in another
> process. Flink's sink uses an internal REST API for this.
>

Can you say more about how this works?

- There is one failure case that I mentioned earlier: if closing the
> transaction in downstream transform fails, it is data loss, there is no way
> to replay the upstream transform that wrote the records to Kafka.
>

With coupling of unrelated failures due to fusion, this is a severe
problem. I think I see now how 2PC affects this. From my reading, I can't
see the difference in how Flink works. If the checkpoint finalization
callback that does the Kafka commit fails, does it invalidate the
checkpoint so the start transaction + write elements is retried?

Kenn


>
> GBKs don't have major scalability limitations in most runner. Extra GBK is
> fine in practice for such a sink (at least no one has complained about it
> yet, though I don't know real usage numbers in practice). Flink's
> implentation in Beam using @RequiresStableInput  does have storage
> requirements and latency costs that increase with checkpoint interval. I
> think is still just as useful. Good to see @RequiresStableInput support
> added to Flink runner in Max's PR.
>
>
>> Hopefully we can make that work. Another possibility if we can't is to do
>> something special for Flink. Beam allows runners to splice out well-known
>> transforms with their own implementation. Dataflow already does that for
>> Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
>> Kafka sink with one that uses Flink-specific functionality.  Ideally this
>> would reuse most of the existing Kafka code (maybe we could refactor just
>> the EOS part into something that could be subbed out).
>>
>> Reuven
>>
>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
>>
>>> > It would be interesting to see if there's something we could add to
>>> the Beam model that would create a better story for Kafka's EOS writes.
>>>
>>> There would have to be a checkpoint-completed callback the DoFn can
>>> register with the Runner. Does not seem applicable for most Runners
>>> though.
>>>
>>> > This is true, however isn't it already true for such uses of Flink?
>>>
>>> Yes, that's correct. In the case of Kafka, Flink can offload the
>>> buffering but for the general case, idempotent writes are only possible
>>> if we buffer data until the checkpoint is completed.
>>>
>>> On 04.03.19 17:45, Reuven Lax wrote:
>>> >
>>> >
>>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org
>>> > <mailto:m...@apache.org>> wrote:
>>> >
>>> >      > Can we do 2? I seem to remember that we had trouble in some
>>> cases
>>> >     (e..g in the BigQuery case, there was no obvious way to create a
>>> >     deterministic id, which is why we went for a random number followed
>>> >     by a reshuffle). Also remember that the user ParDo that is
>>> producing
>>> >     data to the sink is not guaranteed to be deterministic; the Beam
>>> >     model allows for non-deterministic transforms.
>>> >
>>> >     I believe we could use something like the worker id to make it
>>> >     deterministic, though the worker id can change after a restart. We
>>> >     could
>>> >     persist it in Flink's operator state. I do not know if we can come
>>> up
>>> >     with a Runner-independent solution.
>>> >
>>> >
>>> > If we did this, we would break it on runners that don't have a concept
>>> > of a stable worker id :( The Dataflow runner can load balance work at
>>> > any time (including moving work around between workers).
>>> >
>>> >
>>> >      > I'm not quite sure I understand. If a ParDo is marked with
>>> >     RequiresStableInput, can't the flink runner buffer the input
>>> message
>>> >     until after the checkpoint is complete and only then deliver it to
>>> >     the ParDo?
>>> >
>>> >     You're correct. I thought that it could suffice to only buffer
>>> during a
>>> >     checkpoint and otherwise rely on the deterministic execution of the
>>> >     pipeline and KafkaIO's de-duplication code.
>>> >
>>> >
>>> > Yes, I want to distinguish the KafkaIO case from the general case. It
>>> > would be interesting to see if there's something we could add to the
>>> > Beam model that would create a better story for Kafka's EOS writes.
>>> >
>>> >
>>> >     In any case, emitting only after finalization of checkpoints gives
>>> us
>>> >     guaranteed stable input. It also means that the processing is
>>> tight to
>>> >     the checkpoint interval, the checkpoint duration, and the available
>>> >     memory.
>>> >
>>> >
>>> > This is true, however isn't it already true for such uses of Flink?
>>> >
>>> >
>>> >     On 01.03.19 19:41, Reuven Lax wrote:
>>> >      >
>>> >      >
>>> >      > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
>>> >     <m...@apache.org <mailto:m...@apache.org>
>>> >      > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
>>> >      >
>>> >      >     Fully agree. I think we can improve the situation
>>> >     drastically. For
>>> >      >     KafkaIO EOS with Flink we need to make these two changes:
>>> >      >
>>> >      >     1) Introduce buffering while the checkpoint is being taken
>>> >      >     2) Replace the random shard id assignment with something
>>> >     deterministic
>>> >      >
>>> >      >
>>> >      > Can we do 2? I seem to remember that we had trouble in some
>>> cases
>>> >     (e..g
>>> >      > in the BigQuery case, there was no obvious way to create a
>>> >     deterministic
>>> >      > id, which is why we went for a random number followed by a
>>> >     reshuffle).
>>> >      > Also remember that the user ParDo that is producing data to the
>>> >     sink is
>>> >      > not guaranteed to be deterministic; the Beam model allows for
>>> >      > non-deterministic transforms.
>>> >      >
>>> >      >
>>> >      >     However, we won't be able to provide full compatibility with
>>> >      >     RequiresStableInput because Flink only guarantees stable
>>> >     input after a
>>> >      >     checkpoint. RequiresStableInput requires input at any point
>>> >     in time to
>>> >      >     be stable.
>>> >      >
>>> >      >
>>> >      > I'm not quite sure I understand. If a ParDo is marked with
>>> >      > RequiresStableInput, can't the flink runner buffer the input
>>> message
>>> >      > until after the checkpoint is complete and only then deliver it
>>> >     to the
>>> >      > ParDo? This adds latency of course, but I'm not sure how else
>>> to do
>>> >      > things correctly with the Beam model.
>>> >      >
>>> >      >     IMHO the only way to achieve that is materializing output
>>> >      >     which Flink does not currently support.
>>> >      >
>>> >      >     KafkaIO does not need all the power of RequiresStableInput
>>> to
>>> >     achieve
>>> >      >     EOS with Flink, but for the general case I don't see a good
>>> >     solution at
>>> >      >     the moment.
>>> >      >
>>> >      >     -Max
>>> >      >
>>> >      >     On 01.03.19 16:45, Reuven Lax wrote:
>>> >      >      > Yeah, the person who was working on it originally stopped
>>> >     working on
>>> >      >      > Beam, and nobody else ever finished it. I think it is
>>> >     important to
>>> >      >      > finish though. Many of the existing Sinks are only fully
>>> >     correct for
>>> >      >      > Dataflow today, because they generate either Reshuffle or
>>> >      >     GroupByKey to
>>> >      >      > ensure input stability before outputting (in many cases
>>> >     this code
>>> >      >     was
>>> >      >      > inherited from before Beam existed). On Flink today,
>>> these
>>> >     sinks
>>> >      >     might
>>> >      >      > occasionally produce duplicate output in the case of
>>> failures.
>>> >      >      >
>>> >      >      > Reuven
>>> >      >      >
>>> >      >      > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels
>>> >     <m...@apache.org <mailto:m...@apache.org>
>>> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>>
>>> >      >      > <mailto:m...@apache.org <mailto:m...@apache.org>
>>> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>> wrote:
>>> >      >      >
>>> >      >      >     Circling back to the RequiresStableInput
>>> >     annotation[1]. I've
>>> >      >     done some
>>> >      >      >     protoyping to see how this could be integrated into
>>> >     Flink. I'm
>>> >      >      >     currently
>>> >      >      >     writing a test based on RequiresStableInput.
>>> >      >      >
>>> >      >      >     I found out there are already checks in place at the
>>> >     Runners to
>>> >      >      >     throw in
>>> >      >      >     case transforms use RequiresStableInput and its not
>>> >      >     supported. However,
>>> >      >      >     not a single transform actually uses the annotation.
>>> >      >      >
>>> >      >      >     It seems that the effort stopped at some point? Would
>>> >     it make
>>> >      >     sense to
>>> >      >      >     start annotating KafkaExactlyOnceSink with
>>> >      >     @RequiresStableInput? We
>>> >      >      >     could then get rid of the whitelist.
>>> >      >      >
>>> >      >      >     -Max
>>> >      >      >
>>> >      >      >     [1]
>>> >      >      >
>>> >      >
>>> >
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>> >      >      >
>>> >      >      >
>>> >      >      >
>>> >      >      >     On 01.03.19 14:28, Maximilian Michels wrote:
>>> >      >      >      > Just realized that transactions do not spawn
>>> multiple
>>> >      >     elements in
>>> >      >      >      > KafkaExactlyOnceSink. So the proposed solution to
>>> stop
>>> >      >     processing
>>> >      >      >      > elements while a snapshot is pending would work.
>>> >      >      >      >
>>> >      >      >      > It is certainly not optimal in terms of
>>> performance for
>>> >      >     Flink and
>>> >      >      >     poses
>>> >      >      >      > problems when checkpoints take long to complete,
>>> but it
>>> >      >     would be
>>> >      >      >      > worthwhile to implement this to make use of the
>>> EOS
>>> >     feature.
>>> >      >      >      >
>>> >      >      >      > Thanks,
>>> >      >      >      > Max
>>> >      >      >      >
>>> >      >      >      > On 01.03.19 12:23, Maximilian Michels wrote:
>>> >      >      >      >> Thanks you for the prompt replies. It's great to
>>> >     see that
>>> >      >     there is
>>> >      >      >      >> good understanding of how EOS in Flink works.
>>> >      >      >      >>
>>> >      >      >      >>> This is exactly what RequiresStableInput is
>>> >     supposed to
>>> >      >     do. On the
>>> >      >      >      >>> Flink runner, this would be implemented by
>>> delaying
>>> >      >     processing
>>> >      >      >     until
>>> >      >      >      >>> the current checkpoint is done.
>>> >      >      >      >>
>>> >      >      >      >> I don't think that works because we have no
>>> >     control over
>>> >      >     the Kafka
>>> >      >      >      >> transactions. Imagine:
>>> >      >      >      >>
>>> >      >      >      >> 1) ExactlyOnceWriter writes records to Kafka and
>>> >     commits,
>>> >      >     then
>>> >      >      >     starts
>>> >      >      >      >> a new transaction.
>>> >      >      >      >> 2) Flink checkpoints, delaying the processing of
>>> >      >     elements, the
>>> >      >      >      >> checkpoint fails.
>>> >      >      >      >> 3) We restore from an old checkpoint and will
>>> >     start writing
>>> >      >      >     duplicate
>>> >      >      >      >> data to Kafka. The de-duplication that the sink
>>> >     performs
>>> >      >     does not
>>> >      >      >      >> help, especially because the random shards ids
>>> >     might be
>>> >      >     assigned
>>> >      >      >      >> differently.
>>> >      >      >      >>
>>> >      >      >      >> IMHO we have to have control over commit to be
>>> able to
>>> >      >     provide EOS.
>>> >      >      >      >>
>>> >      >      >      >>> When we discussed this in Aug 2017, the
>>> understanding
>>> >      >     was that 2
>>> >      >      >      >>> Phase commit utility in Flink used to implement
>>> >     Flink's
>>> >      >     Kafka EOS
>>> >      >      >      >>> could not be implemented in Beam's context.
>>> >      >      >      >>
>>> >      >      >      >> That's also my understanding, unless we change
>>> the
>>> >     interface.
>>> >      >      >      >>
>>> >      >      >      >>> I don't see how SDF solves this problem..
>>> >      >      >      >>
>>> >      >      >      >> SDF has a checkpoint method which the Runner can
>>> call,
>>> >      >     but I think
>>> >      >      >      >> that you are right, that the above problem would
>>> >     be the same.
>>> >      >      >      >>
>>> >      >      >      >>> Absolutely. I would love to support EOS in
>>> KakaIO for
>>> >      >     Flink. I
>>> >      >      >     think
>>> >      >      >      >>> that will help many future exactly-once sinks..
>>> >     and address
>>> >      >      >      >>> fundamental incompatibility between Beam model
>>> >     and Flink's
>>> >      >      >     horizontal
>>> >      >      >      >>> checkpointing for such applications.
>>> >      >      >      >>
>>> >      >      >      >> Great :)
>>> >      >      >      >>
>>> >      >      >      >>> The FlinkRunner would need to insert the "wait
>>> until
>>> >      >     checkpoint
>>> >      >      >      >>> finalization" logic wherever it sees
>>> >     @RequiresStableInput,
>>> >      >      >     which is
>>> >      >      >      >>> already what it would have to do.
>>> >      >      >      >>
>>> >      >      >      >> I don't think that fixes the problem. See above
>>> >     example.
>>> >      >      >      >>
>>> >      >      >      >> Thanks,
>>> >      >      >      >> Max
>>> >      >      >      >>
>>> >      >      >      >> On 01.03.19 00:04, Raghu Angadi wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi
>>> >      >     <ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
>>> >      >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
>>> >      >      >      >>> <mailto:ang...@gmail.com
>>> >     <mailto:ang...@gmail.com> <mailto:ang...@gmail.com
>>> >     <mailto:ang...@gmail.com>>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>     On Thu, Feb 28, 2019 at 2:34 PM Kenneth
>>> Knowles
>>> >      >      >     <k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>
>>> >      >     <mailto:k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
>>> >      >      >      >>>     <mailto:k...@apache.org
>>> >     <mailto:k...@apache.org> <mailto:k...@apache.org
>>> >     <mailto:k...@apache.org>>
>>> >      >     <mailto:k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>         I'm not sure what a hard fail is. I
>>> probably
>>> >      >     have a shallow
>>> >      >      >      >>>         understanding, but doesn't
>>> >     @RequiresStableInput work
>>> >      >      >     for 2PC?
>>> >      >      >      >>>         The preCommit() phase should establish
>>> the
>>> >      >     transaction and
>>> >      >      >      >>>         commit() is not called until after
>>> checkpoint
>>> >      >      >     finalization. Can
>>> >      >      >      >>>         you describe the way that it does not
>>> work a
>>> >      >     little bit
>>> >      >      >     more?
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>     - preCommit() is called before checkpoint.
>>> >     Kafka EOS in
>>> >      >      >     Flink starts
>>> >      >      >      >>>     the transaction before this and makes sure
>>> it
>>> >      >     flushes all
>>> >      >      >     records in
>>> >      >      >      >>>     preCommit(). So far good.
>>> >      >      >      >>>     - commit is called after checkpoint is
>>> persisted.
>>> >      >     Now, imagine
>>> >      >      >      >>>     commit() fails for some reason. There is no
>>> >     option
>>> >      >     to rerun
>>> >      >      >     the 1st
>>> >      >      >      >>>     phase to write the records again in a new
>>> >      >     transaction. This
>>> >      >      >     is a
>>> >      >      >      >>>     hard failure for the the job. In practice
>>> >     Flink might
>>> >      >      >     attempt to
>>> >      >      >      >>>     commit again (not sure how many times),
>>> which is
>>> >      >     likely to
>>> >      >      >     fail and
>>> >      >      >      >>>     eventually results in job failure.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>> In Apache Beam, the records could be stored in
>>> state,
>>> >      >     and can be
>>> >      >      >      >>> written inside commit() to work around this
>>> issue. It
>>> >      >     could have
>>> >      >      >      >>> scalability issues if checkpoints are not
>>> frequent
>>> >      >     enough in Flink
>>> >      >      >      >>> runner.
>>> >      >      >      >>>
>>> >      >      >      >>> Raghu.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>         Kenn
>>> >      >      >      >>>
>>> >      >      >      >>>         On Thu, Feb 28, 2019 at 1:25 PM Raghu
>>> Angadi
>>> >      >      >     <ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
>>> >      >      >      >>>         <mailto:ang...@gmail.com
>>> >     <mailto:ang...@gmail.com>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>>
>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>             On Thu, Feb 28, 2019 at 11:01 AM
>>> >     Kenneth Knowles
>>> >      >      >      >>>             <k...@apache.org
>>> >     <mailto:k...@apache.org> <mailto:k...@apache.org
>>> >     <mailto:k...@apache.org>>
>>> >      >     <mailto:k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>>
>>> >      >      >     <mailto:k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>
>>> >      >     <mailto:k...@apache.org <mailto:k...@apache.org>
>>> >     <mailto:k...@apache.org <mailto:k...@apache.org>>>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>                 I believe the way you would
>>> implement
>>> >      >     the logic
>>> >      >      >     behind
>>> >      >      >      >>>                 Flink's KafkaProducer would be
>>> to
>>> >     have
>>> >      >     two steps:
>>> >      >      >      >>>
>>> >      >      >      >>>                 1. Start transaction
>>> >      >      >      >>>                 2. @RequiresStableInput Close
>>> >     transaction
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>             I see.  What happens if closing the
>>> >     transaction
>>> >      >      >     fails in
>>> >      >      >      >>>             (2)? Flink's 2PC requires that
>>> >     commit() should
>>> >      >      >     never hard
>>> >      >      >      >>>             fail once preCommit() succeeds. I
>>> >     think that is
>>> >      >      >     cost of not
>>> >      >      >      >>>             having an extra shuffle. It is
>>> >     alright since
>>> >      >     this
>>> >      >      >     policy has
>>> >      >      >      >>>             worked well for Flink so far.
>>> >      >      >      >>>
>>> >      >      >      >>>             Overall, it will be great to have
>>> >      >     @RequiresStableInput
>>> >      >      >      >>>             support in Flink runner.
>>> >      >      >      >>>
>>> >      >      >      >>>             Raghu.
>>> >      >      >      >>>
>>> >      >      >      >>>                 The FlinkRunner would need to
>>> >     insert the
>>> >      >     "wait
>>> >      >      >     until
>>> >      >      >      >>>                 checkpoint finalization" logic
>>> >     wherever it
>>> >      >      >      >>>                 sees @RequiresStableInput,
>>> which is
>>> >      >     already what it
>>> >      >      >      >>>                 would have to do.
>>> >      >      >      >>>
>>> >      >      >      >>>                 This matches the KafkaProducer's
>>> >     logic -
>>> >      >     delay
>>> >      >      >     closing
>>> >      >      >      >>>                 the transaction until checkpoint
>>> >      >     finalization. This
>>> >      >      >      >>>                 answers my main question, which
>>> >     is "is
>>> >      >      >      >>>                 @RequiresStableInput expressive
>>> >     enough
>>> >      >     to allow
>>> >      >      >      >>>                 Beam-on-Flink to have exactly
>>> >     once behavior
>>> >      >      >     with the
>>> >      >      >      >>>                 same performance
>>> characteristics as
>>> >      >     native Flink
>>> >      >      >      >>>                 checkpoint finalization?"
>>> >      >      >      >>>
>>> >      >      >      >>>                 Kenn
>>> >      >      >      >>>
>>> >      >      >      >>>                 [1]
>>> >     https://github.com/apache/beam/pull/7955
>>> >      >      >      >>>
>>> >      >      >      >>>                 On Thu, Feb 28, 2019 at 10:43 AM
>>> >     Reuven Lax
>>> >      >      >      >>>                 <re...@google.com
>>> >     <mailto:re...@google.com>
>>> >      >     <mailto:re...@google.com <mailto:re...@google.com>>
>>> >     <mailto:re...@google.com <mailto:re...@google.com>
>>> >      >     <mailto:re...@google.com <mailto:re...@google.com>>>
>>> >      >      >     <mailto:re...@google.com <mailto:re...@google.com>
>>> >     <mailto:re...@google.com <mailto:re...@google.com>>
>>> >      >     <mailto:re...@google.com <mailto:re...@google.com>
>>> >     <mailto:re...@google.com <mailto:re...@google.com>>>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                     On Thu, Feb 28, 2019 at
>>> 10:41 AM
>>> >      >     Raghu Angadi
>>> >      >      >      >>>                     <ang...@gmail.com
>>> >     <mailto:ang...@gmail.com>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>
>>> >      >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>
>>> >      >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>
>>> >     <mailto:ang...@gmail.com <mailto:ang...@gmail.com>>>>> wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                             Now why does the
>>> Flink
>>> >      >     Runner not
>>> >      >      >     support
>>> >      >      >      >>>                             KafkaIO EOS? Flink's
>>> >     native
>>> >      >      >      >>>                             KafkaProducer
>>> supports
>>> >      >     exactly-once. It
>>> >      >      >      >>>                             simply commits the
>>> >     pending
>>> >      >      >      >>>                             transaction once it
>>> has
>>> >      >     completed a
>>> >      >      >      >>> checkpoint.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                         On Thu, Feb 28, 2019 at
>>> >     9:59 AM
>>> >      >     Maximilian
>>> >      >      >      >>>                         Michels <m...@apache.org
>>> >     <mailto:m...@apache.org>
>>> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>>
>>> >      >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
>>> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>
>>> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
>>> >     <mailto:m...@apache.org <mailto:m...@apache.org>>
>>> >      >     <mailto:m...@apache.org <mailto:m...@apache.org>
>>> >     <mailto:m...@apache.org <mailto:m...@apache.org>>>>>
>>> >      >      >      >>>                         wrote:
>>> >      >      >      >>>
>>> >      >      >      >>>                             Hi,
>>> >      >      >      >>>
>>> >      >      >      >>>                             I came across
>>> >     KafkaIO's Runner
>>> >      >      >     whitelist [1]
>>> >      >      >      >>>                             for enabling
>>> exactly-once
>>> >      >      >      >>>                             semantics (EOS). I
>>> >     think it is
>>> >      >      >     questionable
>>> >      >      >      >>>                             to exclude Runners
>>> from
>>> >      >      >      >>>                             inside a transform,
>>> but I
>>> >      >     see that the
>>> >      >      >      >>>                             intention was to
>>> save
>>> >     users from
>>> >      >      >      >>>                             surprises.
>>> >      >      >      >>>
>>> >      >      >      >>>                             Now why does the
>>> Flink
>>> >      >     Runner not
>>> >      >      >     support
>>> >      >      >      >>>                             KafkaIO EOS? Flink's
>>> >     native
>>> >      >      >      >>>                             KafkaProducer
>>> supports
>>> >      >     exactly-once. It
>>> >      >      >      >>>                             simply commits the
>>> >     pending
>>> >      >      >      >>>                             transaction once it
>>> has
>>> >      >     completed a
>>> >      >      >      >>> checkpoint.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                         When we discussed this
>>> in Aug
>>> >      >     2017, the
>>> >      >      >      >>>                         understanding was that 2
>>> >     Phase
>>> >      >     commit
>>> >      >      >     utility in
>>> >      >      >      >>>                         Flink used to implement
>>> >     Flink's
>>> >      >     Kafka
>>> >      >      >     EOS could
>>> >      >      >      >>>                         not be implemented in
>>> Beam's
>>> >      >     context.
>>> >      >      >      >>>                         See this message
>>> >      >      >      >>>
>>> >      >
>>> >       <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html
>>> > in
>>> >      >      >      >>>                         that dev thread. Has
>>> anything
>>> >      >     changed
>>> >      >      >     in this
>>> >      >      >      >>>                         regard? The whole
>>> thread is
>>> >      >     relevant to
>>> >      >      >     this
>>> >      >      >      >>>                         topic and worth going
>>> >     through.
>>> >      >      >      >>>
>>> >      >      >      >>>                     I think that TwoPhaseCommit
>>> >     utility
>>> >      >     class
>>> >      >      >     wouldn't
>>> >      >      >      >>>                     work. The Flink runner would
>>> >      >     probably want to
>>> >      >      >      >>>                     directly use
>>> >     notifySnapshotComplete
>>> >      >     in order to
>>> >      >      >      >>>
>>> implement @RequiresStableInput.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                             A checkpoint is
>>> >     realized by
>>> >      >     sending
>>> >      >      >     barriers
>>> >      >      >      >>>                             through all channels
>>> >      >      >      >>>                             starting from the
>>> >     source until
>>> >      >      >     reaching all
>>> >      >      >      >>>                             sinks. Every
>>> operator
>>> >      >      >      >>>                             persists its state
>>> >     once it has
>>> >      >      >     received a
>>> >      >      >      >>>                             barrier on all its
>>> input
>>> >      >      >      >>>                             channels, it then
>>> >     forwards
>>> >      >     it to the
>>> >      >      >      >>>                             downstream
>>> operators.
>>> >      >      >      >>>
>>> >      >      >      >>>                             The architecture of
>>> >     Beam's
>>> >      >      >      >>>                             KafkaExactlyOnceSink
>>> >     is as
>>> >      >     follows[2]:
>>> >      >      >      >>>
>>> >      >      >      >>>                             Input ->
>>> >     AssignRandomShardIds ->
>>> >      >      >     GroupByKey
>>> >      >      >      >>>                             ->
>>> AssignSequenceIds ->
>>> >      >      >      >>>                             GroupByKey ->
>>> >     ExactlyOnceWriter
>>> >      >      >      >>>
>>> >      >      >      >>>                             As I understood,
>>> Spark or
>>> >      >     Dataflow
>>> >      >      >     use the
>>> >      >      >      >>>                             GroupByKey stages to
>>> >     persist
>>> >      >      >      >>>                             the input. That is
>>> not
>>> >      >     required in
>>> >      >      >     Flink to
>>> >      >      >      >>>                             be able to take a
>>> >     consistent
>>> >      >      >      >>>                             snapshot of the
>>> pipeline.
>>> >      >      >      >>>
>>> >      >      >      >>>                             Basically, for
>>> Flink we
>>> >      >     don't need
>>> >      >      >     any of
>>> >      >      >      >>>                             that magic that
>>> >     KafkaIO does.
>>> >      >      >      >>>                             What we would need
>>> to
>>> >      >     support EOS
>>> >      >      >     is a way
>>> >      >      >      >>>                             to tell the
>>> >     ExactlyOnceWriter
>>> >      >      >      >>>                             (a DoFn) to commit
>>> once a
>>> >      >      >     checkpoint has
>>> >      >      >      >>>                             completed.
>>> >      >      >      >>>
>>> >      >      >      >>>                             I know that the new
>>> >     version
>>> >      >     of SDF
>>> >      >      >     supports
>>> >      >      >      >>>                             checkpointing which
>>> >     should
>>> >      >      >      >>>                             solve this issue.
>>> But
>>> >     there is
>>> >      >      >     still a lot
>>> >      >      >      >>>                             of work to do to
>>> make
>>> >     this
>>> >      >      >      >>>                             reality.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                         I don't see how SDF
>>> >     solves this
>>> >      >      >     problem.. May be
>>> >      >      >      >>>                         pseudo code would make
>>> more
>>> >      >     clear.  But if
>>> >      >      >      >>>                         helps, that is great!
>>> >      >      >      >>>
>>> >      >      >      >>>                             So I think it would
>>> make
>>> >      >     sense to think
>>> >      >      >      >>>                             about a way to make
>>> >     KafkaIO's
>>> >      >      >      >>>                             EOS more accessible
>>> >     to Runners
>>> >      >      >     which support
>>> >      >      >      >>>                             a different way of
>>> >      >      >      >>>                             checkpointing.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                         Absolutely. I would
>>> love to
>>> >      >     support EOS in
>>> >      >      >      >>>                         KakaIO for Flink. I
>>> think
>>> >     that will
>>> >      >      >     help many
>>> >      >      >      >>>                         future exactly-once
>>> >     sinks.. and
>>> >      >     address
>>> >      >      >      >>>                         fundamental
>>> >     incompatibility between
>>> >      >      >     Beam model
>>> >      >      >      >>>                         and Flink's horizontal
>>> >     checkpointing
>>> >      >      >     for such
>>> >      >      >      >>>                         applications.
>>> >      >      >      >>>
>>> >      >      >      >>>                         Raghu.
>>> >      >      >      >>>
>>> >      >      >      >>>                             Cheers,
>>> >      >      >      >>>                             Max
>>> >      >      >      >>>
>>> >      >      >      >>>                             PS: I found this
>>> >     document about
>>> >      >      >      >>>                             RequiresStableInput
>>> >     [3], but
>>> >      >     IMHO
>>> >      >      >      >>>                             defining an
>>> >     annotation only
>>> >      >      >     manifests the
>>> >      >      >      >>>                             conceptual
>>> difference
>>> >     between
>>> >      >      >      >>>                             the Runners.
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >      >>>                             [1]
>>> >      >      >      >>>
>>> >      >      >
>>> >      >
>>> >
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> >      >      >
>>> >      >      >      >>>
>>> >      >      >      >>>                             [2]
>>> >      >      >      >>>
>>> >      >      >
>>> >      >
>>> >
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> >      >      >
>>> >      >      >      >>>
>>> >      >      >      >>>                             [3]
>>> >      >      >      >>>
>>> >      >      >
>>> >      >
>>> >
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>> >      >      >
>>> >      >      >      >>>
>>> >      >      >      >>>
>>> >      >      >
>>> >      >
>>> >
>>>
>>

Reply via email to