Thanks Roman for your response and advice.
From my understanding, increasing shards will increase throughput, but the limit of 5 requests per shard per second still applies, and since we have 20 apps (and increasing), the exception might still occur.
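As a rough back-of-envelope sketch of that concern (the 20-app count is from this thread; 5 reads per shard per second is the documented Kinesis GetRecords limit):

    // Rough sketch: the minimum average poll interval per app so that the
    // combined load stays under 5 GetRecords calls per shard per second.
    val apps = 20               // consumers sharing the stream
    val limitPerShardPerSec = 5 // Kinesis per-shard GetRecords limit
    val minIntervalSec = apps.toDouble / limitPerShardPerSec // = 4.0 seconds

Since every app polls every shard, adding shards does not relax the per-shard budget; with 20 readers, each app can safely poll a given shard only about every 4 seconds.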
Please let me know if I have missed anything.
Mans

On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov <g...@dfdx.me> wrote:
 
Hi,

Usually this exception is thrown by aws-java-sdk and means that your Kinesis 
stream is hitting a throughput limit (what a surprise). We experienced the same 
thing when we had a single "event-bus" style stream and multiple Flink apps 
reading from it.

Each Kinesis shard has a limit of 5 GetRecords calls per second. If you have 
a stream with 4 shards and 30 jobs reading from it, I guess that each job 
is constantly hitting the per-shard limit with the default consumer 
settings and doing an exponential back-off (just sleeping for a small 
period of time and then retrying).
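As a rough illustration of what that back-off schedule looks like (a simplified sketch; the real consumer may also randomize the pauses, and base/max/mult here just mirror the settings shown further down):

    // Pause before retry attempt n: base * mult^n, capped at max (milliseconds)
    def backoff(attempt: Int, base: Long = 2000, max: Long = 10000, mult: Double = 1.5): Long =
      math.min(max, (base * math.pow(mult, attempt)).toLong)

    (0 to 5).map(backoff(_)) // Vector(2000, 3000, 4500, 6750, 10000, 10000)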

You have two options here:
1. scale up the Kinesis stream, so there will be more shards and a higher 
overall throughput limit
2. tune the Kinesis consumer back-off parameters:

Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in case of throughput error, initial back-off is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we can go up to a 10s pause
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5") // multiply the pause by 1.5 on each step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make up to 100 retries
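
For completeness, a minimal sketch of how such a Properties object plugs into the consumer (the region, stream name, and string schema are placeholders I made up, not something from this thread):

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
    import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

    val conf = new Properties()
    conf.put(AWSConfigConstants.AWS_REGION, "us-east-1") // placeholder region
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000")
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000")
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000")
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5")
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // "events" is a placeholder stream name; any DeserializationSchema works here
    val events = env.addSource(new FlinkKinesisConsumer[String]("events", new SimpleStringSchema(), conf))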

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that 
read from the same Kinesis stream and get a 
ProvisionedThroughputExceededException, which fails the job.
I have seen a reference 
http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
which indicates there might be a solution in Flink 1.8/1.9.

I also see that [FLINK-10536] (Flink Kinesis Consumer leading to job failure due 
to ProvisionedThroughputExceededException) is still open in the ASF JIRA.


So I wanted to find out:

1. Has this issue been resolved, and if so, in which version?
2. Is there any Kinesis consumer with Kinesis fan-out available that can help 
address this issue?
3. Is there any specific parameter in the Kinesis consumer config that can 
address this issue?

If there is any other pointer/documentation/reference, please let me know.

Thanks



  
