bruce_zhao created SPARK-24970:
----------------------------------

             Summary: Spark Kinesis streaming application fails to recover from 
streaming checkpoint due to ProvisionedThroughputExceededException
                 Key: SPARK-24970
                 URL: https://issues.apache.org/jira/browse/SPARK-24970
             Project: Spark
          Issue Type: Bug
          Components: DStreams
    Affects Versions: 2.2.0
            Reporter: bruce_zhao


We're using Spark Streaming to consume Kinesis data, and we found that it re-reads data from Kinesis and easily hits ProvisionedThroughputExceededException *when it recovers from a streaming checkpoint*.

Normally this is just a WARN in the Spark log. But when multiple streaming applications (e.g., 5 applications) consume the same Kinesis stream, the situation becomes serious: *the application fails to recover due to the following exception in the driver*, and one application's failure also affects the other running applications.

{panel:title=Exception}
org.apache.spark.SparkException: Job aborted due to stage failure: {color:#FF0000}*Task 5 in stage 7.0 failed 4 times, most recent failure*:{color} Lost task 5.3 in stage 7.0 (TID 128, ip-172-31-14-36.ap-northeast-1.compute.internal, executor 1): org.apache.spark.SparkException: Gave up after 3 retries while getting records using shard iterator, last exception:
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$retryOrTimeout$2.apply(KinesisBackedBlockRDD.scala:288)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:282)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecordsAndNextKinesisIterator(KinesisBackedBlockRDD.scala:223)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getRecords(KinesisBackedBlockRDD.scala:207)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:162)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.getNext(KinesisBackedBlockRDD.scala:133)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
 at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
 at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
 at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: *{color:#FF0000}com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: Rate exceeded for shard shardId-000000000000 in stream rellfsstream-an under account 1111111111111{color}.* (Service: AmazonKinesis; Status Code: 400; Error Code: ProvisionedThroughputExceededException; Request ID: d3520677-060e-14c4-8014-2886b6b75f03)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1587)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1257)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1029)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:741)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:715)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:697)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:665)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:647)
 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:511)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2219)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2195)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1004)
 at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:980)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator$$anonfun$2.apply(KinesisBackedBlockRDD.scala:224)
 at org.apache.spark.streaming.kinesis.KinesisSequenceRangeIterator.retryOrTimeout(KinesisBackedBlockRDD.scala:269)
{panel}
 
After checking the source code, we found that recovery calls getBlockFromKinesis(), which goes back to Kinesis directly to re-read the data. Since every partition in the BlockRDD accesses Kinesis, and AWS Kinesis supports only 5 read transactions per shard per second, ProvisionedThroughputExceededException is easy to hit. Even though the code does some retries, it still fails easily when contention is heavy.

{code:scala}
// KinesisBackedBlockRDD.scala, inside compute()

// Re-reads the partition's sequence-number ranges directly from Kinesis.
def getBlockFromKinesis(): Iterator[T] = {
  val credentials = kinesisCreds.provider.getCredentials
  partition.seqNumberRanges.ranges.iterator.flatMap { range =>
    new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
      range, kinesisReadConfigs).map(messageHandler)
  }
}

// After a restart the blocks are no longer in the block manager, so
// recovery always falls through to the direct Kinesis read.
if (partition.isBlockIdValid) {
  getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
} else {
  getBlockFromKinesis()
}
{code}
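To illustrate the arithmetic, here is a back-of-the-envelope sketch (the partitions-per-shard figure is an assumption for illustration, not measured from our setup):
{code:scala}
// Rough read-pressure estimate during checkpoint recovery.
object RecoveryReadPressure {
  def main(args: Array[String]): Unit = {
    val perShardReadLimitPerSec = 5 // Kinesis GetRecords: 5 transactions/sec/shard
    val applications = 5            // the scenario described above
    val partitionsPerShard = 4      // assumption; varies with batch/block intervals

    // Worst case: every partition of every recovering application issues
    // GetRecords against the same shard at roughly the same time.
    val concurrentGets = applications * partitionsPerShard
    println(s"demand ~ $concurrentGets GetRecords/sec vs. limit $perShardReadLimitPerSec/sec")
    // ~20 vs. 5, so ProvisionedThroughputExceededException is expected
    // even before the retries are exhausted.
  }
}
{code}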
 
Why do we need to re-read data from Kinesis directly? Is there any way to avoid 
it under the current design?

In most cases, when we use Spark Streaming we enable the WAL (write-ahead log). We could then recover the data from the WAL instead of re-reading it from Kinesis directly.
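For reference, enabling the receiver WAL looks roughly like this (a minimal sketch; the checkpoint path and app name are placeholders):
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/kinesis-app-checkpoint" // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("kinesis-consumer")
    // Write received blocks to a WAL under the checkpoint directory.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... create the Kinesis DStream and the processing graph here ...
  ssc
}

// On restart this picks up the existing streaming checkpoint.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
{code}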

I'd like to make a change for this. It may not be a perfect fix, but it would at least provide a choice to avoid this problem under the current design.
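As a rough illustration of the kind of choice I have in mind (everything here is hypothetical: the configuration key and getBlockFromWriteAheadLog() are invented names, not existing Spark APIs):
{code:scala}
// Hypothetical sketch of a guarded fallback in KinesisBackedBlockRDD.compute().
// "spark.streaming.kinesis.readFromKinesisOnRecovery" is an invented key.
val readKinesisOnRecovery =
  conf.getBoolean("spark.streaming.kinesis.readFromKinesisOnRecovery", true)

if (partition.isBlockIdValid) {
  getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
} else if (!readKinesisOnRecovery) {
  // Hypothetical helper: replay the block from the WAL instead of Kinesis.
  getBlockFromWriteAheadLog()
} else {
  getBlockFromKinesis()
}
{code}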
