Repository: hive Updated Branches: refs/heads/branch-3 db8e9b0ef -> 0a1bc3583
HIVE-20981: streaming/AbstractRecordWriter leaks HeapMemoryMonitor (Eric Wohlstadter, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0a1bc358 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0a1bc358 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0a1bc358 Branch: refs/heads/branch-3 Commit: 0a1bc358399f9b14999f27bfcb965318fe5ece11 Parents: db8e9b0 Author: Eric Wohlstadter <wohls...@gmail.com> Authored: Thu Nov 29 12:35:01 2018 -0800 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Dec 10 01:28:46 2018 -0800 ---------------------------------------------------------------------- .../hadoop/hive/common/HeapMemoryMonitor.java | 22 +++++++++++++++++--- .../hive/streaming/AbstractRecordWriter.java | 1 + 2 files changed, 20 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java index 42286be..56ec2fd 100644 --- a/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/HeapMemoryMonitor.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.List; import javax.management.NotificationEmitter; +import javax.management.NotificationListener; +import javax.management.ListenerNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +46,7 @@ public class HeapMemoryMonitor { private final double threshold; private List<Listener> listeners = new ArrayList<>(); + private NotificationListener notificationListener; public interface Listener { void memoryUsageAboveThreshold(long usedMemory, long maxMemory); @@ -140,7 +143,7 @@ public class HeapMemoryMonitor { } MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); NotificationEmitter emitter = (NotificationEmitter) mxBean; - emitter.addNotificationListener((n, hb) -> { + notificationListener = (n, hb) -> { if (n.getType().equals( MemoryNotificationInfo.MEMORY_COLLECTION_THRESHOLD_EXCEEDED)) { long maxMemory = tenuredGenPool.getUsage().getMax(); @@ -149,6 +152,19 @@ public class HeapMemoryMonitor { listener.memoryUsageAboveThreshold(usedMemory, maxMemory); } } - }, null, null); + }; + emitter.addNotificationListener(notificationListener, null, null); } -} \ No newline at end of file + + public void close() { + if(notificationListener != null) { + MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean(); + NotificationEmitter emitter = (NotificationEmitter) mxBean; + try { + emitter.removeNotificationListener(notificationListener); + } catch(ListenerNotFoundException e) { + LOG.warn("Failed to remove HeapMemoryMonitor notification listener from MemoryMXBean", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0a1bc358/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 9e90d36..0408599 100644 --- a/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ b/streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -355,6 +355,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { @Override public void close() throws StreamingIOFailure { + heapMemoryMonitor.close(); boolean haveError = false; String partition = null; if (LOG.isDebugEnabled()) {