Repository: hive
Updated Branches:
  refs/heads/master aeb877220 -> 47c714167


HIVE-16079: HS2: high memory pressure due to duplicate Properties objects 
(Misha Dmitriev, reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/47c71416
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/47c71416
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/47c71416

Branch: refs/heads/master
Commit: 47c714167ab16316954a55c0351bb34e7441f104
Parents: aeb8772
Author: Misha Dmitriev <mi...@cloudera.com>
Authored: Thu Apr 27 13:28:51 2017 -0500
Committer: Sergio Pena <sergio.p...@cloudera.com>
Committed: Thu Apr 27 13:28:51 2017 -0500

----------------------------------------------------------------------
 .../hive/common/CopyOnFirstWriteProperties.java | 344 +++++++++++++++++++
 .../hive/ql/exec/SerializationUtilities.java    |  30 ++
 .../hadoop/hive/ql/plan/PartitionDesc.java      |  28 +-
 3 files changed, 384 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/47c71416/common/src/java/org/apache/hadoop/hive/common/CopyOnFirstWriteProperties.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/CopyOnFirstWriteProperties.java b/common/src/java/org/apache/hadoop/hive/common/CopyOnFirstWriteProperties.java
new file mode 100644
index 0000000..d4d078b
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/CopyOnFirstWriteProperties.java
@@ -0,0 +1,344 @@
+package org.apache.hadoop.hive.common;
+
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * A special subclass of Properties, designed to save memory when many 
identical
+ * copies of Properties would otherwise be created. To achieve that, we use the
+ * 'interned' field, which points to the same Properties object for all 
instances
+ * of CopyOnFirstWriteProperties that were created with identical contents.
+ * However, as soon as any mutating method is called, contents are copied from
+ * the 'interned' properties into this instance.
+ */
+public class CopyOnFirstWriteProperties extends Properties {
+
+  private Properties interned;
+
+  private static Interner<Properties> INTERNER = Interners.newWeakInterner();
+  private static Field defaultsField;
+  static {
+    try {
+      defaultsField = Properties.class.getDeclaredField("defaults");
+      defaultsField.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public CopyOnFirstWriteProperties(Properties p) {
+    setInterned(p);
+  }
+
+  /*************   Public API of java.util.Properties   ************/
+
+  @Override
+  public String getProperty(String key) {
+    if (interned != null) return interned.getProperty(key);
+    else return super.getProperty(key);
+  }
+
+  @Override
+  public String getProperty(String key, String defaultValue) {
+    if (interned != null) return interned.getProperty(key, defaultValue);
+    else return super.getProperty(key, defaultValue);
+  }
+
+  @Override
+  public void list(PrintStream out) {
+    if (interned != null) interned.list(out);
+    else super.list(out);
+  }
+
+  @Override
+  public void list(PrintWriter out) {
+    if (interned != null) interned.list(out);
+    else super.list(out);
+  }
+
+  @Override
+  public synchronized void load(InputStream inStream) throws IOException {
+    if (interned != null) copyFromInternedToThis();
+    super.load(inStream);
+  }
+
+  @Override
+  public synchronized void load(Reader reader) throws IOException {
+    if (interned != null) copyFromInternedToThis();
+    super.load(reader);
+  }
+
+  @Override
+  public synchronized void loadFromXML(InputStream inStream) throws 
IOException {
+    if (interned != null) copyFromInternedToThis();
+    super.loadFromXML(inStream);
+  }
+
+  @Override
+  public Enumeration<?> propertyNames() {
+    if (interned != null) return interned.propertyNames();
+    else return super.propertyNames();
+  }
+
+  @Override
+  public synchronized Object setProperty(String key, String value) {
+    if (interned != null) copyFromInternedToThis();
+    return super.setProperty(key, value);
+  }
+
+  @Override
+  public void store(OutputStream out, String comments) throws IOException {
+    if (interned != null) interned.store(out, comments);
+    else super.store(out, comments);
+  }
+
+  @Override
+  public void storeToXML(OutputStream os, String comment) throws IOException {
+    if (interned != null) interned.storeToXML(os, comment);
+    else super.storeToXML(os, comment);
+  }
+
+  @Override
+  public void storeToXML(OutputStream os, String comment, String encoding)
+      throws IOException {
+    if (interned != null) interned.storeToXML(os, comment, encoding);
+    else super.storeToXML(os, comment, encoding);
+  }
+
+  @Override
+  public Set<String> stringPropertyNames() {
+    if (interned != null) return interned.stringPropertyNames();
+    else return super.stringPropertyNames();
+  }
+
+  /*************   Public API of java.util.Hashtable   ************/
+
+  @Override
+  public synchronized void clear() {
+    if (interned != null) copyFromInternedToThis();
+    super.clear();
+  }
+
+  @Override
+  public synchronized Object clone() {
+    if (interned != null) return new CopyOnFirstWriteProperties(interned);
+    else return super.clone();
+  }
+
+  @Override
+  public synchronized Object compute(Object key, BiFunction remappingFunction) 
{
+    if (interned != null) copyFromInternedToThis();  // We do this because if 
function returns null,
+                                       // the mapping for key is removed, i.e. 
the table is mutated.
+    return super.compute(key, remappingFunction);
+  }
+
+  @Override
+  public synchronized Object computeIfAbsent(Object key, Function 
mappingFunction) {
+    if (interned != null) copyFromInternedToThis();
+    return super.computeIfAbsent(key, mappingFunction);
+  }
+
+  @Override
+  public synchronized Object computeIfPresent(Object key, BiFunction 
remappingFunction) {
+    if (interned != null) copyFromInternedToThis();
+    return super.computeIfPresent(key, remappingFunction);
+  }
+
+  @Override
+  public synchronized boolean contains(Object value) {
+    if (interned != null) return interned.contains(value);
+    else return super.contains(value);
+  }
+
+  @Override
+  public synchronized boolean containsKey(Object key) {
+    if (interned != null) return interned.containsKey(key);
+    else return super.containsKey(key);
+  }
+
+  @Override
+  public synchronized boolean containsValue(Object value) {
+    if (interned != null) return interned.containsValue(value);
+    else return super.containsValue(value);
+  }
+
+  @Override
+  public synchronized Enumeration<Object> elements() {
+    if (interned != null) return interned.elements();
+    else return super.elements();
+  }
+
+  @Override
+  public Set<Map.Entry<Object, Object>> entrySet() {
+    if (interned != null) return interned.entrySet();
+    else return super.entrySet();
+  }
+
+  @Override
+  public synchronized boolean equals(Object o) {
+    if (interned != null) return interned.equals(o);
+    else return super.equals(o);
+  }
+
+  @Override
+  public synchronized void forEach(BiConsumer action) {
+    if (interned != null) interned.forEach(action);
+    else super.forEach(action);
+  }
+
+  @Override
+  public synchronized Object get(Object key) {
+    if (interned != null) return interned.get(key);
+    else return super.get(key);
+  }
+
+  @Override
+  public synchronized Object getOrDefault(Object key, Object defaultValue) {
+    if (interned != null) return interned.getOrDefault(key, defaultValue);
+    else return super.getOrDefault(key, defaultValue);
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    if (interned != null) return interned.hashCode();
+    else return super.hashCode();
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    if (interned != null) return interned.isEmpty();
+    else return super.isEmpty();
+  }
+
+  @Override
+  public synchronized Enumeration<Object> keys() {
+    if (interned != null) return interned.keys();
+    else return super.keys();
+  }
+
+  @Override
+  public Set<Object> keySet() {
+    if (interned != null) return interned.keySet();
+    else return super.keySet();
+  }
+
+  @Override
+  public synchronized Object merge(Object key, Object value, BiFunction 
remappingFunction) {
+    if (interned != null) copyFromInternedToThis();
+    return super.merge(key, value, remappingFunction);
+  }
+
+  @Override
+  public synchronized Object put(Object key, Object value) {
+    if (interned != null) copyFromInternedToThis();
+    return super.put(key, value);
+  }
+
+  @Override
+  public synchronized void putAll(Map<? extends Object, ? extends Object> t) {
+    if (interned != null) copyFromInternedToThis();
+    super.putAll(t);
+  }
+
+  @Override
+  public synchronized Object putIfAbsent(Object key, Object value) {
+    if (interned != null) copyFromInternedToThis();
+    return super.putIfAbsent(key, value);
+  }
+
+  @Override
+  public synchronized Object remove(Object key) {
+    if (interned != null) copyFromInternedToThis();
+    return super.remove(key);
+  }
+
+  @Override
+  public synchronized boolean remove(Object key, Object value) {
+    if (interned != null) copyFromInternedToThis();
+    return super.remove(key, value);
+  }
+
+  @Override
+  public synchronized Object replace(Object key, Object value) {
+    if (interned != null) copyFromInternedToThis();
+    return super.replace(key, value);
+  }
+
+  @Override
+  public synchronized boolean replace(Object key, Object oldValue, Object 
newValue) {
+    if (interned != null) copyFromInternedToThis();
+    return super.replace(key, oldValue, newValue);
+  }
+
+  @Override
+  public synchronized void replaceAll(BiFunction function) {
+    if (interned != null) copyFromInternedToThis();
+    super.replaceAll(function);
+  }
+
+  @Override
+  public synchronized int size() {
+    if (interned != null) return interned.size();
+    else return super.size();
+  }
+
+  @Override
+  public synchronized String toString() {
+    if (interned != null) return interned.toString();
+    else return super.toString();
+  }
+
+  @Override
+  public Collection<Object> values() {
+    if (interned != null) return interned.values();
+    else return super.values();
+  }
+
+  /*************   Private implementation ************/
+
+  private void copyFromInternedToThis() {
+    for (Map.Entry<?,?> e : interned.entrySet()) {
+      super.put(e.getKey(), e.getValue());
+    }
+    try {
+      // Unfortunately, we cannot directly read a protected field of non-this 
object
+      this.defaults = (Properties) defaultsField.get(interned);
+    } catch (IllegalAccessException e) {   // Shouldn't happen
+      throw new RuntimeException(e);
+    }
+    setInterned(null);
+  }
+
+  public void setInterned(Properties p) {
+    if (p != null) {
+      this.interned = INTERNER.intern(p);
+    } else {
+      this.interned = p;
+    }
+  }
+
+  // These methods are required by serialization
+
+  public CopyOnFirstWriteProperties() {
+  }
+
+  public Properties getInterned() {
+    return interned;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/47c71416/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 247d589..01a652d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -33,10 +33,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -223,6 +225,7 @@ public class SerializationUtilities {
       kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
       kryo.register(Path.class, new PathSerializer());
       kryo.register(Arrays.asList("").getClass(), new 
ArraysAsListSerializer());
+      kryo.register(CopyOnFirstWriteProperties.class, new 
CopyOnFirstWritePropertiesSerializer());
 
       ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
           .setFallbackInstantiatorStrategy(
@@ -422,6 +425,33 @@ public class SerializationUtilities {
   }
 
   /**
+   * CopyOnFirstWriteProperties needs a special serializer, since it extends Properties,
+   * which implements Map, so MapSerializer would be used for it by default. Yet it has
+   * the additional 'interned' field that the standard MapSerializer doesn't handle
+   * properly. But FieldSerializer doesn't work for it as well, because the Hashtable
+   * superclass declares most of its fields transient.
+   *
+   * <p>The serialized form is the regular MapSerializer output for the object, followed
+   * by its 'interned' Properties object (or a null marker).
+   */
+  private static class CopyOnFirstWritePropertiesSerializer extends
+      com.esotericsoftware.kryo.serializers.MapSerializer {
+
+    /**
+     * Writes the map entries via MapSerializer, then the 'interned' object.
+     */
+    @Override
+    public void write(Kryo kryo, Output output, Map map) {
+      super.write(kryo, output, map);
+      CopyOnFirstWriteProperties p = (CopyOnFirstWriteProperties) map;
+      Properties ip = p.getInterned();
+      kryo.writeObjectOrNull(output, ip, Properties.class);
+    }
+
+    /**
+     * Reads back what {@link #write} produced and restores the 'interned' field.
+     */
+    @Override
+    public Map read(Kryo kryo, Input input, Class<Map> type) {
+      CopyOnFirstWriteProperties props =
+          (CopyOnFirstWriteProperties) super.read(kryo, input, type);
+      Properties ip = kryo.readObjectOrNull(input, Properties.class);
+      if (ip != null) {
+        // super.read() filled the new object's own table with the very entries that
+        // 'ip' holds as well. Reads are answered from 'interned' once it is set, so
+        // keeping that private copy would store the contents twice per deserialized
+        // object. 'interned' is still null here, so clear() only empties the private
+        // table.
+        props.clear();
+        props.setInterned(ip);
+      }
+      return props;
+    }
+  }
+
+  /**
    * Serializes the plan.
    *
    * @param plan The plan, such as QueryPlan, MapredWork, etc.

http://git-wip-us.apache.org/repos/asf/hive/blob/47c71416/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
index d05c1c6..68dcd0d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -55,13 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
 @Explain(displayName = "Partition", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
 public class PartitionDesc implements Serializable, Cloneable {
 
-  static {
-    STRING_INTERNER = Interners.newWeakInterner();
-    CLASS_INTERNER = Interners.newWeakInterner();
-  }
-
-  private static final Interner<String> STRING_INTERNER;
-  private static final Interner<Class<?>> CLASS_INTERNER;
+  private static final Interner<Class<?>> CLASS_INTERNER = 
Interners.newWeakInterner();
 
   private TableDesc tableDesc;
   private LinkedHashMap<String, String> partSpec;
@@ -220,8 +215,12 @@ public class PartitionDesc implements Serializable, Cloneable {
   }
 
   public void setProperties(final Properties properties) {
-    internProperties(properties);
-    this.properties = properties;
+    if (properties instanceof CopyOnFirstWriteProperties) {
+      this.properties = properties;
+    } else {
+      internProperties(properties);
+      this.properties = new CopyOnFirstWriteProperties(properties);
+    }
   }
 
   private static TableDesc getTableDesc(Table table) {
@@ -235,8 +234,7 @@ public class PartitionDesc implements Serializable, Cloneable {
       String key = (String) keys.nextElement();
       String oldValue = properties.getProperty(key);
       if (oldValue != null) {
-        String value = STRING_INTERNER.intern(oldValue);
-        properties.setProperty(key, value);
+        properties.setProperty(key, oldValue.intern());
       }
     }
   }
@@ -280,13 +278,7 @@ public class PartitionDesc implements Serializable, Cloneable {
     ret.inputFileFormatClass = inputFileFormatClass;
     ret.outputFileFormatClass = outputFileFormatClass;
     if (properties != null) {
-      Properties newProp = new Properties();
-      Enumeration<Object> keysProp = properties.keys();
-      while (keysProp.hasMoreElements()) {
-        Object key = keysProp.nextElement();
-        newProp.put(key, properties.get(key));
-      }
-      ret.setProperties(newProp);
+      ret.setProperties((Properties) properties.clone());
     }
     ret.tableDesc = (TableDesc) tableDesc.clone();
     // The partition spec is not present

Reply via email to