MAPREDUCE-7150. Optimize collections used by MR JHS to reduce its memory. (Contributed by Misha Dmitriev)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/babd1449 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/babd1449 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/babd1449 Branch: refs/heads/HDDS-4 Commit: babd1449bf8898f44c434c852e67240721c0eb00 Parents: c2288ac Author: Haibo Chen <haiboc...@apache.org> Authored: Tue Oct 16 13:44:41 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Tue Oct 16 13:44:41 2018 -0700 ---------------------------------------------------------------------- .../counters/FileSystemCounterGroup.java | 56 ++++++++++++-------- .../mapreduce/jobhistory/JobHistoryParser.java | 2 +- .../hadoop/mapreduce/v2/hs/CompletedTask.java | 5 +- .../mapreduce/v2/hs/CompletedTaskAttempt.java | 2 +- 4 files changed, 38 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/babd1449/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java index 046368e..ed7f271 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java @@ -61,8 +61,9 @@ public abstract class FileSystemCounterGroup<C extends Counter> // C[] would need Array.newInstance which requires a Class<C> reference. // Just a few local casts probably worth not having to carry it around. - private final Map<String, Object[]> map = - new ConcurrentSkipListMap<String, Object[]>(); + // Initialized lazily, since in some situations millions of empty maps can + // waste a substantial (e.g. 4% as we observed) portion of the heap + private Map<String, Object[]> map; private String displayName; private static final Joiner NAME_JOINER = Joiner.on('_'); @@ -214,6 +215,9 @@ public abstract class FileSystemCounterGroup<C extends Counter> @SuppressWarnings("unchecked") public synchronized C findCounter(String scheme, FileSystemCounter key) { final String canonicalScheme = checkScheme(scheme); + if (map == null) { + map = new ConcurrentSkipListMap<>(); + } Object[] counters = map.get(canonicalScheme); int ord = key.ordinal(); if (counters == null) { @@ -247,10 +251,12 @@ public abstract class FileSystemCounterGroup<C extends Counter> protected abstract C newCounter(String scheme, FileSystemCounter key); @Override - public int size() { + public synchronized int size() { int n = 0; - for (Object[] counters : map.values()) { - n += numSetCounters(counters); + if (map != null) { + for (Object[] counters : map.values()) { + n += numSetCounters(counters); + } } return n; } @@ -271,19 +277,23 @@ public abstract class FileSystemCounterGroup<C extends Counter> * FileSystemGroup ::= #scheme (scheme #counter (key value)*)* */ @Override - public void write(DataOutput out) throws IOException { - WritableUtils.writeVInt(out, map.size()); // #scheme - for (Map.Entry<String, Object[]> entry : map.entrySet()) { - WritableUtils.writeString(out, entry.getKey()); // scheme - // #counter for the above scheme - WritableUtils.writeVInt(out, numSetCounters(entry.getValue())); - for (Object counter : entry.getValue()) { - if (counter == null) continue; - @SuppressWarnings("unchecked") - FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter(); - WritableUtils.writeVInt(out, c.key.ordinal()); // key - WritableUtils.writeVLong(out, c.getValue()); // value + public synchronized void write(DataOutput out) throws IOException { + if (map != null) { + WritableUtils.writeVInt(out, map.size()); // #scheme + for (Map.Entry<String, Object[]> entry : map.entrySet()) { + WritableUtils.writeString(out, entry.getKey()); // scheme + // #counter for the above scheme + WritableUtils.writeVInt(out, numSetCounters(entry.getValue())); + for (Object counter : entry.getValue()) { + if (counter == null) continue; + @SuppressWarnings("unchecked") + FSCounter c = (FSCounter) ((Counter) counter).getUnderlyingCounter(); + WritableUtils.writeVInt(out, c.key.ordinal()); // key + WritableUtils.writeVLong(out, c.getValue()); // value + } } + } else { + WritableUtils.writeVInt(out, 0); } } @@ -310,8 +320,8 @@ public abstract class FileSystemCounterGroup<C extends Counter> @Override public Iterator<C> iterator() { return new AbstractIterator<C>() { - Iterator<Object[]> it = map.values().iterator(); - Object[] counters = it.hasNext() ? it.next() : null; + Iterator<Object[]> it = map != null ? map.values().iterator() : null; + Object[] counters = (it != null && it.hasNext()) ? it.next() : null; int i = 0; @Override protected C computeNext() { @@ -322,7 +332,7 @@ public abstract class FileSystemCounterGroup<C extends Counter> if (counter != null) return counter; } i = 0; - counters = it.hasNext() ? it.next() : null; + counters = (it != null && it.hasNext()) ? it.next() : null; } return endOfData(); } @@ -343,8 +353,10 @@ public abstract class FileSystemCounterGroup<C extends Counter> public synchronized int hashCode() { // need to be deep as counters is an array int hash = FileSystemCounter.class.hashCode(); - for (Object[] counters : map.values()) { - if (counters != null) hash ^= Arrays.hashCode(counters); + if (map != null) { + for (Object[] counters : map.values()) { + if (counters != null) hash ^= Arrays.hashCode(counters); + } } return hash; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/babd1449/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java index 6efb4f7..57c58ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java @@ -600,7 +600,7 @@ public class JobHistoryParser implements HistoryEventHandler { public TaskInfo() { startTime = finishTime = -1; error = splitLocations = ""; - attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>(); + attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>(2); } public void printAll() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/babd1449/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java index 81fddaf..63b3600 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTask.java @@ -20,7 +20,6 @@ package org.apache.hadoop.mapreduce.v2.hs; import java.util.ArrayList; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,11 +47,11 @@ public class CompletedTask implements Task { private final TaskInfo taskInfo; private TaskReport report; private TaskAttemptId successfulAttempt; - private List<String> reportDiagnostics = new LinkedList<String>(); + private List<String> reportDiagnostics = new ArrayList<String>(2); private Lock taskAttemptsLock = new ReentrantLock(); private AtomicBoolean taskAttemptsLoaded = new AtomicBoolean(false); private final Map<TaskAttemptId, TaskAttempt> attempts = - new LinkedHashMap<TaskAttemptId, TaskAttempt>(); + new LinkedHashMap<TaskAttemptId, TaskAttempt>(2); CompletedTask(TaskId taskId, TaskInfo taskInfo) { //TODO JobHistoryParser.handleTaskFailedAttempt should use state from the event. http://git-wip-us.apache.org/repos/asf/hadoop/blob/babd1449/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java index c87d82b..17ec017 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java @@ -39,7 +39,7 @@ public class CompletedTaskAttempt implements TaskAttempt { private final TaskAttemptInfo attemptInfo; private final TaskAttemptId attemptId; private final TaskAttemptState state; - private final List<String> diagnostics = new ArrayList<String>(); + private final List<String> diagnostics = new ArrayList<String>(2); private TaskAttemptReport report; private String localDiagMessage; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org