hlteoh37 commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832515104
##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
 # Amazon Kinesis Data Streams Connector
 
-The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-      <td>Source</td>
-      <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-      <td>Sink</td>
-      <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read/write from [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
 
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
 
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the below dependency to your project:
 
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
 
-Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+For use in PyFlink jobs, use the following dependency:
 
-A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
-
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration.
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
+{{< py_connector_download_link "kinesis" >}}
 
-## Kinesis Consumer
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source based on the [FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis Data stream, and reads events whilst maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` will discover the shards of the stream and start reading from each shard in parallel, depending on the parallelism of the operator.
+For more details on selecting the right parallelism, see section on [parallelism](#parallelism-and-number-of-shards).
+It also transparently handles discovery of new shards of the Kinesis Data stream if resharding of streams occurs while the job is running.
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
 
-Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance of the `KinesisStreamsSource`.
+The code snippet below illustrates how to do so.
 
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
 {{< tab "Java" >}}
 ```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST
+
+// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
+                .build();
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify watermarking strategy and the name of the Kinesis Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.

Review Comment:
   Done
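
The diff hunk above stops at the reviewed comment lines, so the snippet ends before the source is actually attached to the job. As a rough sketch of what those trailing comments usually translate to with Flink's standard fromSource API, continuing from the snippet's `env` and `kdsSource` (the watermark strategy, operator name, and UID below are illustrative assumptions, not taken from the PR):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Attach the source with an explicit watermark strategy and operator name,
    // set the return type via TypeInformation, and assign a stable UID so that
    // source state can be matched on restore from a savepoint.
    DataStream<String> kinesisRecords =
            env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis source")
                    .returns(TypeInformation.of(String.class))
                    .uid("kinesis-source");

The explicit UID is the Flink best practice the comment refers to: it keeps the source operator's state addressable across job graph changes when restoring from a savepoint.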
