Repository: spark
Updated Branches:
  refs/heads/branch-1.1 4f776dfab -> 826356725


[SPARK-1981] updated streaming-kinesis.md

fixed markup, separated out sections more-clearly, more thorough explanations

Author: Chris Fregly <ch...@fregly.com>

Closes #1757 from cfregly/master and squashes the following commits:

9b1c71a [Chris Fregly] better explained why spark checkpoints are disabled in 
the example (due to no stateful operations being used)
0f37061 [Chris Fregly] SPARK-1981:  (Kinesis streaming support) updated 
streaming-kinesis.md
862df67 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
8e1ae2e [Chris Fregly] Merge remote-tracking branch 'upstream/master'
4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more 
clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into 
extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with 
JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback.  simplified examples, updated 
docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback:  moved the kinesis examples into 
the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback:  updated docs, simplified the 
KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail 
class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and 
kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support

(cherry picked from commit 99243288b049f4a4fb4ba0505ea2310be5eb4bd2)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82635672
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82635672
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82635672

Branch: refs/heads/branch-1.1
Commit: 826356725ffb3189180f7879d3f9c449924785f3
Parents: 4f776df
Author: Chris Fregly <ch...@fregly.com>
Authored: Sun Aug 17 19:33:15 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sun Aug 17 19:45:16 2014 -0700

----------------------------------------------------------------------
 docs/streaming-kinesis.md | 97 +++++++++++++++++++++---------------------
 1 file changed, 49 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82635672/docs/streaming-kinesis.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis.md b/docs/streaming-kinesis.md
index 801c905..16ad322 100644
--- a/docs/streaming-kinesis.md
+++ b/docs/streaming-kinesis.md
@@ -3,56 +3,57 @@ layout: global
 title: Spark Streaming Kinesis Receiver
 ---
 
-### Kinesis
-Build notes:
-<li>Spark supports a Kinesis Streaming Receiver which is not included in the 
default build due to licensing restrictions.</li>
-<li>_**Note that by embedding this library you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
-<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and 
artifacts live in $SPARK_HOME/extras/kinesis-asl.</li>
-<li>To build with Kinesis, you must run the maven or sbt builds with 
-Pkinesis-asl`.</li>
-<li>Applications will need to link to the 'spark-streaming-kinesis-asl` 
artifact.</li>
+## Kinesis
+### Design
+<li>The KinesisReceiver uses the Kinesis Client Library (KCL) provided by 
Amazon under the Amazon Software License.</li>
+<li>The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides 
load-balancing, fault-tolerance, checkpointing through the concept of Workers, 
Checkpoints, and Shard Leases.</li>
+<li>The KCL uses DynamoDB to maintain all state.  A DynamoDB table is created 
in the us-east-1 region (regardless of Kinesis stream region) during KCL 
initialization for each Kinesis application name.</li>
+<li>A single KinesisReceiver can process many shards of a stream by spinning 
up multiple KinesisRecordProcessor threads.</li>
+<li>You never need more KinesisReceivers than the number of shards in your 
stream as each will spin up at least one KinesisRecordProcessor thread.</li>
+<li>Horizontal scaling is achieved by autoscaling additional KinesisReceiver 
(separate processes) or spinning up new KinesisRecordProcessor threads within 
each KinesisReceiver - up to the number of current shards for a given stream, 
of course.  Don't forget to autoscale back down!</li>
 
-Kinesis examples notes:
-<li>To build the Kinesis examples, you must run the maven or sbt builds with 
-Pkinesis-asl`.</li>
-<li>These examples automatically determine the number of local threads and 
KinesisReceivers to spin up based on the number of shards for the stream.</li>
-<li>KinesisWordCountProducerASL will generate random data to put onto the 
Kinesis stream for testing.</li>
-<li>Checkpointing is disabled (no checkpoint dir is set).  The examples as 
written will not recover from a driver failure.</li>
+### Build
+<li>Spark supports a Streaming KinesisReceiver, but it is not included in the 
default build due to Amazon Software Licensing (ASL) restrictions.</li>
+<li>To build with the Kinesis Streaming Receiver and supporting ASL-licensed 
code, you must run the maven or sbt builds with the **-Pkinesis-asl** 
profile.</li>
+<li>All KinesisReceiver-related code, examples, tests, and artifacts live in 
**$SPARK_HOME/extras/kinesis-asl/**.</li>
+<li>Kinesis-based Spark Applications will need to link to the 
**spark-streaming-kinesis-asl** artifact that is built when **-Pkinesis-asl** 
is specified.</li>
+<li>_**Note that by linking to this library, you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
 
-Deployment and runtime notes:
-<li>A single KinesisReceiver can process many shards of a stream.</li>
-<li>Each shard of a stream is processed by one or more KinesisReceiver's 
managed by the Kinesis Client Library (KCL) Worker.</li>
-<li>You never need more KinesisReceivers than the number of shards in your 
stream.</li>
-<li>You can horizontally scale the receiving by creating more 
KinesisReceiver/DStreams (up to the number of shards for a given stream)</li>
-<li>The Kinesis libraries must be present on all worker nodes, as they will 
need access to the Kinesis Client Library.</li>
-<li>This code uses the DefaultAWSCredentialsProviderChain and searches for 
credentials in the following order of precedence:<br/>
-    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
-    2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
-    3) Credential profiles file - default location (~/.aws/credentials) shared 
by all AWS SDKs<br/>
-    4) Instance profile credentials - delivered through the Amazon EC2 
metadata service<br/>
-</li>
-<li>You need to setup a Kinesis stream with 1 or more shards per the 
following:<br/>
- http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
-<li>Valid Kinesis endpoint urls can be found here:  Valid endpoint urls:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
-<li>When you first start up the KinesisReceiver, the Kinesis Client Library 
(KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
-retrieve any checkpoint data, and negotiate with other KCL's reading from the 
same stream.</li>
-<li>Be careful when changing the app name.  Kinesis maintains a mapping table 
in DynamoDB based on this app name 
(http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
  
-Changing the app name could lead to Kinesis errors as only 1 logical 
application can process a stream.  In order to start fresh, 
-it's always best to delete the DynamoDB table that matches your app name.  
This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint 
URL.</li>
+### Example
+<li>To build the Kinesis example, you must run the maven or sbt builds with 
the **-Pkinesis-asl** profile.</li>
+<li>You need to set up a Kinesis stream at one of the valid Kinesis endpoints 
with 1 or more shards per the following:  
http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>Valid Kinesis endpoints can be found here:  
http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region</li>
+<li>When running **locally**, the example automatically determines the number 
of threads and KinesisReceivers to spin up based on the number of shards 
configured for the stream.  Therefore, **local[n]** is not needed when starting 
the example as with other streaming examples.</li>
+<li>While this example could use a single KinesisReceiver which spins up 
multiple KinesisRecordProcessor threads to process multiple shards, I wanted to 
demonstrate unioning multiple KinesisReceivers as a single DStream.  (It's a 
bit confusing in local mode.)</li>
+<li>**KinesisWordCountProducerASL** is provided to generate random records 
into the Kinesis stream for testing.</li>
+<li>The example has been configured to immediately replicate incoming stream 
data to another node by using StorageLevel.MEMORY_AND_DISK_2.</li>
+<li>Spark checkpointing is disabled because the example does not use any 
stateful or window-based DStream operations such as updateStateByKey and 
reduceByWindow.  If those operations are introduced, you would need to enable 
checkpointing or risk losing data in the case of a failure.</li>
+<li>Kinesis checkpointing is enabled.  This means that the example will 
recover from a Kinesis failure.</li>
+<li>The example uses InitialPositionInStream.LATEST strategy to pull from the 
latest tip of the stream if no Kinesis checkpoint info exists.</li>
+<li>In our example, **KinesisWordCount** is the Kinesis application name for 
both the Scala and Java versions.  The use of this application name is 
described next.</li>
 
-Failure recovery notes:
-<li>The combination of Spark Streaming and Kinesis creates 3 different 
checkpoints as follows:<br/>
-  1) RDD data checkpoint (Spark Streaming) - frequency is configurable with 
DStream.checkpoint(Duration)<br/>
-  2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream 
batch<br/>
-  3) Kinesis checkpointing (Kinesis) - frequency is controlled by the 
developer calling ICheckpointer.checkpoint() directly<br/>
+### Deployment and Runtime
+<li>A Kinesis application name must be unique for a given account and 
region.</li>
+<li>A DynamoDB table and CloudWatch namespace are created during KCL 
initialization using this Kinesis application name.  
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization</li>
+<li>This DynamoDB table lives in the us-east-1 region regardless of the 
Kinesis endpoint URL.</li>
+<li>Changing the app name or stream name could lead to Kinesis errors as only 
a single logical application can process a single stream.</li>
+<li>If you are seeing errors after changing the app name or stream name, it 
may be necessary to manually delete the DynamoDB table and start from 
scratch.</li>
+<li>The Kinesis libraries must be present on all worker nodes, as they will 
need access to the KCL.</li>
+<li>The KinesisReceiver uses the DefaultAWSCredentialsProviderChain for AWS 
credentials, which searches for credentials in the following order of 
precedence:<br/>
+1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+3) Credential profiles file - default location (~/.aws/credentials) shared by 
all AWS SDKs<br/>
+4) Instance profile credentials - delivered through the Amazon EC2 metadata 
service
 </li>
-<li>Checkpointing too frequently will cause excess load on the AWS checkpoint 
storage layer and may lead to AWS throttling</li>
-<li>Upon startup, a KinesisReceiver will begin processing records with 
sequence numbers greater than the last checkpoint sequence number recorded per 
shard.</li>
-<li>If no checkpoint info exists, the worker will start either from the oldest 
record available (InitialPositionInStream.TRIM_HORIZON)
-or from the tip/latest (InitialPostitionInStream.LATEST).  This is 
configurable.</li>
-<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only 
new stream data will be picked up after the KinesisReceiver starts.</li>
-<li>InitialPositionInStream.LATEST could lead to missed records if data is 
added to the stream while no KinesisReceivers are running.</li>
-<li>In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data
-depending on the checkpoint frequency.</li>
-<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of 
records depending on the checkpoint frequency.</li>
+
+### Fault-Tolerance
+<li>The combination of Spark Streaming and Kinesis creates 2 different 
checkpoints that may occur at different intervals.</li>
+<li>Checkpointing too frequently against Kinesis will cause excess load on the 
AWS checkpoint storage layer and may lead to AWS throttling.  The provided 
example handles this throttling with a random backoff retry strategy.</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with 
sequence numbers greater than the last Kinesis checkpoint sequence number 
recorded per shard (stored in the DynamoDB table).</li>
+<li>If no Kinesis checkpoint info exists, the KinesisReceiver will start 
either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) 
or from the latest tip (InitialPositionInStream.LATEST).  This is 
configurable.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is 
added to the stream while no KinesisReceivers are running (and no checkpoint 
info is being stored.)</li>
+<li>In production, you'll want to switch to 
InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis 
limit) of previous stream data.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of 
records where the impact is dependent on checkpoint frequency.</li>
 <li>Record processing should be idempotent when possible.</li>
-<li>Failed or latent KinesisReceivers will be detected and automatically 
shutdown/load-balanced by the KCL.</li>
-<li>If possible, explicitly shutdown the worker if a failure occurs in order 
to trigger the final checkpoint.</li>
+<li>A failed or latent KinesisRecordProcessor within the KinesisReceiver will 
be detected and automatically restarted by the KCL.</li>
+<li>If possible, the KinesisReceiver should be shut down cleanly in order to 
trigger a final checkpoint of all KinesisRecordProcessors to avoid duplicate 
record processing.</li>
\ No newline at end of file
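
Note on the Fault-Tolerance section above: the updated doc says the provided example handles Kinesis checkpoint throttling "with a random backoff retry strategy", and commit 4774581 mentions a `retryRandom` helper. The diff does not include that helper, so the following is only a minimal sketch of the general technique in plain Java. The class name `RandomBackoffRetry`, the method signature, and the parameters `maxAttempts` and `maxBackoffMillis` are assumptions made for illustration; this is not the code in extras/kinesis-asl.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical helper illustrating retry with a random backoff (not the Spark implementation). */
final class RandomBackoffRetry {
    private RandomBackoffRetry() {}

    /**
     * Runs the operation, retrying on RuntimeException up to maxAttempts times in total.
     * Between attempts it sleeps for a random duration in [0, maxBackoffMillis] so that
     * several workers throttled at the same moment do not all retry in lockstep.
     */
    static <T> T retryRandom(Supplier<T> operation, int maxAttempts, long maxBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        if (maxBackoffMillis < 0 || maxBackoffMillis == Long.MAX_VALUE) {
            throw new IllegalArgumentException("maxBackoffMillis out of range");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // out of attempts: rethrow the last failure below
                }
                long sleepMillis = ThreadLocalRandom.current().nextLong(maxBackoffMillis + 1);
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("interrupted while backing off", ie);
                }
            }
        }
        throw lastFailure;
    }
}
```

A caller would wrap the throttled call (for example, a Kinesis checkpoint) in the supplier, e.g. `RandomBackoffRetry.retryRandom(() -> doCheckpoint(), 4, 100L)`, where `doCheckpoint` is a placeholder for the caller's own method. Because a retried call may run more than once, this pairs naturally with the doc's advice that record processing be idempotent.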

