CrynetLogistics commented on a change in pull request #18165:
URL: https://github.com/apache/flink/pull/18165#discussion_r783349826



##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using 
the `ConsumerConfigConstant
 this is called once per stream during stream consumer deregistration, unless 
the `NONE` or `EAGER` registration strategy is configured.
 Retry and backoff parameters can be configured using the 
`ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.  
 
-## Kinesis Producer
-
-The `FlinkKinesisProducer` uses [Kinesis Producer Library 
(KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)
 to put data from a Flink stream into a Kinesis stream.
-
-Note that the producer is not participating in Flink's checkpointing and 
doesn't provide exactly-once processing guarantees. Also, the Kinesis producer 
does not guarantee that records are written in order to the shards (See 
[here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and 
[here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax)
 for more details).
+## Kinesis Data Streams Sink
 
-In case of a failure or a resharding, data will be written again to Kinesis, 
leading to duplicates. This behavior is usually called "at-least-once" 
semantics.
+The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK 
for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
 to write data from a Flink stream into a Kinesis stream.
 
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" 
in the AWS dashboard.
+To write data into a Kinesis stream, make sure the stream is marked as 
"ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the 
CloudWatch service.
 
 {{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
 {{< tab "Java" >}}
 ```java
-Properties producerConfig = new Properties();
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
-// Optional configs
-producerConfig.put("AggregationMaxCount", "4294967295");
-producerConfig.put("CollectionMaxCount", "1000");
-producerConfig.put("RecordTtl", "30000");
-producerConfig.put("RequestTimeout", "6000");
-producerConfig.put("ThreadPoolSize", "15");
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false");
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST");
-
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new 
SimpleStringSchema(), producerConfig);
-kinesis.setFailOnError(true);
-kinesis.setDefaultStream("kinesis_stream_name");
-kinesis.setDefaultPartition("0");
+ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+    KinesisDataStreamsSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode()))
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key");
+
+KinesisDataStreamsSink<String> kdsSink =
+    KinesisDataStreamsSink.<String>builder()
+        .setKinesisClientProperties(sinkProperties)    // Required
+        .setElementConverter(elementConverter)         // Required
+        .setStreamName("your-stream-name")             // Required
+        .setFailOnError(false)                         // Optional
+        .setMaxBatchSize(500)                          // Optional
+        .setMaxInFlightRequests(16)                    // Optional
+        .setMaxBufferedRequests(10_000)                // Optional
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)       // Optional
+        .setMaxTimeInBufferMS(5000)                    // Optional
+        .setMaxRecordSizeInBytes(1 * 1024 * 1024)      // Optional
+        .build();
 
 DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
+simpleStringStream.sinkTo(kdsSink);
 ```
 {{< /tab >}}
 {{< tab "Scala" >}}
 ```scala
-val producerConfig = new Properties()
-// Required configs
-producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
-producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
-producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
-// Optional KPL configs
-producerConfig.put("AggregationMaxCount", "4294967295")
-producerConfig.put("CollectionMaxCount", "1000")
-producerConfig.put("RecordTtl", "30000")
-producerConfig.put("RequestTimeout", "6000")
-producerConfig.put("ThreadPoolSize", "15")
-
-// Disable Aggregation if it's not supported by a consumer
-// producerConfig.put("AggregationEnabled", "false")
-// Switch KinesisProducer's threading model
-// producerConfig.put("ThreadingModel", "PER_REQUEST")
-
-val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, 
producerConfig)
-kinesis.setFailOnError(true)
-kinesis.setDefaultStream("kinesis_stream_name")
-kinesis.setDefaultPartition("0")
+val elementConverter =
+    KinesisDataStreamsSinkElementConverter.builder[String]()
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element => 
String.valueOf(element.hashCode()))
+        .build()
+
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
"aws_secret_access_key")
+
+val kdsSink = KinesisDataStreamsSink.builder[String]()
+    .setKinesisClientProperties(sinkProperties)  // Required
+    .setElementConverter(elementConverter)       // Required
+    .setStreamName("your-stream-name")           // Required
+    .setFailOnError(false)                       // Optional
+    .setMaxBatchSize(500)                        // Optional
+    .setMaxInFlightRequests(16)                  // Optional
+    .setMaxBufferedRequests(10000)               // Optional
+    .setMaxBatchSizeInBytes(5 * 1024 * 1024)     // Optional
+    .setMaxTimeInBufferMS(5000)                  // Optional
+    .setMaxRecordSizeInBytes(1 * 1024 * 1024)    // Optional
+    .build()
 
 val simpleStringStream = ...
-simpleStringStream.addSink(kinesis)
+simpleStringStream.sinkTo(kdsSink)
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
-The above is a simple example of using the producer. To initialize 
`FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, 
`AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` 
instance. Users can also pass in KPL's configurations as optional parameters to 
customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL 
configs and explanations can be found 
[here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties).
 The example demonstrates producing a single Kinesis stream in the AWS region 
"us-east-1".
+The above is a simple example of using the Kinesis sink. Begin by creating a 
`java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and 
`AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the 
builder. The default values for the optional configurations are shown above.
 
-If users don't specify any KPL configs and values, `FlinkKinesisProducer` will 
use default config values of KPL, except `RateLimit`. `RateLimit` limits the 
maximum allowed put rate for a shard, as a percentage of the backend limits. 
KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` 
too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` 
overrides KPL's default value to 100.
+You will always need to supply a `KinesisDataStreamsSinkElementConverter` 
during sink creation. This is where you specify your serialization schema and 
logic for generating a [partition 
key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key)
 from a record.
 
-Instead of a `SerializationSchema`, it also supports a 
`KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send 
the data to multiple streams. This is
-done using the `KinesisSerializationSchema.getTargetStream(T element)` method. 
Returning `null` there will instruct the producer to write the element to the 
default stream.
-Otherwise, the returned stream name is used.
+Some or all of the records in a request may fail to be persisted by Kinesis 
Data Streams for a number of reasons. If `failOnError` is on, then a runtime 
exception will be raised. Otherwise those records will be requeued in the 
buffer for retry.
 
-### Threading Model
+The Kinesis Sink provides some metrics through Flink's [metrics system]({{< 
ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. A list of 
all exposed metrics may be found [here]({{<ref 
"docs/ops/metrics#kinesis-sink">}}).
 
-Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL 
from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool 
mode uses a queue and thread pool to execute requests to Kinesis. This limits 
the number of threads that KPL's native process may create, and therefore 
greatly lowers CPU utilization and improves efficiency. **Thus, We highly 
recommend Flink users use thread-pool model.** The default thread pool size is 
`10`. Users can set the pool size in `java.util.Properties` instance with key 
`ThreadPoolSize`, as shown in the above example.
+The sink's default maximum record size is 1 MB and its default maximum batch 
size is 5 MB, in line with the Kinesis Data Streams maximums.
 
-Users can still switch back to one-thread-per-request mode by setting a 
key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, 
as shown in the code commented out in above example.
+### Kinesis Sinks and Fault Tolerance
 
-### Backpressure
+The sink is designed to participate in Flink's checkpointing to provide 
at-least-once processing guarantees. It does this by flushing the entire 
contents of the buffer when a checkpoint reaches the sink. This effectively 
assures all requests that were triggered before the checkpoint have been 
successfully acknowledged by Kinesis Data Streams, before proceeding to process 
more records sent to the sink.
 
-By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
-cannot be sent because of the rate restriction of 1 MB per second per shard are
-buffered in an unbounded queue and dropped when their `RecordTtl` expires.
+In case of a failure or a resharding, data will be written again to Kinesis, 
leading to duplicates.  Also, the sink does not guarantee that records are 
written in order to the shards.
 
-To avoid data loss, you can enable backpressuring by restricting the size of 
the
-internal queue:
+To use fault tolerant Kinesis Sinks, checkpointing of the topology needs to be 
enabled at the execution environment.
 
-```
-// 200 Bytes per record, 1 shard
-kinesis.setQueueLimit(500);
-```
+### Backpressure
 
-The value for `queueLimit` depends on the expected record size. To choose a 
good
-value, consider that Kinesis is rate-limited to 1MB per second per shard. If
-less than one second's worth of records is buffered, then the queue may not be
-able to operate at full capacity. With the default `RecordMaxBufferedTime` of
-100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit`
-can then be computed via
+Backpressure in the sink arises as the sink buffer fills up and writes to the 
sink 
+begin to exhibit blocking behaviour. Kinesis Data Streams has a rate 
restriction of 
+1 MB per second per shard.
 
+You can ease backpressure by adjusting the size of the internal queue:
 ```
-queue limit = (number of shards * queue size per shard) / record size
+KinesisDataStreamsSink<String> kdsSink =
+    KinesisDataStreamsSink.<String>builder()
+        ...
+        .setMaxBufferedRequests(10_000)

Review comment:
       I'm not sure there is such a location. The community previously agreed 
in an earlier PR that we should not have a documentation page for the Async 
Sink. Another candidate might be the pages on backpressure in the 
documentation (such as 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/back_pressure/),
 but I imagine I would have to be explicit about the fact that this only 
affects the sinks based on the Async Sink...
   
   I'm not sure, WDYT?
   
   Actually, I think all 5 of the other sink parameters could circumstantially 
affect backpressure, not just `maxBatchSizeInBytes`. I guess what I wanted to 
say here is that, no matter what other parameters you have set, if you increase 
`maxBufferedRequests`, you will always be able to ease backpressure in the sink.
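
   To make that last point concrete, here is a toy model of a bounded buffer. 
It is only a sketch and not the Async Sink implementation: `BufferSketch`, 
`blockedTicks`, the tick-based drain rate and the burst pattern are all made up 
for illustration. It counts the ticks in which the upstream writer has to wait 
because the buffer is full.

```java
class BufferSketch {
    /**
     * Hypothetical model: counts the ticks in which the writer is left
     * waiting because the buffer has no free capacity.
     */
    static int blockedTicks(int maxBufferedRequests, int[] arrivalsPerTick, int drainPerTick) {
        int buffered = 0; // records currently held in the buffer
        int waiting = 0;  // records the writer has not been able to hand over yet
        int blocked = 0;  // ticks in which the writer was left waiting
        for (int arrivals : arrivalsPerTick) {
            waiting += arrivals;
            // Accept as many waiting records as the free capacity allows.
            int accepted = Math.min(waiting, maxBufferedRequests - buffered);
            buffered += accepted;
            waiting -= accepted;
            if (waiting > 0) {
                blocked++; // the writer is blocked: this is the backpressure
            }
            // The destination drains a fixed number of records per tick.
            buffered -= Math.min(buffered, drainPerTick);
        }
        return blocked;
    }

    public static void main(String[] args) {
        int[] burst = {50, 50, 0, 0, 0, 0}; // a short burst followed by idle ticks
        System.out.println(blockedTicks(20, burst, 10));  // prints 6
        System.out.println(blockedTicks(200, burst, 10)); // prints 0
    }
}
```

   In this model the larger buffer absorbs the whole burst, so the writer never 
blocks. If arrivals stay above the drain rate indefinitely, a larger buffer 
only postpones the point at which the writer blocks.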





