Tilman Krokotsch created FLINK-34044:
----------------------------------------
Summary: Kinesis Sink Cannot be Created via TableDescriptor
Key: FLINK-34044
URL: https://issues.apache.org/jira/browse/FLINK-34044
Project: Flink
Issue Type: Bug
Components: Connectors / AWS
Affects Versions: aws-connector-4.2.0
Reporter: Tilman Krokotsch
When trying to create a Kinesis Stream Sink in Table API via a TableDescriptor
I get an error:
{code:java}
Caused by: java.lang.UnsupportedOperationException
at
java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
at
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
at
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
at
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
at
org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
at
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
... 20 more
{code}
Here is a minimum reproducing example with Flink-1.17.2 and
flink-connector-kinesis-4.2.0:
{code:java}
public class Job {
public static void main(String[] args) throws Exception {
// create data stream environment
StreamExecutionEnvironment sEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
tEnv.createTemporaryTable(
"exampleTable",
TableDescriptor.forConnector("datagen").schema(a).build());
TableDescriptor descriptor =
TableDescriptor.forConnector("kinesis")
.schema(a)
.format("json")
.option("stream", "abc")
.option("aws.region", "eu-central-1")
.build();
tEnv.createTemporaryTable("sinkTable", descriptor);
tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
}
} {code}
>From my investigation, the error is triggered by the `ResolvedCatalogTable`
>used when re-mapping the deprecated Kinesis options in
>`KinesisProducerOptionsMapper`. The `getOptions` method of the table returns
>an `UnmodifiableMap` which is not mutable.
If the sink table is created via SQL, the error does not occur:
{code:java}
tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
{code}
because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)