[
https://issues.apache.org/jira/browse/KAFKA-3992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexander Cook reopened KAFKA-3992:
-----------------------------------
I have moved to 0.10 of the Kafka Consumer Client and I am still seeing this
issue.
I believe that what is happening is that in cases where we do not provide a
client id, Kafka generates one for us. The problem seems to be that the
client.id generation in Kafka is not thread-safe, as when I start 3 consumers
in parallel (all at the same time), one of them succeeds, but the other two
fail with the error in this issue as they get assigned the "consumer-1" id.
However:
If I use the exact same code, but start a single consumer at a time with a
delay in-between, I can successfully consume in parallel.
> InstanceAlreadyExistsException Error for Consumers Starting in Parallel
> -----------------------------------------------------------------------
>
> Key: KAFKA-3992
> URL: https://issues.apache.org/jira/browse/KAFKA-3992
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.0
> Reporter: Alexander Cook
> Assignee: Ewen Cheslack-Postava
>
> I see the following error sometimes when I start multiple consumers at about
> the same time in the same process (separate threads). Everything seems to
> work fine afterwards, so should this not actually be an ERROR level message,
> or could there be something going wrong that I don't see?
> Let me know if I can provide any more info!
> Error processing messages: Error registering mbean
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> org.apache.kafka.common.KafkaException: Error registering mbean
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
>
> Caused by: javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> Here is the full stack trace:
> M[?:com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples:-1] -
> Error processing messages: Error registering mbean
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> org.apache.kafka.common.KafkaException: Error registering mbean
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> at
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
> at
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
> at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
> at
> org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:641)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:268)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:126)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:186)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
> at
> com.ibm.streamsx.messaging.kafka.KafkaConsumerV9.produceTuples(KafkaConsumerV9.java:129)
> at
> com.ibm.streamsx.messaging.kafka.KafkaConsumerV9$1.run(KafkaConsumerV9.java:70)
> at java.lang.Thread.run(Thread.java:785)
> at
> com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137)
> Caused by: javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=consumer-node-metrics,client-id=consumer-1,node-id=node--1
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:449)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1910)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:978)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:912)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:336)
> at
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:534)
> at
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
> ... 18 more
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)