Repository: hive Updated Branches: refs/heads/master aeb877220 -> 47c714167
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.common;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Reader;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * A special subclass of Properties, designed to save memory when many identical
 * copies of Properties would otherwise be created. To achieve that, we use the
 * 'interned' field, which points to the same Properties object for all instances
 * of CopyOnFirstWriteProperties that were created with identical contents.
 * However, as soon as any mutating method is called, contents are copied from
 * the 'interned' properties into this instance.
 *
 * <p>Read-only methods delegate to the shared interned instance while it is set;
 * mutating methods first copy its contents into this instance (and clear the
 * 'interned' reference) before applying the mutation, so the shared copy is
 * never modified.
 *
 * <p>NOTE(review): {@link #entrySet()}, {@link #keySet()} and {@link #values()}
 * return live views of the shared interned instance; mutating a returned view
 * would corrupt every other CopyOnFirstWriteProperties sharing it. Callers are
 * presumed to treat these views as read-only — confirm before relying on them
 * for mutation.
 */
public class CopyOnFirstWriteProperties extends Properties {

  // Serializable subclass of Properties: pin the serial form explicitly.
  private static final long serialVersionUID = 1L;

  /** Shared, deduplicated contents; null once this instance has been mutated. */
  private Properties interned;

  /** Weak interner so identical Properties objects are shared and GC-able. */
  private static final Interner<Properties> INTERNER = Interners.newWeakInterner();

  /**
   * Reflective access to Properties.defaults, which is protected and therefore
   * not readable on a *different* instance from subclass code.
   * NOTE(review): setAccessible on a java.base internal may be rejected under
   * JDK 9+ strong encapsulation (--illegal-access=deny / JDK 16+ default) —
   * verify on the target JDK.
   */
  private static final Field DEFAULTS_FIELD;
  static {
    try {
      DEFAULTS_FIELD = Properties.class.getDeclaredField("defaults");
      DEFAULTS_FIELD.setAccessible(true);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Creates an instance backed by an interned copy of {@code p}; no per-instance
   * storage is used until the first mutation.
   */
  public CopyOnFirstWriteProperties(Properties p) {
    setInterned(p);
  }

  /************* Public API of java.util.Properties ************/

  @Override
  public String getProperty(String key) {
    if (interned != null) {
      return interned.getProperty(key);
    } else {
      return super.getProperty(key);
    }
  }

  @Override
  public String getProperty(String key, String defaultValue) {
    if (interned != null) {
      return interned.getProperty(key, defaultValue);
    } else {
      return super.getProperty(key, defaultValue);
    }
  }

  @Override
  public void list(PrintStream out) {
    if (interned != null) {
      interned.list(out);
    } else {
      super.list(out);
    }
  }

  @Override
  public void list(PrintWriter out) {
    if (interned != null) {
      interned.list(out);
    } else {
      super.list(out);
    }
  }

  @Override
  public synchronized void load(InputStream inStream) throws IOException {
    // load() adds entries, i.e. mutates: detach from the shared copy first.
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.load(inStream);
  }

  @Override
  public synchronized void load(Reader reader) throws IOException {
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.load(reader);
  }

  @Override
  public synchronized void loadFromXML(InputStream inStream) throws IOException {
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.loadFromXML(inStream);
  }

  @Override
  public Enumeration<?> propertyNames() {
    if (interned != null) {
      return interned.propertyNames();
    } else {
      return super.propertyNames();
    }
  }

  @Override
  public synchronized Object setProperty(String key, String value) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.setProperty(key, value);
  }

  @Override
  public void store(OutputStream out, String comments) throws IOException {
    if (interned != null) {
      interned.store(out, comments);
    } else {
      super.store(out, comments);
    }
  }

  @Override
  public void storeToXML(OutputStream os, String comment) throws IOException {
    if (interned != null) {
      interned.storeToXML(os, comment);
    } else {
      super.storeToXML(os, comment);
    }
  }

  @Override
  public void storeToXML(OutputStream os, String comment, String encoding)
      throws IOException {
    if (interned != null) {
      interned.storeToXML(os, comment, encoding);
    } else {
      super.storeToXML(os, comment, encoding);
    }
  }

  @Override
  public Set<String> stringPropertyNames() {
    if (interned != null) {
      return interned.stringPropertyNames();
    } else {
      return super.stringPropertyNames();
    }
  }

  /************* Public API of java.util.Hashtable ************/

  @Override
  public synchronized void clear() {
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.clear();
  }

  @Override
  public synchronized Object clone() {
    // A clone of an untouched instance can keep sharing the interned contents.
    if (interned != null) {
      return new CopyOnFirstWriteProperties(interned);
    } else {
      return super.clone();
    }
  }

  @Override
  public synchronized Object compute(Object key,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    // We copy proactively because if the function returns null, the mapping
    // for key is removed, i.e. the table is mutated.
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.compute(key, remappingFunction);
  }

  @Override
  public synchronized Object computeIfAbsent(Object key,
      Function<? super Object, ? extends Object> mappingFunction) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.computeIfAbsent(key, mappingFunction);
  }

  @Override
  public synchronized Object computeIfPresent(Object key,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.computeIfPresent(key, remappingFunction);
  }

  @Override
  public synchronized boolean contains(Object value) {
    if (interned != null) {
      return interned.contains(value);
    } else {
      return super.contains(value);
    }
  }

  @Override
  public synchronized boolean containsKey(Object key) {
    if (interned != null) {
      return interned.containsKey(key);
    } else {
      return super.containsKey(key);
    }
  }

  @Override
  public synchronized boolean containsValue(Object value) {
    if (interned != null) {
      return interned.containsValue(value);
    } else {
      return super.containsValue(value);
    }
  }

  @Override
  public synchronized Enumeration<Object> elements() {
    if (interned != null) {
      return interned.elements();
    } else {
      return super.elements();
    }
  }

  @Override
  public Set<Map.Entry<Object, Object>> entrySet() {
    // NOTE(review): live view of the shared interned instance — must not be
    // mutated by callers (see class javadoc).
    if (interned != null) {
      return interned.entrySet();
    } else {
      return super.entrySet();
    }
  }

  @Override
  public synchronized boolean equals(Object o) {
    if (interned != null) {
      return interned.equals(o);
    } else {
      return super.equals(o);
    }
  }

  @Override
  public synchronized void forEach(BiConsumer<? super Object, ? super Object> action) {
    if (interned != null) {
      interned.forEach(action);
    } else {
      super.forEach(action);
    }
  }

  @Override
  public synchronized Object get(Object key) {
    if (interned != null) {
      return interned.get(key);
    } else {
      return super.get(key);
    }
  }

  @Override
  public synchronized Object getOrDefault(Object key, Object defaultValue) {
    if (interned != null) {
      return interned.getOrDefault(key, defaultValue);
    } else {
      return super.getOrDefault(key, defaultValue);
    }
  }

  @Override
  public synchronized int hashCode() {
    if (interned != null) {
      return interned.hashCode();
    } else {
      return super.hashCode();
    }
  }

  @Override
  public synchronized boolean isEmpty() {
    if (interned != null) {
      return interned.isEmpty();
    } else {
      return super.isEmpty();
    }
  }

  @Override
  public synchronized Enumeration<Object> keys() {
    if (interned != null) {
      return interned.keys();
    } else {
      return super.keys();
    }
  }

  @Override
  public Set<Object> keySet() {
    // NOTE(review): live view of the shared interned instance — read-only use.
    if (interned != null) {
      return interned.keySet();
    } else {
      return super.keySet();
    }
  }

  @Override
  public synchronized Object merge(Object key, Object value,
      BiFunction<? super Object, ? super Object, ? extends Object> remappingFunction) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.merge(key, value, remappingFunction);
  }

  @Override
  public synchronized Object put(Object key, Object value) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.put(key, value);
  }

  @Override
  public synchronized void putAll(Map<? extends Object, ? extends Object> t) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.putAll(t);
  }

  @Override
  public synchronized Object putIfAbsent(Object key, Object value) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.putIfAbsent(key, value);
  }

  @Override
  public synchronized Object remove(Object key) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.remove(key);
  }

  @Override
  public synchronized boolean remove(Object key, Object value) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.remove(key, value);
  }

  @Override
  public synchronized Object replace(Object key, Object value) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.replace(key, value);
  }

  @Override
  public synchronized boolean replace(Object key, Object oldValue, Object newValue) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    return super.replace(key, oldValue, newValue);
  }

  @Override
  public synchronized void replaceAll(
      BiFunction<? super Object, ? super Object, ? extends Object> function) {
    if (interned != null) {
      copyFromInternedToThis();
    }
    super.replaceAll(function);
  }

  @Override
  public synchronized int size() {
    if (interned != null) {
      return interned.size();
    } else {
      return super.size();
    }
  }

  @Override
  public synchronized String toString() {
    if (interned != null) {
      return interned.toString();
    } else {
      return super.toString();
    }
  }

  @Override
  public Collection<Object> values() {
    // NOTE(review): live view of the shared interned instance — read-only use.
    if (interned != null) {
      return interned.values();
    } else {
      return super.values();
    }
  }

  /************* Private implementation ************/

  /**
   * Detaches this instance from the shared interned copy: copies all entries
   * and the defaults chain into this instance, then clears 'interned'.
   */
  private void copyFromInternedToThis() {
    for (Map.Entry<?, ?> e : interned.entrySet()) {
      super.put(e.getKey(), e.getValue());
    }
    try {
      // Unfortunately, we cannot directly read a protected field of a non-this
      // object, hence the reflective access (see DEFAULTS_FIELD note above).
      this.defaults = (Properties) DEFAULTS_FIELD.get(interned);
    } catch (IllegalAccessException e) { // Shouldn't happen
      throw new RuntimeException(e);
    }
    setInterned(null);
  }

  /**
   * Points this instance at the interned (deduplicated) copy of {@code p},
   * or clears the interned reference when {@code p} is null.
   */
  public void setInterned(Properties p) {
    if (p != null) {
      this.interned = INTERNER.intern(p);
    } else {
      this.interned = p;
    }
  }

  // These methods are required by serialization

  public CopyOnFirstWriteProperties() {
  }

  /** Returns the shared interned contents, or null once this copy was mutated. */
  public Properties getInterned() {
    return interned;
  }
}
org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -223,6 +225,7 @@ public class SerializationUtilities { kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); + kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer()); ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy( @@ -422,6 +425,33 @@ public class SerializationUtilities { } /** + * CopyOnFirstWriteProperties needs a special serializer, since it extends Properties, + * which implements Map, so MapSerializer would be used for it by default. Yet it has + * the additional 'interned' field that the standard MapSerializer doesn't handle + * properly. But FieldSerializer doesn't work for it as well, because the Hashtable + * superclass declares most of its fields transient. + */ + private static class CopyOnFirstWritePropertiesSerializer extends + com.esotericsoftware.kryo.serializers.MapSerializer { + + @Override + public void write(Kryo kryo, Output output, Map map) { + super.write(kryo, output, map); + CopyOnFirstWriteProperties p = (CopyOnFirstWriteProperties) map; + Properties ip = p.getInterned(); + kryo.writeObjectOrNull(output, ip, Properties.class); + } + + @Override + public Map read(Kryo kryo, Input input, Class<Map> type) { + Map map = super.read(kryo, input, type); + Properties ip = kryo.readObjectOrNull(input, Properties.class); + ((CopyOnFirstWriteProperties) map).setInterned(ip); + return map; + } + } + + /** * Serializes the plan. 
* * @param plan The plan, such as QueryPlan, MapredWork, etc. http://git-wip-us.apache.org/repos/asf/hive/blob/47c71416/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index d05c1c6..68dcd0d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -29,6 +29,7 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -55,13 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class PartitionDesc implements Serializable, Cloneable { - static { - STRING_INTERNER = Interners.newWeakInterner(); - CLASS_INTERNER = Interners.newWeakInterner(); - } - - private static final Interner<String> STRING_INTERNER; - private static final Interner<Class<?>> CLASS_INTERNER; + private static final Interner<Class<?>> CLASS_INTERNER = Interners.newWeakInterner(); private TableDesc tableDesc; private LinkedHashMap<String, String> partSpec; @@ -220,8 +215,12 @@ public class PartitionDesc implements Serializable, Cloneable { } public void setProperties(final Properties properties) { - internProperties(properties); - this.properties = properties; + if (properties instanceof CopyOnFirstWriteProperties) { + this.properties = properties; + } else { + internProperties(properties); + this.properties = new CopyOnFirstWriteProperties(properties); + } } private static TableDesc 
getTableDesc(Table table) { @@ -235,8 +234,7 @@ public class PartitionDesc implements Serializable, Cloneable { String key = (String) keys.nextElement(); String oldValue = properties.getProperty(key); if (oldValue != null) { - String value = STRING_INTERNER.intern(oldValue); - properties.setProperty(key, value); + properties.setProperty(key, oldValue.intern()); } } } @@ -280,13 +278,7 @@ public class PartitionDesc implements Serializable, Cloneable { ret.inputFileFormatClass = inputFileFormatClass; ret.outputFileFormatClass = outputFileFormatClass; if (properties != null) { - Properties newProp = new Properties(); - Enumeration<Object> keysProp = properties.keys(); - while (keysProp.hasMoreElements()) { - Object key = keysProp.nextElement(); - newProp.put(key, properties.get(key)); - } - ret.setProperties(newProp); + ret.setProperties((Properties) properties.clone()); } ret.tableDesc = (TableDesc) tableDesc.clone(); // The partition spec is not present