This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2cb86ff03747499bfceda74cb8cc1ea48c385452 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Thu Jan 6 16:21:52 2022 +0100 [FLINK-25391][connector-kinesis] Forward catalog table options --- docs/content/docs/connectors/table/kinesis.md | 79 ++++++++++++++++++++-- .../kinesis/table/KinesisDynamicTableFactory.java | 16 ++++- 2 files changed, 88 insertions(+), 7 deletions(-) diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md index f26b1e9..1840862 100644 --- a/docs/content/docs/connectors/table/kinesis.md +++ b/docs/content/docs/connectors/table/kinesis.md @@ -122,11 +122,12 @@ Connector Options <table class="table table-bordered"> <thead> <tr> - <th class="text-left" style="width: 25%">Option</th> - <th class="text-center" style="width: 8%">Required</th> - <th class="text-center" style="width: 7%">Default</th> - <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-left" style="width: 25%">Option</th> + <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> + <th class="text-center" style="width: 7%">Default</th> + <th class="text-center" style="width: 10%">Type</th> + <th class="text-center" style="width: 42%">Description</th> </tr> <tr> <th colspan="6" class="text-left" style="width: 100%">Common Options</th> @@ -136,6 +137,7 @@ Connector Options <tr> <td><h5>connector</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what connector to use. 
For Kinesis use <code>'kinesis'</code>.</td> @@ -143,6 +145,7 @@ Connector Options <tr> <td><h5>stream</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Name of the Kinesis data stream backing this table.</td> @@ -150,6 +153,7 @@ Connector Options <tr> <td><h5>format</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td> @@ -157,6 +161,7 @@ Connector Options <tr> <td><h5>aws.region</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The AWS region where the stream is defined. Either this or <code>aws.endpoint</code> is required.</td> @@ -164,6 +169,7 @@ Connector Options <tr> <td><h5>aws.endpoint</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> is required.</td> @@ -185,6 +191,7 @@ Connector Options <tr> <td><h5>aws.credentials.provider</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">AUTO</td> <td>String</td> <td>A credentials provider to use when authenticating against the Kinesis endpoint. 
See <a href="#authentication">Authentication</a> for details.</td> @@ -192,6 +199,7 @@ Connector Options <tr> <td><h5>aws.credentials.basic.accesskeyid</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td> @@ -199,6 +207,7 @@ Connector Options <tr> <td><h5>aws.credentials.basic.secretkey</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The AWS secret key to use when setting credentials provider type to BASIC.</td> @@ -206,6 +215,7 @@ Connector Options <tr> <td><h5>aws.credentials.profile.path</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td> @@ -213,6 +223,7 @@ Connector Options <tr> <td><h5>aws.credentials.profile.name</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td> @@ -220,6 +231,7 @@ Connector Options <tr> <td><h5>aws.credentials.role.arn</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> @@ -227,6 +239,7 @@ Connector Options <tr> <td><h5>aws.credentials.role.sessionName</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> @@ -234,6 +247,7 @@ Connector Options <tr> <td><h5>aws.credentials.role.externalId</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The external ID to use when 
credential provider type is set to ASSUME_ROLE.</td> @@ -241,6 +255,7 @@ Connector Options <tr> <td><h5>aws.credentials.role.provider</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td> @@ -248,6 +263,7 @@ Connector Options <tr> <td><h5>aws.credentials.webIdentityToken.file</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td> @@ -262,6 +278,7 @@ Connector Options <tr> <td><h5>scan.stream.initpos</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">LATEST</td> <td>String</td> <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td> @@ -269,6 +286,7 @@ Connector Options <tr> <td><h5>scan.stream.initpos-timestamp</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td> @@ -276,6 +294,7 @@ Connector Options <tr> <td><h5>scan.stream.initpos-timestamp-format</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td> <td>String</td> <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). 
See <a href="#start-reading-position">Start Reading Position</a> for details.</td> @@ -283,6 +302,7 @@ Connector Options <tr> <td><h5>scan.stream.recordpublisher</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">POLLING</td> <td>String</td> <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> @@ -290,6 +310,7 @@ Connector Options <tr> <td><h5>scan.stream.efo.consumername</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> @@ -297,6 +318,7 @@ Connector Options <tr> <td><h5>scan.stream.efo.registration</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">LAZY</td> <td>String</td> <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> @@ -304,6 +326,7 @@ Connector Options <tr> <td><h5>scan.stream.efo.consumerarn</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The prefix of consumer ARN for a given stream. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> @@ -311,6 +334,7 @@ Connector Options <tr> <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10000</td> <td>Integer</td> <td>Maximum number of allowed concurrent requests for the EFO client. 
See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> @@ -318,6 +342,7 @@ Connector Options <tr> <td><h5>scan.stream.describe.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">50</td> <td>Integer</td> <td>The maximum number of <code>describeStream</code> attempts if we get a recoverable exception.</td> @@ -325,6 +350,7 @@ Connector Options <tr> <td><h5>scan.stream.describe.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">2000</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> @@ -332,6 +358,7 @@ Connector Options <tr> <td><h5>scan.stream.describe.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">5000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> @@ -339,6 +366,7 @@ Connector Options <tr> <td><h5>scan.stream.describe.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> @@ -346,6 +374,7 @@ Connector Options <tr> <td><h5>scan.list.shards.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> <td>The maximum number of <code>listShards</code> attempts if we get a recoverable exception.</td> @@ -353,6 +382,7 @@ Connector Options <tr> <td><h5>scan.list.shards.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1000</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>listShards</code> attempt.</td> @@ -360,6 +390,7 @@ Connector Options <tr> <td><h5>scan.list.shards.backoff.max</h5></td> 
<td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">5000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>listShards</code> attempt.</td> @@ -367,6 +398,7 @@ Connector Options <tr> <td><h5>scan.list.shards.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>listShards</code> attempt.</td> @@ -374,6 +406,7 @@ Connector Options <tr> <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">50</td> <td>Integer</td> <td>The maximum number of <code>describeStreamConsumer</code> attempts if we get a recoverable exception.</td> @@ -381,6 +414,7 @@ Connector Options <tr> <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">2000</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td> @@ -388,6 +422,7 @@ Connector Options <tr> <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">5000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td> @@ -395,6 +430,7 @@ Connector Options <tr> <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>describeStreamConsumer</code> attempt.</td> @@ -402,6 +438,7 @@ Connector Options <tr> <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> <td>The maximum number of <code>registerStream</code> attempts if we get a 
recoverable exception.</td> @@ -409,6 +446,7 @@ Connector Options <tr> <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">60</td> <td>Integer</td> <td>The maximum time in seconds to wait for a stream consumer to become active before giving up.</td> @@ -416,6 +454,7 @@ Connector Options <tr> <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">500</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td> @@ -423,6 +462,7 @@ Connector Options <tr> <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">2000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td> @@ -430,6 +470,7 @@ Connector Options <tr> <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>registerStream</code> attempt.</td> @@ -437,6 +478,7 @@ Connector Options <tr> <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> <td>The maximum number of <code>deregisterStream</code> attempts if we get a recoverable exception.</td> @@ -444,6 +486,7 @@ Connector Options <tr> <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">60</td> <td>Integer</td> <td>The maximum time in seconds to wait for a stream consumer to deregister before giving up.</td> @@ -451,6 +494,7 @@ Connector Options <tr> <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: 
break-word;">500</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td> @@ -458,6 +502,7 @@ Connector Options <tr> <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">2000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td> @@ -465,6 +510,7 @@ Connector Options <tr> <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>deregisterStream</code> attempt.</td> @@ -472,6 +518,7 @@ Connector Options <tr> <td><h5>scan.shard.subscribetoshard.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> <td>The maximum number of <code>subscribeToShard</code> attempts if we get a recoverable exception.</td> @@ -479,6 +526,7 @@ Connector Options <tr> <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1000</td> <td>Long</td> <td>The base backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td> @@ -486,6 +534,7 @@ Connector Options <tr> <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">2000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td> @@ -493,6 +542,7 @@ Connector Options <tr> <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>subscribeToShard</code> attempt.</td> @@ -500,6 +550,7 @@ Connector Options <tr> 
<td><h5>scan.shard.getrecords.maxrecordcount</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10000</td> <td>Integer</td> <td>The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td> @@ -507,6 +558,7 @@ Connector Options <tr> <td><h5>scan.shard.getrecords.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">3</td> <td>Integer</td> <td>The maximum number of <code>getRecords</code> attempts if we get a recoverable exception.</td> @@ -514,6 +566,7 @@ Connector Options <tr> <td><h5>scan.shard.getrecords.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">300</td> <td>Long</td> <td>The base backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td> @@ -521,6 +574,7 @@ Connector Options <tr> <td><h5>scan.shard.getrecords.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td> @@ -528,6 +582,7 @@ Connector Options <tr> <td><h5>scan.shard.getrecords.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>getRecords</code> attempt.</td> @@ -535,6 +590,7 @@ Connector Options <tr> <td><h5>scan.shard.getrecords.intervalmillis</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">200</td> <td>Long</td> <td>The interval (in milliseconds) between each <code>getRecords</code> request to an AWS Kinesis shard.</td> @@ -542,6 +598,7 @@ Connector Options <tr> <td><h5>scan.shard.getiterator.maxretries</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">3</td> <td>Integer</td> <td>The 
maximum number of <code>getShardIterator</code> attempts if we get ProvisionedThroughputExceededException.</td> @@ -549,6 +606,7 @@ Connector Options <tr> <td><h5>scan.shard.getiterator.backoff.base</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">300</td> <td>Long</td> <td>The base backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td> @@ -556,6 +614,7 @@ Connector Options <tr> <td><h5>scan.shard.getiterator.backoff.max</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1000</td> <td>Long</td> <td>The maximum backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td> @@ -563,6 +622,7 @@ Connector Options <tr> <td><h5>scan.shard.getiterator.backoff.expconst</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">1.5</td> <td>Double</td> <td>The power constant for exponential backoff between each <code>getShardIterator</code> attempt.</td> @@ -570,6 +630,7 @@ Connector Options <tr> <td><h5>scan.shard.discovery.intervalmillis</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">10000</td> <td>Integer</td> <td>The interval between each attempt to discover new shards.</td> @@ -577,6 +638,7 @@ Connector Options <tr> <td><h5>scan.shard.adaptivereads</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> <td>The config to turn on adaptive reads from a shard. See the <code>AdaptivePollingRecordPublisher</code> documentation for details.</td> @@ -584,6 +646,7 @@ Connector Options <tr> <td><h5>scan.shard.idle.interval</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">-1</td> <td>Long</td> <td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. 
A positive value will allow the watermark to progress even when some shards don't receive new records.</td> @@ -591,6 +654,7 @@ Connector Options <tr> <td><h5>scan.watermark.sync.interval</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">30000</td> <td>Long</td> <td>The interval (in milliseconds) for periodically synchronizing the shared watermark state.</td> @@ -598,6 +662,7 @@ Connector Options <tr> <td><h5>scan.watermark.lookahead.millis</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">0</td> <td>Long</td> <td>The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.</td> @@ -605,6 +670,7 @@ Connector Options <tr> <td><h5>scan.watermark.sync.queue.capacity</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">100</td> <td>Integer</td> <td>The maximum number of records that will be buffered before suspending consumption of a shard.</td> @@ -619,6 +685,7 @@ Connector Options <tr> <td><h5>sink.partitioner</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">random or row-based</td> <td>String</td> <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> @@ -626,6 +693,7 @@ Connector Options <tr> <td><h5>sink.partitioner-field-delimiter</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">|</td> <td>String</td> <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. 
See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> @@ -633,6 +701,7 @@ Connector Options <tr> <td><h5>sink.producer.*</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td></td> <td> diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java index 2f7e569..7a7dc96 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicTableFactory.java @@ -31,11 +31,14 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; -import java.util.Collections; import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER; +import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER; import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; @@ -82,6 +85,15 @@ public class KinesisDynamicTableFactory implements DynamicTableSourceFactory { @Override public Set<ConfigOption<?>> optionalOptions() { - return Collections.emptySet(); + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(SINK_PARTITIONER); + options.add(SINK_PARTITIONER_FIELD_DELIMITER); + return options; + } + + @Override + public Set<ConfigOption<?>> forwardOptions() { 
+ return Stream.of(STREAM, SINK_PARTITIONER, SINK_PARTITIONER_FIELD_DELIMITER) + .collect(Collectors.toSet()); } }