[ 
https://issues.apache.org/jira/browse/KAFKA-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12375:
----------------------------------------------

    Assignee: Tomasz Nguyen

> ReplaceStreamThread creates a new consumer with the same name as the one it's 
> replacing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12375
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12375
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Tomasz Nguyen
>            Assignee: Tomasz Nguyen
>            Priority: Major
>             Fix For: 2.8.0
>
>
> I was debugging the kafka-streams soak cluster and noticed that replacing a 
> stream thread was causing the streams application to fail. I found the 
> following stack trace:
> {code:java}
> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=i-0cdac8830ee1b8f01-StreamThread-1-restore-consumer
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:815)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>     at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>     at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:338)
>     at org.apache.kafka.streams.KafkaStreams.createAndAddStreamThread(KafkaStreams.java:896)
>     at org.apache.kafka.streams.KafkaStreams.addStreamThread(KafkaStreams.java:977)
>     at org.apache.kafka.streams.KafkaStreams.replaceStreamThread(KafkaStreams.java:467)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:487)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
>  
> followed by:
> {code:java}
> Exception in thread "i-0e4d869ffd67ec825-StreamThread-1" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2446)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2430)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.enforceRebalance(KafkaConsumer.java:2261)
>     at org.apache.kafka.streams.processor.internals.StreamThread.sendShutdownRequest(StreamThread.java:666)
>     at org.apache.kafka.streams.KafkaStreams.lambda$handleStreamsUncaughtException$4(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.processStreamThread(KafkaStreams.java:1579)
>     at org.apache.kafka.streams.KafkaStreams.handleStreamsUncaughtException(KafkaStreams.java:508)
>     at org.apache.kafka.streams.KafkaStreams.lambda$setUncaughtExceptionHandler$1(KafkaStreams.java:427)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> My understanding so far is that we reuse the consumer name across thread 
> generations, which can trigger a few flavours of a race condition. My 
> suggestion would be to add the generation id to the consumer name.
> This could be done by adding a thread generation id here
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java#L66
> or by adding an overload here: 
> https://github.com/apache/kafka/blob/b35ca4349dabb199411cb6bc4c80ef89f19d9328/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L390
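> {code:java}
> // Proposed overload: pass a generation id in addition to the thread index
> // (generationId is not an existing parameter)
> final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(
>     applicationId, getConsumerClientId(threadId), threadIdx, generationId);
> {code}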
> I have not yet checked whether either of these solutions has any further 
> implications.
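
The first stack trace ends in the JMX registry rejecting a second MBean under a name that is already taken. As a rough, Kafka-free illustration of that behaviour, the sketch below registers two MBeans under the same ObjectName on the platform MBean server. The class name, the "sketch.consumer" domain, and the use of the JDK's javax.management.timer.Timer as a stand-in MBean are all assumptions made for the example; this is not how Kafka's AppInfoParser is implemented.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Hypothetical demo class; not part of Kafka.
final class DuplicateMBeanSketch {
    private DuplicateMBeanSketch() {}

    // Registers two MBeans under the same name and reports whether the
    // second registration was rejected with InstanceAlreadyExistsException.
    static boolean secondRegistrationRejected(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            // "sketch.consumer" is a made-up domain, chosen so the example
            // cannot collide with real Kafka MBeans.
            ObjectName name =
                new ObjectName("sketch.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new Timer(), name); // first "generation"
            try {
                server.registerMBean(new Timer(), name); // same name again
                return false;
            } catch (InstanceAlreadyExistsException e) {
                return true;
            } finally {
                server.unregisterMBean(name); // clean up the first instance
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the first registration were removed before the second one ran, the second would succeed, which is consistent with the reporter's reading that the failure depends on timing between thread generations.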

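The suggestion above, adding a generation id to the consumer name, could look roughly like the sketch below. It is only an illustration of the naming idea: the class, the method names, and the "-gen-N" segment are hypothetical and are not taken from ClientUtils or StreamThread. The "-StreamThread-N" and "-restore-consumer" pieces mirror the client id visible in the first stack trace.

```java
// Hypothetical helper; illustrates the naming idea only.
final class GenerationalClientIds {
    private GenerationalClientIds() {}

    // Thread-level id, extended with an assumed "-gen-<generationId>" segment.
    static String threadClientId(String processId, int threadIdx, int generationId) {
        return processId + "-StreamThread-" + threadIdx + "-gen-" + generationId;
    }

    // Main consumer id derived from the generation-aware thread id.
    static String consumerClientId(String processId, int threadIdx, int generationId) {
        return threadClientId(processId, threadIdx, generationId) + "-consumer";
    }

    // Restore consumer id derived from the generation-aware thread id.
    static String restoreConsumerClientId(String processId, int threadIdx, int generationId) {
        return threadClientId(processId, threadIdx, generationId) + "-restore-consumer";
    }
}
```

With such a scheme, a replacement for thread 1 would get a restore consumer id that differs from its predecessor's only in the generation segment, so the two ids would no longer be equal. Whether metrics, logs, or other consumers of the client id tolerate the extra segment is exactly the open question the reporter raises.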


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
