Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable.
So the ES BulkProcessor retried after the bulk request was partially
rejected, and eventually the rejected items were sent successfully? Is that
why the failure handler was not called?
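
To make the question concrete, here is a minimal, self-contained sketch of
the behavior described above. It is a toy model only, not the actual code of
Flink or of the Elasticsearch BulkProcessor. The names BulkRetrySketch,
RecordingFailureHandler, and sendWithBackoff are hypothetical, and the
assumption is that items rejected with 429 are retried internally and the
failure handler only sees items that still fail after the last retry.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of bulk retries with backoff; NOT the real Elasticsearch BulkProcessor. */
final class BulkRetrySketch {

    /** Hypothetical stand-in for a user-supplied failure handler; records the item ids it receives. */
    static final class RecordingFailureHandler {
        final List<Integer> failedItems = new ArrayList<>();

        void onFailure(int itemId, int status) {
            failedItems.add(itemId);
        }
    }

    /**
     * Simulates sending a bulk request and retrying rejected items.
     *
     * rejectedAttempts[i] is how many times the simulated cluster rejects item i
     * (status 429) before accepting it. Rejected items are retried up to
     * maxRetries times. The handler is called only for items that are still
     * rejected after the final attempt (assumed behavior, see lead-in).
     *
     * Returns the number of attempts that were made.
     */
    static int sendWithBackoff(int[] rejectedAttempts, int maxRetries,
                               RecordingFailureHandler handler) {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < rejectedAttempts.length; i++) {
            pending.add(i);
        }

        int attempts = 0;
        for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
            attempts++;
            List<Integer> rejected = new ArrayList<>();
            for (int item : pending) {
                if (attempt < rejectedAttempts[item]) {
                    rejected.add(item); // simulated 429 for this item
                }
            }
            pending = rejected;
            // A real implementation would wait here according to the backoff policy.
        }

        // Only items that never succeeded reach the failure handler.
        for (int item : pending) {
            handler.onFailure(item, 429);
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Item 1 is rejected twice, then accepted: the handler sees nothing.
        RecordingFailureHandler recovered = new RecordingFailureHandler();
        sendWithBackoff(new int[] {0, 2, 0}, 3, recovered);
        System.out.println("handler saw (retries succeed): " + recovered.failedItems);

        // Item 1 is rejected more often than we retry: the handler sees it.
        RecordingFailureHandler exhausted = new RecordingFailureHandler();
        sendWithBackoff(new int[] {0, 5}, 2, exhausted);
        System.out.println("handler saw (retries exhausted): " + exhausted.failedItems);
    }
}
```

If this model matches what the connector does, then a 429 that is recovered
by a retry would show up in the ES rejection metrics but never reach our
handler, which is consistent with what we observed.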

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> Have you tried to change bulk.flush.backoff.enable?
> According to the docs [1], the underlying ES BulkProcessor will retry
> (by default), so the provided failure handler might not be called.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:08 PM Qihua Yang <yang...@gmail.com> wrote:
> >
> > Hello,
> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data
> > into ES with bulk requests. From the ES metrics, we observed some bulk
> > thread pool rejections. We contacted the AWS team; their explanation is
> > that part of a bulk request was rejected, and the response body should
> > include a status for each item. For a bulk thread pool rejection, the
> > error code is 429.
> > Our Flink app overrides the FailureHandler to process error cases.
> > I checked the Flink code; it has an afterBulk() method to handle item
> > errors, but our FailureHandler never received any 429 error.
> > Is that a Flink issue, or do we need to configure something to make it
> > work?
> > Thanks,
> >
> > Qihua
>
