Tilman Krokotsch created FLINK-34044:
----------------------------------------

             Summary: Kinesis Sink Cannot be Created via TableDescriptor
                 Key: FLINK-34044
                 URL: https://issues.apache.org/jira/browse/FLINK-34044
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS
    Affects Versions: aws-connector-4.2.0
            Reporter: Tilman Krokotsch


When trying to create a Kinesis Stream Sink in Table API via a TableDescriptor 
I get an error:
{code:java}
Caused by: java.lang.UnsupportedOperationException
    at 
java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
    at 
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
    at 
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
    at 
org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
    at 
org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
    at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
    ... 20 more
{code}
Here is a minimum reproducing example with Flink-1.17.2 and 
flink-connector-kinesis-4.2.0:
{code:java}
public class Job {
  public static void main(String[] args) throws Exception {
    // create data stream environment
    StreamExecutionEnvironment sEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);

    Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
    tEnv.createTemporaryTable(
        "exampleTable", 
TableDescriptor.forConnector("datagen").schema(a).build());
    TableDescriptor descriptor =
        TableDescriptor.forConnector("kinesis")
            .schema(a)
            .format("json")
            .option("stream", "abc")
            .option("aws.region", "eu-central-1")
            .build();
    tEnv.createTemporaryTable("sinkTable", descriptor);

    tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
  }
} {code}
>From my investigation, the error is triggered by the `ResolvedCatalogTable` 
>used when re-mapping the deprecated Kinesis options in 
>`KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
>an `UnmodifiableMap` which is not mutable.

If the sink table is created via SQL, the error does not occur:
{code:java}
tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
{code}
because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to