Repository: hive
Updated Branches:
  refs/heads/branch-3 db8e9b0ef -> 0a1bc3583


HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a1bc358
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a1bc358
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a1bc358

Branch: refs/heads/branch-3
Commit: 0a1bc358399f9b14999f27bfcb965318fe5ece11
Parents: db8e9b0
Author: Eric Wohlstadter <wohls...@gmail.com>
Authored: Thu Nov 29 12:35:01 2018 -0800
Committer: Prasanth Jayachandran <prasan...@apache.org>
Committed: Mon Dec 10 01:28:46 2018 -0800

----------------------------------------------------------------------
 .../hadoop/hive/common/HeapMemoryMonitor.java   | 22 +++++++++++++++++---
 .../hive/streaming/AbstractRecordWriter.java    |  1 +
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
index 42286be..56ec2fd 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.management.NotificationEmitter;
+import javax.management.NotificationListener;
+import javax.management.ListenerNotFoundException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@ public class HeapMemoryMonitor {
 
   private final double threshold;
   private List<Listener> listeners = new ArrayList<>();
+  private NotificationListener notificationListener;
 
   public interface Listener {
     void memoryUsageAboveThreshold(long usedMemory, long maxMemory);
@@ -140,7 +143,7 @@ public class HeapMemoryMonitor {
     }
     MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
     NotificationEmitter emitter = (NotificationEmitter) mxBean;
-    emitter.addNotificationListener((n, hb) -> {
+    notificationListener = (n, hb) -> {
       if (n.getType().equals(
         MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) {
         long maxMemory = tenuredGenPool.getUsage().getMax();
@@ -149,6 +152,19 @@ public class HeapMemoryMonitor {
           listener.memoryUsageAboveThreshold(usedMemory, maxMemory);
         }
       }
-    }, null, null);
+    };
+    emitter.addNotificationListener(notificationListener, null, null);
   }
-}
\ No newline at end of file
+
+  public void close() {
+    if(notificationListener != null) {
+      MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+      NotificationEmitter emitter = (NotificationEmitter) mxBean;
+      try {
+        emitter.removeNotificationListener(notificationListener);
+      } catch(ListenerNotFoundException e) {
+        LOG.warn("Failed to remove HeapMemoryMonitor notification listener from MemoryMXBean", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
index 9e90d36..0408599 100644
--- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
+++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java
@@ -355,6 +355,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
 
   @Override
   public void close() throws StreamingIOFailure {
+    heapMemoryMonitor.close();
     boolean haveError = false;
     String partition = null;
     if (LOG.isDebugEnabled()) {

Reply via email to