[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874253#comment-15874253 ]

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3358

    [FLINK-5487] [elasticsearch] At-least-once ElasticsearchSink

    This PR adds proper support for an at-least-once `ElasticsearchSink`. It is based on the pluggable error handling strategy functionality added in #3426, so only the last commit is relevant.
    
    Like the Kafka producer, it works by flushing, on each checkpoint, all pending requests not yet acknowledged by Elasticsearch before the sink proceeds with the next record from upstream.
    A slight difference is that, since the `ElasticsearchSink` allows failed requests to be re-added to the internal `BulkProcessor` (as part of #3426), we also need to wait for the re-added requests. The docs warn that re-adding requests may lead to longer checkpoints, since the sink has to wait for those as well.
    
    Flushing is enabled by default, but we provide a `disableFlushOnCheckpoint` method to switch it off. The docs and the method's Javadoc warn users that disabling it means the sink no longer guarantees at-least-once delivery.
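The flush-on-checkpoint behaviour described above can be illustrated with a small, single-threaded Java model. It is a sketch under simplifying assumptions, not the code from this PR: `AtLeastOnceSinkModel`, `executeBulk`, and the injected temporary failures are hypothetical, and only the method name `disableFlushOnCheckpoint` is taken from the description. A pending-request counter goes up whenever a request is handed to the bulk processor (including re-added ones) and down once a bulk attempt for it finishes, so a flushing snapshot cannot return until the counter reaches zero. Re-added requests show up as extra bulk rounds, i.e. a longer checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of the at-least-once sink; not Flink's API.
// Only the method name disableFlushOnCheckpoint() is taken from the PR text.
class AtLeastOnceSinkModel {
    private final Deque<String> buffered = new ArrayDeque<>();   // handed to the "bulk processor"
    private final List<String> acknowledged = new ArrayList<>(); // confirmed by "Elasticsearch"
    private long numPendingRequests = 0;      // added but not yet acknowledged
    private boolean flushOnCheckpoint = true; // enabled by default
    private int temporaryFailuresLeft;        // attempts that fail and get re-added

    AtLeastOnceSinkModel(int temporaryFailures) {
        this.temporaryFailuresLeft = temporaryFailures;
    }

    void disableFlushOnCheckpoint() { flushOnCheckpoint = false; }

    void invoke(String request) { add(request); }

    private void add(String request) {
        numPendingRequests++;
        buffered.add(request);
    }

    // One simulated bulk round trip: every buffered request is either
    // acknowledged or fails temporarily and is re-added (pending again).
    private void executeBulk() {
        int batchSize = buffered.size();
        for (int i = 0; i < batchSize; i++) {
            String request = buffered.poll();
            numPendingRequests--;
            if (temporaryFailuresLeft > 0) {
                temporaryFailuresLeft--;
                add(request);
            } else {
                acknowledged.add(request);
            }
        }
    }

    // Checkpoint hook: returns how many bulk rounds were needed before the
    // snapshot could complete. Re-added requests force extra rounds.
    int snapshotState() {
        if (!flushOnCheckpoint) {
            return 0; // checkpoint completes while requests may still be pending
        }
        int rounds = 0;
        while (numPendingRequests > 0) {
            executeBulk();
            rounds++;
        }
        return rounds;
    }

    long pendingRequests() { return numPendingRequests; }

    List<String> acknowledged() { return acknowledged; }

    public static void main(String[] args) {
        AtLeastOnceSinkModel sink = new AtLeastOnceSinkModel(2);
        sink.invoke("a");
        sink.invoke("b");
        sink.invoke("c");
        System.out.println("bulk rounds before checkpoint: " + sink.snapshotState());
        System.out.println("pending after checkpoint: " + sink.pendingRequests());
    }
}
```

In this model, calling `disableFlushOnCheckpoint()` makes `snapshotState()` return immediately with requests still pending. If the job failed right after such a checkpoint, the source would not replay those records, which is the loss of at-least-once delivery the Javadoc warns about.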

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5487

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3358.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3358
    
----
commit 6a826b8eb7a98e3d159999bc44d827df54c94fdd
Author: Max Kuklinski <max.kuklin...@live.de>
Date:   2016-11-23T16:54:11Z

    [FLINK-5122] [elasticsearch] Retry temporary Elasticsearch request errors.
    
    Covered exceptions are: Timeouts, No Master, UnavailableShardsException, bulk queue on node full

commit 9cb60c263fb0df9a8ccd82b33070e22085b5ab23
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-01-30T05:55:26Z

    [FLINK-5353] [elasticsearch] User-provided failure handler for ElasticsearchSink
    
    This commit fixes both FLINK-5353 and FLINK-5122. It allows users to implement a failure handler to control how failed action requests are dealt with.
    
    The commit also includes general improvements to FLINK-5122:
    1. Use the built-in backoff functionality in the Elasticsearch BulkProcessor (not available for Elasticsearch 1.x)
    2. Migrate the `checkErrorAndRetryBulk` functionality to the new failure handler

commit 1c448e3177c65ebc627bdd4ecfff76bbf209ddde
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-02-20T08:50:19Z

    [FLINK-5487] [elasticsearch] At-least-once Elasticsearch Sink

----
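Taken together, the first two commits let a user-supplied handler decide what happens to a failed request, and the FLINK-5122 retry logic becomes one such handler. The Java sketch below models that idea without Elasticsearch or Flink dependencies. `FailureKind`, `Indexer`, `FailureHandler`, and `RetryTemporaryErrorsHandler` are hypothetical stand-ins (the connector's real handler works on Elasticsearch action requests and exceptions, so its actual interface and signature should be checked in the Flink documentation), and `MALFORMED_DOCUMENT` is an invented example of a permanent error. Temporary errors of the kinds listed in the FLINK-5122 commit are re-added for a later bulk request; anything else is thrown, which would fail the sink.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical failure categories. The first four mirror the temporary errors
// listed in the FLINK-5122 commit; MALFORMED_DOCUMENT is an invented permanent case.
enum FailureKind { TIMEOUT, NO_MASTER, UNAVAILABLE_SHARDS, BULK_QUEUE_FULL, MALFORMED_DOCUMENT }

// Hypothetical stand-in for whatever lets a handler re-add a request to the bulk processor.
interface Indexer {
    void add(String request);
}

// Hypothetical handler shape: re-add the request, ignore it, or throw to fail the sink.
interface FailureHandler {
    void onFailure(String request, FailureKind failure, Indexer indexer);
}

// Retries the temporary errors from FLINK-5122 and fails on anything else.
class RetryTemporaryErrorsHandler implements FailureHandler {
    private static final Set<FailureKind> TEMPORARY = EnumSet.of(
            FailureKind.TIMEOUT, FailureKind.NO_MASTER,
            FailureKind.UNAVAILABLE_SHARDS, FailureKind.BULK_QUEUE_FULL);

    @Override
    public void onFailure(String request, FailureKind failure, Indexer indexer) {
        if (TEMPORARY.contains(failure)) {
            indexer.add(request); // retried as part of a later bulk request
        } else {
            throw new IllegalStateException("Non-temporary failure for " + request + ": " + failure);
        }
    }

    public static void main(String[] args) {
        List<String> reAdded = new ArrayList<>();
        FailureHandler handler = new RetryTemporaryErrorsHandler();
        handler.onFailure("doc-1", FailureKind.BULK_QUEUE_FULL, reAdded::add);
        handler.onFailure("doc-2", FailureKind.TIMEOUT, reAdded::add);
        System.out.println("re-added: " + reAdded);
    }
}
```

Keeping the retry decision in a pluggable handler means the sink itself only needs to know whether a request was re-added (and must therefore be waited for) or whether the handler threw.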


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
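The last part of the proposal, failing the snapshot on non-temporary errors, can be sketched as follows. This is an illustrative Java model under stated assumptions, not the connector's implementation: `FailingSnapshotSink`, `afterBulk`, and `checkErrorAndRethrow` are hypothetical names, the `flush` callback stands in for flushing the bulk processor, and temporary failures are assumed to be retried elsewhere so they simply remain pending. The first non-temporary failure reported by the bulk callback is stored in an `AtomicReference` and rethrown from `snapshotState`, so the checkpoint fails instead of being acknowledged.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Flink's implementation: a non-temporary failure
// reported by the bulk callback is recorded and rethrown from the snapshot,
// so the checkpoint fails instead of being acknowledged.
class FailingSnapshotSink {
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    private final AtomicLong pending = new AtomicLong();

    void invoke(String request) {
        checkErrorAndRethrow(); // stop early if an earlier bulk failed permanently
        pending.incrementAndGet();
    }

    // Simulated bulk callback (may run on another thread, hence the atomics).
    // `completed` counts requests whose attempt has finished for good; temporary
    // failures are assumed to have been re-added and therefore stay pending.
    void afterBulk(long completed, Throwable nonTemporaryFailure) {
        if (nonTemporaryFailure != null) {
            failure.compareAndSet(null, nonTemporaryFailure); // keep the first failure
        }
        pending.addAndGet(-completed);
    }

    // `flush` stands in for flushing the bulk processor; it must eventually
    // drive `pending` to zero or record a failure, otherwise this loops forever.
    void snapshotState(Runnable flush) {
        checkErrorAndRethrow();
        while (pending.get() > 0) {
            flush.run();
            checkErrorAndRethrow();
        }
    }

    private void checkErrorAndRethrow() {
        Throwable t = failure.get();
        if (t != null) {
            throw new RuntimeException("Elasticsearch request failed permanently", t);
        }
    }

    public static void main(String[] args) {
        FailingSnapshotSink sink = new FailingSnapshotSink();
        sink.invoke("a");
        try {
            sink.snapshotState(() -> sink.afterBulk(1, new IllegalArgumentException("mapping conflict")));
        } catch (RuntimeException e) {
            System.out.println("checkpoint failed: " + e.getCause().getMessage());
        }
    }
}
```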



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
