[ https://issues.apache.org/jira/browse/FLINK-5487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15875787#comment-15875787 ]
ASF GitHub Bot commented on FLINK-5487:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3358#discussion_r102181691

    --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java ---
    @@ -165,20 +286,36 @@ public void beforeBulk(long executionId, BulkRequest request) {
     		}
     
     		@Override
     		public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
     			if (response.hasFailures()) {
    -			for (BulkItemResponse itemResp : response.getItems()) {
    -				Throwable failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResp);
    +				BulkItemResponse itemResponse;
    +				Throwable failure;
    +
    +				for (int i = 0; i < response.getItems().length; i++) {
    +					itemResponse = response.getItems()[i];
    +					failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
     					if (failure != null) {
    -					LOG.error("Failed Elasticsearch item request: {}", failure.getMessage(), failure);
    -					failureThrowable.compareAndSet(null, failure);
    +						LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);
    +
    +						if (failureHandler.onFailure(request.requests().get(i), failure, requestIndexer)) {
    +							failureThrowable.compareAndSet(null, failure);
    +						}
     					}
     				}
     			}
    +
    +			numPendingRequests.getAndAdd(-request.numberOfActions());
     		}
     
     		@Override
     		public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    -			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
    -			failureThrowable.compareAndSet(null, failure);
    +			LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());
    +
    +			// whole bulk request failures are usually just temporary timeouts on
    +			// the Elasticsearch side; simply retry all action requests in the bulk
    +			for (ActionRequest action : request.requests()) {
    +				requestIndexer.add(action);
    +			}
    +
    +			numPendingRequests.getAndAdd(-request.numberOfActions());
    --- End diff --
    
    Let's say a bulk with 500 actions fails, so we re-add the bulk again, but subtract 500 actions from the pending requests. Now the bulk succeeds and we subtract 500 actions again. Wouldn't that make the number of pending requests negative, and void the at-least-once guarantees? Am I overlooking something here?

> Proper at-least-once support for ElasticsearchSink
> --------------------------------------------------
>
>                 Key: FLINK-5487
>                 URL: https://issues.apache.org/jira/browse/FLINK-5487
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Discussion in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fault-tolerance-guarantees-of-Elasticsearch-sink-in-flink-elasticsearch2-td10982.html
> Currently, the Elasticsearch Sink actually doesn't offer any guarantees for
> message delivery.
> For proper support of at-least-once, the sink will need to participate in
> Flink's checkpointing: when snapshotting is triggered at the
> {{ElasticsearchSink}}, we need to synchronize on the pending ES requests by
> flushing the internal bulk processor. For temporary ES failures (see
> FLINK-5122) that may happen on the flush, we should retry them before
> returning from snapshotting and acking the checkpoint. If there are
> non-temporary ES failures on the flush, the current snapshot should fail.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
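The double decrement the review comment describes can be modeled in a few lines. This is a hypothetical sketch, not the actual ElasticsearchSinkBase code: the name numPendingRequests mirrors the diff, but the bookkeeping is stripped down to a bare counter and a retry queue, under the reviewer's reading that re-adding via the indexer does not increment the counter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the pending-request bookkeeping under review.
public class PendingCounterSketch {

    static final AtomicLong numPendingRequests = new AtomicLong();
    static final List<String> retryQueue = new ArrayList<>();

    // A bulk of `actions` requests is buffered; the counter goes up.
    static void addBulk(int actions) {
        numPendingRequests.getAndAdd(actions);
    }

    // Mirrors afterBulk(long, BulkRequest, Throwable): every action is
    // re-queued for retry, yet the counter is still decremented (#1).
    static void afterBulkFailure(int actions) {
        for (int i = 0; i < actions; i++) {
            retryQueue.add("action-" + i);
        }
        numPendingRequests.getAndAdd(-actions);
    }

    // Mirrors afterBulk(long, BulkRequest, BulkResponse) when the retried
    // bulk later succeeds: the same actions are decremented again (#2).
    static void afterBulkSuccess(int actions) {
        numPendingRequests.getAndAdd(-actions);
    }

    public static long simulate() {
        addBulk(500);                        // counter = 500
        afterBulkFailure(500);               // counter = 0, but 500 retries still in flight
        afterBulkSuccess(retryQueue.size()); // counter = -500
        return numPendingRequests.get();
    }

    public static void main(String[] args) {
        System.out.println("numPendingRequests = " + simulate()); // -500
    }
}
```

Note that the counter already reads 0 after the failure callback, before the retried actions have actually reached Elasticsearch; a checkpoint flush waiting for the counter to hit zero would ack too early, which is exactly the at-least-once concern raised above.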
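The checkpoint-time synchronization the issue description calls for could look roughly like the following sketch. All names here (flush, snapshotState, failureThrowable) are assumptions modeled on the discussion above, not the code the PR actually adds, and the flush is simulated rather than backed by a real BulkProcessor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: when a checkpoint is triggered, block until all
// pending ES requests are done, and fail the snapshot if a non-temporary
// failure was recorded by the bulk listener.
public class CheckpointFlushSketch {

    final AtomicLong numPendingRequests = new AtomicLong();
    final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    // Stand-in for BulkProcessor.flush(); here it simply completes
    // everything that is currently buffered.
    void flush() {
        numPendingRequests.set(0);
    }

    // Called when Flink snapshots this sink; returning normally acks the
    // checkpoint, throwing fails the current snapshot.
    public void snapshotState() {
        // Synchronize on the pending requests before acking.
        while (numPendingRequests.get() > 0) {
            flush();
        }
        Throwable failure = failureThrowable.get();
        if (failure != null) {
            throw new RuntimeException("Flush failed; failing the checkpoint", failure);
        }
    }

    public static void main(String[] args) {
        CheckpointFlushSketch sink = new CheckpointFlushSketch();
        sink.numPendingRequests.set(3);
        sink.snapshotState(); // drains all pending requests, then acks
        System.out.println("pending after snapshot = " + sink.numPendingRequests.get()); // 0
    }
}
```

In this model the reviewer's scenario is fatal: if a double decrement drives the counter to zero (or negative) while retries are still in flight, snapshotState returns and the checkpoint is acked even though some requests were never acknowledged by Elasticsearch.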