This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 3abc1c5050cd6fa54e4f3f3137cdf992d6a090d0
Author: Hong Teoh <[email protected]>
AuthorDate: Thu Nov 7 15:52:56 2024 +0000

    [FLINK-31989][docs] Update english docs for Kinesis Table API
---
 docs/content/docs/connectors/table/kinesis.md | 634 ++++++++++++++++++++------
 1 file changed, 502 insertions(+), 132 deletions(-)

diff --git a/docs/content/docs/connectors/table/kinesis.md 
b/docs/content/docs/connectors/table/kinesis.md
index 46cf1c7..e7381e3 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -40,10 +40,72 @@ Dependencies
 The Kinesis connector is not part of the binary distribution.
 See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
 
+### Versioning
+
+There are two available Table API and SQL distributions for the Kinesis 
connector. 
+This has resulted from an ongoing migration from the deprecated 
`SourceFunction` and `SinkFunction` interfaces to the new `Source` and `Sink` 
interfaces.
+
+The Table API and SQL interfaces in Flink only allow one TableFactory for each 
connector identifier. 
+Only one TableFactory with identifier `kinesis` can be included in your 
application's dependencies. 
+
+The following table clarifies the underlying interface that is used depending 
on the distribution selected:
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Dependency</th>
+      <th class="text-center" style="width: 10%">Connector Version</th>
+      <th class="text-center" style="width: 25%">Source connector identifier 
(interface)</th>
+      <th class="text-center" style="width: 25%">Sink connector identifier 
(interface)</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>flink-sql-connector-aws-kinesis-streams</code></td>
+      <td><code>5.x</code> or later</td>
+      <td><code>kinesis</code>(<code>Source</code>)</td>
+      <td><code>kinesis</code>(<code>Sink</code>)</td>
+    </tr>
+    <tr>
+      <td><code>flink-sql-connector-aws-kinesis-streams</code></td>
+      <td><code>4.x</code> or earlier</td>
+      <td>N/A (no source packaged)</td>
+      <td><code>kinesis</code>(<code>Sink</code>)</td>
+    </tr>
+    <tr>
+      <td><code>flink-sql-connector-kinesis</code></td>
+      <td><code>5.x</code> or later</td>
+      <td><code>kinesis</code>(<code>Source</code>), 
<code>kinesis-legacy</code>(<code>SourceFunction</code>)</td>
+      <td><code>kinesis</code>(<code>Sink</code>)</td>
+    </tr>
+    <tr>
+      <td><code>flink-sql-connector-kinesis</code></td>
+      <td><code>4.x</code> or earlier</td>
+      <td><code>kinesis</code>(<code>SourceFunction</code>)</td>
+      <td><code>kinesis</code>(<code>Sink</code>)</td>
+    </tr>
+    </tbody>
+</table>
+
+{{< hint warning >}}
+Only include one artifact, either `flink-sql-connector-aws-kinesis-streams` or 
`flink-sql-connector-kinesis`. Including both will result in clashing 
TableFactory names.
+{{< /hint >}}
+
+These docs target versions 5.x onwards. The main configuration section covers the `kinesis` identifier.
+For the legacy configuration, see [Configuration (`kinesis-legacy`)](#connector-options-kinesis-legacy).
+
+### Migrating from v4.x to v5.x
+
+There is no state compatibility between the 4.x and 5.x Table API and SQL connectors.
+This is because the underlying implementation has changed.
+
+Consider starting the job with the v5.x `kinesis` table using a `source.init.position` of `AT_TIMESTAMP` set slightly before the time when the job with the v4.x `kinesis` table was stopped.
+Note that this may result in some records being reprocessed.
+
 How to create a Kinesis data stream table
 -----------------------------------------
 
-Follow the instructions from the [Amazon KDS Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
 to set up a Kinesis stream.
+Follow the instructions from the [Amazon KDS Developer 
Guide](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) to set 
up a Kinesis stream.
 The following example shows how to create a table backed by a Kinesis data 
stream:
 
 ```sql
@@ -57,9 +119,9 @@ CREATE TABLE KinesisTable (
 PARTITIONED BY (user_id, item_id)
 WITH (
   'connector' = 'kinesis',
-  'stream' = 'user_behavior',
-  'aws.region' = 'us-east-2',
-  'scan.stream.initpos' = 'LATEST',
+  'stream.arn' = 
'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
+  'aws.region' = 'us-east-1',
+  'source.init.position' = 'LATEST',
   'format' = 'csv'
 );
 ```
@@ -67,7 +129,12 @@ WITH (
 Available Metadata
 ------------------
 
-The following metadata can be exposed as read-only (`VIRTUAL`) columns in a 
table definition.
+{{< hint warning >}}
+The `kinesis` table source has a known bug that means `VIRTUAL` columns are not supported.
+Please use `kinesis-legacy` until [the fix](https://issues.apache.org/jira/browse/FLINK-36671) is completed.
+{{< /hint >}}
+
+The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. This is available in the `kinesis-legacy` connector only.
 
 <table class="table table-bordered">
     <thead>
@@ -111,7 +178,7 @@ CREATE TABLE KinesisTable (
 )
 PARTITIONED BY (user_id, item_id)
 WITH (
-  'connector' = 'kinesis',
+  'connector' = 'kinesis-legacy',
   'stream' = 'user_behavior',
   'aws.region' = 'us-east-2',
   'scan.stream.initpos' = 'LATEST',
@@ -123,6 +190,430 @@ WITH (
 Connector Options
 -----------------
 
+<table class="table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use 
<code>'kinesis'</code> or <code>'kinesis-legacy'</code>. See <a 
href="#versioning">Versioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>stream.arn</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The ARN of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream 
records. See <a href="#data-type-mapping">Data Type Mapping</a> for 
details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if 
not set).</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates. This should only be used for testing purposes and is not recommended for production environments.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Authentication 
Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the 
Kinesis endpoint. See <a href="#authentication">Authentication</a> for 
details.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The AWS access key ID to use when setting credentials provider 
type to BASIC.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.basic.secretkey</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The AWS secret key to use when setting credentials provider type 
to BASIC.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.profile.path</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>Optional configuration for the profile path if credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.profile.name</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>Optional configuration for the profile name if credential provider type is set to PROFILE.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.role.arn</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The role ARN to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.role.sessionName</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The role session name to use when credential provider type is set 
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.role.externalId</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The external ID to use when credential provider type is set to 
ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The AWS endpoint for STS (derived from the AWS region setting if 
not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.role.provider</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+         <td>optional</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The absolute path to the web identity token file that should be 
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+         <td><h5>aws.credentials.custom.class</h5></td>
+         <td>required only if credential provider is set to CUSTOM</td>
+      <td>no</td>
+         <td style="word-wrap: break-word;">(none)</td>
+         <td>String</td>
+         <td>The full path (in Java package notation) to the user-provided
+         class to use if credential provider type is set to CUSTOM, e.g.
+         org.user_company.auth.CustomAwsCredentialsProvider.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>source.init.position</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>source.init.timestamp</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>source.init.timestamp.format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+      <td>String</td>
+      <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>source.shard.discovery.interval</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10 s</td>
+      <td>Duration</td>
+      <td>The interval between each attempt to discover new shards.</td>
+    </tr>
+    <tr>
+      <td><h5>source.reader.type</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>ReaderType</code> to use for sources 
(<code>POLLING|EFO</code>).</td>
+    </tr>
+    <tr>
+      <td><h5>source.shard.get-records.max-record-count</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Only applicable to the POLLING <code>ReaderType</code>. The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.consumer.name</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO 
consumer to register with KDS.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.lifecycle</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">JOB_MANAGED</td>
+      <td>String</td>
+      <td>Only applicable to the EFO <code>ReaderType</code>. Determines whether the EFO consumer is managed by the Flink job (<code>JOB_MANAGED|SELF_MANAGED</code>).</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.subscription.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60 s</td>
+      <td>Duration</td>
+      <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO 
Consumer subscription.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.deregister.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10 s</td>
+      <td>Duration</td>
+      <td>Only applicable to the EFO <code>ReaderType</code>. Timeout for consumer deregistration. When the timeout is reached, execution will continue as normal.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">100</td>
+      <td>Integer</td>
+      <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of 
attempts for the exponential backoff retry strategy when calling 
<code>DescribeStreamConsumer</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2 s</td>
+      <td>Duration</td>
+      <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the 
exponential backoff retry strategy when calling 
<code>DescribeStreamConsumer</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60 s</td>
+      <td>Duration</td>
+      <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the 
exponential backoff retry strategy when calling 
<code>DescribeStreamConsumer</code>.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis 
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner-field-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">|</td>
+      <td>String</td>
+      <td>Optional field delimiter for a fields-based partitioner derived from 
a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> 
for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.producer.*</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td></td>
+      <td>
+        Deprecated options previously used by the legacy connector.
+        Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched
+        to their respective properties. Unsupported options are logged as warnings to the user.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>
+      Maximum number of allowed concurrent requests by 
<code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>
+        Maximum amount of time in ms for requests to be sent by 
<code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.protocol.version</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">HTTP2</td>
+      <td>String</td>
+      <td>The HTTP protocol version used by the Kinesis client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be passed to 
<code>KinesisAsyncClient</code> to be written downstream.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
+      <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Request buffer threshold for buffered requests by 
<code>KinesisAsyncClient</code> before blocking new write requests and applying 
backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>Threshold value in bytes for writer buffer in 
<code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+</table>
+
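
As an illustration of how the options above combine, the following sketch configures an EFO source that authenticates by assuming an IAM role. The stream ARN, role ARN, session name, and consumer name are all placeholder values.

```sql
CREATE TABLE KinesisSource (
  `event_id` STRING,
  `payload` STRING
)
WITH (
  'connector' = 'kinesis',
  -- placeholder stream and role ARNs
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  -- assume a role for access to the stream
  'aws.credentials.provider' = 'ASSUME_ROLE',
  'aws.credentials.role.arn' = 'arn:aws:iam::012345678901:role/my-reader-role',
  'aws.credentials.role.sessionName' = 'flink-session',
  -- use Enhanced Fan-Out with a dedicated consumer
  'source.reader.type' = 'EFO',
  'source.efo.consumer.name' = 'my-flink-efo-consumer',
  'format' = 'json'
);
```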
+Features
+--------
+
+{{< hint info >}}
+Refer to the [Kinesis Datastream API]({{< ref 
"docs/connectors/datastream/kinesis" >}}) documentation for more detailed 
description of features.
+{{< /hint >}}
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner` 
option allows you to control how records written into a multi-shard 
Kinesis-backed table will be partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values derived from the Flink subtask index, 
so each Flink partition ends up in at most one Kinesis partition (assuming that 
no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the 
default value for tables not defined with a `PARTITION BY` clause.
+* Custom `FixedKinesisPartitioner` subclass: e.g. 
`'org.mycompany.MyPartitioner'`.
+
+{{< hint info >}}
+Records written into tables defining a `PARTITION BY` clause will always be 
partitioned based on a concatenated projection of the `PARTITION BY` fields.
+In this case, the `sink.partitioner` field cannot be used to modify this 
behavior (attempting to do this results in a configuration error).
+You can, however, use the `sink.partitioner-field-delimiter` option to set the 
delimiter of field values in the concatenated 
[PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey)
 string (an empty string is also a valid delimiter).
+{{< /hint >}}
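
For example, the following sketch (with a placeholder stream ARN) partitions records by the concatenation of `user_id` and `item_id`, joined with `-` rather than the default `|` delimiter:

```sql
CREATE TABLE KinesisSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
-- the PARTITION BY fields form the Kinesis PartitionKey
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  -- join the partition fields with '-' in the PartitionKey string
  'sink.partitioner-field-delimiter' = '-',
  'format' = 'csv'
);
```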
+
+# Data Type Mapping
+
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure.
+Instead, Kinesis records are deserialized and serialized by formats, e.g. 
'avro', 'csv', or 'json'.
+To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword.
+Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" 
>}}) pages for more details.
+
+Connector Options (`kinesis-legacy`)
+------------------------------------
+
 <table class="table table-bordered">
     <thead>
     <tr>
@@ -327,7 +818,7 @@ Connector Options
       <td>no</td>
       <td style="word-wrap: break-word;">POLLING</td>
       <td>String</td>
-      <td>The <code>RecordPublisher</code> type to use for sources. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+      <td>The <code>RecordPublisher</code> type to use for sources.</td>
     </tr>
     <tr>
       <td><h5>scan.stream.efo.consumername</h5></td>
@@ -335,7 +826,7 @@ Connector Options
       <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The name of the EFO consumer to register with KDS. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+      <td>The name of the EFO consumer to register with KDS.</td>
     </tr>
     <tr>
       <td><h5>scan.stream.efo.registration</h5></td>
@@ -343,7 +834,7 @@ Connector Options
       <td>no</td>
       <td style="word-wrap: break-word;">LAZY</td>
       <td>String</td>
-      <td>Determine how and when consumer de-/registration is performed 
(LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for 
details.</td>
+      <td>Determine how and when consumer de-/registration is performed 
(LAZY|EAGER|NONE).</td>
     </tr>
     <tr>
       <td><h5>scan.stream.efo.consumerarn</h5></td>
@@ -351,7 +842,7 @@ Connector Options
       <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>The prefix of consumer ARN for a given stream. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+      <td>The prefix of consumer ARN for a given stream.</td>
     </tr>
     <tr>
       <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td>
@@ -359,7 +850,7 @@ Connector Options
       <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
-      <td>Maximum number of allowed concurrent requests for the EFO client. 
See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+      <td>Maximum number of allowed concurrent requests for the EFO 
client.</td>
     </tr>
     <tr>
       <td><h5>scan.shard-assigner</h5></td>
@@ -827,125 +1318,4 @@ Connector Options
     </tbody>
 </table>
 
-Features
---------
-
-### Authorization
-
-Make sure to [create an appropriate IAM 
policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) 
to allow reading from / writing to the Kinesis data streams.
-
-### Authentication
-
-Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
-By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.
-
-A specific 
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
 can be **optionally** set using the `aws.credentials.provider` setting.
-Supported values are:
-
-* `AUTO` - Use the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
-* `BASIC` - Use access key ID and secret key supplied as configuration.
-* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
-* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and 
`aws.secretKey`.
-* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
-* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
-* `CUSTOM` - Provide a custom class that implements the interface 
`AWSCredentialsProvider` and has a constructor 
`MyCustomClass(java.util.Properties config)`. All connector properties will be 
passed down to this custom
-credential provider class via the constructor.
-
-### Start Reading Position
-
-You can configure table sources to start reading a table-backing Kinesis data 
stream from a specific position through the `scan.stream.initpos` option.
-Available values are:
-
-* `LATEST`: read shards starting from the latest record.
-* `TRIM_HORIZON`: read shards starting from the earliest record possible (data 
may be trimmed by Kinesis depending on the current retention settings of the 
backing stream).
-* `AT_TIMESTAMP`: read shards starting from a specified timestamp. The 
timestamp value should be specified through the `scan.stream.initpos-timestamp` 
in one of the following formats:
-   * A non-negative double value representing the number of seconds that has 
elapsed since the Unix epoch (for example, `1459799926.480`).
-   * A value conforming to a user-defined `SimpleDateFormat` specified at 
`scan.stream.initpos-timestamp-format`.
-     If a user does not define a format, the default pattern will be 
`yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.
-     For example, timestamp value is `2016-04-04` and user-defined format is 
`yyyy-MM-dd`, or timestamp value is `2016-04-04T19:58:46.480-00:00` and a 
user-defined format is not provided.
-
-### Sink Partitioning
-
-Kinesis data streams consist of one or more shards, and the `sink.partitioner` 
option allows you to control how records written into a multi-shard 
Kinesis-backed table will be partitioned between its shards.
-Valid values are:
-
-* `fixed`: Kinesis `PartitionKey` values derived from the Flink subtask index, 
so each Flink partition ends up in at most one Kinesis partition (assuming that 
no re-sharding takes place at runtime).
-* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the 
default value for tables not defined with a `PARTITION BY` clause.
-* Custom `FixedKinesisPartitioner` subclass: e.g. 
`'org.mycompany.MyPartitioner'`.
-
-{{< hint info >}}
-Records written into tables defining a `PARTITION BY` clause will always be 
partitioned based on a concatenated projection of the `PARTITION BY` fields.
-In this case, the `sink.partitioner` field cannot be used to modify this 
behavior (attempting to do this results in a configuration error).
-You can, however, use the `sink.partitioner-field-delimiter` option to set the 
delimiter of field values in the concatenated 
[PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey)
 string (an empty string is also a valid delimiter).
-{{< /hint >}}
-
-### Enhanced Fan-Out
-
-[Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum number of concurrent consumers per Kinesis data stream.
-Without EFO, all concurrent Kinesis consumers share a single read quota per 
shard.
-Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers.
-
-<span class="label label-info">Note</span> Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
-
-You can enable and configure EFO with the following properties:
-
-* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
-* `scan.stream.efo.consumername`: A name to identify the consumer when the 
above value is `EFO`.
-* `scan.stream.efo.registration`: Strategy for (de-)registration  of `EFO` 
consumers with the name given by the `scan.stream.efo.consumername` value. 
Valid strategies are:
-  * `LAZY` (default): Stream consumers are registered when the Flink job 
starts running.
-    If the stream consumer already exists, it will be reused.
-    This is the preferred strategy for the majority of applications.
-    However, jobs with parallelism greater than 1 will result in tasks 
competing to register and acquire the stream consumer ARN.
-    For jobs with very large parallelism this can result in an increased 
start-up time.
-    The `DescribeStreamConsumer` operation has a limit of 20 [transactions per 
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
-    this means application startup time will increase by roughly 
`parallelism/20 seconds`.
-  * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` 
constructor.
-    If the stream consumer already exists, it will be reused.
-    This will result in registration occurring when the job is constructed,
-    either on the Flink Job Manager or client environment submitting the job.
-    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN,
-    reducing startup time over `LAZY` (with large parallelism).
-    However, consider that the client environment will require access to the 
AWS services.
-  * `NONE`: Stream consumer registration is not performed by 
`FlinkKinesisConsumer`.
-    Registration must be performed externally using the [AWS CLI or 
SDK](https://aws.amazon.com/tools/)
-    to invoke 
[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
-    Stream consumer ARNs should be provided to the job via the consumer 
configuration.
-* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally 
registered ARN-consumers (substitute `<stream-name>` with the name of your 
stream in the parameter name).
-   Use this if you choose to use `NONE` as a `scan.stream.efo.registration` 
strategy.
-
-<span class="label label-info">Note</span> For a given Kinesis data stream, 
each EFO consumer must have a unique name.
-However, consumer names do not have to be unique across data streams.
-Reusing a consumer name will result in existing subscriptions being terminated.
-
-<span class="label label-info">Note</span> With the `LAZY` strategy, stream 
consumers are de-registered when the job is shutdown gracefully.
-In the event that a job terminates without executing the shutdown hooks, 
stream consumers will remain active.
-In this situation the stream consumers will be gracefully reused when the 
application restarts.
-With the `NONE` and `EAGER` strategies, stream consumer de-registration is not 
performed by `FlinkKinesisConsumer`.
-
-# Data Type Mapping
-
-
-Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure.
-Instead, Kinesis records are deserialized and serialized by formats, e.g. 
'avro', 'csv', or 'json'.
-To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword.
-Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" 
>}}) pages for more details.
-
-# Updates in 1.15
-
-Kinesis table API connector sink data stream depends on 
<code>FlinkKinesisProducer</code> till 1.14, with the introduction of 
<code>KinesisStreamsSink</code> in 1.15 kinesis table API sink connector has 
been migrated to the new <code>KinesisStreamsSink</code>. Authentication 
options have been migrated identically while sink configuration options are now 
compatible with <code>KinesisStreamsSink</code>. 
-
-Options configuring <code>FlinkKinesisProducer</code> are now deprecated with 
fallback support for common configuration options with 
<code>KinesisStreamsSink</code>. 
-
-<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send 
records to kinesis, 
-which doesn't support aggregation. In consequence, table options configuring 
aggregation in the deprecated <code>FlinkKinesisProducer</code> 
-are now deprecated and will be ignored, this includes 
<code>sink.producer.aggregation-enabled</code> and
-<code>sink.producer.aggregation-count</code>.
-
-<span class="label label-info">Note</span> Migrating applications with 
deprecated options will result in the incompatible deprecated options being 
ignored and warned to users.
-
-Kinesis table API source connector still depends on 
<code>FlinkKinesisConsumer</code> with no change in configuration options.
-
-
 {{< top >}}
