nicusX commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1833085942
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -57,16 +87,21 @@ CREATE TABLE KinesisTable (
PARTITIONED BY (user_id, item_id)
WITH (
'connector' = 'kinesis',
- 'stream' = 'user_behavior',
+ 'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
'aws.region' = 'us-east-2',
Review Comment:
Region in `stream.arn` and `aws.region` should match
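For example, a consistent version of the DDL could look like the sketch below. The column list is a placeholder because the real columns sit outside this hunk, and both settings use `us-east-1` here. The point is only that the region embedded in the ARN (`arn:aws:kinesis:<region>:<account-id>:stream/<name>`) and `aws.region` agree:

```sql
-- Sketch only: column names and types are illustrative placeholders.
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  -- The 4th ARN field is the region: arn:aws:kinesis:<region>:<account-id>:stream/<name>
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  -- Same region as the one inside 'stream.arn'
  'aws.region' = 'us-east-1',
  'source.init.position' = 'LATEST',
  'format' = 'csv'
);
```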
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -57,16 +87,21 @@ CREATE TABLE KinesisTable (
PARTITIONED BY (user_id, item_id)
WITH (
'connector' = 'kinesis',
- 'stream' = 'user_behavior',
+ 'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
'aws.region' = 'us-east-2',
- 'scan.stream.initpos' = 'LATEST',
+ 'source.init.position' = 'LATEST',
'format' = 'csv'
);
```
Available Metadata
------------------
+{{< hint warning >}}
+The `kinesis` table Source has a known bug that means `VIRTUAL` columns are not supported.
+Please use `kinesis-legacy` until [the fix](https://issues.apache.org/jira/browse/FLINK-36671) is completed.
+{{< /hint >}}
+
The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
Review Comment:
I would reiterate "...currently available with `kinesis-legacy` connector only"
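A short example might make this concrete. Below is a sketch of a `kinesis-legacy` table exposing metadata as `VIRTUAL` columns. It uses the legacy option names removed in this diff (`stream`, `scan.stream.initpos`); the metadata keys (`timestamp`, `shard-id`, `sequence-number`) and their types are assumed from the legacy connector's metadata table and should be checked against the table on this page. The payload columns are placeholders:

```sql
-- Sketch: VIRTUAL metadata columns currently work only with 'kinesis-legacy'.
CREATE TABLE KinesisLegacyTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  -- Read-only metadata columns (assumed keys; verify against the metadata table)
  `arrival_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis-legacy',
  -- Legacy option names, as in the lines removed by this diff
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```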
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true accepts all SSL certificates.</td>
Review Comment:
Explain when you need this
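One case worth mentioning is local testing against a Kinesis emulator (for example LocalStack or kinesalite) served over HTTPS with a self-signed certificate. Certificate validation would normally reject it, so the flag is needed there, and it should never be enabled against real AWS endpoints. A sketch, where the endpoint URL, account ID, stream name, and dummy credentials are all hypothetical placeholders:

```sql
-- Sketch for local testing only: endpoint, ARN and credentials are placeholders.
CREATE TABLE LocalKinesisTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:000000000000:stream/test-stream',
  'aws.region' = 'us-east-1',
  -- Point the client at the emulator instead of the regional AWS endpoint
  'aws.endpoint' = 'https://localhost:4566',
  -- Accept the emulator's self-signed certificate (disables TLS validation)
  'aws.trust.all.certificates' = 'true',
  -- Emulators usually accept any static credentials
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = 'test',
  'aws.credentials.basic.secretkey' = 'test',
  'format' = 'json'
);
```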
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use
<code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream
records. See <a href="#data-type-mapping">Data Type Mapping</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if
not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user provided
+ class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determine if the EFO consumer is managed by the Flink job <code>JOB_MANAGED|SELF_MANAGED</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.subscription.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO Consumer subscription.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.deregister.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of attempts for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.partitioner</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">random or row-based</td>
+ <td>String</td>
+ <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.partitioner-field-delimiter</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">|</td>
+ <td>String</td>
+ <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.producer.*</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td></td>
+ <td>
+ Deprecated options previously used by the legacy connector.
+ Options with equivalant alternatives in <code>KinesisStreamsSink</code> are matched
+ to their respective properties. Unsupported options are logged out to user as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>Http version used by Kinesis Client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code>before blocking new write requests and applying backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>String</td>
+ <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5242880</td>
+ <td>Long</td>
+ <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.timeout</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5000</td>
+ <td>Long</td>
+ <td>Threshold time in milliseconds for an element to be in a buffer of<code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.fail-on-error</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Flag used for retrying failed requests. If set any request failure will not be retried and will fail the job.</td>
+ </tr>
+ </tbody>
+</table>
+
+Features
+--------
+
+{{< hint info >}}
+Refer to the [Kinesis Datastream API]({{< ref "docs/connectors/datastream/kinesis" >}}) documentation for more detailed description of features.
+{{< /hint >}}
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause.
+* Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`.
+
+{{< hint info >}}
+Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields.
+In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error).
+You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter).
+{{< /hint >}}
+
+# Data Type Mapping
+
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
Review Comment:
> Text formats, such as `json` or `csv`, are written to Kinesis without modifications. Binary formats, such as `avro`, are Base64-encoded and then written to Kinesis as text.
(is this right?)
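Separately, the Sink Partitioning section quoted in this hunk might benefit from a short example. A sketch of the two cases it describes follows; table and column names are hypothetical, and the stream ARN is the placeholder from the example DDL. The sample key `42|1337` assumes field values are rendered as plain strings and joined with the default `|` delimiter:

```sql
-- Case 1: no PARTITIONED BY clause, so 'sink.partitioner' picks the strategy.
CREATE TABLE FixedPartitionedSink (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  -- Each Flink subtask writes to at most one shard (absent re-sharding)
  'sink.partitioner' = 'fixed',
  'format' = 'json'
);

-- Case 2: PARTITIONED BY clause, so the PartitionKey is the concatenation of
-- the partition fields, e.g. '42|1337' for user_id = 42, item_id = 1337.
-- 'sink.partitioner' must not be set here; only the delimiter can change.
CREATE TABLE FieldPartitionedSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'sink.partitioner-field-delimiter' = '|',
  'format' = 'json'
);
```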
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code>before blocking new write requests and applying backpressure.</td>
Review Comment:
In general, the batching mechanism deserves a dedicated chapter, possibly in the DataStream docs, linked from here.
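Until such a chapter exists, a small example of the batching-related sink options from this PR's options table could help. The values below are the defaults listed in that table, spelled out explicitly. The table, column, and stream names are placeholders:

```sql
-- Sketch: batching/backpressure knobs set to the defaults from the options table.
CREATE TABLE BatchingSink (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'format' = 'json',
  -- At most 500 records per batch request sent by KinesisAsyncClient
  'sink.batch.max-size' = '500',
  -- Block new writes (backpressure) once 16 requests are in flight...
  'sink.requests.max-inflight' = '16',
  -- ...or once 10000 records are buffered
  'sink.requests.max-buffered' = '10000',
  -- Flush when the buffer reaches 5242880 bytes (5 MiB)...
  'sink.flush-buffer.size' = '5242880',
  -- ...or when a record has waited 5000 ms in the buffer
  'sink.flush-buffer.timeout' = '5000'
);
```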
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -40,10 +40,40 @@ Dependencies
The Kinesis connector is not part of the binary distribution.
See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+### Versioning
+
+There are two available Table API and SQL distributions for the Kinesis connector.
+This has resulted from an ongoing migration from the deprecated `SourceFunction` and `SinkFunction` interfaces to the new `Source` and `Sink` interfaces.
+
+The Table API and SQL interfaces in Flink only allow one TableFactory for each connector identifier. The following list clarifies the underlying interface that is used depending on the distribution selected:
Review Comment:
This is not clear. Most users are not familiar with the TableFactory mechanism. Possibly, you could add an explanation:
> ...only allow one TableFactory for each connector identifier. **Only one connector named `kinesis` can be included in your application's dependencies.**...
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
Review Comment:
Actually, this should read:
> either `kinesis` or `kinesis-legacy`. See "Versioning" [link]
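For example, a table that explicitly selects the legacy `SourceFunction`-based source might look like this. It is a sketch that assumes `kinesis-legacy` keeps the option names this PR replaces in its example (`stream` and `scan.stream.initpos`). The table name and schema are hypothetical.

```sql
-- Hypothetical source table using the legacy SourceFunction-based connector.
CREATE TABLE KinesisLegacyTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis-legacy',
  'stream' = 'user_behavior',          -- legacy option: stream name, not ARN
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',    -- legacy name of the start position option
  'format' = 'csv'
);
```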
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user provided
+ class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determine if the EFO consumer is managed by the Flink job <code>JOB_MANAGED|SELF_MANAGED</code>.</td>
Review Comment:
It should link to the "Event Time Alignment for Shard Consumers" chapter in the DataStream docs, or repeat that chapter here.
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user provided
+ class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
Review Comment:
What are the options?
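Judging from the other rows, which refer to POLLING and EFO `ReaderType`s, the accepted values appear to be `POLLING` (the default) and `EFO`. Listing them here, ideally with an example, would answer this. Below is a sketch of an EFO-backed table. It assumes those values and the EFO options from this table. The consumer name, table name and schema are hypothetical.

```sql
-- Hypothetical source table reading through an EFO (enhanced fan-out) consumer.
CREATE TABLE KinesisEfoTable (
  `user_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',                  -- same region as in the ARN
  'source.init.position' = 'LATEST',
  'source.reader.type' = 'EFO',                -- default is POLLING
  'source.efo.consumer.name' = 'my-flink-app', -- placeholder consumer name
  'source.efo.lifecycle' = 'JOB_MANAGED',      -- or SELF_MANAGED
  'format' = 'json'
);
```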
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -40,10 +40,40 @@ Dependencies
The Kinesis connector is not part of the binary distribution.
See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+### Versioning
+
+There are two available Table API and SQL distributions for the Kinesis connector.
+This has resulted from an ongoing migration from the deprecated `SourceFunction` and `SinkFunction` interfaces to the new `Source` and `Sink` interfaces.
+
+The Table API and SQL interfaces in Flink only allow one TableFactory for each connector identifier. The following list clarifies the underlying interface that is used depending on the distribution selected:
Review Comment:
Also, I would make the following a table:

| Connector dependency | Connector version | Source connector name (interface) | Sink connector name (interface) |
|------------------------|--------------------|-------------------------|----------------------|
| `flink-sql-connector-aws-kinesis-streams` | `4.x` or earlier | N/A | `kinesis` (`Sink`) |
| `flink-sql-connector-aws-kinesis-streams` | `5.x` or later | `kinesis` (`Source`) | `kinesis` (`Sink`) |
| `flink-sql-connector-kinesis` | `4.x` or earlier | `kinesis` (`SourceFunction`) | `kinesis` (`Sink`) |
| `flink-sql-connector-kinesis` | `5.x` or later | `kinesis` (`Source`), or `kinesis-legacy` (`SourceFunction`) | `kinesis` (`Sink`) |
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Name of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user provided
+ class to use if credential provider type is set to be CUSTOM e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determine if the EFO consumer is managed by the Flink job <code>JOB_MANAGED|SELF_MANAGED</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.subscription.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO Consumer subscription.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.deregister.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. When timeout is reached, code will continue as per normal.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of attempts for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the exponential backoff retry strategy when calling <code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.partitioner</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">random or row-based</td>
+ <td>String</td>
+ <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.partitioner-field-delimiter</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">|</td>
+ <td>String</td>
+ <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.producer.*</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td></td>
+ <td>
+ Deprecated options previously used by the legacy connector.
+ Options with equivalant alternatives in <code>KinesisStreamsSink</code> are matched
+ to their respective properties. Unsupported options are logged out to user as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>Http version used by Kinesis Client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code>before blocking new write requests and applying backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>String</td>
+ <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5242880</td>
+ <td>Long</td>
+ <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.timeout</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5000</td>
+ <td>Long</td>
+ <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.fail-on-error</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to fail the job on request failures. If set to true, failed requests are not retried and the job fails.</td>
+ </tr>
+ </tbody>
+</table>
+
+Features
+--------
+
+{{< hint info >}}
+Refer to the [Kinesis DataStream API]({{< ref "docs/connectors/datastream/kinesis" >}}) documentation for a more detailed description of the features.
+{{< /hint >}}
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards. The `sink.partitioner` option controls how records written into a multi-shard Kinesis-backed table are partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis shard (assuming no resharding takes place at runtime).
Review Comment:
It would be clearer to present these as mutually exclusive options:
> Partitioning is defined either by using `PARTITIONED BY` in the table definition or by specifying `sink.partitioner`. Using both will result in a configuration error.
> Valid values for `sink.partitioner`:
> * `fixed` ...
> * `random` ...
> * Custom `FixedKinesisPartitioner` subclass...
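To make the either/or concrete, here is a minimal sketch of the `sink.partitioner` branch: a table with no `PARTITIONED BY` clause. The table name, columns, and stream ARN are placeholders. The ARN reuses the example from this PR, and `aws.region` is set to match the region inside the ARN.

```sql
-- Sketch: a sink table WITHOUT a PARTITIONED BY clause, so the partition key
-- strategy is chosen through 'sink.partitioner'.
-- Table name, columns and ARN are placeholders.
CREATE TABLE KinesisFixedSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',  -- same region as in 'stream.arn'
  'format' = 'csv',
  -- 'fixed'  : partition key derived from the Flink subtask index
  -- 'random' : random partition key (default when there is no PARTITIONED BY)
  -- or the fully qualified name of a custom FixedKinesisPartitioner subclass
  'sink.partitioner' = 'fixed'
);
```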
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use
<code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>ARN of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream
records. See <a href="#data-type-mapping">Data Type Mapping</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if
not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication
Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the
Kinesis endpoint. See <a href="#authentication">Authentication</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider
type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type
to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to
ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if
not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The fully qualified class name of the user-provided class to use if the credential provider type is set to CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources: <code>POLLING</code> or <code>EFO</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to request each time records are fetched from an AWS Kinesis shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO
consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determines whether the EFO consumer is managed by the Flink job: <code>JOB_MANAGED</code> or <code>SELF_MANAGED</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.subscription.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO
Consumer subscription.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.deregister.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. If the timeout is reached, execution continues normally.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of
attempts for the exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.partitioner</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">random or row-based</td>
+ <td>String</td>
+ <td>Optional output partitioning from Flink's partitions into Kinesis
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.partitioner-field-delimiter</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">|</td>
+ <td>String</td>
+ <td>Optional field delimiter for a fields-based partitioner derived from a PARTITIONED BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.producer.*</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td></td>
+ <td>
+ Deprecated options previously used by the legacy connector.
+ Options with equivalent alternatives in <code>KinesisStreamsSink</code> are mapped
+ to their respective properties. Unsupported options are logged as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>HTTP version used by the Kinesis client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum number of elements in a batch passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
Review Comment:
This should also be explained in the DataStream docs.
It would also help to explain that a "request" is a batch.
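This sketch shows how the batching options could be presented together in a DDL. The values are the defaults from the options table, written out explicitly.

The comments assume that each batch is sent to Kinesis as a single `PutRecords` request. That matches the Async Sink design as I understand it, and the default of 500 matches the documented 500-record limit per `PutRecords` call. Both points should still be confirmed against the sink code before this goes into the docs. Table name, columns, and ARN are placeholders.

```sql
-- Sketch: sink tuning options spelled out with the default values from the
-- options table. Table name, columns and ARN are placeholders.
CREATE TABLE KinesisTunedSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'format' = 'csv',
  -- at most 500 records per batch (assumption: one batch = one PutRecords request)
  'sink.batch.max-size' = '500',
  -- at most 16 uncompleted batches before new writes are blocked (backpressure)
  'sink.requests.max-inflight' = '16',
  -- at most 10000 buffered request entries before backpressure
  'sink.requests.max-buffered' = '10000',
  -- flush when the buffer reaches 5 MiB or an element has waited 5000 ms
  'sink.flush-buffer.size' = '5242880',
  'sink.flush-buffer.timeout' = '5000',
  -- keep retrying failed requests instead of failing the job
  'sink.fail-on-error' = 'false'
);
```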
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -40,10 +40,40 @@ Dependencies
The Kinesis connector is not part of the binary distribution.
See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+### Versioning
+
+There are two available Table API and SQL distributions for the Kinesis connector.
+This is the result of an ongoing migration from the deprecated `SourceFunction` and `SinkFunction` interfaces to the new `Source` and `Sink` interfaces.
+
+The Table API and SQL in Flink allow only one TableFactory per connector identifier. The following list clarifies which underlying interface is used, depending on the distribution selected:
Review Comment:
We also need to explain what NOT to do, i.e. adding both `flink-sql-connector-aws-kinesis-streams` and `flink-sql-connector-kinesis` dependencies, because that causes a clash of connector identifiers.
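For readers migrating from the legacy connector, here is a sketch of a source table for the new `kinesis` connector. It uses the option names introduced in this PR: `stream.arn` instead of `stream`, and `source.init.position` instead of `scan.stream.initpos`. It keeps `aws.region` consistent with the region in the ARN and uses the EFO reader. The consumer name, table name, columns, and timestamp are placeholders.

```sql
-- Sketch: source table for the new 'kinesis' connector.
-- Names, ARN, timestamp and consumer name are placeholders.
CREATE TABLE KinesisEfoSource (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
WITH (
  'connector' = 'kinesis',
  -- stream ARN replaces the legacy 'stream' (name) option
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  -- same region as in the ARN
  'aws.region' = 'us-east-1',
  'format' = 'csv',
  -- replaces the legacy 'scan.stream.initpos'
  'source.init.position' = 'AT_TIMESTAMP',
  -- parsed with 'source.init.timestamp.format' (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX)
  'source.init.timestamp' = '2024-11-01T00:00:00.000+00:00',
  -- enhanced fan-out instead of the default POLLING reader
  'source.reader.type' = 'EFO',
  'source.efo.consumer.name' = 'my-flink-efo-consumer',
  'source.efo.lifecycle' = 'JOB_MANAGED'
);
```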
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards. The `sink.partitioner` option controls how records written into a multi-shard Kinesis-backed table are partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis shard (assuming no resharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default for tables not defined with a `PARTITIONED BY` clause.
+* A custom `FixedKinesisPartitioner` subclass, e.g. `'org.mycompany.MyPartitioner'`.
+
+{{< hint info >}}
+Records written into tables defining a `PARTITIONED BY` clause are always partitioned based on a concatenated projection of the `PARTITIONED BY` fields.
+In this case, the `sink.partitioner` option cannot be used to modify this behavior (attempting to do so results in a configuration error).
Review Comment:
If alternative options are explained above, this line becomes redundant
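For the other branch, here is a sketch of a `PARTITIONED BY` table. In this case `sink.partitioner` is left unset, and the partition key is the concatenation of the partition fields joined by `sink.partitioner-field-delimiter`. Names and ARN are placeholders. Under the behavior described in the hint, adding `'sink.partitioner'` to this DDL would be rejected with a configuration error.

```sql
-- Sketch: partition key derived from the PARTITIONED BY fields.
-- Names and ARN are placeholders.
CREATE TABLE KinesisPartitionedSink (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'format' = 'csv',
  -- partition key is user_id and item_id joined by '|', e.g. '42|7'
  -- ('|' is the default delimiter)
  'sink.partitioner-field-delimiter' = '|'
  -- do NOT add 'sink.partitioner' here: combining it with PARTITIONED BY
  -- is a configuration error
);
```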
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use
<code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>ARN of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream
records. See <a href="#data-type-mapping">Data Type Mapping</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if
not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication
Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the
Kinesis endpoint. See <a href="#authentication">Authentication</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider
type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type
to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to
ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if
not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user-provided
+ class to use if the credential provider type is set to CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
Review Comment:
Also, these Table API docs have no chapter explaining EFO.
Either repeat the EFO chapter on this page or link to the chapter in the
DataStream docs.
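For illustration, a minimal sketch of what an EFO-backed table definition could look like, using only option names from the Source Options table above. The column list, the consumer name `my-flink-efo-consumer` and the account ID in the ARN are hypothetical placeholders, and `EFO` is assumed to be the accepted `source.reader.type` value alongside the default `POLLING`. The region in `stream.arn` and `aws.region` is kept identical.

```sql
-- Sketch: Kinesis table read through an EFO (enhanced fan-out) consumer.
-- Column names, account ID and consumer name are hypothetical placeholders.
CREATE TABLE KinesisEfoTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
WITH (
  'connector' = 'kinesis',
  -- Region in the ARN matches aws.region below
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  'source.init.position' = 'LATEST',
  -- Assumed value: switches from the default POLLING reader to EFO
  'source.reader.type' = 'EFO',
  'source.efo.consumer.name' = 'my-flink-efo-consumer',
  -- Default lifecycle: the EFO consumer is managed by the Flink job
  'source.efo.lifecycle' = 'JOB_MANAGED',
  'format' = 'csv'
);
```

Apart from `source.reader.type` and the `source.efo.*` options, this is the same as a polling table.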
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use
<code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>ARN of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream
records. See <a href="#data-type-mapping">Data Type Mapping</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if
not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication
Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the
Kinesis endpoint. See <a href="#authentication">Authentication</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider
type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type
to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to
ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if
not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming the role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The full path (in Java package notation) to the user-provided
+ class to use if the credential provider type is set to CUSTOM, e.g. org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>source.init.position</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum number of records to try to get each time records are fetched from an AWS Kinesis shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO
consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determines whether the EFO consumer is managed by the Flink job (<code>JOB_MANAGED</code>) or outside of it (<code>SELF_MANAGED</code>).</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.subscription.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO
Consumer subscription.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.deregister.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer deregistration. If the timeout is reached, execution continues as normal.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of
attempts for the exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.partitioner</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">random or row-based</td>
+ <td>String</td>
+ <td>Optional output partitioning from Flink's partitions into Kinesis
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.partitioner-field-delimiter</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">|</td>
+ <td>String</td>
+ <td>Optional field delimiter for a fields-based partitioner derived from
a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a>
for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.producer.*</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td></td>
+ <td>
+ Deprecated options previously used by the legacy connector.
+ Options with equivalent alternatives in <code>KinesisStreamsSink</code> are mapped
+ to their respective properties. Unsupported options are logged as warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>HTTP version used by the Kinesis client.</td>
Review Comment:
What are the valid options?
What is the effect of selecting different options?
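On the sink side, a hypothetical DDL that sets the Sink Options explicitly to the defaults listed in the table may help while the accepted values are being documented. Only `HTTP2`, the documented default, is used for `sink.http-client.protocol.version`; no other value is assumed here. Column names and the stream ARN are placeholders.

```sql
-- Sketch: Kinesis sink table with sink options set to their documented defaults.
-- Column names and the stream ARN are hypothetical placeholders.
CREATE TABLE KinesisSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
-- Fields-based partitioning derived from the PARTITIONED BY clause
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream.arn' = 'arn:aws:kinesis:us-east-1:012345678901:stream/my-stream-name',
  'aws.region' = 'us-east-1',
  -- Delimiter between the PARTITIONED BY field values (default '|')
  'sink.partitioner-field-delimiter' = '|',
  'sink.http-client.max-concurrency' = '10000',
  -- Read timeout in milliseconds
  'sink.http-client.read-timeout' = '360000',
  'sink.http-client.protocol.version' = 'HTTP2',
  'format' = 'csv'
);
```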