Hi Timo,

Thanks for responding. You're right, so I updated the properties.
From what I can tell, the new design you're referring to uses
KafkaDynamicTableFactory, which supports the KEY_FIELDS (key.fields)
option, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support it. Is that right? I updated my configuration to

connector = 'kafka'
topic = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = '....'
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
        at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
        at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
        ... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported in what looks to me like
the old design, in KafkaTableSourceSinkFactoryBase, with this line:

    properties.add(FORMAT + ".*");

    
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
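
To make the difference I think I'm seeing concrete, here is a minimal
sketch in plain Java. It is only a model based on the `FORMAT + ".*"`
line and on the error message above, not Flink's actual code: the class
OptionValidationSketch and the methods matchesLegacy and
unsupportedExact are hypothetical names, and the exact-match behaviour
of the new factory is an assumption drawn from the "Unsupported
options" list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Illustrative model only; hypothetical names, not Flink's implementation. */
public class OptionValidationSketch {

    /**
     * Old-style check (assumed): a supported entry ending in ".*" accepts
     * any key that starts with the part before the asterisk.
     */
    static boolean matchesLegacy(String key, List<String> supported) {
        for (String entry : supported) {
            if (entry.endsWith(".*")) {
                // Keep the trailing dot so "format.*" accepts
                // "format.avro-schema" but not "formatter".
                String prefix = entry.substring(0, entry.length() - 1);
                if (key.startsWith(prefix)) {
                    return true;
                }
            } else if (entry.equals(key)) {
                return true;
            }
        }
        return false;
    }

    /**
     * New-style check (assumed): every provided key must exactly equal a
     * declared option; whatever is left over is reported as unsupported.
     */
    static List<String> unsupportedExact(Set<String> provided, Set<String> declared) {
        Set<String> remaining = new TreeSet<>(provided); // sorted for stable output
        remaining.removeAll(declared);
        return new ArrayList<>(remaining);
    }

    public static void main(String[] args) {
        // Wildcard entry taken from the old factory line quoted above.
        List<String> legacySupported = List.of("format.*");
        System.out.println("legacy accepts format.avro-schema: "
                + matchesLegacy("format.avro-schema", legacySupported));

        // A subset of the "Supported options" list from the exception.
        Set<String> declared = Set.of("connector", "format", "key.fields", "topic");
        Set<String> provided = Set.of("connector", "format", "format.avro-schema",
                "key.fields", "topic");
        System.out.println("unsupported under exact matching: "
                + unsupportedExact(provided, declared));
    }
}
```

If that model is roughly right, the old factory accepted any key under
the format prefix, while the new one rejects every key that is not
declared explicitly.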

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther <twal...@apache.org> wrote:
>
> Hi Aeden,
>
> we updated the connector property design in 1.11 [1]. The old
> translation layer exists for backwards compatibility and is indicated by
> `connector.type=kafka`.
>
> However, `connector = kafka` indicates the new property design and
> `key.fields` is only available there. Please check all properties again
> when upgrading, they are mentioned here [2].
>
> Regards,
> Timo
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>
>
> On 06.01.21 18:35, Aeden Jameson wrote:
> > Yes, I do have that dependency. I see it in the dependency view of
> > IntelliJ and directly in the uber jar. Thanks for responding.
> >
> > - Aeden
> >
> > On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >>
> >> Hey,
> >>
> >> have you added Kafka connector as the dependency? [1]
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>
> >> Best,
> >> Piotrek
> >>
> >> On Wed, Jan 6, 2021 at 04:37 Aeden Jameson <aeden.jame...@gmail.com> wrote:
> >>>
> >>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>> feature of the Kafka SQL Connector. My current connector is configured
> >>> as:
> >>>
> >>> connector.type    = 'kafka'
> >>> connector.version = 'universal'
> >>> connector.topic   = 'my-topic'
> >>> connector.properties.group.id = 'my-consumer-group'
> >>> connector.properties.bootstrap.servers = '...'
> >>> format.type = 'avro'
> >>> format.avro-schema = '....'
> >>>
> >>> I tried adding
> >>>
> >>> key.fields = 'my_key_field'
> >>>
> >>> as well as
> >>>
> >>> key.format = 'avro'
> >>> key.fields = 'my_key_field'
> >>>
> >>> but I get the exception
> >>>
> >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>> Could not find a suitable table factory for
> >>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>> the classpath.
> >>>
> >>> Reason: No factory supports all properties.
> >>>
> >>> The matching candidates:
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>> Unsupported property keys:
> >>> key.fields
> >>> key.format
> >>>
> >>> The following factories have been considered:
> >>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>          at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>          at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
> >>>          at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> >>>          at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> >>>          at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> >>>          ... 21 more
> >>>
> >>> I have validated that the uber jar contains the 1.12
> >>> dependencies. What is the magic combination of properties to get
> >>> key.fields to work? Or is it not supported with Avro?
> >>>
> >>> --
> >>> Thank You,
> >>> Aeden
> >
> >
> >
>
