[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Priority: Critical  (was: Major)

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Assignee: Matthew Jarvie
>Priority: Critical
> Fix For: 2.4.0, 2.3.1
>
> Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
> performance improvement. The test showed we were spending 80% of CPU time in 
> ConcurrentSkipListMap.size():
>  
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
> 96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
> 82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169
> 82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612
> 82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127
> 81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35
> 81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131
> 81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639
> {noformat}
> According to
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
> the size() method has to traverse all elements to get a count. It looks like
> the count is being compared against 0 to determine whether the map is empty;
> in this case, we don't need a full count. Instead, the isEmpty() method should
> be used, which just looks for one node. We patched this and gained about 25%
> max throughput, and this method disappeared from thread dumps as a hot spot.
> *Update:*
> The root cause is an internal change from `TreeMap` to
> `ConcurrentSkipListMap`. In `TreeMap`, using `size()` does not harm
> performance because it runs in constant time. Hence, this is a regression
> that only affects the 2.3.0 release.
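
The change proposed in the description can be sketched roughly as follows. This is only an illustration, not the contents of size.patch: the class name `EmptyCheckSketch` and both helper methods are hypothetical, and the real `NamedCache` / `ThreadCache` code may look different. It shows that `isEmpty()` answers the same question as `size() == 0` on a `ConcurrentSkipListMap`, without asking the map to count its elements.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

public class EmptyCheckSketch {

    // Emptiness check via size(): on a ConcurrentSkipListMap this counts
    // every element before the result is compared against 0.
    public static boolean isEmptyViaSize(Map<?, ?> cache) {
        return cache.size() == 0;
    }

    // Emptiness check via isEmpty(): only needs to find out whether at
    // least one element exists.
    public static boolean isEmptyViaIsEmpty(Map<?, ?> cache) {
        return cache.isEmpty();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the cache's backing map.
        ConcurrentSkipListMap<String, byte[]> cache = new ConcurrentSkipListMap<>();
        System.out.println(isEmptyViaSize(cache) + " " + isEmptyViaIsEmpty(cache));

        cache.put("key", new byte[] {1});
        System.out.println(isEmptyViaSize(cache) + " " + isEmptyViaIsEmpty(cache));
    }
}
```

Both checks always agree on the result; they differ only in how much work the map does to produce it.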



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Issue Type: Bug  (was: Improvement)

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Assignee: Matthew Jarvie
>Priority: Critical
> Fix For: 2.4.0, 2.3.1
>
> Attachments: size.patch


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Description: 
While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
performance improvement. The test showed we were spending 80% of CPU time in 
ConcurrentSkipListMap.size():

 
{noformat}
100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object, java.lang.Object):87
96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
96.3% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
96.23% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
96.12% org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object, java.lang.Object):43
96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object):133
96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object, java.lang.Object, org.apache.kafka.streams.processor.To):180
96.12% org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode, java.lang.Object, java.lang.Object):201
96.08% org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object, java.lang.Object):117
82.78% org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object, java.lang.Object):169
82.78% org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):612
82.59% org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):127
81.11% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, java.lang.Object):35
81.09% org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed, byte[]):131
81.09% org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, org.apache.kafka.common.utils.Bytes, org.apache.kafka.streams.state.internals.LRUCacheEntry):151
80.53% org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639
{noformat}
According to
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
the size() method has to traverse all elements to get a count. It looks like the
count is being compared against 0 to determine whether the map is empty; in this
case, we don't need a full count. Instead, the isEmpty() method should be used,
which just looks for one node. We patched this and gained about 25% max
throughput, and this method disappeared from thread dumps as a hot spot.

*Update:*

The root cause is an internal change from `TreeMap` to `ConcurrentSkipListMap`.
In `TreeMap`, using `size()` does not harm performance because it runs in constant
time. Hence, this is a regression that only affects the 2.3.0 release.
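
To illustrate how such a regression can slip in unnoticed, here is a small sketch under stated assumptions: `BackingMapSwapSketch` is a hypothetical wrapper, not Kafka's `NamedCache`, and its method names are made up for this example. Code written against the `NavigableMap` interface compiles and behaves identically after the backing map is swapped; only the cost of `size()` changes.

```java
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class BackingMapSwapSketch {

    // Hypothetical cache wrapper; the backing map is chosen by the caller.
    private final NavigableMap<String, String> entries;

    public BackingMapSwapSketch(NavigableMap<String, String> entries) {
        this.entries = entries;
    }

    public void put(String key, String value) {
        entries.put(key, value);
    }

    // Correct for either implementation, but the cost depends on the map:
    // TreeMap returns a stored counter, ConcurrentSkipListMap counts elements.
    public boolean nothingToEvictViaSize() {
        return entries.size() == 0;
    }

    // Same answer, and cheap for both implementations.
    public boolean nothingToEvict() {
        return entries.isEmpty();
    }

    public static void main(String[] args) {
        BackingMapSwapSketch before = new BackingMapSwapSketch(new TreeMap<>());
        BackingMapSwapSketch after = new BackingMapSwapSketch(new ConcurrentSkipListMap<>());

        before.put("k", "v");
        after.put("k", "v");

        // The observable behavior is the same before and after the swap.
        System.out.println(before.nothingToEvictViaSize() == after.nothingToEvictViaSize());
        System.out.println(before.nothingToEvict() == after.nothingToEvict());
    }
}
```

Because the results are identical, a functional test would not catch the swap; only a profile or load test, as in the report above, reveals the difference.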

  was:
While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
performance improvement. The test showed we were spending 80% of CPU time in 
ConcurrentSkipListMap.size():

 
{noformat}
100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
96.84% org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
96.83% org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
96.3% org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object,
 

[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Affects Version/s: (was: 0.10.1.0)
   2.3.0

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Assignee: Matthew Jarvie
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
> Attachments: size.patch


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-07 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Fix Version/s: (was: 2.2.2)
   (was: 2.1.2)
   (was: 2.0.2)

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Assignee: Matthew Jarvie
>Priority: Major
> Fix For: 2.4.0, 2.3.1
>
> Attachments: size.patch


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-08-06 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8736:
---
Affects Version/s: (was: 2.3.0)
   0.10.1.0

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Matthew Jarvie
>Assignee: Matthew Jarvie
>Priority: Major
> Attachments: size.patch


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-07-30 Thread Matthew Jarvie (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Jarvie updated KAFKA-8736:
--
Attachment: size.patch

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Priority: Major
> Attachments: size.patch