This is not a bad take. It still makes a few assumptions:

1) the output checkpoints the last *known* ID that was *persisted* in Kafka 
(not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out

On 05 Feb 2016, at 13:41, Niels Basjes <ni...@basjes.nl> wrote:

Hi,

Buffering the data (in all cases) would hurt latency so much that Flink would 
effectively revert to micro-batching (where the batch size is the checkpoint 
period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The sink persists the ID of the last message it wrote to Kafka as part of the 
checkpoint.
2) Upon recovery the sink would:
2a) Record the current Kafka offset at that point in time.
2b) For every 'new' message, check whether it still needs to be written by 
reading from Kafka (starting at the offset stored in the checkpoint); if the 
message is already present, skip it.
3) If a message arrives that has not yet been written, it is written. Under the 
assumption that the messages arrive in the same order as before, the sink can 
then simply run as normal.

This way, performance is only impacted during the (short) period after recovery 
from a disturbance.
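
A minimal, self-contained sketch of how that recovery path could look. None of 
the names below are Flink or Kafka API; the Message type, the kafkaWriter 
stand-in and the onRecovery hook are all hypothetical, and it assumes ordered, 
unique message IDs and deterministic replay:

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of the proposed de-duplicating sink, not Flink/Kafka API.
// Assumptions: every message carries a unique ID, the ID of the last message
// persisted in Kafka is stored in the checkpoint, and replayed messages arrive
// in the same order as before the failure.
public class DedupOnRecoverySink {

    public record Message(long id, String payload) {}

    private final Consumer<Message> kafkaWriter;   // stand-in for the real producer
    private final Set<Long> alreadyInKafka = new HashSet<>();
    private boolean recovering = false;
    private long lastWrittenId = -1;                // value stored in the checkpoint

    public DedupOnRecoverySink(Consumer<Message> kafkaWriter) {
        this.kafkaWriter = kafkaWriter;
    }

    // 2a/2b) After restoring a checkpoint: re-read the output topic from the
    // checkpointed position and remember which IDs were already persisted.
    public void onRecovery(Iterable<Message> replayFromCheckpointedOffset) {
        for (Message m : replayFromCheckpointedOffset) {
            alreadyInKafka.add(m.id());
        }
        recovering = true;
    }

    // 3) Normal processing: skip messages already present in Kafka, write the rest.
    public void invoke(Message msg) {
        if (recovering) {
            if (alreadyInKafka.contains(msg.id())) {
                return;                             // duplicate from the replayed window
            }
            recovering = false;                     // first genuinely new message
            alreadyInKafka.clear();                 // bookkeeping no longer needed
        }
        kafkaWriter.accept(msg);
        lastWrittenId = msg.id();                   // persisted with the next checkpoint
    }

    // 1) The only extra value the checkpoint needs to carry.
    public long snapshotLastWrittenId() {
        return lastWrittenId;
    }
}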

What do you think?

Niels Basjes



On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
Hi Niels!

In general, exactly-once output requires transactional cooperation from the 
target system. Kafka has that on its roadmap; we should be able to integrate it 
once it is out.
That means output is "committed" upon completed checkpoints, which guarantees 
that nothing is written multiple times.

Chesnay is working on an interesting prototype as a generic solution (also for 
Kafka, as long as it does not have that feature yet):
It buffers the data in the sink persistently (using the fault-tolerance state 
backends) and pushes the results out on notification of a completed checkpoint.
That gives you exactly-once semantics, but involves an extra materialization of 
the data.
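
To make that idea concrete, here is a much-simplified sketch of the "buffer 
until the checkpoint completes" pattern. It is not Chesnay's actual prototype 
and not Flink API; in the real design the pending buffers would live in the 
fault-tolerant state backends rather than on the heap:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Simplified illustration only: records are held back per checkpoint and only
// pushed to the target system once that checkpoint is reported complete.
public class BufferingSink<T> {

    private final Consumer<T> target;                       // e.g. a Kafka producer
    private final List<T> currentBuffer = new ArrayList<>();
    private final Map<Long, List<T>> pendingByCheckpoint = new TreeMap<>();

    public BufferingSink(Consumer<T> target) {
        this.target = target;
    }

    // Records are only buffered, never emitted directly.
    public void invoke(T record) {
        currentBuffer.add(record);
    }

    // Called when a checkpoint is taken: freeze the buffer under the checkpoint
    // id (this is the part that would be persisted in the state backend).
    public void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(currentBuffer));
        currentBuffer.clear();
    }

    // Called when the checkpoint is globally complete: only now is it safe to
    // emit, because these records will never be replayed.
    public void notifyCheckpointComplete(long checkpointId) {
        List<T> ready = pendingByCheckpoint.remove(checkpointId);
        if (ready != null) {
            ready.forEach(target);
        }
    }
}

The latency point below follows directly from this: nothing reaches the target 
before the checkpoint-complete notification, so the output lags by at least one 
checkpoint interval.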


I think that there is actually a fundamental latency issue with "exactly-once 
sinks", no matter how you implement them in any system:
You can only commit once you are sure that everything went well, up to a specific 
point where you are sure no replay will ever be needed.

So the latency in Flink for an exactly-once output would be at least the 
checkpoint interval.

I'm eager to hear your thoughts on this.

Greetings,
Stephan


On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
Hi,

It is my understanding that the exactly-once semantics for the input from Kafka 
are based on the source component checkpointing the offset it had reached at the 
moment of the checkpoint.
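
For reference, a toy illustration of that mechanism (nothing here is the actual 
FlinkKafkaConsumer; the in-memory list just stands in for a Kafka topic 
partition):

import java.util.List;
import java.util.function.Consumer;

// Toy sketch of offset-based exactly-once input: the source tracks the offset
// of the next record to read, the checkpoint stores that offset, and on
// recovery reading resumes from the restored value.
public class OffsetCheckpointingSource {

    private final List<String> partition;   // stand-in for a Kafka partition
    private long offset;                     // next record to read

    public OffsetCheckpointingSource(List<String> partition, long restoredOffset) {
        this.partition = partition;
        this.offset = restoredOffset;        // 0 on a fresh start, checkpointed value on recovery
    }

    // Emit available records and advance the offset.
    public void poll(Consumer<String> out) {
        while (offset < partition.size()) {
            out.accept(partition.get((int) offset));
            offset++;
        }
    }

    // The only state a checkpoint needs for exactly-once input.
    public long snapshotOffset() {
        return offset;
    }
}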

My question is: how does that work for a sink? How can I make sure that (in 
light of failures) each message that is read from Kafka (my input) is written 
to Kafka (my output) exactly once?


--
Best regards / Met vriendelijke groeten,

Niels Basjes




--
Best regards / Met vriendelijke groeten,

Niels Basjes
