Re: Can I make an Elasticsearch Sink effectively exactly once?

2019-02-21 Thread Dawid Wysakowicz
Hi,

The procedure you described will not give you exactly-once semantics.
What the cited excerpt means is that a checkpoint will not be considered
finished until pending requests are acknowledged; it does not mean that
those requests are stored on the Flink side. If an error occurs before
those requests are acknowledged, the job will be recovered from the
previous successful checkpoint. Nevertheless, some of the requests that
"belong" to the current checkpoint might already have been sent to ES by
that time; that is where the "at-least-once" delivery comes from.

If you do have a deterministic way of generating the Elasticsearch id,
though, this semantic should be enough for you. Any duplicates (by the
id) will simply be updated on the ES side.
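To make the idea concrete, here is a minimal sketch (plain Java, names are hypothetical and not part of any Flink or Elasticsearch API) of deriving a deterministic _id by hashing a stable business key, so that a record replayed after recovery maps to the same document and overwrites it instead of creating a duplicate:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical sketch: derive a deterministic Elasticsearch _id from a
// stable business key. The same input always yields the same _id, so a
// request re-sent after a Flink recovery is an idempotent index/update
// of the same document rather than a new one.
public class DeterministicId {

    public static String idFor(String businessKey) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(businessKey.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is guaranteed to exist on every JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Replaying "order-42" after recovery produces the same _id.
        String first = idFor("order-42");
        String replayed = idFor("order-42");
        System.out.println(first.equals(replayed)); // prints "true"
    }
}
```

The key point is that the id must be a pure function of the record itself (e.g. a primary key), not of processing time or a random generator, or the replayed request would create a second document.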

Best,

Dawid

On 21/02/2019 14:26, Stephen Connolly wrote:
> From how I understand it:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance
>
> the Flink Elasticsearch Sink guarantees at-least-once delivery of
> action requests to Elasticsearch clusters. It does so by waiting
> for all pending action requests in the BulkProcessor at the time
> of checkpoints. This effectively assures that all requests before
> the checkpoint was triggered have been successfully acknowledged
> by Elasticsearch, before proceeding to process more records sent
> to the sink.
>
>
> So I am thinking:
>
>   * If I put a .map(json -> json.set("_id",
> ElasticsearchId.generate())) in front of the Elasticsearch sink
>   * If I have an ActionRequestFailureHandler that drops any ID
> conflicts on the floor
>
> Would this give me exactly-once output to Elasticsearch, as the
> BulkProcessor's checkpoint would include the "_id" and thus, in the
> event of a recovery, the duplicates would be detected?
>
> Or is there some additional concern I need to be aware of?
>
> Thanks
>
> -stephenc




Can I make an Elasticsearch Sink effectively exactly once?

2019-02-21 Thread Stephen Connolly
From how I understand it:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance

> the Flink Elasticsearch Sink guarantees at-least-once delivery of action
> requests to Elasticsearch clusters. It does so by waiting for all pending
> action requests in the BulkProcessor at the time of checkpoints. This
> effectively assures that all requests before the checkpoint was triggered
> have been successfully acknowledged by Elasticsearch, before proceeding to
> process more records sent to the sink.


So I am thinking:


   - If I put a .map(json -> json.set("_id", ElasticsearchId.generate()))
   in front of the Elasticsearch sink
   - If I have an ActionRequestFailureHandler that drops any ID conflicts
   on the floor
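The second bullet, "drop ID conflicts on the floor", amounts to a policy decision on each failed action. Here is a minimal sketch in plain Java (the interface below only mirrors the shape of Flink's ActionRequestFailureHandler; it is not the real connector API) of swallowing conflicts while escalating genuine failures:

```java
// Hypothetical sketch of the failure-handling decision: drop ID/version
// conflicts (the document was already written before a recovery), but
// escalate everything else so Flink fails over and replays the records.
public class DropConflicts {

    @FunctionalInterface
    interface FailurePolicy {
        /** @return true to silently drop the failed action, false to escalate. */
        boolean shouldDrop(int httpStatus);
    }

    // HTTP 409 CONFLICT is what Elasticsearch returns for a duplicate
    // _id when creating a document that already exists; after a Flink
    // recovery that just means the original write already succeeded.
    static final FailurePolicy DROP_ID_CONFLICTS = status -> status == 409;

    public static void main(String[] args) {
        System.out.println(DROP_ID_CONFLICTS.shouldDrop(409)); // prints "true": duplicate, drop it
        System.out.println(DROP_ID_CONFLICTS.shouldDrop(503)); // prints "false": ES unavailable, escalate
    }
}
```

One caveat worth checking: the handler must only drop failures that are provably duplicates, otherwise real write errors would be silently lost and the sink would be worse than at-least-once.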

Would this give me exactly-once output to Elasticsearch, as the
BulkProcessor's checkpoint would include the "_id" and thus, in the event
of a recovery, the duplicates would be detected?

Or is there some additional concern I need to be aware of?

Thanks

-stephenc