http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
new file mode 100644
index 0000000..5dc825d
--- /dev/null
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.amazonaws.regions.RegionUtils;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
+ *
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
+ *
+ * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url]
+ *   [app-name] is the name of the consumer app, used to track the read data in DynamoDB
+ *   [stream-name] name of the Kinesis stream (e.g. mySparkStream)
+ *   [endpoint-url] endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *      # export AWS keys if necessary
+ *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ *      # run the example
+ *      $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \
+ *             https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *    Java System Properties - aws.accessKeyId and aws.secretKey
+ *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *    Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
+ */
+public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
+  private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+  public static void main(String[] args) {
+    // Check that all required args were passed in.
+    if (args.length != 3) {
+      System.err.println(
+          "Usage: JavaKinesisWordCountASL <app-name> <stream-name> <endpoint-url>\n\n" +
+          "    <app-name> is the name of the app, used to track the read data in DynamoDB\n" +
+          "    <stream-name> is the name of the Kinesis stream\n" +
+          "    <endpoint-url> is the endpoint of the Kinesis service\n" +
+          "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n" +
+          "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" +
+          "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" +
+          "details.\n"
+      );
+      System.exit(1);
+    }
+
+    // Set default log4j logging level to WARN to hide Spark logs
+    StreamingExamples.setStreamingLogLevels();
+
+    // Populate the appropriate variables from the given args
+    String kinesisAppName = args[0];
+    String streamName = args[1];
+    String endpointUrl = args[2];
+
+    // Create a Kinesis client in order to determine the number of shards for the given stream
+    AmazonKinesisClient kinesisClient =
+        new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
+    kinesisClient.setEndpoint(endpointUrl);
+    int numShards =
+        kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
+
+
+    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
+    int numStreams = numShards;
+
+    // Spark Streaming batch interval
+    Duration batchInterval = new Duration(2000);
+
+    // Kinesis checkpoint interval.  Same as batchInterval for this example.
+    Duration kinesisCheckpointInterval = batchInterval;
+
+    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+    // DynamoDB in the same region as the Kinesis stream
+    String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName();
+
+    // Setup the Spark config and StreamingContext
+    SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+    // Create the Kinesis DStreams
+    List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams);
+    for (int i = 0; i < numStreams; i++) {
+      streamsList.add(
+          KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName,
+              InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2())
+      );
+    }
+
+    // Union all the streams if there is more than 1 stream
+    JavaDStream<byte[]> unionStreams;
+    if (streamsList.size() > 1) {
+      unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+    } else {
+      // Otherwise, just use the 1 stream
+      unionStreams = streamsList.get(0);
+    }
+
+    // Convert each line of Array[Byte] to String, and split into words
+    JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+      @Override
+      public Iterator<String> call(byte[] line) {
+        String s = new String(line, StandardCharsets.UTF_8);
+        return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
+      }
+    });
+
+    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String s) {
+            return new Tuple2<String, Integer>(s, 1);
+          }
+        }
+    ).reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+          @Override
+          public Integer call(Integer i1, Integer i2) {
+            return i1 + i2;
+          }
+        }
+    );
+
+    // Print the first 10 wordCounts
+    wordCounts.print();
+
+    // Start the streaming context and await termination
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
new file mode 100644
index 0000000..4d7fc9a
--- /dev/null
+++ b/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  Consumes messages from an Amazon Kinesis stream and does wordcount.
+
+  This example spins up 1 Kinesis Receiver per shard for the given stream.
+  It then starts pulling from the last checkpointed sequence number of the given stream.
+
+  Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
+    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+    <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+    <endpoint-url> endpoint of the Kinesis service
+      (e.g. https://kinesis.us-east-1.amazonaws.com)
+
+
+  Example:
+      # export AWS keys if necessary
+      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+      $ export AWS_SECRET_KEY=<your-secret-key>
+
+      # run the example
+      $ bin/spark-submit --jars external/kinesis-asl/target/scala-*/\
+        spark-streaming-kinesis-asl-assembly_*.jar \
+        external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+        myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com
+
+  There is a companion helper class called KinesisWordProducerASL which puts dummy data
+  onto the Kinesis stream.
+
+  This code uses the DefaultAWSCredentialsProviderChain to find credentials
+  in the following order:
+      Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+      Java System Properties - aws.accessKeyId and aws.secretKey
+      Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+      Instance profile credentials - delivered through the Amazon EC2 metadata service
+  For more information, see
+      http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+
+  See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+  the Kinesis Spark Streaming integration.
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        print(
+            "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> 
<endpoint-url> <region-name>",
+            file=sys.stderr)
+        sys.exit(-1)
+
+    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
+    ssc = StreamingContext(sc, 1)
+    appName, streamName, endpointUrl, regionName = sys.argv[1:]
+    lines = KinesisUtils.createStream(
+        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
+    counts = lines.flatMap(lambda line: line.split(" ")) \
+        .map(lambda word: (word, 1)) \
+        .reduceByKey(lambda a, b: a+b)
+    counts.pprint()
+
+    ssc.start()
+    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/resources/log4j.properties b/external/kinesis-asl/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6cdc928
--- /dev/null
+++ b/external/kinesis-asl/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
new file mode 100644
index 0000000..6a73bc0
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import com.amazonaws.auth.{BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.{Level, Logger}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+
+
+/**
+ * Consumes messages from an Amazon Kinesis stream and does wordcount.
+ *
+ * This example spins up 1 Kinesis Receiver per shard for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given stream.
+ *
+ * Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+ *   <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+ *   <stream-name> name of the Kinesis stream (e.g. mySparkStream)
+ *   <endpoint-url> endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ *
+ * Example:
+ *      # export AWS keys if necessary
+ *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      $ export AWS_SECRET_KEY=<your-secret-key>
+ *
+ *      # run the example
+ *      $ SPARK_HOME/bin/run-example streaming.KinesisWordCountASL myAppName mySparkStream \
+ *              https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordProducerASL which puts dummy data
+ * onto the Kinesis stream.
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain to find credentials
+ * in the following order:
+ *    Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *    Java System Properties - aws.accessKeyId and aws.secretKey
+ *    Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ *    Instance profile credentials - delivered through the Amazon EC2 metadata service
+ * For more information, see
+ * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on
+ * the Kinesis Spark Streaming integration.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    // Check that all required args were passed in.
+    if (args.length != 3) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url>
+          |
+          |    <app-name> is the name of the consumer app, used to track the read data in DynamoDB
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+          |
+          |Generate input data for Kinesis stream using the example KinesisWordProducerASL.
+          |See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more
+          |details.
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Populate the appropriate variables from the given args
+    val Array(appName, streamName, endpointUrl) = args
+
+
+    // Determine the number of shards from the stream using the low-level Kinesis Client
+    // from the AWS Java SDK.
+    val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
+    require(credentials != null,
+      "No AWS credentials found. Please specify credentials using one of the methods specified " +
+        "in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
+    val kinesisClient = new AmazonKinesisClient(credentials)
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
+
+
+    // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
+    // This is not a necessity; if there are fewer receivers/DStreams than the number of shards,
+    // then the shards will be automatically distributed among the receivers and each receiver
+    // will receive data from multiple shards.
+    val numStreams = numShards
+
+    // Spark Streaming batch interval
+    val batchInterval = Milliseconds(2000)
+
+    // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with
+    // information on the sequence numbers of records that have been received. Same as
+    // batchInterval for this example.
+    val kinesisCheckpointInterval = batchInterval
+
+    // Get the region name from the endpoint URL to save Kinesis Client Library metadata in
+    // DynamoDB in the same region as the Kinesis stream
+    val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+
+    // Setup the SparkConfig and StreamingContext
+    val sparkConfig = new SparkConf().setAppName("KinesisWordCountASL")
+    val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+    // Create the Kinesis DStreams
+    val kinesisStreams = (0 until numStreams).map { i =>
+      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
+        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
+    }
+
+    // Union all the streams
+    val unionStreams = ssc.union(kinesisStreams)
+
+    // Convert each line of Array[Byte] to String, and split into words
+    val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
+
+    // Map each word to a (word, 1) tuple so we can reduce by key to count the words
+    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+    // Print the first 10 wordCounts
+    wordCounts.print()
+
+    // Start the streaming context and await termination
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+/**
+ * Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
+ *   <records-per-sec> <words-per-record>
+ *
+ *   <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (ie. https://kinesis.us-east-1.amazonaws.com)
+ *   <records-per-sec> is the rate of records per second to put onto the stream
+ *   <words-per-record> is the number of words per record
+ *
+ * Example:
+ *    $ SPARK_HOME/bin/run-example streaming.KinesisWordProducerASL mySparkStream \
+ *         https://kinesis.us-east-1.amazonaws.com 10 5
+ */
+object KinesisWordProducerASL {
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println(
+        """
+          |Usage: KinesisWordProducerASL <stream-name> <endpoint-url> <records-per-sec>
+          |                              <words-per-record>
+          |
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+          |    <records-per-sec> is the rate of records per second to put onto the stream
+          |    <words-per-record> is the number of words per record
+          |
+        """.stripMargin)
+
+      System.exit(1)
+    }
+
+    // Set default log4j logging level to WARN to hide Spark logs
+    StreamingExamples.setStreamingLogLevels()
+
+    // Populate the appropriate variables from the given args
+    val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
+
+    // Generate the records and return the totals
+    val totals = generate(stream, endpoint, recordsPerSecond.toInt,
+        wordsPerRecord.toInt)
+
+    // Print the array of (word, total) tuples
+    println("Totals for the words sent")
+    totals.foreach(println(_))
+  }
+
+  def generate(stream: String,
+      endpoint: String,
+      recordsPerSecond: Int,
+      wordsPerRecord: Int): Seq[(String, Int)] = {
+
+    val randomWords = List("spark", "you", "are", "my", "father")
+    val totals = scala.collection.mutable.Map[String, Int]()
+
+    // Create the low-level Kinesis Client from the AWS Java SDK.
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpoint)
+
+    println(s"Putting records onto stream $stream and endpoint $endpoint at a 
rate of" +
+        s" $recordsPerSecond records per second and $wordsPerRecord words per 
record")
+
+    // Iterate and put records onto the stream per the given recordsPerSecond and wordsPerRecord
+    for (i <- 1 to 10) {
+      // Generate recordsPerSec records to put onto the stream
+      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+        // Randomly generate wordsPerRecord number of words
+        val data = (1 to wordsPerRecord.toInt).map(x => {
+          // Get a random index to a word
+          val randomWordIdx = Random.nextInt(randomWords.size)
+          val randomWord = randomWords(randomWordIdx)
+
+          // Increment total count to compare to server counts later
+          totals(randomWord) = totals.getOrElse(randomWord, 0) + 1
+
+          randomWord
+        }).mkString(" ")
+
+        // Create a partitionKey based on recordNum
+        val partitionKey = s"partitionKey-$recordNum"
+
+        // Create a PutRecordRequest with an Array[Byte] version of the data
+        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
+            .withPartitionKey(partitionKey)
+            .withData(ByteBuffer.wrap(data.getBytes()))
+
+        // Put the record onto the stream and capture the PutRecordResult
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest)
+      }
+
+      // Sleep for a second
+      Thread.sleep(1000)
+      println("Sent " + recordsPerSecond + " records")
+    }
+    // Convert the totals to a sorted sequence of (word, total) tuples
+    totals.toSeq.sortBy(_._1)
+  }
+}
+
+/**
+ *  Utility functions for Spark Streaming examples.
+ *  This has been lifted from the examples/ project to remove the circular dependency.
+ */
+private[streaming] object StreamingExamples extends Logging {
+  // Set reasonable logging levels for streaming if the user has not configured log4j.
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}
+// scalastyle:on println

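A note for readers adapting the Scala example above: KinesisUtils.createStream, as used in
KinesisWordCountASL, also accepts InitialPositionInStream.TRIM_HORIZON, which replays from the
oldest data Kinesis still retains (up to 24 hours) instead of only records arriving after
start-up. Below is a minimal, illustrative sketch against the API added in this patch; the app
name, stream name, endpoint, region, and the 10-second checkpoint interval are placeholder
values, not part of the patch itself.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    object KinesisReplaySketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("KinesisReplaySketch"), Seconds(2))
        // Replay from the oldest retained records; checkpoint to DynamoDB every 10 seconds.
        val stream = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
          "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
          InitialPositionInStream.TRIM_HORIZON, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)
        stream.map(bytes => new String(bytes)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }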
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
new file mode 100644
index 0000000..3996f16
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark._
+import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.util.NextIterator
+
+
+/** Class representing a range of Kinesis sequence numbers. Both sequence numbers are inclusive. */
+private[kinesis]
+case class SequenceNumberRange(
+    streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String)
+
+/** Class representing an array of Kinesis sequence number ranges */
+private[kinesis]
+case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
+  def isEmpty(): Boolean = ranges.isEmpty
+
+  def nonEmpty(): Boolean = ranges.nonEmpty
+
+  override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
+}
+
+private[kinesis]
+object SequenceNumberRanges {
+  def apply(range: SequenceNumberRange): SequenceNumberRanges = {
+    new SequenceNumberRanges(Seq(range))
+  }
+}
+
+
+/** Partition storing the information of the ranges of Kinesis sequence numbers to read */
+private[kinesis]
+class KinesisBackedBlockRDDPartition(
+    idx: Int,
+    blockId: BlockId,
+    val isBlockIdValid: Boolean,
+    val seqNumberRanges: SequenceNumberRanges
+  ) extends BlockRDDPartition(blockId, idx)
+
+/**
+ * A BlockRDD where the block data is backed by Kinesis, which can be accessed using the
+ * sequence numbers of the corresponding blocks.
+ */
+private[kinesis]
+class KinesisBackedBlockRDD[T: ClassTag](
+    sc: SparkContext,
+    val regionName: String,
+    val endpointUrl: String,
+    @transient private val _blockIds: Array[BlockId],
+    @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
+    @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
+    val retryTimeoutMs: Int = 10000,
+    val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
+    val awsCredentialsOption: Option[SerializableAWSCredentials] = None
+  ) extends BlockRDD[T](sc, _blockIds) {
+
+  require(_blockIds.length == arrayOfseqNumberRanges.length,
+    "Number of blockIds is not equal to the number of sequence number ranges")
+
+  override def isValid(): Boolean = true
+
+  override def getPartitions: Array[Partition] = {
+    Array.tabulate(_blockIds.length) { i =>
+      val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
+      new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
+    }
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val blockManager = SparkEnv.get.blockManager
+    val partition = split.asInstanceOf[KinesisBackedBlockRDDPartition]
+    val blockId = partition.blockId
+
+    def getBlockFromBlockManager(): Option[Iterator[T]] = {
+      logDebug(s"Read partition data of $this from block manager, block 
$blockId")
+      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+    }
+
+    def getBlockFromKinesis(): Iterator[T] = {
+      val credentials = awsCredentialsOption.getOrElse {
+        new DefaultAWSCredentialsProviderChain().getCredentials()
+      }
+      partition.seqNumberRanges.ranges.iterator.flatMap { range =>
+        new KinesisSequenceRangeIterator(credentials, endpointUrl, regionName,
+          range, retryTimeoutMs).map(messageHandler)
+      }
+    }
+    if (partition.isBlockIdValid) {
+      getBlockFromBlockManager().getOrElse { getBlockFromKinesis() }
+    } else {
+      getBlockFromKinesis()
+    }
+  }
+}
+
+
+/**
+ * An iterator that returns the Kinesis data based on the given range of sequence numbers.
+ * Internally, it repeatedly fetches sets of records starting from the fromSequenceNumber,
+ * until the endSequenceNumber is reached.
+ */
+private[kinesis]
+class KinesisSequenceRangeIterator(
+    credentials: AWSCredentials,
+    endpointUrl: String,
+    regionId: String,
+    range: SequenceNumberRange,
+    retryTimeoutMs: Int) extends NextIterator[Record] with Logging {
+
+  private val client = new AmazonKinesisClient(credentials)
+  private val streamName = range.streamName
+  private val shardId = range.shardId
+
+  private var toSeqNumberReceived = false
+  private var lastSeqNumber: String = null
+  private var internalIterator: Iterator[Record] = null
+
+  client.setEndpoint(endpointUrl, "kinesis", regionId)
+
+  override protected def getNext(): Record = {
+    var nextRecord: Record = null
+    if (toSeqNumberReceived) {
+      finished = true
+    } else {
+
+      if (internalIterator == null) {
+
+        // If the internal iterator has not been initialized,
+        // then fetch records from starting sequence number
+        internalIterator = getRecords(ShardIteratorType.AT_SEQUENCE_NUMBER, range.fromSeqNumber)
+      } else if (!internalIterator.hasNext) {
+
+        // If the internal iterator does not have any more records,
+        // then fetch more records after the last consumed sequence number
+        internalIterator = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER, lastSeqNumber)
+      }
+
+      if (!internalIterator.hasNext) {
+
+        // If the internal iterator still does not have any data, then throw exception
+        // and terminate this iterator
+        finished = true
+        throw new SparkException(
+          s"Could not read until the end sequence number of the range: $range")
+      } else {
+
+        // Get the record, copy the data into a byte array and remember its sequence number
+        nextRecord = internalIterator.next()
+        lastSeqNumber = nextRecord.getSequenceNumber()
+
+        // If this record's sequence number matches the stopping sequence number, then make sure
+        // the iterator is marked finished next time getNext() is called
+        if (nextRecord.getSequenceNumber == range.toSeqNumber) {
+          toSeqNumberReceived = true
+        }
+      }
+    }
+    nextRecord
+  }
+
+  override protected def close(): Unit = {
+    client.shutdown()
+  }
+
+  /**
+   * Get records starting from or after the given sequence number.
+   */
+  private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = {
+    val shardIterator = getKinesisIterator(iteratorType, seqNum)
+    val result = getRecordsAndNextKinesisIterator(shardIterator)
+    result._1
+  }
+
+  /**
+   * Get the records starting from a given Kinesis shard iterator (which is a progress handle
+   * to get records from Kinesis), and get the next shard iterator for the next consumption.
+   */
+  private def getRecordsAndNextKinesisIterator(
+      shardIterator: String): (Iterator[Record], String) = {
+    val getRecordsRequest = new GetRecordsRequest
+    getRecordsRequest.setRequestCredentials(credentials)
+    getRecordsRequest.setShardIterator(shardIterator)
+    val getRecordsResult = retryOrTimeout[GetRecordsResult](
+      s"getting records using shard iterator") {
+        client.getRecords(getRecordsRequest)
+      }
+    // De-aggregate records, if KPL was used in producing the records. The KCL automatically
+    // handles de-aggregation during regular operation. This code path is used during recovery
+    val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
+    (recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator)
+  }
+
+  /**
+   * Get the Kinesis shard iterator for getting records starting from or after the given
+   * sequence number.
+   */
+  private def getKinesisIterator(
+      iteratorType: ShardIteratorType,
+      sequenceNumber: String): String = {
+    val getShardIteratorRequest = new GetShardIteratorRequest
+    getShardIteratorRequest.setRequestCredentials(credentials)
+    getShardIteratorRequest.setStreamName(streamName)
+    getShardIteratorRequest.setShardId(shardId)
+    getShardIteratorRequest.setShardIteratorType(iteratorType.toString)
+    getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber)
+    val getShardIteratorResult = retryOrTimeout[GetShardIteratorResult](
+        s"getting shard iterator from sequence number $sequenceNumber") {
+          client.getShardIterator(getShardIteratorRequest)
+        }
+    getShardIteratorResult.getShardIterator
+  }
+
+  /** Helper method to retry Kinesis API request with exponential backoff and timeouts */
+  private def retryOrTimeout[T](message: String)(body: => T): T = {
+    import KinesisSequenceRangeIterator._
+
+    var startTimeMs = System.currentTimeMillis()
+    var retryCount = 0
+    var waitTimeMs = MIN_RETRY_WAIT_TIME_MS
+    var result: Option[T] = None
+    var lastError: Throwable = null
+
+    def isTimedOut = (System.currentTimeMillis() - startTimeMs) >= retryTimeoutMs
+    def isMaxRetryDone = retryCount >= MAX_RETRIES
+
+    while (result.isEmpty && !isTimedOut && !isMaxRetryDone) {
+      if (retryCount > 0) {  // wait only if this is a retry
+        Thread.sleep(waitTimeMs)
+        waitTimeMs *= 2  // if you have waited, then double wait time for next round
+      }
+      try {
+        result = Some(body)
+      } catch {
+        case NonFatal(t) =>
+          lastError = t
+           t match {
+             case ptee: ProvisionedThroughputExceededException =>
+               logWarning(s"Error while $message [attempt = ${retryCount + 
1}]", ptee)
+             case e: Throwable =>
+               throw new SparkException(s"Error while $message", e)
+           }
+      }
+      retryCount += 1
+    }
+    result.getOrElse {
+      if (isTimedOut) {
+        throw new SparkException(
+          s"Timed out after $retryTimeoutMs ms while $message, last exception: 
", lastError)
+      } else {
+        throw new SparkException(
+          s"Gave up after $retryCount retries while $message, last exception: 
", lastError)
+      }
+    }
+  }
+}
+
+private[streaming]
+object KinesisSequenceRangeIterator {
+  val MAX_RETRIES = 3
+  val MIN_RETRY_WAIT_TIME_MS = 100
+}

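The retryOrTimeout helper above combines three guards: a maximum retry count, a wall-clock
timeout, and an exponentially growing wait between attempts. A simplified, standalone sketch of
the same pattern follows; it is illustrative only (generic names, a RuntimeException instead of
SparkException) and is not the implementation from this patch.

    import scala.util.control.NonFatal

    object RetryWithBackoffSketch {
      // Retry `body` until it succeeds, `maxRetries` attempts are used up, or `timeoutMs` elapses.
      def retryOrTimeout[T](maxRetries: Int, initialWaitMs: Long, timeoutMs: Long)(body: => T): T = {
        val startMs = System.currentTimeMillis()
        var waitMs = initialWaitMs
        var attempt = 0
        var result: Option[T] = None
        var lastError: Throwable = null
        def timedOut = System.currentTimeMillis() - startMs >= timeoutMs
        while (result.isEmpty && attempt < maxRetries && !timedOut) {
          if (attempt > 0) {        // wait only if this is a retry
            Thread.sleep(waitMs)
            waitMs *= 2             // double the wait for the next round (exponential backoff)
          }
          try {
            result = Some(body)
          } catch {
            case NonFatal(t) => lastError = t
          }
          attempt += 1
        }
        result.getOrElse(
          throw new RuntimeException(s"Gave up after $attempt attempts", lastError))
      }

      def main(args: Array[String]): Unit = {
        // Example: a flaky operation that succeeds on the third attempt.
        var calls = 0
        val value = retryOrTimeout(maxRetries = 5, initialWaitMs = 100, timeoutMs = 10000) {
          calls += 1
          if (calls < 3) throw new RuntimeException("transient failure") else 42
        }
        println(s"Succeeded with $value after $calls calls")
      }
    }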
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
new file mode 100644
index 0000000..1ca6d43
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.concurrent._
+
+import scala.util.control.NonFatal
+
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.RecurringTimer
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+
+/**
+ * This is a helper class for managing Kinesis checkpointing.
+ *
+ * @param receiver The receiver that keeps track of which sequence numbers we can checkpoint
+ * @param checkpointInterval How frequently we will checkpoint to DynamoDB
+ * @param workerId Worker Id of KCL worker for logging purposes
+ * @param clock In order to use ManualClocks for the purpose of testing
+ */
+private[kinesis] class KinesisCheckpointer(
+    receiver: KinesisReceiver[_],
+    checkpointInterval: Duration,
+    workerId: String,
+    clock: Clock = new SystemClock) extends Logging {
+
+  // a map from shardIds to checkpointers
+  private val checkpointers = new ConcurrentHashMap[String, IRecordProcessorCheckpointer]()
+
+  private val lastCheckpointedSeqNums = new ConcurrentHashMap[String, String]()
+
+  private val checkpointerThread: RecurringTimer = startCheckpointerThread()
+
+  /** Update the checkpointer instance to the most recent one for the given shardId. */
+  def setCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    checkpointers.put(shardId, checkpointer)
+  }
+
+  /**
+   * Stop tracking the specified shardId.
+   *
+   * If a checkpointer is provided, e.g. on IRecordProcessor.shutdown [[ShutdownReason.TERMINATE]],
+   * we will use that to make the final checkpoint. If `null` is provided, we will not make the
+   * checkpoint, e.g. in case of [[ShutdownReason.ZOMBIE]].
+   */
+  def removeCheckpointer(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    synchronized {
+      checkpointers.remove(shardId)
+      checkpoint(shardId, checkpointer)
+    }
+  }
+
+  /** Perform the checkpoint. */
+  private def checkpoint(shardId: String, checkpointer: IRecordProcessorCheckpointer): Unit = {
+    try {
+      if (checkpointer != null) {
+        receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
+          val lastSeqNum = lastCheckpointedSeqNums.get(shardId)
+          // Kinesis sequence numbers are monotonically increasing strings, therefore we can
+          // safely do the string comparison
+          if (lastSeqNum == null || latestSeqNum > lastSeqNum) {
+            /* Perform the checkpoint */
+            KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
+            logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint at sequence number" +
+              s" $latestSeqNum for shardId $shardId")
+            lastCheckpointedSeqNums.put(shardId, latestSeqNum)
+          }
+        }
+      } else {
+        logDebug(s"Checkpointing skipped for shardId $shardId. Checkpointer 
not set.")
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to checkpoint shardId $shardId to DynamoDB.", e)
+    }
+  }
+
+  /** Checkpoint the latest saved sequence numbers for all active shardIds. */
+  private def checkpointAll(): Unit = synchronized {
+    // if this method throws an exception, then the scheduled task will not run again
+    try {
+      val shardIds = checkpointers.keys()
+      while (shardIds.hasMoreElements) {
+        val shardId = shardIds.nextElement()
+        checkpoint(shardId, checkpointers.get(shardId))
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Failed to checkpoint to DynamoDB.", e)
+    }
+  }
+
+  /**
+   * Start the checkpointer thread with the given checkpoint duration.
+   */
+  private def startCheckpointerThread(): RecurringTimer = {
+    val period = checkpointInterval.milliseconds
+    val threadName = s"Kinesis Checkpointer - Worker $workerId"
+    val timer = new RecurringTimer(clock, period, _ => checkpointAll(), threadName)
+    timer.start()
+    logDebug(s"Started checkpointer thread: $threadName")
+    timer
+  }
+
+  /**
+   * Shutdown the checkpointer. Should be called in the onStop() of the Receiver.
+   */
+  def shutdown(): Unit = {
+    // the recurring timer checkpoints for us one last time.
+    checkpointerThread.stop(interruptTimer = false)
+    checkpointers.clear()
+    lastCheckpointedSeqNums.clear()
+    logInfo("Successfully shutdown Kinesis Checkpointer.")
+  }
+}

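KinesisCheckpointer drives checkpointAll() from Spark's internal RecurringTimer once per
checkpointInterval, and only re-checkpoints a shard when a newer sequence number has been stored.
The sketch below illustrates the same periodic-flush idea with a plain ScheduledExecutorService;
it is an assumption-laden stand-in (a println instead of a real IRecordProcessorCheckpointer, a
hard-coded 2-second period), not the code added by this patch.

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

    object PeriodicCheckpointSketch {
      // Latest sequence number stored per shard, updated by the receiving side.
      private val latestSeqNums = new ConcurrentHashMap[String, String]()
      // Last sequence number flushed per shard.
      private val lastCheckpointed = new ConcurrentHashMap[String, String]()

      // Stand-in for checkpointer.checkpoint(seqNum) in the real code.
      private def checkpointShard(shardId: String, seqNum: String): Unit =
        println(s"checkpointing shard $shardId at $seqNum")

      private def checkpointAll(): Unit = {
        val shardIds = latestSeqNums.keys()
        while (shardIds.hasMoreElements) {
          val shardId = shardIds.nextElement()
          val latest = latestSeqNums.get(shardId)
          val last = lastCheckpointed.get(shardId)
          if (last == null || last != latest) {  // only flush when something new was stored
            checkpointShard(shardId, latest)
            lastCheckpointed.put(shardId, latest)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        latestSeqNums.put("shardId-000000000000", "12345")
        val scheduler = Executors.newSingleThreadScheduledExecutor()
        // Flush checkpoints every 2 seconds, analogous to checkpointInterval.
        scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = checkpointAll() },
          0L, 2L, TimeUnit.SECONDS)
        Thread.sleep(5000)
        scheduler.shutdown()  // analogous to KinesisCheckpointer.shutdown()
      }
    }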
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
new file mode 100644
index 0000000..5223c81
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import scala.reflect.ClassTag
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.{BlockId, StorageLevel}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+
+private[kinesis] class KinesisInputDStream[T: ClassTag](
+    _ssc: StreamingContext,
+    streamName: String,
+    endpointUrl: String,
+    regionName: String,
+    initialPositionInStream: InitialPositionInStream,
+    checkpointAppName: String,
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    messageHandler: Record => T,
+    awsCredentialsOption: Option[SerializableAWSCredentials]
+  ) extends ReceiverInputDStream[T](_ssc) {
+
+  private[streaming]
+  override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
+
+    // This returns true even when blockInfos is empty
+    val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
+
+    if (allBlocksHaveRanges) {
+      // Create a KinesisBackedBlockRDD, even when there are no blocks
+      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+      val seqNumRanges = blockInfos.map {
+        _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
+      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+      logDebug(s"Creating KinesisBackedBlockRDD for $time with 
${seqNumRanges.length} " +
+          s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
+      new KinesisBackedBlockRDD(
+        context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
+        isBlockIdValid = isBlockIdValid,
+        retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
+        messageHandler = messageHandler,
+        awsCredentialsOption = awsCredentialsOption)
+    } else {
+      logWarning("Kinesis sequence number information was not present with 
some block metadata," +
+        " it may not be possible to recover from failures")
+      super.createBlockRDD(time, blockInfos)
+    }
+  }
+
+  override def getReceiver(): Receiver[T] = {
+    new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
+      checkpointAppName, checkpointInterval, storageLevel, messageHandler, awsCredentialsOption)
+  }
+}

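The allBlocksHaveRanges check in createBlockRDD relies on forall returning true for an empty
collection, which is why an empty batch still goes down the KinesisBackedBlockRDD path. A tiny
standalone sketch of that behavior (plain Scala, not Spark code):

    object ForallOnEmptySketch {
      def main(args: Array[String]): Unit = {
        val noBlocks: Seq[Option[String]] = Seq.empty          // batch with no blocks
        println(noBlocks.forall(_.nonEmpty))                   // true: vacuously satisfied
        println(Seq(Some("range"), None).forall(_.nonEmpty))   // false: one block lacks metadata
      }
    }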
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
new file mode 100644
index 0000000..48ee2a9
--- /dev/null
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.Utils
+import org.apache.spark.Logging
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends AWSCredentials {
+  override def getAWSAccessKeyId: String = accessKeyId
+  override def getAWSSecretKey: String = secretKey
+}
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ *
+ * The way this Receiver works is as follows:
+ *
+ *  - The receiver starts a KCL Worker, which essentially runs a thread pool of multiple
+ *    KinesisRecordProcessors.
+ *  - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
+ *    inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
+ *  - When the block generator defines a block, the recorded sequence number ranges that were
+ *    inserted into the block are recorded separately for later use.
+ *  - When the block is ready to be pushed, the block is pushed and the ranges are reported as
+ *    metadata of the block. In addition, the ranges are used to find out the latest sequence
+ *    number for each shard that can be checkpointed through DynamoDB.
+ *  - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
+ *    number for its own shard.
+ *
+ * @param streamName   Kinesis stream name
+ * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param regionName  Region name used by the Kinesis Client Library for
+ *                    DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
+ * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+ *                                 worker's initial starting position in the stream.
+ *                                 The values are either the beginning of the stream
+ *                                 per Kinesis' limit of 24 hours
+ *                                 (InitialPositionInStream.TRIM_HORIZON) or
+ *                                 the tip of the stream (InitialPositionInStream.LATEST).
+ * @param checkpointAppName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ *                 by the Kinesis Client Library.  If you change the App name or Stream name,
+ *                 the KCL will throw errors.  This usually requires deleting the backing
+ *                 DynamoDB table with the same name as this Kinesis application.
+ * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+ *                            See the Kinesis Spark Streaming documentation for more
+ *                            details on the different types of checkpoints.
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param awsCredentialsOption Optional AWS credentials, used when the user directly specifies
+ *                             the credentials
+ */
+private[kinesis] class KinesisReceiver[T](
+    val streamName: String,
+    endpointUrl: String,
+    regionName: String,
+    initialPositionInStream: InitialPositionInStream,
+    checkpointAppName: String,
+    checkpointInterval: Duration,
+    storageLevel: StorageLevel,
+    messageHandler: Record => T,
+    awsCredentialsOption: Option[SerializableAWSCredentials])
+  extends Receiver[T](storageLevel) with Logging { receiver =>
+
+  /*
+   * =================================================================================
+   * The following vars are initialized in the onStart() method which executes in the
+   * Spark worker after this Receiver is serialized and shipped to the worker.
+   * =================================================================================
+   */
+
+  /**
+   * The workerId is used by the KCL; it should be based on the IP address of the actual
+   * Spark Worker where this code runs (not the driver's IP address).
+   */
+  @volatile private var workerId: String = null
+
+  /**
+   * Worker is the core client abstraction from the Kinesis Client Library (KCL).
+   * A worker can process more than one shard from the given stream.
+   * Each shard is assigned its own IRecordProcessor and the worker runs multiple such
+   * processors.
+   */
+  @volatile private var worker: Worker = null
+  @volatile private var workerThread: Thread = null
+
+  /** BlockGenerator used to generate blocks out of Kinesis data */
+  @volatile private var blockGenerator: BlockGenerator = null
+
+  /**
+   * Sequence number ranges added to the current block being generated.
+   * Accessing and updating of this buffer is synchronized by locks in BlockGenerator.
+   */
+  private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
+
+  /** Sequence number ranges of data added to each generated block */
+  private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, 
SequenceNumberRanges]
+
+  /**
+   * The centralized kinesisCheckpointer that checkpoints based on the given 
checkpointInterval.
+   */
+  @volatile private var kinesisCheckpointer: KinesisCheckpointer = null
+
+  /**
+   * Latest sequence number ranges that have been stored successfully.
+   * This is used for checkpointing through the KCL.
+   */
+  private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, 
String]
+
+  /**
+   * This is called when the KinesisReceiver starts and must be non-blocking.
+   * The KCL creates and manages the receiving/processing thread pool through 
Worker.run().
+   */
+  override def onStart() {
+    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
+
+    workerId = Utils.localHostName() + ":" + UUID.randomUUID()
+
+    kinesisCheckpointer = new KinesisCheckpointer(receiver, 
checkpointInterval, workerId)
+    // KCL config instance
+    val awsCredProvider = resolveAWSCredentialsProvider()
+    val kinesisClientLibConfiguration =
+      new KinesisClientLibConfiguration(checkpointAppName, streamName, 
awsCredProvider, workerId)
+      .withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream)
+      .withTaskBackoffTimeMillis(500)
+      .withRegionName(regionName)
+
+   /*
+    *  RecordProcessorFactory creates impls of IRecordProcessor.
+    *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the
+    *  IRecordProcessor.processRecords() method.
+    *  We're using our custom KinesisRecordProcessor in this case.
+    */
+    val recordProcessorFactory = new IRecordProcessorFactory {
+      override def createProcessor: IRecordProcessor =
+        new KinesisRecordProcessor(receiver, workerId)
+    }
+
+    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
+    workerThread = new Thread() {
+      override def run(): Unit = {
+        try {
+          worker.run()
+        } catch {
+          case NonFatal(e) =>
+            restart("Error running the KCL worker in Receiver", e)
+        }
+      }
+    }
+
+    blockIdToSeqNumRanges.clear()
+    blockGenerator.start()
+
+    workerThread.setName(s"Kinesis Receiver ${streamId}")
+    workerThread.setDaemon(true)
+    workerThread.start()
+
+    logInfo(s"Started receiver with workerId $workerId")
+  }
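For orientation, the KCL wiring that onStart() performs above can be reproduced outside of Spark. The following is a minimal, hedged sketch of a standalone KCL 1.x worker using the same KinesisClientLibConfiguration / IRecordProcessorFactory / Worker pieces; the app name, stream name, endpoint and region are placeholders, and the record processor is a trivial one that just prints each record payload rather than the KinesisRecordProcessor used by the receiver.

    import java.util.UUID

    import scala.collection.JavaConverters._

    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
    import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
    import com.amazonaws.services.kinesis.model.Record

    object KclWiringSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder names; substitute your own app, stream, endpoint and region.
        val workerId = java.net.InetAddress.getLocalHost.getHostName + ":" + UUID.randomUUID()
        val config = new KinesisClientLibConfiguration(
            "myCheckpointApp", "mySparkStream", new DefaultAWSCredentialsProviderChain(), workerId)
          .withKinesisEndpoint("https://kinesis.us-east-1.amazonaws.com")
          .withInitialPositionInStream(InitialPositionInStream.LATEST)
          .withRegionName("us-east-1")

        // A minimal IRecordProcessor that prints each record's payload as UTF-8 text.
        val factory = new IRecordProcessorFactory {
          override def createProcessor(): IRecordProcessor = new IRecordProcessor {
            override def initialize(shardId: String): Unit = { }
            override def processRecords(
                records: java.util.List[Record],
                checkpointer: IRecordProcessorCheckpointer): Unit = {
              records.asScala.foreach { r =>
                val buf = r.getData
                val bytes = new Array[Byte](buf.remaining())
                buf.get(bytes)
                println(new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
              }
            }
            override def shutdown(
                checkpointer: IRecordProcessorCheckpointer,
                reason: ShutdownReason): Unit = { }
          }
        }

        // Worker.run() blocks; the receiver above runs it on a daemon thread instead.
        new Worker(factory, config).run()
      }
    }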
+
+  /**
+   * This is called when the KinesisReceiver stops.
+   * The KCL worker.shutdown() method stops the receiving/processing threads.
+   * The KCL will do its best to drain and checkpoint any in-flight records 
upon shutdown.
+   */
+  override def onStop() {
+    if (workerThread != null) {
+      if (worker != null) {
+        worker.shutdown()
+        worker = null
+      }
+      workerThread.join()
+      workerThread = null
+      logInfo(s"Stopped receiver for workerId $workerId")
+    }
+    workerId = null
+    if (kinesisCheckpointer != null) {
+      kinesisCheckpointer.shutdown()
+      kinesisCheckpointer = null
+    }
+  }
+
+  /** Add records of the given shard to the current block being generated */
+  private[kinesis] def addRecords(shardId: String, records: 
java.util.List[Record]): Unit = {
+    if (records.size > 0) {
+      val dataIterator = records.iterator().asScala.map(messageHandler)
+      val metadata = SequenceNumberRange(streamName, shardId,
+        records.get(0).getSequenceNumber(), records.get(records.size() - 
1).getSequenceNumber())
+      blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
+    }
+  }
+
+  /** Get the latest sequence number for the given shard that can be 
checkpointed through KCL */
+  private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): 
Option[String] = {
+    Option(shardIdToLatestStoredSeqNum.get(shardId))
+  }
+
+  /**
+   * Set the checkpointer that will be used to checkpoint sequence numbers to 
DynamoDB for the
+   * given shardId.
+   */
+  def setCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not 
initialized!")
+    kinesisCheckpointer.setCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Remove the checkpointer for the given shardId. The provided checkpointer 
will be used to
+   * checkpoint one last time for the given shard. If `checkpointer` is 
`null`, then we will not
+   * checkpoint.
+   */
+  def removeCheckpointer(shardId: String, checkpointer: 
IRecordProcessorCheckpointer): Unit = {
+    assert(kinesisCheckpointer != null, "Kinesis Checkpointer not 
initialized!")
+    kinesisCheckpointer.removeCheckpointer(shardId, checkpointer)
+  }
+
+  /**
+   * Remember the range of sequence numbers that was added to the currently 
active block.
+   * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
+   */
+  private def rememberAddedRange(range: SequenceNumberRange): Unit = {
+    seqNumRangesInCurrentBlock += range
+  }
+
+  /**
+   * Finalize the ranges added to the block that was active and prepare the 
ranges buffer
+   * for next block. Internally, this is synchronized with 
`rememberAddedRange()`.
+   */
+  private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
+    blockIdToSeqNumRanges.put(blockId, 
SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
+    seqNumRangesInCurrentBlock.clear()
+    logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
+  }
+
+  /** Store the block along with its associated ranges */
+  private def storeBlockWithRanges(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
+    val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
+    if (rangesToReportOption.isEmpty) {
+      stop("Error while storing block into Spark, could not find sequence 
number ranges " +
+        s"for block $blockId")
+      return
+    }
+
+    val rangesToReport = rangesToReportOption.get
+    var attempt = 0
+    var stored = false
+    var throwable: Throwable = null
+    while (!stored && attempt <= 3) {
+      try {
+        store(arrayBuffer, rangesToReport)
+        stored = true
+      } catch {
+        case NonFatal(th) =>
+          attempt += 1
+          throwable = th
+      }
+    }
+    if (!stored) {
+      stop("Error while storing block into Spark", throwable)
+    }
+
+    // Update the latest sequence numbers that have been successfully stored for each shard.
+    // Note that we do this sequentially because the array of sequence number ranges is assumed
+    // to be in the order in which the data was added, so the last range seen for a shard
+    // carries the latest stored sequence number for that shard.
+    rangesToReport.ranges.foreach { range =>
+      shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
+    }
+  }
+
+  /**
+   * If AWS credentials are provided, return an AWSCredentialsProvider returning those
+   * credentials. Otherwise, return the DefaultAWSCredentialsProviderChain.
+   */
+  private def resolveAWSCredentialsProvider(): AWSCredentialsProvider = {
+    awsCredentialsOption match {
+      case Some(awsCredentials) =>
+        logInfo("Using provided AWS credentials")
+        new AWSCredentialsProvider {
+          override def getCredentials: AWSCredentials = awsCredentials
+          override def refresh(): Unit = { }
+        }
+      case None =>
+        logInfo("Using DefaultAWSCredentialsProviderChain")
+        new DefaultAWSCredentialsProviderChain()
+    }
+  }
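The same pattern is useful on its own whenever fixed keys have to be handed to an AWS SDK client. A minimal sketch, assuming the access key and secret key are available as plain strings (BasicAWSCredentials is the SDK's standard holder for static keys):

    import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials}

    // Wrap static keys in a provider so SDK clients can ask for credentials uniformly.
    def staticCredentialsProvider(accessKeyId: String, secretKey: String): AWSCredentialsProvider =
      new AWSCredentialsProvider {
        override def getCredentials: AWSCredentials = new BasicAWSCredentials(accessKeyId, secretKey)
        override def refresh(): Unit = { }  // static keys never need refreshing
      }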
+
+
+  /**
+   * Class to handle blocks generated by this receiver's block generator. Specifically, in
+   * the context of the Kinesis Receiver, this handler does the following.
+   *
+   * - When an array of records is added to the currently active block in the block generator,
+   *   this handler keeps track of the corresponding sequence number range.
+   * - When the currently active block is ready to be sealed (no more records will be added),
+   *   this handler keeps track of the list of ranges added to this block in another map,
+   *   keyed by the block id (blockIdToSeqNumRanges), so that they can be reported when the
+   *   block is pushed.
+   */
+  private class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    /**
+     * Callback method called after a data item is added into the 
BlockGenerator.
+     * The data addition, block generation, and calls to onAddData and 
onGenerateBlock
+     * are all synchronized through the same lock.
+     */
+    def onAddData(data: Any, metadata: Any): Unit = {
+      rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
+    }
+
+    /**
+     * Callback method called after a block has been generated.
+     * The data addition, block generation, and calls to onAddData and 
onGenerateBlock
+     * are all synchronized through the same lock.
+     */
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      finalizeRangesForCurrentBlock(blockId)
+    }
+
+    /** Callback method called when a block is ready to be pushed / stored. */
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: 
mutable.ArrayBuffer[_]): Unit = {
+      storeBlockWithRanges(blockId,
+        arrayBuffer.asInstanceOf[mutable.ArrayBuffer[T]])
+    }
+
+    /** Callback method called in case of any error in the internals of the BlockGenerator */
+    def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
+}
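Applications do not construct KinesisReceiver directly (it is private[kinesis]); they go through KinesisUtils. A hedged sketch of how the constructor parameters documented above surface in a typical Scala streaming application; the app name, stream name, endpoint and region are placeholders, and the exact createStream overload may differ between Spark versions.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    object KinesisReceiverUsageSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KinesisReceiverUsageSketch")
        val ssc = new StreamingContext(conf, Seconds(2))

        // One receiver; create several such streams and union them to read all shards in parallel.
        val stream = KinesisUtils.createStream(
          ssc,
          "myCheckpointApp",                          // checkpointAppName (DynamoDB table name)
          "mySparkStream",                            // streamName
          "https://kinesis.us-east-1.amazonaws.com",  // endpointUrl
          "us-east-1",                                // regionName
          InitialPositionInStream.LATEST,             // initialPositionInStream
          Seconds(10),                                // checkpointInterval
          StorageLevel.MEMORY_AND_DISK_2)             // storageLevel

        stream.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8)).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }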

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
new file mode 100644
index 0000000..b5b76cb
--- /dev/null
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import 
com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, 
KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorCheckpointer}
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+
+/**
+ * Kinesis-specific implementation of the Kinesis Client Library (KCL) 
IRecordProcessor.
+ * This implementation operates on the Array[Byte] from the KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor for 
each
+ * shard in the Kinesis stream upon startup.  This is normally done in 
separate threads,
+ * but the KCLs within the KinesisReceivers will balance themselves out if you 
create
+ * multiple Receivers.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ */
+private[kinesis] class KinesisRecordProcessor[T](receiver: KinesisReceiver[T], 
workerId: String)
+  extends IRecordProcessor with Logging {
+
+  // shardId populated during initialize()
+  @volatile
+  private var shardId: String = _
+
+  /**
+   * The Kinesis Client Library calls this method during IRecordProcessor 
initialization.
+   *
+   * @param shardId assigned by the KCL to this particular RecordProcessor.
+   */
+  override def initialize(shardId: String) {
+    this.shardId = shardId
+    logInfo(s"Initialized workerId $workerId with shardId $shardId")
+  }
+
+  /**
+   * This method is called by the KCL when a batch of records is pulled from 
the Kinesis stream.
+   * This is the record-processing bridge between the KCL's 
IRecordProcessor.processRecords()
+   * and Spark Streaming's Receiver.store().
+   *
+   * @param batch list of records from the Kinesis stream shard
+   * @param checkpointer used to update Kinesis when this batch has been 
processed/stored
+   *   in the DStream
+   */
+  override def processRecords(batch: List[Record], checkpointer: 
IRecordProcessorCheckpointer) {
+    if (!receiver.isStopped()) {
+      try {
+        receiver.addRecords(shardId, batch)
+        logDebug(s"Stored: Worker $workerId stored ${batch.size} records for 
shardId $shardId")
+        receiver.setCheckpointer(shardId, checkpointer)
+      } catch {
+        case NonFatal(e) => {
+          /*
+           *  If there is a failure within the batch, the batch will not be 
checkpointed.
+           *  This will potentially cause records since the last checkpoint to 
be processed
+           *     more than once.
+           */
+          logError(s"Exception:  WorkerId $workerId encountered and exception 
while storing " +
+              s" or checkpointing a batch for workerId $workerId and shardId 
$shardId.", e)
+
+          /* Rethrow the exception to the Kinesis Worker that is managing this 
RecordProcessor. */
+          throw e
+        }
+      }
+    } else {
+      /* RecordProcessor has been stopped. */
+      logInfo(s"Stopped:  KinesisReceiver has stopped for workerId $workerId" +
+          s" and shardId $shardId.  No more records will be processed.")
+    }
+  }
+
+  /**
+   * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
+   * 1) the stream is resharding by splitting or merging adjacent shards
+   *     (ShutdownReason.TERMINATE)
+   * 2) the failed or latent Worker has stopped sending heartbeats for 
whatever reason
+   *     (ShutdownReason.ZOMBIE)
+   *
+   * @param checkpointer used to perform a Kinesis checkpoint for 
ShutdownReason.TERMINATE
+   * @param reason for shutdown (ShutdownReason.TERMINATE or 
ShutdownReason.ZOMBIE)
+   */
+  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: 
ShutdownReason) {
+    logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
+    reason match {
+      /*
+       * TERMINATE Use Case.  Checkpoint.
+       * Checkpoint to indicate that all records from the shard have been 
drained and processed.
+       * It's now OK to read from the new shards that resulted from a 
resharding event.
+       */
+      case ShutdownReason.TERMINATE =>
+        receiver.removeCheckpointer(shardId, checkpointer)
+
+      /*
+       * ZOMBIE Use Case or Unknown reason.  NoOp.
+       * No checkpoint because other workers may have taken over and already 
started processing
+       *    the same records.
+       * This may lead to records being processed more than once.
+       */
+      case _ =>
+        receiver.removeCheckpointer(shardId, null) // pass null so that we don't checkpoint
+    }
+
+  }
+}
+
+private[kinesis] object KinesisRecordProcessor extends Logging {
+  /**
+   * Retry the given expression the given number of times, with a random backoff time
+   * (millis) less than the given maxBackOffMillis.
+   *
+   * @param expression expression to evaluate
+   * @param numRetriesLeft number of retries left
+   * @param maxBackOffMillis max millis between retries
+   *
+   * @return evaluation of the given expression
+   * @throws Unretryable exception, unexpected exception,
+   *  or any exception that persists after numRetriesLeft reaches 0
+   */
+  @annotation.tailrec
+  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: 
Int): T = {
+    util.Try { expression } match {
+      /* If the function succeeded, evaluate to x. */
+      case util.Success(x) => x
+      /* If the function failed, either retry or throw the exception */
+      case util.Failure(e) => e match {
+        /* Retry:  Throttling or other Retryable exception has occurred */
+        case _: ThrottlingException | _: KinesisClientLibDependencyException 
if numRetriesLeft > 1
+          => {
+               val backOffMillis = Random.nextInt(maxBackOffMillis)
+               Thread.sleep(backOffMillis)
+               logError(s"Retryable Exception:  Random 
backOffMillis=${backOffMillis}", e)
+               retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
+             }
+        /* Throw:  Shutdown has been requested by the Kinesis Client Library. 
*/
+        case _: ShutdownException => {
+          logError(s"ShutdownException:  Caught shutdown exception, skipping 
checkpoint.", e)
+          throw e
+        }
+        /* Throw:  Non-retryable exception has occurred with the Kinesis 
Client Library */
+        case _: InvalidStateException => {
+          logError(s"InvalidStateException:  Cannot save checkpoint to the 
DynamoDB table used" +
+              s" by the Amazon Kinesis Client Library.  Table likely doesn't 
exist.", e)
+          throw e
+        }
+        /* Throw:  Unexpected exception has occurred */
+        case _ => {
+          logError(s"Unexpected, non-retryable exception.", e)
+          throw e
+        }
+      }
+    }
+  }
+}
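As a usage illustration, retryRandom is the helper one would reach for when a KCL checkpoint call must tolerate transient throttling. A hedged sketch (assuming the caller lives in the org.apache.spark.streaming.kinesis package, since the helper is private[kinesis], and that the checkpointer comes from the KCL, e.g. the one handed to processRecords above; 4 attempts and a 100 ms cap are arbitrary example values):

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer

    def checkpointWithRetries(checkpointer: IRecordProcessorCheckpointer): Unit = {
      // Retries ThrottlingException / KinesisClientLibDependencyException with a random
      // backoff of up to 100 ms, for up to 4 attempts; other exceptions are rethrown immediately.
      KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
    }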

http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
new file mode 100644
index 0000000..0ace453
--- /dev/null
+++ 
b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Random, Success, Try}
+
+import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
+import com.amazonaws.regions.RegionUtils
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
+import com.amazonaws.services.dynamodbv2.document.DynamoDB
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.model._
+
+import org.apache.spark.Logging
+
+/**
+ * Shared utility methods for performing Kinesis tests that actually transfer 
data.
+ *
+ * PLEASE KEEP THIS FILE UNDER src/main AS PYTHON TESTS NEED ACCESS TO THIS 
FILE!
+ */
+private[kinesis] class KinesisTestUtils extends Logging {
+
+  val endpointUrl = KinesisTestUtils.endpointUrl
+  val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
+  val streamShardCount = 2
+
+  private val createStreamTimeoutSeconds = 300
+  private val describeStreamPollTimeSeconds = 1
+
+  @volatile
+  private var streamCreated = false
+
+  @volatile
+  private var _streamName: String = _
+
+  protected lazy val kinesisClient = {
+    val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
+    client.setEndpoint(endpointUrl)
+    client
+  }
+
+  private lazy val dynamoDB = {
+    val dynamoDBClient = new AmazonDynamoDBClient(new 
DefaultAWSCredentialsProviderChain())
+    dynamoDBClient.setRegion(RegionUtils.getRegion(regionName))
+    new DynamoDB(dynamoDBClient)
+  }
+
+  protected def getProducer(aggregate: Boolean): KinesisDataGenerator = {
+    if (!aggregate) {
+      new SimpleDataGenerator(kinesisClient)
+    } else {
+      throw new UnsupportedOperationException("Aggregation is not supported 
through this code path")
+    }
+  }
+
+  def streamName: String = {
+    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
+    _streamName
+  }
+
+  def createStream(): Unit = {
+    require(!streamCreated, "Stream already created")
+    _streamName = findNonExistentStreamName()
+
+    // Create a stream. The number of shards determines the provisioned 
throughput.
+    logInfo(s"Creating stream ${_streamName}")
+    val createStreamRequest = new CreateStreamRequest()
+    createStreamRequest.setStreamName(_streamName)
+    createStreamRequest.setShardCount(2)
+    kinesisClient.createStream(createStreamRequest)
+
+    // The stream is now being created. Wait for it to become active.
+    waitForStreamToBeActive(_streamName)
+    streamCreated = true
+    logInfo(s"Created stream ${_streamName}")
+  }
+
+  /**
+   * Push data to Kinesis stream and return a map of
+   * shardId -> seq of (data, seq number) pushed to corresponding shard
+   */
+  def pushData(testData: Seq[Int], aggregate: Boolean): Map[String, Seq[(Int, 
String)]] = {
+    require(streamCreated, "Stream not yet created, call createStream() to 
create one")
+    val producer = getProducer(aggregate)
+    val shardIdToSeqNumbers = producer.sendData(streamName, testData)
+    logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
+    shardIdToSeqNumbers.toMap
+  }
+
+  /**
+   * Expose a Python friendly API.
+   */
+  def pushData(testData: java.util.List[Int]): Unit = {
+    pushData(testData.asScala, aggregate = false)
+  }
+
+  def deleteStream(): Unit = {
+    try {
+      if (streamCreated) {
+        kinesisClient.deleteStream(streamName)
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete stream $streamName")
+    }
+  }
+
+  def deleteDynamoDBTable(tableName: String): Unit = {
+    try {
+      val table = dynamoDB.getTable(tableName)
+      table.delete()
+      table.waitForDelete()
+    } catch {
+      case e: Exception =>
+        logWarning(s"Could not delete DynamoDB table $tableName")
+    }
+  }
+
+  private def describeStream(streamNameToDescribe: String): 
Option[StreamDescription] = {
+    try {
+      val describeStreamRequest = new 
DescribeStreamRequest().withStreamName(streamNameToDescribe)
+      val desc = 
kinesisClient.describeStream(describeStreamRequest).getStreamDescription()
+      Some(desc)
+    } catch {
+      case rnfe: ResourceNotFoundException =>
+        None
+    }
+  }
+
+  private def findNonExistentStreamName(): String = {
+    var testStreamName: String = null
+    do {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      testStreamName = s"KinesisTestUtils-${math.abs(Random.nextLong())}"
+    } while (describeStream(testStreamName).nonEmpty)
+    testStreamName
+  }
+
+  private def waitForStreamToBeActive(streamNameToWaitFor: String): Unit = {
+    val startTime = System.currentTimeMillis()
+    val endTime = startTime + 
TimeUnit.SECONDS.toMillis(createStreamTimeoutSeconds)
+    while (System.currentTimeMillis() < endTime) {
+      Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
+      describeStream(streamNameToWaitFor).foreach { description =>
+        val streamStatus = description.getStreamStatus()
+        logDebug(s"\t- current state: $streamStatus\n")
+        if ("ACTIVE".equals(streamStatus)) {
+          return
+        }
+      }
+    }
+    require(false, s"Stream $streamName never became active")
+  }
+}
+
+private[kinesis] object KinesisTestUtils {
+
+  val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
+  val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
+  val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
+
+  lazy val shouldRunTests = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+    if (isEnvSet) {
+      // scalastyle:off println
+      // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+      println(
+        s"""
+          |Kinesis tests that actually send data have been enabled by setting the environment
+          |variable $envVarNameForEnablingTests to 1. This will create Kinesis 
Streams and
+          |DynamoDB tables in AWS. Please be aware that this may incur some 
AWS costs.
+          |By default, the tests use the endpoint URL $defaultEndpointUrl to 
create Kinesis streams.
+          |To change this endpoint URL to a different region, you can set the 
environment variable
+          |$endVarNameForEndpoint to the desired endpoint URL
+          |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
+        """.stripMargin)
+      // scalastyle:on println
+    }
+    isEnvSet
+  }
+
+  lazy val endpointUrl = {
+    val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
+    // scalastyle:off println
+    // Print this so that it is easily visible on the console and not hidden in the log4j logs.
+    println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
+    // scalastyle:on println
+    url
+  }
+
+  def isAWSCredentialsPresent: Boolean = {
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
+  }
+
+  def getAWSCredentials(): AWSCredentials = {
+    assert(shouldRunTests,
+      "Kinesis test not enabled, should not attempt to get AWS credentials")
+    Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
+      case Success(cred) => cred
+      case Failure(e) =>
+        throw new Exception(
+          s"""
+             |Kinesis tests enabled using environment variable 
$envVarNameForEnablingTests
+             |but could not find AWS credentials. Please follow instructions 
in AWS documentation
+             |to set the credentials in your system such that the 
DefaultAWSCredentialsProviderChain
+             |can find the credentials.
+           """.stripMargin)
+    }
+  }
+}
+
+/** A wrapper interface that will allow us to consolidate the code for 
synthetic data generation. */
+private[kinesis] trait KinesisDataGenerator {
+  /** Sends the data to Kinesis and returns the metadata for everything that 
has been sent. */
+  def sendData(streamName: String, data: Seq[Int]): Map[String, Seq[(Int, 
String)]]
+}
+
+private[kinesis] class SimpleDataGenerator(
+    client: AmazonKinesisClient) extends KinesisDataGenerator {
+  override def sendData(streamName: String, data: Seq[Int]): Map[String, 
Seq[(Int, String)]] = {
+    val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, 
String)]]()
+    data.foreach { num =>
+      val str = num.toString
+      val bytes = ByteBuffer.wrap(str.getBytes())
+      val putRecordRequest = new PutRecordRequest().withStreamName(streamName)
+        .withData(bytes)
+        .withPartitionKey(str)
+
+      val putRecordResult = client.putRecord(putRecordRequest)
+      val shardId = putRecordResult.getShardId
+      val seqNumber = putRecordResult.getSequenceNumber()
+      val sentSeqNumbers = shardIdToSeqNumbers.getOrElseUpdate(shardId,
+        new ArrayBuffer[(Int, String)]())
+      sentSeqNumbers += ((num, seqNumber))
+    }
+
+    shardIdToSeqNumbers.toMap
+  }
+}
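A hedged sketch of how a data-transferring test would typically drive these utilities, assuming ENABLE_KINESIS_TESTS=1 and AWS credentials are set, and that the calling test lives in the org.apache.spark.streaming.kinesis package since the class is private[kinesis]; the application name passed to deleteDynamoDBTable is a placeholder for whatever KCL checkpoint app the test used.

    val testUtils = new KinesisTestUtils()
    try {
      testUtils.createStream()
      // Push some integers and get back shardId -> (data, sequence number) pairs.
      val shardIdToData = testUtils.pushData(1 to 10, aggregate = false)
      assert(shardIdToData.values.flatten.map(_._1).toSet == (1 to 10).toSet)
    } finally {
      testUtils.deleteStream()
      testUtils.deleteDynamoDBTable("myKinesisCheckpointApp")  // placeholder KCL app/table name
    }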

