On Thu, Feb 28, 2019 at 2:42 PM Raghu Angadi <ang...@gmail.com> wrote:

> On Thu, Feb 28, 2019 at 2:34 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I'm not sure what a hard fail is. I probably have a shallow
>> understanding, but doesn't @RequiresStableInput work for 2PC? The
>> preCommit() phase should establish the transaction and commit() is not
>> called until after checkpoint finalization. Can you describe the way that
>> it does not work a little bit more?
>>
>
> - preCommit() is called before the checkpoint. Kafka EOS in Flink starts
> the transaction before this and makes sure it flushes all records in
> preCommit(). So far so good.
> - commit() is called after the checkpoint is persisted. Now, imagine
> commit() fails for some reason. There is no option to rerun the first
> phase to write the records again in a new transaction. This is a hard
> failure for the job. In practice Flink might attempt to commit again (not
> sure how many times), which is likely to fail and eventually results in
> job failure.
>
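The failure mode described above can be sketched with a toy simulation (illustrative Python only; this is not Flink's actual TwoPhaseCommitSinkFunction API, and the class and method names are made up):

```python
# Toy model of a two-phase-commit sink lifecycle. Once the checkpoint is
# persisted, the inputs have been consumed, so a failed commit() cannot be
# compensated by rerunning phase 1 in a new transaction.

class TwoPhaseCommitSink:
    def __init__(self, broker, commit_fails=False):
        self.broker = broker          # simulated external system
        self.commit_fails = commit_fails
        self.txn = []                 # open transaction buffer
        self.pending = None           # flushed records awaiting commit

    def write(self, record):
        self.txn.append(record)       # records go into the open transaction

    def pre_commit(self):
        # Phase 1: flush the transaction before the checkpoint completes.
        self.pending, self.txn = self.txn, []

    def commit(self):
        # Phase 2: runs only AFTER the checkpoint is persisted. If this
        # fails, phase 1 cannot be rerun; the job can only retry commit()
        # or fail hard.
        if self.commit_fails:
            raise RuntimeError("commit() failed after checkpoint")
        self.broker.extend(self.pending)
        self.pending = None

broker = []
sink = TwoPhaseCommitSink(broker)
sink.write("a"); sink.write("b")
sink.pre_commit()                 # before the checkpoint
# ... checkpoint is persisted here ...
sink.commit()                     # after the checkpoint
assert broker == ["a", "b"]

failing = TwoPhaseCommitSink([], commit_fails=True)
failing.write("c")
failing.pre_commit()
try:
    failing.commit()
except RuntimeError:
    pass    # "c" is stuck in the pending transaction; no way to rewrite it
```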

In Apache Beam, the records could be buffered in state and then written
inside commit() to work around this issue. This could have scalability
issues if checkpoints are not frequent enough in the Flink runner.
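That workaround could be sketched roughly as follows (illustrative plain Python, not the Beam state API; the broker and retry logic are made up for the sketch):

```python
# Sketch: buffer records in checkpointed state and write them inside
# commit(). Because the records survive in state, a failed commit can be
# retried with a fresh transaction instead of hard-failing the job.

class FlakyBroker:
    def __init__(self, failures=1):
        self.records = []
        self.failures = failures          # simulate transient commit failures

    def commit_transaction(self, batch):
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("transient commit failure")
        self.records.extend(batch)

class BufferingWriter:
    def __init__(self, broker):
        self.broker = broker
        self.state = []                   # stands in for Beam's BagState

    def process(self, record):
        self.state.append(record)         # buffer instead of writing eagerly

    def on_checkpoint_complete(self):
        # Each attempt opens a "fresh transaction" over the buffered
        # records; commit failures are retryable, not hard failures.
        for _ in range(10):               # bounded retries for the sketch
            try:
                self.broker.commit_transaction(list(self.state))
                self.state.clear()
                return
            except RuntimeError:
                continue
        raise RuntimeError("gave up after repeated commit failures")

broker = FlakyBroker(failures=1)
writer = BufferingWriter(broker)
writer.process("x"); writer.process("y")
writer.on_checkpoint_complete()   # first attempt fails, retry succeeds
assert broker.records == ["x", "y"]
```

The scalability caveat above corresponds to `self.state` growing with everything written between checkpoints.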

Raghu.


>
>
>> Kenn
>>
>> On Thu, Feb 28, 2019 at 1:25 PM Raghu Angadi <ang...@gmail.com> wrote:
>>
>>> On Thu, Feb 28, 2019 at 11:01 AM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>>
>>>> I believe the way you would implement the logic behind Flink's
>>>> KafkaProducer would be to have two steps:
>>>>
>>>> 1. Start transaction
>>>> 2. @RequiresStableInput Close transaction
>>>>
>>>
>>> I see. What happens if closing the transaction fails in (2)? Flink's
>>> 2PC requires that commit() should never hard-fail once preCommit()
>>> succeeds. I think that is the cost of not having an extra shuffle. It is
>>> alright, since this policy has worked well for Flink so far.
>>>
>>> Overall, it will be great to have @RequiresStableInput support in Flink
>>> runner.
>>>
>>> Raghu.
>>>
>>>> The FlinkRunner would need to insert the "wait until checkpoint
>>>> finalization" logic wherever it sees @RequiresStableInput, which is already
>>>> what it would have to do.
>>>>
>>>> This matches the KafkaProducer's logic - delay closing the transaction
>>>> until checkpoint finalization. This answers my main question, which is "is
>>>> @RequiresStableInput expressive enough to allow Beam-on-Flink to have
>>>> exactly once behavior with the same performance characteristics as native
>>>> Flink checkpoint finalization?"
>>>>
>>>> Kenn
>>>>
>>>> [1] https://github.com/apache/beam/pull/7955
>>>>
>>>> On Thu, Feb 28, 2019 at 10:43 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 28, 2019 at 10:41 AM Raghu Angadi <ang...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>>>> pending transaction once it has completed a checkpoint.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 28, 2019 at 9:59 AM Maximilian Michels <m...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I came across KafkaIO's Runner whitelist [1] for enabling
>>>>>>> exactly-once semantics (EOS). I think it is questionable to exclude
>>>>>>> Runners from inside a transform, but I see that the intention was to
>>>>>>> save users from surprises.
>>>>>>>
>>>>>>> Now why does the Flink Runner not support KafkaIO EOS? Flink's
>>>>>>> native KafkaProducer supports exactly-once. It simply commits the
>>>>>>> pending transaction once it has completed a checkpoint.
>>>>>>>
>>>>>>
>>>>>>
>>>>>> When we discussed this in Aug 2017, the understanding was that the
>>>>>> two-phase-commit utility Flink uses to implement its Kafka EOS could
>>>>>> not be implemented in Beam's context. See this message
>>>>>> <https://www.mail-archive.com/dev@beam.apache.org/msg02664.html> in
>>>>>> that dev thread. Has anything changed in this regard? The whole thread
>>>>>> is relevant to this topic and worth going through.
>>>>>>
>>>>>
>>>>> I think that the TwoPhaseCommit utility class wouldn't work. The Flink
>>>>> runner would probably want to use notifyCheckpointComplete directly in
>>>>> order to implement @RequiresStableInput.
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> A checkpoint is realized by sending barriers through all channels,
>>>>>>> starting from the sources, until they reach all sinks. Every operator
>>>>>>> persists its state once it has received a barrier on all of its input
>>>>>>> channels; it then forwards the barrier to the downstream operators.
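That barrier flow can be sketched as a toy simulation (illustrative only; real Flink also blocks post-barrier data on already-aligned channels, which this sketch omits):

```python
# Toy barrier alignment: an operator snapshots its state only after it has
# seen the checkpoint barrier on ALL of its input channels, then forwards
# the barrier downstream.

BARRIER = "BARRIER"

class Operator:
    def __init__(self, num_inputs, downstream=None):
        self.num_inputs = num_inputs
        self.downstream = downstream
        self.barriers_seen = 0
        self.state = 0            # toy state: a running sum of inputs
        self.snapshots = []

    def receive(self, channel, element):
        if element is BARRIER:
            self.barriers_seen += 1
            if self.barriers_seen == self.num_inputs:    # aligned
                self.snapshots.append(self.state)        # persist state
                self.barriers_seen = 0
                if self.downstream:
                    self.downstream.receive(0, BARRIER)  # forward barrier
        else:
            self.state += element

sink = Operator(num_inputs=1)
op = Operator(num_inputs=2, downstream=sink)
op.receive(0, 5)
op.receive(0, BARRIER)        # barrier on channel 0 only: no snapshot yet
op.receive(1, 7)              # pre-barrier data still flowing on channel 1
assert op.snapshots == []
op.receive(1, BARRIER)        # barrier on channel 1: aligned -> snapshot
assert op.snapshots == [12]
assert sink.snapshots == [0]  # barrier was forwarded to the sink
```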
>>>>>>>
>>>>>>> The architecture of Beam's KafkaExactlyOnceSink is as follows[2]:
>>>>>>>
>>>>>>> Input -> AssignRandomShardIds -> GroupByKey -> AssignSequenceIds ->
>>>>>>> GroupByKey -> ExactlyOnceWriter
>>>>>>>
>>>>>>> As I understand it, Spark and Dataflow use the GroupByKey stages to
>>>>>>> persist the input. That is not required in Flink to be able to take a
>>>>>>> consistent snapshot of the pipeline.
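The two-GroupByKey structure above can be illustrated with a toy simulation (plain Python, not Beam code; the dedup logic is a simplification of what ExactlyOnceWriter actually does):

```python
import random
from collections import defaultdict

def assign_random_shard_ids(records, num_shards):
    # Stage 1: key each record by a random shard id.
    return [(random.randrange(num_shards), r) for r in records]

def group_by_key(keyed):
    groups = defaultdict(list)
    for k, v in keyed:
        groups[k].append(v)
    return groups

def assign_sequence_ids(groups):
    # Stage 2: within each shard, give records a sequence id; the writer
    # uses (shard, seq) to deduplicate retried writes.
    return [((shard, seq), r)
            for shard, rs in groups.items()
            for seq, r in enumerate(rs)]

def exactly_once_write(sequenced, sink, committed):
    # Writer skips any (shard, seq) already committed, so a retry of the
    # same sequenced batch is idempotent. `committed` stands in for state
    # the runner must persist for this to work.
    for (shard, seq), r in sequenced:
        if (shard, seq) not in committed:
            sink.append(r)
            committed.add((shard, seq))

records = ["a", "b", "c", "d"]
sequenced = assign_sequence_ids(group_by_key(assign_random_shard_ids(records, 2)))
sink, committed = [], set()
exactly_once_write(sequenced, sink, committed)
exactly_once_write(sequenced, sink, committed)   # retry writes nothing new
assert sorted(sink) == records
```

The GroupByKey stages matter because, on the runners discussed above, they are what pins down `sequenced` so a retry replays identical (shard, seq, record) triples.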
>>>>>>>
>>>>>>> Basically, for Flink we don't need any of that magic that KafkaIO
>>>>>>> does. What we would need to support EOS is a way to tell the
>>>>>>> ExactlyOnceWriter (a DoFn) to commit once a checkpoint has completed.
>>>>>>>
>>>>>>> I know that the new version of SDF supports checkpointing, which
>>>>>>> should solve this issue. But there is still a lot of work to do to
>>>>>>> make this a reality.
>>>>>>>
>>>>>>
>>>>>> I don't see how SDF solves this problem. Maybe pseudocode would make
>>>>>> it clearer. But if it helps, that is great!
>>>>>>
>>>>>>> So I think it would make sense to think about a way to make
>>>>>>> KafkaIO's EOS more accessible to Runners which support a different
>>>>>>> way of checkpointing.
>>>>>>>
>>>>>>
>>>>>> Absolutely. I would love to support EOS in KafkaIO for Flink. I think
>>>>>> that will help many future exactly-once sinks, and address a
>>>>>> fundamental incompatibility between the Beam model and Flink's
>>>>>> horizontal checkpointing for such applications.
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> PS: I found this document about RequiresStableInput [3], but IMHO
>>>>>>> defining an annotation only manifests the conceptual difference
>>>>>>> between
>>>>>>> the Runners.
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1144
>>>>>>> [2]
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/988a22f01bb133dd65b3cc75e05978d695aed76c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L166
>>>>>>> [3]
>>>>>>>
>>>>>>> https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM
>>>>>>>
>>>>>>
