Hi, thanks for the reply. After many attempts, I found that this is probably not a dependency problem.
In my project I added a new directory, resources/META-INF/services, and copied two files into it from the Flink source:

  org.apache.flink.table.factories.TableFactory
  org.apache.flink.table.factories.Factory

After rebuilding with these files in place, the error no longer appears. I don't really understand why this works, but it did solve the problem.

At 2020-07-24 20:16:18, "JasonLee" <17610775...@163.com> wrote:
>hi
>You only need the two packages, -sql and -json.
>
>JasonLee
>Email: 17610775...@163.com
>
>On 07/24/2020 17:02, RS wrote:
>hi,
>With Flink-1.11.1 I am trying to run a SQL DDL that reads data from kafka, and executing the create statement fails.
>The jar is built as jar-with-dependencies.
>
>Code snippet:
>    public String ddlSql = String.format("CREATE TABLE %s (\n" +
>        "  number BIGINT,\n" +
>        "  msg STRING,\n" +
>        "  username STRING,\n" +
>        "  update_time TIMESTAMP(3)\n" +
>        ") WITH (\n" +
>        " 'connector' = 'kafka',\n" +
>        " 'topic' = '%s',\n" +
>        " 'properties.bootstrap.servers' = '%s',\n" +
>        " 'properties.group.id' = '%s',\n" +
>        " 'format' = 'json',\n" +
>        " 'json.fail-on-missing-field' = 'false',\n" +
>        " 'json.ignore-parse-errors' = 'true'\n" +
>        ")\n", tableName, topic, servers, group);
>
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>    tableEnv.executeSql(ddlSql);
>
>Error message:
>Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>Available factory identifiers are:
>datagen
>at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>... 33 more
>
>I consulted this thread:
>http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but I still get the same error.
>
>The pom dependencies are attached:
><dependencies>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-java</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-table-api-java</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-connector-kafka_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
>    <dependency>
>        <groupId>org.apache.flink</groupId>
>        <artifactId>flink-json</artifactId>
>        <version>${flink.version}</version>
>    </dependency>
></dependencies>
>
>Thanks, everyone!
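
A possible explanation for why copying the two META-INF/services files helps, offered as a guess rather than a confirmed diagnosis: Flink looks up table factories through Java's ServiceLoader, which reads provider class names from files under META-INF/services/ named after the service interface. Several Flink jars ship a file at the same path, and a jar-with-dependencies build may keep only one copy instead of merging them, which would match the error listing only 'datagen'. The sketch below reproduces that effect in isolation. Everything in it (SpiMergeDemo, the stand-in Factory interface, DatagenFactory, KafkaFactory, discover) is hypothetical and is not Flink code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;

class SpiMergeDemo {
    // Hypothetical stand-in for a factory SPI; NOT Flink's Factory interface.
    public interface Factory {
        String factoryIdentifier();
    }

    public static class DatagenFactory implements Factory {
        @Override public String factoryIdentifier() { return "datagen"; }
    }

    public static class KafkaFactory implements Factory {
        @Override public String factoryIdentifier() { return "kafka"; }
    }

    // Writes a META-INF/services file listing the given provider classes into a
    // temporary directory, then asks ServiceLoader which factories it can see.
    static List<String> discover(String... providerClassNames) {
        try {
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
            // The file name must be the binary name of the service interface.
            Files.write(services.resolve(Factory.class.getName()), Arrays.asList(providerClassNames));
            try (URLClassLoader loader = new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, SpiMergeDemo.class.getClassLoader())) {
                List<String> ids = new ArrayList<>();
                for (Factory f : ServiceLoader.load(Factory.class, loader)) {
                    ids.add(f.factoryIdentifier());
                }
                return ids;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String datagen = DatagenFactory.class.getName();
        String kafka = KafkaFactory.class.getName();
        // Services file from only one jar survived: kafka is invisible
        // even though KafkaFactory itself is on the classpath.
        System.out.println(discover(datagen));        // prints [datagen]
        // Services file lists both providers, as after a merge or manual copy.
        System.out.println(discover(datagen, kafka)); // prints [datagen, kafka]
    }
}
```

If this is indeed the cause, a commonly used alternative to copying the files by hand is to build the fat jar with maven-shade-plugin and its ServicesResourceTransformer, which concatenates service files that share a path instead of keeping a single copy.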