Re: KafkaIO Exactly-Once & Flink Runner

2019-03-21 Thread Thomas Weise
Tracked as https://issues.apache.org/jira/browse/BEAM-6879


On Fri, Mar 15, 2019 at 10:13 AM Kenneth Knowles  wrote:

> Yes, the ParDoPayload has to contain most of the information that is on
> DoFnSignature. Everything except the details for feeding the bits to the
> Java DoFn.
>
> Kenn
>
> On Fri, Mar 15, 2019 at 9:59 AM Reuven Lax  wrote:
>
>> I think this attribute needs to be added to the portability protos.
>>
>> On Fri, Mar 15, 2019 at 9:49 AM Thomas Weise  wrote:
>>
>>>
>>>
>>> On Tue, Mar 12, 2019 at 10:13 AM Maximilian Michels 
>>> wrote:
>>>
 > I think that is what Max's PR does. KafkaIO writes entire list of
 values associated with a key in one transaction. So it depends on how Flink
 runner bundles > after a GBK. I would think all of the buffered
 records would be queued. Here, the key is the shard id.

 We do not change the execution logic in case of stable input. Elements
 will still be processed key-wise.

>>>
>>> Wouldn't that require the KafkaEOS to support a different processing
>>> mode where the elements are committed with @FinishBundle? The runner could
>>> then align bundles and checkpointing as needed.
>>>
>>> I'm now also curious how @RequiresStableInput is supposed to work with
>>> portable pipelines? The runner is not able to inspect the ParDo, so this
>>> would need to be provided explicitly as part of the executable stage?
>>>
>>>
>>>




Re: KafkaIO Exactly-Once & Flink Runner

2019-03-15 Thread Kenneth Knowles
Yes, the ParDoPayload has to contain most of the information that is on
DoFnSignature. Everything except the details for feeding the bits to the
Java DoFn.

Kenn

On Fri, Mar 15, 2019 at 9:59 AM Reuven Lax  wrote:

> I think this attribute needs to be added to the portability protos.
>
> On Fri, Mar 15, 2019 at 9:49 AM Thomas Weise  wrote:
>
>>
>>
>> On Tue, Mar 12, 2019 at 10:13 AM Maximilian Michels 
>> wrote:
>>
>>> > I think that is what Max's PR does. KafkaIO writes entire list of
>>> values associated with a key in one transaction. So it depends on how Flink
>>> runner bundles > after a GBK. I would think all of the buffered
>>> records would be queued. Here, the key is the shard id.
>>>
>>> We do not change the execution logic in case of stable input. Elements
>>> will still be processed key-wise.
>>>
>>
>> Wouldn't that require the KafkaEOS to support a different processing mode
>> where the elements are committed with @FinishBundle? The runner could then
>> align bundles and checkpointing as needed.
>>
>> I'm now also curious how @RequiresStableInput is supposed to work with
>> portable pipelines? The runner is not able to inspect the ParDo, so this
>> would need to be provided explicitly as part of the executable stage?
>>
>>
>>
>>>
>>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-15 Thread Reuven Lax
I think this attribute needs to be added to the portability protos.

On Fri, Mar 15, 2019 at 9:49 AM Thomas Weise  wrote:

>
>
> On Tue, Mar 12, 2019 at 10:13 AM Maximilian Michels 
> wrote:
>
>> > I think that is what Max's PR does. KafkaIO writes entire list of
>> values associated with a key in one transaction. So it depends on how Flink
>> runner bundles > after a GBK. I would think all of the buffered
>> records would be queued. Here, the key is the shard id.
>>
>> We do not change the execution logic in case of stable input. Elements
>> will still be processed key-wise.
>>
>
> Wouldn't that require the KafkaEOS to support a different processing mode
> where the elements are committed with @FinishBundle? The runner could then
> align bundles and checkpointing as needed.
>
> I'm now also curious how @RequiresStableInput is supposed to work with
> portable pipelines? The runner is not able to inspect the ParDo, so this
> would need to be provided explicitly as part of the executable stage?
>
>
>
>>
>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-15 Thread Thomas Weise
On Tue, Mar 12, 2019 at 10:13 AM Maximilian Michels  wrote:

> > I think that is what Max's PR does. KafkaIO writes entire list of values
> associated with a key in one transaction. So it depends on how Flink runner
> bundles > after a GBK. I would think all of the buffered records
> would be queued. Here, the key is the shard id.
>
> We do not change the execution logic in case of stable input. Elements
> will still be processed key-wise.
>

Wouldn't that require the KafkaEOS to support a different processing mode
where the elements are committed with @FinishBundle? The runner could then
align bundles and checkpointing as needed.

I'm now also curious how @RequiresStableInput is supposed to work with
portable pipelines? The runner is not able to inspect the ParDo, so this
would need to be provided explicitly as part of the executable stage?



>
>


Re: KafkaIO Exactly-Once & Flink Runner

2019-03-11 Thread Maximilian Michels

We cannot reason about correct exactly-once behavior of a transform without 
understanding how state management and fault-tolerance in the runner work.


Generally, we require a transforms's writes to be idempotent for 
exactly-once semantics, even with @RequiresStableInput.


In the case of KafkaIO, we have transactions which means writes cannot 
be indempotent per se. That's why we drop already-committed records by 
recovering the current committed id from Kafka itself: 
https://github.com/apache/beam/blob/99d5d9138acbf9e5b87e7068183c5fd27448043e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L300


Beam's state interface is only used to persist the current record id. 
This is necessary to be able to replay the same ids upon restoring a 
failed job.


-Max

On 11.03.19 17:38, Thomas Weise wrote:
We cannot reason about correct exactly-once behavior of a transform 
without understanding how state management and fault-tolerance in the 
runner work.


Max pinged me this link to the Kafka EOS logic [1]. It uses a state 
variable to find out what was already written. That state variable would 
be part of a future Flink checkpoint. If after a failure we revert to 
the previous checkpoint, it won't help to discover/skip duplicates?


The general problem is that we are trying to rely on state in two 
different places to achieve EOS. This blog 
 
[2] describes how Kafka streams can provide the exactly-once guarantee, 
by using only Kafka as transactional resource (and committing all 
changes in a single TX). Everything else would require a distributed 
transaction coordinator (expensive) or a retry with duplicate detection 
mechanism in the external system (like check if record/reference was 
already written to Kafka, JDBC etc. or for file system, check if the 
file that would result from atomic rename already exists).


Thomas

[1] 
https://github.com/apache/beam/blob/99d5d9138acbf9e5b87e7068183c5fd27448043e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L329 

[2] 
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/


On Mon, Mar 11, 2019 at 7:54 AM Maximilian Michels > wrote:


This is not really about barriers, those are an implementation detail.

If a transform is annotated with @RequiresStableInput, no data will be
processed by this transform until a complete checkpoint has been taken.
After checkpoint completion, the elements will be processed. In case of
any failures, the checkpoint will be restored and the elements will be
processed again. This requires idempotent writes. KafkaIO's EOS mode
does that by ignoring all elements which are already part of a commit.

-Max

On 11.03.19 15:15, Thomas Weise wrote:
 > So all records between 2 checkpoint barriers will be buffered and on
 > checkpoint complete notification sent in a single transaction to
Kafka?
 >
 > The next question then is what happens if the Kafka transaction
does not
 > complete (and checkpoint complete callback fails)? Will the
callback be
 > repeated after Flink recovers?
 >
 >
 > On Mon, Mar 11, 2019 at 3:02 AM Maximilian Michels
mailto:m...@apache.org>
 > >> wrote:
 >
 >      > But there is still the possibility that we fail to flush the
 >     buffer after the checkpoint is complete (data loss)?
 >
 >     Since we have already checkpointed the buffered data we can retry
 >     flushing it in case of failures. We may emit elements
multiple times
 >     but
 >     that is because the Kafka EOS sink will skip records which
are already
 >     part of a committed transaction.
 >
 >     -Max
 >
 >     On 06.03.19 19:28, Thomas Weise wrote:
 >      > A fair amount of work for true true exactly once output
was done in
 >      > Apex. Different from almost exactly-once :)
 >      >
 >      > The takeaway was that the mechanism to achieve it depends
on the
 >      > external system. The implementation looks different for
let's say
 >     a file
 >      > sink or JDBC or Kafka.
 >      >
 >      > Apex had an exactly-once producer before Kafka supported
 >     transactions.
 >      > That producer relied on the ability to discover what was
already
 >     written
 >      > to Kafka upon recovery from failure. Why?
 >      >
 >      > Runners are not distributed transaction coordinators and no
 >     matter how
 >      > we write the code, there is always the small possibility
that one
 >     of two
 >      > resources fails to commit, resulting in either data loss or
 >     duplicates.
 >      > The Kafka 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-11 Thread Maximilian Michels

This is not really about barriers, those are an implementation detail.

If a transform is annotated with @RequiresStableInput, no data will be 
processed by this transform until a complete checkpoint has been taken. 
After checkpoint completion, the elements will be processed. In case of 
any failures, the checkpoint will be restored and the elements will be 
processed again. This requires idempotent writes. KafkaIO's EOS mode 
does that by ignoring all elements which are already part of a commit.


-Max

On 11.03.19 15:15, Thomas Weise wrote:
So all records between 2 checkpoint barriers will be buffered and on 
checkpoint complete notification sent in a single transaction to Kafka?


The next question then is what happens if the Kafka transaction does not 
complete (and checkpoint complete callback fails)? Will the callback be 
repeated after Flink recovers?



On Mon, Mar 11, 2019 at 3:02 AM Maximilian Michels > wrote:


 > But there is still the possibility that we fail to flush the
buffer after the checkpoint is complete (data loss)?

Since we have already checkpointed the buffered data we can retry
flushing it in case of failures. We may emit elements multiple times
but
that is because the Kafka EOS sink will skip records which are already
part of a committed transaction.

-Max

On 06.03.19 19:28, Thomas Weise wrote:
 > A fair amount of work for true true exactly once output was done in
 > Apex. Different from almost exactly-once :)
 >
 > The takeaway was that the mechanism to achieve it depends on the
 > external system. The implementation looks different for let's say
a file
 > sink or JDBC or Kafka.
 >
 > Apex had an exactly-once producer before Kafka supported
transactions.
 > That producer relied on the ability to discover what was already
written
 > to Kafka upon recovery from failure. Why?
 >
 > Runners are not distributed transaction coordinators and no
matter how
 > we write the code, there is always the small possibility that one
of two
 > resources fails to commit, resulting in either data loss or
duplicates.
 > The Kafka EOS was a hybrid of producer and consumer, the consumer
part
 > used during recovery to find out what was already produced
previously.
 >
 > Flink and Apex have very similar checkpointing model, that's why
this
 > thread caught my attention. Within the topology/runner,
exactly-once is
 > achieved by replay having the same effect. For sinks, it needs to
rely
 > on the capabilities of the respective system (like atomic rename for
 > file sink, or transaction with metadata table for JDBC).
 >
 > The buffering until checkpoint is complete is a mechanism to get
away
 > from sink specific implementations. It comes with the latency
penalty
 > (memory overhead could be solved with a write ahead log). But
there is
 > still the possibility that we fail to flush the buffer after the
 > checkpoint is complete (data loss)?
 >
 > Thanks,
 > Thomas
 >
 >
 > On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles mailto:k...@apache.org>
 > >> wrote:
 >
 >     On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi
mailto:ang...@gmail.com>
 >     >> wrote:
 >
 >
 >
 >         On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax
mailto:re...@google.com>
 >         >> wrote:
 >
 >             RE: Kenn's suggestion. i think Raghu looked into
something
 >             that, and something about it didn't work. I don't
remember
 >             all the details, but I think there might have been some
 >             subtle problem with it that wasn't obvious. Doesn't mean
 >             that there isn't another way to solve that issue.'
 >
 >
 >         Two disadvantages:
 >         - A transaction in Kafka are tied to single producer
instance.
 >         There is no official API to start a txn in one process and
 >         access it in another process. Flink's sink uses an
internal REST
 >         API for this.
 >
 >
 >     Can you say more about how this works?
 >
 >         - There is one failure case that I mentioned earlier: if
closing
 >         the transaction in downstream transform fails, it is data
loss,
 >         there is no way to replay the upstream transform that
wrote the
 >         records to Kafka.
 >
 >
 >     With coupling of unrelated failures due to fusion, this is a
severe
 >     problem. I think I see now how 2PC affects this. From my
reading, I
 >     can't see the difference in how Flink works. If the checkpoint
 >     

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-11 Thread Maximilian Michels

Just realized, there was a word missing:

Since we have already checkpointed the buffered data we can retry 
flushing it in case of failures. We may emit elements multiple times but 
that is __fine__ because the Kafka EOS sink will skip records which are 
already part of a committed transaction.


On 11.03.19 11:02, Maximilian Michels wrote:
But there is still the possibility that we fail to flush the buffer 
after the checkpoint is complete (data loss)?


Since we have already checkpointed the buffered data we can retry 
flushing it in case of failures. We may emit elements multiple times but 
that is because the Kafka EOS sink will skip records which are already 
part of a committed transaction.


-Max

On 06.03.19 19:28, Thomas Weise wrote:
A fair amount of work for true true exactly once output was done in 
Apex. Different from almost exactly-once :)


The takeaway was that the mechanism to achieve it depends on the 
external system. The implementation looks different for let's say a 
file sink or JDBC or Kafka.


Apex had an exactly-once producer before Kafka supported transactions. 
That producer relied on the ability to discover what was already 
written to Kafka upon recovery from failure. Why?


Runners are not distributed transaction coordinators and no matter how 
we write the code, there is always the small possibility that one of 
two resources fails to commit, resulting in either data loss or 
duplicates. The Kafka EOS was a hybrid of producer and consumer, the 
consumer part used during recovery to find out what was already 
produced previously.


Flink and Apex have very similar checkpointing model, that's why this 
thread caught my attention. Within the topology/runner, exactly-once 
is achieved by replay having the same effect. For sinks, it needs to 
rely on the capabilities of the respective system (like atomic rename 
for file sink, or transaction with metadata table for JDBC).


The buffering until checkpoint is complete is a mechanism to get away 
from sink specific implementations. It comes with the latency penalty 
(memory overhead could be solved with a write ahead log). But there is 
still the possibility that we fail to flush the buffer after the 
checkpoint is complete (data loss)?


Thanks,
Thomas


On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles > wrote:


    On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi mailto:ang...@gmail.com>> wrote:



    On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax mailto:re...@google.com>> wrote:

    RE: Kenn's suggestion. i think Raghu looked into something
    that, and something about it didn't work. I don't remember
    all the details, but I think there might have been some
    subtle problem with it that wasn't obvious. Doesn't mean
    that there isn't another way to solve that issue.'


    Two disadvantages:
    - A transaction in Kafka are tied to single producer instance.
    There is no official API to start a txn in one process and
    access it in another process. Flink's sink uses an internal REST
    API for this.


    Can you say more about how this works?

    - There is one failure case that I mentioned earlier: if closing
    the transaction in downstream transform fails, it is data loss,
    there is no way to replay the upstream transform that wrote the
    records to Kafka.


    With coupling of unrelated failures due to fusion, this is a severe
    problem. I think I see now how 2PC affects this. From my reading, I
    can't see the difference in how Flink works. If the checkpoint
    finalization callback that does the Kafka commit fails, does it
    invalidate the checkpoint so the start transaction + write elements
    is retried?

    Kenn


    GBKs don't have major scalability limitations in most runner.
    Extra GBK is fine in practice for such a sink (at least no one
    has complained about it yet, though I don't know real usage
    numbers in practice). Flink's implentation in Beam
    using @RequiresStableInput  does have storage requirements and
    latency costs that increase with checkpoint interval. I think is
    still just as useful. Good to see @RequiresStableInput support
    added to Flink runner in Max's PR.


    Hopefully we can make that work. Another possibility if we
    can't is to do something special for Flink. Beam allows
    runners to splice out well-known transforms with their own
    implementation. Dataflow already does that for Google Cloud
    Pub/Sub sources/sinks. The Flink runner could splice out the
    Kafka sink with one that uses Flink-specific 
functionality.     Ideally this would reuse most of the 
existing Kafka code

    (maybe we could refactor just the EOS part into something
    that could be subbed out).

    Reuven

    On Tue, Mar 5, 2019 at 2:53 AM 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-11 Thread Maximilian Michels

But there is still the possibility that we fail to flush the buffer after the 
checkpoint is complete (data loss)?


Since we have already checkpointed the buffered data we can retry 
flushing it in case of failures. We may emit elements multiple times but 
that is because the Kafka EOS sink will skip records which are already 
part of a committed transaction.


-Max

On 06.03.19 19:28, Thomas Weise wrote:
A fair amount of work for true true exactly once output was done in 
Apex. Different from almost exactly-once :)


The takeaway was that the mechanism to achieve it depends on the 
external system. The implementation looks different for let's say a file 
sink or JDBC or Kafka.


Apex had an exactly-once producer before Kafka supported transactions. 
That producer relied on the ability to discover what was already written 
to Kafka upon recovery from failure. Why?


Runners are not distributed transaction coordinators and no matter how 
we write the code, there is always the small possibility that one of two 
resources fails to commit, resulting in either data loss or duplicates. 
The Kafka EOS was a hybrid of producer and consumer, the consumer part 
used during recovery to find out what was already produced previously.


Flink and Apex have very similar checkpointing model, that's why this 
thread caught my attention. Within the topology/runner, exactly-once is 
achieved by replay having the same effect. For sinks, it needs to rely 
on the capabilities of the respective system (like atomic rename for 
file sink, or transaction with metadata table for JDBC).


The buffering until checkpoint is complete is a mechanism to get away 
from sink specific implementations. It comes with the latency penalty 
(memory overhead could be solved with a write ahead log). But there is 
still the possibility that we fail to flush the buffer after the 
checkpoint is complete (data loss)?


Thanks,
Thomas


On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles > wrote:


On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi mailto:ang...@gmail.com>> wrote:



On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax mailto:re...@google.com>> wrote:

RE: Kenn's suggestion. i think Raghu looked into something
that, and something about it didn't work. I don't remember
all the details, but I think there might have been some
subtle problem with it that wasn't obvious. Doesn't mean
that there isn't another way to solve that issue.'


Two disadvantages:
- A transaction in Kafka are tied to single producer instance.
There is no official API to start a txn in one process and
access it in another process. Flink's sink uses an internal REST
API for this.


Can you say more about how this works?

- There is one failure case that I mentioned earlier: if closing
the transaction in downstream transform fails, it is data loss,
there is no way to replay the upstream transform that wrote the
records to Kafka.


With coupling of unrelated failures due to fusion, this is a severe
problem. I think I see now how 2PC affects this. From my reading, I
can't see the difference in how Flink works. If the checkpoint
finalization callback that does the Kafka commit fails, does it
invalidate the checkpoint so the start transaction + write elements
is retried?

Kenn


GBKs don't have major scalability limitations in most runner.
Extra GBK is fine in practice for such a sink (at least no one
has complained about it yet, though I don't know real usage
numbers in practice). Flink's implentation in Beam
using @RequiresStableInput  does have storage requirements and
latency costs that increase with checkpoint interval. I think is
still just as useful. Good to see @RequiresStableInput support
added to Flink runner in Max's PR.


Hopefully we can make that work. Another possibility if we
can't is to do something special for Flink. Beam allows
runners to splice out well-known transforms with their own
implementation. Dataflow already does that for Google Cloud
Pub/Sub sources/sinks. The Flink runner could splice out the
Kafka sink with one that uses Flink-specific functionality. 
Ideally this would reuse most of the existing Kafka code

(maybe we could refactor just the EOS part into something
that could be subbed out).

Reuven

On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels
mailto:m...@apache.org>> wrote:

 > It would be interesting to see if there's something
we could add to the Beam model that would create a
better story for Kafka's EOS writes.

There would have to be a checkpoint-completed callback
the 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Raghu Angadi
On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles  wrote:

> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:
>
>>
>>
>> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:
>>
>>> RE: Kenn's suggestion. i think Raghu looked into something that, and
>>> something about it didn't work. I don't remember all the details, but I
>>> think there might have been some subtle problem with it that wasn't
>>> obvious. Doesn't mean that there isn't another way to solve that issue.'
>>>
>>
>> Two disadvantages:
>> - A transaction in Kafka are tied to single producer instance. There is
>> no official API to start a txn in one process and access it in another
>> process. Flink's sink uses an internal REST API for this.
>>
>
> Can you say more about how this works?
>

I remember this from discussion PR that added EOS Kafka sink in Flink in
Aug 2017. From the comment at
https://github.com/apache/flink/pull/4239#issuecomment-321392085 :
"Resuming transactions is not a part of KafkaProducer's API, however
Kafka's REST API allows to do that. However I'm aware that it wasn't an
intention of the authors to do so."
Implementation of 'resumeTransaction()' : link

.

In the case of Flink, this hack is required only when there is a recovery
(since preCommit & postCommit operations run on the same worker in normal
operation). But for Dataflow using GBK, it would have to be handled this
way for every transaction top avoid shuffling the records. A
@RequiresStableInput implementation in Dataflow could actually run pre- and
post- transforms on same worker similar to two fused transforms. In that
sense essentially @RequiesStableInput would be 'fusion break without
shuffle' in Dataflow.

- There is one failure case that I mentioned earlier: if closing the
>> transaction in downstream transform fails, it is data loss, there is no way
>> to replay the upstream transform that wrote the records to Kafka.
>>
>
> With coupling of unrelated failures due to fusion, this is a severe
> problem. I think I see now how 2PC affects this. From my reading, I can't
> see the difference in how Flink works. If the checkpoint finalization
> callback that does the Kafka commit fails, does it invalidate the
> checkpoint so the start transaction + write elements is retried?
>

It doesn't retry since Kafka does not save records, it only save txn
information. 2PC Javadoc

clearly states that recoverAndCommit() should eventually succeed, otherwise
it is data loss.

In practice committing a txn in Kafka may rarely fail, that's why this
issue does not seem to be an issue for Flink sink.

Overall though, I don't think extra shuffle of messages is prohibitively
expensive and keeps the design clean. In EOS Javadoc, I mentioned bigger
cost could be due to extra serialization & deserialization of the records,
which is reasonably simple to avoid by writing serialized byte-records to
KafkaIO.

Kenn
>
>
>>
>> GBKs don't have major scalability limitations in most runner. Extra GBK
>> is fine in practice for such a sink (at least no one has complained about
>> it yet, though I don't know real usage numbers in practice). Flink's
>> implentation in Beam using @RequiresStableInput  does have storage
>> requirements and latency costs that increase with checkpoint interval. I
>> think is still just as useful. Good to see @RequiresStableInput support
>> added to Flink runner in Max's PR.
>>
>>
>>> Hopefully we can make that work. Another possibility if we can't is to
>>> do something special for Flink. Beam allows runners to splice out
>>> well-known transforms with their own implementation. Dataflow already does
>>> that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice
>>> out the Kafka sink with one that uses Flink-specific functionality.
>>> Ideally this would reuse most of the existing Kafka code (maybe we could
>>> refactor just the EOS part into something that could be subbed out).
>>>
>>> Reuven
>>>
>>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels 
>>> wrote:
>>>
 > It would be interesting to see if there's something we could add to
 the Beam model that would create a better story for Kafka's EOS writes.

 There would have to be a checkpoint-completed callback the DoFn can
 register with the Runner. Does not seem applicable for most Runners
 though.

 > This is true, however isn't it already true for such uses of Flink?

 Yes, that's correct. In the case of Kafka, Flink can offload the
 buffering but for the general case, idempotent writes are only possible
 if we buffer data until the checkpoint is completed.

 On 04.03.19 17:45, Reuven Lax wrote:

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Reuven Lax
Yes, it sounds like we're saying that finalizing a transaction in Kafka
could fail in a non-retryable way, where the only option is to abandon the
entire transaction and start again. This sounds somewhat broken from the
Kafka side if true. Do we know what might cause such failures in Kafka?

Reuven

On Wed, Mar 6, 2019 at 10:42 AM Kenneth Knowles  wrote:

> The situation for Kafka transactions described here seems very analogous
> to a file sink that writes to temp files, waits for that to succeed,
> durably persists, then renames to their final location [1]. What it sounds
> like Raghu is describing is that if the Kafka commit fails then the
> previously written data is discarded. So that would be as if rename failure
> would delete the temp files. The architecture no longer works.
>
> Kenn
>
> [1] see
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L765
>
> On Wed, Mar 6, 2019 at 10:29 AM Thomas Weise  wrote:
>
>> A fair amount of work for true true exactly once output was done in Apex.
>> Different from almost exactly-once :)
>>
>> The takeaway was that the mechanism to achieve it depends on the external
>> system. The implementation looks different for let's say a file sink or
>> JDBC or Kafka.
>>
>> Apex had an exactly-once producer before Kafka supported transactions.
>> That producer relied on the ability to discover what was already written to
>> Kafka upon recovery from failure. Why?
>>
>> Runners are not distributed transaction coordinators and no matter how we
>> write the code, there is always the small possibility that one of two
>> resources fails to commit, resulting in either data loss or duplicates. The
>> Kafka EOS was a hybrid of producer and consumer, the consumer part used
>> during recovery to find out what was already produced previously.
>>
>> Flink and Apex have very similar checkpointing model, that's why this
>> thread caught my attention. Within the topology/runner, exactly-once is
>> achieved by replay having the same effect. For sinks, it needs to rely on
>> the capabilities of the respective system (like atomic rename for file
>> sink, or transaction with metadata table for JDBC).
>>
>> The buffering until checkpoint is complete is a mechanism to get away
>> from sink specific implementations. It comes with the latency penalty
>> (memory overhead could be solved with a write ahead log). But there is
>> still the possibility that we fail to flush the buffer after the checkpoint
>> is complete (data loss)?
>>
>> Thanks,
>> Thomas
>>
>>
>> On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles  wrote:
>>
>>> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:
>>>


 On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:

> RE: Kenn's suggestion. i think Raghu looked into something that, and
> something about it didn't work. I don't remember all the details, but I
> think there might have been some subtle problem with it that wasn't
> obvious. Doesn't mean that there isn't another way to solve that issue.'
>

 Two disadvantages:
 - A transaction in Kafka are tied to single producer instance. There is
 no official API to start a txn in one process and access it in another
 process. Flink's sink uses an internal REST API for this.

>>>
>>> Can you say more about how this works?
>>>
>>> - There is one failure case that I mentioned earlier: if closing the
 transaction in downstream transform fails, it is data loss, there is no way
 to replay the upstream transform that wrote the records to Kafka.

>>>
>>> With coupling of unrelated failures due to fusion, this is a severe
>>> problem. I think I see now how 2PC affects this. From my reading, I can't
>>> see the difference in how Flink works. If the checkpoint finalization
>>> callback that does the Kafka commit fails, does it invalidate the
>>> checkpoint so the start transaction + write elements is retried?
>>>
>>> Kenn
>>>
>>>

 GBKs don't have major scalability limitations in most runner. Extra GBK
 is fine in practice for such a sink (at least no one has complained about
 it yet, though I don't know real usage numbers in practice). Flink's
 implentation in Beam using @RequiresStableInput  does have storage
 requirements and latency costs that increase with checkpoint interval. I
 think is still just as useful. Good to see @RequiresStableInput support
 added to Flink runner in Max's PR.


> Hopefully we can make that work. Another possibility if we can't is to
> do something special for Flink. Beam allows runners to splice out
> well-known transforms with their own implementation. Dataflow already does
> that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice
> out the Kafka sink with one that uses Flink-specific functionality.
> Ideally this would reuse most of the existing Kafka code (maybe we could
> refactor just the 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Kenneth Knowles
The situation for Kafka transactions described here seems very analogous to
a file sink that writes to temp files, waits for that to succeed, durably
persists, then renames to their final location [1]. What it sounds like
Raghu is describing is that if the Kafka commit fails then the previously
written data is discarded. So that would be as if rename failure would
delete the temp files. The architecture no longer works.

Kenn

[1] see
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L765

On Wed, Mar 6, 2019 at 10:29 AM Thomas Weise  wrote:

> A fair amount of work for true true exactly once output was done in Apex.
> Different from almost exactly-once :)
>
> The takeaway was that the mechanism to achieve it depends on the external
> system. The implementation looks different for let's say a file sink or
> JDBC or Kafka.
>
> Apex had an exactly-once producer before Kafka supported transactions.
> That producer relied on the ability to discover what was already written to
> Kafka upon recovery from failure. Why?
>
> Runners are not distributed transaction coordinators and no matter how we
> write the code, there is always the small possibility that one of two
> resources fails to commit, resulting in either data loss or duplicates. The
> Kafka EOS was a hybrid of producer and consumer, the consumer part used
> during recovery to find out what was already produced previously.
>
> Flink and Apex have very similar checkpointing model, that's why this
> thread caught my attention. Within the topology/runner, exactly-once is
> achieved by replay having the same effect. For sinks, it needs to rely on
> the capabilities of the respective system (like atomic rename for file
> sink, or transaction with metadata table for JDBC).
>
> The buffering until checkpoint is complete is a mechanism to get away from
> sink specific implementations. It comes with the latency penalty (memory
> overhead could be solved with a write ahead log). But there is still the
> possibility that we fail to flush the buffer after the checkpoint is
> complete (data loss)?
>
> Thanks,
> Thomas
>
>
> On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles  wrote:
>
>> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:
>>
>>>
>>>
>>> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:
>>>
 RE: Kenn's suggestion. i think Raghu looked into something that, and
 something about it didn't work. I don't remember all the details, but I
 think there might have been some subtle problem with it that wasn't
 obvious. Doesn't mean that there isn't another way to solve that issue.'

>>>
>>> Two disadvantages:
>>> - A transaction in Kafka are tied to single producer instance. There is
>>> no official API to start a txn in one process and access it in another
>>> process. Flink's sink uses an internal REST API for this.
>>>
>>
>> Can you say more about how this works?
>>
>> - There is one failure case that I mentioned earlier: if closing the
>>> transaction in downstream transform fails, it is data loss, there is no way
>>> to replay the upstream transform that wrote the records to Kafka.
>>>
>>
>> With coupling of unrelated failures due to fusion, this is a severe
>> problem. I think I see now how 2PC affects this. From my reading, I can't
>> see the difference in how Flink works. If the checkpoint finalization
>> callback that does the Kafka commit fails, does it invalidate the
>> checkpoint so the start transaction + write elements is retried?
>>
>> Kenn
>>
>>
>>>
>>> GBKs don't have major scalability limitations in most runner. Extra GBK
>>> is fine in practice for such a sink (at least no one has complained about
>>> it yet, though I don't know real usage numbers in practice). Flink's
>>> implentation in Beam using @RequiresStableInput  does have storage
>>> requirements and latency costs that increase with checkpoint interval. I
>>> think is still just as useful. Good to see @RequiresStableInput support
>>> added to Flink runner in Max's PR.
>>>
>>>
 Hopefully we can make that work. Another possibility if we can't is to
 do something special for Flink. Beam allows runners to splice out
 well-known transforms with their own implementation. Dataflow already does
 that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice
 out the Kafka sink with one that uses Flink-specific functionality.
 Ideally this would reuse most of the existing Kafka code (maybe we could
 refactor just the EOS part into something that could be subbed out).

 Reuven

 On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels 
 wrote:

> > It would be interesting to see if there's something we could add to
> the Beam model that would create a better story for Kafka's EOS writes.
>
> There would have to be a checkpoint-completed callback the DoFn can
> register with the Runner. Does not seem applicable for most Runners
> though.

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Thomas Weise
A fair amount of work for true true exactly once output was done in Apex.
Different from almost exactly-once :)

The takeaway was that the mechanism to achieve it depends on the external
system. The implementation looks different for let's say a file sink or
JDBC or Kafka.

Apex had an exactly-once producer before Kafka supported transactions. That
producer relied on the ability to discover what was already written to
Kafka upon recovery from failure. Why?

Runners are not distributed transaction coordinators and no matter how we
write the code, there is always the small possibility that one of two
resources fails to commit, resulting in either data loss or duplicates. The
Kafka EOS was a hybrid of producer and consumer, the consumer part used
during recovery to find out what was already produced previously.

Flink and Apex have very similar checkpointing model, that's why this
thread caught my attention. Within the topology/runner, exactly-once is
achieved by replay having the same effect. For sinks, it needs to rely on
the capabilities of the respective system (like atomic rename for file
sink, or transaction with metadata table for JDBC).

The buffering until checkpoint is complete is a mechanism to get away from
sink specific implementations. It comes with the latency penalty (memory
overhead could be solved with a write ahead log). But there is still the
possibility that we fail to flush the buffer after the checkpoint is
complete (data loss)?

Thanks,
Thomas


On Wed, Mar 6, 2019 at 9:37 AM Kenneth Knowles  wrote:

> On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:
>
>>
>>
>> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:
>>
>>> RE: Kenn's suggestion. i think Raghu looked into something that, and
>>> something about it didn't work. I don't remember all the details, but I
>>> think there might have been some subtle problem with it that wasn't
>>> obvious. Doesn't mean that there isn't another way to solve that issue.'
>>>
>>
>> Two disadvantages:
>> - A transaction in Kafka are tied to single producer instance. There is
>> no official API to start a txn in one process and access it in another
>> process. Flink's sink uses an internal REST API for this.
>>
>
> Can you say more about how this works?
>
> - There is one failure case that I mentioned earlier: if closing the
>> transaction in downstream transform fails, it is data loss, there is no way
>> to replay the upstream transform that wrote the records to Kafka.
>>
>
> With coupling of unrelated failures due to fusion, this is a severe
> problem. I think I see now how 2PC affects this. From my reading, I can't
> see the difference in how Flink works. If the checkpoint finalization
> callback that does the Kafka commit fails, does it invalidate the
> checkpoint so the start transaction + write elements is retried?
>
> Kenn
>
>
>>
>> GBKs don't have major scalability limitations in most runner. Extra GBK
>> is fine in practice for such a sink (at least no one has complained about
>> it yet, though I don't know real usage numbers in practice). Flink's
>> implentation in Beam using @RequiresStableInput  does have storage
>> requirements and latency costs that increase with checkpoint interval. I
>> think is still just as useful. Good to see @RequiresStableInput support
>> added to Flink runner in Max's PR.
>>
>>
>>> Hopefully we can make that work. Another possibility if we can't is to
>>> do something special for Flink. Beam allows runners to splice out
>>> well-known transforms with their own implementation. Dataflow already does
>>> that for Google Cloud Pub/Sub sources/sinks. The Flink runner could splice
>>> out the Kafka sink with one that uses Flink-specific functionality.
>>> Ideally this would reuse most of the existing Kafka code (maybe we could
>>> refactor just the EOS part into something that could be subbed out).
>>>
>>> Reuven
>>>
>>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels 
>>> wrote:
>>>
 > It would be interesting to see if there's something we could add to
 the Beam model that would create a better story for Kafka's EOS writes.

 There would have to be a checkpoint-completed callback the DoFn can
 register with the Runner. Does not seem applicable for most Runners
 though.

 > This is true, however isn't it already true for such uses of Flink?

 Yes, that's correct. In the case of Kafka, Flink can offload the
 buffering but for the general case, idempotent writes are only possible
 if we buffer data until the checkpoint is completed.

 On 04.03.19 17:45, Reuven Lax wrote:
 >
 >
 > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels >>> > > wrote:
 >
 >  > Can we do 2? I seem to remember that we had trouble in some
 cases
 > (e..g in the BigQuery case, there was no obvious way to create a
 > deterministic id, which is why we went for a random number
 followed
 > by a reshuffle). Also 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-06 Thread Kenneth Knowles
On Tue, Mar 5, 2019 at 10:02 PM Raghu Angadi  wrote:

>
>
> On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:
>
>> RE: Kenn's suggestion. i think Raghu looked into something that, and
>> something about it didn't work. I don't remember all the details, but I
>> think there might have been some subtle problem with it that wasn't
>> obvious. Doesn't mean that there isn't another way to solve that issue.'
>>
>
> Two disadvantages:
> - A transaction in Kafka are tied to single producer instance. There is no
> official API to start a txn in one process and access it in another
> process. Flink's sink uses an internal REST API for this.
>

Can you say more about how this works?

- There is one failure case that I mentioned earlier: if closing the
> transaction in downstream transform fails, it is data loss, there is no way
> to replay the upstream transform that wrote the records to Kafka.
>

With coupling of unrelated failures due to fusion, this is a severe
problem. I think I see now how 2PC affects this. From my reading, I can't
see the difference in how Flink works. If the checkpoint finalization
callback that does the Kafka commit fails, does it invalidate the
checkpoint so the start transaction + write elements is retried?

Kenn


>
> GBKs don't have major scalability limitations in most runner. Extra GBK is
> fine in practice for such a sink (at least no one has complained about it
> yet, though I don't know real usage numbers in practice). Flink's
> implentation in Beam using @RequiresStableInput  does have storage
> requirements and latency costs that increase with checkpoint interval. I
> think is still just as useful. Good to see @RequiresStableInput support
> added to Flink runner in Max's PR.
>
>
>> Hopefully we can make that work. Another possibility if we can't is to do
>> something special for Flink. Beam allows runners to splice out well-known
>> transforms with their own implementation. Dataflow already does that for
>> Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
>> Kafka sink with one that uses Flink-specific functionality.  Ideally this
>> would reuse most of the existing Kafka code (maybe we could refactor just
>> the EOS part into something that could be subbed out).
>>
>> Reuven
>>
>> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels  wrote:
>>
>>> > It would be interesting to see if there's something we could add to
>>> the Beam model that would create a better story for Kafka's EOS writes.
>>>
>>> There would have to be a checkpoint-completed callback the DoFn can
>>> register with the Runner. Does not seem applicable for most Runners
>>> though.
>>>
>>> > This is true, however isn't it already true for such uses of Flink?
>>>
>>> Yes, that's correct. In the case of Kafka, Flink can offload the
>>> buffering but for the general case, idempotent writes are only possible
>>> if we buffer data until the checkpoint is completed.
>>>
>>> On 04.03.19 17:45, Reuven Lax wrote:
>>> >
>>> >
>>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels >> > > wrote:
>>> >
>>> >  > Can we do 2? I seem to remember that we had trouble in some
>>> cases
>>> > (e..g in the BigQuery case, there was no obvious way to create a
>>> > deterministic id, which is why we went for a random number followed
>>> > by a reshuffle). Also remember that the user ParDo that is
>>> producing
>>> > data to the sink is not guaranteed to be deterministic; the Beam
>>> > model allows for non-deterministic transforms.
>>> >
>>> > I believe we could use something like the worker id to make it
>>> > deterministic, though the worker id can change after a restart. We
>>> > could
>>> > persist it in Flink's operator state. I do not know if we can come
>>> up
>>> > with a Runner-independent solution.
>>> >
>>> >
>>> > If we did this, we would break it on runners that don't have a concept
>>> > of a stable worker id :( The Dataflow runner can load balance work at
>>> > any time (including moving work around between workers).
>>> >
>>> >
>>> >  > I'm not quite sure I understand. If a ParDo is marked with
>>> > RequiresStableInput, can't the flink runner buffer the input
>>> message
>>> > until after the checkpoint is complete and only then deliver it to
>>> > the ParDo?
>>> >
>>> > You're correct. I thought that it could suffice to only buffer
>>> during a
>>> > checkpoint and otherwise rely on the deterministic execution of the
>>> > pipeline and KafkaIO's de-duplication code.
>>> >
>>> >
>>> > Yes, I want to distinguish the KafkaIO case from the general case. It
>>> > would be interesting to see if there's something we could add to the
>>> > Beam model that would create a better story for Kafka's EOS writes.
>>> >
>>> >
>>> > In any case, emitting only after finalization of checkpoints gives
>>> us
>>> > guaranteed stable input. It also means that the processing is
>>> tight to
>>> > the checkpoint 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Raghu Angadi
On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:

> RE: Kenn's suggestion. i think Raghu looked into something that, and
> something about it didn't work. I don't remember all the details, but I
> think there might have been some subtle problem with it that wasn't
> obvious. Doesn't mean that there isn't another way to solve that issue.'
>

Two disadvantages:
- A transaction in Kafka are tied to single producer instance. There is no
official API to start a txn in one process and access it in another
process. Flink's sink uses an internal REST API for this.
- There is one failure case that I mentioned earlier: if closing the
transaction in downstream transform fails, it is data loss, there is no way
to replay the upstream transform that wrote the records to Kafka.

GBKs don't have major scalability limitations in most runner. Extra GBK is
fine in practice for such a sink (at least no one has complained about it
yet, though I don't know real usage numbers in practice). Flink's
implentation in Beam using @RequiresStableInput  does have storage
requirements and latency costs that increase with checkpoint interval. I
think is still just as useful. Good to see @RequiresStableInput support
added to Flink runner in Max's PR.


> Hopefully we can make that work. Another possibility if we can't is to do
> something special for Flink. Beam allows runners to splice out well-known
> transforms with their own implementation. Dataflow already does that for
> Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
> Kafka sink with one that uses Flink-specific functionality.  Ideally this
> would reuse most of the existing Kafka code (maybe we could refactor just
> the EOS part into something that could be subbed out).
>
> Reuven
>
> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels  wrote:
>
>> > It would be interesting to see if there's something we could add to the
>> Beam model that would create a better story for Kafka's EOS writes.
>>
>> There would have to be a checkpoint-completed callback the DoFn can
>> register with the Runner. Does not seem applicable for most Runners
>> though.
>>
>> > This is true, however isn't it already true for such uses of Flink?
>>
>> Yes, that's correct. In the case of Kafka, Flink can offload the
>> buffering but for the general case, idempotent writes are only possible
>> if we buffer data until the checkpoint is completed.
>>
>> On 04.03.19 17:45, Reuven Lax wrote:
>> >
>> >
>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels > > > wrote:
>> >
>> >  > Can we do 2? I seem to remember that we had trouble in some cases
>> > (e..g in the BigQuery case, there was no obvious way to create a
>> > deterministic id, which is why we went for a random number followed
>> > by a reshuffle). Also remember that the user ParDo that is producing
>> > data to the sink is not guaranteed to be deterministic; the Beam
>> > model allows for non-deterministic transforms.
>> >
>> > I believe we could use something like the worker id to make it
>> > deterministic, though the worker id can change after a restart. We
>> > could
>> > persist it in Flink's operator state. I do not know if we can come
>> up
>> > with a Runner-independent solution.
>> >
>> >
>> > If we did this, we would break it on runners that don't have a concept
>> > of a stable worker id :( The Dataflow runner can load balance work at
>> > any time (including moving work around between workers).
>> >
>> >
>> >  > I'm not quite sure I understand. If a ParDo is marked with
>> > RequiresStableInput, can't the flink runner buffer the input message
>> > until after the checkpoint is complete and only then deliver it to
>> > the ParDo?
>> >
>> > You're correct. I thought that it could suffice to only buffer
>> during a
>> > checkpoint and otherwise rely on the deterministic execution of the
>> > pipeline and KafkaIO's de-duplication code.
>> >
>> >
>> > Yes, I want to distinguish the KafkaIO case from the general case. It
>> > would be interesting to see if there's something we could add to the
>> > Beam model that would create a better story for Kafka's EOS writes.
>> >
>> >
>> > In any case, emitting only after finalization of checkpoints gives
>> us
>> > guaranteed stable input. It also means that the processing is tight
>> to
>> > the checkpoint interval, the checkpoint duration, and the available
>> > memory.
>> >
>> >
>> > This is true, however isn't it already true for such uses of Flink?
>> >
>> >
>> > On 01.03.19 19:41, Reuven Lax wrote:
>> >  >
>> >  >
>> >  > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
>> > mailto:m...@apache.org>
>> >  > >> wrote:
>> >  >
>> >  > Fully agree. I think we can improve the situation
>> > drastically. For
>> >  > KafkaIO EOS with Flink we need to make these two changes:
>> 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Maximilian Michels
FYI I created a PR for supporting @RequiresStableInput by delaying 
processing until checkpoints complete: 
https://github.com/apache/beam/pull/7991


On 05.03.19 16:49, Reuven Lax wrote:
BTW - as a followup - there is a cost to having a Flink-specific 
override for the Kafka sink. Part of that is test coverage - users who 
write DirectRunner tests for their pipeline will now be using a 
different version of the code than is used on the actual Flink runner. 
It also makes the code less obvious: people who read the KafkaIO code 
will tend not to realize that Flink is running something a bit 
different, and this can lead to confusion.


Reuven

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax > wrote:


RE: Kenn's suggestion. i think Raghu looked into something that, and
something about it didn't work. I don't remember all the details,
but I think there might have been some subtle problem with it that
wasn't obvious. Doesn't mean that there isn't another way to solve
that issue.

Hopefully we can make that work. Another possibility if we can't is
to do something special for Flink. Beam allows runners to splice out
well-known transforms with their own implementation. Dataflow
already does that for Google Cloud Pub/Sub sources/sinks. The Flink
runner could splice out the Kafka sink with one that uses
Flink-specific functionality.  Ideally this would reuse most of the
existing Kafka code (maybe we could refactor just the EOS part into
something that could be subbed out).

Reuven

On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels mailto:m...@apache.org>> wrote:

 > It would be interesting to see if there's something we could
add to the Beam model that would create a better story for
Kafka's EOS writes.

There would have to be a checkpoint-completed callback the DoFn can
register with the Runner. Does not seem applicable for most
Runners though.

 > This is true, however isn't it already true for such uses of
Flink?

Yes, that's correct. In the case of Kafka, Flink can offload the
buffering but for the general case, idempotent writes are only
possible
if we buffer data until the checkpoint is completed.

On 04.03.19 17:45, Reuven Lax wrote:
 >
 >
 > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels
mailto:m...@apache.org>
 > >> wrote:
 >
 >      > Can we do 2? I seem to remember that we had trouble in
some cases
 >     (e..g in the BigQuery case, there was no obvious way to
create a
 >     deterministic id, which is why we went for a random
number followed
 >     by a reshuffle). Also remember that the user ParDo that
is producing
 >     data to the sink is not guaranteed to be deterministic;
the Beam
 >     model allows for non-deterministic transforms.
 >
 >     I believe we could use something like the worker id to
make it
 >     deterministic, though the worker id can change after a
restart. We
 >     could
 >     persist it in Flink's operator state. I do not know if we
can come up
 >     with a Runner-independent solution.
 >
 >
 > If we did this, we would break it on runners that don't have
a concept
 > of a stable worker id :( The Dataflow runner can load balance
work at
 > any time (including moving work around between workers).
 >
 >
 >      > I'm not quite sure I understand. If a ParDo is marked with
 >     RequiresStableInput, can't the flink runner buffer the
input message
 >     until after the checkpoint is complete and only then
deliver it to
 >     the ParDo?
 >
 >     You're correct. I thought that it could suffice to only
buffer during a
 >     checkpoint and otherwise rely on the deterministic
execution of the
 >     pipeline and KafkaIO's de-duplication code.
 >
 >
 > Yes, I want to distinguish the KafkaIO case from the general
case. It
 > would be interesting to see if there's something we could add
to the
 > Beam model that would create a better story for Kafka's EOS
writes.
 >
 >
 >     In any case, emitting only after finalization of
checkpoints gives us
 >     guaranteed stable input. It also means that the
processing is tight to
 >     the checkpoint interval, the checkpoint duration, and the
available
 >     memory.
 >
 >
 > This is true, however isn't it already true for such uses of
Flink?
 >
 >
 >   

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Reuven Lax
BTW - as a followup - there is a cost to having a Flink-specific override
for the Kafka sink. Part of that is test coverage - users who write
DirectRunner tests for their pipeline will now be using a different version
of the code than is used on the actual Flink runner. It also makes the code
less obvious: people who read the KafkaIO code will tend not to realize
that Flink is running something a bit different, and this can lead to
confusion.

Reuven

On Tue, Mar 5, 2019 at 7:46 AM Reuven Lax  wrote:

> RE: Kenn's suggestion. i think Raghu looked into something that, and
> something about it didn't work. I don't remember all the details, but I
> think there might have been some subtle problem with it that wasn't
> obvious. Doesn't mean that there isn't another way to solve that issue.
>
> Hopefully we can make that work. Another possibility if we can't is to do
> something special for Flink. Beam allows runners to splice out well-known
> transforms with their own implementation. Dataflow already does that for
> Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
> Kafka sink with one that uses Flink-specific functionality.  Ideally this
> would reuse most of the existing Kafka code (maybe we could refactor just
> the EOS part into something that could be subbed out).
>
> Reuven
>
> On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels  wrote:
>
>> > It would be interesting to see if there's something we could add to the
>> Beam model that would create a better story for Kafka's EOS writes.
>>
>> There would have to be a checkpoint-completed callback the DoFn can
>> register with the Runner. Does not seem applicable for most Runners
>> though.
>>
>> > This is true, however isn't it already true for such uses of Flink?
>>
>> Yes, that's correct. In the case of Kafka, Flink can offload the
>> buffering but for the general case, idempotent writes are only possible
>> if we buffer data until the checkpoint is completed.
>>
>> On 04.03.19 17:45, Reuven Lax wrote:
>> >
>> >
>> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels > > > wrote:
>> >
>> >  > Can we do 2? I seem to remember that we had trouble in some cases
>> > (e..g in the BigQuery case, there was no obvious way to create a
>> > deterministic id, which is why we went for a random number followed
>> > by a reshuffle). Also remember that the user ParDo that is producing
>> > data to the sink is not guaranteed to be deterministic; the Beam
>> > model allows for non-deterministic transforms.
>> >
>> > I believe we could use something like the worker id to make it
>> > deterministic, though the worker id can change after a restart. We
>> > could
>> > persist it in Flink's operator state. I do not know if we can come
>> up
>> > with a Runner-independent solution.
>> >
>> >
>> > If we did this, we would break it on runners that don't have a concept
>> > of a stable worker id :( The Dataflow runner can load balance work at
>> > any time (including moving work around between workers).
>> >
>> >
>> >  > I'm not quite sure I understand. If a ParDo is marked with
>> > RequiresStableInput, can't the flink runner buffer the input message
>> > until after the checkpoint is complete and only then deliver it to
>> > the ParDo?
>> >
>> > You're correct. I thought that it could suffice to only buffer
>> during a
>> > checkpoint and otherwise rely on the deterministic execution of the
>> > pipeline and KafkaIO's de-duplication code.
>> >
>> >
>> > Yes, I want to distinguish the KafkaIO case from the general case. It
>> > would be interesting to see if there's something we could add to the
>> > Beam model that would create a better story for Kafka's EOS writes.
>> >
>> >
>> > In any case, emitting only after finalization of checkpoints gives
>> us
>> > guaranteed stable input. It also means that the processing is tight
>> to
>> > the checkpoint interval, the checkpoint duration, and the available
>> > memory.
>> >
>> >
>> > This is true, however isn't it already true for such uses of Flink?
>> >
>> >
>> > On 01.03.19 19:41, Reuven Lax wrote:
>> >  >
>> >  >
>> >  > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
>> > mailto:m...@apache.org>
>> >  > >> wrote:
>> >  >
>> >  > Fully agree. I think we can improve the situation
>> > drastically. For
>> >  > KafkaIO EOS with Flink we need to make these two changes:
>> >  >
>> >  > 1) Introduce buffering while the checkpoint is being taken
>> >  > 2) Replace the random shard id assignment with something
>> > deterministic
>> >  >
>> >  >
>> >  > Can we do 2? I seem to remember that we had trouble in some cases
>> > (e..g
>> >  > in the BigQuery case, there was no obvious way to create a
>> > deterministic
>> >  > id, which is why we went for a random 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Reuven Lax
RE: Kenn's suggestion. i think Raghu looked into something that, and
something about it didn't work. I don't remember all the details, but I
think there might have been some subtle problem with it that wasn't
obvious. Doesn't mean that there isn't another way to solve that issue.

Hopefully we can make that work. Another possibility if we can't is to do
something special for Flink. Beam allows runners to splice out well-known
transforms with their own implementation. Dataflow already does that for
Google Cloud Pub/Sub sources/sinks. The Flink runner could splice out the
Kafka sink with one that uses Flink-specific functionality.  Ideally this
would reuse most of the existing Kafka code (maybe we could refactor just
the EOS part into something that could be subbed out).

Reuven

On Tue, Mar 5, 2019 at 2:53 AM Maximilian Michels  wrote:

> > It would be interesting to see if there's something we could add to the
> Beam model that would create a better story for Kafka's EOS writes.
>
> There would have to be a checkpoint-completed callback the DoFn can
> register with the Runner. Does not seem applicable for most Runners though.
>
> > This is true, however isn't it already true for such uses of Flink?
>
> Yes, that's correct. In the case of Kafka, Flink can offload the
> buffering but for the general case, idempotent writes are only possible
> if we buffer data until the checkpoint is completed.
>
> On 04.03.19 17:45, Reuven Lax wrote:
> >
> >
> > On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels  > > wrote:
> >
> >  > Can we do 2? I seem to remember that we had trouble in some cases
> > (e..g in the BigQuery case, there was no obvious way to create a
> > deterministic id, which is why we went for a random number followed
> > by a reshuffle). Also remember that the user ParDo that is producing
> > data to the sink is not guaranteed to be deterministic; the Beam
> > model allows for non-deterministic transforms.
> >
> > I believe we could use something like the worker id to make it
> > deterministic, though the worker id can change after a restart. We
> > could
> > persist it in Flink's operator state. I do not know if we can come up
> > with a Runner-independent solution.
> >
> >
> > If we did this, we would break it on runners that don't have a concept
> > of a stable worker id :( The Dataflow runner can load balance work at
> > any time (including moving work around between workers).
> >
> >
> >  > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to
> > the ParDo?
> >
> > You're correct. I thought that it could suffice to only buffer
> during a
> > checkpoint and otherwise rely on the deterministic execution of the
> > pipeline and KafkaIO's de-duplication code.
> >
> >
> > Yes, I want to distinguish the KafkaIO case from the general case. It
> > would be interesting to see if there's something we could add to the
> > Beam model that would create a better story for Kafka's EOS writes.
> >
> >
> > In any case, emitting only after finalization of checkpoints gives us
> > guaranteed stable input. It also means that the processing is tight
> to
> > the checkpoint interval, the checkpoint duration, and the available
> > memory.
> >
> >
> > This is true, however isn't it already true for such uses of Flink?
> >
> >
> > On 01.03.19 19:41, Reuven Lax wrote:
> >  >
> >  >
> >  > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
> > mailto:m...@apache.org>
> >  > >> wrote:
> >  >
> >  > Fully agree. I think we can improve the situation
> > drastically. For
> >  > KafkaIO EOS with Flink we need to make these two changes:
> >  >
> >  > 1) Introduce buffering while the checkpoint is being taken
> >  > 2) Replace the random shard id assignment with something
> > deterministic
> >  >
> >  >
> >  > Can we do 2? I seem to remember that we had trouble in some cases
> > (e..g
> >  > in the BigQuery case, there was no obvious way to create a
> > deterministic
> >  > id, which is why we went for a random number followed by a
> > reshuffle).
> >  > Also remember that the user ParDo that is producing data to the
> > sink is
> >  > not guaranteed to be deterministic; the Beam model allows for
> >  > non-deterministic transforms.
> >  >
> >  >
> >  > However, we won't be able to provide full compatibility with
> >  > RequiresStableInput because Flink only guarantees stable
> > input after a
> >  > checkpoint. RequiresStableInput requires input at any point
> > in time to
> >  > be stable.
> >  >
> >  >
> >  > I'm not quite sure I understand. If a 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Maximilian Michels

Does Kafka garbage collect this eventually in the case where you crash and 
start again  with a different transaction identifier?


Old transactions eventually time out. I believe there is a Kafka 
configuration setting and can be problematic with long checkpoints.



For Kafka, can you get the 2PC behavior like this: Upstream step: open a 
transaction, write a bunch of stuff to it (let Kafka do the buffering) and emit 
a transaction identifier. Downstream @RequiresStableInput step: close 
transaction. Again, I may be totally missing something, but I think that this 
has identical characteristics:


Nice, that should work. It offloads the buffering to Kafka similar to 
how the native Flink KafkaProducer works.


-Max

On 04.03.19 18:31, Kenneth Knowles wrote:



On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax > wrote:




On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles mailto:k...@google.com>> wrote:



On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels
mailto:m...@apache.org>> wrote:

 > If you randomly generate shard ids, buffer those until
finalization, finalize a checkpoint so that you never need
to re-run that generation, isn't the result stable from that
point onwards?

Yes, you're right :) For @RequiresStableInput we will always
have to
buffer and emit only after a finalized checkpoint.

2PC is the better model for Flink, at least in the case of
Kafka because
it can offload the buffering to Kafka via its transactions.
RequiresStableInput is a more general solution and it is
feasible to
support it in the Flink Runner. However, we have to make
sure that
checkpoints are taken frequently to avoid too much memory
pressure. 



It would be nice to also support 2PC in Beam, i.e. the
Runner could
choose to either buffer/materialize input or do a 2PC, but
it would also
break the purity of the existing model.


Still digging in to details. I think the "generate random shard
ids & buffer" is a tradition but more specific to BigQueryIO or
FileIO styles. It doesn't have to be done that way if the target
system has special support like Kafka does.

For Kafka, can you get the 2PC behavior like this: Upstream
step: open a transaction, write a bunch of stuff to it (let
Kafka do the buffering) and emit a transaction identifier.
Downstream @RequiresStableInput step: close transaction. Again,
I may be totally missing something, but I think that this has
identical characteristics:


Does Kafka garbage collect this eventually in the case where you
crash and start again  with a different transaction identifier?


I believe that is what I read on the page about Flink's Kafka 2PC, 
though I cannot find it any more. What would the alternative be for 
Kafka? You always have to be ready for a client that goes away.


Kenn


  - Kafka does the buffering
  - checkpoint finalization is the driver of latency
  - failure before checkpoint finalization means the old
transaction sits around and times out eventually
  - failure after checkpoint finalization causes retry with the
same transaction identifier

Kenn


On 01.03.19 19:42, Kenneth Knowles wrote:
 > I think I am fundamentally misunderstanding checkpointing
in Flink.
 >
 > If you randomly generate shard ids, buffer those until
finalization,
 > finalize a checkpoint so that you never need to re-run
that generation,
 > isn't the result stable from that point onwards?
 >
 > Kenn
 >
 > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels
mailto:m...@apache.org>
 > >> wrote:
 >
 >     Fully agree. I think we can improve the situation
drastically. For
 >     KafkaIO EOS with Flink we need to make these two changes:
 >
 >     1) Introduce buffering while the checkpoint is being
taken
 >     2) Replace the random shard id assignment with
something deterministic
 >
 >     However, we won't be able to provide full
compatibility with
 >     RequiresStableInput because Flink only guarantees
stable input after a
 >     checkpoint. RequiresStableInput requires input at any
point in time to
 >     be stable. IMHO the only way to achieve that is
materializing output
 >     which Flink does not currently support.
 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-05 Thread Maximilian Michels

It would be interesting to see if there's something we could add to the Beam 
model that would create a better story for Kafka's EOS writes.


There would have to be a checkpoint-completed callback the DoFn can 
register with the Runner. Does not seem applicable for most Runners though.



This is true, however isn't it already true for such uses of Flink?


Yes, that's correct. In the case of Kafka, Flink can offload the 
buffering but for the general case, idempotent writes are only possible 
if we buffer data until the checkpoint is completed.


On 04.03.19 17:45, Reuven Lax wrote:



On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels > wrote:


 > Can we do 2? I seem to remember that we had trouble in some cases
(e..g in the BigQuery case, there was no obvious way to create a
deterministic id, which is why we went for a random number followed
by a reshuffle). Also remember that the user ParDo that is producing
data to the sink is not guaranteed to be deterministic; the Beam
model allows for non-deterministic transforms.

I believe we could use something like the worker id to make it
deterministic, though the worker id can change after a restart. We
could
persist it in Flink's operator state. I do not know if we can come up
with a Runner-independent solution.


If we did this, we would break it on runners that don't have a concept 
of a stable worker id :( The Dataflow runner can load balance work at 
any time (including moving work around between workers).



 > I'm not quite sure I understand. If a ParDo is marked with
RequiresStableInput, can't the flink runner buffer the input message
until after the checkpoint is complete and only then deliver it to
the ParDo?

You're correct. I thought that it could suffice to only buffer during a
checkpoint and otherwise rely on the deterministic execution of the
pipeline and KafkaIO's de-duplication code.


Yes, I want to distinguish the KafkaIO case from the general case. It 
would be interesting to see if there's something we could add to the 
Beam model that would create a better story for Kafka's EOS writes.



In any case, emitting only after finalization of checkpoints gives us
guaranteed stable input. It also means that the processing is tight to
the checkpoint interval, the checkpoint duration, and the available
memory.


This is true, however isn't it already true for such uses of Flink?


On 01.03.19 19:41, Reuven Lax wrote:
 >
 >
 > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels
mailto:m...@apache.org>
 > >> wrote:
 >
 >     Fully agree. I think we can improve the situation
drastically. For
 >     KafkaIO EOS with Flink we need to make these two changes:
 >
 >     1) Introduce buffering while the checkpoint is being taken
 >     2) Replace the random shard id assignment with something
deterministic
 >
 >
 > Can we do 2? I seem to remember that we had trouble in some cases
(e..g
 > in the BigQuery case, there was no obvious way to create a
deterministic
 > id, which is why we went for a random number followed by a
reshuffle).
 > Also remember that the user ParDo that is producing data to the
sink is
 > not guaranteed to be deterministic; the Beam model allows for
 > non-deterministic transforms.
 >
 >
 >     However, we won't be able to provide full compatibility with
 >     RequiresStableInput because Flink only guarantees stable
input after a
 >     checkpoint. RequiresStableInput requires input at any point
in time to
 >     be stable.
 >
 >
 > I'm not quite sure I understand. If a ParDo is marked with
 > RequiresStableInput, can't the flink runner buffer the input message
 > until after the checkpoint is complete and only then deliver it
to the
 > ParDo? This adds latency of course, but I'm not sure how else to do
 > things correctly with the Beam model.
 >
 >     IMHO the only way to achieve that is materializing output
 >     which Flink does not currently support.
 >
 >     KafkaIO does not need all the power of RequiresStableInput to
achieve
 >     EOS with Flink, but for the general case I don't see a good
solution at
 >     the moment.
 >
 >     -Max
 >
 >     On 01.03.19 16:45, Reuven Lax wrote:
 >      > Yeah, the person who was working on it originally stopped
working on
 >      > Beam, and nobody else ever finished it. I think it is
important to
 >      > finish though. Many of the existing Sinks are only fully
correct for
 >      > Dataflow today, because they generate either Reshuffle or
 >     GroupByKey to
 >      > ensure input stability before outputting (in many cases
this code
 >     was
 >      > 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Kenneth Knowles
On Mon, Mar 4, 2019 at 9:18 AM Reuven Lax  wrote:

>
>
> On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles  wrote:
>
>>
>>
>> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels  wrote:
>>
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> finalize a checkpoint so that you never need to re-run that generation,
>>> isn't the result stable from that point onwards?
>>>
>>> Yes, you're right :) For @RequiresStableInput we will always have to
>>> buffer and emit only after a finalized checkpoint.
>>>
>>> 2PC is the better model for Flink, at least in the case of Kafka because
>>> it can offload the buffering to Kafka via its transactions.
>>> RequiresStableInput is a more general solution and it is feasible to
>>> support it in the Flink Runner. However, we have to make sure that
>>> checkpoints are taken frequently to avoid too much memory pressure.
>>
>>
>>> It would be nice to also support 2PC in Beam, i.e. the Runner could
>>> choose to either buffer/materialize input or do a 2PC, but it would also
>>> break the purity of the existing model.
>>>
>>
>> Still digging in to details. I think the "generate random shard ids &
>> buffer" is a tradition but more specific to BigQueryIO or FileIO styles. It
>> doesn't have to be done that way if the target system has special support
>> like Kafka does.
>>
>> For Kafka, can you get the 2PC behavior like this: Upstream step: open a
>> transaction, write a bunch of stuff to it (let Kafka do the buffering) and
>> emit a transaction identifier. Downstream @RequiresStableInput step: close
>> transaction. Again, I may be totally missing something, but I think that
>> this has identical characteristics:
>>
>
> Does Kafka garbage collect this eventually in the case where you crash and
> start again  with a different transaction identifier?
>

I believe that is what I read on the page about Flink's Kafka 2PC, though I
cannot find it any more. What would the alternative be for Kafka? You
always have to be ready for a client that goes away.

Kenn


>
>>  - Kafka does the buffering
>>  - checkpoint finalization is the driver of latency
>>  - failure before checkpoint finalization means the old transaction sits
>> around and times out eventually
>>  - failure after checkpoint finalization causes retry with the same
>> transaction identifier
>>
>> Kenn
>>
>>
>>>
>>> On 01.03.19 19:42, Kenneth Knowles wrote:
>>> > I think I am fundamentally misunderstanding checkpointing in Flink.
>>> >
>>> > If you randomly generate shard ids, buffer those until finalization,
>>> > finalize a checkpoint so that you never need to re-run that
>>> generation,
>>> > isn't the result stable from that point onwards?
>>> >
>>> > Kenn
>>> >
>>> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels >> > > wrote:
>>> >
>>> > Fully agree. I think we can improve the situation drastically. For
>>> > KafkaIO EOS with Flink we need to make these two changes:
>>> >
>>> > 1) Introduce buffering while the checkpoint is being taken
>>> > 2) Replace the random shard id assignment with something
>>> deterministic
>>> >
>>> > However, we won't be able to provide full compatibility with
>>> > RequiresStableInput because Flink only guarantees stable input
>>> after a
>>> > checkpoint. RequiresStableInput requires input at any point in
>>> time to
>>> > be stable. IMHO the only way to achieve that is materializing
>>> output
>>> > which Flink does not currently support.
>>> >
>>> > KafkaIO does not need all the power of RequiresStableInput to
>>> achieve
>>> > EOS with Flink, but for the general case I don't see a good
>>> solution at
>>> > the moment.
>>> >
>>> > -Max
>>> >
>>> > On 01.03.19 16:45, Reuven Lax wrote:
>>> >  > Yeah, the person who was working on it originally stopped
>>> working on
>>> >  > Beam, and nobody else ever finished it. I think it is important
>>> to
>>> >  > finish though. Many of the existing Sinks are only fully
>>> correct for
>>> >  > Dataflow today, because they generate either Reshuffle or
>>> > GroupByKey to
>>> >  > ensure input stability before outputting (in many cases this
>>> code
>>> > was
>>> >  > inherited from before Beam existed). On Flink today, these sinks
>>> > might
>>> >  > occasionally produce duplicate output in the case of failures.
>>> >  >
>>> >  > Reuven
>>> >  >
>>> >  > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <
>>> m...@apache.org
>>> > 
>>> >  > >> wrote:
>>> >  >
>>> >  > Circling back to the RequiresStableInput annotation[1]. I've
>>> > done some
>>> >  > protoyping to see how this could be integrated into Flink.
>>> I'm
>>> >  > currently
>>> >  > writing a test based on RequiresStableInput.
>>> >  >
>>> >  > I found out there are already checks in place at the
>>> 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Reuven Lax
On Mon, Mar 4, 2019 at 9:04 AM Kenneth Knowles  wrote:

>
>
> On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels  wrote:
>
>> > If you randomly generate shard ids, buffer those until finalization,
>> finalize a checkpoint so that you never need to re-run that generation,
>> isn't the result stable from that point onwards?
>>
>> Yes, you're right :) For @RequiresStableInput we will always have to
>> buffer and emit only after a finalized checkpoint.
>>
>> 2PC is the better model for Flink, at least in the case of Kafka because
>> it can offload the buffering to Kafka via its transactions.
>> RequiresStableInput is a more general solution and it is feasible to
>> support it in the Flink Runner. However, we have to make sure that
>> checkpoints are taken frequently to avoid too much memory pressure.
>
>
>> It would be nice to also support 2PC in Beam, i.e. the Runner could
>> choose to either buffer/materialize input or do a 2PC, but it would also
>> break the purity of the existing model.
>>
>
> Still digging in to details. I think the "generate random shard ids &
> buffer" is a tradition but more specific to BigQueryIO or FileIO styles. It
> doesn't have to be done that way if the target system has special support
> like Kafka does.
>
> For Kafka, can you get the 2PC behavior like this: Upstream step: open a
> transaction, write a bunch of stuff to it (let Kafka do the buffering) and
> emit a transaction identifier. Downstream @RequiresStableInput step: close
> transaction. Again, I may be totally missing something, but I think that
> this has identical characteristics:
>

Does Kafka garbage collect this eventually in the case where you crash and
start again  with a different transaction identifier?


>  - Kafka does the buffering
>  - checkpoint finalization is the driver of latency
>  - failure before checkpoint finalization means the old transaction sits
> around and times out eventually
>  - failure after checkpoint finalization causes retry with the same
> transaction identifier
>
> Kenn
>
>
>>
>> On 01.03.19 19:42, Kenneth Knowles wrote:
>> > I think I am fundamentally misunderstanding checkpointing in Flink.
>> >
>> > If you randomly generate shard ids, buffer those until finalization,
>> > finalize a checkpoint so that you never need to re-run that generation,
>> > isn't the result stable from that point onwards?
>> >
>> > Kenn
>> >
>> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels > > > wrote:
>> >
>> > Fully agree. I think we can improve the situation drastically. For
>> > KafkaIO EOS with Flink we need to make these two changes:
>> >
>> > 1) Introduce buffering while the checkpoint is being taken
>> > 2) Replace the random shard id assignment with something
>> deterministic
>> >
>> > However, we won't be able to provide full compatibility with
>> > RequiresStableInput because Flink only guarantees stable input
>> after a
>> > checkpoint. RequiresStableInput requires input at any point in time
>> to
>> > be stable. IMHO the only way to achieve that is materializing output
>> > which Flink does not currently support.
>> >
>> > KafkaIO does not need all the power of RequiresStableInput to
>> achieve
>> > EOS with Flink, but for the general case I don't see a good
>> solution at
>> > the moment.
>> >
>> > -Max
>> >
>> > On 01.03.19 16:45, Reuven Lax wrote:
>> >  > Yeah, the person who was working on it originally stopped
>> working on
>> >  > Beam, and nobody else ever finished it. I think it is important
>> to
>> >  > finish though. Many of the existing Sinks are only fully correct
>> for
>> >  > Dataflow today, because they generate either Reshuffle or
>> > GroupByKey to
>> >  > ensure input stability before outputting (in many cases this code
>> > was
>> >  > inherited from before Beam existed). On Flink today, these sinks
>> > might
>> >  > occasionally produce duplicate output in the case of failures.
>> >  >
>> >  > Reuven
>> >  >
>> >  > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels <
>> m...@apache.org
>> > 
>> >  > >> wrote:
>> >  >
>> >  > Circling back to the RequiresStableInput annotation[1]. I've
>> > done some
>> >  > protoyping to see how this could be integrated into Flink.
>> I'm
>> >  > currently
>> >  > writing a test based on RequiresStableInput.
>> >  >
>> >  > I found out there are already checks in place at the Runners
>> to
>> >  > throw in
>> >  > case transforms use RequiresStableInput and its not
>> > supported. However,
>> >  > not a single transform actually uses the annotation.
>> >  >
>> >  > It seems that the effort stopped at some point? Would it make
>> > sense to
>> >  > start annotating KafkaExactlyOnceSink with
>> > 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Kenneth Knowles
On Mon, Mar 4, 2019 at 7:16 AM Maximilian Michels  wrote:

> > If you randomly generate shard ids, buffer those until finalization,
> finalize a checkpoint so that you never need to re-run that generation,
> isn't the result stable from that point onwards?
>
> Yes, you're right :) For @RequiresStableInput we will always have to
> buffer and emit only after a finalized checkpoint.
>
> 2PC is the better model for Flink, at least in the case of Kafka because
> it can offload the buffering to Kafka via its transactions.
> RequiresStableInput is a more general solution and it is feasible to
> support it in the Flink Runner. However, we have to make sure that
> checkpoints are taken frequently to avoid too much memory pressure.


> It would be nice to also support 2PC in Beam, i.e. the Runner could
> choose to either buffer/materialize input or do a 2PC, but it would also
> break the purity of the existing model.
>

Still digging in to details. I think the "generate random shard ids &
buffer" is a tradition but more specific to BigQueryIO or FileIO styles. It
doesn't have to be done that way if the target system has special support
like Kafka does.

For Kafka, can you get the 2PC behavior like this: Upstream step: open a
transaction, write a bunch of stuff to it (let Kafka do the buffering) and
emit a transaction identifier. Downstream @RequiresStableInput step: close
transaction. Again, I may be totally missing something, but I think that
this has identical characteristics:

 - Kafka does the buffering
 - checkpoint finalization is the driver of latency
 - failure before checkpoint finalization means the old transaction sits
around and times out eventually
 - failure after checkpoint finalization causes retry with the same
transaction identifier

Kenn


>
> On 01.03.19 19:42, Kenneth Knowles wrote:
> > I think I am fundamentally misunderstanding checkpointing in Flink.
> >
> > If you randomly generate shard ids, buffer those until finalization,
> > finalize a checkpoint so that you never need to re-run that generation,
> > isn't the result stable from that point onwards?
> >
> > Kenn
> >
> > On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels  > > wrote:
> >
> > Fully agree. I think we can improve the situation drastically. For
> > KafkaIO EOS with Flink we need to make these two changes:
> >
> > 1) Introduce buffering while the checkpoint is being taken
> > 2) Replace the random shard id assignment with something
> deterministic
> >
> > However, we won't be able to provide full compatibility with
> > RequiresStableInput because Flink only guarantees stable input after
> a
> > checkpoint. RequiresStableInput requires input at any point in time
> to
> > be stable. IMHO the only way to achieve that is materializing output
> > which Flink does not currently support.
> >
> > KafkaIO does not need all the power of RequiresStableInput to achieve
> > EOS with Flink, but for the general case I don't see a good solution
> at
> > the moment.
> >
> > -Max
> >
> > On 01.03.19 16:45, Reuven Lax wrote:
> >  > Yeah, the person who was working on it originally stopped working
> on
> >  > Beam, and nobody else ever finished it. I think it is important to
> >  > finish though. Many of the existing Sinks are only fully correct
> for
> >  > Dataflow today, because they generate either Reshuffle or
> > GroupByKey to
> >  > ensure input stability before outputting (in many cases this code
> > was
> >  > inherited from before Beam existed). On Flink today, these sinks
> > might
> >  > occasionally produce duplicate output in the case of failures.
> >  >
> >  > Reuven
> >  >
> >  > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  > 
> >  > >> wrote:
> >  >
> >  > Circling back to the RequiresStableInput annotation[1]. I've
> > done some
> >  > protoyping to see how this could be integrated into Flink. I'm
> >  > currently
> >  > writing a test based on RequiresStableInput.
> >  >
> >  > I found out there are already checks in place at the Runners
> to
> >  > throw in
> >  > case transforms use RequiresStableInput and its not
> > supported. However,
> >  > not a single transform actually uses the annotation.
> >  >
> >  > It seems that the effort stopped at some point? Would it make
> > sense to
> >  > start annotating KafkaExactlyOnceSink with
> > @RequiresStableInput? We
> >  > could then get rid of the whitelist.
> >  >
> >  > -Max
> >  >
> >  > [1]
> >  >
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >  >
> >  >
> >  >
> >  > On 01.03.19 14:28, Maximilian Michels wrote:
> >  >  > Just realized that 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Reuven Lax
On Mon, Mar 4, 2019 at 6:55 AM Maximilian Michels  wrote:

> > Can we do 2? I seem to remember that we had trouble in some cases (e..g
> in the BigQuery case, there was no obvious way to create a deterministic
> id, which is why we went for a random number followed by a reshuffle). Also
> remember that the user ParDo that is producing data to the sink is not
> guaranteed to be deterministic; the Beam model allows for non-deterministic
> transforms.
>
> I believe we could use something like the worker id to make it
> deterministic, though the worker id can change after a restart. We could
> persist it in Flink's operator state. I do not know if we can come up
> with a Runner-independent solution.
>

If we did this, we would break it on runners that don't have a concept of a
stable worker id :( The Dataflow runner can load balance work at any time
(including moving work around between workers).

>
> > I'm not quite sure I understand. If a ParDo is marked with
> RequiresStableInput, can't the flink runner buffer the input message until
> after the checkpoint is complete and only then deliver it to the ParDo?
>
> You're correct. I thought that it could suffice to only buffer during a
> checkpoint and otherwise rely on the deterministic execution of the
> pipeline and KafkaIO's de-duplication code.
>

Yes, I want to distinguish the KafkaIO case from the general case. It would
be interesting to see if there's something we could add to the Beam model
that would create a better story for Kafka's EOS writes.

>
> In any case, emitting only after finalization of checkpoints gives us
> guaranteed stable input. It also means that the processing is tight to
> the checkpoint interval, the checkpoint duration, and the available memory.
>

This is true, however isn't it already true for such uses of Flink?


>
> On 01.03.19 19:41, Reuven Lax wrote:
> >
> >
> > On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels  > > wrote:
> >
> > Fully agree. I think we can improve the situation drastically. For
> > KafkaIO EOS with Flink we need to make these two changes:
> >
> > 1) Introduce buffering while the checkpoint is being taken
> > 2) Replace the random shard id assignment with something
> deterministic
> >
> >
> > Can we do 2? I seem to remember that we had trouble in some cases (e..g
> > in the BigQuery case, there was no obvious way to create a deterministic
> > id, which is why we went for a random number followed by a reshuffle).
> > Also remember that the user ParDo that is producing data to the sink is
> > not guaranteed to be deterministic; the Beam model allows for
> > non-deterministic transforms.
> >
> >
> > However, we won't be able to provide full compatibility with
> > RequiresStableInput because Flink only guarantees stable input after
> a
> > checkpoint. RequiresStableInput requires input at any point in time
> to
> > be stable.
> >
> >
> > I'm not quite sure I understand. If a ParDo is marked with
> > RequiresStableInput, can't the flink runner buffer the input message
> > until after the checkpoint is complete and only then deliver it to the
> > ParDo? This adds latency of course, but I'm not sure how else to do
> > things correctly with the Beam model.
> >
> > IMHO the only way to achieve that is materializing output
> > which Flink does not currently support.
> >
> > KafkaIO does not need all the power of RequiresStableInput to achieve
> > EOS with Flink, but for the general case I don't see a good solution
> at
> > the moment.
> >
> > -Max
> >
> > On 01.03.19 16:45, Reuven Lax wrote:
> >  > Yeah, the person who was working on it originally stopped working
> on
> >  > Beam, and nobody else ever finished it. I think it is important to
> >  > finish though. Many of the existing Sinks are only fully correct
> for
> >  > Dataflow today, because they generate either Reshuffle or
> > GroupByKey to
> >  > ensure input stability before outputting (in many cases this code
> > was
> >  > inherited from before Beam existed). On Flink today, these sinks
> > might
> >  > occasionally produce duplicate output in the case of failures.
> >  >
> >  > Reuven
> >  >
> >  > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  > 
> >  > >> wrote:
> >  >
> >  > Circling back to the RequiresStableInput annotation[1]. I've
> > done some
> >  > protoyping to see how this could be integrated into Flink. I'm
> >  > currently
> >  > writing a test based on RequiresStableInput.
> >  >
> >  > I found out there are already checks in place at the Runners
> to
> >  > throw in
> >  > case transforms use RequiresStableInput and its not
> > supported. However,
> >  > not a single transform actually uses the annotation.
> >  >
> >  > It 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Maximilian Michels

If you randomly generate shard ids, buffer those until finalization, finalize a 
checkpoint so that you never need to re-run that generation, isn't the result 
stable from that point onwards?


Yes, you're right :) For @RequiresStableInput we will always have to 
buffer and emit only after a finalized checkpoint.


2PC is the better model for Flink, at least in the case of Kafka because 
it can offload the buffering to Kafka via its transactions. 
RequiresStableInput is a more general solution and it is feasible to 
support it in the Flink Runner. However, we have to make sure that 
checkpoints are taken frequently to avoid too much memory pressure.


It would be nice to also support 2PC in Beam, i.e. the Runner could 
choose to either buffer/materialize input or do a 2PC, but it would also 
break the purity of the existing model.


On 01.03.19 19:42, Kenneth Knowles wrote:

I think I am fundamentally misunderstanding checkpointing in Flink.

If you randomly generate shard ids, buffer those until finalization, 
finalize a checkpoint so that you never need to re-run that generation, 
isn't the result stable from that point onwards?


Kenn

On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels > wrote:


Fully agree. I think we can improve the situation drastically. For
KafkaIO EOS with Flink we need to make these two changes:

1) Introduce buffering while the checkpoint is being taken
2) Replace the random shard id assignment with something deterministic

However, we won't be able to provide full compatibility with
RequiresStableInput because Flink only guarantees stable input after a
checkpoint. RequiresStableInput requires input at any point in time to
be stable. IMHO the only way to achieve that is materializing output
which Flink does not currently support.

KafkaIO does not need all the power of RequiresStableInput to achieve
EOS with Flink, but for the general case I don't see a good solution at
the moment.

-Max

On 01.03.19 16:45, Reuven Lax wrote:
 > Yeah, the person who was working on it originally stopped working on
 > Beam, and nobody else ever finished it. I think it is important to
 > finish though. Many of the existing Sinks are only fully correct for
 > Dataflow today, because they generate either Reshuffle or
GroupByKey to
 > ensure input stability before outputting (in many cases this code
was
 > inherited from before Beam existed). On Flink today, these sinks
might
 > occasionally produce duplicate output in the case of failures.
 >
 > Reuven
 >
 > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels mailto:m...@apache.org>
 > >> wrote:
 >
 >     Circling back to the RequiresStableInput annotation[1]. I've
done some
 >     protoyping to see how this could be integrated into Flink. I'm
 >     currently
 >     writing a test based on RequiresStableInput.
 >
 >     I found out there are already checks in place at the Runners to
 >     throw in
 >     case transforms use RequiresStableInput and its not
supported. However,
 >     not a single transform actually uses the annotation.
 >
 >     It seems that the effort stopped at some point? Would it make
sense to
 >     start annotating KafkaExactlyOnceSink with
@RequiresStableInput? We
 >     could then get rid of the whitelist.
 >
 >     -Max
 >
 >     [1]
 >

https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
 >
 >
 >
 >     On 01.03.19 14:28, Maximilian Michels wrote:
 >      > Just realized that transactions do not spawn multiple
elements in
 >      > KafkaExactlyOnceSink. So the proposed solution to stop
processing
 >      > elements while a snapshot is pending would work.
 >      >
 >      > It is certainly not optimal in terms of performance for
Flink and
 >     poses
 >      > problems when checkpoints take long to complete, but it
would be
 >      > worthwhile to implement this to make use of the EOS feature.
 >      >
 >      > Thanks,
 >      > Max
 >      >
 >      > On 01.03.19 12:23, Maximilian Michels wrote:
 >      >> Thanks you for the prompt replies. It's great to see that
there is
 >      >> good understanding of how EOS in Flink works.
 >      >>
 >      >>> This is exactly what RequiresStableInput is supposed to
do. On the
 >      >>> Flink runner, this would be implemented by delaying
processing
 >     until
 >      >>> the current checkpoint is done.
 >      >>
 >      >> I don't think that works because we have no control over
the Kafka
 >      >> transactions. Imagine:
 >      >>
 >      >> 1) ExactlyOnceWriter writes records to Kafka and commits,
then
 >     

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-04 Thread Maximilian Michels

Can we do 2? I seem to remember that we had trouble in some cases (e..g in the 
BigQuery case, there was no obvious way to create a deterministic id, which is 
why we went for a random number followed by a reshuffle). Also remember that 
the user ParDo that is producing data to the sink is not guaranteed to be 
deterministic; the Beam model allows for non-deterministic transforms.


I believe we could use something like the worker id to make it 
deterministic, though the worker id can change after a restart. We could 
persist it in Flink's operator state. I do not know if we can come up 
with a Runner-independent solution.



I'm not quite sure I understand. If a ParDo is marked with RequiresStableInput, 
can't the flink runner buffer the input message until after the checkpoint is 
complete and only then deliver it to the ParDo?


You're correct. I thought that it could suffice to only buffer during a 
checkpoint and otherwise rely on the deterministic execution of the 
pipeline and KafkaIO's de-duplication code.


In any case, emitting only after finalization of checkpoints gives us 
guaranteed stable input. It also means that the processing is tight to 
the checkpoint interval, the checkpoint duration, and the available memory.


On 01.03.19 19:41, Reuven Lax wrote:



On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels > wrote:


Fully agree. I think we can improve the situation drastically. For
KafkaIO EOS with Flink we need to make these two changes:

1) Introduce buffering while the checkpoint is being taken
2) Replace the random shard id assignment with something deterministic


Can we do 2? I seem to remember that we had trouble in some cases (e..g 
in the BigQuery case, there was no obvious way to create a deterministic 
id, which is why we went for a random number followed by a reshuffle). 
Also remember that the user ParDo that is producing data to the sink is 
not guaranteed to be deterministic; the Beam model allows for 
non-deterministic transforms.



However, we won't be able to provide full compatibility with
RequiresStableInput because Flink only guarantees stable input after a
checkpoint. RequiresStableInput requires input at any point in time to
be stable. 



I'm not quite sure I understand. If a ParDo is marked with 
RequiresStableInput, can't the flink runner buffer the input message 
until after the checkpoint is complete and only then deliver it to the 
ParDo? This adds latency of course, but I'm not sure how else to do 
things correctly with the Beam model.


IMHO the only way to achieve that is materializing output
which Flink does not currently support.

KafkaIO does not need all the power of RequiresStableInput to achieve
EOS with Flink, but for the general case I don't see a good solution at
the moment.

-Max

On 01.03.19 16:45, Reuven Lax wrote:
 > Yeah, the person who was working on it originally stopped working on
 > Beam, and nobody else ever finished it. I think it is important to
 > finish though. Many of the existing Sinks are only fully correct for
 > Dataflow today, because they generate either Reshuffle or
GroupByKey to
 > ensure input stability before outputting (in many cases this code
was
 > inherited from before Beam existed). On Flink today, these sinks
might
 > occasionally produce duplicate output in the case of failures.
 >
 > Reuven
 >
 > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels mailto:m...@apache.org>
 > >> wrote:
 >
 >     Circling back to the RequiresStableInput annotation[1]. I've
done some
 >     protoyping to see how this could be integrated into Flink. I'm
 >     currently
 >     writing a test based on RequiresStableInput.
 >
 >     I found out there are already checks in place at the Runners to
 >     throw in
 >     case transforms use RequiresStableInput and its not
supported. However,
 >     not a single transform actually uses the annotation.
 >
 >     It seems that the effort stopped at some point? Would it make
sense to
 >     start annotating KafkaExactlyOnceSink with
@RequiresStableInput? We
 >     could then get rid of the whitelist.
 >
 >     -Max
 >
 >     [1]
 >

https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
 >
 >
 >
 >     On 01.03.19 14:28, Maximilian Michels wrote:
 >      > Just realized that transactions do not spawn multiple
elements in
 >      > KafkaExactlyOnceSink. So the proposed solution to stop
processing
 >      > elements while a snapshot is pending would work.
 >      >
 >      > It is certainly not optimal in terms of performance for
Flink and
 >     poses
 >      > problems when checkpoints take long to complete, but it
would be
 > 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the
> transaction before this and makes sure it flushes all records in
> preCommit(). So far good.
> - commit is called after checkpoint is persisted. Now, imagine commit()
> fails for some reason. There is no option to rerun the 1st phase to write
> the records again in a new transaction. This is a hard failure for the the
> job. In practice Flink might attempt to commit again (not sure how many
> times), which is likely to fail and eventually results in job failure.
>

Btw, just to clarify the above failure case is not related to Beam or
@RequiresStableInput.
It is how 2PC in Flink used by it EOS Kafka sync is designed. preCommit()
can't be rerun past the checkpoint, so commit() phase needs to be self
sufficient to handle failures and reruns.



>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles 
>>> wrote:
>>>
 I believe the way you would implement the logic behind Flink's
 KafkaProducer would be to have two steps:

 1. Start transaction
 2. @RequiresStableInput Close transaction

>>>
>>> I see.  What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard fail once preCommit()
>>> succeeds. I think that is cost of not having an extra shuffle. It is
>>> alright since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
>>>
 The FlinkRunner would need to insert the "wait until checkpoint
 finalization" logic wherever it sees @RequiresStableInput, which is already
 what it would have to do.

 This matches the KafkaProducer's logic - delay closing the transaction
 until checkpoint finalization. This answers my main question, which is "is
 @RequiresStableInput expressive enough to allow Beam-on-Flink to have
 exactly once behavior with the same performance characteristics as native
 Flink checkpoint finalization?"

 Kenn

 [1] https://github.com/apache/beam/pull/7955

 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:

>
>
> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi 
> wrote:
>
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>
>>
>>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>> wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>> exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users
>>> from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>> native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>>
>>
>>
>> When we discussed this in Aug 2017, the understanding was that 2
>> Phase commit utility in Flink used to implement Flink's Kafka EOS could 
>> not
>> be implemented in Beam's context.
>> See  this message
>>  in
>> that dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>>
>
> I think that TwoPhaseCommit utility class wouldn't work. The Flink
> runner would probably want to directly use notifySnapshotComplete in order
> to implement @RequiresStableInput.
>
>>
>>
>>>
>>> A checkpoint is realized by sending barriers through all channels
>>> starting from the source until reaching all sinks. Every operator
>>> persists its state once it has received a barrier on all its input
>>> channels, it then forwards it to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understood, Spark or Dataflow use the GroupByKey stages to
>>> persist
>>> the input. That is not required in Flink to be able to take a
>>> consistent
>>> snapshot of the 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Raghu Angadi
Great. For now, we could write new KafkaIO EO sink without any connection
to current design that would work correctly with Flink Runner. After that I
think we can figure out how to reconcile the two.

On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels  wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>
> However, we won't be able to provide full compatibility with
> RequiresStableInput because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable. IMHO the only way to achieve that is materializing output
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on
> > Beam, and nobody else ever finished it. I think it is important to
> > finish though. Many of the existing Sinks are only fully correct for
> > Dataflow today, because they generate either Reshuffle or GroupByKey to
> > ensure input stability before outputting (in many cases this code was
> > inherited from before Beam existed). On Flink today, these sinks might
> > occasionally produce duplicate output in the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  > > wrote:
> >
> > Circling back to the RequiresStableInput annotation[1]. I've done
> some
> > protoyping to see how this could be integrated into Flink. I'm
> > currently
> > writing a test based on RequiresStableInput.
> >
> > I found out there are already checks in place at the Runners to
> > throw in
> > case transforms use RequiresStableInput and its not supported.
> However,
> > not a single transform actually uses the annotation.
> >
> > It seems that the effort stopped at some point? Would it make sense
> to
> > start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
> > could then get rid of the whitelist.
> >
> > -Max
> >
> > [1]
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >
> >
> > On 01.03.19 14:28, Maximilian Michels wrote:
> >  > Just realized that transactions do not spawn multiple elements in
> >  > KafkaExactlyOnceSink. So the proposed solution to stop processing
> >  > elements while a snapshot is pending would work.
> >  >
> >  > It is certainly not optimal in terms of performance for Flink and
> > poses
> >  > problems when checkpoints take long to complete, but it would be
> >  > worthwhile to implement this to make use of the EOS feature.
> >  >
> >  > Thanks,
> >  > Max
> >  >
> >  > On 01.03.19 12:23, Maximilian Michels wrote:
> >  >> Thanks you for the prompt replies. It's great to see that there
> is
> >  >> good understanding of how EOS in Flink works.
> >  >>
> >  >>> This is exactly what RequiresStableInput is supposed to do. On
> the
> >  >>> Flink runner, this would be implemented by delaying processing
> > until
> >  >>> the current checkpoint is done.
> >  >>
> >  >> I don't think that works because we have no control over the
> Kafka
> >  >> transactions. Imagine:
> >  >>
> >  >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> > starts
> >  >> a new transaction.
> >  >> 2) Flink checkpoints, delaying the processing of elements, the
> >  >> checkpoint fails.
> >  >> 3) We restore from an old checkpoint and will start writing
> > duplicate
> >  >> data to Kafka. The de-duplication that the sink performs does not
> >  >> help, especially because the random shards ids might be assigned
> >  >> differently.
> >  >>
> >  >> IMHO we have to have control over commit to be able to provide
> EOS.
> >  >>
> >  >>> When we discussed this in Aug 2017, the understanding was that 2
> >  >>> Phase commit utility in Flink used to implement Flink's Kafka
> EOS
> >  >>> could not be implemented in Beam's context.
> >  >>
> >  >> That's also my understanding, unless we change the interface.
> >  >>
> >  >>> I don't see how SDF solves this problem..
> >  >>
> >  >> SDF has a checkpoint method which the Runner can call, but I
> think
> >  >> that you are right, that the above problem would be the same.
> >  >>
> >  >>> Absolutely. I would love to support EOS in KakaIO for Flink. I
> > think
> >  >>> that will help many future exactly-once sinks.. and address
> >  >>> fundamental incompatibility between Beam model and 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Kenneth Knowles
I think I am fundamentally misunderstanding checkpointing in Flink.

If you randomly generate shard ids, buffer those until finalization,
finalize a checkpoint so that you never need to re-run that generation,
isn't the result stable from that point onwards?

Kenn

On Fri, Mar 1, 2019 at 10:30 AM Maximilian Michels  wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>
> However, we won't be able to provide full compatibility with
> RequiresStableInput because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable. IMHO the only way to achieve that is materializing output
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on
> > Beam, and nobody else ever finished it. I think it is important to
> > finish though. Many of the existing Sinks are only fully correct for
> > Dataflow today, because they generate either Reshuffle or GroupByKey to
> > ensure input stability before outputting (in many cases this code was
> > inherited from before Beam existed). On Flink today, these sinks might
> > occasionally produce duplicate output in the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  > > wrote:
> >
> > Circling back to the RequiresStableInput annotation[1]. I've done
> some
> > protoyping to see how this could be integrated into Flink. I'm
> > currently
> > writing a test based on RequiresStableInput.
> >
> > I found out there are already checks in place at the Runners to
> > throw in
> > case transforms use RequiresStableInput and its not supported.
> However,
> > not a single transform actually uses the annotation.
> >
> > It seems that the effort stopped at some point? Would it make sense
> to
> > start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
> > could then get rid of the whitelist.
> >
> > -Max
> >
> > [1]
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >
> >
> > On 01.03.19 14:28, Maximilian Michels wrote:
> >  > Just realized that transactions do not spawn multiple elements in
> >  > KafkaExactlyOnceSink. So the proposed solution to stop processing
> >  > elements while a snapshot is pending would work.
> >  >
> >  > It is certainly not optimal in terms of performance for Flink and
> > poses
> >  > problems when checkpoints take long to complete, but it would be
> >  > worthwhile to implement this to make use of the EOS feature.
> >  >
> >  > Thanks,
> >  > Max
> >  >
> >  > On 01.03.19 12:23, Maximilian Michels wrote:
> >  >> Thanks you for the prompt replies. It's great to see that there
> is
> >  >> good understanding of how EOS in Flink works.
> >  >>
> >  >>> This is exactly what RequiresStableInput is supposed to do. On
> the
> >  >>> Flink runner, this would be implemented by delaying processing
> > until
> >  >>> the current checkpoint is done.
> >  >>
> >  >> I don't think that works because we have no control over the
> Kafka
> >  >> transactions. Imagine:
> >  >>
> >  >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> > starts
> >  >> a new transaction.
> >  >> 2) Flink checkpoints, delaying the processing of elements, the
> >  >> checkpoint fails.
> >  >> 3) We restore from an old checkpoint and will start writing
> > duplicate
> >  >> data to Kafka. The de-duplication that the sink performs does not
> >  >> help, especially because the random shards ids might be assigned
> >  >> differently.
> >  >>
> >  >> IMHO we have to have control over commit to be able to provide
> EOS.
> >  >>
> >  >>> When we discussed this in Aug 2017, the understanding was that 2
> >  >>> Phase commit utility in Flink used to implement Flink's Kafka
> EOS
> >  >>> could not be implemented in Beam's context.
> >  >>
> >  >> That's also my understanding, unless we change the interface.
> >  >>
> >  >>> I don't see how SDF solves this problem..
> >  >>
> >  >> SDF has a checkpoint method which the Runner can call, but I
> think
> >  >> that you are right, that the above problem would be the same.
> >  >>
> >  >>> Absolutely. I would love to support EOS in KakaIO for Flink. I
> > think
> >  >>> that will help many future exactly-once sinks.. and address
> > 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
On Fri, Mar 1, 2019 at 10:37 AM Maximilian Michels  wrote:

> Fully agree. I think we can improve the situation drastically. For
> KafkaIO EOS with Flink we need to make these two changes:
>
> 1) Introduce buffering while the checkpoint is being taken
> 2) Replace the random shard id assignment with something deterministic
>

Can we do 2? I seem to remember that we had trouble in some cases (e..g in
the BigQuery case, there was no obvious way to create a deterministic id,
which is why we went for a random number followed by a reshuffle). Also
remember that the user ParDo that is producing data to the sink is not
guaranteed to be deterministic; the Beam model allows for non-deterministic
transforms.


>
> However, we won't be able to provide full compatibility with
> RequiresStableInput because Flink only guarantees stable input after a
> checkpoint. RequiresStableInput requires input at any point in time to
> be stable.


I'm not quite sure I understand. If a ParDo is marked with
RequiresStableInput, can't the flink runner buffer the input message until
after the checkpoint is complete and only then deliver it to the ParDo?
This adds latency of course, but I'm not sure how else to do things
correctly with the Beam model.


> IMHO the only way to achieve that is materializing output
> which Flink does not currently support.
>
> KafkaIO does not need all the power of RequiresStableInput to achieve
> EOS with Flink, but for the general case I don't see a good solution at
> the moment.
>
> -Max
>
> On 01.03.19 16:45, Reuven Lax wrote:
> > Yeah, the person who was working on it originally stopped working on
> > Beam, and nobody else ever finished it. I think it is important to
> > finish though. Many of the existing Sinks are only fully correct for
> > Dataflow today, because they generate either Reshuffle or GroupByKey to
> > ensure input stability before outputting (in many cases this code was
> > inherited from before Beam existed). On Flink today, these sinks might
> > occasionally produce duplicate output in the case of failures.
> >
> > Reuven
> >
> > On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  > > wrote:
> >
> > Circling back to the RequiresStableInput annotation[1]. I've done
> some
> > protoyping to see how this could be integrated into Flink. I'm
> > currently
> > writing a test based on RequiresStableInput.
> >
> > I found out there are already checks in place at the Runners to
> > throw in
> > case transforms use RequiresStableInput and its not supported.
> However,
> > not a single transform actually uses the annotation.
> >
> > It seems that the effort stopped at some point? Would it make sense
> to
> > start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
> > could then get rid of the whitelist.
> >
> > -Max
> >
> > [1]
> >
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
> >
> >
> >
> > On 01.03.19 14:28, Maximilian Michels wrote:
> >  > Just realized that transactions do not spawn multiple elements in
> >  > KafkaExactlyOnceSink. So the proposed solution to stop processing
> >  > elements while a snapshot is pending would work.
> >  >
> >  > It is certainly not optimal in terms of performance for Flink and
> > poses
> >  > problems when checkpoints take long to complete, but it would be
> >  > worthwhile to implement this to make use of the EOS feature.
> >  >
> >  > Thanks,
> >  > Max
> >  >
> >  > On 01.03.19 12:23, Maximilian Michels wrote:
> >  >> Thanks you for the prompt replies. It's great to see that there
> is
> >  >> good understanding of how EOS in Flink works.
> >  >>
> >  >>> This is exactly what RequiresStableInput is supposed to do. On
> the
> >  >>> Flink runner, this would be implemented by delaying processing
> > until
> >  >>> the current checkpoint is done.
> >  >>
> >  >> I don't think that works because we have no control over the
> Kafka
> >  >> transactions. Imagine:
> >  >>
> >  >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
> > starts
> >  >> a new transaction.
> >  >> 2) Flink checkpoints, delaying the processing of elements, the
> >  >> checkpoint fails.
> >  >> 3) We restore from an old checkpoint and will start writing
> > duplicate
> >  >> data to Kafka. The de-duplication that the sink performs does not
> >  >> help, especially because the random shards ids might be assigned
> >  >> differently.
> >  >>
> >  >> IMHO we have to have control over commit to be able to provide
> EOS.
> >  >>
> >  >>> When we discussed this in Aug 2017, the understanding was that 2
> >  >>> Phase commit utility in Flink used to implement Flink's Kafka
> EOS
> >  >>> could not be implemented in Beam's context.
> >  >>
> >  >> That's also my understanding, unless we 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Fully agree. I think we can improve the situation drastically. For 
KafkaIO EOS with Flink we need to make these two changes:


1) Introduce buffering while the checkpoint is being taken
2) Replace the random shard id assignment with something deterministic

However, we won't be able to provide full compatibility with 
RequiresStableInput because Flink only guarantees stable input after a 
checkpoint. RequiresStableInput requires input at any point in time to 
be stable. IMHO the only way to achieve that is materializing output 
which Flink does not currently support.


KafkaIO does not need all the power of RequiresStableInput to achieve 
EOS with Flink, but for the general case I don't see a good solution at 
the moment.


-Max

On 01.03.19 16:45, Reuven Lax wrote:
Yeah, the person who was working on it originally stopped working on 
Beam, and nobody else ever finished it. I think it is important to 
finish though. Many of the existing Sinks are only fully correct for 
Dataflow today, because they generate either Reshuffle or GroupByKey to 
ensure input stability before outputting (in many cases this code was 
inherited from before Beam existed). On Flink today, these sinks might 
occasionally produce duplicate output in the case of failures.


Reuven

On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels > wrote:


Circling back to the RequiresStableInput annotation[1]. I've done some
protoyping to see how this could be integrated into Flink. I'm
currently
writing a test based on RequiresStableInput.

I found out there are already checks in place at the Runners to
throw in
case transforms use RequiresStableInput and its not supported. However,
not a single transform actually uses the annotation.

It seems that the effort stopped at some point? Would it make sense to
start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
could then get rid of the whitelist.

-Max

[1]

https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM



On 01.03.19 14:28, Maximilian Michels wrote:
 > Just realized that transactions do not spawn multiple elements in
 > KafkaExactlyOnceSink. So the proposed solution to stop processing
 > elements while a snapshot is pending would work.
 >
 > It is certainly not optimal in terms of performance for Flink and
poses
 > problems when checkpoints take long to complete, but it would be
 > worthwhile to implement this to make use of the EOS feature.
 >
 > Thanks,
 > Max
 >
 > On 01.03.19 12:23, Maximilian Michels wrote:
 >> Thanks you for the prompt replies. It's great to see that there is
 >> good understanding of how EOS in Flink works.
 >>
 >>> This is exactly what RequiresStableInput is supposed to do. On the
 >>> Flink runner, this would be implemented by delaying processing
until
 >>> the current checkpoint is done.
 >>
 >> I don't think that works because we have no control over the Kafka
 >> transactions. Imagine:
 >>
 >> 1) ExactlyOnceWriter writes records to Kafka and commits, then
starts
 >> a new transaction.
 >> 2) Flink checkpoints, delaying the processing of elements, the
 >> checkpoint fails.
 >> 3) We restore from an old checkpoint and will start writing
duplicate
 >> data to Kafka. The de-duplication that the sink performs does not
 >> help, especially because the random shards ids might be assigned
 >> differently.
 >>
 >> IMHO we have to have control over commit to be able to provide EOS.
 >>
 >>> When we discussed this in Aug 2017, the understanding was that 2
 >>> Phase commit utility in Flink used to implement Flink's Kafka EOS
 >>> could not be implemented in Beam's context.
 >>
 >> That's also my understanding, unless we change the interface.
 >>
 >>> I don't see how SDF solves this problem..
 >>
 >> SDF has a checkpoint method which the Runner can call, but I think
 >> that you are right, that the above problem would be the same.
 >>
 >>> Absolutely. I would love to support EOS in KakaIO for Flink. I
think
 >>> that will help many future exactly-once sinks.. and address
 >>> fundamental incompatibility between Beam model and Flink's
horizontal
 >>> checkpointing for such applications.
 >>
 >> Great :)
 >>
 >>> The FlinkRunner would need to insert the "wait until checkpoint
 >>> finalization" logic wherever it sees @RequiresStableInput,
which is
 >>> already what it would have to do.
 >>
 >> I don't think that fixes the problem. See above example.
 >>
 >> Thanks,
 >> Max
 >>
 >> On 01.03.19 00:04, Raghu Angadi wrote:
 >>>
 >>>
 >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi mailto:ang...@gmail.com>
 >>> >> wrote:

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
Yeah, the person who was working on it originally stopped working on Beam,
and nobody else ever finished it. I think it is important to finish though.
Many of the existing Sinks are only fully correct for Dataflow today,
because they generate either Reshuffle or GroupByKey to ensure input
stability before outputting (in many cases this code was inherited from
before Beam existed). On Flink today, these sinks might occasionally
produce duplicate output in the case of failures.

Reuven

On Fri, Mar 1, 2019 at 7:18 AM Maximilian Michels  wrote:

> Circling back to the RequiresStableInput annotation[1]. I've done some
> protoyping to see how this could be integrated into Flink. I'm currently
> writing a test based on RequiresStableInput.
>
> I found out there are already checks in place at the Runners to throw in
> case transforms use RequiresStableInput and its not supported. However,
> not a single transform actually uses the annotation.
>
> It seems that the effort stopped at some point? Would it make sense to
> start annotating KafkaExactlyOnceSink with @RequiresStableInput? We
> could then get rid of the whitelist.
>
> -Max
>
> [1]
>
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>
>
> On 01.03.19 14:28, Maximilian Michels wrote:
> > Just realized that transactions do not spawn multiple elements in
> > KafkaExactlyOnceSink. So the proposed solution to stop processing
> > elements while a snapshot is pending would work.
> >
> > It is certainly not optimal in terms of performance for Flink and poses
> > problems when checkpoints take long to complete, but it would be
> > worthwhile to implement this to make use of the EOS feature.
> >
> > Thanks,
> > Max
> >
> > On 01.03.19 12:23, Maximilian Michels wrote:
> >> Thanks you for the prompt replies. It's great to see that there is
> >> good understanding of how EOS in Flink works.
> >>
> >>> This is exactly what RequiresStableInput is supposed to do. On the
> >>> Flink runner, this would be implemented by delaying processing until
> >>> the current checkpoint is done.
> >>
> >> I don't think that works because we have no control over the Kafka
> >> transactions. Imagine:
> >>
> >> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts
> >> a new transaction.
> >> 2) Flink checkpoints, delaying the processing of elements, the
> >> checkpoint fails.
> >> 3) We restore from an old checkpoint and will start writing duplicate
> >> data to Kafka. The de-duplication that the sink performs does not
> >> help, especially because the random shards ids might be assigned
> >> differently.
> >>
> >> IMHO we have to have control over commit to be able to provide EOS.
> >>
> >>> When we discussed this in Aug 2017, the understanding was that 2
> >>> Phase commit utility in Flink used to implement Flink's Kafka EOS
> >>> could not be implemented in Beam's context.
> >>
> >> That's also my understanding, unless we change the interface.
> >>
> >>> I don't see how SDF solves this problem..
> >>
> >> SDF has a checkpoint method which the Runner can call, but I think
> >> that you are right, that the above problem would be the same.
> >>
> >>> Absolutely. I would love to support EOS in KakaIO for Flink. I think
> >>> that will help many future exactly-once sinks.. and address
> >>> fundamental incompatibility between Beam model and Flink's horizontal
> >>> checkpointing for such applications.
> >>
> >> Great :)
> >>
> >>> The FlinkRunner would need to insert the "wait until checkpoint
> >>> finalization" logic wherever it sees @RequiresStableInput, which is
> >>> already what it would have to do.
> >>
> >> I don't think that fixes the problem. See above example.
> >>
> >> Thanks,
> >> Max
> >>
> >> On 01.03.19 00:04, Raghu Angadi wrote:
> >>>
> >>>
> >>> On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  >>> > wrote:
> >>>
> >>> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  >>> > wrote:
> >>>
> >>> I'm not sure what a hard fail is. I probably have a shallow
> >>> understanding, but doesn't @RequiresStableInput work for 2PC?
> >>> The preCommit() phase should establish the transaction and
> >>> commit() is not called until after checkpoint finalization. Can
> >>> you describe the way that it does not work a little bit more?
> >>>
> >>>
> >>> - preCommit() is called before checkpoint. Kafka EOS in Flink
> starts
> >>> the transaction before this and makes sure it flushes all records
> in
> >>> preCommit(). So far good.
> >>> - commit is called after checkpoint is persisted. Now, imagine
> >>> commit() fails for some reason. There is no option to rerun the 1st
> >>> phase to write the records again in a new transaction. This is a
> >>> hard failure for the the job. In practice Flink might attempt to
> >>> commit again (not sure how many times), which is likely to fail and
> >>> eventually results in job 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Reuven Lax
Are you suggesting that RequiresStableInput doesn't work with Flink in
general? Or are your concerns specific to the Kafka connector?

RequiresStableInput was originally motivated by other sinks like TextIO,
where you also want to delay creating the final output files until you are
sure their content is stable. Another example is the BigQuery streaming
sink. That sink generates random record ids to send to BigQuery to ensure
exactly-once inserts (BigQuery uses this id for deduplication). If records
are replayed into the sink after data has already been written to BigQuery,
then the sink will generate new random record ids, so the duplicate output
will not be detected. These were the original motivations for
RequiresStableInput, to tell the runner that the input to the transform
must be "stable" before proceeding. In Dataflow a Reshuffle does this, as
Dataflow's shuffle is persistent. For other runners, different techniques
will be needed.

BTW the Reshuffle currently in KafkaIO possibly should be removed  and
replaced with a RequiresStableInput annotation once that is implemented.

Reuven

On Fri, Mar 1, 2019 at 3:30 AM Maximilian Michels  wrote:

> Thanks you for the prompt replies. It's great to see that there is good
> understanding of how EOS in Flink works.
>
> > This is exactly what RequiresStableInput is supposed to do. On the Flink
> runner, this would be implemented by delaying processing until the current
> checkpoint is done.
>
> I don't think that works because we have no control over the Kafka
> transactions. Imagine:
>
> 1) ExactlyOnceWriter writes records to Kafka and commits, then starts a
> new transaction.
> 2) Flink checkpoints, delaying the processing of elements, the
> checkpoint fails.
> 3) We restore from an old checkpoint and will start writing duplicate
> data to Kafka. The de-duplication that the sink performs does not help,
> especially because the random shards ids might be assigned differently.
>
> IMHO we have to have control over commit to be able to provide EOS.
>
> > When we discussed this in Aug 2017, the understanding was that 2 Phase
> commit utility in Flink used to implement Flink's Kafka EOS could not be
> implemented in Beam's context.
>
> That's also my understanding, unless we change the interface.
>
> > I don't see how SDF solves this problem..
>
> SDF has a checkpoint method which the Runner can call, but I think that
> you are right, that the above problem would be the same.
>
> > Absolutely. I would love to support EOS in KakaIO for Flink. I think
> that will help many future exactly-once sinks.. and address fundamental
> incompatibility between Beam model and Flink's horizontal checkpointing for
> such applications.
>
> Great :)
>
> > The FlinkRunner would need to insert the "wait until checkpoint
> finalization" logic wherever it sees @RequiresStableInput, which is already
> what it would have to do.
>
> I don't think that fixes the problem. See above example.
>
> Thanks,
> Max
>
> On 01.03.19 00:04, Raghu Angadi wrote:
> >
> >
> > On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  > > wrote:
> >
> > On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  > > wrote:
> >
> > I'm not sure what a hard fail is. I probably have a shallow
> > understanding, but doesn't @RequiresStableInput work for 2PC?
> > The preCommit() phase should establish the transaction and
> > commit() is not called until after checkpoint finalization. Can
> > you describe the way that it does not work a little bit more?
> >
> >
> > - preCommit() is called before checkpoint. Kafka EOS in Flink starts
> > the transaction before this and makes sure it flushes all records in
> > preCommit(). So far good.
> > - commit is called after checkpoint is persisted. Now, imagine
> > commit() fails for some reason. There is no option to rerun the 1st
> > phase to write the records again in a new transaction. This is a
> > hard failure for the the job. In practice Flink might attempt to
> > commit again (not sure how many times), which is likely to fail and
> > eventually results in job failure.
> >
> >
> > In Apache Beam, the records could be stored in state, and can be written
> > inside commit() to work around this issue. It could have scalability
> > issues if checkpoints are not frequent enough in Flink runner.
> >
> > Raghu.
> >
> >
> >
> > Kenn
> >
> > On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  > > wrote:
> >
> > On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
> > mailto:k...@apache.org>> wrote:
> >
> > I believe the way you would implement the logic behind
> > Flink's KafkaProducer would be to have two steps:
> >
> > 1. Start transaction
> > 2. @RequiresStableInput Close transaction
> >
> >
> > I see.  What happens if closing 

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Circling back to the RequiresStableInput annotation[1]. I've done some 
protoyping to see how this could be integrated into Flink. I'm currently 
writing a test based on RequiresStableInput.


I found out there are already checks in place at the Runners to throw in 
case transforms use RequiresStableInput and its not supported. However, 
not a single transform actually uses the annotation.


It seems that the effort stopped at some point? Would it make sense to 
start annotating KafkaExactlyOnceSink with @RequiresStableInput? We 
could then get rid of the whitelist.


-Max

[1] 
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM 



On 01.03.19 14:28, Maximilian Michels wrote:
Just realized that transactions do not spawn multiple elements in 
KafkaExactlyOnceSink. So the proposed solution to stop processing 
elements while a snapshot is pending would work.


It is certainly not optimal in terms of performance for Flink and poses 
problems when checkpoints take long to complete, but it would be 
worthwhile to implement this to make use of the EOS feature.


Thanks,
Max

On 01.03.19 12:23, Maximilian Michels wrote:
Thanks you for the prompt replies. It's great to see that there is 
good understanding of how EOS in Flink works.


This is exactly what RequiresStableInput is supposed to do. On the 
Flink runner, this would be implemented by delaying processing until 
the current checkpoint is done. 


I don't think that works because we have no control over the Kafka 
transactions. Imagine:


1) ExactlyOnceWriter writes records to Kafka and commits, then starts 
a new transaction.
2) Flink checkpoints, delaying the processing of elements, the 
checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate 
data to Kafka. The de-duplication that the sink performs does not 
help, especially because the random shards ids might be assigned 
differently.


IMHO we have to have control over commit to be able to provide EOS.

When we discussed this in Aug 2017, the understanding was that 2 
Phase commit utility in Flink used to implement Flink's Kafka EOS 
could not be implemented in Beam's context. 


That's also my understanding, unless we change the interface.


I don't see how SDF solves this problem..


SDF has a checkpoint method which the Runner can call, but I think 
that you are right, that the above problem would be the same.


Absolutely. I would love to support EOS in KakaIO for Flink. I think 
that will help many future exactly-once sinks.. and address 
fundamental incompatibility between Beam model and Flink's horizontal 
checkpointing for such applications.


Great :)

The FlinkRunner would need to insert the "wait until checkpoint 
finalization" logic wherever it sees @RequiresStableInput, which is 
already what it would have to do.


I don't think that fixes the problem. See above example.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:



On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi > wrote:


    On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles mailto:k...@apache.org>> wrote:

    I'm not sure what a hard fail is. I probably have a shallow
    understanding, but doesn't @RequiresStableInput work for 2PC?
    The preCommit() phase should establish the transaction and
    commit() is not called until after checkpoint finalization. Can
    you describe the way that it does not work a little bit more?


    - preCommit() is called before checkpoint. Kafka EOS in Flink starts
    the transaction before this and makes sure it flushes all records in
    preCommit(). So far good.
    - commit is called after checkpoint is persisted. Now, imagine
    commit() fails for some reason. There is no option to rerun the 1st
    phase to write the records again in a new transaction. This is a
    hard failure for the the job. In practice Flink might attempt to
    commit again (not sure how many times), which is likely to fail and
    eventually results in job failure.


In Apache Beam, the records could be stored in state, and can be 
written inside commit() to work around this issue. It could have 
scalability issues if checkpoints are not frequent enough in Flink 
runner.


Raghu.



    Kenn

    On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi mailto:ang...@gmail.com>> wrote:

    On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
    mailto:k...@apache.org>> wrote:

    I believe the way you would implement the logic behind
    Flink's KafkaProducer would be to have two steps:

    1. Start transaction
    2. @RequiresStableInput Close transaction


    I see.  What happens if closing the transaction fails in
    (2)? Flink's 2PC requires that commit() should never hard
    fail once preCommit() succeeds. I think that is cost of not
    having an extra shuffle. It is alright since this policy has
    

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Just realized that transactions do not spawn multiple elements in 
KafkaExactlyOnceSink. So the proposed solution to stop processing 
elements while a snapshot is pending would work.


It is certainly not optimal in terms of performance for Flink and poses 
problems when checkpoints take long to complete, but it would be 
worthwhile to implement this to make use of the EOS feature.


Thanks,
Max

On 01.03.19 12:23, Maximilian Michels wrote:
Thanks you for the prompt replies. It's great to see that there is good 
understanding of how EOS in Flink works.


This is exactly what RequiresStableInput is supposed to do. On the 
Flink runner, this would be implemented by delaying processing until 
the current checkpoint is done. 


I don't think that works because we have no control over the Kafka 
transactions. Imagine:


1) ExactlyOnceWriter writes records to Kafka and commits, then starts a 
new transaction.
2) Flink checkpoints, delaying the processing of elements, the 
checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate 
data to Kafka. The de-duplication that the sink performs does not help, 
especially because the random shards ids might be assigned differently.


IMHO we have to have control over commit to be able to provide EOS.

When we discussed this in Aug 2017, the understanding was that 2 Phase 
commit utility in Flink used to implement Flink's Kafka EOS could not 
be implemented in Beam's context. 


That's also my understanding, unless we change the interface.


I don't see how SDF solves this problem..


SDF has a checkpoint method which the Runner can call, but I think that 
you are right, that the above problem would be the same.


Absolutely. I would love to support EOS in KakaIO for Flink. I think 
that will help many future exactly-once sinks.. and address 
fundamental incompatibility between Beam model and Flink's horizontal 
checkpointing for such applications.


Great :)

The FlinkRunner would need to insert the "wait until checkpoint 
finalization" logic wherever it sees @RequiresStableInput, which is 
already what it would have to do.


I don't think that fixes the problem. See above example.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:



On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi > wrote:


    On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles mailto:k...@apache.org>> wrote:

    I'm not sure what a hard fail is. I probably have a shallow
    understanding, but doesn't @RequiresStableInput work for 2PC?
    The preCommit() phase should establish the transaction and
    commit() is not called until after checkpoint finalization. Can
    you describe the way that it does not work a little bit more?


    - preCommit() is called before checkpoint. Kafka EOS in Flink starts
    the transaction before this and makes sure it flushes all records in
    preCommit(). So far good.
    - commit is called after checkpoint is persisted. Now, imagine
    commit() fails for some reason. There is no option to rerun the 1st
    phase to write the records again in a new transaction. This is a
    hard failure for the the job. In practice Flink might attempt to
    commit again (not sure how many times), which is likely to fail and
    eventually results in job failure.


In Apache Beam, the records could be stored in state, and can be 
written inside commit() to work around this issue. It could have 
scalability issues if checkpoints are not frequent enough in Flink 
runner.


Raghu.



    Kenn

    On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi mailto:ang...@gmail.com>> wrote:

    On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
    mailto:k...@apache.org>> wrote:

    I believe the way you would implement the logic behind
    Flink's KafkaProducer would be to have two steps:

    1. Start transaction
    2. @RequiresStableInput Close transaction


    I see.  What happens if closing the transaction fails in
    (2)? Flink's 2PC requires that commit() should never hard
    fail once preCommit() succeeds. I think that is cost of not
    having an extra shuffle. It is alright since this policy has
    worked well for Flink so far.

    Overall, it will be great to have @RequiresStableInput
    support in Flink runner.

    Raghu.

    The FlinkRunner would need to insert the "wait until
    checkpoint finalization" logic wherever it
    sees @RequiresStableInput, which is already what it
    would have to do.

    This matches the KafkaProducer's logic - delay closing
    the transaction until checkpoint finalization. This
    answers my main question, which is "is
    @RequiresStableInput expressive enough to allow
    Beam-on-Flink to have exactly once behavior with the
  

Re: KafkaIO Exactly-Once & Flink Runner

2019-03-01 Thread Maximilian Michels
Thanks you for the prompt replies. It's great to see that there is good 
understanding of how EOS in Flink works.


This is exactly what RequiresStableInput is supposed to do. On the Flink runner, this would be implemented by delaying processing until the current checkpoint is done. 


I don't think that works because we have no control over the Kafka 
transactions. Imagine:


1) ExactlyOnceWriter writes records to Kafka and commits, then starts a 
new transaction.
2) Flink checkpoints, delaying the processing of elements, the 
checkpoint fails.
3) We restore from an old checkpoint and will start writing duplicate 
data to Kafka. The de-duplication that the sink performs does not help, 
especially because the random shards ids might be assigned differently.


IMHO we have to have control over commit to be able to provide EOS.

When we discussed this in Aug 2017, the understanding was that 2 Phase commit utility in Flink used to implement Flink's Kafka EOS could not be implemented in Beam's context. 


That's also my understanding, unless we change the interface.


I don't see how SDF solves this problem..


SDF has a checkpoint method which the Runner can call, but I think that 
you are right, that the above problem would be the same.



Absolutely. I would love to support EOS in KakaIO for Flink. I think that will 
help many future exactly-once sinks.. and address fundamental incompatibility 
between Beam model and Flink's horizontal checkpointing for such applications.


Great :)


The FlinkRunner would need to insert the "wait until checkpoint finalization" 
logic wherever it sees @RequiresStableInput, which is already what it would have to do.


I don't think that fixes the problem. See above example.

Thanks,
Max

On 01.03.19 00:04, Raghu Angadi wrote:



On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi > wrote:


On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles mailto:k...@apache.org>> wrote:

I'm not sure what a hard fail is. I probably have a shallow
understanding, but doesn't @RequiresStableInput work for 2PC?
The preCommit() phase should establish the transaction and
commit() is not called until after checkpoint finalization. Can
you describe the way that it does not work a little bit more?


- preCommit() is called before checkpoint. Kafka EOS in Flink starts
the transaction before this and makes sure it flushes all records in
preCommit(). So far good.
- commit is called after checkpoint is persisted. Now, imagine
commit() fails for some reason. There is no option to rerun the 1st
phase to write the records again in a new transaction. This is a
hard failure for the the job. In practice Flink might attempt to
commit again (not sure how many times), which is likely to fail and
eventually results in job failure.


In Apache Beam, the records could be stored in state, and can be written 
inside commit() to work around this issue. It could have scalability 
issues if checkpoints are not frequent enough in Flink runner.


Raghu.



Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi mailto:ang...@gmail.com>> wrote:

On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles
mailto:k...@apache.org>> wrote:

I believe the way you would implement the logic behind
Flink's KafkaProducer would be to have two steps:

1. Start transaction
2. @RequiresStableInput Close transaction


I see.  What happens if closing the transaction fails in
(2)? Flink's 2PC requires that commit() should never hard
fail once preCommit() succeeds. I think that is cost of not
having an extra shuffle. It is alright since this policy has
worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput
support in Flink runner.

Raghu.

The FlinkRunner would need to insert the "wait until
checkpoint finalization" logic wherever it
sees @RequiresStableInput, which is already what it
would have to do.

This matches the KafkaProducer's logic - delay closing
the transaction until checkpoint finalization. This
answers my main question, which is "is
@RequiresStableInput expressive enough to allow
Beam-on-Flink to have exactly once behavior with the
same performance characteristics as native Flink
checkpoint finalization?"

Kenn

[1] https://github.com/apache/beam/pull/7955

On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax
mailto:re...@google.com>> wrote:



On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi
mailto:ang...@gmail.com>> wrote:



Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before checkpoint. Kafka EOS in Flink starts the
> transaction before this and makes sure it flushes all records in
> preCommit(). So far good.
> - commit is called after checkpoint is persisted. Now, imagine commit()
> fails for some reason. There is no option to rerun the 1st phase to write
> the records again in a new transaction. This is a hard failure for the the
> job. In practice Flink might attempt to commit again (not sure how many
> times), which is likely to fail and eventually results in job failure.
>

In Apache Beam, the records could be stored in state, and can be written
inside commit() to work around this issue. It could have scalability issues
if checkpoints are not frequent enough in Flink runner.

Raghu.


>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles 
>>> wrote:
>>>
 I believe the way you would implement the logic behind Flink's
 KafkaProducer would be to have two steps:

 1. Start transaction
 2. @RequiresStableInput Close transaction

>>>
>>> I see.  What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard fail once preCommit()
>>> succeeds. I think that is cost of not having an extra shuffle. It is
>>> alright since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
>>>
 The FlinkRunner would need to insert the "wait until checkpoint
 finalization" logic wherever it sees @RequiresStableInput, which is already
 what it would have to do.

 This matches the KafkaProducer's logic - delay closing the transaction
 until checkpoint finalization. This answers my main question, which is "is
 @RequiresStableInput expressive enough to allow Beam-on-Flink to have
 exactly once behavior with the same performance characteristics as native
 Flink checkpoint finalization?"

 Kenn

 [1] https://github.com/apache/beam/pull/7955

 On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:

>
>
> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi 
> wrote:
>
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>
>>
>>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>> wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>> exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users
>>> from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>> native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>>
>>
>>
>> When we discussed this in Aug 2017, the understanding was that 2
>> Phase commit utility in Flink used to implement Flink's Kafka EOS could 
>> not
>> be implemented in Beam's context.
>> See  this message
>>  in
>> that dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>>
>
> I think that TwoPhaseCommit utility class wouldn't work. The Flink
> runner would probably want to directly use notifySnapshotComplete in order
> to implement @RequiresStableInput.
>
>>
>>
>>>
>>> A checkpoint is realized by sending barriers through all channels
>>> starting from the source until reaching all sinks. Every operator
>>> persists its state once it has received a barrier on all its input
>>> channels, it then forwards it to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understood, Spark or Dataflow use the GroupByKey stages to
>>> persist
>>> the input. That is not required in Flink to be able to take a
>>> consistent
>>> snapshot of the pipeline.
>>>
>>> Basically, for Flink we don't need any of 

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles  wrote:

> I'm not sure what a hard fail is. I probably have a shallow understanding,
> but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
> establish the transaction and commit() is not called until after checkpoint
> finalization. Can you describe the way that it does not work a little bit
> more?
>

- preCommit() is called before checkpoint. Kafka EOS in Flink starts the
transaction before this and makes sure it flushes all records in
preCommit(). So far good.
- commit is called after checkpoint is persisted. Now, imagine commit()
fails for some reason. There is no option to rerun the 1st phase to write
the records again in a new transaction. This is a hard failure for the the
job. In practice Flink might attempt to commit again (not sure how many
times), which is likely to fail and eventually results in job failure.


> Kenn
>
> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:
>
>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles  wrote:
>>
>>> I believe the way you would implement the logic behind Flink's
>>> KafkaProducer would be to have two steps:
>>>
>>> 1. Start transaction
>>> 2. @RequiresStableInput Close transaction
>>>
>>
>> I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
>> requires that commit() should never hard fail once preCommit() succeeds. I
>> think that is cost of not having an extra shuffle. It is alright since this
>> policy has worked well for Flink so far.
>>
>> Overall, it will be great to have @RequiresStableInput support in Flink
>> runner.
>>
>> Raghu.
>>
>>> The FlinkRunner would need to insert the "wait until checkpoint
>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>> what it would have to do.
>>>
>>> This matches the KafkaProducer's logic - delay closing the transaction
>>> until checkpoint finalization. This answers my main question, which is "is
>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>> exactly once behavior with the same performance characteristics as native
>>> Flink checkpoint finalization?"
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/beam/pull/7955
>>>
>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>>>


 On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:

>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>
>
>
> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
> wrote:
>
>> Hi,
>>
>> I came across KafkaIO's Runner whitelist [1] for enabling
>> exactly-once
>> semantics (EOS). I think it is questionable to exclude Runners from
>> inside a transform, but I see that the intention was to save users
>> from
>> surprises.
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>>
>
>
> When we discussed this in Aug 2017, the understanding was that 2 Phase
> commit utility in Flink used to implement Flink's Kafka EOS could not be
> implemented in Beam's context.
> See  this message
>  in
> that dev thread. Has anything changed in this regard? The whole thread is
> relevant to this topic and worth going through.
>

 I think that TwoPhaseCommit utility class wouldn't work. The Flink
 runner would probably want to directly use notifySnapshotComplete in order
 to implement @RequiresStableInput.

>
>
>>
>> A checkpoint is realized by sending barriers through all channels
>> starting from the source until reaching all sinks. Every operator
>> persists its state once it has received a barrier on all its input
>> channels, it then forwards it to the downstream operators.
>>
>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>
>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>> GroupByKey -> ExactlyOnceWriter
>>
>> As I understood, Spark or Dataflow use the GroupByKey stages to
>> persist
>> the input. That is not required in Flink to be able to take a
>> consistent
>> snapshot of the pipeline.
>>
>> Basically, for Flink we don't need any of that magic that KafkaIO
>> does.
>> What we would need to support EOS is a way to tell the
>> ExactlyOnceWriter
>> (a DoFn) to commit once a checkpoint has completed.
>
> I know that the new version of SDF supports checkpointing which should
>> solve this issue. But there is still a lot of work to do to make this
>> reality.
>>
>
> I don't see how SDF solves this problem.. May be 

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Kenneth Knowles
I'm not sure what a hard fail is. I probably have a shallow understanding,
but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
establish the transaction and commit() is not called until after checkpoint
finalization. Can you describe the way that it does not work a little bit
more?

Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi  wrote:

> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles  wrote:
>
>> I believe the way you would implement the logic behind Flink's
>> KafkaProducer would be to have two steps:
>>
>> 1. Start transaction
>> 2. @RequiresStableInput Close transaction
>>
>
> I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
> requires that commit() should never hard fail once preCommit() succeeds. I
> think that is cost of not having an extra shuffle. It is alright since this
> policy has worked well for Flink so far.
>
> Overall, it will be great to have @RequiresStableInput support in Flink
> runner.
>
> Raghu.
>
>> The FlinkRunner would need to insert the "wait until checkpoint
>> finalization" logic wherever it sees @RequiresStableInput, which is already
>> what it would have to do.
>>
>> This matches the KafkaProducer's logic - delay closing the transaction
>> until checkpoint finalization. This answers my main question, which is "is
>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>> exactly once behavior with the same performance characteristics as native
>> Flink checkpoint finalization?"
>>
>> Kenn
>>
>> [1] https://github.com/apache/beam/pull/7955
>>
>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:
>>>

 Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.



 On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
 wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users
> from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>


 When we discussed this in Aug 2017, the understanding was that 2 Phase
 commit utility in Flink used to implement Flink's Kafka EOS could not be
 implemented in Beam's context.
 See  this message
  in
 that dev thread. Has anything changed in this regard? The whole thread is
 relevant to this topic and worth going through.

>>>
>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink
>>> runner would probably want to directly use notifySnapshotComplete in order
>>> to implement @RequiresStableInput.
>>>


>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels, it then forwards it to the downstream operators.
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understood, Spark or Dataflow use the GroupByKey stages to
> persist
> the input. That is not required in Flink to be able to take a
> consistent
> snapshot of the pipeline.
>
> Basically, for Flink we don't need any of that magic that KafkaIO
> does.
> What we would need to support EOS is a way to tell the
> ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.

 I know that the new version of SDF supports checkpointing which should
> solve this issue. But there is still a lot of work to do to make this
> reality.
>

 I don't see how SDF solves this problem.. May be pseudo code would make
 more clear.  But if helps, that is great!

 So I think it would make sense to think about a way to make KafkaIO's
> EOS more accessible to Runners which support a different way of
> checkpointing.
>

 Absolutely. I would love to support EOS in KakaIO for Flink. I think
 that will help many future exactly-once sinks.. and address fundamental
 incompatibility between Beam model and Flink's horizontal checkpointing for
 such applications.

 Raghu.


> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO
> defining an annotation only manifests the conceptual difference
> between

Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles  wrote:

> I believe the way you would implement the logic behind Flink's
> KafkaProducer would be to have two steps:
>
> 1. Start transaction
> 2. @RequiresStableInput Close transaction
>

I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
requires that commit() should never hard fail once preCommit() succeeds. I
think that is cost of not having an extra shuffle. It is alright since this
policy has worked well for Flink so far.

Overall, it will be great to have @RequiresStableInput support in Flink
runner.

Raghu.

> The FlinkRunner would need to insert the "wait until checkpoint
> finalization" logic wherever it sees @RequiresStableInput, which is already
> what it would have to do.
>
> This matches the KafkaProducer's logic - delay closing the transaction
> until checkpoint finalization. This answers my main question, which is "is
> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
> exactly once behavior with the same performance characteristics as native
> Flink checkpoint finalization?"
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/7955
>
> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:
>
>>
>>
>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:
>>
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
 KafkaProducer supports exactly-once. It simply commits the pending
 transaction once it has completed a checkpoint.
>>>
>>>
>>>
>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>>> wrote:
>>>
 Hi,

 I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
 semantics (EOS). I think it is questionable to exclude Runners from
 inside a transform, but I see that the intention was to save users from
 surprises.

 Now why does the Flink Runner not support KafkaIO EOS? Flink's native
 KafkaProducer supports exactly-once. It simply commits the pending
 transaction once it has completed a checkpoint.

>>>
>>>
>>> When we discussed this in Aug 2017, the understanding was that 2 Phase
>>> commit utility in Flink used to implement Flink's Kafka EOS could not be
>>> implemented in Beam's context.
>>> See  this message
>>>  in
>>> that dev thread. Has anything changed in this regard? The whole thread is
>>> relevant to this topic and worth going through.
>>>
>>
>> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner
>> would probably want to directly use notifySnapshotComplete in order to
>> implement @RequiresStableInput.
>>
>>>
>>>

 A checkpoint is realized by sending barriers through all channels
 starting from the source until reaching all sinks. Every operator
 persists its state once it has received a barrier on all its input
 channels, it then forwards it to the downstream operators.

 The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:

 Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
 GroupByKey -> ExactlyOnceWriter

 As I understood, Spark or Dataflow use the GroupByKey stages to persist
 the input. That is not required in Flink to be able to take a
 consistent
 snapshot of the pipeline.

 Basically, for Flink we don't need any of that magic that KafkaIO does.
 What we would need to support EOS is a way to tell the
 ExactlyOnceWriter
 (a DoFn) to commit once a checkpoint has completed.
>>>
>>> I know that the new version of SDF supports checkpointing which should
 solve this issue. But there is still a lot of work to do to make this
 reality.

>>>
>>> I don't see how SDF solves this problem.. May be pseudo code would make
>>> more clear.  But if helps, that is great!
>>>
>>> So I think it would make sense to think about a way to make KafkaIO's
 EOS more accessible to Runners which support a different way of
 checkpointing.

>>>
>>> Absolutely. I would love to support EOS in KakaIO for Flink. I think
>>> that will help many future exactly-once sinks.. and address fundamental
>>> incompatibility between Beam model and Flink's horizontal checkpointing for
>>> such applications.
>>>
>>> Raghu.
>>>
>>>
 Cheers,
 Max

 PS: I found this document about RequiresStableInput [3], but IMHO
 defining an annotation only manifests the conceptual difference between
 the Runners.


 [1]

 https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
 [2]

 https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
 [3]

 https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM

>>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Kenneth Knowles
I believe the way you would implement the logic behind Flink's
KafkaProducer would be to have two steps:

1. Start transaction
2. @RequiresStableInput Close transaction

The FlinkRunner would need to insert the "wait until checkpoint
finalization" logic wherever it sees @RequiresStableInput, which is already
what it would have to do.

This matches the KafkaProducer's logic - delay closing the transaction
until checkpoint finalization. This answers my main question, which is "is
@RequiresStableInput expressive enough to allow Beam-on-Flink to have
exactly once behavior with the same performance characteristics as native
Flink checkpoint finalization?"

Kenn

[1] https://github.com/apache/beam/pull/7955

On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax  wrote:

>
>
> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:
>
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>
>>
>>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels 
>> wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>>
>>
>>
>> When we discussed this in Aug 2017, the understanding was that 2 Phase
>> commit utility in Flink used to implement Flink's Kafka EOS could not be
>> implemented in Beam's context.
>> See  this message
>>  in that
>> dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>>
>
> I think that TwoPhaseCommit utility class wouldn't work. The Flink runner
> would probably want to directly use notifySnapshotComplete in order to
> implement @RequiresStableInput.
>
>>
>>
>>>
>>> A checkpoint is realized by sending barriers through all channels
>>> starting from the source until reaching all sinks. Every operator
>>> persists its state once it has received a barrier on all its input
>>> channels, it then forwards it to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understood, Spark or Dataflow use the GroupByKey stages to persist
>>> the input. That is not required in Flink to be able to take a consistent
>>> snapshot of the pipeline.
>>>
>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>>> (a DoFn) to commit once a checkpoint has completed.
>>
>> I know that the new version of SDF supports checkpointing which should
>>> solve this issue. But there is still a lot of work to do to make this
>>> reality.
>>>
>>
>> I don't see how SDF solves this problem.. May be pseudo code would make
>> more clear.  But if helps, that is great!
>>
>> So I think it would make sense to think about a way to make KafkaIO's
>>> EOS more accessible to Runners which support a different way of
>>> checkpointing.
>>>
>>
>> Absolutely. I would love to support EOS in KakaIO for Flink. I think that
>> will help many future exactly-once sinks.. and address fundamental
>> incompatibility between Beam model and Flink's horizontal checkpointing for
>> such applications.
>>
>> Raghu.
>>
>>
>>> Cheers,
>>> Max
>>>
>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>> defining an annotation only manifests the conceptual difference between
>>> the Runners.
>>>
>>>
>>> [1]
>>>
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> [2]
>>>
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> [3]
>>>
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>
>>


Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Reuven Lax
On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi  wrote:

>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>
>
>
> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels  wrote:
>
>> Hi,
>>
>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>> semantics (EOS). I think it is questionable to exclude Runners from
>> inside a transform, but I see that the intention was to save users from
>> surprises.
>>
>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>> KafkaProducer supports exactly-once. It simply commits the pending
>> transaction once it has completed a checkpoint.
>>
>
>
> When we discussed this in Aug 2017, the understanding was that 2 Phase
> commit utility in Flink used to implement Flink's Kafka EOS could not be
> implemented in Beam's context.
> See  this message
>  in that
> dev thread. Has anything changed in this regard? The whole thread is
> relevant to this topic and worth going through.
>

I think that TwoPhaseCommit utility class wouldn't work. The Flink runner
would probably want to directly use notifySnapshotComplete in order to
implement @RequiresStableInput.

>
>
>>
>> A checkpoint is realized by sending barriers through all channels
>> starting from the source until reaching all sinks. Every operator
>> persists its state once it has received a barrier on all its input
>> channels, it then forwards it to the downstream operators.
>>
>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>
>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>> GroupByKey -> ExactlyOnceWriter
>>
>> As I understood, Spark or Dataflow use the GroupByKey stages to persist
>> the input. That is not required in Flink to be able to take a consistent
>> snapshot of the pipeline.
>>
>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>> (a DoFn) to commit once a checkpoint has completed.
>
> I know that the new version of SDF supports checkpointing which should
>> solve this issue. But there is still a lot of work to do to make this
>> reality.
>>
>
> I don't see how SDF solves this problem.. May be pseudo code would make
> more clear.  But if helps, that is great!
>
> So I think it would make sense to think about a way to make KafkaIO's
>> EOS more accessible to Runners which support a different way of
>> checkpointing.
>>
>
> Absolutely. I would love to support EOS in KakaIO for Flink. I think that
> will help many future exactly-once sinks.. and address fundamental
> incompatibility between Beam model and Flink's horizontal checkpointing for
> such applications.
>
> Raghu.
>
>
>> Cheers,
>> Max
>>
>> PS: I found this document about RequiresStableInput [3], but IMHO
>> defining an annotation only manifests the conceptual difference between
>> the Runners.
>>
>>
>> [1]
>>
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>> [2]
>>
>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>> [3]
>>
>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>
>


Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Raghu Angadi
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.



On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels  wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>


When we discussed this in Aug 2017, the understanding was that 2 Phase
commit utility in Flink used to implement Flink's Kafka EOS could not be
implemented in Beam's context.
See  this message
 in that
dev thread. Has anything changed in this regard? The whole thread is
relevant to this topic and worth going through.


>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels, it then forwards it to the downstream operators.
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understood, Spark or Dataflow use the GroupByKey stages to persist
> the input. That is not required in Flink to be able to take a consistent
> snapshot of the pipeline.
>
> Basically, for Flink we don't need any of that magic that KafkaIO does.
> What we would need to support EOS is a way to tell the ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.

I know that the new version of SDF supports checkpointing which should
> solve this issue. But there is still a lot of work to do to make this
> reality.
>

I don't see how SDF solves this problem.. May be pseudo code would make
more clear.  But if helps, that is great!

So I think it would make sense to think about a way to make KafkaIO's
> EOS more accessible to Runners which support a different way of
> checkpointing.
>

Absolutely. I would love to support EOS in KakaIO for Flink. I think that
will help many future exactly-once sinks.. and address fundamental
incompatibility between Beam model and Flink's horizontal checkpointing for
such applications.

Raghu.


> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO
> defining an annotation only manifests the conceptual difference between
> the Runners.
>
>
> [1]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> [2]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> [3]
>
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>


Re: KafkaIO Exactly-Once & Flink Runner

2019-02-28 Thread Reuven Lax
This is exactly what RequiresStableInput is supposed to do. On the Flink
runner, this would be implemented by delaying processing until the current
checkpoint is done . In fact many sinks are probably subtly broken on the
Flink runner today without RequiresStableInput, so we really need to finish
this work and add a Flink implementation of it.

Reuven

On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels  wrote:

> Hi,
>
> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
> semantics (EOS). I think it is questionable to exclude Runners from
> inside a transform, but I see that the intention was to save users from
> surprises.
>
> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
> KafkaProducer supports exactly-once. It simply commits the pending
> transaction once it has completed a checkpoint.
>
> A checkpoint is realized by sending barriers through all channels
> starting from the source until reaching all sinks. Every operator
> persists its state once it has received a barrier on all its input
> channels, it then forwards it to the downstream operators.
>
> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>
> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
> GroupByKey -> ExactlyOnceWriter
>
> As I understood, Spark or Dataflow use the GroupByKey stages to persist
> the input. That is not required in Flink to be able to take a consistent
> snapshot of the pipeline.
>
> Basically, for Flink we don't need any of that magic that KafkaIO does.
> What we would need to support EOS is a way to tell the ExactlyOnceWriter
> (a DoFn) to commit once a checkpoint has completed.
>
> I know that the new version of SDF supports checkpointing which should
> solve this issue. But there is still a lot of work to do to make this
> reality.
>
> So I think it would make sense to think about a way to make KafkaIO's
> EOS more accessible to Runners which support a different way of
> checkpointing.
>
> Cheers,
> Max
>
> PS: I found this document about RequiresStableInput [3], but IMHO
> defining an annotation only manifests the conceptual difference between
> the Runners.
>
>
> [1]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
> [2]
>
> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
> [3]
>
> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>