[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-31 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Environment: I am using EMR 5.25.0, Spark 2.4.3, and 
spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my cluster 
with plenty of memory. The stream has 6 Kinesis shards; I even increased it to 
12 shards but still see the Kinesis error.  (was: I am using EMR 5.23.0, Spark 2.4.3, 
spark-streaming-kinesis-asl 2.4.3 I have 6 r5.4xLarge in my cluster, plenty of 
memory. 6 kinesis shards, I even increased to 12 shards but still see the 
kinesis error)

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.25.0, Spark 2.4.3, and 
> spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my 
> cluster with plenty of memory. The stream has 6 Kinesis shards; I even 
> increased it to 12 shards but still see the Kinesis error.
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisexceedreadlimit.png, 
> kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>
> I have a Spark application that consumes from a Kinesis stream with 6 shards. 
> Data is produced to Kinesis at a peak of at most 2000 records/second; at 
> non-peak times it arrives at only about 200 records/second. Each record is 
> 0.5 KB, so 6 shards should be enough to handle that load.
> I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
> window is one hour long.
> Recently I have been trying to checkpoint the application to S3. I am testing 
> this at a non-peak time, so the incoming data rate is very low, about 200 
> records/sec. When I run the Spark application by creating a new context, the 
> checkpoint is created in S3. But when I kill the app and restart it, it does 
> not recover from the checkpoint cleanly: the log shows the error below, the 
> Spark UI shows all the batches stuck, and checkpoint recovery takes a long 
> time to complete, from 15 minutes to over an hour.
> I found many error messages in the log related to Kinesis exceeding its read 
> limit:
> 19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 
> (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): 
> org.apache.spark.SparkException: Gave up after 3 retries while getting shard 
> iterator from sequence number 
> 49601654074184110438492229476281538439036626028298502210, last exception:
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
>     at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:121)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Description: 
I have a Spark application that consumes from a Kinesis stream with 6 shards. 
Data is produced to Kinesis at a peak of at most 2000 records/second; at 
non-peak times it arrives at only about 200 records/second. Each record is 
0.5 KB, so 6 shards should be enough to handle that load.
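
As a rough check of that sizing claim, the sketch below compares the stated load with the per-shard write limits that AWS documents for provisioned Kinesis shards (1,000 records/second and 1 MB/second per shard). The function name and constants are illustrative only, and 0.5 KB is treated as 500 bytes.

```python
# Per-shard write limits for provisioned Kinesis shards, as documented by AWS.
WRITE_RECORDS_PER_SEC_PER_SHARD = 1000
WRITE_BYTES_PER_SEC_PER_SHARD = 1_000_000  # assumption: 1 MB taken as 10**6 bytes


def shards_needed_for_writes(records_per_sec: int, record_bytes: int) -> int:
    """Smallest shard count that satisfies both the record-rate and byte-rate limits."""
    # -(-a // b) is integer ceiling division.
    by_records = -(-records_per_sec // WRITE_RECORDS_PER_SEC_PER_SHARD)
    by_bytes = -(-(records_per_sec * record_bytes) // WRITE_BYTES_PER_SEC_PER_SHARD)
    return max(by_records, by_bytes, 1)


peak = shards_needed_for_writes(records_per_sec=2000, record_bytes=500)
off_peak = shards_needed_for_writes(records_per_sec=200, record_bytes=500)
print(peak, off_peak)  # prints "2 1"
```

Under these assumptions the write side needs about 2 shards at peak, so 6 shards leave headroom for ingest. The exception reported in this issue is raised on the read path, which has separate per-shard limits.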

I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
window is one hour long.

Recently I have been trying to checkpoint the application to S3. I am testing 
this at a non-peak time, so the incoming data rate is very low, about 200 
records/sec. When I run the Spark application by creating a new context, the 
checkpoint is created in S3. But when I kill the app and restart it, it does 
not recover from the checkpoint cleanly: the log shows the error below, the 
Spark UI shows all the batches stuck, and checkpoint recovery takes a long 
time to complete, from 15 minutes to over an hour.
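
One way to reason about why recovery could stress the read path: after a restart, the batches covered by the one-hour window may have to be rebuilt by re-reading Kinesis by sequence number, and AWS documents a limit of 5 GetShardIterator calls per second per open shard. The sketch below is a back-of-the-envelope estimate only. The batch interval and the number of sequence ranges per shard per batch are hypothetical values that do not come from this report, and the function name is illustrative.

```python
GET_SHARD_ITERATOR_CALLS_PER_SEC_PER_SHARD = 5  # AWS-documented limit per open shard


def min_recovery_seconds(window_sec: int, batch_interval_sec: int,
                         ranges_per_shard_per_batch: int) -> float:
    """Lower bound on the time needed to issue one GetShardIterator call per
    sequence range of a single shard, if calls were paced exactly at the limit."""
    batches = window_sec // batch_interval_sec
    calls_per_shard = batches * ranges_per_shard_per_batch
    return calls_per_shard / GET_SHARD_ITERATOR_CALLS_PER_SEC_PER_SHARD


# Hypothetical inputs: 10-second batches and 5 sequence ranges per shard per batch.
estimate = min_recovery_seconds(window_sec=3600, batch_interval_sec=10,
                                ranges_per_shard_per_batch=5)
print(f"{estimate:.0f} s")  # prints "360 s"
```

Because the limit applies per shard, many tasks requesting iterators for the same shard at once, without pacing, would exceed it quickly. That would be consistent with the reported ProvisionedThroughputExceededException, though the real call pattern would need to be confirmed from the job itself.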

I found many error messages in the log related to Kinesis exceeding its read 
limit:

19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 
(TID 4452, ip-172-17-32-11.ec2.internal, executor 9): 
org.apache.spark.SparkException: Gave up after 3 retries while getting shard 
iterator from sequence number 
49601654074184110438492229476281538439036626028298502210, last exception:
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
    at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: 
Rate exceeded for shard shardId-0004 in stream my-stream-name under 
account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error 
Code: ProvisionedThroughputExceededException; Request ID: 
e368b876-c315-d0f0-b513-e2af2bd14525)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
    at 
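
The "Gave up after 3 retries" message matches the default of 3 for `spark.streaming.kinesis.retry.maxAttempts`; per the Spark Kinesis integration guide, the companion setting `spark.streaming.kinesis.retry.waitTime` defaults to 100ms. The sketch below is not Spark's code. Assuming the wait doubles after each failed attempt, it only illustrates how much total waiting different settings would allow before a task gives up. The function name and the second pair of values are hypothetical.

```python
from typing import List


def backoff_waits_ms(initial_wait_ms: int, max_attempts: int) -> List[int]:
    """Waits before each retry, doubling every time (no wait before the first attempt)."""
    return [initial_wait_ms * 2 ** i for i in range(max(max_attempts - 1, 0))]


# (waitTime in ms, maxAttempts): the documented defaults, then a hypothetical larger pair.
for wait_ms, attempts in [(100, 3), (500, 5)]:
    waits = backoff_waits_ms(wait_ms, attempts)
    print(f"waitTime={wait_ms}ms maxAttempts={attempts}: total wait {sum(waits)} ms")
```

With the defaults, a task under this model waits only 300 ms in total before failing, which is short compared with a one-second rate-limit window. Both settings can be passed with `--conf` on `spark-submit`; whether larger values resolve this particular case is untested here.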

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Attachment: kinesisexceedreadlimit.png

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.23.0, Spark 2.4.3, and 
> spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my 
> cluster with plenty of memory. The stream has 6 Kinesis shards; I even 
> increased it to 12 shards but still see the Kinesis error.
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisexceedreadlimit.png, 
> kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Attachment: (was: kinesisexceedreadlimit.png)

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.23.0, Spark 2.4.3, and 
> spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my 
> cluster with plenty of memory. The stream has 6 Kinesis shards; I even 
> increased it to 12 shards but still see the Kinesis error.
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Attachment: kinesisexceedreadlimit.png

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.23.0, Spark 2.4.3, and 
> spark-streaming-kinesis-asl 2.4.3. I have 6 r5.4xlarge instances in my 
> cluster with plenty of memory. The stream has 6 Kinesis shards; I even 
> increased it to 12 shards but still see the Kinesis error.
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisexceedreadlimit.png, 
> kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Description: 
I have a Spark application which consumes from a Kinesis stream with 6 shards. Data is 
produced to Kinesis at no more than 2000 records/second. At non-peak times data only 
comes in at 200 records/second. Each record is 0.5 KB, so 6 shards are 
enough to handle that.
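
As a rough sanity check of the sizing claim above, the sketch below compares the stated load with the per-shard write limits that AWS documents for provisioned Kinesis streams (about 1 MB/s and 1,000 records/s per shard). Those limits, the class and method names, and the assumption that records spread evenly across shards are not from this report; 0.5 KB is taken as 512 bytes.

```java
// Hypothetical helper for illustration; not part of Spark or the AWS SDK.
final class ShardCapacityCheck {
    // Per-shard write limits as documented by AWS (assumption, not from the report).
    static final double MAX_WRITE_BYTES_PER_SEC = 1_000_000.0;
    static final double MAX_WRITE_RECORDS_PER_SEC = 1_000.0;

    /** Fraction of per-shard write capacity used, assuming an even spread across shards. */
    static double writeUtilization(double recordsPerSec, double recordBytes, int shards) {
        double perShardRecords = recordsPerSec / shards;
        double perShardBytes = perShardRecords * recordBytes;
        // The tighter of the two limits (record count vs. bytes) decides the utilization.
        return Math.max(perShardRecords / MAX_WRITE_RECORDS_PER_SEC,
                        perShardBytes / MAX_WRITE_BYTES_PER_SEC);
    }

    public static void main(String[] args) {
        System.out.printf("peak utilization:     %.2f%n", writeUtilization(2000, 512, 6));
        System.out.printf("non-peak utilization: %.2f%n", writeUtilization(200, 512, 6));
    }
}
```

Under these assumptions the peak load uses about a third of the per-shard write capacity, which is consistent with the claim that 6 shards are enough for ingestion. Read-side calls are subject to separate per-shard limits, so this check says nothing about the read load generated during checkpoint recovery.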

I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
window is one hour long.

Recently I have been trying to checkpoint the application to S3. I am testing this at 
a non-peak time, so the incoming data rate is very low, around 200 records/sec. I run 
the Spark application by creating a new context, and the checkpoint is created in S3, but 
when I kill the app and restart it, it fails to recover from the checkpoint. The 
error message is shown below, my Spark UI shows that all the batches are 
stuck, and the checkpoint recovery takes a long time to complete, from 15 
minutes to over an hour.

I found lots of error messages in the log related to Kinesis exceeding the read 
limit:

{quote}19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 
(TID 4452, ip-172-17-32-11.ec2.internal, executor 9): 
org.apache.spark.SparkException: Gave up after 3 retries while getting shard 
iterator from sequence number 
49601654074184110438492229476281538439036626028298502210, last exception:
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
bq. at scala.Option.getOrElse(Option.scala:121)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
bq. at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
bq. at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
bq. at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
bq. at org.apache.spark.scheduler.Task.run(Task.scala:121)
bq. at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
bq. at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
bq. at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
bq. at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
bq. at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
bq. at java.lang.Thread.run(Thread.java:748)
bq. Caused by: 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: 
Rate exceeded for shard shardId-0004 in stream my-stream-name under 
account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error 
Code: ProvisionedThroughputExceededException; Request ID: 
e368b876-c315-d0f0-b513-e2af2bd14525)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
bq. at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
bq. at 
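
The "Gave up after 3 retries" message in the trace above means a bounded retry loop ran out of attempts while Kinesis was still throttling the call. The following is a minimal, generic sketch of retrying with exponential backoff; it is not Spark's implementation, and the class name, method name, and parameters are hypothetical. In Spark itself, the Kinesis integration guide lists `spark.streaming.kinesis.retry.maxAttempts` and `spark.streaming.kinesis.retry.waitTime` as the settings for the number of attempts and the wait between them; whether tuning them helps in this case is untested here.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not part of Spark or the AWS SDK.
final class BackoffRetry {
    /** Runs the call up to maxAttempts times, doubling the wait after each failure. */
    static <T> T retry(Callable<T> call, int maxAttempts, long initialWaitMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long waitMs = initialWaitMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(waitMs); // back off before the next attempt
                    waitMs *= 2;          // exponential growth of the wait
                }
            }
        }
        throw new Exception("Gave up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated throttled call: fails twice, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("Rate exceeded (simulated)");
            }
            return "shard-iterator";
        }, 5, 10);
        System.out.println(result + " obtained after " + calls[0] + " calls");
    }
}
```

A longer initial wait or more attempts gives a throttled shard time to recover, at the cost of slower task completion; it does not reduce the number of concurrent callers hitting the same shard.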

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Description: 
I have a Spark application which consumes from a Kinesis stream with 6 shards. Data is 
produced to Kinesis at no more than 2000 records/second. At non-peak times data only 
comes in at 200 records/second. Each record is 0.5 KB, so 6 shards are 
enough to handle that.

I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
window is one hour long.

Recently I have been trying to checkpoint the application to S3. I am testing this at 
a non-peak time, so the incoming data rate is very low, around 200 records/sec. I run 
the Spark application by creating a new context, and the checkpoint is created in S3, but 
when I kill the app and restart it, it fails to recover from the checkpoint. The 
error message is shown below, my Spark UI shows that all the batches are 
stuck, and the checkpoint recovery takes a long time to complete, from 15 
minutes to over an hour.

I found lots of error messages in the log related to Kinesis exceeding the read 
limit:

{quote}19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 
(TID 4452, ip-172-17-32-11.ec2.internal, executor 9): 
org.apache.spark.SparkException: Gave up after 3 retries while getting shard 
iterator from sequence number 
49601654074184110438492229476281538439036626028298502210, last exception:
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
bq. at scala.Option.getOrElse(Option.scala:121)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
bq. at 
org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
bq. at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
bq. at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
bq. at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
bq. at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
bq. at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
bq. at org.apache.spark.scheduler.Task.run(Task.scala:121)
bq. at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
bq. at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
bq. at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
bq. at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
bq. at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
bq. at java.lang.Thread.run(Thread.java:748)
bq. Caused by: 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: 
Rate exceeded for shard shardId-0004 in stream my-stream-name under 
account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error 
Code: ProvisionedThroughputExceededException; Request ID: 
e368b876-c315-d0f0-b513-e2af2bd14525)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
bq. at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
bq. at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
bq. at 

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Description: 
I have a Spark application that consumes from a Kinesis stream with 6 shards. 
Data is produced to Kinesis at a peak of 2000 records/second; at non-peak times 
it arrives at only 200 records/second. Each record is 0.5 KB, so 6 shards 
should be enough to handle that load.

I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
window is one hour long.

Recently I have been trying to checkpoint the application to S3. I am testing 
this at a non-peak time, so the incoming data rate is very low, about 200 
records/sec. I run the Spark application by creating a new context, and the 
checkpoint is created in S3. But when I kill the app and restart it, it fails 
to recover from the checkpoint: the error message below appears, the Spark UI 
shows all the batches stuck, and checkpoint recovery takes a long time to 
complete, from 15 minutes to over an hour.

I found many error messages in the log related to Kinesis exceeding its read 
limit:

??{{19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-0004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2782)

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Attachment: sparkuiwhilecheckpointrecoveryerror.png

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.23.0, Spark 2.4.3, 
> spark-streaming-kinesis-asl 2.4.3 I have 6 r5.4xLarge in my cluster, plenty 
> of memory. 6 kinesis shards, I even increased to 12 shards but still see the 
> kinesis error
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisusagewhilecheckpointrecoveryerror.png, 
> sparkuiwhilecheckpointrecoveryerror.png
>
>
> I have a Spark application that consumes from a Kinesis stream with 6 shards. 
> Data is produced to Kinesis at a peak of 2000 records/second; at non-peak 
> times it arrives at only 200 records/second. Each record is 0.5 KB, so 6 
> shards should be enough to handle that load.
> I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
> window is one hour long.
> Recently I have been trying to checkpoint the application to S3. I am testing 
> this at a non-peak time, so the incoming data rate is very low, about 200 
> records/sec. I run the Spark application by creating a new context, and the 
> checkpoint is created in S3. But when I kill the app and restart it, it fails 
> to recover from the checkpoint: the error message below appears, the Spark UI 
> shows all the batches stuck, and checkpoint recovery takes a long time to 
> complete, from 15 minutes to over an hour.
> I found many error messages in the log related to Kinesis exceeding its read 
> limit:
> {{19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-0004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
> at 
> 

[jira] [Updated] (SPARK-30393) Too much ProvisionedThroughputExceededException while recover from checkpoint

2019-12-30 Thread Stephen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephen updated SPARK-30393:

Attachment: kinesisusagewhilecheckpointrecoveryerror.png

> Too much ProvisionedThroughputExceededException while recover from checkpoint
> -
>
> Key: SPARK-30393
> URL: https://issues.apache.org/jira/browse/SPARK-30393
> Project: Spark
>  Issue Type: Question
>  Components: DStreams
>Affects Versions: 2.4.3
> Environment: I am using EMR 5.23.0, Spark 2.4.3, 
> spark-streaming-kinesis-asl 2.4.3 I have 6 r5.4xLarge in my cluster, plenty 
> of memory. 6 kinesis shards, I even increased to 12 shards but still see the 
> kinesis error
>Reporter: Stephen
>Priority: Major
> Attachments: kinesisusagewhilecheckpointrecoveryerror.png
>
>
> I have a Spark application that consumes from a Kinesis stream with 6 shards. 
> Data is produced to Kinesis at a peak of 2000 records/second; at non-peak 
> times it arrives at only 200 records/second. Each record is 0.5 KB, so 6 
> shards should be enough to handle that load.
> I use reduceByKeyAndWindow and mapWithState in the program, and the sliding 
> window is one hour long.
> Recently I have been trying to checkpoint the application to S3. I am testing 
> this at a non-peak time, so the incoming data rate is very low, about 200 
> records/sec. I run the Spark application by creating a new context, and the 
> checkpoint is created in S3. But when I kill the app and restart it, it fails 
> to recover from the checkpoint: the error message below appears, the Spark UI 
> shows all the batches stuck, and checkpoint recovery takes a long time to 
> complete, from 15 minutes to over an hour.
> I found many error messages in the log related to Kinesis exceeding its read 
> limit:
> {{19/12/24 00:15:21 WARN TaskSetManager: Lost task 571.0 in stage 33.0 (TID 4452, ip-172-17-32-11.ec2.internal, executor 9): org.apache.spark.SparkException: Gave up after 3 retries while getting shard iterator from sequence number 49601654074184110438492229476281538439036626028298502210, last exception:
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getKinesisIterator(KinesisBackedBlockRDD.scala:246)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:206)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
> at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-0004 in stream my-stream-name under account my-account-number. (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: e368b876-c315-d0f0-b513-e2af2bd14525)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
> at 
>