static-max created FLINK-5122:
---------------------------------

             Summary: Elasticsearch Sink loses documents when cluster has high load
                 Key: FLINK-5122
                 URL: https://issues.apache.org/jira/browse/FLINK-5122
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
    Affects Versions: 1.2.0
            Reporter: static-max


My cluster was under high load and some documents were not indexed. This
violates the "at least once" semantics of the ES connector.

I put pressure on my cluster to test Flink, causing new indices to be created
and rebalanced. When these errors occur, the bulk request should be retried
instead of being discarded.

Primary shard not active because ES decided to rebalance the index:
2016-11-15 15:35:16,123 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
UnavailableShardsException[[index-name][3] primary shard is not active Timeout: 
[1m], request: [BulkShardRequest to [index-name] containing [20] requests]]

Bulk queue on node full (I set the queue size to a low value to reproduce the error):
22:37:57,702 ERROR 
org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink  - 
Failed to index document in Elasticsearch: 
RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
 nested: EsRejectedExecutionException[rejected execution of 
org.elasticsearch.transport.TransportService$4@727e677c on 
EsThreadPoolExecutor[bulk, queue capacity = 1, 
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running, 
pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 2939]]];

I can try to propose a PR for this.
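
As a rough illustration of the retry behavior described above, here is a minimal
sketch in plain Java. It is not the connector's code: the class BulkRetrySketch
and the methods isTransient and selectForRetry are hypothetical names, and
matching on the exception name inside the failure message is only an
assumption about how the two errors from the logs could be recognized. How the
sink would obtain the per-document failure messages is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink or Elasticsearch.
final class BulkRetrySketch {

    // Assumption: the two failures seen in the logs are temporary cluster
    // conditions, so the affected documents are worth sending again.
    static boolean isTransient(String failureMessage) {
        return failureMessage != null
                && (failureMessage.contains("UnavailableShardsException")
                    || failureMessage.contains("EsRejectedExecutionException"));
    }

    // docs and failures are aligned by position; a null failure means the
    // document was indexed successfully. Returns the documents that should be
    // added to a later bulk request instead of being dropped.
    static List<String> selectForRetry(List<String> docs, List<String> failures) {
        if (docs.size() != failures.size()) {
            throw new IllegalArgumentException("docs and failures must have the same size");
        }
        List<String> retry = new ArrayList<>();
        for (int i = 0; i < docs.size(); i++) {
            if (isTransient(failures.get(i))) {
                retry.add(docs.get(i));
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList("doc-1", "doc-2", "doc-3");
        List<String> failures = Arrays.asList(
                null,                                                    // indexed fine
                "UnavailableShardsException[primary shard is not active]", // retry
                "MapperParsingException[failed to parse]");              // not retried here
        System.out.println(selectForRetry(docs, failures)); // prints [doc-2]
    }
}
```

Failures that are not classified as transient (for example a malformed
document) are deliberately not retried in this sketch, since resending them
would presumably fail again; whether they should fail the job or only be
logged is a separate decision.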



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
