Khanh Vu created FLINK-33181:
--------------------------------
Summary: Table using `kinesis` connector cannot be used for both
read & write operations if it's defined with an unsupported sink property
Key: FLINK-33181
URL: https://issues.apache.org/jira/browse/FLINK-33181
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis, Table SQL / Runtime
Affects Versions: 1.15.4
Reporter: Khanh Vu
First, I define a table that uses the `kinesis` connector with a property
that the sink does not support, e.g. `scan.stream.initpos`:
```
%flink.ssql(type=update)
-- Create input
DROP TABLE IF EXISTS `kds_input`;
CREATE TABLE `kds_input` (
`some_string` STRING,
`some_int` BIGINT,
`time` AS PROCTIME()
) WITH (
'connector' = 'kinesis',
'stream' = 'ExampleInputStream',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'csv'
);
```
I can read from the table (`kds_input`) without any issue, but writing to it
throws an exception:
```
%flink.ssql(type=update)
-- Used to generate data for the input table
DROP TABLE IF EXISTS connector_cve_datagen;
CREATE TABLE connector_cve_datagen(
`some_string` STRING,
`some_int` BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.some_string.length' = '2');
INSERT INTO kds_input SELECT some_string, some_int FROM connector_cve_datagen;
```
Exception observed:
```
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'kinesis'.
Unsupported options:
scan.stream.initpos
Supported options:
aws.region
connector
csv.allow-comments
csv.array-element-delimiter
csv.disable-quote-character
csv.escape-character
csv.field-delimiter
csv.ignore-parse-errors
csv.null-literal
csv.quote-character
format
property-version
sink.batch.max-size
sink.fail-on-error
sink.flush-buffer.size
sink.flush-buffer.timeout
sink.partitioner
sink.partitioner-field-delimiter
sink.producer.collection-max-count (deprecated)
sink.producer.collection-max-size (deprecated)
sink.producer.fail-on-error (deprecated)
sink.producer.record-max-buffered-time (deprecated)
sink.requests.max-buffered
sink.requests.max-inflight
stream
	at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
	at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
	at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
	at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
	at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
	at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:65)
	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:259)
	... 36 more
```
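A possible workaround, sketched here and not verified against 1.15.4: since the sink's supported options listed above include `connector`, `stream`, `aws.region` and `format`, a second table over the same stream that omits every `scan.*` option should pass the sink validation. The table name `kds_output` is hypothetical and not part of the original setup.

```
%flink.ssql(type=update)
-- Hypothetical sink-only table: same stream as kds_input, but without
-- the source-only option 'scan.stream.initpos'
DROP TABLE IF EXISTS `kds_output`;
CREATE TABLE `kds_output` (
`some_string` STRING,
`some_int` BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'ExampleInputStream',
'aws.region' = 'us-east-1',
'format' = 'csv'
);
INSERT INTO `kds_output` SELECT some_string, some_int FROM connector_cve_datagen;
```

This only sidesteps the problem by keeping two definitions of one stream in sync; it does not make a single table usable for both reading and writing.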