This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2cb86ff03747499bfceda74cb8cc1ea48c385452
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Jan 6 16:21:52 2022 +0100

    [FLINK-25391][connector-kinesis] Forward catalog table options
---
 docs/content/docs/connectors/table/kinesis.md      | 79 ++++++++++++++++++++--
 .../kinesis/table/KinesisDynamicTableFactory.java  | 16 ++++-
 2 files changed, 88 insertions(+), 7 deletions(-)

diff --git a/docs/content/docs/connectors/table/kinesis.md 
b/docs/content/docs/connectors/table/kinesis.md
index f26b1e9..1840862 100644
--- a/docs/content/docs/connectors/table/kinesis.md
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -122,11 +122,12 @@ Connector Options
 <table class="table table-bordered">
     <thead>
     <tr>
-      <th class="text-left" style="width: 25%">Option</th>
-      <th class="text-center" style="width: 8%">Required</th>
-      <th class="text-center" style="width: 7%">Default</th>
-      <th class="text-center" style="width: 10%">Type</th>
-      <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
     </tr>
     <tr>
       <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
@@ -136,6 +137,7 @@ Connector Options
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use. For Kinesis use 
<code>'kinesis'</code>.</td>
@@ -143,6 +145,7 @@ Connector Options
     <tr>
       <td><h5>stream</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Name of the Kinesis data stream backing this table.</td>
@@ -150,6 +153,7 @@ Connector Options
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The format used to deserialize and serialize Kinesis data stream 
records. See <a href="#data-type-mapping">Data Type Mapping</a> for 
details.</td>
@@ -157,6 +161,7 @@ Connector Options
     <tr>
       <td><h5>aws.region</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The AWS region where the stream is defined. Either this or 
<code>aws.endpoint</code> are required.</td>
@@ -164,6 +169,7 @@ Connector Options
     <tr>
       <td><h5>aws.endpoint</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The AWS endpoint for Kinesis (derived from the AWS region setting if 
not set). Either this or <code>aws.region</code> are required.</td>
@@ -185,6 +191,7 @@ Connector Options
     <tr>
       <td><h5>aws.credentials.provider</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">AUTO</td>
       <td>String</td>
       <td>A credentials provider to use when authenticating against the 
Kinesis endpoint. See <a href="#authentication">Authentication</a> for 
details.</td>
@@ -192,6 +199,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.basic.accesskeyid</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The AWS access key ID to use when setting credentials provider 
type to BASIC.</td>
@@ -199,6 +207,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.basic.secretkey</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The AWS secret key to use when setting credentials provider type 
to BASIC.</td>
@@ -206,6 +215,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.profile.path</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>Optional configuration for profile path if credential provider 
type is set to be PROFILE.</td>
@@ -213,6 +223,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.profile.name</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>Optional configuration for profile name if credential provider 
type is set to be PROFILE.</td>
@@ -220,6 +231,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.role.arn</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The role ARN to use when credential provider type is set to 
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
@@ -227,6 +239,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.role.sessionName</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The role session name to use when credential provider type is set 
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
@@ -234,6 +247,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.role.externalId</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The external ID to use when credential provider type is set to 
ASSUME_ROLE.</td>
@@ -241,6 +255,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.role.provider</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The credentials provider that provides credentials for assuming 
the role when credential provider type is set to ASSUME_ROLE. Roles can be 
nested, so this value can again be set to ASSUME_ROLE</td>
@@ -248,6 +263,7 @@ Connector Options
     <tr>
          <td><h5>aws.credentials.webIdentityToken.file</h5></td>
          <td>optional</td>
+      <td>no</td>
          <td style="word-wrap: break-word;">(none)</td>
          <td>String</td>
          <td>The absolute path to the web identity token file that should be 
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
@@ -262,6 +278,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">LATEST</td>
       <td>String</td>
       <td>Initial position to be used when reading from the table. See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -269,6 +286,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos-timestamp</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The initial timestamp to start reading Kinesis stream from (when 
<code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -276,6 +294,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.initpos-timestamp-format</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
       <td>String</td>
       <td>The date format of initial timestamp to start reading Kinesis stream 
from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
@@ -283,6 +302,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.recordpublisher</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">POLLING</td>
       <td>String</td>
       <td>The <code>RecordPublisher</code> type to use for sources. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -290,6 +310,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.consumername</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of the EFO consumer to register with KDS. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -297,6 +318,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.registration</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">LAZY</td>
       <td>String</td>
       <td>Determine how and when consumer de-/registration is performed 
(LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for 
details.</td>
@@ -304,6 +326,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.consumerarn</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The prefix of consumer ARN for a given stream. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -311,6 +334,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>Maximum number of allowed concurrent requests for the EFO client. 
See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
@@ -318,6 +342,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">50</td>
       <td>Integer</td>
       <td>The maximum number of <code>describeStream</code> attempts if we get 
a recoverable exception.</td>
@@ -325,6 +350,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -332,6 +358,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds)  between each 
<code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -339,6 +366,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describe.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
@@ -346,6 +374,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>listShards</code> attempts if we get a 
recoverable exception.</td>
@@ -353,6 +382,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>listShards</code> attempt.</td>
@@ -360,6 +390,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each 
<code>listShards</code> attempt.</td>
@@ -367,6 +398,7 @@ Connector Options
     <tr>
       <td><h5>scan.list.shards.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>listShards</code> attempt.</td>
@@ -374,6 +406,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">50</td>
       <td>Integer</td>
       <td>The maximum number of <code>describeStreamConsumer</code> attempts 
if we get a recoverable exception.</td>
@@ -381,6 +414,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>describeStreamConsumer</code> attempt.</td>
@@ -388,6 +422,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">5000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each 
<code>describeStreamConsumer</code> attempt.</td>
@@ -395,6 +430,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>describeStreamConsumer</code> attempt.</td>
@@ -402,6 +438,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>registerStream</code> attempts if we get 
a recoverable exception.</td>
@@ -409,6 +446,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">60</td>
       <td>Integer</td>
       <td>The maximum time in seconds to wait for a stream consumer to become 
active before giving up.</td>
@@ -416,6 +454,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">500</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>registerStream</code> attempt.</td>
@@ -423,6 +462,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each 
<code>registerStream</code> attempt.</td>
@@ -430,6 +470,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>registerStream</code> attempt.</td>
@@ -437,6 +478,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>deregisterStream</code> attempts if we 
get a recoverable exception.</td>
@@ -444,6 +486,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">60</td>
       <td>Integer</td>
       <td>The maximum time in seconds to wait for a stream consumer to 
deregister before giving up.</td>
@@ -451,6 +494,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">500</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>deregisterStream</code> attempt.</td>
@@ -458,6 +502,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each 
<code>deregisterStream</code> attempt.</td>
@@ -465,6 +510,7 @@ Connector Options
     <tr>
       <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>deregisterStream</code> attempt.</td>
@@ -472,6 +518,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10</td>
       <td>Integer</td>
       <td>The maximum number of <code>subscribeToShard</code> attempts if we 
get a recoverable exception.</td>
@@ -479,6 +526,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between each 
<code>subscribeToShard</code> attempt.</td>
@@ -486,6 +534,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">2000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between each 
<code>subscribeToShard</code> attempt.</td>
@@ -493,6 +542,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>subscribeToShard</code> attempt.</td>
@@ -500,6 +550,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.maxrecordcount</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>The maximum number of records to try to get each time we fetch 
records from a AWS Kinesis shard.</td>
@@ -507,6 +558,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The maximum number of <code>getRecords</code> attempts if we get a 
recoverable exception.</td>
@@ -514,6 +566,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">300</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between 
<code>getRecords</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
@@ -521,6 +574,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between 
<code>getRecords</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
@@ -528,6 +582,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>getRecords</code> attempt.</td>
@@ -535,6 +590,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getrecords.intervalmillis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">200</td>
       <td>Long</td>
       <td>The interval (in milliseconds) between each <code>getRecords</code> 
request to a AWS Kinesis shard in milliseconds.</td>
@@ -542,6 +598,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.maxretries</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The maximum number of <code>getShardIterator</code> attempts if we 
get ProvisionedThroughputExceededException.</td>
@@ -549,6 +606,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.base</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">300</td>
       <td>Long</td>
       <td>The base backoff time (in milliseconds) between 
<code>getShardIterator</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
@@ -556,6 +614,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.max</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Long</td>
       <td>The maximum backoff time (in milliseconds) between 
<code>getShardIterator</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
@@ -563,6 +622,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.getiterator.backoff.expconst</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">1.5</td>
       <td>Double</td>
       <td>The power constant for exponential backoff between each 
<code>getShardIterator</code> attempt.</td>
@@ -570,6 +630,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.discovery.intervalmillis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">10000</td>
       <td>Integer</td>
       <td>The interval between each attempt to discover new shards.</td>
@@ -577,6 +638,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.adaptivereads</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>The config to turn on adaptive reads from a shard. See the 
<code>AdaptivePollingRecordPublisher</code> documentation for details.</td>
@@ -584,6 +646,7 @@ Connector Options
     <tr>
       <td><h5>scan.shard.idle.interval</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">-1</td>
       <td>Long</td>
       <td>The interval (in milliseconds) after which to consider a shard idle 
for purposes of watermark generation. A positive value will allow the watermark 
to progress even when some shards don't receive new records.</td>
@@ -591,6 +654,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.sync.interval</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">30000</td>
       <td>Long</td>
       <td>The interval (in milliseconds) for periodically synchronizing the 
shared watermark state.</td>
@@ -598,6 +662,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.lookahead.millis</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">0</td>
       <td>Long</td>
       <td>The maximum delta (in milliseconds) allowed for the reader to 
advance ahead of the shared global watermark.</td>
@@ -605,6 +670,7 @@ Connector Options
     <tr>
       <td><h5>scan.watermark.sync.queue.capacity</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
       <td>The maximum number of records that will be buffered before 
suspending consumption of a shard.</td>
@@ -619,6 +685,7 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">random or row-based</td>
       <td>String</td>
       <td>Optional output partitioning from Flink's partitions into Kinesis 
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
@@ -626,6 +693,7 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner-field-delimiter</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">|</td>
       <td>String</td>
       <td>Optional field delimiter for a fields-based partitioner derived from 
a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> 
for details.</td>
@@ -633,6 +701,7 @@ Connector Options
     <tr>
       <td><h5>sink.producer.*</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td></td>
       <td>
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
index 2f7e569..7a7dc96 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java
@@ -31,11 +31,14 @@ import 
org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER;
+import static 
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER;
 import static 
org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 
@@ -82,6 +85,15 @@ public class KinesisDynamicTableFactory implements 
DynamicTableSourceFactory {
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        return Collections.emptySet();
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SINK_PARTITIONER);
+        options.add(SINK_PARTITIONER_FIELD_DELIMITER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(STREAM, SINK_PARTITIONER, 
SINK_PARTITIONER_FIELD_DELIMITER)
+                .collect(Collectors.toSet());
     }
 }

Reply via email to