[ 
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761341#comment-15761341
 ] 

ASF GitHub Bot commented on FLINK-5122:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r93045771
  
    --- Diff: 
flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 ---
    @@ -186,22 +191,44 @@ public void beforeBulk(long executionId, BulkRequest 
request) {
     
                        @Override
                        public void afterBulk(long executionId, BulkRequest 
request, BulkResponse response) {
    +                           boolean allRequestsRepeatable = true;
                                if (response.hasFailures()) {
                                        for (BulkItemResponse itemResp : 
response.getItems()) {
                                                if (itemResp.isFailed()) {
    -                                                   LOG.error("Failed to 
index document in Elasticsearch: " + itemResp.getFailureMessage());
    -                                                   
failureThrowable.compareAndSet(null, new 
RuntimeException(itemResp.getFailureMessage()));
    +                                                   // Check if index 
request can be retried
    +                                                   String 
failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
    +                                                   if 
(failureMessageLowercase.contains("timeout") || 
failureMessageLowercase.contains("timed out") // Generic timeout errors
    +                                                                   || 
failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // 
Shard not available due to rebalancing or node down
    +                                                                   || 
(failureMessageLowercase.contains("data/write/bulk") && 
failureMessageLowercase.contains("bulk")) // Bulk index queue on node full 
    +                                                           ) {
    +                                                           
LOG.debug("Retry batch: " + itemResp.getFailureMessage());
    --- End diff --
    
    I can not assess how often retries are needed.
    Users can also manually increase the log level if needed. So we can leave 
it as is.
    However, I'm wondering whether we want to include a metric that counts the 
number of retries that occurred.


> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
>                 Key: FLINK-5122
>                 URL: https://issues.apache.org/jira/browse/FLINK-5122
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.2.0
>            Reporter: static-max
>            Assignee: static-max
>
> My cluster had high load and documents got not indexed. This violates the "at 
> least once" semantics in the ES connector.
> I gave pressure on my cluster to test Flink, causing new indices to be 
> created and balanced. On those errors the bulk should be tried again instead 
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> UnavailableShardsException[[index-name][3] primary shard is not active 
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20] 
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR 
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
> Failed to index document in Elasticsearch: 
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
>  nested: EsRejectedExecutionException[rejected execution of 
> org.elasticsearch.transport.TransportService$4@727e677c on 
> EsThreadPoolExecutor[bulk, queue capacity = 1, 
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
>  pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 
> 2939]]];
> I can try to propose a PR for this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to