[
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690712#comment-15690712
]
ASF GitHub Bot commented on FLINK-5122:
---------------------------------------
GitHub user static-max opened a pull request:
https://github.com/apache/flink/pull/2861
[FLINK-5122] Index requests will be retried if the error is only temp…
This PR will re-add index requests to the BulkProcessor if the error is
temporary, such as:
* General timeout errors
* No master
* UnavailableShardsException (Rebalancing, Node down)
* Bulk queue on node full
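As a rough illustration of the idea (not the code in this PR), the sketch
below classifies a failure as temporary by looking for known exception names
in the failure message and queues the affected document for a retry. The
class TemporaryFailureRetry, its methods, and the in-memory queue that stands
in for the BulkProcessor are all hypothetical. The marker names for "no
master" and "timeout" are assumptions; only UnavailableShardsException and
EsRejectedExecutionException appear in the logs of the issue.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical helper for illustration only; not part of Flink or Elasticsearch.
final class TemporaryFailureRetry {

    // Exception names treated as temporary. The first two are taken from the
    // logs in FLINK-5122; the last two are assumed names for the
    // "no master" and "timeout" cases.
    private static final List<String> TEMPORARY_MARKERS = Arrays.asList(
            "UnavailableShardsException",
            "EsRejectedExecutionException",
            "MasterNotDiscoveredException",
            "ElasticsearchTimeoutException");

    // Stand-in for re-adding requests to the BulkProcessor.
    private final Queue<String> retryQueue = new ArrayDeque<>();

    // Returns true if the failure message names one of the temporary errors.
    static boolean isTemporary(String failureMessage) {
        if (failureMessage == null) {
            return false;
        }
        for (String marker : TEMPORARY_MARKERS) {
            if (failureMessage.contains(marker)) {
                return true;
            }
        }
        return false;
    }

    // Queues the document for a retry if the failure is temporary.
    // Returns true if the document was queued, false if it was not.
    boolean onFailure(String document, String failureMessage) {
        if (isTemporary(failureMessage)) {
            retryQueue.add(document);
            return true;
        }
        return false;
    }

    // Number of documents currently waiting for a retry.
    int pendingRetries() {
        return retryQueue.size();
    }

    // Removes and returns the next document to retry, or null if none is queued.
    String nextRetry() {
        return retryQueue.poll();
    }
}
```

Matching on message text keeps the sketch self-contained. A real sink would
more likely inspect the exception type or the status of each bulk item, and
would bound the number of retries so that a permanently failing request
cannot loop forever.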
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/static-max/flink flink-connector-elasticsearch2-robust
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2861.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2861
----
commit 2ea8bd099100203d73af9b3a5e616e6d6d1cd50d
Author: Max Kuklinski
Date: 2016-11-23T16:54:11Z
[FLINK-5122] Index requests will be retried if the error is only temporary
on Elasticsearch side. Covered are: Timeouts, No Master,
UnavailableShardsException, bulk queue on node full
----
> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.2.0
> Reporter: static-max
> Assignee: static-max
>
> My cluster was under high load and documents were not indexed. This violates
> the "at least once" semantics of the ES connector.
> I put pressure on my cluster to test Flink, causing new indices to be
> created and balanced. On those errors the bulk request should be retried
> instead of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> UnavailableShardsException[[index-name][3] primary shard is not active
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20]
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
> nested: EsRejectedExecutionException[rejected execution of
> org.elasticsearch.transport.TransportService$4@727e677c on
> EsThreadPoolExecutor[bulk, queue capacity = 1,
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
> pool size = 2, active threads = 2, queued tasks = 1, completed tasks =
> 2939]]];
> I can try to propose a PR for this.