On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:

> I believe the way you would implement the logic behind Flink's
> KafkaProducer would be to have two steps:
>
> 1. Start transaction
> 2. @RequiresStableInput Close transaction
>

I see. What happens if closing the transaction fails in (2)? Flink's 2PC
requires that commit() should never hard fail once preCommit() succeeds. I
think that is the cost of not having an extra shuffle. It is alright, since
this policy has worked well for Flink so far.
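
For reference, Flink's two-phase commit contract looks roughly like this
(paraphrased from memory after Flink's TwoPhaseCommitSinkFunction; simplified
signatures, not the real class):

  // Paraphrased shape of Flink's TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>.
  public abstract class TwoPhaseCommitContract<IN, TXN> {

    // Open a new transaction at the start of a checkpoint interval.
    protected abstract TXN beginTransaction() throws Exception;

    // Called for every element; writes go into the open transaction.
    protected abstract void invoke(TXN transaction, IN value) throws Exception;

    // Phase 1, on the checkpoint barrier. May fail; the checkpoint then fails.
    protected abstract void preCommit(TXN transaction) throws Exception;

    // Phase 2, after the checkpoint completes. Once preCommit() succeeded,
    // this must eventually succeed (it is retried on recovery), i.e. it may
    // not hard fail.
    protected abstract void commit(TXN transaction);

    // Roll back a transaction whose checkpoint did not complete.
    protected abstract void abort(TXN transaction);
  }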

Overall, it will be great to have @RequiresStableInput support in the Flink
runner.
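
To make the two steps Kenn describes concrete, the closing step could be a
DoFn along these lines. This is only a sketch: TxnHandle is a made-up type,
and I am assuming @RequiresStableInput lands as a method-level annotation on
the @ProcessElement method, as in the design doc.

  import org.apache.beam.sdk.transforms.DoFn;

  // Hypothetical output of step (1): enough information to locate the open transaction.
  class TxnHandle implements java.io.Serializable {
    final String transactionalId;
    TxnHandle(String transactionalId) { this.transactionalId = transactionalId; }
  }

  // Step (2): close/commit the transaction. @RequiresStableInput asks the runner
  // to hold the element back until it is stable; on Flink that would mean "after
  // the checkpoint that produced it has been finalized", which is exactly when
  // the native FlinkKafkaProducer commits.
  class CloseTransactionFn extends DoFn<TxnHandle, Void> {
    @RequiresStableInput
    @ProcessElement
    public void processElement(@Element TxnHandle handle) {
      // Resume the producer for handle.transactionalId and commit the pending
      // transaction (producer bookkeeping elided; this is only a sketch).
    }
  }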

Raghu.

> The FlinkRunner would need to insert the "wait until checkpoint
> finalization" logic wherever it sees @RequiresStableInput, which is already
> what it would have to do.
>
> This matches the KafkaProducer's logic - delay closing the transaction
> until checkpoint finalization. This answers my main question, which is "is
> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
> exactly once behavior with the same performance characteristics as native
> Flink checkpoint finalization?"
>
> Kenn
>
> [1] https://github.com/apache/beam/pull/7955
>
> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>
>>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>> transaction once it has completed a checkpoint.
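
For context, the native producer setup referred to here looks roughly like
this (written from memory against the Kafka 0.11 connector, so treat the
class names and constructor as approximate):

  import java.util.Properties;

  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
  import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

  public class NativeKafkaEos {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // Transactions are pre-committed on the checkpoint barrier and committed
      // on checkpoint completion, so checkpointing must be enabled.
      env.enableCheckpointing(60_000);

      DataStream<String> stream = env.fromElements("a", "b", "c");

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092");

      stream.addSink(new FlinkKafkaProducer011<>(
          "my-topic",
          new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
          props,
          FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

      env.execute("native-kafka-eos");
    }
  }
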
>>>
>>>
>>>
>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>> inside a transform, but I see that the intention was to save users from
>>>> surprises.
>>>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>> transaction once it has completed a checkpoint.
>>>>
>>>
>>>
>>> When we discussed this in Aug 2017, the understanding was that the two-phase
>>> commit utility Flink uses to implement its Kafka EOS could not be
>>> implemented in Beam's context.
>>> See this message
>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>> that dev thread. Has anything changed in this regard? The whole thread is
>>> relevant to this topic and worth going through.
>>>
>>
>> I think the TwoPhaseCommit utility class wouldn't work. The Flink runner
>> would probably want to use notifyCheckpointComplete directly in order to
>> implement @RequiresStableInput.
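
A very rough sketch of what that could look like, using the two Flink hooks
involved (signatures from memory; none of this is working runner code):

  import java.util.ArrayDeque;
  import java.util.Queue;

  import org.apache.flink.runtime.state.CheckpointListener;
  import org.apache.flink.runtime.state.FunctionInitializationContext;
  import org.apache.flink.runtime.state.FunctionSnapshotContext;
  import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

  // Buffer elements until the checkpoint that contains them is finalized, then
  // release them to the @RequiresStableInput step. State handling is elided.
  class StableInputBuffer<T> implements CheckpointedFunction, CheckpointListener {

    private final Queue<T> pending = new ArrayDeque<>();

    void buffer(T element) {
      pending.add(element); // not yet stable
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
      // Persist `pending` into Flink managed state so it survives failures.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
      // Restore `pending` from managed state on recovery.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
      // The checkpoint is finalized, so the buffered input is now stable.
      while (!pending.isEmpty()) {
        emitToStableInputStep(pending.poll());
      }
    }

    private void emitToStableInputStep(T element) {
      // Placeholder for handing the element to the @RequiresStableInput DoFn.
    }
  }
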
>>
>>>
>>>
>>>>
>>>> A checkpoint is realized by sending barriers through all channels,
>>>> starting from the source until reaching all sinks. Every operator
>>>> persists its state once it has received a barrier on all its input
>>>> channels and then forwards the barrier to the downstream operators.
>>>>
>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>>
>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>> GroupByKey -> ExactlyOnceWriter
>>>>
>>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>>> persist the input. That is not required in Flink to be able to take a
>>>> consistent snapshot of the pipeline.
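
Conceptually, the expansion is like the toy pipeline below (made-up names,
not the actual KafkaExactlyOnceSink classes); the point is that the
GroupByKey materializes each shard's elements, so a retried writer replays
identical input:

  import java.util.concurrent.ThreadLocalRandom;

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;

  // Toy version of the shard-then-group pattern; simplified, not the real code.
  public class ShardedWritePattern {

    static class AssignRandomShard extends DoFn<String, KV<Integer, String>> {
      private final int numShards;
      AssignRandomShard(int numShards) { this.numShards = numShards; }

      @ProcessElement
      public void processElement(@Element String value, OutputReceiver<KV<Integer, String>> out) {
        // A random shard id spreads load; the following GroupByKey materializes
        // the elements so a retried writer sees exactly the same input.
        out.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), value));
      }
    }

    static class WriteShard extends DoFn<KV<Integer, Iterable<String>>, Void> {
      @ProcessElement
      public void processElement(@Element KV<Integer, Iterable<String>> shard) {
        // Stand-in for the exactly-once writer: one transaction per shard group.
        for (String record : shard.getValue()) {
          System.out.println("shard " + shard.getKey() + " -> " + record);
        }
      }
    }

    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
      p.apply(Create.of("a", "b", "c"))
       .apply(ParDo.of(new AssignRandomShard(4)))
       .apply(GroupByKey.<Integer, String>create())
       .apply(ParDo.of(new WriteShard()));
      p.run().waitUntilFinish();
    }
  }
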
>>>>
>>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>>> What we would need to support EOS is a way to tell the
>>>> ExactlyOnceWriter
>>>> (a DoFn) to commit once a checkpoint has completed.
>>>>
>>>> I know that the new version of SDF supports checkpointing, which should
>>>> solve this issue. But there is still a lot of work to do to make this a
>>>> reality.
>>>>
>>>
>>> I don't see how SDF solves this problem. Maybe pseudo code would make it
>>> clearer. But if it helps, that is great!
>>>
>>>> So I think it would make sense to think about a way to make KafkaIO's
>>>> EOS more accessible to Runners which support a different way of
>>>> checkpointing.
>>>>
>>>
>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>> that will help many future exactly-once sinks, and address the fundamental
>>> incompatibility between the Beam model and Flink's horizontal checkpointing
>>> for such applications.
>>>
>>> Raghu.
>>>
>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>> defining an annotation only surfaces the conceptual difference between
>>>> the Runners.
>>>>
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>> [2]
>>>>
>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>> [3]
>>>>
>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>
>>>
