Repository: hive
Updated Branches:
  refs/heads/branch-3 649c1c55b -> db8e9b0ef


HIVE-20740 : Remove global lock in ObjectStore.setConf method (Vihang Karajgaonkar, reviewed by Andrew Sherman and Naveen Gangam)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/db8e9b0e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/db8e9b0e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/db8e9b0e

Branch: refs/heads/branch-3
Commit: db8e9b0efd058c0a0bd18334f44608c93a84077e
Parents: 649c1c5
Author: Vihang Karajgaonkar <vihan...@apache.org>
Authored: Thu Nov 29 15:07:42 2018 -0800
Committer: Vihang Karajgaonkar <vihan...@apache.org>
Committed: Thu Nov 29 15:13:23 2018 -0800

----------------------------------------------------------------------
 .../hive/ql/parse/TestReplicationScenarios.java |   4 +-
 .../apache/hive/jdbc/TestJdbcWithMiniHS2.java   |   3 +-
 .../hadoop/hive/ql/session/SessionState.java    |   3 +-
 .../hadoop/hive/metastore/ObjectStore.java      | 447 ++-------------
 .../metastore/PersistenceManagerProvider.java   | 537 +++++++++++++++++++
 .../hadoop/hive/metastore/TestObjectStore.java  |  56 +-
 6 files changed, 640 insertions(+), 410 deletions(-)
----------------------------------------------------------------------
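
The heart of this change, reduced to a sketch: the old ObjectStore.setConf
serialized every caller behind a single static ReentrantLock (pmfPropLock) just
to guard the shared pmf/prop statics. The new PersistenceManagerProvider guards
them with a ReentrantReadWriteLock instead, so the common case (properties
unchanged) only takes a shared read lock. Illustrative only, not verbatim from
the patch; propsUnchanged() and rebuildPmf() are hypothetical stand-ins for the
real property comparison and pmf re-initialization:

    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import org.apache.hadoop.conf.Configuration;

    // Sketch of the new locking discipline; not the committed code.
    public class PmfGuardSketch {
      private static final ReentrantReadWriteLock LOCK = new ReentrantReadWriteLock();

      public static void updatePmfProperties(Configuration conf) {
        LOCK.readLock().lock();          // shared: many threads may hold this at once
        try {
          if (propsUnchanged(conf)) {    // common case: nothing to do
            return;
          }
        } finally {
          LOCK.readLock().unlock();      // cannot upgrade read -> write; drop it first
        }
        LOCK.writeLock().lock();         // exclusive: rebuild the shared factory
        try {
          if (!propsUnchanged(conf)) {   // re-check: another thread may have rebuilt it
            rebuildPmf(conf);
          }
        } finally {
          LOCK.writeLock().unlock();
        }
      }

      private static boolean propsUnchanged(Configuration conf) { return true; } // stub
      private static void rebuildPmf(Configuration conf) { }                     // stub
    }

The committed code additionally downgrades from the write lock back to the read
lock before releasing it; see the sketch after PersistenceManagerProvider.java
below.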


http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 4b6bc77..b3a19cb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
-import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.PersistenceManagerProvider;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -179,7 +179,7 @@ public class TestReplicationScenarios {
     driverMirror = DriverFactory.newDriver(hconfMirror);
     metaStoreClientMirror = new HiveMetaStoreClient(hconfMirror);
 
-    ObjectStore.setTwoMetastoreTesting(true);
+    PersistenceManagerProvider.setTwoMetastoreTesting(true);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index 099b67a..4d048ef 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.PersistenceManagerProvider;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.UDF;
@@ -1090,7 +1091,7 @@ public class TestJdbcWithMiniHS2 {
     NucleusContext nc = null;
     Map<String, ClassLoaderResolver> cMap;
     try {
-      pmf = ObjectStore.class.getDeclaredField("pmf");
+      pmf = PersistenceManagerProvider.class.getDeclaredField("pmf");
       if (pmf != null) {
         pmf.setAccessible(true);
         jdoPmf = (JDOPersistenceManagerFactory) pmf.get(null);

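The hunk above keeps the test working by pointing its reflective probe at the new
home of the static pmf field. The pattern, reduced to a sketch (the field name
"pmf" comes from the test above; everything else is illustrative):

    import java.lang.reflect.Field;

    // Read a private static field via reflection, as the test does.
    // getDeclaredField throws the checked NoSuchFieldException, so this
    // belongs inside a try block or a throwing test method.
    Field pmfField = PersistenceManagerProvider.class.getDeclaredField("pmf");
    pmfField.setAccessible(true);         // bypass the private access check
    Object factory = pmfField.get(null);  // null receiver: the field is static
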
http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 6f39b03..844620f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.PersistenceManagerProvider;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -1810,7 +1811,7 @@ public class SessionState {
         }
         Class<?> clazz = Class.forName(realStoreImpl);
         if (ObjectStore.class.isAssignableFrom(clazz)) {
-          ObjectStore.unCacheDataNucleusClassLoaders();
+          PersistenceManagerProvider.clearOutPmfClassLoaderCache();
         }
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 545b5bd..59c4d22 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -34,7 +33,6 @@ import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -45,29 +43,21 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
-import javax.jdo.JDOCanRetryException;
 import javax.jdo.JDODataStoreException;
 import javax.jdo.JDOException;
-import javax.jdo.JDOHelper;
 import javax.jdo.JDOObjectNotFoundException;
 import javax.jdo.PersistenceManager;
-import javax.jdo.PersistenceManagerFactory;
 import javax.jdo.Query;
 import javax.jdo.Transaction;
-import javax.jdo.datastore.DataStoreCache;
 import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
-import javax.sql.DataSource;
 
 import com.google.common.base.Strings;
 
@@ -162,8 +152,6 @@ import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
-import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.model.MCatalog;
@@ -217,15 +205,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.ObjectPair;
 import org.apache.thrift.TException;
-import org.datanucleus.AbstractNucleusContext;
-import org.datanucleus.ClassLoaderResolver;
-import org.datanucleus.ClassLoaderResolverImpl;
-import org.datanucleus.NucleusContext;
-import org.datanucleus.api.jdo.JDOPersistenceManager;
-import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
 import org.datanucleus.store.rdbms.exceptions.MissingTableException;
-import org.datanucleus.store.scostore.Store;
-import org.datanucleus.util.WeakValueMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -246,14 +226,9 @@ import com.google.common.collect.Sets;
  * filestore.
  */
 public class ObjectStore implements RawStore, Configurable {
-  private static Properties prop = null;
-  private static PersistenceManagerFactory pmf = null;
-  private static boolean forTwoMetastoreTesting = false;
 
  private static final DateTimeFormatter YMDHMS_FORMAT = DateTimeFormatter.ofPattern(
       "yyyy_MM_dd_HH_mm_ss");
-
-  private static Lock pmfPropLock = new ReentrantLock();
   /**
   * Verify the schema only once per JVM since the db connection info is static
   */
@@ -339,65 +314,43 @@ public class ObjectStore implements RawStore, Configurable {
   @Override
   @SuppressWarnings("nls")
   public void setConf(Configuration conf) {
-    // Although an instance of ObjectStore is accessed by one thread, there may
-    // be many threads with ObjectStore instances. So the static variables
-    // pmf and prop need to be protected with locks.
-    pmfPropLock.lock();
-    try {
-      isInitialized = false;
-      this.conf = conf;
-      configureSSL(conf);
-      Properties propsFromConf = getDataSourceProps(conf);
-      boolean propsChanged = !propsFromConf.equals(prop);
-
-      if (propsChanged) {
-        if (pmf != null){
-          clearOutPmfClassLoaderCache(pmf);
-          if (!forTwoMetastoreTesting) {
-            // close the underlying connection pool to avoid leaks
-            pmf.close();
-          }
-        }
-        pmf = null;
-        prop = null;
-      }
-
-      assert(!isActiveTransaction());
-      shutdown();
-      // Always want to re-create pm as we don't know if it were created by the
-      // most recent instance of the pmf
-      pm = null;
-      directSql = null;
-      expressionProxy = null;
-      openTrasactionCalls = 0;
-      currentTransaction = null;
-      transactionStatus = TXN_STATUS.NO_STATE;
-
-      initialize(propsFromConf);
-
-      String partitionValidationRegex =
-          MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
-      if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
-        partitionValidationPattern = Pattern.compile(partitionValidationRegex);
-      } else {
-        partitionValidationPattern = null;
-      }
+    isInitialized = false;
+    this.conf = conf;
+    configureSSL(conf);
+    PersistenceManagerProvider.updatePmfProperties(conf);
+
+    assert (!isActiveTransaction());
+    shutdown();
+    // Always want to re-create pm as we don't know if it was created by the
+    // most recent instance of the pmf
+    pm = null;
+    directSql = null;
+    expressionProxy = null;
+    openTrasactionCalls = 0;
+    currentTransaction = null;
+    transactionStatus = TXN_STATUS.NO_STATE;
+
+    initialize();
+
+    String partitionValidationRegex =
+        MetastoreConf.getVar(this.conf, ConfVars.PARTITION_NAME_WHITELIST_PATTERN);
+    if (partitionValidationRegex != null && !partitionValidationRegex.isEmpty()) {
+      partitionValidationPattern = Pattern.compile(partitionValidationRegex);
+    } else {
+      partitionValidationPattern = null;
+    }
 
-      // Note, if metrics have not been initialized this will return null, which means we aren't
-      // using metrics.  Thus we should always check whether this is non-null before using.
-      MetricRegistry registry = Metrics.getRegistry();
-      if (registry != null) {
-        directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS);
-      }
+    // Note, if metrics have not been initialized this will return null, which means we aren't
+    // using metrics.  Thus we should always check whether this is non-null before using.
+    MetricRegistry registry = Metrics.getRegistry();
+    if (registry != null) {
+      directSqlErrors = Metrics.getOrCreateCounter(MetricsConstants.DIRECTSQL_ERRORS);
+    }
 
-      if (!isInitialized) {
-        throw new RuntimeException(
-        "Unable to create persistence manager. Check dss.log for details");
-      } else {
-        LOG.info("Initialized ObjectStore");
-      }
-    } finally {
-      pmfPropLock.unlock();
+    if (!isInitialized) {
+      throw new RuntimeException("Unable to create persistence manager. Check log for details");
+    } else {
+      LOG.debug("Initialized ObjectStore");
     }
   }
 
@@ -410,81 +363,13 @@ public class ObjectStore implements RawStore, Configurable {
   }
 
   @SuppressWarnings("nls")
-  private void initialize(Properties dsProps) {
-    int retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
-    long retryInterval = MetastoreConf.getTimeVar(conf,
-        ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
-    int numTries = retryLimit;
-
-    while (numTries > 0){
-      try {
-        initializeHelper(dsProps);
-        return; // If we reach here, we succeed.
-      } catch (Exception e){
-        shutdown();
-        numTries--;
-        boolean retriable = isRetriableException(e);
-        if ((numTries > 0) && retriable){
-        LOG.info("Retriable exception while instantiating ObjectStore, retrying. " +
-              "{} tries left", numTries, e);
-          try {
-            Thread.sleep(retryInterval);
-          } catch (InterruptedException ie) {
-            // Restore the interrupted status, since we do not want to catch it.
-            LOG.debug("Interrupted while sleeping before retrying.", ie);
-            Thread.currentThread().interrupt();
-          }
-          // If we're here, we'll proceed down the next while loop iteration.
-        } else {
-          // we've reached our limit, throw the last one.
-          if (retriable){
-            LOG.warn("Exception retry limit reached, not retrying any longer.",
-              e);
-          } else {
-            LOG.debug("Non-retriable exception during ObjectStore initialize.", e);
-          }
-          throw e;
-        }
-      }
-    }
-  }
-
-  private static final Set<Class<? extends Throwable>> retriableExceptionClasses =
-      new HashSet<>(Arrays.asList(JDOCanRetryException.class));
-  /**
-   * Helper function for initialize to determine if we should retry an exception.
-   * We return true if the exception is of a known type of retriable exceptions, or if one
-   * of its recursive .getCause returns a known type of retriable exception.
-   */
-  private boolean isRetriableException(Throwable e) {
-    if (e == null){
-      return false;
-    }
-    if (retriableExceptionClasses.contains(e.getClass())){
-      return true;
-    }
-    for (Class<? extends Throwable> c : retriableExceptionClasses){
-      if (c.isInstance(e)){
-        return true;
-      }
-    }
-
-    if (e.getCause() == null){
-      return false;
-    }
-    return isRetriableException(e.getCause());
-  }
-
-  /**
-   * private helper to do initialization routine, so we can retry if needed if it fails.
-   * @param dsProps
-   */
-  private void initializeHelper(Properties dsProps) {
-    LOG.info("ObjectStore, initialize called");
-    prop = dsProps;
-    pm = getPersistenceManager();
+  private void initialize() {
+    LOG.debug("ObjectStore, initialize called");
+    // if this method fails, PersistenceManagerProvider will retry for the configured number of times
+    // before giving up
+    pm = PersistenceManagerProvider.getPersistenceManager();
     LOG.info("RawStore: {}, with PersistenceManager: {}" +
-            " created in the thread with id: {}", this, pm, Thread.currentThread().getId());
+        " created in the thread with id: {}", this, pm, Thread.currentThread().getId());
     try {
       String productName = MetaStoreDirectSql.getProductName(pm);
      sqlGenerator = new SQLGenerator(DatabaseProduct.determineDatabaseProduct(productName), conf);
@@ -497,7 +382,7 @@ public class ObjectStore implements RawStore, Configurable {
       dbType = determineDatabaseProduct();
       expressionProxy = createExpressionProxy(conf);
       if (MetastoreConf.getBoolVar(getConf(), ConfVars.TRY_DIRECT_SQL)) {
-        String schema = prop.getProperty("javax.jdo.mapping.Schema");
+        String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
         schema = org.apache.commons.lang.StringUtils.defaultIfBlank(schema, null);
         directSql = new MetaStoreDirectSql(pm, conf, schema);
       }
@@ -564,135 +449,10 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
-  /**
-   * Properties specified in hive-default.xml override the properties specified
-   * in jpox.properties.
-   */
-  @SuppressWarnings("nls")
-  private static Properties getDataSourceProps(Configuration conf) {
-    Properties prop = new Properties();
-    correctAutoStartMechanism(conf);
-
-    // First, go through and set all our values for datanucleus and javax.jdo parameters.  This
-    // has to be a separate first step because we don't set the default values in the config object.
-    for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) {
-      String confVal = MetastoreConf.getAsString(conf, var);
-      String varName = var.getVarname();
-      Object prevVal = prop.setProperty(varName, confVal);
-      if (MetastoreConf.isPrintable(varName)) {
-        LOG.debug("Overriding {} value {} from jpox.properties with {}",
-          varName, prevVal, confVal);
-      }
-    }
-
-    // Now, we need to look for any values that the user set that MetastoreConf doesn't know about.
-    // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly
-    // interpolated in case of variables.  See HIVE-17788.
-    /*
-    for (Map.Entry<String, String> e : conf) {
-      if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) {
-        // We have to handle this differently depending on whether it is a value known to
-        // MetastoreConf or not.  If it is, we need to get the default value if a value isn't
-        // provided.  If not, we just set whatever the user has set.
-        Object prevVal = prop.setProperty(e.getKey(), e.getValue());
-        if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
-          LOG.debug("Overriding " + e.getKey() + " value " + prevVal
-              + " from  jpox.properties with " + e.getValue());
-        }
-      }
-    }
-    */
-
-    // Password may no longer be in the conf, use getPassword()
-    try {
-      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
-      if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) {
-        // We can get away with the use of varname here because varname == hiveName for PWD
-        prop.setProperty(ConfVars.PWD.getVarname(), passwd);
-      }
-    } catch (IOException err) {
-      throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      for (Entry<Object, Object> e : prop.entrySet()) {
-        if (MetastoreConf.isPrintable(e.getKey().toString())) {
-          LOG.debug("{} = {}", e.getKey(), e.getValue());
-        }
-      }
-    }
-
-    return prop;
-  }
-
-  /**
-   * Update conf to set datanucleus.autoStartMechanismMode=ignored.
-   * This is necessary to able to use older version of hive against
-   * an upgraded but compatible metastore schema in db from new version
-   * of hive
-   * @param conf
-   */
-  private static void correctAutoStartMechanism(Configuration conf) {
-    final String autoStartKey = "datanucleus.autoStartMechanismMode";
-    final String autoStartIgnore = "ignored";
-    String currentAutoStartVal = conf.get(autoStartKey);
-    if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) {
-      LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey,
-        conf.get(autoStartKey), autoStartIgnore);
-    }
-    conf.set(autoStartKey, autoStartIgnore);
-  }
-
-  private static synchronized PersistenceManagerFactory getPMF() {
-    if (pmf == null) {
-
-      Configuration conf = MetastoreConf.newMetastoreConf();
-      DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf);
-      if (dsp == null) {
-        pmf = JDOHelper.getPersistenceManagerFactory(prop);
-      } else {
-        try {
-          DataSource ds = dsp.create(conf);
-          Map<Object, Object> dsProperties = new HashMap<>();
-          //Any preexisting datanucleus property should be passed along
-          dsProperties.putAll(prop);
-          dsProperties.put("datanucleus.ConnectionFactory", ds);
-          dsProperties.put("javax.jdo.PersistenceManagerFactoryClass",
-              "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
-          pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
-        } catch (SQLException e) {
-          LOG.warn("Could not create PersistenceManagerFactory using " +
-              "connection pool properties, will fall back", e);
-          pmf = JDOHelper.getPersistenceManagerFactory(prop);
-        }
-      }
-      DataStoreCache dsc = pmf.getDataStoreCache();
-      if (dsc != null) {
-        String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES);
-        LOG.info("Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"", objTypes);
-        if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) {
-          String[] typeTokens = objTypes.toLowerCase().split(",");
-          for (String type : typeTokens) {
-            type = type.trim();
-            if (PINCLASSMAP.containsKey(type)) {
-              dsc.pinAll(true, PINCLASSMAP.get(type));
-            } else {
-              LOG.warn("{} is not one of the pinnable object types: {}", type,
-                org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " "));
-            }
-          }
-        }
-      } else {
-        LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
-      }
-    }
-    return pmf;
-  }
-
   @InterfaceAudience.LimitedPrivate({"HCATALOG"})
   @InterfaceStability.Evolving
   public PersistenceManager getPersistenceManager() {
-    return getPMF().getPersistenceManager();
+    return PersistenceManagerProvider.getPersistenceManager();
   }
 
   @Override
@@ -9759,113 +9519,6 @@ public class ObjectStore implements RawStore, Configurable {
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * Removed cached classloaders from DataNucleus
-   * DataNucleus caches classloaders in NucleusContext.
-   * In UDFs, this can result in classloaders not getting GCed resulting in PermGen leaks.
-   * This is particularly an issue when using embedded metastore with HiveServer2,
-   * since the current classloader gets modified with each new add jar,
-   * becoming the classloader for downstream classes, which DataNucleus ends up using.
-   * The NucleusContext cache gets freed up only on calling a close on it.
-   * We're not closing NucleusContext since it does a bunch of other things which we don't want.
-   * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues.
-   */
-  public static void unCacheDataNucleusClassLoaders() {
-    PersistenceManagerFactory pmf = ObjectStore.getPMF();
-    clearOutPmfClassLoaderCache(pmf);
-  }
-
-  private static void clearOutPmfClassLoaderCache(PersistenceManagerFactory pmf) {
-    if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
-      return;
-    }
-    // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
-    // so it's likely to stop working at some time in the future, especially if we upgrade DN
-    // versions, so we actively need to find a better way to make sure the leak doesn't happen
-    // instead of just clearing out the cache after every call.
-    JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
-    NucleusContext nc = jdoPmf.getNucleusContext();
-    try {
-      Field pmCache = pmf.getClass().getDeclaredField("pmCache");
-      pmCache.setAccessible(true);
-      Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)pmCache.get(pmf);
-      for (JDOPersistenceManager pm : pmSet) {
-        org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
-        if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
-          ClassLoaderResolver clr = ((org.datanucleus.ExecutionContextThreadedImpl)ec).getClassLoaderResolver();
-          clearClr(clr);
-        }
-      }
-      org.datanucleus.plugin.PluginManager pluginManager = jdoPmf.getNucleusContext().getPluginManager();
-      Field registryField = pluginManager.getClass().getDeclaredField("registry");
-      registryField.setAccessible(true);
-      org.datanucleus.plugin.PluginRegistry registry = (org.datanucleus.plugin.PluginRegistry)registryField.get(pluginManager);
-      if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
-        org.datanucleus.plugin.NonManagedPluginRegistry nRegistry = (org.datanucleus.plugin.NonManagedPluginRegistry)registry;
-        Field clrField = nRegistry.getClass().getDeclaredField("clr");
-        clrField.setAccessible(true);
-        ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(nRegistry);
-        clearClr(clr);
-      }
-      if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
-        org.datanucleus.PersistenceNucleusContextImpl pnc = (org.datanucleus.PersistenceNucleusContextImpl)nc;
-        org.datanucleus.store.types.TypeManagerImpl tm = (org.datanucleus.store.types.TypeManagerImpl)pnc.getTypeManager();
-        Field clrField = tm.getClass().getDeclaredField("clr");
-        clrField.setAccessible(true);
-        ClassLoaderResolver clr = (ClassLoaderResolver)clrField.get(tm);
-        clearClr(clr);
-        Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
-        storeMgrField.setAccessible(true);
-        org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr = (org.datanucleus.store.rdbms.RDBMSStoreManager)storeMgrField.get(pnc);
-        Field backingStoreField = storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
-        backingStoreField.setAccessible(true);
-        Map<String, Store> backingStoreByMemberName = (Map<String, Store>)backingStoreField.get(storeMgr);
-        for (Store store : backingStoreByMemberName.values()) {
-          org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore = (org.datanucleus.store.rdbms.scostore.BaseContainerStore)store;
-          clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class.getDeclaredField("clr");
-          clrField.setAccessible(true);
-          clr = (ClassLoaderResolver)clrField.get(baseStore);
-          clearClr(clr);
-        }
-      }
-      Field classLoaderResolverMap = AbstractNucleusContext.class.getDeclaredField(
-          "classLoaderResolverMap");
-      classLoaderResolverMap.setAccessible(true);
-      Map<String,ClassLoaderResolver> loaderMap =
-          (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
-      for (ClassLoaderResolver clr : loaderMap.values()){
-        clearClr(clr);
-      }
-      classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
-      LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
-    } catch (Exception e) {
-      LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e);
-    }
-  }
-
-  private static void clearClr(ClassLoaderResolver clr) throws Exception {
-    if (clr != null){
-      if (clr instanceof ClassLoaderResolverImpl){
-        ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr;
-        long resourcesCleared = clearFieldMap(clri,"resources");
-        long loadedClassesCleared = clearFieldMap(clri,"loadedClasses");
-        long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses");
-        LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}",
-            resourcesCleared, loadedClassesCleared, unloadedClassesCleared);
-      }
-    }
-  }
-  private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName) throws Exception {
-    Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName);
-    mapField.setAccessible(true);
-
-    Map<String,Class> map = (Map<String, Class>) mapField.get(clri);
-    long sz = map.size();
-    mapField.set(clri, Collections.synchronizedMap(new WeakValueMap()));
-    return sz;
-  }
-
-
   @Override
  public List<SQLPrimaryKey> getPrimaryKeys(String catName, String db_name, String tbl_name)
       throws MetaException {
@@ -10927,20 +10580,6 @@ public class ObjectStore implements RawStore, Configurable {
     }
   }
 
-  /**
-   * To make possible to run multiple metastore in unit test
-   * @param twoMetastoreTesting if we are using multiple metastore in unit test
-   */
-  @VisibleForTesting
-  public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) {
-    forTwoMetastoreTesting = twoMetastoreTesting;
-  }
-
-  @VisibleForTesting
-  Properties getProp() {
-    return prop;
-  }
-
  private void checkForConstraintException(Exception e, String msg) throws AlreadyExistsException {
     if (getConstraintException(e) != null) {
       LOG.error(msg, e);

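Net effect of the ObjectStore changes above, reduced to a sketch (illustrative;
the real method also resets directSql, the transaction bookkeeping and the
partition validation pattern, as the hunks show):

    // All static pmf/prop bookkeeping now lives in PersistenceManagerProvider;
    // ObjectStore only resets its per-instance state and asks for a fresh pm.
    public void setConf(Configuration conf) {
      this.conf = conf;
      PersistenceManagerProvider.updatePmfProperties(conf); // may rebuild the shared pmf
      shutdown();                                           // drop this instance's old pm
      pm = PersistenceManagerProvider.getPersistenceManager();
    }
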
http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
new file mode 100644
index 0000000..20f0738
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -0,0 +1,537 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.apache.hadoop.hive.metastore.model.MDatabase;
+import org.apache.hadoop.hive.metastore.model.MFieldSchema;
+import org.apache.hadoop.hive.metastore.model.MOrder;
+import org.apache.hadoop.hive.metastore.model.MPartition;
+import org.apache.hadoop.hive.metastore.model.MSerDeInfo;
+import org.apache.hadoop.hive.metastore.model.MStorageDescriptor;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.model.MType;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.datanucleus.AbstractNucleusContext;
+import org.datanucleus.ClassLoaderResolver;
+import org.datanucleus.ClassLoaderResolverImpl;
+import org.datanucleus.NucleusContext;
+import org.datanucleus.PropertyNames;
+import org.datanucleus.api.jdo.JDOPersistenceManager;
+import org.datanucleus.api.jdo.JDOPersistenceManagerFactory;
+import org.datanucleus.store.scostore.Store;
+import org.datanucleus.util.WeakValueMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jdo.JDOCanRetryException;
+import javax.jdo.JDOHelper;
+import javax.jdo.PersistenceManager;
+import javax.jdo.PersistenceManagerFactory;
+import javax.jdo.datastore.DataStoreCache;
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/**
+ * This class is a wrapper around PersistenceManagerFactory and its properties.
+ * These objects are static and need to be carefully modified together such that there are no
+ * race conditions when updating them. Additionally, this class provides thread-safe methods
+ * to get PersistenceManager instances from the current PersistenceManagerFactory. The most
+ * common usage of this class is to create a PersistenceManager from the existing
+ * PersistenceManagerFactory. PersistenceManagerFactory properties are modified less often, so
+ * the property update can make use of read/write locks and only block when the current
+ * properties change.
+ */
+public class PersistenceManagerProvider {
+  private static PersistenceManagerFactory pmf;
+  private static Properties prop;
+  private static final ReentrantReadWriteLock pmfLock = new ReentrantReadWriteLock();
+  private static final Lock pmfReadLock = pmfLock.readLock();
+  private static final Lock pmfWriteLock = pmfLock.writeLock();
+  private static final Logger LOG = LoggerFactory.getLogger(PersistenceManagerProvider.class);
+  private static final Map<String, Class<?>> PINCLASSMAP;
+  private static boolean forTwoMetastoreTesting;
+  private static int retryLimit;
+  private static long retryInterval;
+
+  static {
+    Map<String, Class<?>> map = new HashMap<>();
+    map.put("table", MTable.class);
+    map.put("storagedescriptor", MStorageDescriptor.class);
+    map.put("serdeinfo", MSerDeInfo.class);
+    map.put("partition", MPartition.class);
+    map.put("database", MDatabase.class);
+    map.put("type", MType.class);
+    map.put("fieldschema", MFieldSchema.class);
+    map.put("order", MOrder.class);
+    PINCLASSMAP = Collections.unmodifiableMap(map);
+  }
+
+  private PersistenceManagerProvider() {
+    // prevent instantiation
+  }
+
+  private static final Set<Class<? extends Throwable>> retriableExceptionClasses =
+      new HashSet<>(Arrays.asList(JDOCanRetryException.class));
+  /**
+   * Helper function for initialize to determine if we should retry an exception.
+   * We return true if the exception is of a known type of retriable exceptions, or if one
+   * of its recursive .getCause returns a known type of retriable exception.
+   */
+  private static boolean isRetriableException(Throwable e) {
+    if (e == null){
+      return false;
+    }
+    if (retriableExceptionClasses.contains(e.getClass())){
+      return true;
+    }
+    for (Class<? extends Throwable> c : retriableExceptionClasses){
+      if (c.isInstance(e)){
+        return true;
+      }
+    }
+
+    if (e.getCause() == null){
+      return false;
+    }
+    return isRetriableException(e.getCause());
+  }
+  /**
+   * This method updates the PersistenceManagerFactory and its properties if the given
+   * configuration is different from its current set of properties. The most common case is that
+   * the PersistenceManagerFactory properties do not change, and hence this method is optimized to
+   * be non-blocking in such cases. However, if the properties are different, this method blocks
+   * other threads until the properties are updated, the current pmf is closed and
+   * a new pmf is re-initialized. Note that when a PersistenceManagerFactory is re-initialized, all
+   * the PersistenceManagers which were instantiated using the old factory become invalid and will
+   * throw JDOUserException. Hence it is recommended that this method is called in the setup/init
+   * phase of the Metastore service, when there are no other active threads serving clients.
+   *
+   * @param conf Configuration which provides the datanucleus/datasource properties for comparison
+   */
+  public static void updatePmfProperties(Configuration conf) {
+    // take a read lock to check if the datasource properties changed.
+    // Most common case is that datasource properties do not change
+    Properties propsFromConf = PersistenceManagerProvider.getDataSourceProps(conf);
+    pmfReadLock.lock();
+    // keep track of whether the read lock is held by this thread
+    // so that we can unlock it before leaving this method
+    // this is needed because pmf methods below could throw JDOException (unchecked exception)
+    // which can lead to the read lock not being held at the end of the inner try-finally
+    // block below
+    boolean readLockAcquired = true;
+    try {
+      // if pmf properties change, need to update, release read lock and take write lock
+      if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+        pmfReadLock.unlock();
+        readLockAcquired = false;
+        pmfWriteLock.lock();
+        try {
+          // check if we need to update pmf again here in case some other thread already did it
+          // for us after releasing readlock and before acquiring write lock above
+          if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+            // OK, now we really need to re-initialize pmf and pmf properties
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Updating the pmf due to property change");
+              if (prop == null) {
+                LOG.info("Current pmf properties are uninitialized");
+              } else {
+                for (String key : prop.stringPropertyNames()) {
+                  if (!prop.getProperty(key).equals(propsFromConf.getProperty(key))) {
+                    if (MetastoreConf.isPrintable(key) && !key.equals(ConfVars.CONNECT_URL_KEY.getVarname())) {
+                      // The jdbc connection url can contain sensitive information like username and password
+                      // which should be masked out before logging.
+                      String oldVal = prop.getProperty(key);
+                      String newVal = propsFromConf.getProperty(key);
+                      LOG.debug("Found {} to be different. Old val : {} : New Val : {}", key,
+                          oldVal, newVal);
+                    } else {
+                      LOG.debug("Found masked property {} to be different", key);
+                    }
+                  }
+                }
+              }
+            }
+            if (pmf != null) {
+              clearOutPmfClassLoaderCache();
+              if (!forTwoMetastoreTesting) {
+                // close the underlying connection pool to avoid leaks
+                LOG.debug("Closing PersistenceManagerFactory");
+                pmf.close();
+                LOG.debug("PersistenceManagerFactory closed");
+              }
+              pmf = null;
+            }
+            // update the pmf properties object then initialize pmf using them
+            prop = propsFromConf;
+            retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
+            retryInterval = MetastoreConf
+                .getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
+            // init PMF with retry logic
+            retry(() -> {initPMF(conf); return null;});
+          }
+          // downgrade by acquiring read lock before releasing write lock
+          pmfReadLock.lock();
+          readLockAcquired = true;
+        } finally {
+          pmfWriteLock.unlock();
+        }
+      }
+    } finally {
+      if (readLockAcquired) {
+        pmfReadLock.unlock();
+      }
+    }
+  }
+
+  private static void initPMF(Configuration conf) {
+    DataSourceProvider dsp = DataSourceProviderFactory.getDataSourceProvider(conf);
+
+    if (dsp == null) {
+      pmf = JDOHelper.getPersistenceManagerFactory(prop);
+    } else {
+      try {
+        DataSource ds = dsp.create(conf);
+        Map<Object, Object> dsProperties = new HashMap<>();
+        //Any preexisting datanucleus property should be passed along
+        dsProperties.putAll(prop);
+        dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
+        dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
+        dsProperties.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
+            "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
+        pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
+      } catch (SQLException e) {
+        LOG.warn("Could not create PersistenceManagerFactory using "
+            + "connection pool properties, will fall back", e);
+        pmf = JDOHelper.getPersistenceManagerFactory(prop);
+      }
+    }
+    DataStoreCache dsc = pmf.getDataStoreCache();
+    if (dsc != null) {
+      String objTypes = MetastoreConf.getVar(conf, ConfVars.CACHE_PINOBJTYPES);
+      LOG.info(
+          "Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=\"{}\"",
+          objTypes);
+      if (org.apache.commons.lang.StringUtils.isNotEmpty(objTypes)) {
+        String[] typeTokens = objTypes.toLowerCase().split(",");
+        for (String type : typeTokens) {
+          type = type.trim();
+          if (PINCLASSMAP.containsKey(type)) {
+            dsc.pinAll(true, PINCLASSMAP.get(type));
+          } else {
+            LOG.warn("{} is not one of the pinnable object types: {}", type,
+                org.apache.commons.lang.StringUtils.join(PINCLASSMAP.keySet(), " "));
+          }
+        }
+      }
+    } else {
+      LOG.warn("PersistenceManagerFactory returned null DataStoreCache object. "
+          + "Unable to initialize object pin types defined by hive.metastore.cache.pinobjtypes");
+    }
+  }
+
+  /**
+   * Removes cached classloaders from DataNucleus.
+   * DataNucleus caches classloaders in NucleusContext.
+   * In UDFs, this can result in classloaders not getting GCed, resulting in PermGen leaks.
+   * This is particularly an issue when using the embedded metastore with HiveServer2,
+   * since the current classloader gets modified with each new add jar,
+   * becoming the classloader for downstream classes, which DataNucleus ends up using.
+   * The NucleusContext cache gets freed up only on calling a close on it.
+   * We're not closing NucleusContext since it does a bunch of other things which we don't want.
+   * We're not clearing the cache HashMap by calling HashMap#clear to avoid concurrency issues.
+   */
+  public static void clearOutPmfClassLoaderCache() {
+    pmfWriteLock.lock();
+    try {
+      if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
+        return;
+      }
+      // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
+      // so it's likely to stop working at some time in the future, especially if we upgrade DN
+      // versions, so we actively need to find a better way to make sure the leak doesn't happen
+      // instead of just clearing out the cache after every call.
+      JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
+      NucleusContext nc = jdoPmf.getNucleusContext();
+      try {
+        Field pmCache = pmf.getClass().getDeclaredField("pmCache");
+        pmCache.setAccessible(true);
+        Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>) pmCache.get(pmf);
+        for (JDOPersistenceManager pm : pmSet) {
+          org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
+          if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
+            ClassLoaderResolver clr =
+                ((org.datanucleus.ExecutionContextThreadedImpl) ec).getClassLoaderResolver();
+            clearClr(clr);
+          }
+        }
+        org.datanucleus.plugin.PluginManager pluginManager =
+            jdoPmf.getNucleusContext().getPluginManager();
+        Field registryField = pluginManager.getClass().getDeclaredField("registry");
+        registryField.setAccessible(true);
+        org.datanucleus.plugin.PluginRegistry registry =
+            (org.datanucleus.plugin.PluginRegistry) registryField.get(pluginManager);
+        if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
+          org.datanucleus.plugin.NonManagedPluginRegistry nRegistry =
+              (org.datanucleus.plugin.NonManagedPluginRegistry) registry;
+          Field clrField = nRegistry.getClass().getDeclaredField("clr");
+          clrField.setAccessible(true);
+          ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(nRegistry);
+          clearClr(clr);
+        }
+        if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
+          org.datanucleus.PersistenceNucleusContextImpl pnc =
+              (org.datanucleus.PersistenceNucleusContextImpl) nc;
+          org.datanucleus.store.types.TypeManagerImpl tm =
+              (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager();
+          Field clrField = tm.getClass().getDeclaredField("clr");
+          clrField.setAccessible(true);
+          ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm);
+          clearClr(clr);
+          Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
+          storeMgrField.setAccessible(true);
+          org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr =
+              (org.datanucleus.store.rdbms.RDBMSStoreManager) storeMgrField.get(pnc);
+          Field backingStoreField =
+              storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
+          backingStoreField.setAccessible(true);
+          Map<String, Store> backingStoreByMemberName =
+              (Map<String, Store>) backingStoreField.get(storeMgr);
+          for (Store store : backingStoreByMemberName.values()) {
+            org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore =
+                (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store;
+            clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class
+                .getDeclaredField("clr");
+            clrField.setAccessible(true);
+            clr = (ClassLoaderResolver) clrField.get(baseStore);
+            clearClr(clr);
+          }
+        }
+        Field classLoaderResolverMap =
+            AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap");
+        classLoaderResolverMap.setAccessible(true);
+        Map<String, ClassLoaderResolver> loaderMap =
+            (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
+        for (ClassLoaderResolver clr : loaderMap.values()) {
+          clearClr(clr);
+        }
+        classLoaderResolverMap.set(nc, new HashMap<String, ClassLoaderResolver>());
+        LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
+      } catch (Exception e) {
+        LOG.warn("Failed to remove cached classloaders from DataNucleus NucleusContext", e);
+      }
+    } finally {
+      pmfWriteLock.unlock();
+    }
+  }
+
+  private static void clearClr(ClassLoaderResolver clr) throws Exception {
+    if (clr != null) {
+      if (clr instanceof ClassLoaderResolverImpl) {
+        ClassLoaderResolverImpl clri = (ClassLoaderResolverImpl) clr;
+        long resourcesCleared = clearFieldMap(clri, "resources");
+        long loadedClassesCleared = clearFieldMap(clri, "loadedClasses");
+        long unloadedClassesCleared = clearFieldMap(clri, "unloadedClasses");
+        LOG.debug("Cleared ClassLoaderResolverImpl: {}, {}, {}", resourcesCleared,
+            loadedClassesCleared, unloadedClassesCleared);
+      }
+    }
+  }
+
+  private static long clearFieldMap(ClassLoaderResolverImpl clri, String mapFieldName)
+      throws Exception {
+    Field mapField = ClassLoaderResolverImpl.class.getDeclaredField(mapFieldName);
+    mapField.setAccessible(true);
+
+    Map<String, Class> map = (Map<String, Class>) mapField.get(clri);
+    long sz = map.size();
+    mapField.set(clri, Collections.synchronizedMap(new WeakValueMap()));
+    return sz;
+  }
+
+  /**
+   * Creates a PersistenceManager instance for the current PersistenceManagerFactory. Note that
+   * this acquires a read-lock on the PersistenceManagerFactory, so this method will block if any
+   * other thread is actively (re-)initializing the PersistenceManagerFactory when it is called.
+   * Note that this method throws a RuntimeException if the PersistenceManagerFactory is not yet
+   * initialized.
+   *
+   * @return PersistenceManager from the current PersistenceManagerFactory instance
+   */
+  public static PersistenceManager getPersistenceManager() {
+    pmfReadLock.lock();
+    try {
+      if (pmf == null) {
+        throw new RuntimeException(
+            "Cannot create PersistenceManager. PersistenceManagerFactory is not yet initialized");
+      }
+      return retry(pmf::getPersistenceManager);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      pmfReadLock.unlock();
+    }
+  }
+
+  /**
+   * Properties specified in hive-default.xml override the properties specified
+   * in jpox.properties.
+   */
+  @SuppressWarnings("nls")
+  private static Properties getDataSourceProps(Configuration conf) {
+    Properties prop = new Properties();
+    correctAutoStartMechanism(conf);
+
+    // First, go through and set all our values for datanucleus and javax.jdo parameters.  This
+    // has to be a separate first step because we don't set the default values in the config object.
+    for (ConfVars var : MetastoreConf.dataNucleusAndJdoConfs) {
+      String confVal = MetastoreConf.getAsString(conf, var);
+      String varName = var.getVarname();
+      Object prevVal = prop.setProperty(varName, confVal);
+      if (MetastoreConf.isPrintable(varName)) {
+        LOG.debug("Overriding {} value {} from jpox.properties with {}", varName, prevVal, confVal);
+      }
+    }
+
+    // Now, we need to look for any values that the user set that MetastoreConf doesn't know about.
+    // TODO Commenting this out for now, as it breaks because the conf values aren't getting properly
+    // interpolated in case of variables.  See HIVE-17788.
+    /*
+    for (Map.Entry<String, String> e : conf) {
+      if (e.getKey().startsWith("datanucleus.") || e.getKey().startsWith("javax.jdo.")) {
+        // We have to handle this differently depending on whether it is a value known to
+        // MetastoreConf or not.  If it is, we need to get the default value if a value isn't
+        // provided.  If not, we just set whatever the user has set.
+        Object prevVal = prop.setProperty(e.getKey(), e.getValue());
+        if (LOG.isDebugEnabled() && MetastoreConf.isPrintable(e.getKey())) {
+          LOG.debug("Overriding " + e.getKey() + " value " + prevVal
+              + " from  jpox.properties with " + e.getValue());
+        }
+      }
+    }
+    */
+
+    // Password may no longer be in the conf, use getPassword()
+    try {
+      String passwd = MetastoreConf.getPassword(conf, MetastoreConf.ConfVars.PWD);
+      if (org.apache.commons.lang.StringUtils.isNotEmpty(passwd)) {
+        // We can get away with the use of varname here because varname == hiveName for PWD
+        prop.setProperty(ConfVars.PWD.getVarname(), passwd);
+      }
+    } catch (IOException err) {
+      throw new RuntimeException("Error getting metastore password: " + err.getMessage(), err);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      for (Entry<Object, Object> e : prop.entrySet()) {
+        if (MetastoreConf.isPrintable(e.getKey().toString())) {
+          LOG.debug("{} = {}", e.getKey(), e.getValue());
+        }
+      }
+    }
+
+    return prop;
+  }
+
+  /**
+   * Update conf to set datanucleus.autoStartMechanismMode=ignored.
+   * This is necessary to be able to use an older version of hive against
+   * an upgraded but compatible metastore schema in the db from a newer version
+   * of hive.
+   *
+   * @param conf
+   */
+  private static void correctAutoStartMechanism(Configuration conf) {
+    final String autoStartKey = "datanucleus.autoStartMechanismMode";
+    final String autoStartIgnore = "ignored";
+    String currentAutoStartVal = conf.get(autoStartKey);
+    if (!autoStartIgnore.equalsIgnoreCase(currentAutoStartVal)) {
+      LOG.warn("{} is set to unsupported value {} . Setting it to value: {}", autoStartKey,
+          conf.get(autoStartKey), autoStartIgnore);
+    }
+    conf.set(autoStartKey, autoStartIgnore);
+  }
+
+  /**
+   * Makes it possible to run multiple metastores in unit tests.
+   *
+   * @param twoMetastoreTesting if we are using multiple metastores in unit tests
+   */
+  @VisibleForTesting
+  public static void setTwoMetastoreTesting(boolean twoMetastoreTesting) {
+    forTwoMetastoreTesting = twoMetastoreTesting;
+  }
+
+  public static String getProperty(String key) {
+    return prop == null ? null : prop.getProperty(key);
+  }
+
+  private static <T> T retry(Supplier<T> s) {
+    Exception ex = null;
+    int myRetryLimit = retryLimit;
+    while (myRetryLimit > 0) {
+      try {
+        return s.get();
+      } catch (Exception e) {
+        myRetryLimit--;
+        boolean retriable = isRetriableException(e);
+        if (myRetryLimit > 0 && retriable) {
+          LOG.info("Retriable exception while invoking method, retrying. {} attempts left",
+              myRetryLimit, e);
+          try {
+            Thread.sleep(retryInterval);
+          } catch (InterruptedException ie) {
+            // Restore the interrupted status, since we do not want to catch it.
+            LOG.debug("Interrupted while sleeping before retrying.", ie);
+            Thread.currentThread().interrupt();
+          }
+          // If we're here, we'll proceed down the next while loop iteration.
+        } else {
+          // we've reached our limit, throw the last one.
+          if (retriable) {
+            LOG.warn("Exception retry limit reached, not retrying any longer.", e);
+          } else {
+            LOG.debug("Non-retriable exception.", e);
+          }
+          ex = e;
+        }
+      }
+    }
+    throw new RuntimeException(ex);
+  }
+}

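updatePmfProperties above combines the standard ReadWriteLock downgrade idiom with
a held-flag, so the outer finally never unlocks a read lock this thread does not
hold (the pmf calls can throw an unchecked JDOException after the read lock has
been dropped). The skeleton, isolated as a sketch -- needsRebuild() and rebuild()
are hypothetical stand-ins:

    readLock.lock();
    boolean readLockHeld = true;
    try {
      if (needsRebuild()) {
        readLock.unlock();          // cannot upgrade read -> write directly
        readLockHeld = false;
        writeLock.lock();
        try {
          if (needsRebuild()) {     // re-check under the write lock
            rebuild();              // may throw before the read lock is retaken
          }
          readLock.lock();          // downgrade: take read before dropping write
          readLockHeld = true;
        } finally {
          writeLock.unlock();
        }
      }
    } finally {
      if (readLockHeld) {           // skip the unlock if rebuild() threw mid-swap
        readLock.unlock();
      }
    }
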
http://git-wip-us.apache.org/repos/asf/hive/blob/db8e9b0e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 2d9c229..23c7670 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -62,19 +62,25 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jdo.Query;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
 
 @Category(MetastoreUnitTest.class)
@@ -511,8 +517,8 @@ public class TestObjectStore {
     localConf.set(key1, value1);
     objectStore = new ObjectStore();
     objectStore.setConf(localConf);
-    Assert.assertEquals(value, objectStore.getProp().getProperty(key));
-    Assert.assertNull(objectStore.getProp().getProperty(key1));
+    Assert.assertEquals(value, PersistenceManagerProvider.getProperty(key));
+    Assert.assertNull(PersistenceManagerProvider.getProperty(key1));
   }
 
   /**
@@ -714,6 +720,52 @@ public class TestObjectStore {
     Assert.assertTrue("Expect no active transactions.", !objectStore.isActiveTransaction());
   }
 
+  /**
+   * This test calls ObjectStore.setConf from multiple threads. Each thread uses its
+   * own instance of ObjectStore to simulate thread-local objectstore behaviour.
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentPMFInitialize() throws Exception {
+    final String dataSourceProp = "datanucleus.connectionPool.maxPoolSize";
+    // Barrier is used to ensure that all threads start the race at the same time
+    final int numThreads = 10;
+    final int numIteration = 50;
+    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+    final AtomicInteger counter = new AtomicInteger(0);
+    ExecutorService executor = newFixedThreadPool(numThreads);
+    List<Future<Void>> results = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      final Random random = new Random();
+      Configuration conf = MetastoreConf.newMetastoreConf();
+      MetaStoreTestUtils.setConfForStandloneMode(conf);
+      results.add(executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          // each thread gets its own ObjectStore to simulate threadLocal store
+          ObjectStore objectStore = new ObjectStore();
+          barrier.await();
+          for (int j = 0; j < numIteration; j++) {
+            // set connectionPool to a random value to increase the likelihood of pmf
+            // re-initialization
+            int randomNumber = random.nextInt(100);
+            conf.set(dataSourceProp, String.valueOf(randomNumber));
+            if (randomNumber % 2 == 0) {
+              objectStore.setConf(conf);
+            } else {
+              Assert.assertNotNull(objectStore.getPersistenceManager());
+            }
+            counter.getAndIncrement();
+          }
+          return null;
+        }
+      }));
+    }
+    for (Future<Void> future : results) {
+      future.get(120, TimeUnit.SECONDS);
+    }
+    Assert.assertEquals("Unexpected number of setConf calls", numIteration * numThreads,
+        counter.get());
+  }
 
   private void createTestCatalog(String catName) throws MetaException {
     Catalog cat = new CatalogBuilder()

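The concurrency pattern the new test relies on, reduced to a generic sketch (only
java.util.concurrent types; exerciseSharedState() is a hypothetical stand-in for
the racing operation, here setConf/getPersistenceManager):

    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    CyclicBarrier barrier = new CyclicBarrier(numThreads);
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 0; i < numThreads; i++) {
      futures.add(pool.submit(() -> {
        barrier.await();            // release all workers at once to maximize contention
        exerciseSharedState();      // the operation under test
        return null;                // Callable<Void>, so exceptions propagate via get()
      }));
    }
    for (Future<Void> f : futures) {
      f.get(120, TimeUnit.SECONDS); // rethrows any worker exception as ExecutionException
    }
    pool.shutdown();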