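Sources and sinks defined in the YAML file are looked up through the old factory interface. The debezium format only implements the new interface, so no matching factory can be found. There are currently no plans to support the new interface in YAML, because the YAML approach has been deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260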
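For reference, here is a rough sketch of how the same table might be declared with DDL in the SQL CLI, which goes through the new factory interface. The option keys are what I would expect for the Kafka connector and the debezium-avro-confluent format in Flink 1.12, so please check them against the documentation for your version. The host names are the placeholders from the YAML below.

```sql
-- Sketch only: option keys assumed from the Flink 1.12 Kafka connector and
-- debezium-avro-confluent format; verify them against your Flink version.
CREATE TABLE t_users (
  userId    STRING,
  province  STRING,
  city      STRING,
  age       INT,
  education STRING,
  jobType   STRING,
  marriage  STRING,
  sex       STRING,
  interest  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods.userAnalysis.user_profile',
  'properties.bootstrap.servers' = 'hostname:9092',
  'properties.group.id' = 'flink-analysis',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'debezium-avro-confluent',
  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);
```

Note that the DDL uses 'connector' and 'format' directly, rather than the legacy 'connector.type' and 'format.type' keys that the YAML file is translated into.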
Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l <ljy94...@gmail.com> wrote:

> Hi:
> Flink version 1.12.0:
>
> I want to configure a table in sql-client-defaults.yaml, as follows:
>
> tables:
>   - name: t_users
>     type: source-table
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ods.userAnalysis.user_profile
>       startup-mode: latest-offset
>       properties:
>         bootstrap.servers: hostname:9092
>         group.id: flink-analysis
>     format:
>       type: debezium-avro-confluent
>       property-version: 1
>       debezium-avro-confluent.schema-registry.url: http://hostname:8081
>       #schema-registry.url: http://hostname:8081
>     schema:
>       - name: userId
>         data-type: STRING
>       - name: province
>         data-type: STRING
>       - name: city
>         data-type: STRING
>       - name: age
>         data-type: INT
>       - name: education
>         data-type: STRING
>       - name: jobType
>         data-type: STRING
>       - name: marriage
>         data-type: STRING
>       - name: sex
>         data-type: STRING
>       - name: interest
>         data-type: STRING
>
> I have already put the relevant jars in the lib directory, but starting the SQL CLI fails with the following error:
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
>     at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector.properties.bootstrap.servers=henghe66:9092
> connector.properties.group.id=flink-analysis
> connector.property-version=1
> connector.startup-mode=latest-offset
> connector.topic=ods.userAnalysis.user_profile
> connector.type=kafka
> connector.version=universal
> format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
> format.property-version=1
> format.type=debezium-avro-confluent
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=userId
> schema.1.data-type=VARCHAR(2147483647)
> schema.1.name=province
> schema.2.data-type=VARCHAR(2147483647)
> schema.2.name=city
> schema.3.data-type=INT
> schema.3.name=age
> schema.4.data-type=VARCHAR(2147483647)
> schema.4.name=education
> schema.5.data-type=VARCHAR(2147483647)
> schema.5.name=jobType
> schema.6.data-type=VARCHAR(2147483647)
> schema.6.name=marriage
> schema.7.data-type=VARCHAR(2147483647)
> schema.7.name=sex
> schema.8.data-type=VARCHAR(2147483647)
> schema.8.name=interest
>
> The following factories have been considered:
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.formats.csv.CsvRowFormatFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
>     at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
>     at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
>     at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
>     at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
>     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
>     ... 3 more
>
> If I do not configure the table in sql-client-defaults.yaml and instead create it with DDL after starting the SQL CLI, everything works.
> So is my configuration in sql-client-defaults.yaml wrong?
> I would appreciate it if anyone who knows could tell me.
>
> Best regards!