Thanks Roman for your response and advice.

From my understanding, increasing shards will increase throughput, but if more than 5 requests per shard per second are still made, and since we have 20 apps (and increasing), the exception might still occur. Please let me know if I have missed anything.

Mans

On Tuesday, June 16, 2020, 03:29:59 PM EDT, Roman Grebennikov <g...@dfdx.me> wrote:

Hi,
usually this exception is thrown by the aws-java-sdk and means that your Kinesis stream is hitting a throughput limit (what a surprise). We experienced the same thing when we had a single "event-bus" style stream and multiple Flink apps reading from it. Each Kinesis partition (shard) has a limit of 5 poll operations per second. If you have a stream with 4 partitions and 30 jobs reading from it, I guess that each job is constantly hitting the op limit for Kinesis with the default consumer settings, and it does an exponential back-off (by just sleeping for a small period of time and then retrying).

You have two options here:

1. Scale up the Kinesis stream, so there will be more partitions and higher overall throughput limits.

2. Tune the Kinesis consumer backoff parameters. Our current ones, for example, look like this:

    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000") // we poll every 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000") // in case of throughput error, initial timeout is 2s
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000") // we can go up to a 10s pause
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5") // multiply the pause by 1.5 on each next step
    conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100") // and make up to 100 retries

with best regards,
Roman Grebennikov | g...@dfdx.me

On Mon, Jun 15, 2020, at 13:45, M Singh wrote:

Hi:

I am using multiple (almost 30 and growing) Flink streaming applications that read from the same Kinesis stream and get a ProvisionedThroughputExceededException, which fails the job. I have seen a reference http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E - which indicates there might be some solution, perhaps in Flink 1.8/1.9.
I also see that [FLINK-10536] "Flink Kinesis Consumer leading to job failure due to ProvisionedThroughputExceededException" (ASF JIRA) is still open. So I wanted to find out:

1. Has this issue been resolved, and if so, in which version?

2. Is there any Kinesis consumer with Kinesis fan-out available that can help address this issue?

3. Is there any specific parameter in the Kinesis consumer config that can address this issue?

If there is any other pointer/documentation/reference, please let me know.

Thanks
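To make the shard math discussed in the thread concrete, here is a small illustrative sketch. The 5 reads/sec/shard figure is the Kinesis per-shard GetRecords limit mentioned above; the 200 ms default poll interval is an assumption about the connector's default `SHARD_GETRECORDS_INTERVAL_MILLIS`, and the class and method names are hypothetical:

```java
// Illustrative arithmetic for the per-shard read limit discussed in the thread.
// Every consumer app polls every shard, so the request rate against a single
// shard is the sum of all apps' polling rates -- adding shards raises total
// throughput but does not reduce the per-shard request rate from N apps.
public class ShardReadBudget {
    // Kinesis per-shard GetRecords limit (requests per second), per the thread.
    static final double MAX_READS_PER_SHARD_PER_SEC = 5.0;

    // Requests hitting one shard per second, given the number of consumer
    // apps and each app's poll interval in milliseconds. (Hypothetical helper.)
    static double readsPerShardPerSec(int numApps, long pollIntervalMillis) {
        return numApps * (1000.0 / pollIntervalMillis);
    }

    public static void main(String[] args) {
        // 20 apps at an assumed 200 ms default poll interval: 100 reads/s
        // against each shard, far over the limit of 5.
        System.out.println(readsPerShardPerSec(20, 200));

        // 20 apps polling every 2 s (Roman's interval setting): 10 reads/s
        // per shard -- still over the limit, which is why the backoff
        // settings matter even after slowing the poll rate.
        System.out.println(readsPerShardPerSec(20, 2000));
    }
}
```

This is why Mans's concern holds: with 20+ independent consumers, per-shard request limits are exceeded regardless of shard count, and only slower polling, tolerant backoff, or a fan-out mechanism changes the picture.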
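Roman's backoff settings imply a pause schedule roughly like the following. This is a simplified sketch of capped exponential backoff using the values from the thread (base 2000 ms, multiplier 1.5, cap 10000 ms); it is not Flink's exact implementation, which may also apply random jitter, and the class and method names are hypothetical:

```java
// Simplified capped exponential backoff using the values from the thread:
// pause(n) = min(maxMillis, baseMillis * multiplier^n). Jitter omitted.
public class BackoffSchedule {
    static long backoffMillis(long baseMillis, double multiplier,
                              long maxMillis, int attempt) {
        double pause = baseMillis * Math.pow(multiplier, attempt);
        return (long) Math.min((double) maxMillis, pause);
    }

    public static void main(String[] args) {
        // Successive pauses for attempts 0..5 with base=2000, x1.5, cap=10000:
        // 2000, 3000, 4500, 6750, then capped at 10000 from attempt 4 onward.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println(backoffMillis(2000, 1.5, 10000, attempt));
        }
    }
}
```

With up to 100 retries and a 10 s cap, a job can ride out sustained throttling for many minutes before failing, which is the point of option 2 in Roman's reply.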