[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875787#comment-15875787 ]

ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102181691
  
    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) { }
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     				if (response.hasFailures()) {
    -					for (BulkItemResponse itemResp : response.getItems()) {
    -						Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
    +					BulkItemResponse itemResponse;
    +					Throwable failure;
    +
    +					for (int i = 0; i < response.getItems().length; i++) {
    +						itemResponse = response.getItems()[i];
    +						failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
     						if (failure != null) {
    -							LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
    -							failureThrowable.compareAndSet(null, failure);
    +							LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
    +
    +							if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
    +								failureThrowable.compareAndSet(null, failure);
    +							}
     						}
     					}
     				}
    +
    +				numPendingRequests.getAndAdd(-request.numberOfActions());
     			}
     
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    -				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
    -				failureThrowable.compareAndSet(null, failure);
    +				LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
    +
    +				// whole bulk request failures are usually just temporary timeouts on
    +				// the Elasticsearch side; simply retry all action requests in the bulk
    +				for (ActionRequest action : request.requests()) {
    +					requestIndexer.add(action);
    +				}
    +
    +				numPendingRequests.getAndAdd(-request.numberOfActions());
    --- End diff --
    
    Let's say a bulk with 500 actions fails, so we re-add the bulk again, but also subtract 500 actions from the pending requests.
    
    Now the retried bulk succeeds and we subtract 500 actions again. Wouldn't that make the number of pending requests negative and void the at-least-once guarantees?
    
    Am I overlooking something here?
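The accounting question above can be modeled in isolation. All names below are hypothetical stand-ins for the connector's fields, and the sketch rests on one assumption not visible in this diff: that re-adding an action through the `RequestIndexer` increments `numPendingRequests` again. Under that assumption the counter balances; without it, it would indeed go negative as the comment suggests.

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-alone model of the pending-request accounting discussed above.
// Hypothetical names; assumes the indexer's add() also counts the action
// as pending again, which this diff does not show.
public class PendingRequestModel {
    static final AtomicLong numPendingRequests = new AtomicLong();

    // models RequestIndexer.add(...): every (re-)added action becomes pending
    static void indexerAdd(int actions) {
        numPendingRequests.addAndGet(actions);
    }

    // models afterBulk(...): the bulk is no longer in flight; on a whole-bulk
    // failure its actions are first re-added, then the bulk is subtracted
    static void afterBulk(int actionsInBulk, boolean wholeBulkFailed) {
        if (wholeBulkFailed) {
            indexerAdd(actionsInBulk);
        }
        numPendingRequests.getAndAdd(-actionsInBulk);
    }

    public static void main(String[] args) {
        indexerAdd(500);          // user emits a bulk of 500 actions
        afterBulk(500, true);     // bulk fails: 500 + 500 (re-add) - 500 = 500
        afterBulk(500, false);    // retry succeeds: 500 - 500 = 0
        System.out.println(numPendingRequests.get()); // prints 0
    }
}
```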


> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for 
> message delivery.
> For proper support of at-least-once, the sink will need to participate in 
> Flink's checkpointing: when snapshotting is triggered at the 
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by 
> flushing the internal bulk processor. For temporary ES failures (see 
> FLINK-5122) that may happen on the flush, we should retry them before 
> returning from snapshotting and acking the checkpoint. If there are 
> non-temporary ES failures on the flush, the current snapshot should fail.
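The flush-retry-or-fail behavior described in the issue can be sketched as a small loop. The `Flusher` interface, the retry budget, and all names here are invented for illustration and are not the actual connector API; the real sink would flush its internal `BulkProcessor` and consult its failure state instead.

```java
// Minimal sketch of the snapshot-time flush described above: keep flushing
// while requests are pending, treat repeated failure to drain as a
// non-temporary failure and fail the snapshot. All names are hypothetical.
public class SnapshotFlush {
    // invented stand-in for one flush of the internal bulk processor;
    // returns how many requests are still pending afterwards
    interface Flusher {
        long flush(long pending);
    }

    static long flushUntilEmpty(Flusher flusher, long pending, int maxAttempts) {
        int attempts = 0;
        while (pending > 0) {
            if (++attempts > maxAttempts) {
                // retry budget exhausted: treat as non-temporary, fail the snapshot
                throw new RuntimeException("non-temporary failure: fail the snapshot");
            }
            pending = flusher.flush(pending);
        }
        return pending; // 0: safe to ack the checkpoint
    }

    public static void main(String[] args) {
        // a flusher that only drains half per attempt (temporary failures)
        Flusher flaky = p -> p / 2;
        System.out.println(flushUntilEmpty(flaky, 8, 10)); // prints 0
    }
}
```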



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
