This is an automated email from the ASF dual-hosted git repository. hong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 3abc1c5050cd6fa54e4f3f3137cdf992d6a090d0 Author: Hong Teoh <[email protected]> AuthorDate: Thu Nov 7 15:52:56 2024 +0000 [FLINK-31989][docs] Update english docs for Kinesis Table API --- docs/content/docs/connectors/table/kinesis.md | 634 ++++++++++++++++++++------ 1 file changed, 502 insertions(+), 132 deletions(-) diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md index 46cf1c7..e7381e3 100644 --- a/docs/content/docs/connectors/table/kinesis.md +++ b/docs/content/docs/connectors/table/kinesis.md @@ -40,10 +40,72 @@ Dependencies The Kinesis connector is not part of the binary distribution. See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}). +### Versioning + +There are two available Table API and SQL distributions for the Kinesis connector. +This has resulted from an ongoing migration from the deprecated `SourceFunction` and `SinkFunction` interfaces to the new `Source` and `Sink` interfaces. + +The Table API and SQL interfaces in Flink only allow one TableFactory for each connector identifier. +Only one TableFactory with identifier `kinesis` can be included in your application's dependencies. 
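As a quick illustration of how the identifier selects the implementation, the two DDL sketches below assume `flink-sql-connector-kinesis` 5.x is on the classpath (the stream name and ARN are placeholders):

```sql
-- New `Source`/`Sink`-based implementation, selected by the `kinesis` identifier
CREATE TABLE NewKinesisTable (
  user_id BIGINT,
  item_id BIGINT
) WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'format' = 'csv'
);

-- Legacy `SourceFunction`-based implementation, selected by the `kinesis-legacy` identifier
CREATE TABLE LegacyKinesisTable (
  user_id BIGINT,
  item_id BIGINT
) WITH (
  'connector' = 'kinesis-legacy',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Only one of the two identifiers can resolve per classpath configuration, as described above.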
+ +The following table clarifies the underlying interface that is used depending on the distribution selected: + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 40%">Dependency</th> + <th class="text-center" style="width: 10%">Connector Version</th> + <th class="text-center" style="width: 25%">Source connector identifier (interface)</th> + <th class="text-center" style="width: 25%">Sink connector identifier (interface)</th> + </tr> + </thead> + <tbody> + <tr> + <td><code>flink-sql-connector-aws-kinesis-streams</code></td> + <td><code>5.x</code> or later</td> + <td><code>kinesis</code>(<code>Source</code>)</td> + <td><code>kinesis</code>(<code>Sink</code>)</td> + </tr> + <tr> + <td><code>flink-sql-connector-aws-kinesis-streams</code></td> + <td><code>4.x</code> or earlier</td> + <td>N/A (no source packaged)</td> + <td><code>kinesis</code>(<code>Sink</code>)</td> + </tr> + <tr> + <td><code>flink-sql-connector-kinesis</code></td> + <td><code>5.x</code> or later</td> + <td><code>kinesis</code>(<code>Source</code>), <code>kinesis-legacy</code>(<code>SourceFunction</code>)</td> + <td><code>kinesis</code>(<code>Sink</code>)</td> + </tr> + <tr> + <td><code>flink-sql-connector-kinesis</code></td> + <td><code>4.x</code> or earlier</td> + <td><code>kinesis</code>(<code>SourceFunction</code>)</td> + <td><code>kinesis</code>(<code>Sink</code>)</td> + </tr> + </tbody> +</table> + +{{< hint warning >}} +Only include one artifact, either `flink-sql-connector-aws-kinesis-streams` or `flink-sql-connector-kinesis`. Including both will result in clashing TableFactory names. +{{< /hint >}} + +These docs target versions 5.x onwards. The main configuration section covers the `kinesis` identifier. +For legacy configuration, please see [Configuration (`kinesis-legacy`)](#connector-options-kinesis-legacy). + +### Migrating from v4.x to v5.x + +There is no state compatibility between the 4.x and 5.x Table API and SQL connectors. 
+This is due to the underlying implementation being changed. + +Consider starting the job with the v5.x `kinesis` table using a `source.init.position` of `AT_TIMESTAMP`, set slightly before the time when the job using the v4.x `kinesis` table was stopped. +Note that this may result in some records being re-processed. + How to create a Kinesis data stream table ----------------------------------------- -Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream. +Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) to set up a Kinesis stream. The following example shows how to create a table backed by a Kinesis data stream: ```sql @@ -57,9 +119,9 @@ CREATE TABLE KinesisTable ( PARTITIONED BY (user_id, item_id) WITH ( - 'connector' = 'kinesis', - 'stream' = 'user_behavior', - 'aws.region' = 'us-east-2', - 'scan.stream.initpos' = 'LATEST', + 'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name', + 'aws.region' = 'us-east-1', + 'source.init.position' = 'LATEST', 'format' = 'csv' ); ``` @@ -67,7 +129,12 @@ WITH ( Available Metadata ------------------ -The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. +{{< hint warning >}} +The `kinesis` table Source has a known bug that means `VIRTUAL` columns are not supported. +Please use `kinesis-legacy` until [the fix](https://issues.apache.org/jira/browse/FLINK-36671) is completed. +{{< /hint >}} + +The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. This is only available in the `kinesis-legacy` connector. 
<table class="table table-bordered"> <thead> <tr> @@ -111,7 +178,7 @@ CREATE TABLE KinesisTable ( ) PARTITIONED BY (user_id, item_id) WITH ( - 'connector' = 'kinesis', + 'connector' = 'kinesis-legacy', 'stream' = 'user_behavior', 'aws.region' = 'us-east-2', 'scan.stream.initpos' = 'LATEST', @@ -123,6 +190,430 @@ WITH ( Connector Options ----------------- +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> + <th class="text-center" style="width: 7%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 42%">Description</th> + </tr> + <tr> + <th colspan="6" class="text-left" style="width: 100%">Common Options</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>connector</h5></td> + <td>required</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code> or <code>'kinesis-legacy'</code>. See <a href="#versioning">Versioning</a> for details.</td> + </tr> + <tr> + <td><h5>stream.arn</h5></td> + <td>required</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>ARN of the Kinesis data stream backing this table.</td> + </tr> + <tr> + <td><h5>format</h5></td> + <td>required</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The format used to deserialize and serialize Kinesis data stream records. 
See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td> + </tr> + <tr> + <td><h5>aws.region</h5></td> + <td>required</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS region where the stream is defined.</td> + </tr> + <tr> + <td><h5>aws.endpoint</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set).</td> + </tr> + <tr> + <td><h5>aws.trust.all.certificates</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If true, accepts all SSL certificates. This is not recommended for production environments and should only be used for testing purposes.</td> + </tr> + </tbody> + <thead> + <tr> + <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>aws.credentials.provider</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">AUTO</td> + <td>String</td> + <td>A credentials provider to use when authenticating against the Kinesis endpoint. 
See <a href="#authentication">Authentication</a> for details.</td> + </tr> + <tr> + <td><h5>aws.credentials.basic.accesskeyid</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td> + </tr> + <tr> + <td><h5>aws.credentials.basic.secretkey</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS secret key to use when setting credentials provider type to BASIC.</td> + </tr> + <tr> + <td><h5>aws.credentials.profile.path</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td> + </tr> + <tr> + <td><h5>aws.credentials.profile.name</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td> + </tr> + <tr> + <td><h5>aws.credentials.role.arn</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> + </tr> + <tr> + <td><h5>aws.credentials.role.sessionName</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> + </tr> + <tr> + <td><h5>aws.credentials.role.externalId</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td> + </tr> + <tr> + <td><h5>aws.credentials.role.stsEndpoint</h5></td> + 
<td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.</td> + </tr> + <tr> + <td><h5>aws.credentials.role.provider</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td> + </tr> + <tr> + <td><h5>aws.credentials.webIdentityToken.file</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td> + </tr> + <tr> + <td><h5>aws.credentials.custom.class</h5></td> + <td>required only if credential provider is set to CUSTOM</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The full path (in Java package notation) to the user-provided + class to use if the credential provider type is set to CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td> + </tr> + </tbody> + <thead> + <tr> + <th colspan="6" class="text-left" style="width: 100%">Source Options</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>source.init.position</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">LATEST</td> + <td>String</td> + <td>Initial position to be used when reading from the table. 
See <a href="#start-reading-position">Start Reading Position</a> for details.</td> + </tr> + <tr> + <td><h5>source.init.timestamp</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td> + </tr> + <tr> + <td><h5>source.init.timestamp.format</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td> + <td>String</td> + <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td> + </tr> + <tr> + <td><h5>source.shard.discovery.interval</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">10 s</td> + <td>Duration</td> + <td>The interval between each attempt to discover new shards.</td> + </tr> + <tr> + <td><h5>source.reader.type</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">POLLING</td> + <td>String</td> + <td>The <code>ReaderType</code> to use for sources (<code>POLLING|EFO</code>).</td> + </tr> + <tr> + <td><h5>source.shard.get-records.max-record-count</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">10000</td> + <td>Integer</td> + <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to fetch in each request to an AWS Kinesis shard.</td> + </tr> + <tr> + <td><h5>source.efo.consumer.name</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Only applicable to EFO <code>ReaderType</code>. 
The name of the EFO consumer to register with KDS.</td> + </tr> + <tr> + <td><h5>source.efo.lifecycle</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">JOB_MANAGED</td> + <td>String</td> + <td>Only applicable to EFO <code>ReaderType</code>. Determines whether the EFO consumer is managed by the Flink job (<code>JOB_MANAGED|SELF_MANAGED</code>).</td> + </tr> + <tr> + <td><h5>source.efo.subscription.timeout</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">60 s</td> + <td>Duration</td> + <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO Consumer subscription.</td> + </tr> + <tr> + <td><h5>source.efo.deregister.timeout</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">10 s</td> + <td>Duration</td> + <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. When the timeout is reached, execution continues as normal.</td> + </tr> + <tr> + <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">100</td> + <td>Integer</td> + <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of attempts for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td> + </tr> + <tr> + <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">2 s</td> + <td>Duration</td> + <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td> + </tr> + <tr> + <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">60 s</td> + <td>Duration</td> + <td>Only applicable to EFO <code>ReaderType</code>. 
Max delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td> + </tr> + </tbody> + <thead> + <tr> + <th colspan="6" class="text-left" style="width: 100%">Sink Options</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>sink.partitioner</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">random or row-based</td> + <td>String</td> + <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> + </tr> + <tr> + <td><h5>sink.partitioner-field-delimiter</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">|</td> + <td>String</td> + <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> + </tr> + <tr> + <td><h5>sink.producer.*</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">(none)</td> + <td></td> + <td> + Deprecated options previously used by the legacy connector. + Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched + to their respective properties. Unsupported options are logged to the user as warnings. + </td> + </tr> + <tr> + <td><h5>sink.http-client.max-concurrency</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">10000</td> + <td>Integer</td> + <td> + Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>. + </td> + </tr> + <tr> + <td><h5>sink.http-client.read-timeout</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">360000</td> + <td>Integer</td> + <td> + Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>. 
+ </td> + </tr> + <tr> + <td><h5>sink.http-client.protocol.version</h5></td> + <td>optional</td> + <td>no</td> + <td style="word-wrap: break-word;">HTTP2</td> + <td>String</td> + <td>HTTP version used by the Kinesis client.</td> + </tr> + <tr> + <td><h5>sink.batch.max-size</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">500</td> + <td>Integer</td> + <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td> + </tr> + <tr> + <td><h5>sink.requests.max-inflight</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">16</td> + <td>Integer</td> + <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td> + </tr> + <tr> + <td><h5>sink.requests.max-buffered</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">10000</td> + <td>String</td> + <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td> + </tr> + <tr> + <td><h5>sink.flush-buffer.size</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">5242880</td> + <td>Long</td> + <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td> + </tr> + <tr> + <td><h5>sink.flush-buffer.timeout</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">5000</td> + <td>Long</td> + <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td> + </tr> + <tr> + <td><h5>sink.fail-on-error</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Flag used for retrying failed requests. 
If set, any request failure will not be retried and will instead fail the job.</td> + </tr> + </tbody> +</table> + +Features +-------- + +{{< hint info >}} +Refer to the [Kinesis Datastream API]({{< ref "docs/connectors/datastream/kinesis" >}}) documentation for a more detailed description of features. +{{< /hint >}} + +### Sink Partitioning + +Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. +Valid values are: + +* `fixed`: Kinesis `PartitionKey` values derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime). +* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause. +* Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`. + +{{< hint info >}} +Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields. +In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error). +You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter). +{{< /hint >}} + +# Data Type Mapping + +Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure. +Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'. 
+To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword. +Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details. + +Connector Options (`kinesis-legacy`) +----------------- + <table class="table table-bordered"> <thead> <tr> @@ -327,7 +818,7 @@ Connector Options <td>no</td> <td style="word-wrap: break-word;">POLLING</td> <td>String</td> - <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + <td>The <code>RecordPublisher</code> type to use for sources.</td> </tr> <tr> <td><h5>scan.stream.efo.consumername</h5></td> @@ -335,7 +826,7 @@ Connector Options <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> - <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + <td>The name of the EFO consumer to register with KDS.</td> </tr> <tr> <td><h5>scan.stream.efo.registration</h5></td> @@ -343,7 +834,7 @@ Connector Options <td>no</td> <td style="word-wrap: break-word;">LAZY</td> <td>String</td> - <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE).</td> </tr> <tr> <td><h5>scan.stream.efo.consumerarn</h5></td> @@ -351,7 +842,7 @@ Connector Options <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> - <td>The prefix of consumer ARN for a given stream. 
See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + <td>The prefix of consumer ARN for a given stream.</td> </tr> <tr> <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td> @@ -359,7 +850,7 @@ Connector Options <td>no</td> <td style="word-wrap: break-word;">10000</td> <td>Integer</td> - <td>Maximum number of allowed concurrent requests for the EFO client. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> + <td>Maximum number of allowed concurrent requests for the EFO client.</td> </tr> <tr> <td><h5>scan.shard-assigner</h5></td> @@ -827,125 +1318,4 @@ Connector Options </tbody> </table> -Features --------- - -### Authorization - -Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams. - -### Authentication - -Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis. -By default, the `AUTO` Credentials Provider is used. -If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider. - -A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting. -Supported values are: - -* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider. -* `BASIC` - Use access key ID and secret key supplied as configuration. -* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables. -* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`. -* `PROFILE` - Use an AWS credentials profile to create the AWS credentials. 
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. -* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. -* `CUSTOM` - Provide a custom class that implements the interface `AWSCredentialsProvider` and has a constructor `MyCustomClass(java.util.Properties config)`. All connector properties will be passed down to this custom -credential provider class via the constructor. - -### Start Reading Position - -You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `scan.stream.initpos` option. -Available values are: - -* `LATEST`: read shards starting from the latest record. -* `TRIM_HORIZON`: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream). -* `AT_TIMESTAMP`: read shards starting from a specified timestamp. The timestamp value should be specified through the `scan.stream.initpos-timestamp` in one of the following formats: - * A non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`). - * A value conforming to a user-defined `SimpleDateFormat` specified at `scan.stream.initpos-timestamp-format`. - If a user does not define a format, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. - For example, timestamp value is `2016-04-04` and user-defined format is `yyyy-MM-dd`, or timestamp value is `2016-04-04T19:58:46.480-00:00` and a user-defined format is not provided. - -### Sink Partitioning - -Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. 
-Valid values are: - -* `fixed`: Kinesis `PartitionKey` values derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime). -* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause. -* Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`. - -{{< hint info >}} -Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields. -In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error). -You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter). -{{< /hint >}} - -### Enhanced Fan-Out - -[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis data stream. -Without EFO, all concurrent Kinesis consumers share a single read quota per shard. -Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. - -<span class="label label-info">Note</span> Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/). - -You can enable and configure EFO with the following properties: - -* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`. -* `scan.stream.efo.consumername`: A name to identify the consumer when the above value is `EFO`. 
-* `scan.stream.efo.registration`: Strategy for (de-)registration of `EFO` consumers with the name given by the `scan.stream.efo.consumername` value. Valid strategies are: - * `LAZY` (default): Stream consumers are registered when the Flink job starts running. - If the stream consumer already exists, it will be reused. - This is the preferred strategy for the majority of applications. - However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN. - For jobs with very large parallelism this can result in an increased start-up time. - The `DescribeStreamConsumer` operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html), - this means application startup time will increase by roughly `parallelism/20 seconds`. - * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor. - If the stream consumer already exists, it will be reused. - This will result in registration occurring when the job is constructed, - either on the Flink Job Manager or client environment submitting the job. - Using this strategy results in a single thread registering and retrieving the stream consumer ARN, - reducing startup time over `LAZY` (with large parallelism). - However, consider that the client environment will require access to the AWS services. - * `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`. - Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/) - to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). - Stream consumer ARNs should be provided to the job via the consumer configuration. -* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered ARN-consumers (substitute `<stream-name>` with the name of your stream in the parameter name). 
- Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy. - -<span class="label label-info">Note</span> For a given Kinesis data stream, each EFO consumer must have a unique name. -However, consumer names do not have to be unique across data streams. -Reusing a consumer name will result in existing subscriptions being terminated. - -<span class="label label-info">Note</span> With the `LAZY` strategy, stream consumers are de-registered when the job is shutdown gracefully. -In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active. -In this situation the stream consumers will be gracefully reused when the application restarts. -With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`. - -# Data Type Mapping - - -Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure. -Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'. -To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword. -Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details. - -# Updates in 1.15 - -Kinesis table API connector sink data stream depends on <code>FlinkKinesisProducer</code> till 1.14, with the introduction of <code>KinesisStreamsSink</code> in 1.15 kinesis table API sink connector has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically while sink configuration options are now compatible with <code>KinesisStreamsSink</code>. - -Options configuring <code>FlinkKinesisProducer</code> are now deprecated with fallback support for common configuration options with <code>KinesisStreamsSink</code>. 
- -<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to kinesis, -which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code> -are now deprecated and will be ignored, this includes <code>sink.producer.aggregation-enabled</code> and -<code>sink.producer.aggregation-count</code>. - -<span class="label label-info">Note</span> Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users. - -Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code> with no change in configuration options. - - {{< top >}}
