atoomula and prateekm FYI..

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #785 from vjagadish1989/website-reorg28

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ac5f9480
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ac5f9480
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ac5f9480

Branch: refs/heads/1.0.0
Commit: ac5f948018cb05bee5f1c44568146961152863aa
Parents: 299c031
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Tue Oct 30 12:11:56 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Nov 13 19:32:55 2018 -0800

----------------------------------------------------------------------
 .../versioned/connectors/kinesis.md | 104 +++++++++++--------
 1 file changed, 61 insertions(+), 43 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/ac5f9480/docs/learn/documentation/versioned/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/connectors/kinesis.md b/docs/learn/documentation/versioned/connectors/kinesis.md
index a866484..85149f6 100644
--- a/docs/learn/documentation/versioned/connectors/kinesis.md
+++ b/docs/learn/documentation/versioned/connectors/kinesis.md
@@ -19,11 +19,16 @@ title: Kinesis Connector
    limitations under the License.
 -->

-## Overview
+## Kinesis I/O: Quickstart

-The Samza Kinesis connector provides access to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
-Amazon’s data streaming service. A Kinesis Data Stream is similar to a Kafka topic and can have multiple partitions.
-Each message consumed from a Kinesis Data Stream is an instance of [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
+The Samza Kinesis connector allows you to interact with [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams),
+Amazon’s data streaming service. The `hello-samza` project includes an example of processing Kinesis streams using Samza. Here is the complete [source code](https://github.com/apache/samza-hello-samza/blob/master/src/main/java/samza/examples/kinesis/KinesisHelloSamza.java) and [configs](https://github.com/apache/samza-hello-samza/blob/master/src/main/config/kinesis-hello-samza.properties).
+You can build and run this example using this [tutorial](https://github.com/apache/samza-hello-samza#hello-samza).
+
+
+###Data Format
+Like a Kafka topic, a Kinesis stream can have multiple shards with producers and consumers.
+Each message consumed from the stream is an instance of a Kinesis [Record](http://docs.aws.amazon.com/goto/WebAPI/kinesis-2013-12-02/Record).
 Samza’s [KinesisSystemConsumer](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java)
 wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apache/samza/blob/master/samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisIncomingMessageEnvelope.java).

@@ -31,90 +36,103 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac

 ### Basic Configuration

-You can configure your Samza jobs to process data from Kinesis Streams. To configure Samza job to consume from Kinesis
-streams, please add the below configuration:
+Here is the required configuration for consuming messages from Kinesis.

 {% highlight jproperties %}
-// define a kinesis system factory with your identifier. eg: kinesis-system
-systems.kinesis-system.samza.factory=org.apache.samza.system.eventhub.KinesisSystemFactory
+// Define a Kinesis system factory with your identifier. eg: kinesis-system
+systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory

-// kinesis system consumer works with only AllSspToSingleTaskGrouperFactory
+// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory
 job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory

-// define your streams
+// Define your streams
 task.inputs=kinesis-system.input0

-// define required properties for your streams
+// Define required properties for your streams
 systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
 systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
 sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}

-The tuple required to access the Kinesis data stream must be provided, namely the following fields:<br>
-**YOUR-STREAM-REGION**, **YOUR-ACCESS-KEY**, **YOUR-SECRET-KEY**.
+####Coordination
+The Kinesis system consumer does not rely on Samza's coordination mechanism. Instead, it uses the Kinesis client library (KCL) for coordination and distributing available shards among available instances. Hence, you should
+set your `grouper` configuration to `AllSspToSingleTaskGrouperFactory`.

+{% highlight jproperties %}
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
+{% endhighlight %}

-### Advanced Configuration
+####Security

-#### AWS Client configs
-You can configure any [AWS client config](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html)
-with the prefix **systems.system-name.aws.clientConfig.***
+Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An Access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`) which you can use to send programmatic requests to AWS.

 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
+systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION
+systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY
+sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY
 {% endhighlight %}

-As an example, to set a *proxy host* and *proxy port* for the AWS Client:
+### Advanced Configuration
+
+#### Kinesis Client Library Configs
+Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
+(KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
+for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix.

 {% highlight jproperties %}
-systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
-systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}

-#### Kinesis Client Library Configs
-Samza Kinesis Connector uses [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl)
-(KCL) to access the Kinesis data streams. You can set any [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-for a stream by configuring it under **systems.system-name.streams.stream-name.aws.kcl.***
+As an example, the below configuration is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance.
+{% highlight jproperties %}
+systems.system-name.streams.stream-name.aws.kcl.TableName=myTable
+{% endhighlight %}
+
+#### AWS Client configs
+Samza allows you to specify any [AWS client configs](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) to connect to your Kinesis instance.
+You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the `systems.your-system-name.aws.clientConfig.*` prefix.

 {% highlight jproperties %}
-systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE
+systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE
 {% endhighlight %}

-Obtain the config param from the public functions in [Kinesis Client Lib Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java)
-by removing the *"with"* prefix. For example: config param corresponding to **withTableName()** is **TableName**.
+As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client:
+{% highlight jproperties %}
+systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com
+systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port
+{% endhighlight %}

 ### Resetting Offsets

-The source of truth for checkpointing while using Kinesis Connector is not the Samza checkpoint topic but Kinesis itself.
-The Kinesis Client Library (KCL) [uses DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html)
-to store it’s checkpoints. By default, Kinesis Connector reads from the latest offset in the stream.
-
-To reset the checkpoints and consume from earliest/latest offset of a Kinesis data stream, please change the KCL TableName
-and set the appropriate starting position for the stream as shown below.
+Unlike other connectors where Samza stores and manages checkpointed offsets, Kinesis checkpoints are stored in a [DynamoDB](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html) table.
+These checkpoints are stored and managed by the KCL library internally. You can reset the checkpoints by configuring a different name for the DynamoDB table.

 {% highlight jproperties %}
-// change the TableName to a unique name to reset checkpoint.
+// change the TableName to a unique name to reset checkpoints.
 systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name
+{% endhighlight %}
+
+When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.
+
+{% highlight jproperties %}

 // set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest)
-systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=my-start-position
+systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST
 {% endhighlight %}

-To manipulate checkpoints to start from a particular position in the Kinesis stream, in lieu of Samza CheckpointTool,
-please login to the AWS Console and change the offsets in the DynamoDB Table with the table name that you have specified
-in the config above. By default, the table name has the following format:
-"\<job name\>-\<job id\>-\<kinesis stream\>".
+Alternately, if you want to start from a particular offset in the Kinesis stream, you can login to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB Table.
+By default, the table-name has the following format: "\<job name\>-\<job id\>-\<kinesis stream\>".

 ### Known Limitations

-The following limitations apply to Samza jobs consuming from Kinesis streams using the Samza consumer:
+The following limitations apply to Samza jobs consuming from Kinesis streams :

 - Stateful processing (eg: windows or joins) is not supported on Kinesis streams. However, you can accomplish this by chaining two Samza jobs where the first job reads from Kinesis and sends to Kafka while the second job processes the data from Kafka.
 - Kinesis streams cannot be configured as [bootstrap](https://samza.apache.org/learn/documentation/latest/container/streams.html) or [broadcast](https://samza.apache.org/learn/documentation/latest/container/samza-container.html) streams.
-- Kinesis streams must be used ONLY with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
-as the Kinesis consumer does the partition management by itself. No other grouper is supported.
+- Kinesis streams must be used only with the [AllSspToSingleTaskGrouperFactory](https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java)
+as the Kinesis consumer does the partition management by itself. No other grouper is currently supported.
 - A Samza job that consumes from Kinesis cannot consume from any other input source. However, you can send your results to any destination (eg: Kafka, EventHubs), and have another Samza job consume them.
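
For reviewers who want to see how the keys in this diff fit together, here is a minimal sketch that assembles them into one configuration map. It assumes a hypothetical helper class, `KinesisConfigSketch`, which is not part of Samza or of this commit. The property names and the two starting positions (`TRIM_HORIZON`, `LATEST`) are taken from the revised doc; the class, method names, and the validation are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Samza): assembles the properties named in the revised kinesis.md. */
public class KinesisConfigSketch {

    /** Builds the required consumer configuration for a single Kinesis input stream. */
    public static Map<String, String> build(String system, String stream, String region,
                                            String accessKey, String secretKey) {
        Map<String, String> cfg = new LinkedHashMap<>();
        String streamPrefix = "systems." + system + ".streams." + stream + ".aws.";
        // Factory package corrected by this commit: system.kinesis, not system.eventhub.
        cfg.put("systems." + system + ".samza.factory",
                "org.apache.samza.system.kinesis.KinesisSystemFactory");
        // The doc states this is the only grouper that works with the Kinesis consumer.
        cfg.put("job.systemstreampartition.grouper.factory",
                "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
        cfg.put("task.inputs", system + "." + stream);
        cfg.put(streamPrefix + "region", region);
        cfg.put(streamPrefix + "accessKey", accessKey);
        // The secret key carries the "sensitive." prefix, as in the doc.
        cfg.put("sensitive." + streamPrefix + "secretKey", secretKey);
        return cfg;
    }

    /** Adds the two KCL settings the doc uses to reset checkpoints. */
    public static void resetCheckpoints(Map<String, String> cfg, String system, String stream,
                                        String tableName, String initialPosition) {
        // Only the two positions mentioned in the doc are accepted in this sketch.
        if (!"TRIM_HORIZON".equals(initialPosition) && !"LATEST".equals(initialPosition)) {
            throw new IllegalArgumentException("Expected TRIM_HORIZON or LATEST: " + initialPosition);
        }
        String kclPrefix = "systems." + system + ".streams." + stream + ".aws.kcl.";
        cfg.put(kclPrefix + "TableName", tableName);
        cfg.put(kclPrefix + "InitialPositionInStream", initialPosition);
    }

    public static void main(String[] args) {
        Map<String, String> cfg = build("kinesis-system", "input0",
                "YOUR-STREAM-REGION", "YOUR-ACCESS_KEY", "YOUR-SECRET-KEY");
        resetCheckpoints(cfg, "kinesis-system", "input0", "my-app-table-name", "TRIM_HORIZON");
        // Print in key=value form so the output can be compared against the doc's snippets.
        cfg.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Running `main` prints the properties in `key=value` form, which can be compared line by line with the snippets in the revised doc.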