I believe the way you would implement the logic behind Flink's
KafkaProducer would be to have two steps:

1. Start transaction
2. @RequiresStableInput Close transaction

The FlinkRunner would need to insert the "wait until checkpoint
finalization" logic wherever it sees @RequiresStableInput, which is already
what it would have to do.
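
To make the "wait until checkpoint finalization" step concrete, here is a
minimal sketch in plain Java with no Beam or Flink dependencies. The class
StableInputBuffer and its method names are hypothetical; they only loosely
mirror the snapshot and checkpoint-complete callbacks a Flink operator
receives, and this is not how the FlinkRunner is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical helper: holds elements back until their checkpoint is finalized. */
class StableInputBuffer<T> {
    // Elements received since the last snapshot; not yet part of any checkpoint.
    private List<T> pending = new ArrayList<>();
    // Elements captured by a snapshot, keyed by checkpoint id, awaiting finalization.
    private final TreeMap<Long, List<T>> snapshotted = new TreeMap<>();
    // Stands in for the DoFn that is marked @RequiresStableInput.
    private final Consumer<T> downstream;

    StableInputBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    /** Buffers the element instead of processing it right away. */
    void processElement(T element) {
        pending.add(element);
    }

    /** Called when a checkpoint is taken: the pending elements become part of it. */
    void snapshotState(long checkpointId) {
        snapshotted.put(checkpointId, pending);
        pending = new ArrayList<>();
    }

    /** Called when a checkpoint is finalized: release everything up to that id. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<T>> done = snapshotted.headMap(checkpointId, true);
        for (List<T> batch : done.values()) {
            batch.forEach(downstream);
        }
        done.clear(); // headMap is a view, so this also removes from snapshotted
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        StableInputBuffer<String> buffer = new StableInputBuffer<>(seen::add);
        buffer.processElement("a");
        buffer.snapshotState(1L);
        buffer.processElement("b");          // arrives after checkpoint 1 was taken
        buffer.notifyCheckpointComplete(1L); // only "a" is stable at this point
        System.out.println(seen);
    }
}
```

The element "b" stays buffered until a later checkpoint that contains it is
finalized, which is the delay that @RequiresStableInput implies here.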

This matches the KafkaProducer's logic - delay closing the transaction
until checkpoint finalization. This answers my main question, which is "is
@RequiresStableInput expressive enough to allow Beam-on-Flink to have
exactly once behavior with the same performance characteristics as native
Flink checkpoint finalization?"
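
As a rough illustration of the two steps, the sketch below uses an in-memory
stand-in for a transactional log. FakeTransactionalLog and its methods are
hypothetical and are not Kafka's producer API; the assumption is only that
staged writes stay invisible until commit and that commit is idempotent. How
a real Kafka producer would resume or fence a transaction after a restart is
outside this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a transactional log such as a Kafka topic. */
class FakeTransactionalLog {
    private final Map<Long, List<String>> open = new HashMap<>();
    private final List<String> committed = new ArrayList<>();
    private long nextId = 0;

    /** Step 1: start a transaction and return its id. */
    long begin() {
        long id = nextId++;
        open.put(id, new ArrayList<>());
        return id;
    }

    /** Step 1, continued: writes are staged and invisible to readers until commit. */
    void write(long txnId, String record) {
        List<String> staged = open.get(txnId);
        if (staged == null) {
            throw new IllegalStateException("transaction " + txnId + " is not open");
        }
        staged.add(record);
    }

    /** Step 2: idempotent commit, so replaying the same id after a restart is harmless. */
    void commit(long txnId) {
        List<String> staged = open.remove(txnId);
        if (staged != null) {
            committed.addAll(staged);
        }
    }

    /** Returns a copy of the records that readers can see. */
    List<String> visible() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        FakeTransactionalLog log = new FakeTransactionalLog();
        long txn = log.begin();
        log.write(txn, "a");
        log.write(txn, "b");
        System.out.println("before finalization: " + log.visible());
        // After checkpoint finalization, step 2 receives the txn id as stable input.
        log.commit(txn);
        log.commit(txn); // a replayed commit after a restart changes nothing
        System.out.println("after finalization: " + log.visible());
    }
}
```

Because step 2 only ever sees a transaction id that is already part of a
finalized checkpoint, a replay can repeat the commit but cannot lose the id.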

Kenn

[1] https://github.com/apache/beam/pull/7955

On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:

>
>
> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>
>>
>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>> semantics (EOS). I think it is questionable to exclude Runners from
>>> inside a transform, but I see that the intention was to save users from
>>> surprises.
>>>
>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>> KafkaProducer supports exactly-once. It simply commits the pending
>>> transaction once it has completed a checkpoint.
>>>
>>
>>
>> When we discussed this in Aug 2017, the understanding was that the
>> two-phase commit utility in Flink, which is used to implement Flink's
>> Kafka EOS, could not be implemented in Beam's context.
>> See this message
>> <https://www.mail-archive.com/[email protected]/msg02664.html> in that
>> dev thread. Has anything changed in this regard? The whole thread is
>> relevant to this topic and worth going through.
>>
>
> I think that the TwoPhaseCommit utility class wouldn't work. The Flink
> runner would probably want to directly use notifyCheckpointComplete in
> order to implement @RequiresStableInput.
>
>>
>>
>>>
>>> A checkpoint is realized by sending barriers through all channels,
>>> starting from the sources until they reach all sinks. Every operator
>>> persists its state once it has received a barrier on all its input
>>> channels; it then forwards the barrier to the downstream operators.
>>>
>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>
>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>> GroupByKey -> ExactlyOnceWriter
>>>
>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>> persist the input. That is not required in Flink to be able to take a
>>> consistent snapshot of the pipeline.
>>>
>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>>> (a DoFn) to commit once a checkpoint has completed.
>>>
>>> I know that the new version of SDF supports checkpointing, which should
>>> solve this issue. But there is still a lot of work to do to make this a
>>> reality.
>>>
>>
>> I don't see how SDF solves this problem. Maybe pseudocode would make it
>> clearer. But if it helps, that is great!
>>
>>> So I think it would make sense to think about a way to make KafkaIO's
>>> EOS more accessible to Runners which support a different way of
>>> checkpointing.
>>>
>>
>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think that
>> will help many future exactly-once sinks and address the fundamental
>> incompatibility between the Beam model and Flink's horizontal checkpointing
>> for such applications.
>>
>> Raghu.
>>
>>
>>> Cheers,
>>> Max
>>>
>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>> defining an annotation only manifests the conceptual difference between
>>> the Runners.
>>>
>>>
>>> [1]
>>>
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>> [2]
>>>
>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>> [3]
>>>
>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>
>>
