Repository: spark
Updated Branches:
  refs/heads/master 339b53a13 -> 46a64d1e0


[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery

## What changes were proposed in this pull request?
Added a limit to the getRecords API call in KinesisBackedBlockRDD. This helps 
reduce the amount of data returned by the Kinesis API call, making the recovery 
considerably faster.

As we are storing the `fromSeqNum` & `toSeqNum` in the checkpoint metadata, we can 
also store the number of records, which can later be used as the limit for the API call.

## How was this patch tested?
The patch was manually tested.

Apologies for any silly mistakes; this is my first pull request.

Author: Gaurav <gau...@techtinium.com>

Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46a64d1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46a64d1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46a64d1e

Branch: refs/heads/master
Commit: 46a64d1e0ae12c31e848f377a84fb28e3efb3699
Parents: 339b53a
Author: Gaurav <gau...@techtinium.com>
Authored: Mon Mar 6 10:41:49 2017 -0800
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Mon Mar 6 10:41:49 2017 -0800

----------------------------------------------------------------------
 .../kinesis/KinesisBackedBlockRDD.scala         | 25 +++++++++++++++-----
 .../streaming/kinesis/KinesisReceiver.scala     |  3 ++-
 .../kinesis/KinesisBackedBlockRDDSuite.scala    |  4 ++--
 .../streaming/kinesis/KinesisStreamSuite.scala  |  4 ++--
 4 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
index 23c4d99..0f1790b 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator
 /** Class representing a range of Kinesis sequence numbers. Both sequence 
numbers are inclusive. */
 private[kinesis]
 case class SequenceNumberRange(
-    streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: 
String)
+    streamName: String,
+    shardId: String,
+    fromSeqNumber: String,
+    toSeqNumber: String,
+    recordCount: Int)
 
 /** Class representing an array of Kinesis sequence number ranges */
 private[kinesis]
@@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator(
   private val client = new AmazonKinesisClient(credentials)
   private val streamName = range.streamName
   private val shardId = range.shardId
+  // AWS limits to maximum of 10k records per get call
+  private val maxGetRecordsLimit = 10000
 
   private var toSeqNumberReceived = false
   private var lastSeqNumber: String = null
@@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator(
 
         // If the internal iterator has not been initialized,
         // then fetch records from starting sequence number
-        internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, 
range.fromSeqNumber)
+        internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, 
range.fromSeqNumber,
+          range.recordCount)
       } else if (!internalIterator.hasNext) {
 
         // If the internal iterator does not have any more records,
         // then fetch more records after the last consumed sequence number
-        internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, 
lastSeqNumber)
+        internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, 
lastSeqNumber,
+          range.recordCount)
       }
 
       if (!internalIterator.hasNext) {
@@ -191,9 +199,12 @@ class KinesisSequenceRangeIterator(
   /**
    * Get records starting from or after the given sequence number.
    */
-  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): 
Iterator[Record] = {
+  private def getRecords(
+      iteratorType: ShardIteratorType,
+      seqNum: String,
+      recordCount: Int): Iterator[Record] = {
     val shardIterator = getKinesisIterator(iteratorType, seqNum)
-    val result = getRecordsAndNextKinesisIterator(shardIterator)
+    val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount)
     result._1
   }
 
@@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator(
    * to get records from Kinesis), and get the next shard iterator for next 
consumption.
    */
   private def getRecordsAndNextKinesisIterator(
-      shardIterator: String): (Iterator[Record], String) = {
+      shardIterator: String,
+      recordCount: Int): (Iterator[Record], String) = {
     val getRecordsRequest = new GetRecordsRequest
     getRecordsRequest.setRequestCredentials(credentials)
     getRecordsRequest.setShardIterator(shardIterator)
+    getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit))
     val getRecordsResult = retryOrTimeout[GetRecordsResult](
       s"getting records using shard iterator") {
         client.getRecords(getRecordsRequest)

http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 13fc54e..320728f 100644
--- 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -210,7 +210,8 @@ private[kinesis] class KinesisReceiver[T](
     if (records.size > 0) {
       val dataIterator = records.iterator().asScala.map(messageHandler)
       val metadata = SequenceNumberRange(streamName, shardId,
-        records.get(0).getSequenceNumber(), records.get(records.size() - 
1).getSequenceNumber())
+        records.get(0).getSequenceNumber(), records.get(records.size() - 
1).getSequenceNumber(),
+        records.size())
       blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 18a5a15..2c7b9c5 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: 
Boolean)
       shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { 
_._2 }}
       shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
         val seqNumRange = SequenceNumberRange(
-          testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+          testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, 
seqNumbers.size)
         (shardId, seqNumRange)
       }
       allRanges = shardIdToRange.values.toSeq
@@ -181,7 +181,7 @@ abstract class 
KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
 
     // Create the necessary ranges to use in the RDD
     val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
-      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", 
"xxx", "yyy")))
+      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", 
"xxx", "yyy", 1)))
     val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
       val range = shardIdToRange(shardIds(i + (numPartitions - 
numPartitionsInKinesis)))
       SequenceNumberRanges(Array(range))

http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 387a96f..afb55c8 100644
--- 
a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ 
b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
 
     // Generate block info data for testing
     val seqNumRanges1 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
     val blockId1 = StreamBlockId(kinesisStream.id, 123)
     val blockInfo1 = ReceivedBlockInfo(
       0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, 
None))
 
     val seqNumRanges2 = SequenceNumberRanges(
-      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
     val blockId2 = StreamBlockId(kinesisStream.id, 345)
     val blockInfo2 = ReceivedBlockInfo(
       0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, 
None))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to