[ 
https://issues.apache.org/jira/browse/KAFKA-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291777#comment-17291777
 ] 

Bruno Cadonna commented on KAFKA-12375:
---------------------------------------

I agree with [~ableegoldman] that ensuring a unique thread ID also solves other 
issues, like the metrics issue above. We cannot wait for the old metrics to be 
removed, because that only happens during shutdown, i.e. after the stream 
thread has already been replaced.
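
For reference, the JMX behaviour behind the first stack trace is easy to reproduce outside of Kafka: registering a second MBean under an ObjectName that is still registered fails immediately, which is why the new consumer cannot share the old client id while the old consumer's app-info MBean is still registered. A minimal standalone sketch (not Kafka code; class names are chosen purely for illustration):
{code:java}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // Empty standard-MBean interface: no attributes or operations are needed for the demo.
    public interface AppInfoMBean { }
    public static class AppInfo implements AppInfoMBean { }

    public static void main(final String[] args) throws Exception {
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        final ObjectName name = new ObjectName(
            "kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer");

        server.registerMBean(new AppInfo(), name);
        // The second registration under the same name throws
        // javax.management.InstanceAlreadyExistsException, the same exception as in the
        // stack trace, because the first MBean is only unregistered when the old consumer closes.
        server.registerMBean(new AppInfo(), name);
    }
}
{code}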

> ReplaceStreamThread creates a new consumer with the same name as the one it's 
> replacing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12375
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12375
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Tomasz Nguyen
>            Assignee: Tomasz Nguyen
>            Priority: Blocker
>             Fix For: 2.8.0
>
>
> I was debugging the kafka-streams soak cluster and noticed that replacing a 
> stream thread was causing the streams application to fail. I have managed to 
> find the following stacktrace:
> {code:java}
> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>     at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338)
>     at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896)
>     at org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977)
>     at org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
>  
> followed by:
> {code:java}
> Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261)
>     at org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666)
>     at org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> My understanding so far is that we re-use the consumer name across thread 
> generations, which can trigger a few flavours of a race condition. My suggestion 
> would be to add the thread generation id to the consumer name.
> This could be done by adding a thread generation id here
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66
> or by adding an overload here: 
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L390
> {code:java}
> // Hypothetical overload that also takes the thread generation id, so the
> // consumer client id changes whenever the stream thread is replaced.
> final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(
>     applicationId, getConsumerClientId(threadId), threadIdx, generationId);
> {code}
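>
> For completeness, a rough sketch of the first option could look like the following (the helper names and signatures in ClientUtils are assumptions, not the final API):
> {code:java}
> // Sketch only: hypothetical ClientUtils overloads that fold the thread generation
> // into the client id, so a replacement thread never registers its consumers under
> // the same MBean names as the thread it replaces.
> public static String getConsumerClientId(final String threadClientId, final int generation) {
>     return threadClientId + "-" + generation + "-consumer";
> }
>
> public static String getRestoreConsumerClientId(final String threadClientId, final int generation) {
>     return threadClientId + "-" + generation + "-restore-consumer";
> }
> {code}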
> I have not yet checked whether either of these solutions has any further implications.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
