[
https://issues.apache.org/jira/browse/FLINK-5122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15842861#comment-15842861
]
ASF GitHub Bot commented on FLINK-5122:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/2861
Thanks! There wasn't any discussion on switching to a pure HTTP client, but
we might need to take into consideration the effort it will take and benefits
the result will offer if we were to go for that.
Regarding handling the `Throwable`: I'm currently trying out
`BulkItemResponse.getFailure().getStatus` which returns a `RestStatus`. From
the Javadocs, it seems like we can just target some of the status codes:
https://www.javadoc.io/doc/org.elasticsearch/elasticsearch/2.3.5. I'm currently
thinking about just handling `TOO_MANY_REQUESTS` (ex. EsRejectedExecutionEx is
one of these) and `INTERNAL_SERVER_ERROR` (the timeout exception has this
code). What do you think?
> Elasticsearch Sink loses documents when cluster has high load
> -------------------------------------------------------------
>
> Key: FLINK-5122
> URL: https://issues.apache.org/jira/browse/FLINK-5122
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.2.0
> Reporter: static-max
> Assignee: static-max
>
> My cluster had high load and documents got not indexed. This violates the "at
> least once" semantics in the ES connector.
> I gave pressure on my cluster to test Flink, causing new indices to be
> created and balanced. On those errors the bulk should be tried again instead
> of being discarded.
> Primary shard not active because ES decided to rebalance the index:
> 2016-11-15 15:35:16,123 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> UnavailableShardsException[[index-name][3] primary shard is not active
> Timeout: [1m], request: [BulkShardRequest to [index-name] containing [20]
> requests]]
> Bulk queue on node full (I set queue to a low value to reproduce error):
> 22:37:57,702 ERROR
> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink -
> Failed to index document in Elasticsearch:
> RemoteTransportException[[node1][192.168.1.240:9300][indices:data/write/bulk[s][p]]];
> nested: EsRejectedExecutionException[rejected execution of
> org.elasticsearch.transport.TransportService$4@727e677c on
> EsThreadPoolExecutor[bulk, queue capacity = 1,
> org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@51322d37[Running,
> pool size = 2, active threads = 2, queued tasks = 1, completed tasks =
> 2939]]];
> I can try to propose a PR for this.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)