Re: ES sink never receive error code

2021-05-24 Thread Yangze Guo
Jacky is right. It's a known issue and will be fixed in FLINK-21511.

Best,
Yangze Guo

On Tue, May 25, 2021 at 8:40 AM Jacky Yin 殷传旺  wrote:
>
> If you are using ES connector 6.*, there is a deadlock bug when 
> backoff is enabled. The 'retry' and 'flush' tasks share one thread pool that 
> has only one thread. Sometimes the task holding the thread tries to acquire 
> the semaphore, which is held by the task waiting for the thread. Therefore, 
> please upgrade to connector 7.*.
>
> 
> From: Qihua Yang 
> Sent: May 24, 2021 23:17
> To: Yangze Guo 
> Cc: ro...@apache.org ; user 
> Subject: Re: ES sink never receive error code
>
> Got it! thanks for helping.
>
> On Thu, May 20, 2021 at 7:15 PM Yangze Guo  wrote:
>
> > So, ES BulkProcessor retried after bulk request was partially rejected. And 
> > eventually that request was sent successfully? That is why failure handler 
> > was not called?
>
> If the bulk request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
>
> Best,
> Yangze Guo
>
> On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
> >
> > Thank you for the reply!
> > Yes, we did config bulk.flush.backoff.enable.
> > So, ES BulkProcessor retried after bulk request was partially rejected. And 
> > eventually that request was sent successfully? That is why failure handler 
> > was not called?
> >
> > Thanks,
> > Qihua
> >
> > On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:
> >>
> >> Hi,
> >>
> >> Have you tried to change bulk.flush.backoff.enable?
> >> According to the docs [1], the underlying ES BulkProcessor will retry
> >> (by default), so the provided failure handler might not be called.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >> >
> >> > Hello,
> >> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data 
> >> > to ES by using bulk requests. From ES metrics, we observed some bulk 
> >> > thread pool rejections. Contacted AWS team, their explanation is part of 
> >> > bulk request was rejected. Response body should include status for each 
> >> > item. For bulk thread pool rejection, the error code is 429.
> >> > Our flink app override FailureHandler to process error cases.
> >> > I checked Flink code, it has AfterBulk() method to handle item errors. 
> >> > FailureHandler() never received any 429 error.
> >> > Is that flink issue? Or we need to config something to make it work?
> >> > Thanks,
> >> >
> >> > Qihua


Re: ES sink never receive error code

2021-05-24 Thread Jacky Yin 殷传旺
If you are using ES connector 6.*, there is a deadlock bug when 
backoff is enabled. The 'retry' and 'flush' tasks share one thread pool that 
has only one thread. Sometimes the task holding the thread tries to acquire 
the semaphore, which is held by the task waiting for the thread. Therefore, 
please upgrade to connector 7.*.
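
The pattern described above can be reproduced with plain java.util.concurrent classes. The following is only a minimal sketch of that pattern, not the connector's or Elasticsearch's actual code: the class name, method name, and the use of a timed tryAcquire (so that the demo terminates instead of hanging) are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the deadlock pattern; not Flink or Elasticsearch code.
final class SingleThreadDeadlockSketch {

    // Returns true if the queued "retry" task ran while "flush" was still waiting.
    static boolean retryRanWithin(int poolSize, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Semaphore inFlight = new Semaphore(1);   // one bulk request may be in flight
        AtomicBoolean retryRan = new AtomicBoolean(false);
        try {
            inFlight.acquire();                  // a bulk request is in flight and needs a retry

            // "flush" occupies a pool thread and waits for the permit.
            // The real bug would block forever; a timeout is used here so the demo ends.
            pool.execute(() -> {
                try {
                    if (inFlight.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        inFlight.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // "retry" would release the permit, but it needs a pool thread to run.
            pool.execute(() -> {
                retryRan.set(true);
                inFlight.release();
            });

            Thread.sleep(timeoutMillis / 2);     // look while "flush" is still waiting
            boolean ran = retryRan.get();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return ran;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("retry ran with 1 thread:  " + retryRanWithin(1, 400));
        System.out.println("retry ran with 2 threads: " + retryRanWithin(2, 400));
    }
}
```

With a single pool thread, the retry stays queued behind the waiting flush, so neither side can make progress until the timeout fires. With two threads the retry gets its own thread and releases the permit right away.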


From: Qihua Yang 
Sent: May 24, 2021 23:17
To: Yangze Guo 
Cc: ro...@apache.org ; user 
Subject: Re: ES sink never receive error code

Got it! thanks for helping.

On Thu, May 20, 2021 at 7:15 PM Yangze Guo <karma...@gmail.com> wrote:
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?

If the bulk request fails after the max number of retries
(bulk.flush.backoff.retries), the failure handler will still be
called.


Best,
Yangze Guo

On Fri, May 21, 2021 at 5:53 AM Qihua Yang <yang...@gmail.com> wrote:
>
> Thank you for the reply!
> Yes, we did config bulk.flush.backoff.enable.
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?
>
> Thanks,
> Qihua
>
> On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan <ro...@apache.org> wrote:
>>
>> Hi,
>>
>> Have you tried to change bulk.flush.backoff.enable?
>> According to the docs [1], the underlying ES BulkProcessor will retry
>> (by default), so the provided failure handler might not be called.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:08 PM Qihua Yang <yang...@gmail.com> wrote:
>> >
>> > Hello,
>> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data to 
>> > ES by using bulk requests. From ES metrics, we observed some bulk thread 
>> > pool rejections. Contacted AWS team, their explanation is part of bulk 
>> > request was rejected. Response body should include status for each item. 
>> > For bulk thread pool rejection, the error code is 429.
>> > Our flink app override FailureHandler to process error cases.
>> > I checked Flink code, it has AfterBulk() method to handle item errors. 
>> > FailureHandler() never received any 429 error.
>> > Is that flink issue? Or we need to config something to make it work?
>> > Thanks,
>> >
>> > Qihua


Re: ES sink never receive error code

2021-05-24 Thread Qihua Yang
Got it! thanks for helping.

On Thu, May 20, 2021 at 7:15 PM Yangze Guo  wrote:

> > So, ES BulkProcessor retried after bulk request was partially rejected.
> > And eventually that request was sent successfully? That is why failure
> > handler was not called?
>
> If the bulk request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
>
> Best,
> Yangze Guo
>
> On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
> >
> > Thank you for the reply!
> > Yes, we did config bulk.flush.backoff.enable.
> > So, ES BulkProcessor retried after bulk request was partially rejected.
> > And eventually that request was sent successfully? That is why failure
> > handler was not called?
> >
> > Thanks,
> > Qihua
> >
> > On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan wrote:
> >>
> >> Hi,
> >>
> >> Have you tried to change bulk.flush.backoff.enable?
> >> According to the docs [1], the underlying ES BulkProcessor will retry
> >> (by default), so the provided failure handler might not be called.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
> >>
> >> Regards,
> >> Roman
> >>
> >> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >> >
> >> > Hello,
> >> > We are using flink-connector-elasticsearch6_2.11 to ingest stream
> >> > data to ES by using bulk requests. From ES metrics, we observed some bulk
> >> > thread pool rejections. Contacted AWS team, their explanation is part of
> >> > bulk request was rejected. Response body should include status for each
> >> > item. For bulk thread pool rejection, the error code is 429.
> >> > Our flink app override FailureHandler to process error cases.
> >> > I checked Flink code, it has AfterBulk() method to handle item
> >> > errors. FailureHandler() never received any 429 error.
> >> > Is that flink issue? Or we need to config something to make it work?
> >> > Thanks,
> >> >
> >> > Qihua
>


Re: ES sink never receive error code

2021-05-20 Thread Yangze Guo
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?

If the bulk request fails after the max number of retries
(bulk.flush.backoff.retries), the failure handler will still be
called.
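
To make the semantics above concrete, here is a minimal plain-Java model of "retry first, report only when retries are exhausted". It is a sketch under stated assumptions, not Flink or Elasticsearch code: the class and method names are hypothetical, the 429 status is taken from the original question, and the backoff wait itself is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical model of retry-then-report semantics; not Flink or Elasticsearch code.
final class BackoffRetrySketch {

    // Sends one request, retrying up to maxRetries times while it is rejected.
    // Returns the status codes that would reach the failure handler:
    // an empty list means a retry succeeded and the handler saw nothing.
    static List<Integer> sendWithRetries(int maxRetries, IntPredicate rejectedOnAttempt) {
        List<Integer> reportedToHandler = new ArrayList<>();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (!rejectedOnAttempt.test(attempt)) {
                // Success: the earlier rejections are swallowed by the retry logic.
                return reportedToHandler;
            }
            // A real backoff policy would wait here before the next attempt.
        }
        // Retries exhausted: only now is the rejection reported.
        reportedToHandler.add(429);
        return reportedToHandler;
    }

    public static void main(String[] args) {
        // Rejected on attempts 0 and 1, accepted on attempt 2.
        System.out.println(sendWithRetries(3, attempt -> attempt < 2)); // prints []
        // Rejected on every attempt.
        System.out.println(sendWithRetries(3, attempt -> true));        // prints [429]
    }
}
```

In this model a handler that never sees a 429 is consistent with every rejected request eventually succeeding on a retry.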


Best,
Yangze Guo

On Fri, May 21, 2021 at 5:53 AM Qihua Yang  wrote:
>
> Thank you for the reply!
> Yes, we did config bulk.flush.backoff.enable.
> So, ES BulkProcessor retried after bulk request was partially rejected. And 
> eventually that request was sent successfully? That is why failure handler 
> was not called?
>
> Thanks,
> Qihua
>
> On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:
>>
>> Hi,
>>
>> Have you tried to change bulk.flush.backoff.enable?
>> According to the docs [1], the underlying ES BulkProcessor will retry
>> (by default), so the provided failure handler might not be called.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>>
>> Regards,
>> Roman
>>
>> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
>> >
>> > Hello,
>> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data to 
>> > ES by using bulk requests. From ES metrics, we observed some bulk thread 
>> > pool rejections. Contacted AWS team, their explanation is part of bulk 
>> > request was rejected. Response body should include status for each item. 
>> > For bulk thread pool rejection, the error code is 429.
>> > Our flink app override FailureHandler to process error cases.
>> > I checked Flink code, it has AfterBulk() method to handle item errors. 
>> > FailureHandler() never received any 429 error.
>> > Is that flink issue? Or we need to config something to make it work?
>> > Thanks,
>> >
>> > Qihua


Re: ES sink never receive error code

2021-05-20 Thread Qihua Yang
Thank you for the reply!
Yes, we did configure bulk.flush.backoff.enable.
So the ES BulkProcessor retried after the bulk request was partially rejected,
and eventually that request was sent successfully? Is that why the failure
handler was not called?

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan  wrote:

> Hi,
>
> Have you tried to change bulk.flush.backoff.enable?
> According to the docs [1], the underlying ES BulkProcessor will retry
> (by default), so the provided failure handler might not be called.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
> >
> > Hello,
> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data
> > to ES by using bulk requests. From ES metrics, we observed some bulk thread
> > pool rejections. Contacted AWS team, their explanation is part of bulk
> > request was rejected. Response body should include status for each item.
> > For bulk thread pool rejection, the error code is 429.
> > Our flink app override FailureHandler to process error cases.
> > I checked Flink code, it has AfterBulk() method to handle item errors.
> > FailureHandler() never received any 429 error.
> > Is that flink issue? Or we need to config something to make it work?
> > Thanks,
> >
> > Qihua
>


Re: ES sink never receive error code

2021-05-20 Thread Roman Khachatryan
Hi,

Have you tried to change bulk.flush.backoff.enable?
According to the docs [1], the underlying ES BulkProcessor will retry
(by default), so the provided failure handler might not be called.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor

Regards,
Roman

On Thu, May 20, 2021 at 10:08 PM Qihua Yang  wrote:
>
> Hello,
> We are using flink-connector-elasticsearch6_2.11 to ingest stream data to ES 
> by using bulk requests. From ES metrics, we observed some bulk thread pool 
> rejections. Contacted AWS team, their explanation is part of bulk request was 
> rejected. Response body should include status for each item. For bulk thread 
> pool rejection, the error code is 429.
> Our flink app override FailureHandler to process error cases.
> I checked Flink code, it has AfterBulk() method to handle item errors. 
> FailureHandler() never received any 429 error.
> Is that flink issue? Or we need to config something to make it work?
> Thanks,
>
> Qihua


ES sink never receive error code

2021-05-20 Thread Qihua Yang
Hello,
We are using flink-connector-elasticsearch6_2.11 to ingest stream data into
ES using bulk requests. From the ES metrics, we observed some bulk thread
pool rejections. We contacted the AWS team, and their explanation is that part
of a bulk request was rejected. The response body should include a status for
each item. For a bulk thread pool rejection, the error code is 429.
Our Flink app overrides FailureHandler to process error cases.
I checked the Flink code; it has an afterBulk() method to handle item errors.
However, our FailureHandler never received any 429 error.
Is that a Flink issue? Or do we need to configure something to make it work?
Thanks,

Qihua