On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <[email protected]> wrote:

> I'm not sure what a hard fail is. I probably have a shallow understanding,
> but doesn't @RequiresStableInput work for 2PC? The preCommit() phase should
> establish the transaction and commit() is not called until after checkpoint
> finalization. Can you describe the way that it does not work a little bit
> more?
>

- preCommit() is called before the checkpoint. Kafka EOS in Flink starts the
transaction before this and makes sure it flushes all records in
preCommit(). So far, so good.
- commit() is called after the checkpoint is persisted. Now, imagine commit()
fails for some reason. There is no option to rerun the first phase to write
the records again in a new transaction. This is a hard failure for the
job. In practice Flink might attempt to commit again (not sure how many
times), which is likely to fail and eventually results in job failure.
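
To make that failure mode concrete, here is a heavily simplified sketch of
the two phases, loosely modeled on Flink's TwoPhaseCommitSinkFunction (class
and field names are illustrative, not the actual Flink or KafkaIO code):

  // Simplified 2PC lifecycle for a transactional Kafka sink.
  // Loosely modeled on Flink's TwoPhaseCommitSinkFunction; sketch only.
  class SketchedKafkaEosSink {

    org.apache.kafka.clients.producer.Producer<byte[], byte[]> producer;

    // Called when processing for the current checkpoint period starts:
    // open the transaction that will hold this period's records.
    void beginTransaction() {
      producer.beginTransaction();
    }

    // Phase 1: runs before the checkpoint barrier is acknowledged.
    // Flush every buffered record into the still-open transaction.
    void preCommit() {
      producer.flush();
    }

    // Phase 2: runs only after the checkpoint has been persisted.
    // If this fails, the records cannot be rewritten: the checkpoint that
    // produced them is already finalized, so there is no phase 1 left to
    // retry. That is the hard failure described above.
    void commit() {
      producer.commitTransaction();
    }
  }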


> Kenn
>
> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <[email protected]> wrote:
>
>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <[email protected]> wrote:
>>
>>> I believe the way you would implement the logic behind Flink's
>>> KafkaProducer would be to have two steps:
>>>
>>> 1. Start transaction
>>> 2. @RequiresStableInput Close transaction
>>>
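
A rough sketch of what those two steps might look like as a pair of DoFns
(class names, element types, and the producer handling are hypothetical; this
only illustrates the shape, not an existing Beam transform):

  import org.apache.beam.sdk.transforms.DoFn;

  // Step 1: stage the record inside an open Kafka transaction.
  class StartTransactionFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(@Element String record, OutputReceiver<String> out) {
      // A producer.send(record) inside an open transaction would go here
      // (producer handling not shown); then pass the record downstream.
      out.output(record);
    }
  }

  // Step 2: commit only once the runner guarantees the input is stable,
  // i.e. it will be replayed identically after a failure. On Flink this
  // should line up with checkpoint finalization.
  class CommitTransactionFn extends DoFn<String, Void> {
    @RequiresStableInput
    @ProcessElement
    public void processElement(@Element String record) {
      // producer.commitTransaction() for the transaction staged in step 1.
    }
  }
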
>>
>> I see. What happens if closing the transaction fails in (2)? Flink's 2PC
>> requires that commit() should never hard fail once preCommit() succeeds. I
>> think that is the cost of not having an extra shuffle. It is alright since
>> this policy has worked well for Flink so far.
>>
>> Overall, it will be great to have @RequiresStableInput support in the
>> Flink runner.
>>
>> Raghu.
>>
>>> The FlinkRunner would need to insert the "wait until checkpoint
>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>> what it would have to do.
>>>
>>> This matches the KafkaProducer's logic - delay closing the transaction
>>> until checkpoint finalization. This answers my main question, which is "is
>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>> exactly once behavior with the same performance characteristics as native
>>> Flink checkpoint finalization?"
>>>
>>> Kenn
>>>
>>> [1] https://github.com/apache/beam/pull/7955
>>>
>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>>
>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling exactly-once
>>>>>> semantics (EOS). I think it is questionable to exclude Runners from
>>>>>> inside a transform, but I see that the intention was to save users from
>>>>>> surprises.
>>>>>>
>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's native
>>>>>> KafkaProducer supports exactly-once. It simply commits the pending
>>>>>> transaction once it has completed a checkpoint.
>>>>>>
>>>>>
>>>>>
>>>>> When we discussed this in Aug 2017, the understanding was that the
>>>>> two-phase commit utility in Flink, which is used to implement Flink's
>>>>> Kafka EOS, could not be implemented in Beam's context.
>>>>> See this message
>>>>> <https://www.mail-archive.com/[email protected]/msg02664.html> in
>>>>> that dev thread. Has anything changed in this regard? The whole thread is
>>>>> relevant to this topic and worth going through.
>>>>>
>>>>
>>>> I think that TwoPhaseCommit utility class wouldn't work. The Flink
>>>> runner would probably want to directly use notifySnapshotComplete in order
>>>> to implement @RequiresStableInput.
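
For reference, a bare-bones sketch of what hooking into that notification
could look like on the Flink side (the operator and its buffering are made
up for illustration; CheckpointListener.notifyCheckpointComplete is the
actual Flink callback, but a real implementation would also persist the
buffer in operator state and track which checkpoint covers which elements):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.flink.runtime.state.CheckpointListener;
  import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
  import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

  // Hypothetical operator: holds elements back and only releases them to a
  // @RequiresStableInput DoFn once the checkpoint covering them is finalized.
  class StableInputOperator<T> extends AbstractStreamOperator<T>
      implements OneInputStreamOperator<T, T>, CheckpointListener {

    private final List<T> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<T> element) {
      // Not yet covered by a completed checkpoint: buffer instead of emitting.
      buffer.add(element.getValue());
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
      // Checkpoint finalized, so the buffered input is stable; emit it.
      // (A real version would release only elements belonging to
      // checkpoints up to checkpointId.)
      for (T value : buffer) {
        output.collect(new StreamRecord<>(value));
      }
      buffer.clear();
    }
  }
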
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> A checkpoint is realized by sending barriers through all channels,
>>>>>> starting from the sources until they reach all sinks. Every operator
>>>>>> persists its state once it has received a barrier on all its input
>>>>>> channels; it then forwards the barrier to the downstream operators.
>>>>>>
>>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows [2]:
>>>>>>
>>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>>> GroupByKey -> ExactlyOnceWriter
>>>>>>
>>>>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>>>>> persist the input. That is not required in Flink to be able to take a
>>>>>> consistent snapshot of the pipeline.
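
Written out as Beam Java, that shape is roughly the following (schematic
only: the element type, shard count, and the elided steps are placeholders,
not the real KafkaExactlyOnceSink code):

  import java.util.concurrent.ThreadLocalRandom;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.KV;

  public class EosSinkShape {
    public static void main(String[] args) {
      Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

      p.apply(Create.of("record-1", "record-2", "record-3"))
          // AssignRandomShardIds: key each record by a small random shard id.
          .apply("AssignRandomShardIds",
              ParDo.of(new DoFn<String, KV<Integer, String>>() {
                @ProcessElement
                public void process(@Element String record,
                    OutputReceiver<KV<Integer, String>> out) {
                  out.output(KV.of(ThreadLocalRandom.current().nextInt(4), record));
                }
              }))
          // GroupByKey: the stage that Dataflow/Spark rely on to persist input.
          .apply(GroupByKey.<Integer, String>create());
          // AssignSequenceIds, the second GroupByKey, and the ExactlyOnceWriter
          // DoFn follow the same ParDo/GroupByKey pattern; elided here.

      p.run().waitUntilFinish();
    }
  }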
>>>>>>
>>>>>> Basically, for Flink we don't need any of that magic that KafkaIO does.
>>>>>> What we would need to support EOS is a way to tell the ExactlyOnceWriter
>>>>>> (a DoFn) to commit once a checkpoint has completed.
>>>>>
>>>>>> I know that the new version of SDF supports checkpointing, which should
>>>>>> solve this issue. But there is still a lot of work to do to make this a
>>>>>> reality.
>>>>>>
>>>>>
>>>>> I don't see how SDF solves this problem. Maybe pseudo code would
>>>>> make it more clear. But if it helps, that is great!
>>>>>
>>>>>> So I think it would make sense to think about a way to make KafkaIO's
>>>>>> EOS more accessible to Runners which support a different way of
>>>>>> checkpointing.
>>>>>>
>>>>>
>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>>>> that will help many future exactly-once sinks, and address the fundamental
>>>>> incompatibility between the Beam model and Flink's horizontal checkpointing
>>>>> for such applications.
>>>>>
>>>>> Raghu.
>>>>>
>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>>> defining an annotation only manifests the conceptual difference between
>>>>>> the Runners.
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>>
>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>> [2]
>>>>>>
>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>> [3]
>>>>>>
>>>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>>
>>>>>
