Repository: spark Updated Branches: refs/heads/master 339b53a13 -> 46a64d1e0
[SPARK-19304][STREAMING][KINESIS] fix kinesis slow checkpoint recovery ## What changes were proposed in this pull request? Added a limit to the getRecords API call in KinesisBackedBlockRDD. This helps reduce the amount of data returned by the Kinesis API call, making the recovery considerably faster. As we are storing the `fromSeqNum` & `toSeqNum` in checkpoint metadata, we can also store the number of records, which can later be used for the API call. ## How was this patch tested? The patch was manually tested. Apologies for any silly mistakes, opening first pull request Author: Gaurav <gau...@techtinium.com> Closes #16842 from Gauravshah/kinesis_checkpoint_recovery_fix_2_1_0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46a64d1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46a64d1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46a64d1e Branch: refs/heads/master Commit: 46a64d1e0ae12c31e848f377a84fb28e3efb3699 Parents: 339b53a Author: Gaurav <gau...@techtinium.com> Authored: Mon Mar 6 10:41:49 2017 -0800 Committer: Burak Yavuz <brk...@gmail.com> Committed: Mon Mar 6 10:41:49 2017 -0800 ---------------------------------------------------------------------- .../kinesis/KinesisBackedBlockRDD.scala | 25 +++++++++++++++----- .../streaming/kinesis/KinesisReceiver.scala | 3 ++- .../kinesis/KinesisBackedBlockRDDSuite.scala | 4 ++-- .../streaming/kinesis/KinesisStreamSuite.scala | 4 ++-- 4 files changed, 25 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 23c4d99..0f1790b 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -36,7 +36,11 @@ import org.apache.spark.util.NextIterator /** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */ private[kinesis] case class SequenceNumberRange( - streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) + streamName: String, + shardId: String, + fromSeqNumber: String, + toSeqNumber: String, + recordCount: Int) /** Class representing an array of Kinesis sequence number ranges */ private[kinesis] @@ -136,6 +140,8 @@ class KinesisSequenceRangeIterator( private val client = new AmazonKinesisClient(credentials) private val streamName = range.streamName private val shardId = range.shardId + // AWS limits to maximum of 10k records per get call + private val maxGetRecordsLimit = 10000 private var toSeqNumberReceived = false private var lastSeqNumber: String = null @@ -153,12 +159,14 @@ class KinesisSequenceRangeIterator( // If the internal iterator has not been initialized, // then fetch records from starting sequence number - internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber) + internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber, + range.recordCount) } else if (!internalIterator.hasNext) { // If the internal iterator does not have any more records, // then fetch more records after the last consumed sequence number - internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber) + internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber, + range.recordCount) } if (!internalIterator.hasNext) { @@ -191,9 +199,12 @@ class 
KinesisSequenceRangeIterator( /** * Get records starting from or after the given sequence number. */ - private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { + private def getRecords( + iteratorType: ShardIteratorType, + seqNum: String, + recordCount: Int): Iterator[Record] = { val shardIterator = getKinesisIterator(iteratorType, seqNum) - val result = getRecordsAndNextKinesisIterator(shardIterator) + val result = getRecordsAndNextKinesisIterator(shardIterator, recordCount) result._1 } @@ -202,10 +213,12 @@ class KinesisSequenceRangeIterator( * to get records from Kinesis), and get the next shard iterator for next consumption. */ private def getRecordsAndNextKinesisIterator( - shardIterator: String): (Iterator[Record], String) = { + shardIterator: String, + recordCount: Int): (Iterator[Record], String) = { val getRecordsRequest = new GetRecordsRequest getRecordsRequest.setRequestCredentials(credentials) getRecordsRequest.setShardIterator(shardIterator) + getRecordsRequest.setLimit(Math.min(recordCount, this.maxGetRecordsLimit)) val getRecordsResult = retryOrTimeout[GetRecordsResult]( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 13fc54e..320728f 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -210,7 +210,8 @@ private[kinesis] class KinesisReceiver[T]( if (records.size > 0) { val dataIterator = 
records.iterator().asScala.map(messageHandler) val metadata = SequenceNumberRange(streamName, shardId, - records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) + records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber(), + records.size()) blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) } } http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 18a5a15..2c7b9c5 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -51,7 +51,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }} shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => val seqNumRange = SequenceNumberRange( - testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last) + testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size) (shardId, seqNumRange) } allRanges = shardIdToRange.values.toSeq @@ -181,7 +181,7 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) // Create the necessary ranges to use in the RDD val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)( - SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))) + SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 1))) val realRanges = 
Array.tabulate(numPartitionsInKinesis) { i => val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis))) SequenceNumberRanges(Array(range)) http://git-wip-us.apache.org/repos/asf/spark/blob/46a64d1e/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 387a96f..afb55c8 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -119,13 +119,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun // Generate block info data for testing val seqNumRanges1 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")) + SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67)) val blockId1 = StreamBlockId(kinesisStream.id, 123) val blockInfo1 = ReceivedBlockInfo( 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None)) val seqNumRanges2 = SequenceNumberRanges( - SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb")) + SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89)) val blockId2 = StreamBlockId(kinesisStream.id, 345) val blockInfo2 = ReceivedBlockInfo( 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org