[ https://issues.apache.org/jira/browse/KAFKA-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291331#comment-17291331 ]
A. Sophie Blee-Goldman commented on KAFKA-12375:
------------------------------------------------

Regarding the javax.management.InstanceAlreadyExistsException, this is something I've seen in user logs and tests before. It's definitely a bit confusing/potentially concerning to a user, but I don't think it's a new problem. Maybe the REPLACE_THREAD functionality will have made it slightly more common. Would be nice to fix, but I wouldn't press for that as a blocker of 2.8.

> ReplaceStreamThread creates a new consumer with the same name as the one it's replacing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12375
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12375
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Tomasz Nguyen
>            Assignee: Tomasz Nguyen
>            Priority: Blocker
>             Fix For: 2.8.0
>
>
> I was debugging the kafka-streams soak cluster and noticed that replacing a
> stream thread was causing the streams application to fail.
> I have managed to find the following stacktrace:
> {code:java}
> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>     at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338)
>     at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896)
>     at org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977)
>     at org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> followed by:
> {code:java}
> Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261)
>     at org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666)
>     at org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> My understanding so far is that we re-use the consumer name across thread
> generations which can hit a few flavours of a race condition. My suggestion
> would be to add the generation-id to the consumer name.
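To make the name collision concrete, here is a minimal sketch that uses only the JDK's JMX classes and no Kafka dependency. It registers an MBean twice under the same client id, which triggers the `InstanceAlreadyExistsException` from the first trace, and then registers a name that carries a generation suffix. The `sketch.consumer` domain, the `-gen-N` suffix format, and the helpers `restoreConsumerClientId` and `tryRegister` are assumptions made for illustration; they are not part of Kafka's API.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

public class ClientIdCollisionSketch {

    // Standard MBean convention: the interface must be named <Class>MBean.
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "sketch";
        }
    }

    // Hypothetical naming scheme: append the thread generation to the client id.
    public static String restoreConsumerClientId(String threadId, int generation) {
        return threadId + "-gen-" + generation + "-restore-consumer";
    }

    // Returns true if the name was free, false if an MBean already holds it.
    public static boolean tryRegister(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName("sketch.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false; // same ObjectName registered twice
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        String threadId = "app-StreamThread-1";

        // Original thread registers its restore consumer.
        System.out.println(tryRegister(server, threadId + "-restore-consumer"));
        // Replacement thread reuses the same name before the old one is unregistered.
        System.out.println(tryRegister(server, threadId + "-restore-consumer"));
        // A generation suffix makes the replacement's name distinct.
        System.out.println(tryRegister(server, restoreConsumerClientId(threadId, 2)));
    }
}
```

The second registration is the one that fails. Whether a suffix like this is acceptable for metrics and client-id consumers downstream is exactly the kind of implication the description says has not been checked yet.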
> This could be done by adding a thread generation id here
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66
> or by adding an overload here:
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L390
> {code:java}
> // Proposed overload: generationId is the new, extra argument
> final Map<String, Object> consumerConfigs =
>     config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx, generationId);
> {code}
> I have not yet checked if there are any implications to either of these
> solutions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
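For readers puzzling over the second trace: `KafkaConsumer.acquire` rejects the `enforceRebalance` call made from the thread running the uncaught-exception handler, which suggests the consumer was in use by a different thread at that moment. The following is a simplified sketch of that kind of single-owner guard, written from scratch under that assumption. It is not Kafka's code, and the class and method names are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class SingleThreadGuardSketch {
    private static final long NO_OWNER = -1L;

    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    // Lets the owning thread re-enter; any other thread is rejected immediately.
    public void acquire() {
        long me = Thread.currentThread().getId();
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("client is not safe for multi-threaded access");
        }
        depth.incrementAndGet();
    }

    // Clears ownership once the outermost call has finished.
    public void release() {
        if (depth.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    // Holds the guard on the calling thread and reports whether a second thread is rejected.
    public static boolean rejectsSecondThread() {
        SingleThreadGuardSketch guard = new SingleThreadGuardSketch();
        AtomicBoolean rejected = new AtomicBoolean(false);
        guard.acquire(); // the owning thread is "inside" a client call
        Thread other = new Thread(() -> {
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            guard.release();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectsSecondThread());
    }
}
```

A guard like this fails fast instead of blocking, so a cross-thread call surfaces as an exception in the caller rather than as a hang.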