[ https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephen updated SPARK-30393:
----------------------------
    Description:
I have a Spark application that consumes from a Kinesis stream with 6 shards. Data is produced to Kinesis at no more than 2000 records/second; at non-peak times it arrives at only 200 records/second. Each record is 0.5 KB, so 6 shards is enough to handle that.

I use reduceByKeyAndWindow and mapWithState in the program, and the sliding window is one hour long.

Recently I have been trying to checkpoint the application to S3. I am testing this at a non-peak time, so the incoming data rate is very low, about 200 records/second. I run the Spark application by creating a new context, and the checkpoint is created in S3. But when I kill the app and restart it, it fails to recover from the checkpoint. The error message is below, my Spark UI shows all the batches stuck, and the checkpoint recovery takes a long time to complete, from 15 minutes to over an hour.

I found lots of error messages in the log related to Kinesis exceeding the read limit:
{quote}19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2749)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2738)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetShardIterator(AmazonKinesisClient.java:1383)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.getShardIterator(AmazonKinesisClient.java:1355)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$3.apply(KinesisBackedBlockRDD.scala:247)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
 ... 20 more{quote}

I see someone reported a similar problem, https://issues.apache.org/jira/browse/SPARK-24970, but I am not sure whether there is any fix for it.

Since my batch interval is 150 seconds, I have tried increasing the block interval to 1000 ms (1 second) so that I have fewer partitions, but the problem still exists.

I also tried enabling the WAL (spark.streaming.receiver.writeAheadLog.enable=true), but the problem still exists.
I also read that enabling the WAL is no longer necessary for Spark version 2 and beyond.

I understand checkpoint recovery might be a lengthy process, but how do I eliminate the "ProvisionedThroughputExceededException" error? I think that is perhaps what is causing the slow checkpoint recovery.

Thanks, can someone please help?

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-30393
>                 URL: https://issues.apache.org/jira/browse/SPARK-30393
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 2.4.3
>         Environment: I am using EMR 5.23.0, Spark 2.4.3, spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my cluster, with plenty of memory. 6 Kinesis shards; I even increased to 12 shards but still see the Kinesis error.
>            Reporter: Stephen
>            Priority: Major
>         Attachments: kinesisusagewhilecheckpointrecoveryerror.png, sparkuiwhilecheckpointrecoveryerror.png
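
As a rough sanity check on the figures in the description, the sketch below works out whether 6 shards can absorb the stated ingest rate, how many receiver blocks the stated batch interval and one-hour window imply at a 1000 ms block interval, and a lower bound on how long it would take to request one shard iterator per block. The per-shard quotas are assumptions taken from the published Kinesis limits, not from this report. The premise that recovery issues at least one GetShardIterator call per stored block is also an assumption, and all function names are hypothetical.

```python
# Figures quoted in the report.
PEAK_RECORDS_PER_SEC = 2000
RECORD_SIZE_KB = 0.5
SHARDS = 6
BATCH_INTERVAL_MS = 150_000
WINDOW_MS = 3_600_000
BLOCK_INTERVAL_MS = 1_000  # the reporter's adjusted block interval

# Assumptions, not from the report: per-shard Kinesis quotas.
WRITE_KB_PER_SEC_PER_SHARD = 1000        # roughly 1 MB/s ingest per shard
WRITE_RECORDS_PER_SEC_PER_SHARD = 1000
GET_SHARD_ITERATOR_CALLS_PER_SEC_PER_SHARD = 5


def ingest_fits(records_per_sec, record_kb, shards):
    """True if evenly spread writes stay within the assumed per-shard quotas."""
    kb_per_shard = records_per_sec * record_kb / shards
    records_per_shard = records_per_sec / shards
    return (kb_per_shard <= WRITE_KB_PER_SEC_PER_SHARD
            and records_per_shard <= WRITE_RECORDS_PER_SEC_PER_SHARD)


def blocks_per_receiver(span_ms, block_interval_ms):
    """Number of receiver blocks covering span_ms (integer ceiling division)."""
    return -(-span_ms // block_interval_ms)


def min_seconds_for_iterators(
        blocks, shards,
        calls_per_sec_per_shard=GET_SHARD_ITERATOR_CALLS_PER_SEC_PER_SHARD):
    """Lower bound on the time needed for one iterator request per block,
    assuming the requests are spread perfectly evenly across the shards."""
    return blocks / (shards * calls_per_sec_per_shard)


if __name__ == "__main__":
    print("ingest fits:", ingest_fits(PEAK_RECORDS_PER_SEC, RECORD_SIZE_KB, SHARDS))
    per_batch = blocks_per_receiver(BATCH_INTERVAL_MS, BLOCK_INTERVAL_MS)
    per_window = blocks_per_receiver(WINDOW_MS, BLOCK_INTERVAL_MS)
    print("blocks per batch:", per_batch)
    print("blocks per window:", per_window)
    print("min seconds for window:", min_seconds_for_iterators(per_window, SHARDS))
```

With these inputs the one-hour window corresponds to 3600 blocks per receiver, and under the assumed quota one iterator request per block could not finish in under 120 seconds even when spread evenly over 6 shards. This is only a bound under the stated assumptions, not a diagnosis of the reported failure.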