[ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17017930#comment-17017930
 ] 

Steve Loughran commented on SPARK-30393:
----------------------------------------

{{ProvisionedThroughputExceededException}} means your client(s) are making more 
requests per second than that AWS Endpoint permits.

Applications are expected to recognise this and perform some kind of 
exponential backoff. It looks suspiciously like the spark-kinesis module is not 
doing this.
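
For illustration, here is a minimal sketch of exponential backoff with jitter. It is not the Spark or S3A implementation; the names call_with_backoff and ThrottledError, and all the default values, are made up for this example. The sleep and rng parameters are only there so the delays can be checked without actually waiting.

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for an error such as ProvisionedThroughputExceededException."""


def call_with_backoff(call, is_throttled, max_attempts=6, base_delay=0.1,
                      max_delay=10.0, sleep=time.sleep, rng=random.random):
    """Run call(); on a throttling error, wait and retry with a doubling delay ceiling."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception as exc:
            # Only throttling errors are retried; everything else, and the
            # final failed attempt, is re-raised to the caller.
            if not is_throttled(exc) or attempt == max_attempts - 1:
                raise
            # The ceiling doubles on each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            # Jitter: sleep a random fraction of the ceiling so that many
            # workers hitting the same shard do not all retry in lockstep.
            sleep(ceiling * rng())
```

A caller would wrap the throttled request, e.g. call_with_backoff(fetch, lambda e: isinstance(e, ThrottledError)), where fetch is whatever function issues the request.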

If this is the case, I recommend adding it (it's what S3A does for S3 503s and 
DynamoDB provisioned-throughput errors). See 
https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java#L390
 for the code to identify the problem; retries are straightforward as you can 
be confident the request was not processed.
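
As a rough sketch of the "identify the problem" step (not a copy of the S3AUtils logic): classify a failure as throttling from its error code and HTTP status. The function name is_throttle and the two sets are assumptions for this example. The error code is the one in the log quoted below; treating 503 as throttling follows the S3 case mentioned above. In the AWS Java SDK these two values are available from AmazonServiceException via getErrorCode() and getStatusCode().

```python
# Error code taken from the log in this issue (it arrives with HTTP status 400).
THROTTLE_ERROR_CODES = {"ProvisionedThroughputExceededException"}

# Assumption for this sketch: any 503 is treated as "slow down".
THROTTLE_STATUS_CODES = {503}


def is_throttle(error_code, status_code):
    """Return True if the failure looks like throttling and is therefore safe to retry."""
    return error_code in THROTTLE_ERROR_CODES or status_code in THROTTLE_STATUS_CODES
```

Anything this returns False for (access denied, bad request, and so on) should fail fast rather than be retried.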

In the meantime, reduce the number of workers trying to talk to that particular 
stream. AWS endpoint throttling means that their scalability can be sub-linear.
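
To make the sub-linear point concrete, here is a toy model, assuming a fixed per-shard request limit and workers that each issue requests at a steady rate. The function name and the numbers in the usage note are placeholders, not quoted Kinesis quotas.

```python
def shard_read_outcome(workers, requests_per_worker_per_sec, shard_limit_per_sec):
    """Return (accepted requests/sec, fraction of requests throttled) for one shard."""
    offered = workers * requests_per_worker_per_sec
    # The shard never accepts more than its limit, however many workers ask.
    accepted = min(offered, shard_limit_per_sec)
    throttled_fraction = 0.0 if offered == 0 else 1 - accepted / offered
    return accepted, throttled_fraction
```

With a placeholder limit of 5 requests/sec, 2 workers at 1 request/sec each are all served, while 20 workers still get only 5 requests/sec through and see three quarters of their calls rejected. Beyond the limit, extra workers add failures and retries, not throughput.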

Side issue: EMR's Spark is AWS's fork of Apache Spark. You probably need to 
talk to them.

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-30393
>                 URL: https://issues.apache.org/jira/browse/SPARK-30393
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 2.4.3
>         Environment: I am using EMR 5.25.0, Spark 2.4.3, 
> spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my 
> cluster, with plenty of memory. 6 Kinesis shards; I even increased to 12 
> shards but still see the Kinesis error
>            Reporter: Stephen
>            Priority: Major
>         Attachments: kinesisexceedreadlimit.png, 
> kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>
> I have a Spark application which consumes from a Kinesis stream with 6 
> shards. Data is produced to Kinesis at no more than 2000 records/second. At 
> non-peak times data only comes in at 200 records/second. Each record is 
> 0.5 KB, so 6 shards are enough to handle that.
> I use reduceByKeyAndWindow and mapWithState in the program and the sliding 
> window is one hour long.
> Recently I have been trying to checkpoint the application to S3. I am testing 
> this at a non-peak time, so the incoming data rate is very low, around 200 
> records/sec. I run the Spark application by creating a new context, and the 
> checkpoint is created in S3. But when I kill the app and restart it, it fails 
> to recover from the checkpoint: the error message is the following, my Spark 
> UI shows all the batches are stuck, and the checkpoint recovery takes a long 
> time to complete, from 15 minutes to over an hour.
> I found lots of error messages in the log related to Kinesis exceeding the 
> read limit:
> {quote}19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 
> (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): 
> org.apache.spark.SparkException: Gave up after 3 retries while getting shard 
> iterator from sequence number 
> 49601654074184110438492229476281538439036626028298502210, last exception:
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
> bq.         at scala.Option.getOrElse(Option.scala:121)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
> bq.         at 
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> bq.         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> bq.         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> bq.         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> bq.         at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
> bq.         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> bq.         at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> bq.         at org.apache.spark.scheduler.Task.run(Task.scala:121)
> bq.         at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> bq.         at 
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> bq.         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> bq.         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> bq.         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> bq.         at java.lang.Thread.run(Thread.java:748)
> bq. Caused by: 
> com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: 
> Rate exceeded for shard shardId-000000000004 in stream my-stream-name under 
> account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error 
> Code: ProvisionedThroughputExceededException; Request ID: 
> e368b876-c315-d0f0-b513-e2af2bd14525)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
> bq.         at 
> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
> bq.         at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
> bq.         at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
> bq.         at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
> bq.         at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
> bq.         at 
> com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
> bq.         at 
> org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
> bq.         ... 20 more{quote}
> I see someone reported a similar problem, 
> https://issues.apache.org/jira/browse/SPARK-24970; I am not sure whether 
> there is any fix for that.
> Since my batch interval is 150 seconds, I have tried increasing the block 
> interval to 1000ms (1 second) so that I have fewer partitions. But the 
> problem still exists.
> I also tried enabling the WAL, 
> spark.streaming.receiver.writeAheadLog.enable=true, but the problem still 
> exists. I also read that enabling the WAL is no longer necessary from Spark 
> version 2 onwards.
> Could this be related to the hour-long sliding window I keep in memory? 3600 
> seconds x 200 records/second = 720K records; if the recovery process tries to 
> load all of them into memory from Kinesis, will it exceed my limit of 12 
> shards * 2000 records/sec/shard = 24K records/second? If so, wouldn't this be 
> a flaw, as I don't need and can't afford 360 (peak time 3600) shards for this 
> app just for checkpointing purposes?
> I understand checkpoint recovery might be a lengthy process, but how do I 
> eliminate the "ProvisionedThroughputExceededException" error? I think that 
> is perhaps causing the slow checkpoint recovery.
> In the attached screenshot "kinesisexceedreadlimit.png", one can see the 
> sharp increase of Get Record Count to nearly 3.8 million records in the 
> 5-minute interval during which the checkpoint recovery is happening, while 
> Get Record Success dropped to around 0.5.
> Thanks, can someone please help?


