Hi Mingliang,
Yes, that sounds like a good solution. I am not very familiar with Elasticsearch
internals and APIs, but I will try to assist with the PR when it is ready.
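
To make the proposal quoted below concrete, here is a minimal, self-contained
sketch in plain Java. It is not the connector's code: FailedEntry,
NON_RETRYABLE, isRetryable and selectForRetry are hypothetical names, and the
400 / 404 set is only an assumption taken from the examples in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch; none of these names come from the Flink or Elasticsearch APIs. */
class PartialFailureSketch {

    /** A failed bulk item: the status it returned and how many times it was retried. */
    static final class FailedEntry {
        final String id;
        final int status;
        int retryCount;

        FailedEntry(String id, int status, int retryCount) {
            this.id = id;
            this.status = status;
            this.retryCount = retryCount;
        }
    }

    // Assumption: statuses treated as non-retryable, following the 400 / 404 examples.
    static final Set<Integer> NON_RETRYABLE = Set.of(400, 404);

    static boolean isRetryable(int status) {
        return !NON_RETRYABLE.contains(status);
    }

    /**
     * Returns the entries to retry. Throws if any entry is non-retryable (fail fast)
     * or has already been retried maxRetries times (retry limit).
     */
    static List<FailedEntry> selectForRetry(List<FailedEntry> failed, int maxRetries) {
        List<FailedEntry> toRetry = new ArrayList<>();
        for (FailedEntry e : failed) {
            if (!isRetryable(e.status)) {
                throw new IllegalStateException(
                        "Non-retryable status " + e.status + " for entry " + e.id);
            }
            if (e.retryCount >= maxRetries) {
                throw new IllegalStateException("Retries exhausted for entry " + e.id);
            }
            e.retryCount++;
            toRetry.add(e);
        }
        return toRetry;
    }

    public static void main(String[] args) {
        List<FailedEntry> failed =
                List.of(new FailedEntry("doc-1", 429, 0), new FailedEntry("doc-2", 503, 2));
        List<FailedEntry> retry = selectForRetry(failed, 3);
        System.out.println(retry.size() + " entries scheduled for retry");
    }
}
```

In a real writer the exception type and the place where the retry count is
stored would differ; the point is only that the status check and the retry
limit are two independent guards applied to each failed item.
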
Best Regards
Ahmed Hamdy


On Tue, 11 Jun 2024 at 07:07, Mingliang Liu <lium...@apache.org> wrote:

> Thank you Ahmed for the explanation.
>
> The current Elasticsearch 8 connector already uses the
> FatalExceptionClassifier for fatal / non-retriable requests [1]. It's very
> similar to what you linked in the AWS connectors. Currently this is only
> used for fully failed requests. The main problem I was concerned about is
> partial failures, where the Future of the client bulk request does not
> complete exceptionally, but some items failed according to the
> response. For the failed entries in a partially failed request, we
> retry indefinitely even though retrying will not always help.
>
> To avoid the problem of "too many failed but non-retryable request entries in
> the buffer", I was thinking we can fail fast instead of retrying
> indefinitely. Alternatively, we can limit the maximum number of retries per
> record, as in FLINK-35541, which you shared for the AWS connectors. I think a
> similar approach would be useful in the Elasticsearch 8 connector. With only
> a retry limit, a non-retriable request entry is still retried but
> eventually fails after exhausting the retries. Having both sounds like a
> more comprehensive solution, as in the following sample:
>
>     void handlePartiallyUnprocessedRequest(
>             Response response, Consumer<List<Request>> requestResult) {
>         List<Request> requestsToRetry = new ArrayList<>();
>
>         for (Request r : response.failedItems()) {
>             // ES 8 has no isRetryable check today; errors could be 400 / 404.
>             // FLINK-35541 could help limit retries for all failed requests.
>             if (!isRetryable(r.errorCode()) || r.retryCount++ > maxRetry) {
>                 throw new FlinkRuntimeException(
>                         "Non-retryable entry or retries exhausted");
>             }
>             requestsToRetry.add(r);
>         }
>
>         requestResult.accept(requestsToRetry);
>     }
>
> [1]
> https://github.com/apache/flink-connector-elasticsearch/blob/da2ef1fa6d5edd3cf1328b11632929fd2c99f567/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L73-L82
>
>
> On Fri, Jun 7, 2024 at 3:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
> > Hi Mingliang,
> >
> > We already have a mechanism for detecting and propagating
> > fatal/non-retryable exceptions [1]. We can use that in Elasticsearch,
> > similar to what we do for the AWS connectors [2]. Also, you can check the
> > AWS connectors for how to add a fail-fast mechanism that disables retrying
> > altogether.
> >
> > > FLIP-451 proposes a timeout for retrying, which helps with unacknowledged
> > > requests but does not address the case where a request gets processed and
> > > the failed items keep failing no matter how many times we retry. Correct me
> > > if I'm wrong
> > >
> > Yes, you are correct. FLIP-451 is mainly meant to mitigate issues arising
> > from incorrect handling of requests by sink implementers.
> > Failure handling itself has always been assumed to be the sink
> > implementation's responsibility. It is done at 3 levels:
> > - Classifying fatal exceptions, as mentioned above.
> > - Adding configuration to disable retries, also mentioned above.
> > - Adding a mechanism to limit retries, as in the proposed ticket for the
> >   AWS connectors [3].
> >
> > In my opinion, at least the first and third are useful in this case for
> > Elasticsearch: adding classifiers and a mechanism to limit retries.
> >
> > > Or we can allow users to configure
> > > "drop/fail" behavior for non-retriable errors
> > >
> >
> > I am not sure I follow this proposal, but in general, while "dropping"
> > records may seem to boost reliability, it breaks at-least-once semantics,
> > and without proper tracing and debugging mechanisms we will be
> > shooting ourselves in the foot.
> >
> >
> > [1]
> > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/throwable/FatalExceptionClassifier.java
> > [2]
> > https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L227
> > [3]
> > https://issues.apache.org/jira/browse/FLINK-35541
> >
> > Best Regards
> > Ahmed Hamdy
> >
> >
> > On Thu, 6 Jun 2024 at 06:53, Mingliang Liu <lium...@apache.org> wrote:
> >
> > > Hi all,
> > >
> > > Currently the Elasticsearch 8 connector retries all items if the request
> > > fails as a whole, and retries failed items if the request has partial
> > > failures [1]. I think these infinite retries might be problematic in some
> > > cases where retrying can never succeed. For example, if the
> > > response is 400 (bad request) or 404 (not found), retries do not help. If
> > > there are too many non-retriable failed items, new requests will get
> > > processed less effectively. In extreme cases, it may stall the pipeline if
> > > in-flight requests are occupied by those failed items.
> > >
> > > FLIP-451 proposes a timeout for retrying, which helps with unacknowledged
> > > requests but does not address the case where a request gets processed and
> > > the failed items keep failing no matter how many times we retry. Correct me
> > > if I'm wrong.
> > >
> > > One opinionated option is to fail fast for non-retriable errors like 400 /
> > > 404 and to drop items for 409. Or we can allow users to configure
> > > "drop/fail" behavior for non-retriable errors. I prefer the latter. I
> > > checked how Logstash ingests data to Elasticsearch and it takes a similar
> > > approach for non-retriable errors [2]. In my day job, we have a
> > > dead-letter queue in AsyncSinkWriter for failed entries that exhaust
> > > retries. I guess that is too specific to our setup and seems like overkill
> > > here for the Elasticsearch connector.
> > >
> > > Any thoughts on this?
> > >
> > > [1]
> > > https://github.com/apache/flink-connector-elasticsearch/blob/5d1f8d03e3cff197ed7fe30b79951e44808b48fe/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java#L152-L170
> > > [2]
> > > https://github.com/logstash-plugins/logstash-output-elasticsearch/blob/main/lib/logstash/plugin_mixins/elasticsearch/common.rb#L283-L304
> > >
> >
>
