Kinesis ProvisionedThroughputExceededException

2020-06-15 Thread M Singh
Hi:
I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same Kinesis stream, and I get a 
ProvisionedThroughputExceededException which fails the job.

I have seen a reference which indicates there might be a solution, perhaps in 
Flink 1.8/1.9:
http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E

I also see that [FLINK-10536] Flink Kinesis Consumer leading to job failure due 
to ProvisionedThroughputExceededException is still open in the ASF JIRA.

So I wanted to find out:
1. Has this issue been resolved, and if so, in which version?
2. Is there any Kinesis consumer with Kinesis enhanced fan-out available that 
can help address this issue?
3. Is there any specific parameter in the Kinesis consumer config that can 
address this issue?
If there is any other pointer/documentation/reference, please let me know.
Thanks


Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread Roman Grebennikov
Hi, 

Usually this exception is thrown by aws-java-sdk and means that your Kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple Flink apps 
reading from it.

Each Kinesis shard has a limit of 5 read (GetRecords) calls per second, shared 
by all consumers of that shard. If you have a stream with 4 shards and 30 jobs 
reading from it, I guess that each job is constantly hitting the Kinesis 
operation limit with the default consumer settings, and it does an exponential 
back-off (it just sleeps for a short period of time and then retries).
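To put rough numbers on the per-shard limit, here is a small back-of-the-envelope sketch in Java. It assumes that every application reads every shard of the stream, that each one issues one GetRecords call per shard per poll interval, and it ignores retries. ShardPollBudget and its methods are made-up names for illustration. The 200 ms interval is, as far as I know, the connector's default for SHARD_GETRECORDS_INTERVAL_MILLIS, but please check it for your Flink version.

```java
// Back-of-the-envelope check of GetRecords calls per shard.
// Assumptions: every application reads every shard of the stream and issues
// one GetRecords call per shard per poll interval; retries are ignored.
public class ShardPollBudget {

    // Kinesis allows up to 5 GetRecords calls per second per shard,
    // shared by all consumers of that shard.
    static final double READ_CALLS_PER_SHARD_PER_SEC = 5.0;

    // Aggregate GetRecords calls per second hitting one shard.
    static double callsPerShardPerSecond(int apps, long pollIntervalMillis) {
        return apps * (1000.0 / pollIntervalMillis);
    }

    // Smallest poll interval (ms) that keeps all apps within the per-shard limit.
    static long minPollIntervalMillis(int apps) {
        return (long) Math.ceil(apps * 1000.0 / READ_CALLS_PER_SHARD_PER_SEC);
    }

    public static void main(String[] args) {
        int apps = 30;
        for (long interval : new long[] {200, 2000}) {
            System.out.printf("%d apps polling every %d ms -> %.1f calls/s per shard (limit %.0f)%n",
                    apps, interval, callsPerShardPerSecond(apps, interval),
                    READ_CALLS_PER_SHARD_PER_SEC);
        }
        System.out.printf("minimum interval for %d apps: %d ms%n", apps, minPollIntervalMillis(apps));
    }
}
```

Under these assumptions, 30 applications polling every 200 ms make about 150 calls per second on each shard, and even at 2000 ms they make about 15, three times the limit; only an interval of roughly 6 s or more keeps them under it. Note that the shard count does not appear in the formula: more shards raise the total capacity of the stream, but not the number of calls each shard receives from the applications.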

You have two options here:
1. Scale up the Kinesis stream, so there will be more shards and higher overall 
throughput limits.
2. Tune the Kinesis consumer backoff parameters.

Our current ones, for example, look like this:

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    Properties conf = new Properties();
    // we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
    // in case of a throughput error, the initial pause is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000");
    // the pause can grow up to 10s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
    // multiply the pause by 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
    // and make up to 100 retries
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100");
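To see what these settings mean for the pauses, here is a rough Java sketch that lists the upper bound of each back-off pause, using the values given in the comments above (base 2s, max 10s, constant 1.5, 100 retries). It assumes the n-th retry (counting from 0) waits at most min(max, base * constant^n) ms; if your connector version adds random jitter, the real pauses can only be shorter. BackoffSchedule and its methods are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: upper bound of each GetRecords back-off pause.
// Assumption: the n-th retry (n = 0, 1, 2, ...) waits at most
//   min(maxMillis, baseMillis * power^n) milliseconds.
// If the connector adds random jitter, the real pause can only be shorter.
public class BackoffSchedule {

    static List<Long> pauseUpperBounds(long baseMillis, long maxMillis, double power, int retries) {
        List<Long> pauses = new ArrayList<>();
        for (int n = 0; n < retries; n++) {
            double exponential = baseMillis * Math.pow(power, n);
            pauses.add((long) Math.min(maxMillis, exponential)); // cap at maxMillis
        }
        return pauses;
    }

    static long totalMillis(List<Long> pauses) {
        long total = 0;
        for (long pause : pauses) {
            total += pause;
        }
        return total;
    }

    public static void main(String[] args) {
        // Values from the comments above: base 2s, max 10s, constant 1.5, 100 retries.
        List<Long> pauses = pauseUpperBounds(2000, 10000, 1.5, 100);
        System.out.println("first pauses (ms): " + pauses.subList(0, 6));
        System.out.println("worst-case total (ms): " + totalMillis(pauses));
    }
}
```

With these numbers the bounds are 2000, 3000, 4500 and 6750 ms, then 10000 ms for every following retry, so 100 consecutive throttled calls could add up to about 976 s (roughly 16 minutes) of waiting before the consumer gives up. That is the worst-case latency price of a large retry count.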

with best regards,
Roman Grebennikov | g...@dfdx.me




Re: Kinesis ProvisionedThroughputExceededException

2020-06-16 Thread M Singh
 
Thanks Roman for your response and advice.
From my understanding, increasing the number of shards will increase 
throughput, but since we have 20 apps (and increasing), more than 5 requests 
per shard per second could still be made, so the exception might still occur.
Please let me know if I have missed anything.
Mans

Re: Kinesis ProvisionedThroughputExceededException

2020-06-17 Thread Roman Grebennikov
Hi,

It will occur only if your job reaches SHARD_GETRECORDS_RETRIES consecutive 
failed attempts to pull data from Kinesis.
So if you scale up the stream in Kinesis and tune the backoff parameters a bit, 
you will lower the probability of this exception to almost zero (at the price 
of higher costs and worst-case latency).
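The behaviour described here (retry on throttling with a growing pause, and fail only after SHARD_GETRECORDS_RETRIES consecutive failures) can be sketched as a generic retry loop. This is a hypothetical illustration, not the connector's actual code: ThrottledException stands in for ProvisionedThroughputExceededException, Sleeper is a made-up hook so the pause can be recorded instead of slept in tests, and the pause uses the same capped exponential formula assumed for the back-off settings earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of retry with capped exponential back-off: a throttled call
// is retried with a growing pause, and the error is only surfaced after maxRetries
// consecutive retries have also been throttled. Not the Flink connector's real code.
public class RetryingFetcher {

    // Stand-in for the SDK's ProvisionedThroughputExceededException.
    static class ThrottledException extends RuntimeException {
        ThrottledException(String message) {
            super(message);
        }
    }

    // Hook for pausing, so tests can record pauses instead of sleeping.
    interface Sleeper {
        void sleep(long millis);
    }

    // Real pause for production use.
    static final Sleeper REAL_SLEEP = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    };

    static <T> T fetchWithRetries(Supplier<T> call, int maxRetries, long baseMillis,
                                  long maxMillis, double power, Sleeper sleeper) {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.get();
            } catch (ThrottledException e) {
                if (attempt >= maxRetries) {
                    // The initial call and all maxRetries retries were throttled: give up.
                    throw new RuntimeException("Retries exceeded after " + (attempt + 1) + " attempts", e);
                }
                // Capped exponential pause before the next attempt.
                long pause = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
                sleeper.sleep(pause);
            }
        }
    }

    public static void main(String[] args) {
        // A fake GetRecords call that is throttled three times, then succeeds.
        int[] calls = {0};
        Supplier<String> flaky = () -> {
            if (calls[0]++ < 3) {
                throw new ThrottledException("Rate exceeded");
            }
            return "records";
        };
        List<Long> pauses = new ArrayList<>();
        String result = fetchWithRetries(flaky, 100, 2000, 10000, 1.5, ms -> pauses.add(ms));
        System.out.println(result + " after pauses " + pauses); // records after pauses [2000, 3000, 4500]
    }
}
```

A short burst of throttling only costs a few pauses; the job fails only when throttling persists through every allowed retry, which is why a larger retry count plus longer pauses makes the failure rare rather than impossible.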

But yes, this is the main drawback of managed solutions: once you reach a 
significant load, you need to pay more. Another managed option within AWS is to 
switch to MSK (managed Kafka), which has no such significant restrictions.

And the final option is to wait until FLINK-17688 is implemented (it uses 
Kinesis enhanced fan-out, so Kinesis pushes the data to the consumer instead of 
the consumer periodically pulling it).

Roman Grebennikov | g...@dfdx.me




Re: Kinesis ProvisionedThroughputExceededException

2020-06-18 Thread M Singh
Thanks Roman for your response.
Mans