I'm not sure what a hard fail is. I probably have a shallow understanding,
but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
establish the transaction and commit() is not called until after checkpoint
finalization. Can you describe the way that it does not work a little bit
more?

Kenn

On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:

> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> I believe the way you would implement the logic behind Flink's
>> KafkaProducer would be to have two steps:
>>
>> 1. Start transaction
>> 2. @RequiresStableInput Close transaction
>>
>
> I see.  What happens if closing the transaction fails in (2)? Flink's 2PC
> requires that commit() should never hard fail once preCommit() succeeds. I
> think that is cost of not having an extra shuffle. It is alright since this
> policy has worked well for Flink so far.
>
> Overall, it will be great to have @RequiresStableInput support in Flink
> runner.
>
> Raghu.
>
>> The FlinkRunner would need to insert the "wait until checkpoint
>> finalization" logic wherever it sees @RequiresStableInput, which is already
>> what it would have to do.
>>
>> This matches the KafkaProducer's logic - delay closing the transaction
>> until checkpoint finalization. This answers my main question, which is "is
>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>> exactly once behavior with the same performance characteristics as native
>> Flink checkpoint finalization?"
>>
>> Kenn
>>
>> [1] https://github.com/apache/beam/pull/7955
>>
>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com> wrote:
>>>
>>>>
>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>> transaction once it has completed a checkpoint.
>>>>
>>>>
>>>>
>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>> inside a transform, but I see that the intention was to save users
>>>>> from
>>>>> surprises.
>>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>> transaction once it has completed a checkpoint.
>>>>>
>>>>
>>>>
>>>> When we discussed this in Aug 2017, the understanding was that 2 Phase
>>>> commit utility in Flink used to implement Flink's Kafka EOS could not be
>>>> implemented in Beam's context.
>>>> See  this message
>>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>>> that dev thread. Has anything changed in this regard? The whole thread is
>>>> relevant to this topic and worth going through.
>>>>
>>>
>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink
>>> runner would probably want to directly use notifySnapshotComplete in order
>>> to implement @RequiresStableInput.
>>>
>>>>
>>>>
>>>>>
>>>>> A checkpoint is realized by sending barriers through all channels
>>>>> starting from the source until reaching all sinks. Every operator
>>>>> persists its state once it has received a barrier on all its input
>>>>> channels, it then forwards it to the downstream operators.
>>>>>
>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>>>
>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>> GroupByKey -> ExactlyOnceWriter
>>>>>
>>>>> As I understood, Spark or Dataflow use the GroupByKey stages to
>>>>> persist
>>>>> the input. That is not required in Flink to be able to take a
>>>>> consistent
>>>>> snapshot of the pipeline.
>>>>>
>>>>> Basically, for Flink we don't need any of that magic that KafkaIO
>>>>> does.
>>>>> What we would need to support EOS is a way to tell the
>>>>> ExactlyOnceWriter
>>>>> (a DoFn) to commit once a checkpoint has completed.
>>>>
>>>> I know that the new version of SDF supports checkpointing which should
>>>>> solve this issue. But there is still a lot of work to do to make this
>>>>> reality.
>>>>>
>>>>
>>>> I don't see how SDF solves this problem.. May be pseudo code would make
>>>> more clear.  But if helps, that is great!
>>>>
>>>> So I think it would make sense to think about a way to make KafkaIO's
>>>>> EOS more accessible to Runners which support a different way of
>>>>> checkpointing.
>>>>>
>>>>
>>>> Absolutely. I would love to support EOS in KakaIO for Flink. I think
>>>> that will help many future exactly-once sinks.. and address fundamental
>>>> incompatibility between Beam model and Flink's horizontal checkpointing for
>>>> such applications.
>>>>
>>>> Raghu.
>>>>
>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>> defining an annotation only manifests the conceptual difference
>>>>> between
>>>>> the Runners.
>>>>>
>>>>>
>>>>> [1]
>>>>>
>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>> [2]
>>>>>
>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>> [3]
>>>>>
>>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>
>>>>

Reply via email to