Hi,

Have you tried to change bulk.flush.backoff.enable?
According to the docs [1], the underlying ES BulkProcessor will retry
(by default), so the provided failure handler might not be called.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman

On Thu, May 20, 2021 at 10:08 PM Qihua Yang <yang...@gmail.com> wrote:
>
> Hello,
> We are using flink-connector-elasticsearch6_2.11 to ingest stream data to ES 
> by using bulk requests. From ES metrics, we observed some bulk thread pool 
> rejections. Contacted AWS team, their explanation is part of bulk request was 
> rejected. Response body should include status for each item. For bulk thread 
> pool rejection, the error code is 429.
> Our flink app override FailureHandler to process error cases.
> I checked Flink code, it has AfterBulk() method to handle item errors. 
> FailureHandler() never received any 429 error.
> Is that flink issue? Or we need to config something to make it work?
> Thanks,
>
> Qihua

Reply via email to