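Sources and sinks defined in the YAML file are looked up through the legacy
factory interface. The debezium format only implements the new interface, so
it cannot be found.
There are currently no plans to support the new interface in YAML, because
the YAML approach has been deprecated.
See this issue: https://issues.apache.org/jira/browse/FLINK-20260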
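
For reference, the DDL route mentioned in the question could look roughly like this. It is a minimal sketch, assuming the Flink 1.12 option names for the Kafka connector and the debezium-avro-confluent format; the host names are the placeholders from the question, and the columns mirror the YAML schema:

```sql
-- Sketch only: option keys assume Flink 1.12; adjust hosts to your environment.
CREATE TABLE t_users (
  userId    STRING,
  province  STRING,
  city      STRING,
  age       INT,
  education STRING,
  jobType   STRING,
  marriage  STRING,
  sex       STRING,
  interest  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods.userAnalysis.user_profile',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);
```

Tables created this way go through the new factory interface, which is why the format is found.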

Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l <ljy94...@gmail.com> wrote:

> Hi,
> Flink version: 1.12.0
>
> I want to configure a table in sql-client-defaults.yaml, as follows:
>
> tables:
>   - name: t_users
>     type: source-table
>     connector:
>         property-version: 1
>         type: kafka
>         version: universal
>         topic: ods.userAnalysis.user_profile
>         startup-mode: latest-offset
>         properties:
>             bootstrap.servers: hostname:9092
>             group.id: flink-analysis
>     format:
>         type: debezium-avro-confluent
>         property-version: 1
>         debezium-avro-confluent.schema-registry.url: http://hostname:8081
>         #schema-registry.url: http://hostname:8081
>     schema:
>         - name: userId
>           data-type: STRING
>         - name: province
>           data-type: STRING
>         - name: city
>           data-type: STRING
>         - name: age
>           data-type: INT
>         - name: education
>           data-type: STRING
>         - name: jobType
>           data-type: STRING
>         - name: marriage
>           data-type: STRING
>         - name: sex
>           data-type: STRING
>         - name: interest
>           data-type: STRING
>
> I have already put all the relevant jars into the lib directory, but starting the SQL CLI fails with the following error:
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
>     at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector.properties.bootstrap.servers=henghe66:9092
> connector.properties.group.id=flink-analysis
> connector.property-version=1
> connector.startup-mode=latest-offset
> connector.topic=ods.userAnalysis.user_profile
> connector.type=kafka
> connector.version=universal
> format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
> format.property-version=1
> format.type=debezium-avro-confluent
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=userId
> schema.1.data-type=VARCHAR(2147483647)
> schema.1.name=province
> schema.2.data-type=VARCHAR(2147483647)
> schema.2.name=city
> schema.3.data-type=INT
> schema.3.name=age
> schema.4.data-type=VARCHAR(2147483647)
> schema.4.name=education
> schema.5.data-type=VARCHAR(2147483647)
> schema.5.name=jobType
> schema.6.data-type=VARCHAR(2147483647)
> schema.6.name=marriage
> schema.7.data-type=VARCHAR(2147483647)
> schema.7.name=sex
> schema.8.data-type=VARCHAR(2147483647)
> schema.8.name=interest
>
> The following factories have been considered:
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.formats.csv.CsvRowFormatFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
>
>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
>     at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
>     at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
>     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
>     ... 3 more
>
>
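> If I do not configure the table in sql-client-defaults.yaml and instead create it with DDL after starting the SQL CLI, everything starts up normally.
>
> So is my configuration in sql-client-defaults.yaml wrong?
>
> I would appreciate it if anyone who knows could let me know.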
>
>
> Best regards!
>
