You could first check whether this is caused by a jar conflict.
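One way to check is to print where the two classes named in the error are loaded from. The sketch below is only an illustration: `JarLocator` and `locationOf` are hypothetical names, not part of Flink or Kafka, and it relies only on the standard JDK `ProtectionDomain`/`CodeSource` API. If the shaded `ByteArrayDeserializer` and the unshaded `Deserializer` interface turn out to come from different jars (for example one under Flink's lib directory and one inside the job jar), that would be consistent with a conflict.

```scala
import scala.util.Try

// Hypothetical helper, not part of Flink or Kafka.
object JarLocator {

  /** Returns the jar or directory a class was loaded from, if it can be determined.
    * Lookup uses the class loader that loaded JarLocator itself.
    */
  def locationOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption // None if the class is not found
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)           // null for JDK bootstrap classes
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    // The two class names are taken from the "Caused by" line of the stack trace.
    val names = Seq(
      "org.apache.kafka.common.serialization.Deserializer",
      "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer"
    )
    names.foreach { name =>
      println(s"$name -> ${locationOf(name).getOrElse("not found on this classpath")}")
    }
  }
}
```

To get a meaningful result, the check should run with the same classpath as the failing job, for example by calling `JarLocator.locationOf` from inside the job and logging the result.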

------------------------------------------------------------------
From: Even <452232...@qq.com>
Sent: Friday, May 29, 2020, 16:17
To: user-zh <user-zh@flink.apache.org>
Subject: Kafka Consumer deserialization error

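Hi!
I have a question about a Kafka consumer deserialization problem:
A Kafka consumer job runs stably when submitted to a Flink session cluster, but when it is submitted on its own to a Flink per-job cluster it fails with a Kafka deserialization error.
The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0. The consumer is configured in the code as:
val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
The error message is as follows: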
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
 at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
 at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
 at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
 at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:705)
 ... 15 more
