hlteoh37 commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832515104


##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -27,644 +27,348 @@ under the License.
 
 # Amazon Kinesis Data Streams Connector
 
-The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
-
-To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">KDS Connectivity</th>
-      <th class="text-left">Maven Dependency</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>Source</td>
-        <td>{{< connector_artifact flink-connector-kinesis kinesis >}}</td>
-    </tr>
-    <tr>
-        <td>Sink</td>
-        <td>{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}</td>
-    </tr>
-  </tbody>
-</table>
+The Kinesis connector allows users to read from and write to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
 
-{{< py_connector_download_link "kinesis" >}}
+## Dependency
 
-## Using the Amazon Kinesis Streams Service
-Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
-to setup Kinesis streams.
+To use this connector, add the following dependency to your project:
 
-## Configuring Access to Kinesis with IAM
-Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+{{< connector_artifact flink-connector-aws-kinesis-streams kinesis >}}
 
-Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+For use in PyFlink jobs, use the following dependency:
 
-A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
-
-Supported Credential Providers are:
-* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
-* `BASIC` - Using access key ID and secret key supplied as configuration.
-* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
-* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
-* `CUSTOM` - Use a custom user class as credential provider.
-* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
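
As a minimal sketch of how a non-default provider such as `ASSUME_ROLE` would be supplied: the provider selection above boils down to string-keyed configuration. This stdlib-only illustration uses literal key strings as stand-ins for the `AWSConfigConstants` constants; the exact key names and the role ARN are assumptions, not confirmed connector API.

```java
import java.util.Properties;

public class CredentialsConfigSketch {
    // Hypothetical stand-in key strings; in the connector these come from
    // AWSConfigConstants (e.g. AWS_CREDENTIALS_PROVIDER). Names assumed here.
    public static Properties assumeRoleConfig() {
        Properties config = new Properties();
        config.put("aws.region", "us-east-1");
        // Pick a specific provider instead of the default AUTO chain.
        config.put("aws.credentials.provider", "ASSUME_ROLE");
        // ASSUME_ROLE additionally needs the role to assume and a session name.
        config.put("aws.credentials.provider.role.arn",
                "arn:aws:iam::123456789012:role/example-role");
        config.put("aws.credentials.provider.role.sessionName", "flink-session");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(assumeRoleConfig().getProperty("aws.credentials.provider"));
    }
}
```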
+{{< py_connector_download_link "kinesis" >}}
 
-## Kinesis Consumer
-
-The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
-streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
-responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
-change as shards are closed and created by Kinesis.
+## Kinesis Streams Source
+The `KinesisStreamsSource` is an exactly-once, parallel streaming data source based on the [FLIP-27 source interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface).
+The source subscribes to a single Amazon Kinesis Data stream and reads events whilst maintaining order within a specific Kinesis `partitionId`.
+The `KinesisStreamsSource` discovers the shards of the stream and starts reading from each shard in parallel, depending on the parallelism of the operator.
+For more details on selecting the right parallelism, see the section on [parallelism](#parallelism-and-number-of-shards).
+It also transparently handles discovery of new shards if the stream is resharded while the job is running.
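
As a rough sketch of the parallelism consideration referenced above: with uniform shard assignment, the busiest subtask ends up reading about ceil(shards / parallelism) shards. The helper below is illustrative only, not connector API.

```java
public class ShardAssignmentSketch {
    // Illustrative helper (not connector API): with uniform shard assignment,
    // the busiest subtask reads ceil(totalShards / parallelism) shards.
    public static int maxShardsPerSubtask(int totalShards, int parallelism) {
        return (totalShards + parallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        // A 12-shard stream consumed with operator parallelism 5:
        System.out.println(maxShardsPerSubtask(12, 5)); // prints 3
    }
}
```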
+
+{{< hint info >}}
+Note: Before consuming data, ensure that the Kinesis Data Stream is created with `ACTIVE` status on the Amazon Kinesis Data Streams console.
+{{< /hint >}}
 
-Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Stream console.
+The `KinesisStreamsSource` provides a fluent builder to construct an instance of the source.
+The code snippet below illustrates how to do so.
 
-{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3371" >}}
 {{< tab "Java" >}}
 ```java
-Properties consumerConfig = new Properties();
-consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
-consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+// Configure the KinesisStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional; by default the connector reads from LATEST
+
+// Create a new KinesisStreamsSource to read from the specified Kinesis stream.
+KinesisStreamsSource<String> kdsSource =
+        KinesisStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+                .setSourceConfig(sourceConfig)
+                .setDeserializationSchema(new SimpleStringSchema())
+                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional; by default the uniformShardAssigner is used
+                .build();
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
-    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+// Specify the watermarking strategy and the name of the Kinesis source operator.
+// Specify the return type using TypeInformation.
+// Also specify the UID of the operator, in line with Flink best practices.

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to