Hi,

Have you tried changing bulk.flush.backoff.enable? According to the docs [1], the underlying ES BulkProcessor retries failed flushes with a backoff by default, so the provided failure handler might not be called.
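To illustrate why the handler may never see a 429, here is a rough, self-contained sketch. It is not Flink or Elasticsearch code: the class BulkRetrySketch, the method statusesSeenByHandler, and the retry count of 8 are all made up for illustration. It only models the idea that a rejected item is retried internally and reaches the handler only if it is still failing once the retries are used up. On the ES6 connector, the actual switch should be ElasticsearchSink.Builder#setBulkFlushBackoff(false), but please check that against your connector version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

// Hypothetical stand-in for the retry behaviour; not Flink or Elasticsearch code.
final class BulkRetrySketch {

    static final int TOO_MANY_REQUESTS = 429;

    /**
     * Simulates sending one bulk item. While the simulated cluster answers 429
     * and retries remain, the item is re-sent. Returns the statuses that would
     * finally be passed on to a user-supplied failure handler.
     */
    static List<Integer> statusesSeenByHandler(IntUnaryOperator statusForAttempt, int maxRetries) {
        List<Integer> seen = new ArrayList<>();
        int attempt = 0;
        int status = statusForAttempt.applyAsInt(attempt);
        // Internal retry loop: the handler is not involved here.
        while (status == TOO_MANY_REQUESTS && attempt < maxRetries) {
            attempt++;
            status = statusForAttempt.applyAsInt(attempt);
        }
        // Only a final failure is reported to the handler.
        if (status >= 400) {
            seen.add(status);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Simulated cluster: rejects the first two attempts, then accepts with 201.
        IntUnaryOperator flaky = attempt -> attempt < 2 ? TOO_MANY_REQUESTS : 201;
        System.out.println(statusesSeenByHandler(flaky, 8)); // retries on: prints []
        System.out.println(statusesSeenByHandler(flaky, 0)); // retries off: prints [429]
    }
}
```

With retries on, the rejection is absorbed and the handler sees nothing. With retries off, the 429 goes straight to the handler, which is what you would want if your FailureHandler is supposed to deal with rejections itself.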
[1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman

On Thu, May 20, 2021 at 10:08 PM Qihua Yang <yang...@gmail.com> wrote:
>
> Hello,
> We are using flink-connector-elasticsearch6_2.11 to ingest stream data to ES
> by using bulk requests. From ES metrics, we observed some bulk thread pool
> rejections. Contacted AWS team, their explanation is part of bulk request was
> rejected. Response body should include status for each item. For bulk thread
> pool rejection, the error code is 429.
> Our flink app override FailureHandler to process error cases.
> I checked Flink code, it has AfterBulk() method to handle item errors.
> FailureHandler() never received any 429 error.
> Is that flink issue? Or we need to config something to make it work?
> Thanks,
>
> Qihua