Hi Timo, Thanks for responding. You're right. So I did update the properties. >From what I can tell the new design you're referring to uses the KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields) options, instead of KafkaTableSourceSinkFactoryBase, which doesn't support those options. Is that right? So I updated my configuration to
connector = 'kafka' topic = 'my-topic' properties.group.id = 'my-consumer-group' properties.bootstrap.servers = '...' format = 'avro' format.avro-schema = '....' key.fields = 'my_key_field' However, the property format.avro-schema doesn't appear to be supported by KafkaDynamicTableFactory. I get this exception. Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for connector 'kafka'. Unsupported options: format.avro-schema Supported options: connector format key.fields key.fields-prefix key.format properties.bootstrap.servers properties.group.id property-version scan.startup.mode scan.startup.specific-offsets scan.startup.timestamp-millis scan.topic-partition-discovery.interval sink.parallelism sink.partitioner sink.semantic topic topic-pattern value.fields-include value.format at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554) at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573) at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122) ... 21 more FAILURE: Build failed with an exception. The format.avro-schema property was supported it what looks to me the old design in in KafkaTableSourceSinkFactoryBase with this line, properties.add(FORMAT + ".*"); https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160 Does format.avro-schema need to be specified differently? Thank you, Aeden On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote: > > Hi Aeden, > > we updated the connector property design in 1.11 [1]. The old > translation layer exists for backwards compatibility and is indicated by > `connector.type=kafka`. > > However, `connector = kafka` indicates the new property design and > `key.fields` is only available there. Please check all properties again > when upgrading, they are mentioned here [2]. > > Regards, > Timo > > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/ > > > On 06.01.21 18:35, Aeden Jameson wrote: > > Yes, I do have that dependency. I see it in the dependency view of > > intellij and directly. in the uber jar. Thanks for responding. > > > > - Aeden > > > > On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote: > >> > >> Hey, > >> > >> have you added Kafka connector as the dependency? [1] > >> > >> [1] > >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies > >> > >> Best, > >> Piotrek > >> > >> śr., 6 sty 2021 o 04:37 Aeden Jameson <aeden.jame...@gmail.com> napisał(a): > >>> > >>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields > >>> feature of the Kafa SQL Connector. My current connector is configured > >>> as , > >>> > >>> connector.type = 'kafka' > >>> connector.version = 'universal' > >>> connector.topic = 'my-topic' > >>> connector.properties.group.id = 'my-consumer-group' > >>> connector.properties.bootstrap.servers = '...' > >>> format.type = 'avro' > >>> format.avro-schema = '....' > >>> > >>> I tried adding > >>> > >>> key.fields = 'my_key_field' > >>> > >>> as well as > >>> > >>> key.format = 'avro' > >>> key.fields = 'my_key_field' > >>> > >>> but I get the exception > >>> > >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: > >>> Could not find a suitable table factory for > >>> 'org.apache.flink.table.factories.TableSourceFactory' in > >>> the classpath. > >>> > >>> Reason: No factory supports all properties. > >>> > >>> The matching candidates: > >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > >>> Unsupported property keys: > >>> key.fields > >>> key.format > >>> > >>> The following factories have been considered: > >>> org.apache.flink.table.sources.CsvBatchTableSourceFactory > >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory > >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory > >>> at > >>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434) > >>> at > >>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195) > >>> at > >>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) > >>> at > >>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) > >>> at > >>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46) > >>> ... 21 more > >>> > >>> I have validated that the uber jar clearly contains the 1.12 > >>> dependencies. What is that magic combination of properties to get > >>> key.fields to work? Or is it not supported with avro? > >>> > >>> -- > >>> Thank You, > >>> Aeden > > > > > > >