[SPARK-1981] Add AWS Kinesis streaming support

Author: Chris Fregly <ch...@fregly.com>

Closes #1434 from cfregly/master and squashes the following commits:

4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more 
clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into 
extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with 
JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback.  simplified examples, updated 
docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback:  moved the kinesis examples into 
the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback:  updated docs, simplified the 
KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail 
class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and 
kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91f9504e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91f9504e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91f9504e

Branch: refs/heads/master
Commit: 91f9504e6086fac05b40545099f9818949c24bca
Parents: 67bd8e3
Author: Chris Fregly <ch...@fregly.com>
Authored: Sat Aug 2 13:35:35 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sat Aug 2 13:35:35 2014 -0700

----------------------------------------------------------------------
 bin/run-example                                 |   3 +-
 bin/run-example2.cmd                            |   3 +-
 dev/audit-release/audit_release.py              |   4 +-
 .../sbt_app_core/src/main/scala/SparkApp.scala  |   7 +
 dev/audit-release/sbt_app_kinesis/build.sbt     |  28 ++
 .../src/main/scala/SparkApp.scala               |  33 +++
 dev/create-release/create-release.sh            |   4 +-
 dev/run-tests                                   |   3 +
 docs/streaming-custom-receivers.md              |   4 +-
 docs/streaming-kinesis.md                       |  58 ++++
 docs/streaming-programming-guide.md             |  12 +-
 examples/pom.xml                                |  13 +
 extras/kinesis-asl/pom.xml                      |  96 +++++++
 .../streaming/JavaKinesisWordCountASL.java      | 180 ++++++++++++
 .../src/main/resources/log4j.properties         |  37 +++
 .../streaming/KinesisWordCountASL.scala         | 251 +++++++++++++++++
 .../kinesis/KinesisCheckpointState.scala        |  56 ++++
 .../streaming/kinesis/KinesisReceiver.scala     | 149 ++++++++++
 .../kinesis/KinesisRecordProcessor.scala        | 212 ++++++++++++++
 .../spark/streaming/kinesis/KinesisUtils.scala  |  96 +++++++
 .../kinesis/JavaKinesisStreamSuite.java         |  41 +++
 .../src/test/resources/log4j.properties         |  26 ++
 .../kinesis/KinesisReceiverSuite.scala          | 275 +++++++++++++++++++
 pom.xml                                         |  10 +
 project/SparkBuild.scala                        |   6 +-
 25 files changed, 1592 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/bin/run-example
----------------------------------------------------------------------
diff --git a/bin/run-example b/bin/run-example
index 942706d..68a3570 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -29,7 +29,8 @@ if [ -n "$1" ]; then
 else
   echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
   echo "  - set MASTER=XX to use a specific master" 1>&2
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
+  echo "  - can use abbreviated example class name relative to org.apache.spark.examples" 1>&2
+  echo "     (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)" 1>&2
   exit 1
 fi
 

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/bin/run-example2.cmd
----------------------------------------------------------------------
diff --git a/bin/run-example2.cmd b/bin/run-example2.cmd
index eadedd7..b29bf90 100644
--- a/bin/run-example2.cmd
+++ b/bin/run-example2.cmd
@@ -32,7 +32,8 @@ rem Test that an argument was given
 if not "x%1"=="x" goto arg_given
   echo Usage: run-example ^<example-class^> [example-args]
   echo   - set MASTER=XX to use a specific master
-  echo   - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)
+  echo   - can use abbreviated example class name relative to org.apache.spark.examples
+  echo      (e.g. SparkPi, mllib.LinearRegression, streaming.KinesisWordCountASL)
   goto exit
 :arg_given
 

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/audit-release/audit_release.py
----------------------------------------------------------------------
diff --git a/dev/audit-release/audit_release.py 
b/dev/audit-release/audit_release.py
index 230e900..16ea1a7 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -105,7 +105,7 @@ modules = [
     "spark-core", "spark-bagel", "spark-mllib", "spark-streaming", 
"spark-repl",
     "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
     "spark-streaming-mqtt", "spark-streaming-twitter", 
"spark-streaming-zeromq",
-    "spark-catalyst", "spark-sql", "spark-hive"
+    "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)
 
@@ -136,7 +136,7 @@ for module in modules:
 os.chdir(original_dir)
 
 # SBT application tests
-for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", 
"sbt_app_sql", "sbt_app_hive"]:
+for app in ["sbt_app_core", "sbt_app_graphx", "sbt_app_streaming", 
"sbt_app_sql", "sbt_app_hive", "sbt_app_kinesis"]:
     os.chdir(app)
     ret = run_cmd("sbt clean run", exit_on_failure=False)
     test(ret == 0, "sbt application (%s)" % app)
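
For illustration, the `modules` hunk above feeds the line that appends the Scala binary version to every module name, so the new entry is audited as a versioned artifact ID. A minimal Python sketch of that step, assuming a `SCALA_BINARY_VERSION` of "2.10" (an assumption here, chosen to match the `_2.10` suffix in extras/kinesis-asl/pom.xml) and a shortened module list; the helper name `artifact_ids` is hypothetical:

```python
# Sketch only: mirrors the name-mangling step in audit_release.py.
# The version value and the shortened module list are assumptions.
SCALA_BINARY_VERSION = "2.10"

modules = ["spark-core", "spark-streaming", "spark-streaming-kinesis-asl"]


def artifact_ids(names, scala_binary_version):
    """Append the Scala binary version to each module name."""
    return ["%s_%s" % (name, scala_binary_version) for name in names]


artifacts = artifact_ids(modules, SCALA_BINARY_VERSION)
print(artifacts)
```

A list comprehension is used instead of `map` so the result is a concrete list on both Python 2 and Python 3.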

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
----------------------------------------------------------------------
diff --git a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala 
b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
index 77bbd16..fc03fec 100644
--- a/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
+++ b/dev/audit-release/sbt_app_core/src/main/scala/SparkApp.scala
@@ -50,5 +50,12 @@ object SimpleApp {
       println("Ganglia sink was loaded via spark-core")
       System.exit(-1)
     }
+
+    // Remove kinesis from default build due to ASL license issue
+    val foundKinesis = 
Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (foundKinesis) {
+      println("Kinesis was loaded via spark-core")
+      System.exit(-1)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/audit-release/sbt_app_kinesis/build.sbt
----------------------------------------------------------------------
diff --git a/dev/audit-release/sbt_app_kinesis/build.sbt 
b/dev/audit-release/sbt_app_kinesis/build.sbt
new file mode 100644
index 0000000..981bc79
--- /dev/null
+++ b/dev/audit-release/sbt_app_kinesis/build.sbt
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+name := "Kinesis Test"
+
+version := "1.0"
+
+scalaVersion := System.getenv.get("SCALA_VERSION")
+
+libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % 
System.getenv.get("SPARK_VERSION")
+
+resolvers ++= Seq(
+  "Spark Release Repository" at System.getenv.get("SPARK_RELEASE_REPOSITORY"),
+  "Spray Repository" at "http://repo.spray.cc/")

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
----------------------------------------------------------------------
diff --git a/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala 
b/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
new file mode 100644
index 0000000..9f85066
--- /dev/null
+++ b/dev/audit-release/sbt_app_kinesis/src/main/scala/SparkApp.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main.scala
+
+import scala.util.Try
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+
+object SimpleApp {
+  def main(args: Array[String]) {
+    val foundKinesis = 
Try(Class.forName("org.apache.spark.streaming.kinesis.KinesisUtils")).isSuccess
+    if (!foundKinesis) {
+      println("Kinesis not loaded via kinesis-asl")
+      System.exit(-1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/create-release/create-release.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/create-release.sh 
b/dev/create-release/create-release.sh
index af46572..4247362 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -53,15 +53,15 @@ if [[ ! "$@" =~ --package-only ]]; then
     -Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
     -Dmaven.javadoc.skip=true \
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
     -Dtag=$GIT_TAG -DautoVersionSubmodules=true \
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl \
     --batch-mode release:prepare
 
   mvn -DskipTests \
     -Darguments="-DskipTests=true -Dmaven.javadoc.skip=true 
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" 
\
     -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
     -Dmaven.javadoc.skip=true \
-    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
+    -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl 
-Pkinesis-asl \
     release:perform
 
   cd ..

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/dev/run-tests
----------------------------------------------------------------------
diff --git a/dev/run-tests b/dev/run-tests
index daa85bc..d401c90 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -36,6 +36,9 @@ fi
 if [ -z "$SBT_MAVEN_PROFILES_ARGS" ]; then
   export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
 fi
+
+export SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Pkinesis-asl"
+
 echo "SBT_MAVEN_PROFILES_ARGS=\"$SBT_MAVEN_PROFILES_ARGS\""
 
 # Remove work directory

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/docs/streaming-custom-receivers.md
----------------------------------------------------------------------
diff --git a/docs/streaming-custom-receivers.md 
b/docs/streaming-custom-receivers.md
index a2dc3a8..1e045a3 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -4,7 +4,7 @@ title: Spark Streaming Custom Receivers
 ---
 
 Spark Streaming can receive streaming data from any arbitrary data source 
beyond
-the one's for which it has in-built support (that is, beyond Flume, Kafka, 
files, sockets, etc.).
+the one's for which it has in-built support (that is, beyond Flume, Kafka, 
Kinesis, files, sockets, etc.).
 This requires the developer to implement a *receiver* that is customized for 
receiving data from
 the concerned data source. This guide walks through the process of 
implementing a custom receiver
 and using it in a Spark Streaming application.
@@ -174,7 +174,7 @@ val words = lines.flatMap(_.split(" "))
 ...
 {% endhighlight %}
 
-The full source code is in the example 
[CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala).
+The full source code is in the example 
[CustomReceiver.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala).
 
 </div>
 <div data-lang="java" markdown="1">

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/docs/streaming-kinesis.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis.md b/docs/streaming-kinesis.md
new file mode 100644
index 0000000..801c905
--- /dev/null
+++ b/docs/streaming-kinesis.md
@@ -0,0 +1,58 @@
+---
+layout: global
+title: Spark Streaming Kinesis Receiver
+---
+
+### Kinesis
+Build notes:
+<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
+<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
+<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
+<li>To build with Kinesis, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
+<li>Applications will need to link to the `spark-streaming-kinesis-asl` artifact.</li>
+
+Kinesis examples notes:
+<li>To build the Kinesis examples, you must run the maven or sbt builds with `-Pkinesis-asl`.</li>
+<li>These examples automatically determine the number of local threads and 
KinesisReceivers to spin up based on the number of shards for the stream.</li>
+<li>KinesisWordCountProducerASL will generate random data to put onto the 
Kinesis stream for testing.</li>
+<li>Checkpointing is disabled (no checkpoint dir is set).  The examples as 
written will not recover from a driver failure.</li>
+
+Deployment and runtime notes:
+<li>A single KinesisReceiver can process many shards of a stream.</li>
+<li>Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.</li>
+<li>You never need more KinesisReceivers than the number of shards in your 
stream.</li>
+<li>You can horizontally scale the receiving by creating more 
KinesisReceiver/DStreams (up to the number of shards for a given stream)</li>
+<li>The Kinesis libraries must be present on all worker nodes, as they will 
need access to the Kinesis Client Library.</li>
+<li>This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:<br/>
+    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+    2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+    3) Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs<br/>
+    4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
+</li>
+<li>You need to set up a Kinesis stream with 1 or more shards per the following:<br/>
+ http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>Valid Kinesis endpoint urls can be found here:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
+<li>When you first start up the KinesisReceiver, the Kinesis Client Library 
(KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
+retrieve any checkpoint data, and negotiate with other KCL's reading from the 
same stream.</li>
+<li>Be careful when changing the app name.  Kinesis maintains a mapping table 
in DynamoDB based on this app name 
(http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
  
+Changing the app name could lead to Kinesis errors as only 1 logical 
application can process a stream.  In order to start fresh, 
+it's always best to delete the DynamoDB table that matches your app name.  
This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint 
URL.</li>
+
+Failure recovery notes:
+<li>The combination of Spark Streaming and Kinesis creates 3 different 
checkpoints as follows:<br/>
+  1) RDD data checkpoint (Spark Streaming) - frequency is configurable with 
DStream.checkpoint(Duration)<br/>
+  2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream 
batch<br/>
+  3) Kinesis checkpointing (Kinesis) - frequency is controlled by the 
developer calling ICheckpointer.checkpoint() directly<br/>
+</li>
+<li>Checkpointing too frequently will cause excess load on the AWS checkpoint 
storage layer and may lead to AWS throttling</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with 
sequence numbers greater than the last checkpoint sequence number recorded per 
shard.</li>
+<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
+or from the tip/latest (InitialPositionInStream.LATEST).  This is configurable.</li>
+<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only 
new stream data will be picked up after the KinesisReceiver starts.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is 
added to the stream while no KinesisReceivers are running.</li>
+<li>In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data
+depending on the checkpoint frequency.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of 
records depending on the checkpoint frequency.</li>
+<li>Record processing should be idempotent when possible.</li>
+<li>Failed or latent KinesisReceivers will be detected and automatically 
shut down/load-balanced by the KCL.</li>
+<li>If possible, explicitly shut down the worker if a failure occurs in order to trigger the final checkpoint.</li>
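
The start-position rules in the failure recovery notes above can be sketched as a small simulation. This is a hedged illustration in Python, not the Kinesis Client Library API: the function `records_to_process`, the constants, and the integer sequence numbers are all hypothetical stand-ins.

```python
# Sketch of the start-position rules from the failure recovery notes.
# NOT the KCL API: every name below is a hypothetical stand-in.
TRIM_HORIZON = "TRIM_HORIZON"  # oldest record still retained by the stream
LATEST = "LATEST"              # only records added after the receiver starts


def records_to_process(shard_records, last_checkpoint, initial_position):
    """Return the sequence numbers a receiver would process for one shard.

    shard_records: sorted sequence numbers currently retained in the shard
    last_checkpoint: last checkpointed sequence number, or None if absent
    initial_position: TRIM_HORIZON or LATEST; consulted only when there is
        no checkpoint for the shard
    """
    if last_checkpoint is not None:
        # Resume strictly after the last checkpointed sequence number.
        return [seq for seq in shard_records if seq > last_checkpoint]
    if initial_position == TRIM_HORIZON:
        # No checkpoint: replay everything the stream still retains.
        return list(shard_records)
    # LATEST: records already in the shard are skipped.
    return []


retained = [101, 102, 103, 104]
resumed = records_to_process(retained, 102, LATEST)
replayed = records_to_process(retained, None, TRIM_HORIZON)
skipped = records_to_process(retained, None, LATEST)
print(resumed, replayed, skipped)
```

The third call shows why LATEST can miss data written while no receiver was running, and the second shows why TRIM_HORIZON can reprocess records, which is the reason the notes recommend idempotent record processing.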

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 7b8b793..9f331ed 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -9,7 +9,7 @@ title: Spark Streaming Programming Guide
 # Overview
 Spark Streaming is an extension of the core Spark API that allows enables 
high-throughput,
 fault-tolerant stream processing of live data streams. Data can be ingested 
from many sources
-like Kafka, Flume, Twitter, ZeroMQ or plain old TCP sockets and be processed 
using complex
+like Kafka, Flume, Twitter, ZeroMQ, Kinesis or plain old TCP sockets and be 
processed using complex
 algorithms expressed with high-level functions like `map`, `reduce`, `join` 
and `window`.
 Finally, processed data can be pushed out to filesystems, databases,
 and live dashboards. In fact, you can apply Spark's in-built
@@ -38,7 +38,7 @@ stream of results in batches.
 
 Spark Streaming provides a high-level abstraction called *discretized stream* 
or *DStream*,
 which represents a continuous stream of data. DStreams can be created either 
from input data
-stream from sources such as Kafka and Flume, or by applying high-level
+stream from sources such as Kafka, Flume, and Kinesis, or by applying 
high-level
 operations on other DStreams. Internally, a DStream is represented as a 
sequence of
 [RDDs](api/scala/index.html#org.apache.spark.rdd.RDD).
 
@@ -313,7 +313,7 @@ To write your own Spark Streaming program, you will have to 
add the following de
     artifactId = spark-streaming_{{site.SCALA_BINARY_VERSION}}
     version = {{site.SPARK_VERSION}}
 
-For ingesting data from sources like Kafka and Flume that are not present in 
the Spark
+For ingesting data from sources like Kafka, Flume, and Kinesis that are not 
present in the Spark
 Streaming core
  API, you will have to add the corresponding
 artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the 
dependencies. For example,
@@ -327,6 +327,7 @@ some of the common ones are as follows.
 <tr><td> Twitter </td><td> 
spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} 
</td></tr>
 <tr><td> MQTT </td><td> spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} 
</td></tr>
+<tr><td> Kinesis<br/>(built separately)</td><td> 
kinesis-asl_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> </td><td></td></tr>
 </table>
 
@@ -442,7 +443,7 @@ see the API documentations of the relevant functions in
 Scala and 
[JavaStreamingContext](api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext)
  for Java.
 
-Additional functionality for creating DStreams from sources such as Kafka, 
Flume, and Twitter
+Additional functionality for creating DStreams from sources such as Kafka, 
Flume, Kinesis, and Twitter
 can be imported by adding the right dependencies as explained in an
 [earlier](#linking) section. To take the
 case of Kafka, after adding the artifact 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` to the
@@ -467,6 +468,9 @@ For more details on these additional sources, see the 
corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your sources. 
See the
 [Custom Receiver Guide](streaming-custom-receivers.html).
 
+### Kinesis
+[Kinesis](streaming-kinesis.html)
+
 ## Operations
 There are two kinds of DStream operations - _transformations_ and _output 
operations_. Similar to
 RDD transformations, DStream transformations operate on one or more DStreams 
to create new DStreams

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index c4ed0f5..8c4c128 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,6 +34,19 @@
   <name>Spark Project Examples</name>
   <url>http://spark.apache.org/</url>
 
+  <profiles>
+    <profile>
+      <id>kinesis-asl</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          
<artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+  
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/pom.xml
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
new file mode 100644
index 0000000..a54b342
--- /dev/null
+++ b/extras/kinesis-asl/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements.  See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License.  You may obtain a copy of the License at
+~
+~    http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.1.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <!-- Kinesis integration is not included by default due to ASL-licensed 
code. -->
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Kinesis Integration</name>
+
+  <properties>
+    <sbt.project.name>kinesis-asl</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-client</artifactId>
+      <version>${aws.kinesis.client.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk</artifactId>
+      <version>${aws.java.sdk.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymockclassextension</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.novocode</groupId>
+      <artifactId>junit-interface</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
 
b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
new file mode 100644
index 0000000..a8b907b
--- /dev/null
+++ 
b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kinesis.KinesisUtils;
+
+import scala.Tuple2;
+
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Lists;
+
+/**
+ * Java-friendly Kinesis Spark Streaming WordCount example
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more 
details
+ * on the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the 
given
+ *   <stream-name> and <endpoint-url>. 
+ *
+ * Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials 
+ *  in the following order of precedence: 
+ *         Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ *         Java System Properties - aws.accessKeyId and aws.secretKey
+ *         Credential profiles file - default location (~/.aws/credentials) 
shared by all AWS SDKs
+ *         Instance profile credentials - delivered through the Amazon EC2 
metadata service
+ *
+ * Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>
+ *         <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ *         <endpoint-url> is the endpoint of the Kinesis service
+ *           (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *      $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *      $ export AWS_SECRET_KEY=<your-secret-key>
+ *      $ $SPARK_HOME/bin/run-example \
+ *            org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream \
+ *            https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class called KinesisWordCountProducerASL which puts dummy data
+ *   onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in the class definition.
+ */
+public final class JavaKinesisWordCountASL {
+    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
+    private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
+
+    /* Make the constructor private to prevent instantiation of this utility class */
+    private JavaKinesisWordCountASL() {
+    }
+
+    public static void main(String[] args) {
+        /* Check that all required args were passed in. */
+        if (args.length < 2) {
+          System.err.println(
+              "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n" +
+              "    <stream-name> is the name of the Kinesis stream\n" +
+              "    <endpoint-url> is the endpoint of the Kinesis service\n" +
+              "                   (e.g. https://kinesis.us-east-1.amazonaws.com)\n");
+          System.exit(1);
+        }
+
+        StreamingExamples.setStreamingLogLevels();
+
+        /* Populate the appropriate variables from the given args */
+        String streamName = args[0];
+        String endpointUrl = args[1];
+        /* Set the batch interval to a fixed 2000 millis (2 seconds) */
+        Duration batchInterval = new Duration(2000);
+
+        /* Create a Kinesis client in order to determine the number of shards for the given stream */
+        AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
+                new DefaultAWSCredentialsProviderChain());
+        kinesisClient.setEndpoint(endpointUrl);
+
+        /* Determine the number of shards from the stream */
+        int numShards = kinesisClient.describeStream(streamName)
+                .getStreamDescription().getShards().size();
+
+        /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard */
+        int numStreams = numShards;
+
+        /* Use 1 more thread than the number of receivers so that a thread is left to process the data */
+        int numSparkThreads = numStreams + 1;
+
+        /* Set up the Spark config. */
+        SparkConf sparkConfig = new SparkConf().setAppName("KinesisWordCount").setMaster(
+                "local[" + numSparkThreads + "]");
+
+        /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
+        Duration checkpointInterval = batchInterval;
+
+        /* Set up the StreamingContext */
+        JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval);
+
+        /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+        List<JavaDStream<byte[]>> streamsList = new ArrayList<JavaDStream<byte[]>>(numStreams);
+        for (int i = 0; i < numStreams; i++) {
+            streamsList.add(
+                KinesisUtils.createStream(jssc, streamName, endpointUrl, checkpointInterval,
+                InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2())
+            );
+        }
+
+        /* Union all the streams if there is more than 1 stream */
+        JavaDStream<byte[]> unionStreams;
+        if (streamsList.size() > 1) {
+            unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size()));
+        } else {
+            /* Otherwise, just use the 1 stream */
+            unionStreams = streamsList.get(0);
+        }
+
+        /*
+         * Split each line of the union'd DStreams into multiple words using flatMap to produce the collection.
+         * Convert lines of byte[] to multiple Strings by first converting to String, then splitting on WORD_SEPARATOR.
+         */
+        JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
+                @Override
+                public Iterable<String> call(byte[] line) {
+                    return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+                }
+            });
+
+        /* Map each word to a (word, 1) tuple, then reduce/aggregate by word. */
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+            new PairFunction<String, String, Integer>() {
+                @Override
+                public Tuple2<String, Integer> call(String s) {
+                    return new Tuple2<String, Integer>(s, 1);
+                }
+            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+                @Override
+                public Integer call(Integer i1, Integer i2) {
+                  return i1 + i2;
+                }
+            });
+
+        /* Print the first 10 wordCounts */
+        wordCounts.print();
+
+        /* Start the streaming context and await termination */
+        jssc.start();
+        jssc.awaitTermination();
+    }
+}
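
The flatMap / mapToPair / reduceByKey pipeline in JavaKinesisWordCountASL can be hard to picture without a running cluster. The following is a minimal stand-alone sketch of what that pipeline computes for a single batch, using only the JDK. The class name WordCountSketch and the method countWords are hypothetical and not part of this commit, and the sketch decodes bytes as UTF-8 explicitly, which is an assumption (the example above uses the platform default charset).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

/** Hypothetical stand-alone illustration; not part of the commit and not a Spark API. */
final class WordCountSketch {
    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    /** Decodes each record, splits it into words, and sums a count of 1 per word. */
    static Map<String, Integer> countWords(List<byte[]> records) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (byte[] record : records) {
            // flatMap step: one byte[] record becomes many words
            // (UTF-8 is an assumption made for this sketch)
            String line = new String(record, StandardCharsets.UTF_8);
            for (String word : WORD_SEPARATOR.split(line)) {
                // mapToPair + reduceByKey step: (word, 1) pairs summed per word
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<byte[]> batch = Arrays.asList(
            "1 2 2".getBytes(StandardCharsets.UTF_8),
            "2 3".getBytes(StandardCharsets.UTF_8));
        System.out.println(countWords(batch)); // prints {1=1, 2=3, 3=1}
    }
}
```

In the streaming job the same aggregation is applied independently to every 2-second batch, so counts are per batch rather than cumulative.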

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties
new file mode 100644
index 0000000..97348fb
--- /dev/null
+++ b/extras/kinesis-asl/src/main/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootCategory=WARN, console
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
new file mode 100644
index 0000000..d03edf8
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.nio.ByteBuffer
+import scala.util.Random
+import org.apache.spark.Logging
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Milliseconds
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.kinesis.KinesisUtils
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.AmazonKinesisClient
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.model.PutRecordRequest
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+/**
+ * Kinesis Spark Streaming WordCount example.
+ *
+ * See http://spark.apache.org/docs/latest/streaming-kinesis.html for more details on
+ *   the Kinesis Spark Streaming integration.
+ *
+ * This example spins up 1 Kinesis Worker (Spark Streaming Receiver) per shard
+ *   for the given stream.
+ * It then starts pulling from the last checkpointed sequence number of the given
+ *   <stream-name> and <endpoint-url>.
+ *
+ * Valid endpoint urls:  http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
+ *
+ * This code uses the DefaultAWSCredentialsProviderChain and searches for credentials
+ *   in the following order of precedence:
+ * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+ * Java System Properties - aws.accessKeyId and aws.secretKey
+ * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
+ * Instance profile credentials - delivered through the Amazon EC2 metadata service
+ *
+ * Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+ *   <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *        org.apache.spark.examples.streaming.KinesisWordCountASL mySparkStream \
+ *        https://kinesis.us-east-1.amazonaws.com
+ *
+ * There is a companion helper class below called KinesisWordCountProducerASL which puts
+ *   dummy data onto the Kinesis stream.
+ * Usage instructions for KinesisWordCountProducerASL are provided in that class definition.
+ */
+object KinesisWordCountASL extends Logging {
+  def main(args: Array[String]) {
+    /* Check that all required args were passed in. */
+    if (args.length < 2) {
+      System.err.println(
+        """
+          |Usage: KinesisWordCountASL <stream-name> <endpoint-url>
+          |    <stream-name> is the name of the Kinesis stream
+          |    <endpoint-url> is the endpoint of the Kinesis service
+          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /* Populate the appropriate variables from the given args */
+    val Array(streamName, endpointUrl) = args
+
+    /* Determine the number of shards from the stream */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpointUrl)
+    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards()
+      .size()
+
+    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
+    val numStreams = numShards
+
+    /* 
+     *  numSparkThreads should be 1 more thread than the number of receivers.
+     *  This leaves one thread available for actually processing the data.
+     */
+    val numSparkThreads = numStreams + 1
+
+    /* Set up the SparkConf and StreamingContext */
+    /* Spark Streaming batch interval */
+    val batchInterval = Milliseconds(2000)    
+    val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
+      .setMaster(s"local[$numSparkThreads]")
+    val ssc = new StreamingContext(sparkConfig, batchInterval)
+
+    /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
+    val kinesisCheckpointInterval = batchInterval
+
+    /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
+    val kinesisStreams = (0 until numStreams).map { i =>
+      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
+          InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
+    }
+
+    /* Union all the streams */
+    val unionStreams = ssc.union(kinesisStreams)
+
+    /* Convert each line of Array[Byte] to String, split into words, and count them */
+    val words = unionStreams.flatMap(byteArray => new String(byteArray)
+      .split(" "))
+
+    /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
+    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
+
+    /* Print the first 10 wordCounts */
+    wordCounts.print()
+
+    /* Start the streaming context and await termination */
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+/**
+ * Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>
+ *     <records-per-sec> <words-per-record>
+ *   <stream-name> is the name of the Kinesis stream (e.g. mySparkStream)
+ *   <endpoint-url> is the endpoint of the Kinesis service
+ *     (e.g. https://kinesis.us-east-1.amazonaws.com)
+ *   <records-per-sec> is the rate of records per second to put onto the stream
+ *   <words-per-record> is the number of words to put into each record
+ *
+ * Example:
+ *    $ export AWS_ACCESS_KEY_ID=<your-access-key>
+ *    $ export AWS_SECRET_KEY=<your-secret-key>
+ *    $ $SPARK_HOME/bin/run-example \
+ *         org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream \
+ *         https://kinesis.us-east-1.amazonaws.com 10 5
+ */
+object KinesisWordCountProducerASL {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KinesisWordCountProducerASL <stream-name> <endpoint-url>" +
+          " <records-per-sec> <words-per-record>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /* Populate the appropriate variables from the given args */
+    val Array(stream, endpoint, recordsPerSecond, wordsPerRecord) = args
+
+    /* Generate the records and return the totals */
+    val totals = generate(stream, endpoint, recordsPerSecond.toInt, wordsPerRecord.toInt)
+
+    /* Print the array of (index, total) tuples */
+    println("Totals")
+    totals.foreach(total => println(total.toString()))
+  }
+
+  def generate(stream: String,
+      endpoint: String,
+      recordsPerSecond: Int,
+      wordsPerRecord: Int): Seq[(Int, Int)] = {
+
+    val MaxRandomInts = 10
+
+    /* Create the Kinesis client */
+    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
+    kinesisClient.setEndpoint(endpoint)
+
+    println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
+      s" $recordsPerSecond records per second and $wordsPerRecord words per record");
+
+    val totals = new Array[Int](MaxRandomInts)
+    /* Put String records onto the stream per the given recordsPerSecond and wordsPerRecord */
+    for (i <- 1 to 5) {
+
+      /* Generate recordsPerSecond records to put onto the stream */
+      val records = (1 to recordsPerSecond).map { recordNum =>
+        /*
+         *  Randomly generate wordsPerRecord words, each between 0 (inclusive)
+         *  and MaxRandomInts (exclusive)
+         */
+        val data = (1 to wordsPerRecord).map(x => {
+          /* Generate the random int */
+          val randomInt = Random.nextInt(MaxRandomInts)
+
+          /* Keep track of the totals */
+          totals(randomInt) += 1
+
+          randomInt.toString()
+        }).mkString(" ")
+
+        /* Create a partitionKey based on recordNum */
+        val partitionKey = s"partitionKey-$recordNum"
+
+        /* Create a PutRecordRequest with an Array[Byte] version of the data */
+        val putRecordRequest = new PutRecordRequest().withStreamName(stream)
+            .withPartitionKey(partitionKey)
+            .withData(ByteBuffer.wrap(data.getBytes()));
+
+        /* Put the record onto the stream and capture the PutRecordResult */
+        val putRecordResult = kinesisClient.putRecord(putRecordRequest);
+      }
+
+      /* Sleep for a second */
+      Thread.sleep(1000)
+      println("Sent " + recordsPerSecond + " records")
+    }
+
+    /* Convert the totals to (index, total) tuple */
+    (0 to (MaxRandomInts - 1)).zip(totals)
+  }
+}
+
+/** 
+ *  Utility functions for Spark Streaming examples. 
+ *  This has been lifted from the examples/ project to remove the circular dependency.
+ */
+object StreamingExamples extends Logging {
+
+  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+  def setStreamingLogLevels() {
+    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4jInitialized) {
+      // We first log something to initialize Spark's default logging, then we override the
+      // logging level.
+      logInfo("Setting log level to [WARN] for streaming example." +
+        " To override add a custom log4j.properties to the classpath.")
+      Logger.getRootLogger.setLevel(Level.WARN)
+    }
+  }
+}
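
The bookkeeping in KinesisWordCountProducerASL.generate() (random "words" drawn from 0 inclusive to MaxRandomInts exclusive, with a running total per value) can be checked without touching Kinesis. Below is a minimal sketch of that bookkeeping only, under the assumption that the network call is simply omitted. ProducerTotalsSketch, nextRecord, and generateTotals are hypothetical names introduced for illustration.

```java
import java.util.Random;

/** Hypothetical stand-alone sketch of the producer's bookkeeping; no Kinesis calls are made. */
final class ProducerTotalsSketch {
    static final int MAX_RANDOM_INTS = 10;

    /** Builds one record of wordsPerRecord random ints in [0, MAX_RANDOM_INTS), updating totals. */
    static String nextRecord(Random random, int wordsPerRecord, int[] totals) {
        StringBuilder data = new StringBuilder();
        for (int w = 0; w < wordsPerRecord; w++) {
            int randomInt = random.nextInt(MAX_RANDOM_INTS);
            totals[randomInt] += 1; // keep track of how often each value was sent
            if (w > 0) {
                data.append(' ');
            }
            data.append(randomInt);
        }
        return data.toString();
    }

    /** Simulates `rounds` one-second rounds and returns how often each int was emitted. */
    static int[] generateTotals(Random random, int rounds, int recordsPerSecond, int wordsPerRecord) {
        int[] totals = new int[MAX_RANDOM_INTS];
        for (int round = 0; round < rounds; round++) {
            for (int recordNum = 1; recordNum <= recordsPerSecond; recordNum++) {
                // The real producer would put this record onto the stream here.
                nextRecord(random, wordsPerRecord, totals);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        int[] totals = generateTotals(new Random(), 5, 10, 5);
        int sum = 0;
        for (int index = 0; index < totals.length; index++) {
            System.out.println("(" + index + "," + totals[index] + ")");
            sum += totals[index];
        }
        System.out.println("words sent: " + sum); // prints "words sent: 250"
    }
}
```

With 5 rounds, 10 records per round, and 5 words per record, the totals always add up to 250, which gives a simple cross-check against the counts printed by the word count example.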

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
new file mode 100644
index 0000000..0b80b61
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.util.Clock
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.util.SystemClock
+
+/**
+ * This is a helper class for managing checkpoint clocks.
+ *
+ * @param checkpointInterval interval between Kinesis checkpoints
+ * @param currentClock  Defaults to a new SystemClock if none is passed in (mocking purposes)
+ */
+private[kinesis] class KinesisCheckpointState(
+    checkpointInterval: Duration, 
+    currentClock: Clock = new SystemClock())
+  extends Logging {
+  
+  /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */
+  val checkpointClock = new ManualClock()
+  checkpointClock.setTime(currentClock.currentTime() + checkpointInterval.milliseconds)
+
+  /**
+   * Check if it's time to checkpoint based on the current time and the derived time
+   *   for the next checkpoint
+   *
+   * @return true if it's time to checkpoint
+   */
+  def shouldCheckpoint(): Boolean = {
+    currentClock.currentTime() > checkpointClock.currentTime()
+  }
+
+  /**
+   * Advance the checkpoint clock by the checkpoint interval.
+   */
+  def advanceCheckpoint() = {
+    checkpointClock.addToTime(checkpointInterval.milliseconds)
+  }
+}
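
The checkpoint timing in KinesisCheckpointState boils down to three steps: schedule the first checkpoint one interval after construction, report true once the current time passes that point, and push the schedule forward by one interval on each advance. A minimal sketch of that logic follows. CheckpointStateSketch is a hypothetical name, and the sketch reads the injected clock in shouldCheckpoint() as well, which is an assumption made so the example can be driven by a fake clock.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-alone sketch of interval-based checkpoint timing; not a Spark class. */
final class CheckpointStateSketch {
    private final long intervalMillis;
    private final LongSupplier clock;   // injected time source, in milliseconds
    private long nextCheckpointMillis;  // time after which a checkpoint is due

    CheckpointStateSketch(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
        // First checkpoint is due one interval after construction.
        this.nextCheckpointMillis = clock.getAsLong() + intervalMillis;
    }

    /** Returns true once the clock has moved strictly past the next checkpoint time. */
    boolean shouldCheckpoint() {
        return clock.getAsLong() > nextCheckpointMillis;
    }

    /** Moves the next checkpoint time forward by one interval. */
    void advanceCheckpoint() {
        nextCheckpointMillis += intervalMillis;
    }

    public static void main(String[] args) {
        final long[] now = {0L}; // fake clock controlled by the example
        CheckpointStateSketch state = new CheckpointStateSketch(2000L, () -> now[0]);
        System.out.println(state.shouldCheckpoint()); // prints false
        now[0] = 2001L;
        System.out.println(state.shouldCheckpoint()); // prints true
        state.advanceCheckpoint();
        System.out.println(state.shouldCheckpoint()); // prints false
    }
}
```

Production code would pass System::currentTimeMillis as the clock. Because nothing here runs in the background, a checkpoint only happens when the caller asks shouldCheckpoint() while handling a batch.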

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
new file mode 100644
index 0000000..1bd1f32
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.net.InetAddress
+import java.util.UUID
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.receiver.Receiver
+
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+/**
+ * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
+ * This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
+ * https://github.com/awslabs/amazon-kinesis-client
+ * This is a custom receiver used with StreamingContext.receiverStream(Receiver)
+ *   as described here:
+ *     http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ * Instances of this class will get shipped to the Spark Streaming Workers
+ *   to run within a Spark Executor.
+ *
+ * @param appName  Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
+ *                 by the Kinesis Client Library.  If you change the App name or Stream name,
+ *                 the KCL will throw errors.  This usually requires deleting the backing
+ *                 DynamoDB table with the same name as this Kinesis application.
+ * @param streamName   Kinesis stream name
+ * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+ * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+ *                            See the Kinesis Spark Streaming documentation for more
+ *                            details on the different types of checkpoints.
+ * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+ *                                 worker's initial starting position in the stream.
+ *                                 The values are either the beginning of the stream
+ *                                 per Kinesis' limit of 24 hours
+ *                                 (InitialPositionInStream.TRIM_HORIZON) or
+ *                                 the tip of the stream (InitialPositionInStream.LATEST).
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @return ReceiverInputDStream[Array[Byte]]   
+ */
+private[kinesis] class KinesisReceiver(
+    appName: String,
+    streamName: String,
+    endpointUrl: String,
+    checkpointInterval: Duration,
+    initialPositionInStream: InitialPositionInStream,
+    storageLevel: StorageLevel)
+  extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
+
+  /*
+   * The following vars are built in the onStart() method which executes in the Spark Worker after
+   *   this code is serialized and shipped remotely.
+   */
+
+  /*
+   *  workerId should be based on the ip address of the actual Spark Worker where this code runs
+   *   (not the Driver's ip address.)
+   */
+  var workerId: String = null
+
+  /*
+   * This impl uses the DefaultAWSCredentialsProviderChain and searches for credentials
+   *   in the following order of precedence:
+   * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
+   * Java System Properties - aws.accessKeyId and aws.secretKey
+   * Credential profiles file at the default location (~/.aws/credentials) shared by all
+   *   AWS SDKs and the AWS CLI
+   * Instance profile credentials delivered through the Amazon EC2 metadata service
+   */
+  var credentialsProvider: AWSCredentialsProvider = null
+
+  /* KCL config instance. */
+  var kinesisClientLibConfiguration: KinesisClientLibConfiguration = null
+
+  /*
+   *  RecordProcessorFactory creates impls of IRecordProcessor.
+   *  IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the 
+   *    IRecordProcessor.processRecords() method.
+   *  We're using our custom KinesisRecordProcessor in this case.
+   */
+  var recordProcessorFactory: IRecordProcessorFactory = null
+
+  /*
+   * Create a Kinesis Worker.
+   * This is the core client abstraction from the Kinesis Client Library (KCL).
+   * We pass the RecordProcessorFactory from above as well as the KCL config instance.
+   * A Kinesis Worker can process 1..* shards from the given stream - each with its
+   *   own RecordProcessor.
+   */
+  var worker: Worker = null
+
+  /**
+   *  This is called when the KinesisReceiver starts and must be non-blocking.
+   *  The KCL creates and manages the receiving/processing thread pool through the Worker.run()
+   *    method.
+   */
+  override def onStart() {
+    workerId = InetAddress.getLocalHost.getHostAddress() + ":" + UUID.randomUUID()
+    credentialsProvider = new DefaultAWSCredentialsProviderChain()
+    kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, streamName,
+      credentialsProvider, workerId).withKinesisEndpoint(endpointUrl)
+      .withInitialPositionInStream(initialPositionInStream).withTaskBackoffTimeMillis(500)
+    recordProcessorFactory = new IRecordProcessorFactory {
+      override def createProcessor: IRecordProcessor = new KinesisRecordProcessor(receiver,
+        workerId, new KinesisCheckpointState(checkpointInterval))
+    }
+    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration)
+    logInfo(s"Starting receiver with workerId $workerId")
+    worker.run()
+  }
+
+  /**
+   *  This is called when the KinesisReceiver stops.
+   *  The KCL worker.shutdown() method stops the receiving/processing threads.
+   *  The KCL will do its best to drain and checkpoint any in-flight records upon shutdown.
+   */
+  override def onStop() {
+    worker.shutdown()
+    logInfo(s"Shut down receiver with workerId $workerId")
+    workerId = null
+    credentialsProvider = null
+    kinesisClientLibConfiguration = null
+    recordProcessorFactory = null
+    worker = null
+  }
+}
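
The onStart() comment in KinesisReceiver states that the method must be non-blocking. One common way to satisfy that kind of requirement when the underlying run loop blocks until shutdown is to start it on a dedicated daemon thread. The sketch below illustrates that pattern with the JDK only. It is not how this commit implements the receiver, and BlockingWorker, startInBackground, and awaitTermination are hypothetical stand-ins rather than KCL or Spark APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; BlockingWorker stands in for any run() that loops until shutdown. */
final class NonBlockingStartSketch {

    static final class BlockingWorker implements Runnable {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch stopRequested = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            try {
                stopRequested.await(); // blocks until shutdown() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void shutdown() {
            stopRequested.countDown();
        }

        /** Waits up to timeoutMillis for run() to begin; returns false on timeout or interrupt. */
        boolean awaitStarted(long timeoutMillis) {
            try {
                return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Starts the worker on a daemon thread and returns immediately. */
    static Thread startInBackground(Runnable worker, String threadName) {
        Thread thread = new Thread(worker, threadName);
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Joins the thread for up to timeoutMillis; returns true if it has terminated. */
    static boolean awaitTermination(Thread thread, long timeoutMillis) {
        try {
            thread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        BlockingWorker worker = new BlockingWorker();
        Thread thread = startInBackground(worker, "kinesis-worker-sketch");
        System.out.println("started: " + worker.awaitStarted(1000));
        worker.shutdown();
        System.out.println("stopped: " + awaitTermination(thread, 1000));
    }
}
```

With this shape the start method returns as soon as the thread is launched, and the stop path only needs to signal shutdown and optionally join the thread.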

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
new file mode 100644
index 0000000..8ecc2d9
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.List
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.util.Random
+
+import org.apache.spark.Logging
+
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
+import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
+import com.amazonaws.services.kinesis.model.Record
+
+/**
+ * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
+ * This implementation operates on the Array[Byte] from the KinesisReceiver.
+ * The Kinesis Worker creates an instance of this KinesisRecordProcessor upon startup.
+ *
+ * @param receiver Kinesis receiver
+ * @param workerId for logging purposes
+ * @param checkpointState represents the checkpoint state including the next checkpoint time.
+ *   It's injected here for mocking purposes.
+ */
+private[kinesis] class KinesisRecordProcessor(
+    receiver: KinesisReceiver,
+    workerId: String,
+    checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
+
+  /* shardId to be populated during initialize() */
+  var shardId: String = _
+
+  /**
+   * The Kinesis Client Library calls this method during IRecordProcessor initialization.
+   *
+   * @param shardId assigned by the KCL to this particular RecordProcessor.
+   */
+  override def initialize(shardId: String) {
+    logInfo(s"Initialize:  Initializing workerId $workerId with shardId $shardId")
+    this.shardId = shardId
+  }
+
+  /**
+   * This method is called by the KCL when a batch of records is pulled from the Kinesis stream.
+   * This is the record-processing bridge between the KCL's IRecordProcessor.processRecords()
+   * and Spark Streaming's Receiver.store().
+   *
+   * @param batch list of records from the Kinesis stream shard
+   * @param checkpointer used to update Kinesis when this batch has been processed/stored
+   *   in the DStream
+   */
+  override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
+    if (!receiver.isStopped()) {
+      try {
+        /*
+         * Note:  If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
+         * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
+         *   internally-configured Spark serializer (kryo, etc).
+         * This is not desirable, so we instead store a raw Array[Byte] and decouple
+         *   ourselves from Spark's internal serialization strategy.
+         */
+        batch.foreach(record => receiver.store(record.getData().array()))
+
+        logDebug(s"Stored:  Worker $workerId stored ${batch.size} records for shardId $shardId")
+
+        /*
+         * Checkpoint the sequence number of the last record successfully processed/stored
+         *   in the batch.
+         * In this implementation, we're checkpointing after the given checkpointIntervalMillis.
+         * Note that this logic requires that processRecords() be called AND that it's time to
+         *   checkpoint.  There is no background thread running the checkpointer, so
+         *   checkpointing is tested and triggered only when a new batch comes in.
+         * If the worker is shut down cleanly, a checkpoint will happen (see shutdown() below).
+         * However, if the worker dies unexpectedly, a checkpoint may not happen.
+         * This could lead to records being processed more than once.
+         */
+        if (checkpointState.shouldCheckpoint()) {
+          /* Perform the checkpoint */
+          KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+          /* Update the next checkpoint time */
+          checkpointState.advanceCheckpoint()
+
+          logDebug(s"Checkpoint:  WorkerId $workerId completed checkpoint of ${batch.size}" +
+              s" records for shardId $shardId")
+          logDebug(s"Checkpoint:  Next checkpoint is at " +
+              s" ${checkpointState.checkpointClock.currentTime()} for shardId $shardId")
+        }
+      } catch {
+        case e: Throwable => {
+          /*
+           *  If there is a failure within the batch, the batch will not be checkpointed.
+           *  This will potentially cause records since the last checkpoint to be processed
+           *     more than once.
+           */
+          logError(s"Exception:  WorkerId $workerId encountered an exception while storing" +
+              s" or checkpointing a batch for shardId $shardId.", e)
+
+          /* Rethrow the exception to the Kinesis Worker that is managing this RecordProcessor.*/
+          throw e
+        }
+      }
+    } else {
+      /* RecordProcessor has been stopped. */
+      logInfo(s"Stopped:  The Spark KinesisReceiver has stopped for workerId $workerId" +
+          s" and shardId $shardId.  No more records will be processed.")
+    }
+  }
+
+  /**
+   * Kinesis Client Library is shutting down this Worker for 1 of 2 reasons:
+   * 1) the stream is resharding by splitting or merging adjacent shards
+   *     (ShutdownReason.TERMINATE)
+   * 2) the failed or latent Worker has stopped sending heartbeats for whatever reason
+   *     (ShutdownReason.ZOMBIE)
+   *
+   * @param checkpointer used to perform a Kinesis checkpoint for ShutdownReason.TERMINATE
+   * @param reason for shutdown (ShutdownReason.TERMINATE or ShutdownReason.ZOMBIE)
+   */
+  override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason) {
+    logInfo(s"Shutdown:  Shutting down workerId $workerId with reason $reason")
+    reason match {
+      /*
+       * TERMINATE Use Case.  Checkpoint.
+       * Checkpoint to indicate that all records from the shard have been drained and processed.
+       * It's now OK to read from the new shards that resulted from a resharding event.
+       */
+      case ShutdownReason.TERMINATE => 
+        KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
+
+      /*
+       * ZOMBIE Use Case.  NoOp.
+       * No checkpoint because other workers may have taken over and already started processing
+       *    the same records.
+       * This may lead to records being processed more than once.
+       */
+      case ShutdownReason.ZOMBIE =>
+
+      /* Unknown reason.  NoOp */
+      case _ =>
+    }
+  }
+}
+
+private[kinesis] object KinesisRecordProcessor extends Logging {
+  /**
+   * Retry the given number of times with a random backoff time (millis) less than the
+   *   given maxBackOffMillis
+   *
+   * @param expression expression to evaluate
+   * @param numRetriesLeft number of retries left
+   * @param maxBackOffMillis max millis between retries
+   *
+   * @return evaluation of the given expression
+   * @throws Unretryable exception, unexpected exception,
+   *  or any exception that persists after numRetriesLeft reaches 0
+   */
+  @annotation.tailrec
+  def retryRandom[T](expression: => T, numRetriesLeft: Int, maxBackOffMillis: Int): T = {
+    util.Try { expression } match {
+      /* If the function succeeded, evaluate to x. */
+      case util.Success(x) => x
+      /* If the function failed, either retry or throw the exception */
+      case util.Failure(e) => e match {
+        /* Retry:  Throttling or other Retryable exception has occurred */
+        case _: ThrottlingException | _: KinesisClientLibDependencyException if numRetriesLeft > 1
+          => {
+               val backOffMillis = Random.nextInt(maxBackOffMillis)
+               Thread.sleep(backOffMillis)
+               logError(s"Retryable Exception:  Random backOffMillis=${backOffMillis}", e)
+               retryRandom(expression, numRetriesLeft - 1, maxBackOffMillis)
+             }
+        /* Throw:  Shutdown has been requested by the Kinesis Client Library.*/
+        case _: ShutdownException => {
+          logError(s"ShutdownException:  Caught shutdown exception, skipping checkpoint.", e)
+          throw e
+        }
+        /* Throw:  Non-retryable exception has occurred with the Kinesis Client Library */
+        case _: InvalidStateException => {
+          logError(s"InvalidStateException:  Cannot save checkpoint to the DynamoDB table used" +
+              s" by the Amazon Kinesis Client Library.  Table likely doesn't exist.", e)
+          throw e
+        }
+        /* Throw:  Unexpected exception has occurred */
+        case _ => {
+          logError(s"Unexpected, non-retryable exception.", e)
+          throw e
+        }
+      }
+    }
+  }
+}
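
The retry-with-random-backoff pattern used by retryRandom above can be sketched without the KCL on the classpath. This is a minimal illustration, not the committed code: TransientException, RetrySketch and RetryDemo are hypothetical names standing in for the KCL's retryable exceptions (ThrottlingException, KinesisClientLibDependencyException) and for the call site that wraps checkpointer.checkpoint().

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Random, Success, Try}

// Hypothetical stand-in for a retryable error such as the KCL's ThrottlingException.
class TransientException(msg: String) extends RuntimeException(msg)

object RetrySketch {
  /**
   * Evaluate `expression`, retrying on TransientException after a random sleep
   * in [0, maxBackOffMillis). At most `attemptsLeft` evaluations are made in total;
   * any other exception, or the last TransientException, is rethrown.
   */
  @tailrec
  def retryRandom[T](expression: => T, attemptsLeft: Int, maxBackOffMillis: Int): T = {
    Try(expression) match {
      case Success(x) => x
      case Failure(_: TransientException) if attemptsLeft > 1 =>
        Thread.sleep(Random.nextInt(maxBackOffMillis).toLong)
        retryRandom(expression, attemptsLeft - 1, maxBackOffMillis)
      case Failure(e) => throw e
    }
  }
}

object RetryDemo {
  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fails twice with a retryable error, then succeeds on the third evaluation.
    val result = RetrySketch.retryRandom({
      calls += 1
      if (calls < 3) throw new TransientException("throttled") else "checkpointed"
    }, 4, 10)
    println(s"result=$result after $calls calls")
  }
}
```

Because the expression is passed by name, each retry re-evaluates it rather than reusing the first failure. As in the diff, the guard `> 1` means the count passed in (4 at the call sites above) bounds the total number of evaluations, not the number of retries after the first attempt.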

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
new file mode 100644
index 0000000..713cac0
--- /dev/null
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
+import org.apache.spark.streaming.api.java.JavaStreamingContext
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+
+/**
+ * Helper class to create Amazon Kinesis Input Stream
+ * :: Experimental ::
+ */
+@Experimental
+object KinesisUtils {
+  /**
+   * Create an InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param ssc    StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return ReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      ssc: StreamingContext,
+      streamName: String,
+      endpointUrl: String,
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
+    ssc.receiverStream(new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl,
+        checkpointInterval, initialPositionInStream, storageLevel))
+  }
+
+  /**
+   * Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
+   *
+   * @param jssc Java StreamingContext object
+   * @param ssc    StreamingContext object
+   * @param streamName   Kinesis stream name
+   * @param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
+   * @param checkpointInterval  Checkpoint interval for Kinesis checkpointing.
+   *                            See the Kinesis Spark Streaming documentation for more
+   *                            details on the different types of checkpoints.
+   * @param initialPositionInStream  In the absence of Kinesis checkpoint info, this is the
+   *                                 worker's initial starting position in the stream.
+   *                                 The values are either the beginning of the stream
+   *                                 per Kinesis' limit of 24 hours
+   *                                 (InitialPositionInStream.TRIM_HORIZON) or
+   *                                 the tip of the stream (InitialPositionInStream.LATEST).
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @return JavaReceiverInputDStream[Array[Byte]]
+   */
+  def createStream(
+      jssc: JavaStreamingContext, 
+      streamName: String, 
+      endpointUrl: String, 
+      checkpointInterval: Duration,
+      initialPositionInStream: InitialPositionInStream,
+      storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
+    jssc.receiverStream(new KinesisReceiver(jssc.ssc.sc.appName, streamName,
+        endpointUrl, checkpointInterval, initialPositionInStream, storageLevel))
+  }
+}
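
The checkpointInterval parameter documented above is consulted only when processRecords() receives a batch; no timer runs in the background. A minimal sketch of that gating follows, under stated assumptions: IntervalGate and IntervalGateDemo are hypothetical names, the clock is a plain function supplied by the caller, and this is not the KinesisCheckpointState class from this commit (which is not shown in this part of the diff).

```scala
// Hypothetical sketch of interval-gated checkpointing; not the committed KinesisCheckpointState.
final class IntervalGate(intervalMillis: Long, clock: () => Long) {
  // First checkpoint becomes due one interval after construction.
  private var nextCheckpointAt: Long = clock() + intervalMillis

  /** True once the clock has moved past the next scheduled checkpoint time. */
  def shouldCheckpoint(): Boolean = clock() > nextCheckpointAt

  /** Schedule the next checkpoint one interval after the current time. */
  def advanceCheckpoint(): Unit = { nextCheckpointAt = clock() + intervalMillis }
}

object IntervalGateDemo {
  def main(args: Array[String]): Unit = {
    var now = 0L                                   // manual clock, advanced by the demo
    val gate = new IntervalGate(2000L, () => now)
    var checkpoints = 0
    // Simulate ten batches arriving 500 ms apart; the gate is checked only on arrival.
    for (_ <- 1 to 10) {
      now += 500L
      if (gate.shouldCheckpoint()) {
        checkpoints += 1
        gate.advanceCheckpoint()
      }
    }
    println(s"checkpoints taken: $checkpoints")
  }
}
```

Injecting the clock mirrors the reason the diff gives for injecting checkpointState into KinesisRecordProcessor: tests can control time. If batches stop arriving, a gate like this never fires, which is why records since the last checkpoint may be reprocessed after an unclean shutdown.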

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
new file mode 100644
index 0000000..87954a3
--- /dev/null
+++ b/extras/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kinesis;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.junit.Test;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+/**
+ * Demonstrate the use of the KinesisUtils Java API
+ */
+public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
+  @Test
+  public void testKinesisStream() {
+    // Tests the API, does not actually test data receiving
+    JavaDStream<byte[]> kinesisStream = KinesisUtils.createStream(ssc, "mySparkStream",
+        "https://kinesis.us-west-2.amazonaws.com", new Duration(2000),
+        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
+
+    ssc.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/91f9504e/extras/kinesis-asl/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/resources/log4j.properties b/extras/kinesis-asl/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e01e049
--- /dev/null
+++ b/extras/kinesis-asl/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
