[ 
https://issues.apache.org/jira/browse/KAFKA-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16498202#comment-16498202
 ] 

ASF GitHub Bot commented on KAFKA-6925:
---------------------------------------

guozhangwang closed pull request #5108: KAFKA-6925: fix parentSensors memory 
leak
URL: https://github.com/apache/kafka/pull/5108
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index b2ce2e7dcf8..5d0c46ecba4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -222,6 +222,7 @@ public void removeSensor(Sensor sensor) {
         final Sensor parent = parentSensors.get(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
+            parentSensors.remove(sensor);
         }
 
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 7b16246da33..7666e42044d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -19,11 +19,15 @@
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -53,19 +57,27 @@ public void testRemoveSensor() {
         String entity = "entity";
         String operation = "put";
         Map<String, String> tags = new HashMap<>();
-        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new 
Metrics(), groupName, tags);
+        final Metrics metrics = new Metrics();
+        final Map<MetricName, KafkaMetric> initialMetrics = 
Collections.unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, 
groupName, tags);
 
         Sensor sensor1 = streamsMetrics.addSensor(sensorName, 
Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor1);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor1a = streamsMetrics.addSensor(sensorName, 
Sensor.RecordingLevel.DEBUG, sensor1);
         streamsMetrics.removeSensor(sensor1a);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, 
entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, 
operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+        Assert.assertEquals(initialMetrics, metrics.metrics());
+
+        Assert.assertEquals(Collections.emptyMap(), 
streamsMetrics.parentSensors);
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Memory leak in 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>            Reporter: Marcin Kuthan
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 1.1.1
>
>
> *Note: this issue was fixed incidentally in 2.0, so it is only present in 
> versions 0.x and 1.x.*
>  
> The retained heap of 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  is surprisingly high for long running job. Over 100MB of heap for every 
> stream after a week of uptime, when for the same application a few hours 
> after start heap takes 2MB.
> For the problematic instance majority of memory StreamsMetricsThreadImpl is 
> occupied by hash map entries in parentSensors, over 8000 elements 100+kB 
> each. For fresh instance there are less than 200 elements.
> Below you could find retained set report generated from Eclipse Mat but I'm 
> not fully sure about correctness due to complex object graph in the metrics 
> related code. Number of objects in single 
> StreamThread$StreamsMetricsThreadImpl  instance.
>  
> {code:java}
> Class Name | Objects | Shallow Heap
> -----------------------------------------------------------------------------------------------------------
> org.apache.kafka.common.metrics.KafkaMetric | 140,476 | 4,495,232
> org.apache.kafka.common.MetricName | 140,476 | 4,495,232
> org.apache.kafka.common.metrics.stats.SampledStat$Sample | 73,599 | 3,532,752
> org.apache.kafka.common.metrics.stats.Meter | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Count | 42,104 | 1,347,328
> org.apache.kafka.common.metrics.stats.Rate | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Total | 42,104 | 1,010,496
> org.apache.kafka.common.metrics.stats.Max | 28,134 | 900,288
> org.apache.kafka.common.metrics.stats.Avg | 28,134 | 900,288
> org.apache.kafka.common.metrics.Sensor | 3,164 | 202,496
> org.apache.kafka.common.metrics.Sensor[] | 3,164 | 71,088
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl|
>  1 | 56
> -----------------------------------------------------------------------------------------------------------
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to