[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709302#comment-16709302
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---------------------------------------

mjsax closed pull request #5980: KAFKA-7660: fix streams and Metrics memory 
leaks
URL: https://github.com/apache/kafka/pull/5980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index a6da9f90397..9e2b6f18f0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -41,6 +41,8 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.emptyList;
+
 /**
  * A registry of sensors and metrics.
  * <p>
@@ -446,6 +448,9 @@ public void removeSensor(String name) {
                             removeMetric(metric.metricName());
                         log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
+                        for (final Sensor parent : sensor.parents()) {
+                            childrenSensors.getOrDefault(parent, 
emptyList()).remove(sensor);
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ccbe8aad9cd..1af9419bc75 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,7 +22,6 @@
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -32,6 +31,9 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
 /**
  * A sensor applies a continuous sequence of numerical values to a set of 
associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link 
#record(double)} api and would maintain a set
@@ -133,6 +135,10 @@ public String name() {
         return this.name;
     }
 
+    List<Sensor> parents() {
+        return unmodifiableList(asList(parents));
+    }
+
     /**
      * Record an occurrence, this is just short-hand for {@link 
#record(double) record(1.0)}
      */
@@ -291,7 +297,7 @@ public boolean hasExpired() {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(new 
LinkedList<>(this.metrics.values()));
+        return unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5c75d03b0e6..6ad48331fde 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -197,6 +199,20 @@ public void testBadSensorHierarchy() {
         metrics.sensor("gc", c1, c2); // should fail
     }
 
+    @Test
+    public void testRemoveChildSensor() {
+        final Metrics metrics = new Metrics();
+
+        final Sensor parent = metrics.sensor("parent");
+        final Sensor child = metrics.sensor("child", parent);
+
+        assertEquals(singletonList(child), 
metrics.childrenSensors().get(parent));
+
+        metrics.removeSensor("child");
+
+        assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+    }
+
     @Test
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 51665e60d0b..ba0b58fe2f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -110,11 +110,9 @@ public final Sensor taskLevelSensor(final String taskName,
     public final void removeAllTaskLevelSensors(final String taskName) {
         final String key = threadName + "." + taskName;
         synchronized (taskLevelSensors) {
-            if (taskLevelSensors.containsKey(key)) {
-                while (!taskLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(taskLevelSensors.get(key).pop());
-                }
-                taskLevelSensors.remove(key);
+            final Deque<String> sensors = taskLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -143,11 +141,9 @@ public final Sensor cacheLevelSensor(final String taskName,
     public final void removeAllCacheLevelSensors(final String taskName, final 
String cacheName) {
         final String key = threadName + "." + taskName + "." + cacheName;
         synchronized (cacheLevelSensors) {
-            if (cacheLevelSensors.containsKey(key)) {
-                while (!cacheLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
-                }
-                cacheLevelSensors.remove(key);
+            final Deque<String> strings = cacheLevelSensors.remove(key);
+            while (strings != null && !strings.isEmpty()) {
+                metrics.removeSensor(strings.pop());
             }
         }
     }
@@ -361,10 +357,16 @@ public void removeSensor(final Sensor sensor) {
         Objects.requireNonNull(sensor, "Sensor is null");
         metrics.removeSensor(sensor.name());
 
-        final Sensor parent = parentSensors.get(sensor);
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
     }
 
+    /**
+     * Visible for testing
+     */
+    Map<Sensor, Sensor> parentSensors() {
+        return Collections.unmodifiableMap(parentSensors);
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 64%
rename from 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index a72dc7928d7..cccc4580644 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,14 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.common.metrics.stats.Count;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
@@ -57,6 +63,55 @@ public void testRemoveSensor() {
 
         final Sensor sensor3 = streamsMetrics.addThroughputSensor(taskName, 
scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+    }
+
+    @Test
+    public void testMutiLevelSensorRemoval() {
+        final Metrics registry = new Metrics();
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
"");
+        for (final MetricName defaultMetric : registry.metrics().keySet()) {
+            registry.removeMetric(defaultMetric);
+        }
+
+        final String taskName = "taskName";
+        final String operation = "operation";
+        final Map<String, String> threadTags = mkMap(mkEntry("threadkey", 
"value"));
+
+        final Map<String, String> taskTags = mkMap(mkEntry("taskkey", 
"value"));
+
+        final Sensor parent1 = metrics.threadLevelSensor(operation, 
Sensor.RecordingLevel.DEBUG);
+        parent1.add(new MetricName("name", "group", "description", 
threadTags), new Count());
+
+        assertEquals(1, registry.metrics().size());
+
+        final Sensor sensor1 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG, parent1);
+        sensor1.add(new MetricName("name", "group", "description", taskTags), 
new Count());
+
+        assertEquals(2, registry.metrics().size());
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertEquals(1, registry.metrics().size());
+
+        final Sensor parent2 = metrics.threadLevelSensor(operation, 
Sensor.RecordingLevel.DEBUG);
+        parent2.add(new MetricName("name", "group", "description", 
threadTags), new Count());
+
+        assertEquals(1, registry.metrics().size());
+
+        final Sensor sensor2 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG, parent2);
+        sensor2.add(new MetricName("name", "group", "description", taskTags), 
new Count());
+
+        assertEquals(2, registry.metrics().size());
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertEquals(1, registry.metrics().size());
+
+        metrics.removeAllThreadLevelSensors();
+
+        assertEquals(0, registry.metrics().size());
     }
 
     @Test
@@ -90,7 +145,7 @@ public void testThroughputMetrics() {
         final String entity = "entity";
         final String operation = "put";
 
-        final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName,  
scope, entity, operation, Sensor.RecordingLevel.DEBUG);
+        final Sensor sensor1 = streamsMetrics.addThroughputSensor(taskName, 
scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
         final int meterMetricsCount = 2; // Each Meter is a combination of a 
Rate and a Total
         // 2 meter metrics plus a common metric that keeps track of total 
registered metrics in Metrics() constructor


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> --------------------------------
>
>                 Key: KAFKA-7660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Patrik Kleindl
>            Assignee: John Roesler
>            Priority: Minor
>             Fix For: 2.2.0
>
>         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deep in the code but noticed that the class Sensor.java 
> which is used as a key in the HashMap does not implement equals or hashCode 
> method. Not sure this is a problem though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0 / Kafka 2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to