[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875810#comment-15875810 ]

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102184622
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     				if (response.hasFailures()) {
    -					for (BulkItemResponse itemResp : response.getItems()) {
    -						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
    +					BulkItemResponse itemResponse;
    +					Throwable failure;
    +
    +					for (int i = 0; i < response.getItems().length; i++) {
    +						itemResponse = response.getItems()[i];
    +						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
     						if (failure != null) {
    -							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
    -							failureThrowable.compareAndSet(null, failure);
    +							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
    +
    +							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
    +								failureThrowable.compareAndSet(null, failure);
    +							}
     						}
     					}
     				}
    +
    +				numPendingRequests.getAndAdd(-request.numberOfActions());
     			}
     
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    -				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
    -				failureThrowable.compareAndSet(null, failure);
    +				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
    +
    +				// whole bulk request failures are usually just temporary timeouts on
    +				// the Elasticsearch side; simply retry all action requests in the bulk
    +				for (ActionRequest action : request.requests()) {
    +					requestIndexer.add(action);
    +				}
    +
    +				numPendingRequests.getAndAdd(-request.numberOfActions());
    --- End diff --
    
    The `BulkProcessorIndexer` increments `numPendingRequests` whenever the user calls `add(ActionRequest)`. So, in the scenario you describe, when the user re-adds the 500 failed requests, `numPendingRequests` first becomes `500+500=1000`. Then, once this line is reached, the original 500 failed requests are considered complete, so `numPendingRequests` becomes `1000-500=500`.
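    The accounting above can be sketched in isolation. This is a hypothetical walkthrough of the counter arithmetic only (the class name and the hard-coded 500s are illustrative, not from the actual `ElasticsearchSinkBase`):

    ```java
    import java.util.concurrent.atomic.AtomicLong;

    public class PendingRequestsDemo {
        public static void main(String[] args) {
            AtomicLong numPendingRequests = new AtomicLong(0);

            // The initial bulk of 500 requests is added via the indexer:
            numPendingRequests.getAndAdd(500);            // -> 500

            // The bulk fails; the failure handler re-adds all 500 requests,
            // and each add() increments the counter again:
            numPendingRequests.getAndAdd(500);            // -> 1000

            // afterBulk() then marks the original 500 requests as completed:
            numPendingRequests.getAndAdd(-500);           // -> 500

            System.out.println(numPendingRequests.get()); // prints 500
        }
    }
    ```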


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
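The flush-and-ack behavior described above can be sketched roughly as follows. This is a minimal simulation of the pattern, not the actual Flink implementation: the class name, the `flush()` stand-in (which here simply completes everything at once), and the field names are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class FlushOnCheckpointSketch {
    // Hypothetical stand-ins for the sink's internal state.
    final AtomicLong numPendingRequests = new AtomicLong();
    final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    // Hypothetical flush: in this simulation it completes all pending
    // requests at once; the real bulk processor flushes asynchronously.
    void flush() {
        numPendingRequests.set(0);
    }

    // Sketch of the checkpoint hook: keep flushing until every buffered
    // request is accounted for, then surface any recorded failure so the
    // snapshot fails instead of silently acking the checkpoint.
    public void snapshotState() {
        while (numPendingRequests.get() != 0) {
            flush();
        }
        Throwable failure = failureThrowable.get();
        if (failure != null) {
            throw new RuntimeException("Failure in Elasticsearch sink", failure);
        }
    }

    public static void main(String[] args) {
        FlushOnCheckpointSketch sink = new FlushOnCheckpointSketch();
        sink.numPendingRequests.set(500);  // 500 requests still buffered
        sink.snapshotState();              // blocks until they drain
        System.out.println(sink.numPendingRequests.get()); // prints 0
    }
}
```

Re-added requests from the failure handler would bump `numPendingRequests` again, which is why the loop re-checks the counter rather than flushing once.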



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)