[ https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16906651#comment-16906651 ]

ASF GitHub Bot commented on KAFKA-8736:
---------------------------------------

bbejeck commented on pull request #7177: KAFKA-8736: Track size in InMemoryKeyValueStore
URL: https://github.com/apache/kafka/pull/7177

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Performance: ThreadCache uses size() for empty cache check
> ----------------------------------------------------------
>
>                 Key: KAFKA-8736
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8736
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Matthew Jarvie
>            Assignee: Matthew Jarvie
>            Priority: Critical
>             Fix For: 2.4.0, 2.3.1
>
>         Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
> performance improvement. The test showed we were spending 80% of CPU time in 
> ConcurrentSkipListMap.size():
>  
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169
> 82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612
> 82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127
> 81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35
> 81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131
> 81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639
> {noformat}
> According to
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
> size() is not a constant-time operation: it has to traverse all elements to
> count them. In ThreadCache.maybeEvict() the count appears to be compared only
> against 0 to determine whether the cache is empty, so a full count is not
> needed. The isEmpty() method should be used instead, since it only needs to
> find the first node. We patched this and gained about 25% in maximum
> throughput, and this method no longer showed up as a hot spot in thread dumps.
> *Update:*
> The root cause is an internal change from `TreeMap` to
> `ConcurrentSkipListMap`. With `TreeMap`, calling `size()` does not hurt
> performance, because it runs in constant time. Hence, this is a regression
> that only affects the 2.3.0 release.
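A minimal Java sketch of the two remedies discussed in this issue, under stated assumptions: emptiness is checked with ConcurrentSkipListMap.isEmpty() instead of comparing size() against 0, and, in the spirit of the PR title "Track size in InMemoryKeyValueStore", an exact count is kept in a counter updated on every write so that size() never traverses the skip list. SizeTrackingMap is a hypothetical class, not Kafka's NamedCache or InMemoryKeyValueStore code, and its plain long counter assumes a single writer thread.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch, not Kafka's actual NamedCache or InMemoryKeyValueStore code.
class SizeTrackingMap<K, V> {
    private final ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
    // Assumption: a single writer thread. A map shared across threads
    // would need an AtomicLong or LongAdder instead of a plain field.
    private long size = 0;

    V put(K key, V value) {
        // ConcurrentSkipListMap rejects null values, so a null return
        // always means the key was not present before.
        V previous = map.put(key, value);
        if (previous == null) {
            size++; // count only newly inserted keys, not overwrites
        }
        return previous;
    }

    V remove(K key) {
        V previous = map.remove(key);
        if (previous != null) {
            size--; // only decrement when a mapping was actually removed
        }
        return previous;
    }

    // Reads the counter instead of calling ConcurrentSkipListMap.size(),
    // which traverses every element.
    long size() {
        return size;
    }

    // ConcurrentSkipListMap.isEmpty() only needs to find the first node
    // rather than counting every entry.
    boolean isEmpty() {
        return map.isEmpty();
    }

    public static void main(String[] args) {
        SizeTrackingMap<String, byte[]> cache = new SizeTrackingMap<>();
        System.out.println(cache.isEmpty()); // true
        cache.put("a", new byte[]{1});
        cache.put("b", new byte[]{2});
        cache.put("a", new byte[]{3}); // overwrite: size unchanged
        System.out.println(cache.size()); // 2
        cache.remove("a");
        cache.remove("missing"); // no-op
        System.out.println(cache.size() + " " + cache.isEmpty()); // 1 false
    }
}
```

Overwriting an existing key leaves the count unchanged because put() returns the previous value; the trade-off is that every write path must keep the counter in sync, which is why the emptiness check alone is the smaller change.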



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
