Hi Andrew!

There’s nothing special about extending the checkpointing interfaces for a
SinkFunction; as far as Flink is concerned, sinks are just user functions with
user state to be checkpointed.
So yes, you can implement it just as you would for a flatMap / map / etc.
function.
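
For example, a buffering sink that takes part in checkpointing could look
roughly like this. This is only a sketch against the pre-1.2 `Checkpointed`
interface; the class name, the threshold, and the flush() helper are made up
for illustration:

```scala
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

// Hypothetical buffering sink: BufferingSink, threshold, and flush() are
// illustrative names, not an existing Flink class.
class BufferingSink(threshold: Int)
    extends RichSinkFunction[String]
    with Checkpointed[ArrayBuffer[String]] {

  private var buffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

  override def invoke(value: String): Unit = {
    buffer += value
    if (buffer.size >= threshold) {
      flush()
    }
  }

  // Called when a checkpoint is taken: the not-yet-flushed records become part
  // of the snapshot, so they are not lost if the job fails before the next flush.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): ArrayBuffer[String] =
    buffer

  // Called on recovery: re-populate the buffer from the last snapshot.
  override def restoreState(state: ArrayBuffer[String]): Unit = {
    buffer = state
  }

  private def flush(): Unit = {
    // write the buffered records to the external system here, then clear
    buffer.clear()
  }
}
```

The important part is simply that whatever is still buffered is returned from
snapshotState(), exactly as you would do for a stateful map / flatMap.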

Feel free to let me know if you bump into any questions.

Cheers,
Gordon


On January 16, 2017 at 11:37:30 PM, Andrew Roberts (arobe...@fuze.com) wrote:

Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to 
do something similar for our homegrown sinks. It sounds like just having the 
affected sinks participate in checkpointing is enough of a solution - is there 
anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I 
just implement it as I would for e.g. a mapping function?

Thanks,

Andrew



On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Andrew,

Your observations are correct. As you mentioned, the current problem centres
on how the pending buffered requests are handled with respect to Flink’s
checkpointing.
I’ve filed a JIRA for this, with some thoughts on a solution in the
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some ongoing effort to fix other aspects of proper
at-least-once support in the Elasticsearch sinks, so I believe this will get
attention very soon as well.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in
terms of message delivery. According to (1), the ES sink offers at-least-once
guarantees. That page doesn’t differentiate between flink-elasticsearch and
flink-elasticsearch2, so I have to assume for the moment that they both offer
that guarantee. However, a look at the code (2) shows that the invoke() method
puts the record into a buffer, and that buffer is only flushed to Elasticsearch
some time later.
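
As far as I can tell, that means a checkpoint can complete while records are
still sitting in that buffer, and those records are lost if the job fails
before the next flush. To keep the at-least-once guarantee, the sink would
presumably have to either snapshot the buffer or flush it when a checkpoint is
taken. A rough sketch of the latter idea, against the `Checkpointed` interface
(not the actual ElasticsearchSink code; the class name and sendBulk() are
hypothetical):

```scala
import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

// Sketch only: flush pending records whenever a checkpoint is taken, so the
// checkpoint never covers records that exist only in the in-memory buffer.
class FlushOnCheckpointSink extends RichSinkFunction[String]
    with Checkpointed[ArrayBuffer[String]] {

  private val pending = ArrayBuffer.empty[String]

  override def invoke(value: String): Unit = {
    pending += value
    // a real sink would also flush on a size or time threshold here
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): ArrayBuffer[String] = {
    // flush everything buffered before the checkpoint barrier and wait for it
    sendBulk(pending)
    pending.clear()
    pending // nothing left to snapshot
  }

  override def restoreState(state: ArrayBuffer[String]): Unit = {
    pending.clear()
    pending ++= state
  }

  private def sendBulk(records: Seq[String]): Unit = {
    // hypothetical: issue the bulk request and block until it is acknowledged
  }
}
```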
