Kafka Consumer deserialization error

2020-05-29 Thread Even
Hi!
A question about Kafka Consumer deserialization:
A kafka consumer job runs stably when submitted to a Flink session 
cluster, but when submitted on its own to a Flink per-job cluster 
it fails with a Kafka deserialization error, as follows:
The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0; the consumer is configured in code as val data = 
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
properties))
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at 
org.apache.kafka.clients.consumer.KafkaConsumer.
Re: Kafka Consumer deserialization error

2020-05-29 Thread Even
Thanks. What should I do to avoid this problem?




-- Original --
From: "zz zhang"
Re: Kafka Consumer deserialization error

2020-06-01 Thread Even
I marked the dependency as provided in the pom so the jar is not bundled.




-- Original --
From: "tison"
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.


tison https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath
>
> Best,
> tison.
>
>
> Even <452232...@qq.com> wrote on Fri, May 29, 2020 at 18:48:
>
>> Thanks. What should I do to avoid this problem?
>>
>>
>>
>>
>> -- Original --
>> From: "zz zhang"

kafka consumer exception

2019-02-19 Thread 董鹏
Hello Flink experts. When using Flink with Kafka (client version 1.0) we hit the exception below.
It does not affect the job or its results and is only printed occasionally. Has anyone else run into this?


[org.apache.kafka.common.utils.AppInfoParser] [AppInfoParser.java:60] - Error 
registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-31
  at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
  at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
  at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58)
  at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:469)
  at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
  at java.lang.Thread.run(Thread.java:748)

Re: kafka consumer exception

2019-02-19 Thread ForwardXu



   This warning comes from the kafka 
consumer client-id: Flink does not set one, so the Kafka client auto-generates it as 
"consumer" + a sequence id, and two consumers in the same JVM can then collide when 
registering the JMX mbean. See the Kafka JIRA: 
https://issues.apache.org/jira/browse/KAFKA-3992







-- Original --
From: "董鹏";
Date: 2019-02-20 03:02
To: "user-zh";

Subject: kafka consumer exception



Hello Flink experts. When using Flink with Kafka (client version 1.0) we hit the exception below.
It does not affect the job or its results and is only printed occasionally.

Flink-1.10.0 standalone mode: Kafka Consumer InstanceAlreadyExistsException

2020-06-04 Thread zhang...@lakala.com
Flink-1.10.0 standalone deployment
Kafka-0.11
JDK8

When I deploy two applications on the same Flink cluster and both are scheduled onto the same TaskManager, consuming the same topic with different group.ids, the first application starts normally but the second logs the warning below at startup. What causes this warning, can it be ignored, and if not, how can it be fixed?

WARN org.apache.kafka.common.utils.AppInfoParser - Error registering 
AppInfo mbean
javax.management.InstanceAlreadyExistsException: 
kafka.consumer:type=app-info,id=consumer-5
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:757)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:502)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:181)
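A common workaround for this mbean collision (not stated in the thread itself, and per KAFKA-3992 the auto-generated id was later made unique) is to give each consumer an explicit, unique client.id, so the JMX name kafka.consumer:type=app-info,id=<client.id> never repeats within one JVM. A minimal sketch; the helper name, group ids, and broker address are illustrative:

```java
import java.util.Properties;

public class UniqueClientId {
    // Hypothetical helper: build consumer properties with a client.id that is
    // unique per application and per consumer instance. Without an explicit
    // client.id the Kafka client auto-generates "consumer-<n>", which can
    // collide across consumers in the same JVM (see KAFKA-3992).
    static Properties consumerProps(String groupId, int instanceIndex) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        props.setProperty("group.id", groupId);
        props.setProperty("client.id", groupId + "-" + instanceIndex);
        return props;
    }

    public static void main(String[] args) {
        // Two applications on the same TaskManager now register distinct mbeans.
        System.out.println(consumerProps("app-a", 0).getProperty("client.id"));
        System.out.println(consumerProps("app-b", 0).getProperty("client.id"));
    }
}
```

The same Properties object would then be passed to the FlinkKafkaConsumer constructor shown earlier in this archive.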




flink kafka consumer: some messages not consumed

2019-09-27 Thread zenglong chen
On a test topic, I add messages by hand and have Flink print what it consumes; some of the added messages are missing from the output. Where could the problem be? Has anyone seen something similar?


Re: Kafka Consumer deserialization error

2020-05-29 Thread zz zhang
This looks like a maven-shade configuration problem.
The original class org.apache.kafka.common.serialization.ByteArrayDeserializer is rewritten to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer, and the Deserializer interface it implements gets shaded as well, probably to org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer, hence the exception.

夏帅  wrote on Fri, May 29, 2020 at 16:33:
>
> You could rule out a jar conflict first.
>
>
> --
> From: Even <452232...@qq.com>
> Sent: Friday, May 29, 2020, 16:17
> To: user-zh 
> Subject: Kafka Consumer deserialization error
>
> Hi!
> A question about Kafka Consumer deserialization:
> A kafka consumer job runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink 
> per-job cluster it fails with a Kafka deserialization error, as follows:
> The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0; the consumer is configured in code as val data = 
> env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
> properties))
> 2020-05-27 17:05:22
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.  at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>  at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>  at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>  at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>  at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
>  is not an instance of org.apache.kafka.common.serialization.Deserializer
>  at 
> org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.  ... 15 more



-- 
Best,
zz zhang
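zz zhang's diagnosis can be checked against the job jar's shade configuration: if the user jar relocates org.apache.kafka itself, the deserializer class name stored in the consumer config is rewritten while the cluster-provided client still expects the unshaded Deserializer interface. A hypothetical pom fragment that instead keeps Kafka classes out of the user jar and relies on the cluster-provided connector (artifact coordinates are illustrative):

```xml
<!-- Hypothetical sketch: do not bundle or relocate kafka-clients in the
     job jar; rely on the copy provided by the Flink cluster. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <artifactSet>
      <excludes>
        <exclude>org.apache.kafka:kafka-clients</exclude>
      </excludes>
    </artifactSet>
  </configuration>
</plugin>
```

With only one unshaded copy of org.apache.kafka.common.serialization.Deserializer on the classpath, the "is not an instance of" failure above cannot occur.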


Re: Kafka Consumer deserialization error

2020-05-29 Thread tison
This is most likely a classloading-order problem. Configure child-first classloading; for per-job mode on 1.10, as I recall you need to set a specific option.

See this document:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#user-jars--classpath

Best,
tison.


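tison's suggestion boils down to a classloader setting. A sketch of the relevant flink-conf.yaml entries; child-first is already the default in recent Flink versions, and the per-job YARN option name should be checked against the yarn_setup page for your exact release:

```yaml
# flink-conf.yaml - classloading knobs discussed in this thread
# (child-first is the default resolve order in recent Flink versions)
classloader.resolve-order: child-first

# For per-job YARN clusters (Flink 1.10), controls where the user jar is
# placed relative to the Flink classpath.
yarn.per-job-cluster.include-user-jar: ORDER
```

With child-first resolution, classes bundled in the user jar (including shaded Kafka classes) are resolved consistently from the same classloader instead of being split between the user jar and the cluster's lib/.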


Re: Kafka Consumer deserialization error

2020-05-29 Thread tison
For general documentation on classloading, see also:

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Best,
tison.




Re: flink kafka consumer: some messages not consumed

2019-09-29 Thread Dian Fu
In the web UI you can see how many records each operator received and emitted; check where the count drops.

> On Sep 27, 2019, at 19:44, zenglong chen  wrote:
> 
> On a test topic, I add messages by hand and have Flink print what it consumes; some of the added messages are missing from the output. Where could the problem be? Has anyone seen something similar?



Re: Kafka Consumer deserialization error

2020-05-29 Thread 夏帅
You could rule out a jar conflict first.


--
From: Even <452232...@qq.com>
Sent: Friday, May 29, 2020, 16:17
To: user-zh 
Subject: Kafka Consumer deserialization error

Hi!
A question about Kafka Consumer deserialization:
A kafka consumer job runs stably when submitted to a Flink session cluster, but when submitted on its own to a Flink 
per-job cluster it fails with a Kafka deserialization error, as follows:
The Flink version is 1.10 and the Kafka version is kafka_2.12-2.1.0; the consumer is configured in code as val data = 
env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), 
properties))
2020-05-27 17:05:22
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.

flink 1.11.1 job restart: intermittent org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-18 Thread m13162790856
The full log is as follows:


   org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:789)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:688)
... 15 more
2020-11-19 15:17:32,0


Has anyone run into this?

Re: flink 1.11.1 job restart: intermittent org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread hailongwang
Hi,
   If this only happens occasionally, it is probably related to classloading.
   If `org.apache.kafka.common.serialization.ByteArrayDeserializer` is loaded by the child 
classloader
while `org.apache.kafka.common.serialization.Deserializer` is loaded by the parent 
classloader, you get exactly this problem.
Does your jar bundle the kafka-connector? If the cluster already provides it, try marking it provided.
Hope this helps.


Best,
Hailong Wang

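Hailong's suggestion, expressed as a pom fragment: if the cluster's lib/ already ships the Kafka connector, declare it provided in the job's pom so only one copy of the Deserializer interface is ever on the classpath. The artifact id and version below are illustrative and must match your deployment:

```xml
<!-- Illustrative sketch: compile against the connector but do not bundle it,
     assuming the cluster already provides a copy under lib/. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
  <scope>provided</scope>
</dependency>
```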


Re: flink 1.11.1 job restart: intermittent org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread m13162790856
Hi:
   It is intermittent, but lately it has been happening fairly often. I also suspected classloading earlier, but my jar is built with dependencies kept separate and does not bundle any other packages, 
so the classpath is guaranteed to be identical on every start, which makes this very strange.



Re: flink 1.11.1 job restart: intermittent org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-11-19 Thread hailongwang
Can you grep the jars to see which ones contain these two classes?




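hailongwang's grep suggestion can be automated. A small sketch that opens every jar in a directory and reports which ones carry a given class entry; the demo builds a throwaway jar so it runs anywhere, but in a real investigation you would point it at the Flink lib/ directory and your user-jar path (both assumptions about your layout):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;

public class FindClassInJars {
    // Return the jars under `dir` whose entries include the given class path,
    // e.g. "org/apache/kafka/common/serialization/Deserializer.class".
    static List<Path> jarsContaining(Path dir, String entry) throws IOException {
        List<Path> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jf = new JarFile(jar.toFile())) {
                    if (jf.getEntry(entry) != null) hits.add(jar);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a temporary jar; replace `dir` with /opt/flink/lib or your
        // user-jar directory (paths are assumptions about the deployment).
        Path dir = Files.createTempDirectory("jars");
        String entry = "org/apache/kafka/common/serialization/Deserializer.class";
        try (OutputStream os = Files.newOutputStream(dir.resolve("demo.jar"));
             JarOutputStream jos = new JarOutputStream(os)) {
            jos.putNextEntry(new ZipEntry(entry)); // an empty entry suffices for lookup
            jos.closeEntry();
        }
        System.out.println(jarsContaining(dir, entry).size());
    }
}
```

Running this against both ByteArrayDeserializer and Deserializer shows whether the two classes live in different jars, which is the precondition for the parent/child classloader split described above.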


If group.id changes, can the Kafka consumer of a Flink job started from a savepoint still pick up the start position saved in state?

2019-10-14 Thread justskinny
Hi, all. According to the documentation, when a job is restored from a checkpoint or savepoint, the Kafka Consumer uses the start 
position stored in state.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
For a job restored from a savepoint, if the group.id is changed, can the Kafka consumer still pick up the start 
position saved in state?