[ 
https://issues.apache.org/jira/browse/STORM-4055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880168#comment-17880168
 ] 

Rui Abreu commented on STORM-4055:
----------------------------------

Fixed by: https://github.com/apache/storm/pull/3691

> ConcurrentModificationException on KafkaConsumer when running topology with 
> Metrics Reporters
> ---------------------------------------------------------------------------------------------
>
>                 Key: STORM-4055
>                 URL: https://issues.apache.org/jira/browse/STORM-4055
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.6.1
>            Reporter: Anthony Castrati
>            Priority: Major
>             Fix For: 2.7.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> After a recent upgrade to storm-kafka-client on Storm server 2.6.1, we are 
> seeing a ConcurrentModificationException in our topology at runtime. I 
> believe this is due to a single KafkaConsumer instance being shared between 
> the KafkaSpout and the KafkaOffsetPartitionMetrics, which were added some 
> time between 2.4.0 and 2.6.1.
>  
> h3. Steps to Reproduce:
> Configure a topology with a basic KafkaSpout. Configure the topology with one 
> of the metrics loggers. We used our own custom one, but reproduced it with 
> ConsoleStormReporter as well. The JMXReporter did not reproduce the issue for 
> us, but we did not dig into why.
> *reporter config:*
> {{topology.metrics.reporters: [}}
> {{  {}}
> {{    "filter": {}}
> {{      "expression": ".*",}}
> {{      "class": "org.apache.storm.metrics2.filters.RegexFilter"}}
> {{    },}}
> {{    "report.period": 15,}}
> {{    "report.period.units": "SECONDS",}}
> {{    "class": "org.apache.storm.metrics2.reporters.ConsoleStormReporter"}}
> {{  }}}
> {{]}}
> h3. Stacktrace:
> {quote}[ERROR] Exception thrown from NewRelicReporter#report. Exception was 
> suppressed.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for 
> multi-threaded access. currentThread(name: 
> metrics-newRelicReporter-1-thread-1, id: 24) otherThread(id: 40)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484)
>  ~[stormjar.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2465)
>  ~[stormjar.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2144)
>  ~[stormjar.jar:?]
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2123)
>  ~[stormjar.jar:?]
>     at 
> org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics.getBeginningOffsets(KafkaOffsetPartitionMetrics.java:181)
>  ~[stormjar.jar:?]
>     at 
> org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:93)
>  ~[stormjar.jar:?]
>     at 
> org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics$2.getValue(KafkaOffsetPartitionMetrics.java:90)
>  ~[stormjar.jar:?]
>     at 
> com.codahale.metrics.newrelic.transformer.GaugeTransformer.transform(GaugeTransformer.java:60)
>  ~[stormjar.jar:?]
>     at 
> com.codahale.metrics.newrelic.NewRelicReporter.lambda$transform$0(NewRelicReporter.java:154)
>  ~[stormjar.jar:?]
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet.lambda$entryConsumer$0(Unknown
>  Source) ~[?:?]
>     at java.base/java.util.TreeMap$EntrySpliterator.forEachRemaining(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.Collections$UnmodifiableMap$UnmodifiableEntrySet$UnmodifiableEntrySetSpliterator.forEachRemaining(Unknown
>  Source) ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) 
> ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown
>  Source) ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) 
> ~[?:?]
>     at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source) 
> ~[?:?]
>     at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown 
> Source) ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) 
> ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown 
> Source) ~[?:?]
>     at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown 
> Source) ~[?:?]
>     at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) 
> ~[?:?]
>     at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) 
> ~[?:?]
>     at 
> com.codahale.metrics.newrelic.NewRelicReporter.report(NewRelicReporter.java:138)
>  ~[stormjar.jar:?]
>     at 
> com.codahale.metrics.ScheduledReporter.report(ScheduledReporter.java:243) 
> ~[metrics-core-3.2.6.jar:3.2.6]
>     at 
> com.codahale.metrics.ScheduledReporter$1.run(ScheduledReporter.java:182) 
> [metrics-core-3.2.6.jar:3.2.6]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source) [?:?]
>     at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) 
> [?:?]
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) [?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source) [?:?]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) [?:?]
>     at java.base/java.lang.Thread.run(Unknown Source) [?:?]
> {quote}
> h3. Workaround
> Configure the reporter with a RegexFilter or similar filter that excludes 
> the KafkaOffsetPartitionMetrics.
> h3. Impact
> I am concerned that, depending on the timing of the access to the spout, 
> the offending metric could fast forward or rewind the spout. I did not do any 
> further testing to see whether the lock could be mismanaged in such a way 
> that the spout is directly impacted, but it is feasible. The impact may need 
> to be adjusted if it is confirmed that a simple metrics reporter could result 
> in skipping events or re-processing them.
> h3. Potential Code Issues:
> *KafkaSpout.java*
> {{private transient Consumer<K, V> consumer;}}
> {{...}}
> {{public void open(Map<String, Object> conf, TopologyContext context, 
> SpoutOutputCollector collector) {}}
> {{        ...}}
> {{        //this consumer will be used by the spout everywhere}}
> {{        consumer = 
> kafkaConsumerFactory.createConsumer(kafkaSpoutConfig.getKafkaProps());}}
> {{        tupleListener.open(conf, context);}}
> {{        this.kafkaOffsetMetricManager}}
> {{            = new KafkaOffsetMetricManager<>(() -> 
> Collections.unmodifiableMap(offsetManagers), () -> consumer, context);}}
> () -> consumer does not appear to be a safe provider.  It re-uses the same 
> instance of the KafkaConsumer as the KafkaSpout in another thread and 
> KafkaConsumer is not thread safe.
> *KafkaOffsetPartitionMetrics.java: getBeginningOffsets, getEndOffsets*
> {{private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> 
> topicPartitions) {}}
> {{    Consumer<K, V> consumer = consumerSupplier.get();}}
> {{    ...}}
> {{    try {}}
> {{        // This will actually try to modify the KafkaSpout instance of the 
> consumer which could negatively impact the spout}}
> {{        beginningOffsets = consumer.beginningOffsets(topicPartitions);}}
> {{    }}}
> {{    ...}}
> {{}}}
> {{private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> 
> topicPartitions) {}}
> {{    Consumer<K, V> consumer = consumerSupplier.get();}}
> {{    ...}}
> {{    try {}}
> {{        // This will actually try to modify the KafkaSpout instance of the 
> consumer which could negatively impact the spout}}
> {{        endOffsets = consumer.endOffsets(topicPartitions);}}
> {{    }}}
> {{    ...}}
> {{}}}
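
The sharing problem described above can be reproduced without Kafka. The following is a minimal sketch, not the change made in the linked pull request: GuardedClient is a hypothetical stand-in that imitates the single-thread ownership check reported in the stack trace, and OffsetSnapshot is a hypothetical holder showing one possible way to keep a reporter thread off the shared client, by letting the owning thread publish a snapshot that other threads only read. All class, method, and topic names here are assumptions made for illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

public class OffsetSnapshotSketch {

    /** Hypothetical stand-in for a client that rejects use by a second thread. */
    public static final class GuardedClient {
        private static final long NO_OWNER = -1L;
        private final AtomicLong owner = new AtomicLong(NO_OWNER);
        private final AtomicInteger depth = new AtomicInteger();

        /** Claims the client for the calling thread, or fails if another thread holds it. */
        public void acquire() {
            long me = Thread.currentThread().getId();
            if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            depth.incrementAndGet();
        }

        /** Gives up ownership once the outermost guarded call has finished. */
        public void release() {
            if (depth.decrementAndGet() == 0) {
                owner.set(NO_OWNER);
            }
        }

        /** A guarded call; the returned data is made up for the demo. */
        public Map<String, Long> beginningOffsets() {
            acquire();
            try {
                return Map.of("demo-topic-0", 100L);
            } finally {
                release();
            }
        }
    }

    /** Written only by the thread that owns the client; safe to read from any thread. */
    public static final class OffsetSnapshot {
        private final AtomicReference<Map<String, Long>> latest =
                new AtomicReference<>(Map.of());

        public void refreshFrom(GuardedClient client) {
            latest.set(client.beginningOffsets());
        }

        public Map<String, Long> read() {
            return latest.get();
        }
    }

    /** Calls the client from a second thread and reports whether the guard rejected it. */
    public static boolean rejectedOnOtherThread(GuardedClient client) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread reporter = new Thread(() -> {
            try {
                client.beginningOffsets();
            } catch (ConcurrentModificationException e) {
                failure.set(e);
            }
        }, "metrics-reporter");
        reporter.start();
        try {
            reporter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure.get() != null;
    }

    public static void main(String[] args) {
        GuardedClient client = new GuardedClient();
        OffsetSnapshot snapshot = new OffsetSnapshot();

        snapshot.refreshFrom(client); // owning thread refreshes between its own calls
        client.acquire();             // owning thread is now in the middle of a call
        try {
            System.out.println("direct call rejected: " + rejectedOnOtherThread(client));
            System.out.println("snapshot read: " + snapshot.read());
        } finally {
            client.release();
        }
    }
}
```

The trade-off in this sketch is staleness: a reporter sees the offsets as of the last refresh by the owning thread rather than live values, in exchange for never touching the shared client from a second thread.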



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
