Oops, seems that Stephan's email covers my answer plus the plans to provide
transactional sinks :-)

On Thu, Aug 27, 2015 at 1:25 PM, Kostas Tzoumas <ktzou...@apache.org> wrote:

> Note that the definition of "exactly-once" means that records are
> guaranteed to be processed exactly once by Flink operators, and thus state
> updates to operator state happen exactly once (e.g., if C had a counter
> that x1, x2, and x3 incremented, the counter would have a value of 3 and
> not a value of 6). This is not specific to Flink, but the most accepted
> definition, and applicable to all stream processing systems. The reason is
> that the stream processor cannot by itself guarantee what happens to the
> outside world (the outside world is in this case the data sink).
>
> See the docs (
> https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html
> ):
>
> "Apache Flink offers a fault tolerance mechanism to consistently recover
> the state of data streaming applications. The mechanism ensures that even
> in the presence of failures, the program’s state will eventually reflect
> every record from the data stream exactly once."
>
> Guaranteeing exactly once delivery to the sink is possible, as Marton
> above suggests, but the sink implementation needs to be aware and take part
> in the checkpointing mechanism.
>
>
> On Thu, Aug 27, 2015 at 1:14 PM, Márton Balassi <balassi.mar...@gmail.com>
> wrote:
>
>> Dear Zhangrucong,
>>
>> From your explanation it seems that you have a good general understanding
>> of Flink's checkpointing algorithm. Your concern is valid, by default a
>> sink C with emits tuples to the "outside world" potentially multiple times.
>> A neat trick to solve this issue for your user defined sinks is to use the
>> CheckpointNotifier interface to output records only after the corresponding
>> checkpoint has been totally processed by the system, so sinks can also
>> provid exactly once guarantees in Flink.
>>
>> This would mean that your SinkFunction has to implement both the
>> Checkpointed and the CheckpointNotifier interfaces. The idea is to mark the
>> output tuples with the correspoding checkpoint id, so then they can be
>> emitted in a "consistent" manner when the checkpoint is globally
>> acknowledged by the system. You buffer your output records in a collection
>> of your choice and whenever a snapshotState of the Checkpointed interface
>> is invoked you mark your fresh output records with the current
>> checkpointID. Whenever the notifyCheckpointComplete is invoked you emit
>> records with the corresponding ID.
>>
>> Note that this adds latency to your processing and as you potentially
>> need to checkpoint a lot of data in the sinks I would recommend to use a
>> HDFS as a state backend instead of the default solution.
>>
>> Best,
>>
>> Marton
>>
>> On Thu, Aug 27, 2015 at 12:32 PM, Zhangrucong <zhangruc...@huawei.com>
>> wrote:
>>
>>> Hi:
>>>
>>>       The document said Flink can guarantee processing each tuple
>>> exactly-once, but I can not understand how it works.
>>>
>>>    For example, In Fig 1, C is running between snapshot n-1 and snapshot
>>> n(snapshot n hasn’t been generated). After snapshot n-1, C has processed
>>> tuple x1, x2, x3 and already outputted to user,  then C failed and it
>>> recoveries from snapshot n-1. In my opinion, x1, x2, x3 will be processed
>>> and outputted to user again. My question is how Flink guarantee x1,x2,x3
>>> are processed and outputted to user only once?
>>>
>>>
>>>
>>>
>>>
>>> Fig 1.
>>>
>>> Thanks for answing.
>>>
>>
>>
>

Reply via email to