Hi Mingliang,

Yes, that sounds like a good solution. I am not very familiar with
Elasticsearch internals and APIs, but I will try to assist with the PR when
it is ready.

Best Regards
Ahmed Hamdy
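P.S. To make the combined idea concrete, here is a rough, untested sketch of classifying non-retryable status codes while also capping per-item retries. The class and method names (`PartialFailureSketch`, `FailedItem`, `handlePartialFailure`) and the status-code set are made up for illustration; they are not the actual Elasticsearch 8 connector API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: classify non-retryable HTTP statuses AND cap
// per-item retries. Names are illustrative, not the connector's API.
public class PartialFailureSketch {

    // Assumption: statuses for which retrying never helps.
    static final Set<Integer> NON_RETRYABLE = Set.of(400, 404);
    static final int MAX_RETRIES = 3;

    static final class FailedItem {
        final String id;
        final int statusCode;
        int retryCount;

        FailedItem(String id, int statusCode) {
            this.id = id;
            this.statusCode = statusCode;
        }
    }

    /** Re-buffers retryable items; fails fast on non-retryable or exhausted ones. */
    static void handlePartialFailure(
            List<FailedItem> failedItems, Consumer<List<FailedItem>> requeue) {
        List<FailedItem> toRetry = new ArrayList<>();
        for (FailedItem item : failedItems) {
            if (NON_RETRYABLE.contains(item.statusCode)) {
                throw new IllegalStateException(
                        "Non-retryable status " + item.statusCode + " for item " + item.id);
            }
            if (++item.retryCount > MAX_RETRIES) {
                throw new IllegalStateException("Retries exhausted for item " + item.id);
            }
            toRetry.add(item);
        }
        requeue.accept(toRetry);
    }
}
```

This keeps your partial-failure flow intact: retryable items go back to the buffer, while non-retryable or exhausted items fail the writer fast instead of cycling forever.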
On Tue, 11 Jun 2024 at 07:07, Mingliang Liu <lium...@apache.org> wrote:

> Thank you Ahmed for the explanation.
>
> The current Elasticsearch 8 connector already uses the
> FatalExceptionClassifier for fatal / non-retriable requests [1]. It's very
> similar to what you linked in the AWS connectors. Currently this is only
> used for fully failed requests. The main problem I was concerned about is
> partial failures, when the Future of the client bulk request was not
> completed exceptionally, but instead some items failed according to the
> response. For those failed entries in the partially failed request, we
> retry infinitely, though retrying will not always help.
>
> To avoid the problem of "too many failed but non-retriable request entries
> in the buffer", I was thinking we can fail fast instead of retrying
> infinitely. Alternatively, we can limit the maximum number of retries per
> record. Like FLINK-35541, which you shared for the AWS connectors, I think
> a similar approach in the Elasticsearch 8 connector would be useful. Given
> a non-retriable request entry, it will retry the request entries anyway
> but will eventually fail after exhausting the retries.
> Having both sounds like a more comprehensive solution, as in the following
> sample:
>
> void handlePartiallyUnprocessedRequest(
>         Response response, Consumer<List<Request>> requestResult) {
>     List<Request> requestsToRetry = new ArrayList<>();
>
>     for (Request r : response.failedItems()) {
>         if (!isRetryable(r.errorCode()) // we don't have this check for ES 8; could be 400 / 404
>                 || r.retryCount++ > maxRetry) { // FLINK-35541 could help limit retries for all failed requests
>             throw new FlinkRuntimeException();
>         }
>         requestsToRetry.add(r);
>     }
>
>     requestResult.accept(requestsToRetry);
> }
>
> [1]
> https://github.com/apache/flink-connector-elasticsearch/blob/da2ef1fa6d5edd3cf1328b11632929fd2c99f567/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L73-L82
>
> On Fri, Jun 7, 2024 at 3:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
> > Hi Mingliang,
> >
> > We already have a mechanism for detecting and propagating
> > fatal/non-retryable exceptions [1]. We can use that in Elasticsearch,
> > similar to what we do for the AWS connectors [2]. Also, you can check
> > the AWS connectors for how to add a fail-fast mechanism to disable
> > retrying altogether.
> >
> > > FLIP-451 proposes timeout for retrying which helps with un-acknowledged
> > > requests, but not addressing the case when request gets processed and
> > > failed items keep failing no matter how many times we retry. Correct me
> > > if I'm wrong
> >
> > Yes, you are correct; this is mainly to mitigate the issues arising from
> > incorrect handling of requests by sink implementers.
> > Failure handling itself has always been assumed to be the sink
> > implementation's responsibility. This is done at 3 levels:
> > - Classifying fatal exceptions, as mentioned above
> > - Adding configuration to disable retries, as mentioned above as well
> > - Adding a mechanism to limit retries, as in the proposed ticket for
> >   the AWS connectors [3]
> >
> > In my opinion, at least 1 and 3 are useful in this case for
> > Elasticsearch: adding classifiers and retry mechanisms.
> >
> > > Or we can allow users to configure
> > > "drop/fail" behavior for non-retriable errors
> >
> > I am not sure I follow this proposal, but in general, while "dropping"
> > records seems to boost reliability, it breaks at-least-once semantics,
> > and if you don't have proper tracing and debugging mechanisms we will be
> > shooting ourselves in the foot.
> >
> > 1-
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java
> > 2-
> > https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L227
> > 3-
> > https://issues.apache.org/jira/browse/FLINK-35541
> >
> > Best Regards
> > Ahmed Hamdy
> >
> > On Thu, 6 Jun 2024 at 06:53, Mingliang Liu <lium...@apache.org> wrote:
> >
> > > Hi all,
> > >
> > > Currently the Elasticsearch 8 connector retries all items if the
> > > request fails as a whole, and retries failed items if the request has
> > > partial failures [1]. I think this infinite retrying might be
> > > problematic in some cases when retrying can never eventually succeed.
> > > For example, if the request is 400 (bad request) or 404 (not found),
> > > retries do not help. If there are too many non-retriable failed items,
> > > new requests will get processed less effectively. In extreme cases, it
> > > may stall the pipeline if in-flight requests are occupied by those
> > > failed items.
> > >
> > > FLIP-451 proposes a timeout for retrying, which helps with
> > > un-acknowledged requests, but does not address the case when the
> > > request gets processed and failed items keep failing no matter how
> > > many times we retry. Correct me if I'm wrong.
> > >
> > > One opinionated option is to fail fast for non-retriable errors like
> > > 400 / 404 and to drop items for 409. Or we can allow users to
> > > configure "drop/fail" behavior for non-retriable errors. I prefer the
> > > latter. I checked how Logstash ingests data into Elasticsearch, and it
> > > takes a similar approach for non-retriable errors [2]. In my day job,
> > > we have a dead-letter queue in AsyncSinkWriter for failed entries that
> > > exhaust retries. I guess that is too specific to our setup and seems
> > > overkill here for the Elasticsearch connector.
> > >
> > > Any thoughts on this?
> > >
> > > [1]
> > > https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170
> > > [2]
> > > https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304
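Regarding the configurable "drop/fail" behavior proposed further up the thread, a minimal illustrative sketch could look like the following. The enum and method names (`OnNonRetryable`, `filterNonRetryable`) and the hard-coded status codes are hypothetical, not an existing connector option.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a user-configurable policy for non-retryable
// bulk item failures. Names are illustrative, not an existing API.
public class DropOrFailSketch {

    enum OnNonRetryable { FAIL, DROP }

    /** Returns the status codes of items that should stay in the retry buffer. */
    static List<Integer> filterNonRetryable(
            List<Integer> failedStatusCodes, OnNonRetryable behavior) {
        List<Integer> toRetry = new ArrayList<>();
        for (int status : failedStatusCodes) {
            // Assumption: 400 / 404 / 409 never succeed on retry.
            boolean retryable = status != 400 && status != 404 && status != 409;
            if (retryable) {
                toRetry.add(status);
            } else if (behavior == OnNonRetryable.FAIL) {
                throw new IllegalStateException("Non-retryable status: " + status);
            }
            // On DROP: silently discard, which breaks at-least-once semantics,
            // so it should be an explicit opt-in by the user.
        }
        return toRetry;
    }
}
```

As noted in the thread, DROP trades correctness for availability, so FAIL would be the safer default.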