http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
new file mode 100644
index 0000000..15ac588
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import scala.reflect.ClassTag
+
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Duration, StreamingContext}
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object KinesisUtils {
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *                       Kinesis `Record`, which contains both message data, and metadata.
+   */
+  def createStream[T: ClassTag](
+      ssc: StreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      messageHandler: Record => T): ReceiverInputDStream[T] = {
+    val cleanedHandler = ssc.sc.clean(messageHandler)
+    // Setting scope to override receiver stream's scope of "receiver stream"
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+        cleanedHandler, None)
+    }
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   *  The given AWS credentials will get saved in DStream checkpoints if checkpointing
+   *  is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *                       Kinesis `Record`, which contains both message data, and metadata.
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+   */
+  // scalastyle:off
+  def createStream[T: ClassTag](
+      ssc: StreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      messageHandler: Record => T,
+      awsAccessKeyId: String,
+      awsSecretKey: String): ReceiverInputDStream[T] = {
+    // scalastyle:on
+    val cleanedHandler = ssc.sc.clean(messageHandler)
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+        cleanedHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+    }
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  def createStream(
+      ssc: StreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+    // Setting scope to override receiver stream's scope of "receiver stream"
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+        defaultMessageHandler, None)
+    }
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   *  The given AWS credentials will get saved in DStream checkpoints if checkpointing
+   *  is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+   */
+  def createStream(
+      ssc: StreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String): ReceiverInputDStream[Array[Byte]] = {
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
+        initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
+        defaultMessageHandler, Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
+    }
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   *
+   *  - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   *    on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   *    gets AWS credentials.
+   *  - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+   *  - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name
+   *    in [[org.apache.spark.SparkConf]].
+   *
+   * @param ssc StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Endpoint url of Kinesis service
+   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  @deprecated("use other forms of createStream", "1.4.0")
+  def createStream(
+      ssc: StreamingContext,
+      streamName: String,
+      endpointUrl: String,
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[Array[Byte]] = {
+    ssc.withNamedScope("kinesis stream") {
+      new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl,
+        getRegionByEndpoint(endpointUrl), initialPositionInStream, ssc.sc.appName,
+        checkpointInterval, storageLevel, defaultMessageHandler, None)
+    }
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
+   * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *                       Kinesis `Record`, which contains both message data, and metadata.
+   * @param recordClass Class of the records in DStream
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      messageHandler: JFunction[Record, T],
+      recordClass: Class[T]): JavaReceiverInputDStream[T] = {
+    implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
+    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
+    createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+      initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+   * is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param messageHandler A custom message handler that can generate a generic output from a
+   *                       Kinesis `Record`, which contains both message data, and metadata.
+   * @param recordClass Class of the records in DStream
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+   */
+  // scalastyle:off
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      messageHandler: JFunction[Record, T],
+      recordClass: Class[T],
+      awsAccessKeyId: String,
+      awsSecretKey: String): JavaReceiverInputDStream[T] = {
+    // scalastyle:on
+    implicit val recordCmt: ClassTag[T] = ClassTag(recordClass)
+    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call(_))
+    createStream[T](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+      initialPositionInStream, checkpointInterval, storageLevel, cleanedHandler,
+      awsAccessKeyId, awsSecretKey)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
+   * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+      initialPositionInStream, checkpointInterval, storageLevel, defaultMessageHandler(_))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   * The given AWS credentials will get saved in DStream checkpoints if checkpointing
+   * is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library (KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use DefaultAWSCredentialsProviderChain)
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream[Array[Byte]](jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+      initialPositionInStream, checkpointInterval, storageLevel,
+      defaultMessageHandler(_), awsAccessKeyId, awsSecretKey)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   * - The AWS credentials will be discovered using the DefaultAWSCredentialsProviderChain
+   *   on the workers. See AWS documentation to understand how DefaultAWSCredentialsProviderChain
+   *   gets AWS credentials.
+   * - The region of the `endpointUrl` will be used for DynamoDB and CloudWatch.
+   * - The Kinesis application name used by the Kinesis Client Library (KCL) will be the app name in
+   *   [[org.apache.spark.SparkConf]].
+   *
+   * @param jssc Java StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Endpoint url of Kinesis service
+   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  @deprecated("use other forms of createStream", "1.4.0")
+  def createStream(
+      jssc: JavaStreamingContext,
+      streamName: String,
+      endpointUrl: String,
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream(
+      jssc.ssc, streamName, endpointUrl, checkpointInterval, initialPositionInStream, storageLevel)
+  }
+
+  private def getRegionByEndpoint(endpointUrl: String): String = {
+    RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  }
+
+  private def validateRegion(regionName: String): String = {
+    Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse {
+      throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
+    }
+  }
+
+  private[kinesis] def defaultMessageHandler(record: Record): Array[Byte] = {
+    if (record == null) return null
+    val byteBuffer = record.getData()
+    val byteArray = new Array[Byte](byteBuffer.remaining())
+    byteBuffer.get(byteArray)
+    byteArray
+  }
+}
+
+/**
+ * This is a helper class that wraps the methods in KinesisUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's KinesisUtils.
+ */
+private class KinesisUtilsPythonHelper {
+
+  def getInitialPositionInStream(initialPositionInStream: Int): InitialPositionInStream = {
+    initialPositionInStream match {
+      case 0 => InitialPositionInStream.LATEST
+      case 1 => InitialPositionInStream.TRIM_HORIZON
+      case _ => throw new IllegalArgumentException(
+        "Illegal InitialPositionInStream. Please use " +
+          "InitialPositionInStream.LATEST or 
InitialPositionInStream.TRIM_HORIZON")
+    }
+  }
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: Int,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String
+      ): JavaReceiverInputDStream[Array[Byte]] = {
+    if (awsAccessKeyId == null && awsSecretKey != null) {
+      throw new IllegalArgumentException("awsSecretKey is set but 
awsAccessKeyId is null")
+    }
+    if (awsAccessKeyId != null && awsSecretKey == null) {
+      throw new IllegalArgumentException("awsAccessKeyId is set but 
awsSecretKey is null")
+    }
+    if (awsAccessKeyId == null && awsSecretKey == null) {
+      KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+        getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel)
+    } else {
+      KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+        getInitialPositionInStream(initialPositionInStream), checkpointInterval, storageLevel,
+        awsAccessKeyId, awsSecretKey)
+    }
+  }
+
+}
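
For reference, a minimal usage sketch of the Scala API added in this file. The application name, stream name, endpoint, region, and batch/checkpoint intervals below are placeholder values chosen for illustration, not part of the patch:

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.Record

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

object KinesisStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KinesisStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Default variant: yields a DStream[Array[Byte]] via defaultMessageHandler.
    val byteStream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myStream", "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)

    // Custom messageHandler variant: keep the partition key next to the payload.
    val keyedStream = KinesisUtils.createStream(
      ssc, "myKinesisApp", "myStream", "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
      InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2,
      (record: Record) => {
        // Copy the payload out of the Record's ByteBuffer, as defaultMessageHandler does.
        val buffer = record.getData()
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes)
        (record.getPartitionKey, new String(bytes, StandardCharsets.UTF_8))
      })

    byteStream.count().print()
    keyedStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}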

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000..5c2371c
--- /dev/null
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testKinesisStream() {
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
+        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+
+    ssc.stop();
+  }
+
+
+  private static Function<Record, String> handler = new Function<Record, String>() {
+    @Override
+    public String call(Record record) {
+      return record.getPartitionKey() + "-" + record.getSequenceNumber();
+    }
+  };
+
+  @Test
+  public void testCustomHandler() {
+    // Tests the API, does not actually test data receiving
+    JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream",
+        "https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST,
+        new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class);
+
+    ssc.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/resources/log4j.properties b/external/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000..edbecda
--- /dev/null
+++ b/external/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark-project.jetty=WARN

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
new file mode 100644
index 0000000..fdb270e
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import com.amazonaws.services.kinesis.producer.{KinesisProducer => KPLProducer, KinesisProducerConfiguration, UserRecordResult}
+import com.google.common.util.concurrent.{FutureCallback, Futures}
+
+private[kinesis] class KPLBasedKinesisTestUtils extends KinesisTestUtils {
+  override protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (!aggregate) {
+      new SimpleDataGenerator(kinesisClient)
+    } else {
+      new KPLDataGenerator(regionName)
+    }
+  }
+}
+
+/** A wrapper for the KinesisProducer provided in the KPL. */
+private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator {
+
+  private lazy val producer: KPLProducer = {
+    val conf = new KinesisProducerConfiguration()
+      .setRecordMaxBufferedTime(1000)
+      .setMaxConnections(1)
+      .setRegion(regionName)
+      .setMetricsLevel("none")
+
+    new KPLProducer(conf)
+  }
+
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, String)]] = {
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]()
+    data.foreach { num =>
+      val str = num.toString
+      val data = ByteBuffer.wrap(str.getBytes())
+      val future = producer.addUserRecord(streamName, str, data)
+      val kinesisCallBack = new FutureCallback[UserRecordResult]() {
+        override def onFailure(t: Throwable): Unit = {} // do nothing
+
+        override def onSuccess(result: UserRecordResult): Unit = {
+          val shardId = result.getShardId
+          val seqNumber = result.getSequenceNumber()
+          val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+            new ArrayBuffer[(Int, String)]())
+          sentSeqNumbers += ((num, seqNumber))
+        }
+      }
+      Futures.addCallback(future, kinesisCallBack)
+    }
+    producer.flushSync()
+    shardIdToSeqNumbers.toMap
+  }
+}
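
A brief sketch (not part of the patch) of driving the KPL-backed generator defined above directly. The stream and region names are placeholders, AWS credentials are assumed to come from the default provider chain, and the object sits in the same package because KPLDataGenerator is private[kinesis]:

package org.apache.spark.streaming.kinesis

object KPLDataGeneratorSketch {
  def main(args: Array[String]): Unit = {
    val generator = new KPLDataGenerator("us-west-2")
    // sendData returns shardId -> Seq[(payload, sequenceNumber)] for the records that were accepted.
    val shardIdToDataAndSeqNumbers = generator.sendData("mySparkStream", 1 to 8)
    shardIdToDataAndSeqNumbers.foreach { case (shardId, dataAndSeq) =>
      println(s"$shardId received ${dataAndSeq.size} records; " +
        s"sequence numbers ${dataAndSeq.map(_._2).mkString(", ")}")
    }
  }
}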

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
new file mode 100644
index 0000000..2555332
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
+import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
+
+abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
+  extends KinesisFunSuite with BeforeAndAfterEach with LocalSparkContext {
+
+  private val testData = 1 to 8
+
+  private var testUtils: KinesisTestUtils = null
+  private var shardIds: Seq[String] = null
+  private var shardIdToData: Map[String, Seq[Int]] = null
+  private var shardIdToSeqNumbers: Map[String, Seq[String]] = null
+  private var shardIdToDataAndSeqNumbers: Map[String, Seq[(Int, String)]] = null
+  private var shardIdToRange: Map[String, SequenceNumberRange] = null
+  private var allRanges: Seq[SequenceNumberRange] = null
+
+  private var blockManager: BlockManager = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    runIfTestsEnabled("Prepare KinesisTestUtils") {
+      testUtils = new KPLBasedKinesisTestUtils()
+      testUtils.createStream()
+
+      shardIdToDataAndSeqNumbers = testUtils.pushData(testData, aggregate = aggregateTestData)
+      require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
+
+      shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
+      shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
+      shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+      shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
+        val seqNumRange = SequenceNumberRange(
+          testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last)
+        (shardId, seqNumRange)
+      }
+      allRanges = shardIdToRange.values.toSeq
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisBackedBlockRDDSuite")
+    sc = new SparkContext(conf)
+    blockManager = sc.env.blockManager
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      if (testUtils != null) {
+        testUtils.deleteStream()
+      }
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  testIfEnabled("Basic reading from Kinesis") {
+    // Verify all data using multiple ranges in a single RDD partition
+    val receivedData1 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+      testUtils.endpointUrl, fakeBlockIds(1),
+      Array(SequenceNumberRanges(allRanges.toArray))
+    ).map { bytes => new String(bytes).toInt }.collect()
+    assert(receivedData1.toSet === testData.toSet)
+
+    // Verify all data using one range in each of the multiple RDD partitions
+    val receivedData2 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+    ).map { bytes => new String(bytes).toInt }.collect()
+    assert(receivedData2.toSet === testData.toSet)
+
+    // Verify ordering within each partition
+    val receivedData3 = new KinesisBackedBlockRDD[Array[Byte]](sc, testUtils.regionName,
+      testUtils.endpointUrl, fakeBlockIds(allRanges.size),
+      allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
+    ).map { bytes => new String(bytes).toInt }.collectPartitions()
+    assert(receivedData3.length === allRanges.size)
+    for (i <- 0 until allRanges.size) {
+      assert(receivedData3(i).toSeq === shardIdToData(allRanges(i).shardId))
+    }
+  }
+
+  testIfEnabled("Read data available in both block manager and Kinesis") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2)
+  }
+
+  testIfEnabled("Read data available only in block manager, not in Kinesis") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0)
+  }
+
+  testIfEnabled("Read data available only in Kinesis, not in block manager") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 0, numPartitionsInKinesis = 2)
+  }
+
+  testIfEnabled("Read data available partially in block manager, rest in 
Kinesis") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 1, numPartitionsInKinesis = 
1)
+  }
+
+  testIfEnabled("Test isBlockValid skips block fetching from block manager") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 0,
+      testIsBlockValid = true)
+  }
+
+  testIfEnabled("Test whether RDD is valid after removing blocks from block 
anager") {
+    testRDD(numPartitions = 2, numPartitionsInBM = 2, numPartitionsInKinesis = 2,
+      testBlockRemove = true)
+  }
+
+  /**
+   * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
+   * and the rest to a write ahead log, and then reading it all back using the RDD.
+   * It can also test if the partitions that were read from the log were again stored in
+   * block manager.
+   *
+   *
+   *
+   * @param numPartitions Number of partitions in RDD
+   * @param numPartitionsInBM Number of partitions to write to the BlockManager.
+   *                          Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
+   * @param numPartitionsInKinesis Number of partitions to write to the Kinesis.
+   *                           Partitions (numPartitions - 1 - numPartitionsInKinesis) to
+   *                           (numPartitions - 1) will be written to Kinesis
+   * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
+   * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
+   *                        reads falling back to the WAL
+   * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
+   *
+   *   numPartitionsInBM = 3
+   *   |------------------|
+   *   |                  |
+   *    0       1       2       3       4
+   *           |                         |
+   *           |-------------------------|
+   *              numPartitionsInKinesis = 4
+   */
+  private def testRDD(
+      numPartitions: Int,
+      numPartitionsInBM: Int,
+      numPartitionsInKinesis: Int,
+      testIsBlockValid: Boolean = false,
+      testBlockRemove: Boolean = false
+    ): Unit = {
+    require(shardIds.size > 1, "Need at least 2 shards to test")
+    require(numPartitionsInBM <= shardIds.size,
+      "Number of partitions in BlockManager cannot be more than the Kinesis 
test shards available")
+    require(numPartitionsInKinesis <= shardIds.size,
+      "Number of partitions in Kinesis cannot be more than the Kinesis test 
shards available")
+    require(numPartitionsInBM <= numPartitions,
+      "Number of partitions in BlockManager cannot be more than that in RDD")
+    require(numPartitionsInKinesis <= numPartitions,
+      "Number of partitions in Kinesis cannot be more than that in RDD")
+
+    // Put necessary blocks in the block manager
+    val blockIds = fakeBlockIds(numPartitions)
+    blockIds.foreach(blockManager.removeBlock(_))
+    (0 until numPartitionsInBM).foreach { i =>
+      val blockData = shardIdToData(shardIds(i)).iterator.map { _.toString.getBytes() }
+      blockManager.putIterator(blockIds(i), blockData, StorageLevel.MEMORY_ONLY)
+    }
+
+    // Create the necessary ranges to use in the RDD
+    val fakeRanges = Array.fill(numPartitions - numPartitionsInKinesis)(
+      SequenceNumberRanges(SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")))
+    val realRanges = Array.tabulate(numPartitionsInKinesis) { i =>
+      val range = shardIdToRange(shardIds(i + (numPartitions - numPartitionsInKinesis)))
+      SequenceNumberRanges(Array(range))
+    }
+    val ranges = (fakeRanges ++ realRanges)
+
+
+    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
+    require(
+      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
+      "Expected blocks not in BlockManager"
+    )
+
+    require(
+      blockIds.drop(numPartitionsInBM).forall(blockManager.get(_).isEmpty),
+      "Unexpected blocks in BlockManager"
+    )
+
+    // Make sure that the right sequence `numPartitionsInKinesis` are configured, and others are not
+    require(
+      ranges.takeRight(numPartitionsInKinesis).forall {
+        _.ranges.forall { _.streamName == testUtils.streamName }
+      }, "Incorrect configuration of RDD, expected ranges not set: "
+    )
+
+    require(
+      ranges.dropRight(numPartitionsInKinesis).forall {
+        _.ranges.forall { _.streamName != testUtils.streamName }
+      }, "Incorrect configuration of RDD, unexpected ranges set"
+    )
+
+    val rdd = new KinesisBackedBlockRDD[Array[Byte]](
+      sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
+    val collectedData = rdd.map { bytes =>
+      new String(bytes).toInt
+    }.collect()
+    assert(collectedData.toSet === testData.toSet)
+
+    // Verify that the block fetching is skipped when isBlockValid is set to false.
+    // This is done by using an RDD whose data is only in memory but is set to skip block fetching
+    // Using that RDD will throw an exception, as it skips block fetching even if the blocks are
+    // in BlockManager.
+    if (testIsBlockValid) {
+      require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
+      require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
+      val rdd2 = new KinesisBackedBlockRDD[Array[Byte]](
+        sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
+        isBlockIdValid = Array.fill(blockIds.length)(false))
+      intercept[SparkException] {
+        rdd2.collect()
+      }
+    }
+
+    // Verify that the RDD is not invalid after the blocks are removed and can still read data
+    // from write ahead log
+    if (testBlockRemove) {
+      require(numPartitions === numPartitionsInKinesis,
+        "All partitions must be in WAL for this test")
+      require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
+      rdd.removeBlocks()
+      assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSet === testData.toSet)
+    }
+  }
+
+  /** Generate fake block ids */
+  private def fakeBlockIds(num: Int): Array[BlockId] = {
+    Array.tabulate(num) { i => new StreamBlockId(0, i) }
+  }
+}
+
+class WithAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisBackedBlockRDDSuite
+  extends KinesisBackedBlockRDDTests(aggregateTestData = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
new file mode 100644
index 0000000..e1499a8
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent.{ExecutorService, TimeoutException}
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase}
+import org.apache.spark.util.ManualClock
+
+class KinesisCheckpointerSuite extends TestSuiteBase
+  with MockitoSugar
+  with BeforeAndAfterEach
+  with PrivateMethodTester
+  with Eventually {
+
+  private val workerId = "dummyWorkerId"
+  private val shardId = "dummyShardId"
+  private val seqNum = "123"
+  private val otherSeqNum = "245"
+  private val checkpointInterval = Duration(10)
+  private val someSeqNum = Some(seqNum)
+  private val someOtherSeqNum = Some(otherSeqNum)
+
+  private var receiverMock: KinesisReceiver[Array[Byte]] = _
+  private var checkpointerMock: IRecordProcessorCheckpointer = _
+  private var kinesisCheckpointer: KinesisCheckpointer = _
+  private var clock: ManualClock = _
+
+  private val checkpoint = PrivateMethod[Unit]('checkpoint)
+
+  override def beforeEach(): Unit = {
+    receiverMock = mock[KinesisReceiver[Array[Byte]]]
+    checkpointerMock = mock[IRecordProcessorCheckpointer]
+    clock = new ManualClock()
+    kinesisCheckpointer = new KinesisCheckpointer(receiverMock, checkpointInterval, workerId, clock)
+  }
+
+  test("checkpoint is not called twice for the same sequence number") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+    verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("checkpoint is called after sequence number increases") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+    kinesisCheckpointer.invokePrivate(checkpoint(shardId, checkpointerMock))
+
+    verify(checkpointerMock, times(1)).checkpoint(seqNum)
+    verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+  }
+
+  test("should checkpoint if we have exceeded the checkpoint interval") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+    clock.advance(5 * checkpointInterval.milliseconds)
+
+    eventually(timeout(1 second)) {
+      verify(checkpointerMock, times(1)).checkpoint(seqNum)
+      verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
+    }
+  }
+
+  test("shouldn't checkpoint if we have not exceeded the checkpoint interval") 
{
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+    clock.advance(checkpointInterval.milliseconds / 2)
+
+    verify(checkpointerMock, never()).checkpoint(anyString())
+  }
+
+  test("should not checkpoint for the same sequence number") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+
+    clock.advance(checkpointInterval.milliseconds * 5)
+    eventually(timeout(1 second)) {
+      verify(checkpointerMock, atMost(1)).checkpoint(anyString())
+    }
+  }
+
+  test("removing checkpointer checkpoints one last time") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock)
+    verify(checkpointerMock, times(1)).checkpoint(anyString())
+  }
+
+  test("if checkpointing is going on, wait until finished before removing and 
checkpointing") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
+      .thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
+    when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
+      override def answer(invocations: InvocationOnMock): Unit = {
+        clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
+      }
+    })
+
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
+    clock.advance(checkpointInterval.milliseconds)
+    eventually(timeout(1 second)) {
+      verify(checkpointerMock, times(1)).checkpoint(anyString())
+    }
+    // don't block test thread
+    val f = Future(kinesisCheckpointer.removeCheckpointer(shardId, checkpointerMock))(
+      ExecutionContext.global)
+
+    intercept[TimeoutException] {
+      Await.ready(f, 50 millis)
+    }
+
+    clock.advance(checkpointInterval.milliseconds / 2)
+    eventually(timeout(1 second)) {
+      verify(checkpointerMock, times(2)).checkpoint(anyString())
+    }
+  }
+}
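
The first two tests above pin down one behaviour of KinesisCheckpointer: a shard is checkpointed again only when its latest sequence number has advanced. Below is a minimal, self-contained sketch of that dedup logic, not Spark's implementation; the `Checkpointer` trait is a stand-in for the KCL's IRecordProcessorCheckpointer. (The timer-driven tests additionally rely on ManualClock so that interval-based checkpointing can be exercised without sleeping.)

    // Minimal sketch of the "skip duplicate sequence numbers" behaviour verified
    // above. Checkpointer stands in for the KCL's IRecordProcessorCheckpointer.
    object DedupCheckpointSketch {
      trait Checkpointer { def checkpoint(seqNum: String): Unit }

      class ShardCheckpointState {
        private var lastCheckpointed: Option[String] = None

        /** Checkpoint only when the latest sequence number has advanced. */
        def maybeCheckpoint(latest: Option[String], checkpointer: Checkpointer): Unit =
          latest match {
            case Some(seq) if !lastCheckpointed.contains(seq) =>
              checkpointer.checkpoint(seq)
              lastCheckpointed = Some(seq)
            case _ => // unchanged sequence number (or no data yet): nothing to do
          }
      }

      def main(args: Array[String]): Unit = {
        val state = new ShardCheckpointState
        val printing = new Checkpointer {
          def checkpoint(seqNum: String): Unit = println(s"checkpointing at $seqNum")
        }
        state.maybeCheckpoint(Some("123"), printing) // checkpoints at 123
        state.maybeCheckpoint(Some("123"), printing) // skipped: same sequence number
        state.maybeCheckpoint(Some("245"), printing) // checkpoints at 245
      }
    }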

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
new file mode 100644
index 0000000..ee428f3
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Helper trait that runs the Kinesis real-data-transfer tests, or ignores
+ * them depending on whether the enabling environment variable is set.
+ */
+trait KinesisFunSuite extends SparkFunSuite  {
+  import KinesisTestUtils._
+
+  /** Run the test if environment variable is set or ignore the test */
+  def testIfEnabled(testName: String)(testBody: => Unit) {
+    if (shouldRunTests) {
+      test(testName)(testBody)
+    } else {
+      ignore(s"$testName [enable by setting env var 
$envVarNameForEnablingTests=1]")(testBody)
+    }
+  }
+
+  /** Run the given body of code only if Kinesis tests are enabled */
+  def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
+    if (shouldRunTests) {
+      body
+    } else {
+      ignore(s"$message [enable by setting env var 
$envVarNameForEnablingTests=1]")()
+    }
+  }
+}
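
The two helpers above gate expensive tests on flags imported from KinesisTestUtils, which is not part of this diff. As a rough illustration only, such a flag could be derived from the environment as sketched below; the object name and the environment variable are assumptions for illustration, not the real definitions.

    // Hedged sketch: one way the imported flags could be backed. The env var
    // name here is an assumption; the real one lives in KinesisTestUtils.
    object KinesisTestFlagsSketch {
      val envVarNameForEnablingTests: String = "RUN_KINESIS_TESTS"
      lazy val shouldRunTests: Boolean =
        sys.env.get(envVarNameForEnablingTests).contains("1")
    }

Suites then simply write testIfEnabled("basic operation") { ... }, as KinesisStreamSuite does further down in this patch.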

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
new file mode 100644
index 0000000..fd15b6c
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.Arrays
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions._
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+import org.mockito.Matchers._
+import org.mockito.Matchers.{eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.streaming.{Duration, TestSuiteBase}
+import org.apache.spark.util.Utils
+
+/**
+ * Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
+ */
+class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAfter
+    with MockitoSugar {
+
+  val app = "TestKinesisReceiver"
+  val stream = "mySparkStream"
+  val endpoint = "endpoint-url"
+  val workerId = "dummyWorkerId"
+  val shardId = "dummyShardId"
+  val seqNum = "dummySeqNum"
+  val checkpointInterval = Duration(10)
+  val someSeqNum = Some(seqNum)
+
+  val record1 = new Record()
+  record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8)))
+  val record2 = new Record()
+  record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8)))
+  val batch = Arrays.asList(record1, record2)
+
+  var receiverMock: KinesisReceiver[Array[Byte]] = _
+  var checkpointerMock: IRecordProcessorCheckpointer = _
+
+  override def beforeFunction(): Unit = {
+    receiverMock = mock[KinesisReceiver[Array[Byte]]]
+    checkpointerMock = mock[IRecordProcessorCheckpointer]
+  }
+
+  test("check serializability of SerializableAWSCredentials") {
+    Utils.deserialize[SerializableAWSCredentials](
+      Utils.serialize(new SerializableAWSCredentials("x", "y")))
+  }
+
+  test("process records including store and set checkpointer") {
+    when(receiverMock.isStopped()).thenReturn(false)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.initialize(shardId)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).addRecords(shardId, batch)
+    verify(receiverMock, times(1)).setCheckpointer(shardId, checkpointerMock)
+  }
+
+  test("shouldn't store and update checkpointer when receiver is stopped") {
+    when(receiverMock.isStopped()).thenReturn(true)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.processRecords(batch, checkpointerMock)
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+    verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
+  }
+
+  test("shouldn't update checkpointer when exception occurs during store") {
+    when(receiverMock.isStopped()).thenReturn(false)
+    when(
+      receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+    ).thenThrow(new RuntimeException())
+
+    intercept[RuntimeException] {
+      val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+      recordProcessor.initialize(shardId)
+      recordProcessor.processRecords(batch, checkpointerMock)
+    }
+
+    verify(receiverMock, times(1)).isStopped()
+    verify(receiverMock, times(1)).addRecords(shardId, batch)
+    verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
+  }
+
+  test("shutdown should checkpoint if the reason is TERMINATE") {
+    when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.initialize(shardId)
+    recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
+
+    verify(receiverMock, times(1)).removeCheckpointer(meq(shardId), meq(checkpointerMock))
+  }
+
+
+  test("shutdown should not checkpoint if the reason is something other than 
TERMINATE") {
+    
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
+
+    val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId)
+    recordProcessor.initialize(shardId)
+    recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
+    recordProcessor.shutdown(checkpointerMock, null)
+
+    verify(receiverMock, times(2)).removeCheckpointer(meq(shardId),
+      meq[IRecordProcessorCheckpointer](null))
+  }
+
+  test("retry success on first attempt") {
+    val expectedIsStopped = false
+    when(receiverMock.isStopped()).thenReturn(expectedIsStopped)
+
+    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(1)).isStopped()
+  }
+
+  test("retry success on second attempt after a Kinesis throttling exception") 
{
+    val expectedIsStopped = false
+    when(receiverMock.isStopped())
+        .thenThrow(new ThrottlingException("error message"))
+        .thenReturn(expectedIsStopped)
+
+    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(2)).isStopped()
+  }
+
+  test("retry success on second attempt after a Kinesis dependency exception") 
{
+    val expectedIsStopped = false
+    when(receiverMock.isStopped())
+        .thenThrow(new KinesisClientLibDependencyException("error message"))
+        .thenReturn(expectedIsStopped)
+
+    val actualVal = KinesisRecordProcessor.retryRandom(receiverMock.isStopped(), 2, 100)
+    assert(actualVal == expectedIsStopped)
+
+    verify(receiverMock, times(2)).isStopped()
+  }
+
+  test("retry failed after a shutdown exception") {
+    when(checkpointerMock.checkpoint()).thenThrow(new ShutdownException("error message"))
+
+    intercept[ShutdownException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+    }
+
+    verify(checkpointerMock, times(1)).checkpoint()
+  }
+
+  test("retry failed after an invalid state exception") {
+    when(checkpointerMock.checkpoint()).thenThrow(new InvalidStateException("error message"))
+
+    intercept[InvalidStateException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+    }
+
+    verify(checkpointerMock, times(1)).checkpoint()
+  }
+
+  test("retry failed after unexpected exception") {
+    when(checkpointerMock.checkpoint()).thenThrow(new RuntimeException("error message"))
+
+    intercept[RuntimeException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+    }
+
+    verify(checkpointerMock, times(1)).checkpoint()
+  }
+
+  test("retry failed after exhausing all retries") {
+    val expectedErrorMessage = "final try error message"
+    when(checkpointerMock.checkpoint())
+        .thenThrow(new ThrottlingException("error message"))
+        .thenThrow(new ThrottlingException(expectedErrorMessage))
+
+    val exception = intercept[RuntimeException] {
+      KinesisRecordProcessor.retryRandom(checkpointerMock.checkpoint(), 2, 100)
+    }
+    exception.getMessage().shouldBe(expectedErrorMessage)
+
+    verify(checkpointerMock, times(2)).checkpoint()
+  }
+}
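
The retry tests above pin down a contract for KinesisRecordProcessor.retryRandom: retry with a randomized back-off only for throttling and KCL dependency errors, rethrow shutdown, invalid-state and unexpected errors immediately, and surface the last error once the retries are used up. A rough, self-contained sketch of that shape follows; it is an illustration, not Spark's implementation, and `isRetryable` stands in for the KCL-specific exception checks.

    import scala.util.{Failure, Random, Success, Try}

    // Hedged sketch of the retry contract pinned down by the tests above.
    object RetryRandomSketch {
      def retryRandom[T](expression: => T,
                         numRetriesLeft: Int,
                         maxBackOffMillis: Int,
                         isRetryable: Throwable => Boolean): T = {
        Try(expression) match {
          case Success(value) => value
          case Failure(e) if isRetryable(e) && numRetriesLeft > 1 =>
            // back off for a random interval (assumes maxBackOffMillis > 0), then retry
            Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
            retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis, isRetryable)
          case Failure(e) =>
            // non-retryable failure, or retries exhausted: surface the last error
            throw e
        }
      }
    }

Note how the final test expects the message of the last ThrottlingException, which is why the sketch rethrows the most recent failure rather than the first.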

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
new file mode 100644
index 0000000..ca5d13d
--- /dev/null
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.Matchers._
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.kinesis.KinesisTestUtils._
+import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.util.Utils
+
+abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
+  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+
+  // This is the name that KCL will use to save metadata to DynamoDB
+  private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
+  private val batchDuration = Seconds(1)
+
+  // Dummy parameters for API testing
+  private val dummyEndpointUrl = defaultEndpointUrl
+  private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
+  private val dummyAWSAccessKey = "dummyAccessKey"
+  private val dummyAWSSecretKey = "dummySecretKey"
+
+  private var testUtils: KinesisTestUtils = null
+  private var ssc: StreamingContext = null
+  private var sc: SparkContext = null
+
+  override def beforeAll(): Unit = {
+    val conf = new SparkConf()
+      .setMaster("local[4]")
+      .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
+    sc = new SparkContext(conf)
+
+    runIfTestsEnabled("Prepare KinesisTestUtils") {
+      testUtils = new KPLBasedKinesisTestUtils()
+      testUtils.createStream()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (sc != null) {
+      sc.stop()
+    }
+    if (testUtils != null) {
+      // Delete the Kinesis stream as well as the DynamoDB table generated by
+      // Kinesis Client Library when consuming the stream
+      testUtils.deleteStream()
+      testUtils.deleteDynamoDBTable(appName)
+    }
+  }
+
+  before {
+    ssc = new StreamingContext(sc, batchDuration)
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop(stopSparkContext = false)
+      ssc = null
+    }
+    if (testUtils != null) {
+      testUtils.deleteDynamoDBTable(appName)
+    }
+  }
+
+  test("KinesisUtils API") {
+    // Tests the API, does not actually test data receiving
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
+      dummyEndpointUrl, Seconds(2),
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      dummyEndpointUrl, dummyRegionName,
+      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      dummyEndpointUrl, dummyRegionName,
+      InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
+      dummyAWSAccessKey, dummyAWSSecretKey)
+  }
+
+  test("RDD generation") {
+    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
+      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
+      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
+    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
+
+    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
+    val time = Time(1000)
+
+    // Generate block info data for testing
+    val seqNumRanges1 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
+    val blockId1 = StreamBlockId(kinesisStream.id, 123)
+    val blockInfo1 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
+
+    val seqNumRanges2 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
+    val blockId2 = StreamBlockId(kinesisStream.id, 345)
+    val blockInfo2 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
+
+    // Verify that the generated KinesisBackedBlockRDD has all the right information
+    val blockInfos = Seq(blockInfo1, blockInfo2)
+    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
+    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
+    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
+    assert(kinesisRDD.regionName === dummyRegionName)
+    assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
+    assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
+    assert(kinesisRDD.awsCredentialsOption ===
+      Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
+    assert(nonEmptyRDD.partitions.size === blockInfos.size)
+    nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
+    val partitions = nonEmptyRDD.partitions.map {
+      _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
+    assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
+    assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
+    assert(partitions.forall { _.isBlockIdValid === true })
+
+    // Verify that KinesisBackedBlockRDD is generated even when there are no blocks
+    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
+    emptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+    emptyRDD.partitions shouldBe empty
+
+    // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
+    blockInfos.foreach { _.setBlockIdInvalid() }
+    kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
+      assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
+    }
+  }
+
+
+  /**
+   * Test the stream by sending data to a Kinesis stream and receiving from it.
+   * This test is not run by default, as it requires AWS credentials that the test
+   * environment may not have. Even if AWS credentials are available, the user
+   * may not want to run these tests, to avoid Kinesis costs. To enable this test,
+   * you must have AWS credentials available through the default AWS provider chain,
+   * and you have to set the system environment variable RUN_KINESIS_TESTS=1.
+   */
+  testIfEnabled("basic operation") {
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
+      collected ++= rdd.collect()
+      logInfo("Collected = " + collected.mkString(", "))
+    }
+    ssc.start()
+
+    val testData = 1 to 10
+    eventually(timeout(120 seconds), interval(10 second)) {
+      testUtils.pushData(testData, aggregateTestData)
+      assert(collected === testData.toSet, "\nData received does not match data sent")
+    }
+    ssc.stop(stopSparkContext = false)
+  }
+
+  testIfEnabled("custom message handling") {
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5
+    val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    stream shouldBe a [ReceiverInputDStream[_]]
+
+    val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
+    stream.foreachRDD { rdd =>
+      collected ++= rdd.collect()
+      logInfo("Collected = " + collected.mkString(", "))
+    }
+    ssc.start()
+
+    val testData = 1 to 10
+    eventually(timeout(120 seconds), interval(10 second)) {
+      testUtils.pushData(testData, aggregateTestData)
+      val modData = testData.map(_ + 5)
+      assert(collected === modData.toSet, "\nData received does not match data sent")
+    }
+    ssc.stop(stopSparkContext = false)
+  }
+
+  testIfEnabled("failure recovery") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    val checkpointDir = Utils.createTempDir().getAbsolutePath
+
+    ssc = new StreamingContext(sc, Milliseconds(1000))
+    ssc.checkpoint(checkpointDir)
+
+    val awsCredentials = KinesisTestUtils.getAWSCredentials()
+    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+
+    val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
+      testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
+      Seconds(10), StorageLevel.MEMORY_ONLY,
+      awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
+
+    // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
+    kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
+      val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+      val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
+      collectedData.synchronized {
+        collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      }
+    })
+
+    ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
+    ssc.start()
+
+    def numBatchesWithData: Int =
+      collectedData.synchronized { collectedData.count(_._2._2.nonEmpty) }
+
+    def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
+
+    // Run until there are at least 10 batches with some data in them
+    // If this times out because numBatchesWithData stays at zero, then it's likely that the
+    // foreachRDD function failed with exceptions and nothing got added to `collectedData`
+    eventually(timeout(2 minutes), interval(1 seconds)) {
+      testUtils.pushData(1 to 5, aggregateTestData)
+      assert(isCheckpointPresent && numBatchesWithData > 10)
+    }
+    ssc.stop(stopSparkContext = true)  // stop the SparkContext so that the blocks are not reused
+
+    // Restart the context from the checkpoint and verify that the recovered stream recomputes the same data
+    logInfo("Restarting from checkpoint")
+    ssc = new StreamingContext(checkpointDir)
+    ssc.start()
+    val recoveredKinesisStream = ssc.graph.getInputStreams().head
+
+    // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
+    // and return the same data
+    collectedData.synchronized {
+      val times = collectedData.keySet
+      times.foreach { time =>
+        val (arrayOfSeqNumRanges, data) = collectedData(time)
+        val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
+        rdd shouldBe a[KinesisBackedBlockRDD[_]]
+
+        // Verify the recovered sequence ranges
+        val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+        assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
+        arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
+          assert(expected.ranges.toSeq === found.ranges.toSeq)
+        }
+
+        // Verify the recovered data
+        assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
+      }
+    }
+    ssc.stop()
+  }
+}
+
+class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
+
+class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
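
Outside of this test harness, the same createStream overloads exercised by the "KinesisUtils API" test are what an application would call. A hedged application-side sketch, mirroring the credential-less overload used for kinesisStream2 above; the app name, stream name, endpoint and region are placeholders:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    // Hedged sketch: application-side use of the overload tested above.
    // All names, the endpoint and the region below are placeholders.
    object KinesisSketchApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisSketchApp")
        val ssc = new StreamingContext(conf, Seconds(1))

        val stream = KinesisUtils.createStream(
          ssc, "KinesisSketchApp", "mySparkStream",
          "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
          InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)

        // Same decode step as the "basic operation" test above.
        stream.map(bytes => new String(bytes)).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }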

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/spark-ganglia-lgpl/pom.xml
----------------------------------------------------------------------
diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml
new file mode 100644
index 0000000..bfb9279
--- /dev/null
+++ b/external/spark-ganglia-lgpl/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-ganglia-lgpl_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Ganglia Integration</name>
+
+  <properties>
+    <sbt.project.name>ganglia-lgpl</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-ganglia</artifactId>
+    </dependency>
+  </dependencies>
+</project>
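
Because the pom above notes that Ganglia integration is not included by default, a downstream build that wants the sink on its classpath has to pull this module in explicitly. A sketch using the coordinates declared above, assuming the artifact is published or locally installed; the Scala suffix and version must match the Spark build in use:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-ganglia-lgpl_2.11</artifactId>
      <version>2.0.0-SNAPSHOT</version>
    </dependency>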

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
----------------------------------------------------------------------
diff --git a/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
new file mode 100644
index 0000000..3b1880e
--- /dev/null
+++ b/external/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.ganglia.GangliaReporter
+import info.ganglia.gmetric4j.gmetric.GMetric
+import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.metrics.MetricsSystem
+
+class GangliaSink(val property: Properties, val registry: MetricRegistry,
+    securityMgr: SecurityManager) extends Sink {
+  val GANGLIA_KEY_PERIOD = "period"
+  val GANGLIA_DEFAULT_PERIOD = 10
+
+  val GANGLIA_KEY_UNIT = "unit"
+  val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
+
+  val GANGLIA_KEY_MODE = "mode"
+  val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
+
+  // TTL for multicast messages. If listeners are X hops away in network, must be at least X.
+  val GANGLIA_KEY_TTL = "ttl"
+  val GANGLIA_DEFAULT_TTL = 1
+
+  val GANGLIA_KEY_HOST = "host"
+  val GANGLIA_KEY_PORT = "port"
+
+  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
+
+  if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
+    throw new Exception("Ganglia sink requires 'host' property.")
+  }
+
+  if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
+    throw new Exception("Ganglia sink requires 'port' property.")
+  }
+
+  val host = propertyToOption(GANGLIA_KEY_HOST).get
+  val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
+  val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
+  val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
+    .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
+  val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
+    .getOrElse(GANGLIA_DEFAULT_PERIOD)
+  val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
+    .map(u => TimeUnit.valueOf(u.toUpperCase))
+    .getOrElse(GANGLIA_DEFAULT_UNIT)
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val ganglia = new GMetric(host, port, mode, ttl)
+  val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .build(ganglia)
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+
+  override def report() {
+    reporter.report()
+  }
+}
+
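
For context on how GangliaSink gets wired in: the class above requires only the 'host' and 'port' properties, with 'period', 'unit', 'mode' and 'ttl' falling back to the defaults it defines. A hedged configuration sketch in Spark's metrics-properties style; the sink instance name "ganglia" and the host/port values are placeholders, not part of this patch:

    # conf/metrics.properties (sketch; values are placeholders)
    *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
    *.sink.ganglia.host=239.2.11.71
    *.sink.ganglia.port=8649
    # optional overrides; the defaults above are period=10, unit=SECONDS, mode=MULTICAST, ttl=1
    *.sink.ganglia.period=10
    *.sink.ganglia.unit=seconds
    *.sink.ganglia.mode=multicast
    *.sink.ganglia.ttl=1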

