Hi Aeden,

We updated the connector property design in 1.11 [1]. The old translation layer exists for backwards compatibility and is indicated by `connector.type=kafka`.

However, `connector = kafka` indicates the new property design, and `key.fields` is only available there. Please check all properties again when upgrading; they are documented here [2].
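
For your table, the new-style DDL would look roughly like this (only a sketch; the table name and column names/types are placeholders, and you might need 'value.fields-include' = 'EXCEPT_KEY' if the key field is not part of the value payload):

CREATE TABLE MyTable (                        -- placeholder table name
  my_key_field STRING,                        -- read from the Kafka record key
  my_value_field STRING                       -- placeholder value column
) WITH (
  'connector' = 'kafka',                      -- new factory, replaces connector.type/version
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'my-consumer-group',
  'key.format' = 'avro',
  'key.fields' = 'my_key_field',
  'value.format' = 'avro'                     -- replaces format.type
);

Note that the new Avro format derives the schema from the table schema, so there is no equivalent of format.avro-schema.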

Regards,
Timo


[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:
Yes, I do have that dependency. I see it in the dependency view of
IntelliJ and directly in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hey,

have you added Kafka connector as the dependency? [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
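
For the Table API, that would be something along these lines (just the usual Maven coordinate from the page above; adjust the Scala suffix and version to your setup):

<!-- Kafka connector for the Table/SQL API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.12</artifactId>
  <version>1.12.0</version>
</dependency>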

Best,
Piotrek

On Wed, Jan 6, 2021 at 04:37, Aeden Jameson <aeden.jame...@gmail.com> wrote:

I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafka SQL Connector. My current connector is configured
as follows:

connector.type    = 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = '....'

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
         at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
         at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
         at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
         at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
         at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
         ... 21 more

I have validated that the uber jar contains the 1.12 dependencies.
What is the magic combination of properties to get key.fields to work?
Or is it not supported with Avro?

--
Thank You,
Aeden



