Repository: spark
Updated Branches:
  refs/heads/master 2ca60ace8 -> ca4257aec


[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] 
Updates to the Kinesis API

SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca4257ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca4257ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca4257ae

Branch: refs/heads/master
Commit: ca4257aec658aaa87f4f097dd7534033d5f13ddc
Parents: 2ca60ac
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Sun May 17 16:49:07 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sun May 17 16:49:07 2015 -0700

----------------------------------------------------------------------
 .../kinesis/KinesisCheckpointState.scala        |   2 +-
 .../streaming/kinesis/KinesisReceiver.scala     | 152 ++++++-----
 .../kinesis/KinesisRecordProcessor.scala        |  32 ++-
 .../spark/streaming/kinesis/KinesisUtils.scala  | 263 ++++++++++++++++---
 .../kinesis/KinesisReceiverSuite.scala          |  15 +-
 pom.xml                                         |   4 +-
 6 files changed, 348 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 588e86a..1c9b0c2 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
   /**
    * Advance the checkpoint clock by the checkpoint interval.
    */
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
     checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index a7fe447..01608fb 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -16,32 +16,31 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import java.net.InetAddress
 import java.util.UUID
 
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, 
BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorFactory}
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
 KinesisClientLibConfiguration, Worker}
+
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
 
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as 
described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with 
StreamingContext.receiverStream(Receiver) 
- *   as described here:
- *     http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * Instances of this class will get shipped to the Spark Streaming Workers 
- *   to run within a Spark Executor.
+ * This is a custom receiver used with 
StreamingContext.receiverStream(Receiver) as described here:
+ *   http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers to 
run within a 
+ *   Spark Executor.
  *
  * @param appName  Kinesis application name. Kinesis Apps are mapped to 
Kinesis Streams
  *                 by the Kinesis Client Library.  If you change the App name 
or Stream name,
@@ -49,6 +48,8 @@ import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                 DynamoDB table with the same name this Kinesis application.
  * @param streamName   Kinesis stream name
  * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName  Region name used by the Kinesis Client Library for
+ *                    DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
  * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
  *                            See the Kinesis Spark Streaming documentation 
for more
  *                            details on the different types of checkpoints.
@@ -59,92 +60,103 @@ import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
  *                                 (InitialPositionInStream.TRIM_HORIZON) or
  *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
  * @param storageLevel Storage level to use for storing the received objects
- *
- * @return ReceiverInputDStream[Array[Byte]]   
+ * @param awsCredentialsOption Optional AWS credentials, used when user 
directly specifies
+ *                             the credentials
  */
 private[kinesis] class KinesisReceiver(
     appName: String,
     streamName: String,
     endpointUrl: String,
-    checkpointInterval: Duration,
+    regionName: String,
     initialPositionInStream: InitialPositionInStream,
-    storageLevel: StorageLevel)
-  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
-
-  /*
-   * The following vars are built in the onStart() method which executes in 
the Spark Worker after
-   *   this code is serialized and shipped remotely.
-   */
-
-  /*
-   *  workerId should be based on the ip address of the actual Spark Worker 
where this code runs
-   *   (not the Driver's ip address.)
-   */
-  var workerId: String = null
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
 
   /*
-   * This impl uses the DefaultAWSCredentialsProviderChain and searches for 
credentials 
-   *   in the following order of precedence:
-   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
-   * Java System Properties - aws.accessKeyId and aws.secretKey
-   * Credential profiles file at the default location (~/.aws/credentials) 
shared by all 
-   *   AWS SDKs and the AWS CLI
-   * Instance profile credentials delivered through the Amazon EC2 metadata 
service
+   * 
=================================================================================
+   * The following vars are initialized in the onStart() method which executes 
in the
+   * Spark worker after this Receiver is serialized and shipped to the worker.
+   * 
=================================================================================
    */
-  var credentialsProvider: AWSCredentialsProvider = null
-
-  /* KCL config instance. */
-  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
 
-  /*
-   *  RecordProcessorFactory creates impls of IRecordProcessor.
-   *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the 
-   *    IRecordProcessor.processRecords() method.
-   *  We're using our custom KinesisRecordProcessor in this case.
+  /**
+   * workerId is used by the KCL and should be based on the IP address of the 
actual Spark Worker where this code runs
+   * (not the driver's IP address.)
    */
-  var recordProcessorFactory: IRecordProcessorFactory = null
+  private var workerId: String = null
 
-  /*
-   * Create a Kinesis Worker.
-   * This is the core client abstraction from the Kinesis Client Library (KCL).
-   * We pass the RecordProcessorFactory from above as well as the KCL config 
instance.
-   * A Kinesis Worker can process 1..* shards from the given stream - each 
with its 
-   *   own RecordProcessor.
+  /**
+   * Worker is the core client abstraction from the Kinesis Client Library 
(KCL).
+   * A worker can process more than one shard from the given stream.
+   * Each shard is assigned its own IRecordProcessor and the worker runs 
multiple such
+   * processors.
    */
-  var worker: Worker = null
+  private var worker: Worker = null
 
   /**
-   *  This is called when the KinesisReceiver starts and must be non-blocking.
-   *  The KCL creates and manages the receiving/processing thread pool through 
the Worker.run() 
-   *    method.
+   * This is called when the KinesisReceiver starts and must be non-blocking.
+   * The KCL creates and manages the receiving/processing thread pool through 
Worker.run().
    */
   override def onStart() {
     workerId = Utils.localHostName() + ":" + UUID.randomUUID()
-    credentialsProvider = new DefaultAWSCredentialsProviderChain()
-    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, 
streamName,
-      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
-      
.withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
-    recordProcessorFactory = new IRecordProcessorFactory {
+
+    // KCL config instance
+    val awsCredProvider = resolveAWSCredentialsProvider()
+    val kinesisClientLibConfiguration =
+      new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, 
workerId)
+      .withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream)
+      .withTaskBackoffTimeMillis(500)
+      .withRegionName(regionName)
+
+   /*
+    *  RecordProcessorFactory creates impls of IRecordProcessor.
+    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the 
+    *  IRecordProcessor.processRecords() method.
+    *  We're using our custom KinesisRecordProcessor in this case.
+    */
+    val recordProcessorFactory = new IRecordProcessorFactory {
       override def createProcessor: IRecordProcessor = new 
KinesisRecordProcessor(receiver,
         workerId, new KinesisCheckpointState(checkpointInterval))
     }
+
     worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
     worker.run()
+
     logInfo(s"Started receiver with workerId $workerId")
   }
 
   /**
-   *  This is called when the KinesisReceiver stops.
-   *  The KCL worker.shutdown() method stops the receiving/processing threads.
-   *  The KCL will do its best to drain and checkpoint any in-flight records 
upon shutdown.
+   * This is called when the KinesisReceiver stops.
+   * The KCL worker.shutdown() method stops the receiving/processing threads.
+   * The KCL will do its best to drain and checkpoint any in-flight records 
upon shutdown.
    */
   override def onStop() {
-    worker.shutdown()
-    logInfo(s"Shut down receiver with workerId $workerId")
+    if (worker != null) {
+      worker.shutdown()
+      logInfo(s"Stopped receiver for workerId $workerId")
+      worker = null
+    }
     workerId = null
-    credentialsProvider = null
-    kinesisClientLibConfiguration = null
-    recordProcessorFactory = null
-    worker = null
+  }
+
+  /**
+   * If AWS credentials are provided, return an AWSCredentialsProvider returning 
those credentials.
+   * Otherwise, return the DefaultAWSCredentialsProviderChain.
+   */
+  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+    awsCredentialsOption match {
+      case Some(awsCredentials) =>
+        logInfo("Using provided AWS credentials")
+        new AWSCredentialsProvider {
+          override def getCredentials: AWSCredentials = awsCredentials
+          override def refresh(): Unit = { }
+        }
+      case None =>
+        logInfo("Using DefaultAWSCredentialsProviderChain")
+        new DefaultAWSCredentialsProviderChain()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
index af8cd87..f65e743 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -35,7 +35,10 @@ import com.amazonaws.services.kinesis.model.Record
 /**
  * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
  * This implementation operates on the Array[Byte] from the KinesisReceiver.
- * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon 
startup.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for 
each 
+ *   shard in the Kinesis stream upon startup.  This is normally done in 
separate threads, 
+ *   but the KCLs within the KinesisReceivers will balance themselves out if 
you create 
+ *   multiple Receivers.
  *
  * @param receiver Kinesis receiver
  * @param workerId for logging purposes
@@ -47,8 +50,8 @@ private[kinesis] class KinesisRecordProcessor(
     workerId: String,
     checkpointState: KinesisCheckpointState) extends IRecordProcessor with 
Logging {
 
-  /* shardId to be populated during initialize() */
-  var shardId: String = _
+  // shardId to be populated during initialize()
+  private var shardId: String = _
 
   /**
    * The Kinesis Client Library calls this method during IRecordProcessor 
initialization.
@@ -56,8 +59,8 @@ private[kinesis] class KinesisRecordProcessor(
    * @param shardId assigned by the KCL to this particular RecordProcessor.
    */
   override def initialize(shardId: String) {
-    logInfo(s"Initialize:  Initializing workerId $workerId with shardId 
$shardId")
     this.shardId = shardId
+    logInfo(s"Initialized workerId $workerId with shardId $shardId")
   }
 
   /**
@@ -73,12 +76,17 @@ private[kinesis] class KinesisRecordProcessor(
     if (!receiver.isStopped()) {
       try {
         /*
-         * Note:  If we try to store the raw ByteBuffer from record.getData(), 
the Spark Streaming
-         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer 
using the
-         *   internally-configured Spark serializer (kryo, etc).
-         * This is not desirable, so we instead store a raw Array[Byte] and 
decouple
-         *   ourselves from Spark's internal serialization strategy.
-         */
+         * Notes:  
+         * 1) If we try to store the raw ByteBuffer from record.getData(), the 
Spark Streaming
+         *    Receiver.store(ByteBuffer) attempts to deserialize the 
ByteBuffer using the
+         *    internally-configured Spark serializer (kryo, etc).
+         * 2) This is not desirable, so we instead store a raw Array[Byte] and 
decouple
+         *    ourselves from Spark's internal serialization strategy.
+         * 3) For performance, the BlockGenerator is asynchronously queuing 
elements within its
+         *    memory before creating blocks.  This prevents the small block 
scenario, but requires
+         *    that you register callbacks to know when a block has been 
generated and stored 
+         *    (WAL is sufficient for storage) before you can checkpoint back to 
the source.
+        */
         batch.foreach(record => receiver.store(record.getData().array()))
         
         logDebug(s"Stored:  Worker $workerId stored ${batch.size} records for 
shardId $shardId")
@@ -116,7 +124,7 @@ private[kinesis] class KinesisRecordProcessor(
           logError(s"Exception:  WorkerId $workerId encountered and exception 
while storing " +
               " or checkpointing a batch for workerId $workerId and shardId 
$shardId.", e)
 
-          /* Rethrow the exception to the Kinesis Worker that is managing this 
RecordProcessor.*/
+          /* Rethrow the exception to the Kinesis Worker that is managing this 
RecordProcessor. */
           throw e
         }
       }
@@ -190,7 +198,7 @@ private[kinesis] object KinesisRecordProcessor extends 
Logging {
                logError(s"Retryable Exception:  Random 
backOffMillis=${backOffMillis}", e)
                retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
              }
-        /* Throw:  Shutdown has been requested by the Kinesis Client Library.*/
+        /* Throw:  Shutdown has been requested by the Kinesis Client Library. 
*/
         case _: ShutdownException => {
           logError(s"ShutdownException:  Caught shutdown exception, skipping 
checkpoint.", e)
           throw e

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
index 96f4399..b114bcf 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -16,29 +16,75 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import org.apache.spark.annotation.Experimental
+import com.amazonaws.regions.RegionUtils
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Duration
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
-import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, 
JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.streaming.{Duration, StreamingContext}
 
 
-/**
- * Helper class to create Amazon Kinesis Input Stream
- * :: Experimental ::
- */
-@Experimental
 object KinesisUtils {
   /**
-   * Create an InputDStream that pulls messages from a Kinesis stream.
-   * :: Experimental ::
-   * @param ssc    StreamingContext object
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the 
DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how 
DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis 
Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library 
(KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+   *                                 worker's initial starting position in the 
stream.
+   *                                 The values are either the beginning of 
the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation 
for more
+   *                            details on the different types of checkpoints.
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  def createStream(
+      ssc: StreamingContext,
+      kinesisAppName:  String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(
+      new KinesisReceiver(kinesisAppName, streamName, endpointUrl, 
validateRegion(regionName),
+        initialPositionInStream, checkpointInterval, storageLevel, None))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   *  The given AWS credentials will get saved in DStream checkpoints if 
checkpointing
+   *  is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param ssc StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis 
Client Library
+   *                        (KCL) to update DynamoDB
    * @param streamName   Kinesis stream name
    * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library 
(KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
+   * @param awsAccessKeyId  AWS AccessKeyId (used instead of 
DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (used instead of 
DefaultAWSCredentialsProviderChain)
    * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
    *                            See the Kinesis Spark Streaming documentation 
for more
    *                            details on the different types of checkpoints.
@@ -48,28 +94,84 @@ object KinesisUtils {
    *                                 per Kinesis' limit of 24 hours
    *                                 (InitialPositionInStream.TRIM_HORIZON) or
    *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  def createStream(
+      ssc: StreamingContext,
+      kinesisAppName:  String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String
+    ): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(
+      new KinesisReceiver(kinesisAppName, streamName, endpointUrl, 
validateRegion(regionName),
+        initialPositionInStream, checkpointInterval, storageLevel,
+        Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))))
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
    *
-   * @return ReceiverInputDStream[Array[Byte]]
+   * Note:
+   * - The AWS credentials will be discovered using the 
DefaultAWSCredentialsProviderChain
+   *   on the workers. See AWS documentation to understand how 
DefaultAWSCredentialsProviderChain
+   *   gets AWS credentials.
+   * - The region of the `endpointUrl` will be used for DynamoDB and 
CloudWatch.
+   * - The Kinesis application name used by the Kinesis Client Library (KCL) 
will be the app name in
+   *   [[org.apache.spark.SparkConf]].
+   *
+   * @param ssc Java StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Endpoint url of Kinesis service
+   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation 
for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+   *                                 worker's initial starting position in the 
stream.
+   *                                 The values are either the beginning of 
the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
    */
-  @Experimental
+  @deprecated("use other forms of createStream", "1.4.0")
   def createStream(
       ssc: StreamingContext,
       streamName: String,
       endpointUrl: String,
       checkpointInterval: Duration,
       initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
-    ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, 
endpointUrl,
-        checkpointInterval, initialPositionInStream, storageLevel))
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(
+      new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, 
getRegionByEndpoint(endpointUrl),
+        initialPositionInStream, checkpointInterval, storageLevel, None))
   }
 
   /**
-   * Create a Java-friendly InputDStream that pulls messages from a Kinesis 
stream.
-   * :: Experimental ::
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note: The AWS credentials will be discovered using the 
DefaultAWSCredentialsProviderChain
+   * on the workers. See AWS documentation to understand how 
DefaultAWSCredentialsProviderChain
+   * gets the AWS credentials.
+   *
    * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis 
Client Library
+   *                        (KCL) to update DynamoDB
    * @param streamName   Kinesis stream name
    * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library 
(KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
    * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
    *                            See the Kinesis Spark Streaming documentation 
for more
    *                            details on the different types of checkpoints.
@@ -79,19 +181,116 @@ object KinesisUtils {
    *                                 per Kinesis' limit of 24 hours
    *                                 (InitialPositionInStream.TRIM_HORIZON) or
    *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
-   * @param storageLevel Storage level to use for storing the received objects
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+      initialPositionInStream, checkpointInterval, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
    *
-   * @return JavaReceiverInputDStream[Array[Byte]]
+   * Note:
+   *  The given AWS credentials will get saved in DStream checkpoints if 
checkpointing
+   *  is enabled. Make sure that your checkpoint directory is secure.
+   *
+   * @param jssc Java StreamingContext object
+   * @param kinesisAppName  Kinesis application name used by the Kinesis 
Client Library
+   *                        (KCL) to update DynamoDB
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., 
https://kinesis.us-east-1.amazonaws.com)
+   * @param regionName   Name of region used by the Kinesis Client Library 
(KCL) to update
+   *                     DynamoDB (lease coordination and checkpointing) and 
CloudWatch (metrics)
+   * @param awsAccessKeyId  AWS AccessKeyId (if null, will use 
DefaultAWSCredentialsProviderChain)
+   * @param awsSecretKey  AWS SecretKey (if null, will use 
DefaultAWSCredentialsProviderChain)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation 
for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+   *                                 worker's initial starting position in the 
stream.
+   *                                 The values are either the beginning of 
the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects.
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
    */
-  @Experimental
   def createStream(
-      jssc: JavaStreamingContext, 
-      streamName: String, 
-      endpointUrl: String, 
+      jssc: JavaStreamingContext,
+      kinesisAppName: String,
+      streamName: String,
+      endpointUrl: String,
+      regionName: String,
+      initialPositionInStream: InitialPositionInStream,
+      checkpointInterval: Duration,
+      storageLevel: StorageLevel,
+      awsAccessKeyId: String,
+      awsSecretKey: String
+    ): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream(jssc.ssc, kinesisAppName, streamName, endpointUrl, regionName,
+        initialPositionInStream, checkpointInterval, storageLevel, 
awsAccessKeyId, awsSecretKey)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kinesis stream.
+   * This uses the Kinesis Client Library (KCL) to pull messages from Kinesis.
+   *
+   * Note:
+   * - The AWS credentials will be discovered using the 
DefaultAWSCredentialsProviderChain
+   *   on the workers. See AWS documentation to understand how 
DefaultAWSCredentialsProviderChain
+   *   gets AWS credentials.
+   * - The region of the `endpointUrl` will be used for DynamoDB and 
CloudWatch.
+   * - The Kinesis application name used by the Kinesis Client Library (KCL) 
will be the app name in
+   *   [[org.apache.spark.SparkConf]].
+   *
+   * @param jssc Java StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Endpoint url of Kinesis service
+   *                     (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation 
for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint 
info, this is the
+   *                                 worker's initial starting position in the 
stream.
+   *                                 The values are either the beginning of 
the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream 
(InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *                     StorageLevel.MEMORY_AND_DISK_2 is recommended.
+   */
+  @deprecated("use other forms of createStream", "1.4.0")
+  def createStream(
+      jssc: JavaStreamingContext,
+      streamName: String,
+      endpointUrl: String,
       checkpointInterval: Duration,
       initialPositionInStream: InitialPositionInStream,
-      storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
-    jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName,
-        endpointUrl, checkpointInterval, initialPositionInStream, 
storageLevel))
+      storageLevel: StorageLevel
+    ): JavaReceiverInputDStream[Array[Byte]] = {
+    createStream(
+      jssc.ssc, streamName, endpointUrl, checkpointInterval, 
initialPositionInStream, storageLevel)
+  }
+
+  private def getRegionByEndpoint(endpointUrl: String): String = {
+    RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  }
+
+  private def validateRegion(regionName: String): String = {
+    Option(RegionUtils.getRegion(regionName)).map { _.getName }.getOrElse {
+      throw new IllegalArgumentException(s"Region name '$regionName' is not valid")
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 255fe65..7c17ee9 100644
--- 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -40,6 +40,7 @@ import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
 
 /**
  * Suite of Kinesis streaming receiver tests focusing mostly on the 
KinesisRecordProcessor
@@ -81,12 +82,20 @@ class KinesisReceiverSuite extends TestSuiteBase with 
Matchers with BeforeAndAft
       checkpointStateMock, currentClockMock)
   }
 
-  test("kinesis utils api") {
+  test("KinesisUtils API") {
     val ssc = new StreamingContext(master, framework, batchDuration)
     // Tests the API, does not actually test data receiving
-    val kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+    val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
       "https://kinesis.us-west-2.amazonaws.com", Seconds(2),
-      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2);
+      InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      InitialPositionInStream.LATEST, Seconds(2), 
StorageLevel.MEMORY_AND_DISK_2)
+    val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
+      "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
+      InitialPositionInStream.LATEST, Seconds(2), 
StorageLevel.MEMORY_AND_DISK_2,
+      "awsAccessKey", "awsSecretKey")
+
     ssc.stop()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6768a03..6f525b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,8 +148,8 @@
     <avro.version>1.7.7</avro.version>
     <avro.mapred.classifier>hadoop2</avro.mapred.classifier>
     <jets3t.version>0.7.1</jets3t.version>
-    <aws.java.sdk.version>1.8.3</aws.java.sdk.version>
-    <aws.kinesis.client.version>1.1.0</aws.kinesis.client.version>
+    <aws.java.sdk.version>1.9.16</aws.java.sdk.version>
+    <aws.kinesis.client.version>1.2.1</aws.kinesis.client.version>
     <commons.httpclient.version>4.3.2</commons.httpclient.version>
     <commons.math3.version>3.4.1</commons.math3.version>
     
<test_classpath_file>${project.build.directory}/spark-test-classpath.txt</test_classpath_file>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to