Github user srosenthal commented on a diff in the pull request:

https://github.com/apache/spark/pull/1434#discussion_r15137079

--- Diff: docs/streaming-programming-guide.md ---
@@ -467,6 +468,62 @@ For more details on these additional sources, see the corresponding [API documen
 Furthermore, you can also implement your own custom receiver for your sources. See the [Custom Receiver Guide](streaming-custom-receivers.html).

+### Kinesis
+Build notes:
+<li>Spark supports a Kinesis Streaming Receiver which is not included in the default build due to licensing restrictions.</li>
+<li>_**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.</li>
+<li>For sbt users, set the `SPARK_KINESIS_ASL` environment variable before building.</li>
+<li>For Maven users, enable the `-Pspark-kinesis-asl` profile.</li>
+<li>User applications will need to link to the `spark-kinesis-asl` artifact.</li>
+<li>The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/spark-kinesis-asl.</li>
+
+Deployment and runtime notes:
+<li>Each shard of a stream is processed by one or more KinesisReceiver's managed by the Kinesis Client Library (KCL) Worker.</li>
+<li>Said differently, a single KinesisReceiver can process many shards of a stream.</li>
+<li>You never need more KinesisReceivers than the number of shards in your stream.</li>
+<li>The Kinesis assembly jar must also be present on all worker nodes, as they will need access to the Kinesis Client Library.</li>
+<li>/tmp/checkpoint is a valid and accessible directory on all workers (or locally if running in local mode)</li>
+<li>This code uses the DefaultAWSCredentialsProviderChain and searches for credentials in the following order of precedence:<br/>
+ 1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY<br/>
+ 2) Java System Properties - aws.accessKeyId and aws.secretKey<br/>
+ 3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs<br/>
+ 4) Instance profile credentials - delivered through the Amazon EC2 metadata service<br/>
+</li>
+<li>You need to setup a Kinesis stream with 1 or more shards per the following:<br/>
+ http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html</li>
+<li>When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service,
+retrieve any checkpoint data, and negotiate with other KCL's reading from the same stream.</li>
+<li>During testing, I noticed varying degrees of delays while retrieving records from Kinesis depending on which coffee shop in San Francisco I was working.
+The input and output data eventually matched, but sometimes after an unusually long time.</li>
+<li>Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization).
+Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream.</li>
+
+Failure recovery notes:
+<li>The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:<br/>
+ 1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)<br/>
+ 2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch<br/>
+ 3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly<br/>
+</li>
+<li>During testing, if you see the same data being read from the stream twice, it's likely due to the Kinesis checkpoints not being written.</li>
+<li>Checkpointing too freqently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling</li>
+<li>Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.</li>
+<li>If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON)
+or from the tip/latest (InitialPostitionInStream.LATEST). This is configurable.</li>
+<li>When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.</li>
+<li>InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.</li>
+<li>In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON which will read up to 24 hours (Kinesis limit) of previous stream data
+depending on the checkpoint frequency.</li>
+<li>InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.</li>
+<li>Record processing should be idempotent when possible.</li>
+<li>Failed or latent KinesisReceivers will be detected and automatically shutdown/load-balanced by the KCL.</li>
+<li>If possible, explicitly shutdown the worker if a failure occurs.</li>
+
+Example KinesisWordCount (and JavaKiensisWordCount) notes:
--- End diff --

Typo - JavaKinesisWordCount
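
Separately, the startup rules in the failure recovery notes (resume after the last checkpoint, otherwise fall back to TRIM_HORIZON or LATEST) might be easier to follow with a small illustration. The following is only a rough sketch in plain Python that simulates the behaviour described in the diff for a single shard. It does not call the KCL or Spark, and `InitialPosition`, `records_seen`, and the sample sequence numbers are hypothetical names and values made up for the example.

```python
from enum import Enum
from typing import List, Optional


class InitialPosition(Enum):
    """Hypothetical stand-in for the two starting positions named in the diff."""
    TRIM_HORIZON = "TRIM_HORIZON"  # oldest record still available in the shard
    LATEST = "LATEST"              # tip of the shard; only new records are read


def records_seen(existing: List[int],
                 arriving: List[int],
                 last_checkpoint: Optional[int],
                 initial_position: InitialPosition) -> List[int]:
    """Simulate which sequence numbers a receiver would process for one shard.

    existing: sequence numbers already in the shard when the receiver starts
    arriving: sequence numbers added after the receiver starts
    last_checkpoint: last checkpointed sequence number, or None if no checkpoint exists
    """
    if last_checkpoint is not None:
        # A checkpoint exists: resume strictly after it, whatever the initial position.
        return [s for s in existing + arriving if s > last_checkpoint]
    if initial_position is InitialPosition.TRIM_HORIZON:
        # No checkpoint: replay everything still retained, then keep reading.
        return existing + arriving
    # No checkpoint and LATEST: records written before startup are skipped.
    return list(arriving)


existing = [101, 102, 103]
arriving = [104, 105]
resumed = records_seen(existing, arriving, 102, InitialPosition.LATEST)
replayed = records_seen(existing, arriving, None, InitialPosition.TRIM_HORIZON)
tip_only = records_seen(existing, arriving, None, InitialPosition.LATEST)
```

In this toy model `tip_only` drops 101 through 103, which is the "missed records" case the notes warn about, while `replayed` re-reads them, which is why the notes recommend idempotent record processing.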