A fair amount of work on true exactly-once output was done in Apex
(different from almost-exactly-once :)).
The takeaway was that the mechanism to achieve it depends on the
external system: the implementation looks different for, say, a file
sink, JDBC, or Kafka.
Apex had an exactly-once producer before Kafka supported transactions.
That producer relied on the ability to discover what was already written
to Kafka upon recovery from failure. Why?
Runners are not distributed transaction coordinators, and no matter
how we write the code, there is always a small possibility that one of
two resources fails to commit, resulting in either data loss or
duplicates. The Kafka EOS was therefore a hybrid of producer and
consumer, with the consumer part used during recovery to find out what
had already been produced.
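To make that concrete: a rough sketch of the recovery-time "discover
what was already written" step. The shape below (one partition per
shard, the sequence id carried in the record key, a helper called
lastWrittenSequence) is my illustration, not Apex's actual code:

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  // On recovery, read back the last record of the partition to learn the
  // highest sequence id that made it to Kafka before the failure.
  // (props is assumed to carry bootstrap servers and String deserializers.)
  long lastWrittenSequence(Properties props, String topic, int partition) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition(topic, partition);
      consumer.assign(Collections.singletonList(tp));
      long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
      if (end == 0) {
        return -1; // nothing written yet
      }
      consumer.seek(tp, end - 1); // position on the last record
      for (ConsumerRecord<String, String> record
          : consumer.poll(Duration.ofSeconds(5))) {
        return Long.parseLong(record.key()); // assumes key carries the sequence id
      }
      return -1;
    }
  }

Everything at or below that sequence id is then skipped on replay
instead of being written again.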
Flink and Apex have very similar checkpointing models, which is why
this thread caught my attention. Within the topology/runner,
exactly-once is achieved by replay having the same effect. For sinks,
it needs to rely on the capabilities of the respective system (like
atomic rename for a file sink, or a transaction with a metadata table
for JDBC).
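For instance, the JDBC variant could look roughly like the following.
Table names and schema (sink_output, sink_metadata) are made up for
illustration:

  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;

  // Write a checkpoint's worth of records and the checkpoint marker in ONE
  // database transaction, so they become visible atomically.
  void writeBatch(Connection conn, long checkpointId, Iterable<String> records)
      throws SQLException {
    conn.setAutoCommit(false);
    // If the marker already exists we are replaying after a failure: skip.
    try (PreparedStatement check = conn.prepareStatement(
        "SELECT 1 FROM sink_metadata WHERE checkpoint_id = ?")) {
      check.setLong(1, checkpointId);
      if (check.executeQuery().next()) {
        return;
      }
    }
    try (PreparedStatement insert = conn.prepareStatement(
        "INSERT INTO sink_output (value) VALUES (?)")) {
      for (String record : records) {
        insert.setString(1, record);
        insert.addBatch();
      }
      insert.executeBatch();
    }
    try (PreparedStatement mark = conn.prepareStatement(
        "INSERT INTO sink_metadata (checkpoint_id) VALUES (?)")) {
      mark.setLong(1, checkpointId);
      mark.executeUpdate();
    }
    conn.commit(); // records and marker commit or roll back together
  }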
Buffering until the checkpoint is complete is a mechanism to get away
from sink-specific implementations. It comes with a latency penalty
(the memory overhead could be solved with a write-ahead log). But
isn't there still the possibility that we fail to flush the buffer
after the checkpoint is complete (data loss)?
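For concreteness, a sketch of that buffering as a stateful DoFn. The
onCheckpointComplete() hook is hypothetical (Beam has no such callback
today), and stateful DoFns require keyed input:

  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.state.BagState;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  class BufferUntilCheckpointFn extends DoFn<KV<Integer, String>, Void> {
    @StateId("buffer")
    private final StateSpec<BagState<String>> bufferSpec =
        StateSpecs.bag(StringUtf8Coder.of());

    @ProcessElement
    public void process(
        @Element KV<Integer, String> element,
        @StateId("buffer") BagState<String> buffer) {
      buffer.add(element.getValue()); // held back, not yet written to the sink
    }

    // Hypothetical hook, invoked by the runner once the checkpoint containing
    // the buffered elements is finalized. Since the buffer is itself part of
    // checkpointed state and only cleared after the flush, a crash before or
    // during the flush is replayed from the checkpoint rather than lost.
    void onCheckpointComplete(BagState<String> buffer) {
      for (String record : buffer.read()) {
        // write record to the external system
      }
      buffer.clear();
    }
  }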
Thanks,
Thomas
On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:

On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi <ang...@gmail.com> wrote:

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax <re...@google.com> wrote:
RE: Kenn's suggestion. I think Raghu looked into something like that,
and something about it didn't work. I don't remember all the details,
but I think there might have been some subtle problem with it that
wasn't obvious. Doesn't mean that there isn't another way to solve
that issue.
Two disadvantages:
- Transactions in Kafka are tied to a single producer instance. There
is no official API to start a txn in one process and access it in
another process. Flink's sink uses an internal REST API for this.
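For reference, this binding is visible in the standard client API: the
transaction belongs to the producer instance that called
initTransactions() (config values below are illustrative):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  void writeOneTransactionalBatch() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    // Binds transactional state to this producer; another producer
    // registering the same id fences this one off.
    props.put("transactional.id", "sink-shard-0");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // claims the transactional state
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic", "key", "value"));
    producer.commitTransaction(); // only the instance that began the txn can commit it
  }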
Can you say more about how this works?
- There is one failure case that I mentioned earlier: if closing the
transaction in the downstream transform fails, it is data loss; there
is no way to replay the upstream transform that wrote the records to
Kafka.
With coupling of unrelated failures due to fusion, this is a severe
problem. I think I see now how 2PC affects this. From my reading, I
can't see the difference in how Flink works: if the checkpoint
finalization callback that does the Kafka commit fails, does it
invalidate the checkpoint so that the start-transaction + write-elements
phase is retried?
Kenn
GBKs don't have major scalability limitations in most runners. An
extra GBK is fine in practice for such a sink (at least no one has
complained about it yet, though I don't know real usage numbers in
practice). Flink's implementation in Beam using @RequiresStableInput
does have storage requirements and latency costs that increase with
the checkpoint interval. I think it is still just as useful. Good to
see @RequiresStableInput support added to the Flink runner in Max's PR.
Hopefully we can make that work. Another possibility if we
can't is to do something special for Flink. Beam allows
runners to splice out well-known transforms with their own
implementation. Dataflow already does that for Google Cloud
Pub/Sub sources/sinks. The Flink runner could splice out the
Kafka sink with one that uses Flink-specific functionality.
Ideally this would reuse most of the existing Kafka code
(maybe we could refactor just the EOS part into something
that could be subbed out).
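A sketch of what that splice could look like. Pipeline.replaceAll and
PTransformOverride are real Beam hooks (the matcher utilities have
moved between packages across Beam versions), while
FlinkKafkaEosOverrideFactory is a hypothetical replacement:

  // Replace Beam's generic Kafka EOS sink with a Flink-native one.
  pipeline.replaceAll(
      java.util.Collections.singletonList(
          PTransformOverride.of(
              PTransformMatchers.classEqualTo(KafkaExactlyOnceSink.class),
              new FlinkKafkaEosOverrideFactory()))); // hypothetical factory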
Reuven
On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels <m...@apache.org> wrote:
> It would be interesting to see if there's something we could add to
> the Beam model that would create a better story for Kafka's EOS
> writes.

There would have to be a checkpoint-completed callback the DoFn can
register with the Runner. Does not seem applicable for most Runners
though.
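At the operator level Flink does expose that signal; the sketch below
uses Flink's real CheckpointListener interface (package location as of
Flink 1.x), and only the forwarding to a DoFn is the missing piece:

  import org.apache.flink.runtime.state.CheckpointListener;
  import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

  class BufferFlushingOperator extends AbstractStreamOperator<Void>
      implements CheckpointListener {
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
      // Flush elements buffered before this checkpoint, then hand the signal
      // to the wrapped DoFn -- that hand-off is what Beam lacks today.
    }
  }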
> This is true, however isn't it already true for such uses of Flink?

Yes, that's correct. In the case of Kafka, Flink can offload the
buffering, but for the general case, idempotent writes are only
possible if we buffer data until the checkpoint is completed.
On 04.03.19 17:45, Reuven Lax wrote:
>
> On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels <m...@apache.org> wrote:
>
> > Can we do 2? I seem to remember that we had trouble in some cases
> > (e.g. in the BigQuery case, there was no obvious way to create a
> > deterministic id, which is why we went for a random number followed
> > by a reshuffle). Also remember that the user ParDo that is producing
> > data to the sink is not guaranteed to be deterministic; the Beam
> > model allows for non-deterministic transforms.
>
> I believe we could use something like the worker id to make it
> deterministic, though the worker id can change after a restart. We
> could persist it in Flink's operator state. I do not know if we can
> come up with a Runner-independent solution.
>
> If we did this, we would break it on runners that don't have a concept
> of a stable worker id :( The Dataflow runner can load balance work at
> any time (including moving work around between workers).
>
>
> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to
> > the ParDo?
>
> You're correct. I thought that it could suffice to only buffer during
> a checkpoint and otherwise rely on the deterministic execution of the
> pipeline and KafkaIO's de-duplication code.
>
> Yes, I want to distinguish the KafkaIO case from the general case. It
> would be interesting to see if there's something we could add to the
> Beam model that would create a better story for Kafka's EOS writes.
>
> In any case, emitting only after finalization of checkpoints gives us
> guaranteed stable input. It also means that the processing is tied to
> the checkpoint interval, the checkpoint duration, and the available
> memory.
>
> This is true, however isn't it already true for such uses of Flink?
>
> On 01.03.19 19:41, Reuven Lax wrote:
> >
> > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
> > <m...@apache.org> wrote:
> >
> > Fully agree. I think we can improve the situation drastically. For
> > KafkaIO EOS with Flink we need to make these two changes:
> >
> > 1) Introduce buffering while the checkpoint is being taken
> > 2) Replace the random shard id assignment with something
> > deterministic
> >
> > Can we do 2? I seem to remember that we had trouble in some cases
> > (e.g. in the BigQuery case, there was no obvious way to create a
> > deterministic id, which is why we went for a random number followed
> > by a reshuffle). Also remember that the user ParDo that is producing
> > data to the sink is not guaranteed to be deterministic; the Beam
> > model allows for non-deterministic transforms.
> > However, we won't be able to provide full compatibility with
> > RequiresStableInput because Flink only guarantees stable input after
> > a checkpoint. RequiresStableInput requires input at any point in
> > time to be stable.
> >
> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to
> > the ParDo? This adds latency of course, but I'm not sure how else to
> > do things correctly with the Beam model.
> >
> > IMHO the only way to achieve that is materializing output which
> > Flink does not currently support.
> >
> > KafkaIO does not need all the power of RequiresStableInput to
> > achieve EOS with Flink, but for the general case I don't see a good
> > solution at the moment.
> >
> > -Max
> >
> > On 01.03.19 16:45, Reuven Lax wrote:
> > > Yeah, the person who was working on it originally stopped working
> > > on Beam, and nobody else ever finished it. I think it is important
> > > to finish though. Many of the existing Sinks are only fully correct
> > > for Dataflow today, because they generate either Reshuffle or
> > > GroupByKey to ensure input stability before outputting (in many
> > > cases this code was inherited from before Beam existed). On Flink
> > > today, these sinks might occasionally produce duplicate output in
> > > the case of failures.
> > >
> > > Reuven
> > >
> > > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels
> > > <m...@apache.org> wrote:
> > >
> > > Circling back to the RequiresStableInput annotation[1]. I've done
> > > some prototyping to see how this could be integrated into Flink.
> > > I'm currently writing a test based on RequiresStableInput.
> > >
> > > I found out there are already checks in place at the Runners to
> > > throw in case transforms use RequiresStableInput and it's not
> > > supported. However, not a single transform actually uses the
> > > annotation.
> > >
> > > It seems that the effort stopped at some point? Would it make sense
> > > to start annotating KafkaExactlyOnceSink with @RequiresStableInput?
> > > We could then get rid of the whitelist.
> > >
> > > -Max
> > >
> > > [1]
> > > https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> > > > On 01.03.19 14:28, Maximilian Michels wrote:
> > > > Just realized that transactions do not span multiple elements in
> > > > KafkaExactlyOnceSink. So the proposed solution to stop processing
> > > > elements while a snapshot is pending would work.
> > > >
> > > > It is certainly not optimal in terms of performance for Flink and
> > > > poses problems when checkpoints take long to complete, but it
> > > > would be worthwhile to implement this to make use of the EOS
> > > > feature.
> > > >
> > > > Thanks,
> > > > Max
> > > >
> > > > On 01.03.19 12:23, Maximilian Michels wrote:
> > > >> Thank you for the prompt replies. It's great to see that there
> > > >> is good understanding of how EOS in Flink works.
> > > >>
> > > >>> This is exactly what RequiresStableInput is supposed to do. On
> > > >>> the Flink runner, this would be implemented by delaying
> > > >>> processing until the current checkpoint is done.
> > > >>
> > > >> I don't think that works because we have no control over the
> > > >> Kafka transactions. Imagine:
> > > >>
> > > >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> > > >> starts a new transaction.
> > > >> 2) Flink checkpoints, delaying the processing of elements, the
> > > >> checkpoint fails.
> > > >> 3) We restore from an old checkpoint and will start writing
> > > >> duplicate data to Kafka. The de-duplication that the sink
> > > >> performs does not help, especially because the random shard ids
> > > >> might be assigned differently.
> > > >>
> > > >> IMHO we have to have control over the commit to be able to
> > > >> provide EOS.
> > > >>
> > > >>> When we discussed this in Aug 2017, the understanding was that
> > > >>> the 2 Phase commit utility in Flink used to implement Flink's
> > > >>> Kafka EOS could not be implemented in Beam's context.
> > > >>
> > > >> That's also my understanding, unless we change the interface.
> > > >>
> > > >>> I don't see how SDF solves this problem..
> > > >>
> > > >> SDF has a checkpoint method which the Runner can call, but I
> > > >> think that you are right, that the above problem would be the
> > > >> same.
> > > >>
> > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
> > > >>> think that will help many future exactly-once sinks.. and
> > > >>> address the fundamental incompatibility between the Beam model
> > > >>> and Flink's horizontal checkpointing for such applications.
> > > >>
> > > >> Great :)
> > > >>
> > > >>> The FlinkRunner would need to insert the "wait until checkpoint
> > > >>> finalization" logic wherever it sees @RequiresStableInput,
> > > >>> which is already what it would have to do.
> > > >>
> > > >> I don't think that fixes the problem. See above example.
> > > >>
> > > >> Thanks,
> > > >> Max
> > > >>
> > > >> On 01.03.19 00:04, Raghu Angadi wrote:
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi
> > > >>> <ang...@gmail.com> wrote:
> > > >>>
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles
> > > >>> <k...@apache.org> wrote:
> > > >>>
> > > >>> I'm not sure what a hard fail is. I probably have a shallow
> > > >>> understanding, but doesn't @RequiresStableInput work for 2PC?
> > > >>> The preCommit() phase should establish the transaction and
> > > >>> commit() is not called until after checkpoint finalization. Can
> > > >>> you describe the way that it does not work a little bit more?
> > > >>>
> > > >>> - preCommit() is called before the checkpoint. Kafka EOS in
> > > >>> Flink starts the transaction before this and makes sure it
> > > >>> flushes all records in preCommit(). So far so good.
> > > >>> - commit() is called after the checkpoint is persisted. Now,
> > > >>> imagine commit() fails for some reason. There is no option to
> > > >>> rerun the 1st phase to write the records again in a new
> > > >>> transaction. This is a hard failure for the job. In practice
> > > >>> Flink might attempt to commit again (not sure how many times),
> > > >>> which is likely to fail and eventually results in job failure.
> > > >>>
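Schematically, the contract being described is that of Flink's
TwoPhaseCommitSinkFunction, shown here reduced to its abstract methods
(serializer plumbing and the context parameter are elided):

  abstract class TwoPhaseCommitSketch<IN, TXN> {
    abstract TXN beginTransaction() throws Exception;         // e.g. open a Kafka txn
    abstract void invoke(TXN txn, IN value) throws Exception; // write into the open txn
    abstract void preCommit(TXN txn) throws Exception;        // flush before the checkpoint completes
    abstract void commit(TXN txn); // after the checkpoint; must never hard-fail
    abstract void abort(TXN txn);  // on failure/restore
  }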
> > > >>>
> > > >>> In Apache Beam, the records could be stored in state, and can
> > > >>> be written inside commit() to work around this issue. It could
> > > >>> have scalability issues if checkpoints are not frequent enough
> > > >>> in the Flink runner.
> > > >>>
> > > >>> Raghu.
> > > >>>
> > > >>> Kenn
> > > >>>
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi
> > > >>> <ang...@gmail.com> wrote:
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
> > > >>> <k...@apache.org> wrote:
> > > >>>
> > > >>> I believe the way you would implement the logic behind Flink's
> > > >>> KafkaProducer would be to have two steps:
> > > >>>
> > > >>> 1. Start transaction
> > > >>> 2. @RequiresStableInput Close transaction
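In Beam terms the second step might look like the sketch below
(@RequiresStableInput is the real annotation; the DoFn itself is my
illustration, not existing KafkaIO code):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  class CloseTransactionFn extends DoFn<KV<String, Iterable<String>>, Void> {
    // The runner must not deliver elements here until they are stable,
    // i.e. on Flink only after checkpoint finalization.
    @RequiresStableInput
    @ProcessElement
    public void process(@Element KV<String, Iterable<String>> txn) {
      // resume the Kafka transaction identified by txn.getKey() and commit it
    }
  }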
> > > >>>
> > > >>> I see. What happens if closing the transaction fails in (2)?
> > > >>> Flink's 2PC requires that commit() should never hard-fail once
> > > >>> preCommit() succeeds. I think that is the cost of not having an
> > > >>> extra shuffle. It is alright since this policy has worked well
> > > >>> for Flink so far.
> > > >>>
> > > >>> Overall, it will be great to have @RequiresStableInput support
> > > >>> in the Flink runner.
> > > >>>
> > > >>> Raghu.
> > > >>>
> > > >>> The FlinkRunner would need to insert the "wait until checkpoint
> > > >>> finalization" logic wherever it sees @RequiresStableInput,
> > > >>> which is already what it would have to do.
> > > >>>
> > > >>> This matches the KafkaProducer's logic - delay closing the
> > > >>> transaction until checkpoint finalization. This answers my main
> > > >>> question, which is "is @RequiresStableInput expressive enough
> > > >>> to allow Beam-on-Flink to have exactly-once behavior with the
> > > >>> same performance characteristics as native Flink checkpoint
> > > >>> finalization?"
> > > >>>
> > > >>> Kenn
> > > >>>
> > > >>> [1] https://github.com/apache/beam/pull/7955
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
> > > >>> <re...@google.com> wrote:
> > > >>>
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi
> > > >>> <ang...@gmail.com> wrote:
> > > >>>
> > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
> > > >>> native KafkaProducer supports exactly-once. It simply commits
> > > >>> the pending transaction once it has completed a checkpoint.
> > > >>>
> > > >>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels
> > > >>> <m...@apache.org> wrote:
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> I came across KafkaIO's Runner whitelist [1] for enabling
> > > >>> exactly-once semantics (EOS). I think it is questionable to
> > > >>> exclude Runners from inside a transform, but I see that the
> > > >>> intention was to save users from surprises.
> > > >>>
> > > >>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
> > > >>> native KafkaProducer supports exactly-once. It simply commits
> > > >>> the pending transaction once it has completed a checkpoint.
> > > >>>
> > > >>> When we discussed this in Aug 2017, the understanding was that
> > > >>> the 2 Phase commit utility in Flink used to implement Flink's
> > > >>> Kafka EOS could not be implemented in Beam's context. See this
> > > >>> message
> > > >>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html>
> > > >>> in that dev thread. Has anything changed in this regard? The
> > > >>> whole thread is relevant to this topic and worth going through.
> > > >>>
> > > >>> I think that the TwoPhaseCommit utility class wouldn't work.
> > > >>> The Flink runner would probably want to directly use
> > > >>> notifySnapshotComplete in order to implement
> > > >>> @RequiresStableInput.
> > > >>>
> > > >>> A checkpoint is realized by sending barriers through all
> > > >>> channels, starting from the source until reaching all sinks.
> > > >>> Every operator persists its state once it has received a
> > > >>> barrier on all its input channels; it then forwards it to the
> > > >>> downstream operators.
> > > >>>
> > > >>> The architecture of Beam's KafkaExactlyOnceSink is as
> > > >>> follows[2]:
> > > >>>
> > > >>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds
> > > >>> -> GroupByKey -> ExactlyOnceWriter
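In Beam terms that expansion reads roughly as follows (transform names
taken from the diagram above; the DoFn class names are illustrative):

  input // PCollection of records to write
      .apply("AssignRandomShardIds", ParDo.of(new AssignShardFn()))   // KV<shardId, record>
      .apply(GroupByKey.create())
      .apply("AssignSequenceIds", ParDo.of(new AssignSequenceFn()))   // KV<shardId, sequenced record>
      .apply(GroupByKey.create())
      .apply("ExactlyOnceWriter", ParDo.of(new ExactlyOnceWriterFn()));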
> > > >>>
> > > >>> As I understood, Spark or Dataflow use the GroupByKey stages to
> > > >>> persist the input. That is not required in Flink to be able to
> > > >>> take a consistent snapshot of the pipeline.
> > > >>>
> > > >>> Basically, for Flink we don't need any of that magic that
> > > >>> KafkaIO does. What we would need to support EOS is a way to
> > > >>> tell the ExactlyOnceWriter (a DoFn) to commit once a checkpoint
> > > >>> has completed.
> > > >>>
> > > >>> I know that the new version of SDF supports checkpointing,
> > > >>> which should solve this issue. But there is still a lot of work
> > > >>> to do to make this a reality.
> > > >>>
> > > >>> I don't see how SDF solves this problem.. Maybe pseudo code
> > > >>> would make it more clear. But if it helps, that is great!
> > > >>>
> > > >>> So I think it would make sense to think about a way to make
> > > >>> KafkaIO's EOS more accessible to Runners which support a
> > > >>> different way of checkpointing.
> > > >>>
> > > >>> Absolutely. I would love to support EOS in KafkaIO for Flink. I
> > > >>> think that will help many future exactly-once sinks.. and
> > > >>> address the fundamental incompatibility between the Beam model
> > > >>> and Flink's horizontal checkpointing for such applications.
> > > >>>
> > > >>> Raghu.
> > > >>>
> > > >>> Cheers,
> > > >>> Max
> > > >>>
> > > >>> PS: I found this document about RequiresStableInput [3], but
> > > >>> IMHO defining an annotation only manifests the conceptual
> > > >>> difference between the Runners.
> > > >>>
> > > >>> [1]
> > > >>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> > > >>>
> > > >>> [2]
> > > >>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> > > >>>
> > > >>> [3]
> > > >>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM