[ 
https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805717#comment-16805717
 ] 

Nico Kruber commented on FLINK-8093:
------------------------------------

Actually, that's not really a Kafka bug if they are relying on an AtomicInteger 
but we are using different class loaders. I'm also not too sure how much adding 
the timestamp helped because all parallel instances for the same topic will 
still clash (one timestamp created during job submission would be serialized 
and used by all task slots' instances, wouldn't it?).
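
To illustrate the clash described above, here is a minimal sketch in plain Java. It is not Kafka or Flink code. The class `ClientIdCollisionSketch`, the `Probe` MBean, the `sketch.producer` domain and the counters' starting value of 1 are all assumptions made for illustration. Two independent `AtomicInteger` counters stand in for the separate copies of a static counter that each class loader would hold. Both produce the same generated ID, and the second registration on the shared MBean server is rejected.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch; none of these names come from Kafka or Flink.
class ClientIdCollisionSketch {

    // Minimal standard MBean so that there is something to register.
    public interface ProbeMBean { }
    public static class Probe implements ProbeMBean { }

    // Stand-in for a static counter: every class loader that loads the
    // client classes would get its own copy (starting value is assumed).
    static String nextClientId(AtomicInteger perClassLoaderCounter) {
        return "producer-" + perClassLoaderCounter.getAndIncrement();
    }

    // Registers a probe MBean under the given client ID.
    // Returns false if an MBean with that name already exists.
    static boolean register(MBeanServer server, String clientId) throws JMException {
        ObjectName name = new ObjectName(
                "sketch.producer:type=producer-metrics,client-id=" + clientId);
        try {
            server.registerMBean(new Probe(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws JMException {
        // The platform MBean server is shared by the whole JVM.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        AtomicInteger counterInLoaderA = new AtomicInteger(1);
        AtomicInteger counterInLoaderB = new AtomicInteger(1);
        System.out.println(register(server, nextClientId(counterInLoaderA)));
        System.out.println(register(server, nextClientId(counterInLoaderB)));
    }
}
```

Each counter is unique only within its own class loader, whereas MBean names must be unique across the whole JVM.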

Ultimately, I think the best workaround would be for Flink to set the client ID 
based on the full operator's name with subtask id.
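
A rough sketch of that workaround, assuming the operator name and subtask index are available when the producer is opened (in Flink they could presumably come from the runtime context's `getTaskName()` and `getIndexOfThisSubtask()`). The class `SubtaskClientIdSketch` and its methods are hypothetical, and replacing unusual characters is my own assumption rather than anything Flink or Kafka requires. `client.id` is the standard Kafka client configuration key.

```java
import java.util.Properties;

// Hypothetical helper; not part of Flink or Kafka.
class SubtaskClientIdSketch {

    // Builds a client ID from the operator name and the subtask index.
    static String buildClientId(String operatorName, int subtaskIndex) {
        // Assumption: replace anything outside [A-Za-z0-9._-] so that the
        // ID stays simple when it ends up inside a JMX object name.
        String safeName = operatorName.replaceAll("[^A-Za-z0-9._-]", "_");
        return safeName + "-" + subtaskIndex;
    }

    // Returns a copy of the producer properties with "client.id" set,
    // leaving the caller's properties untouched.
    static Properties withClientId(Properties base, String operatorName, int subtaskIndex) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty("client.id", buildClientId(operatorName, subtaskIndex));
        return copy;
    }
}
```

With this, two subtasks of the same sink in one TaskManager get different client IDs, regardless of which class loader created the producer.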

> flink job fail because of kafka producer create fail of 
> "javax.management.InstanceAlreadyExistsException"
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-8093
>                 URL: https://issues.apache.org/jira/browse/FLINK-8093
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.3.2
>         Environment: flink 1.3.2, kafka 0.9.1
>            Reporter: dongtingting
>            Priority: Critical
>
> One TaskManager has multiple task slots. One task fails because creating the 
> KafkaProducer fails; the cause is 
> "javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3". The detailed trace 
> is:
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from 
> RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:181)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
>         at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.producer:type=producer-metrics,client-id=producer-3
>         at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
>         at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
>         at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
>         at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.<init>(RecordAccumulator.java:111)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:261)
>         ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>         at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>         at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>         at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
>         ... 16 more
> I suspect that tasks in different task slots of one TaskManager use different 
> class loaders, so the task ID may be the same within one process. This causes 
> KafkaProducer creation to fail within one TaskManager. 
> Has anybody encountered the same problem? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
