hello
你放 flink-sql-connector-kafka_2.11-1.11.3.jar 后有重启 sql client 和 集群吗?
Best
zhisheng
air23 于2021年1月11日周一 下午1:32写道:
> 下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-09 02:08:12,"inza9hi" 写道:
> >搜了下之前的邮件,貌似没有发现和我同样的问题。
> >
> >lib 下的Jar
>
下载个 flink-sql-connector-kafka 这个jar 放在lib下试下
在 2021-01-09 02:08:12,"inza9hi" 写道:
>搜了下之前的邮件,貌似没有发现和我同样的问题。
>
>lib 下的Jar
>flink-csv-1.11.3.jar
>flink-table-blink_2.11-1.11.3.jar
>flink-dist_2.11-1.11.3.jar
搜了下之前的邮件,貌似没有发现和我同样的问题。
lib 下的Jar
flink-csv-1.11.3.jar
flink-table-blink_2.11-1.11.3.jar
flink-dist_2.11-1.11.3.jar
flink-table_2.11-1.11.3.jar
flink-jdbc_2.11-1.11.3.jar log4j-1.2-api-2.12.1.jar
ka',\n" +
>> > " 'topic' = '%s',\n" +
>> > " 'properties.bootstrap.servers' = '%s',\n" +
>> > " 'properties.group.id' = '%s',\n" +
>> > " 'format' = 'json',\n" +
>> > " 'json.fail-on-missing
" +
> > ")\n", tableName, topic, servers, group);
> >
> >
> > StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> > StreamTableEnvironment tableEnv =
> StreamTableEnvironme
+
> " 'format' = 'json',\n" +
> " 'json.fail-on-missing-field' = 'false',\n" +
> " 'json.ignore-parse-errors' = 'true'\n" +
> ")\n", tableName, topic, servers, group);
>
>
> StreamExecutionEnvironment env =
> StreamExecuti
n.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
table
org.apache.flink
flink-connector-kafka_2.12
${flink.version}
org.apache.flink
flink-sql-connector-kafka_2.12
${flink.version}
这两个会有冲突,去掉上面那个
> 2020年7月24日 下午5:02,RS 写道:
>
>
>
邮件格式不对,我重新回复下
我这边是直接打成jar包扔到服务器上运行的,没有在IDEA运行过。
> flink run xxx
没有使用shade-plugin
maven build参数:
1.8
1.11.1
maven-compiler-plugin
${jdk.version}
${jdk.version}
; +
>> " 'json.ignore-parse-errors' = 'true'\n" +
>> ")\n", tableName, topic, servers, group);
>>
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableE
")\n", tableName, topic, servers, group);
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> tableEnv.executeSql(ddlSql);
>
>
>
ronment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(ddlSql);
报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
factory for identifier 'kafka
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
Hi,
1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
spi)
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
Best,
Jingsong
On Mon, Jul 13, 2020
你好本超,
是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
Benchao Li 于2020年7月13日周一 下午3:42写道:
> 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> 当然,直接粗暴的放到lib下,也是可以的。
>
> Leonard Xu 于2020年7月13日周一 下午3:38写道:
>
> > Hi
> > 你可以试下把
这样还是不行,我尝试flink-connector-kafka-0.11_2.11-1.11.0.jar放到lib下,报了另外一个问题:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase
另外,我是用 bin/flink run -yid xxx xxx.jar 的方式提交任务的,报错是直接在终端报错,没有提交到flink
jobmanager上。
Leonard Xu
你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
当然,直接粗暴的放到lib下,也是可以的。
Leonard Xu 于2020年7月13日周一 下午3:38写道:
> Hi
> 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
>
> 祝好
>
> > 在
Hi
你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
祝好
> 在 2020年7月13日,15:28,王松 写道:
>
> 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
>
> 我机器上flink/lib下jar包如下:
> -rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41
您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
我机器上flink/lib下jar包如下:
-rw-rw-r-- 1 hadoop hadoop117719 6月 30 12:41 flink-avro-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 90782 7月 8 10:09 flink-csv-1.11.0.jar
-rw-r--r-- 1 hadoop hadoop 108349203 7月 8 10:09
Hi,
flink-connector-kafka_${scala.binary.version 和
flink-sql-connector-kafka_${scala.binary.version 只用加载一个应该就好了,前者的话是dataStream 或者
Table API 程序使用,
后者的话主要是对前者做了shade处理,方便用户在 SQL
Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
[1]
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
=
org.apache.flink
Hi, 王松
这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector
的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream
connector 同时引用是会冲突的,请根据你的需要使用。
祝好,
Leonard Xu
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了
Best,
tison.
王松 于2020年7月13日周一 下午12:54写道:
> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that i
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
请问是什么原因导致的呢?
代码如下
24 matches
Mail list logo