[ https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16902544#comment-16902544 ]

Matthias J. Sax commented on KAFKA-8736:
----------------------------------------

My last comment is incorrect. This is a regression, caused by an internal change 
from `TreeMap` to `ConcurrentSkipListMap`. With `TreeMap`, calling `size()` does 
not harm performance, because it runs in constant time. Hence, this is a 
regression that only affects the 2.3.0 release.

Because it's a regression, I updated the ticket to a critical bug.
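
For context, the difference boils down to the emptiness-check pattern below. This 
is a minimal sketch against a plain `ConcurrentSkipListMap`, not the actual 
`ThreadCache`/`NamedCache` code or the attached size.patch; the class and method 
names are made up for illustration:

{noformat}
import java.util.concurrent.ConcurrentSkipListMap;

public class EmptyCheckSketch {
    private final ConcurrentSkipListMap<String, byte[]> cache = new ConcurrentSkipListMap<>();

    // Problematic pattern: ConcurrentSkipListMap.size() traverses every entry,
    // so this check is O(n) and sits on the hot path of every put().
    boolean hasEntriesSlow() {
        return cache.size() != 0;
    }

    // Fixed pattern: isEmpty() only looks for a first entry, so it is O(1).
    boolean hasEntriesFast() {
        return !cache.isEmpty();
    }
}
{noformat}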

> Performance: ThreadCache uses size() for empty cache check
> ----------------------------------------------------------
>
>                 Key: KAFKA-8736
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8736
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Matthew Jarvie
>            Assignee: Matthew Jarvie
>            Priority: Critical
>             Fix For: 2.4.0, 2.3.1
>
>         Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
> performance improvement. The test showed we were spending 80% of CPU time in 
> ConcurrentSkipListMap.size():
>  
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169
> 82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612
> 82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127
> 81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35
> 81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131
> 81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat}
> According to 
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
> the size() method has to traverse all elements to get a count. It looks like 
> the count is being compared against 0 to determine whether the map is empty; in 
> this case, we don't need a full count. Instead, the isEmpty() method should be 
> used, which only looks for one node. We patched this and gained about 25% in 
> max throughput, and this method disappeared from thread dumps as a hot spot.
> *Update:*
> The root cause is an internal change from `TreeMap` to 
> `ConcurrentSkipListMap`. With `TreeMap`, calling `size()` does not harm 
> performance, because it runs in constant time. Hence, this is a regression 
> that only affects the 2.3.0 release.
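
For anyone who wants to see the cost difference described above outside of 
Streams, a quick standalone check could look like the sketch below. This is not a 
proper benchmark (no warm-up, no JMH) and the class name and map sizes are made 
up for illustration, but it shows why `TreeMap.size()` is cheap while 
`ConcurrentSkipListMap.size()` is not:

{noformat}
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class SizeCostCheck {
    public static void main(String[] args) {
        TreeMap<Integer, Integer> tree = new TreeMap<>();
        ConcurrentSkipListMap<Integer, Integer> skip = new ConcurrentSkipListMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            tree.put(i, i);
            skip.put(i, i);
        }

        long sink = 0; // accumulate results so the calls are not optimized away

        long t0 = System.nanoTime();
        for (int i = 0; i < 100; i++) sink += tree.size();                // O(1): TreeMap keeps a size counter
        long t1 = System.nanoTime();
        for (int i = 0; i < 100; i++) sink += skip.size();                // O(n): traverses all entries per call
        long t2 = System.nanoTime();
        for (int i = 0; i < 100; i++) sink += skip.isEmpty() ? 0 : 1;     // O(1): only looks for a first entry
        long t3 = System.nanoTime();

        System.out.printf("TreeMap.size()                 : %,d us%n", (t1 - t0) / 1_000);
        System.out.printf("ConcurrentSkipListMap.size()   : %,d us%n", (t2 - t1) / 1_000);
        System.out.printf("ConcurrentSkipListMap.isEmpty(): %,d us%n", (t3 - t2) / 1_000);
        System.out.println("sink = " + sink);
    }
}
{noformat}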



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
